add volume tailer

This commit is contained in:
Chris Lu 2019-04-18 11:05:02 -07:00
parent b142f9f1d5
commit 3dce1016cb
4 changed files with 228 additions and 148 deletions

View file

@ -0,0 +1,74 @@
package main
import (
"context"
"flag"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/security"
weed_server "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/spf13/viper"
"io"
"log"
)
var (
master = flag.String("master", "localhost:9333", "master server host and port")
volumeId = flag.Int("volumeId", -1, "a volume id")
timeoutSeconds = flag.Int("timeoutSeconds", 0, "disconnect if no activity after these seconds")
)
func main() {
flag.Parse()
weed_server.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
vid := storage.VolumeId(*volumeId)
// find volume location, replication, ttl info
lookup, err := operation.Lookup(*master, vid.String())
if err != nil {
log.Printf("Error looking up volume %d: %v", vid, err)
return
}
volumeServer := lookup.Locations[0].Url
log.Printf("volume %d is on volume server %s", vid, volumeServer)
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{
VolumeId: uint32(vid),
SinceNs: 0,
DrainingSeconds: uint32(*timeoutSeconds),
})
if err != nil {
return err
}
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return recvErr
}
}
println("header:", len(resp.NeedleHeader), "body:", len(resp.NeedleBody))
}
return nil
})
if err != nil {
log.Printf("Error VolumeTail volume %d: %v", vid, err)
}
}

View file

@ -42,7 +42,7 @@ service VolumeServer {
rpc CopyFile (CopyFileRequest) returns (stream CopyFileResponse) {
}
rpc VolumeStreamFollow (VolumeStreamFollowRequest) returns (stream VolumeStreamFollowResponse) {
rpc VolumeTail (VolumeTailRequest) returns (stream VolumeTailResponse) {
}
}
@ -166,12 +166,12 @@ message CopyFileResponse {
bytes file_content = 1;
}
message VolumeStreamFollowRequest {
message VolumeTailRequest {
uint32 volume_id = 1;
uint64 since_ns = 2;
uint32 drainingSeconds = 3;
}
message VolumeStreamFollowResponse {
message VolumeTailResponse {
bytes needle_header = 1;
bytes needle_body = 2;
}

View file

@ -39,8 +39,8 @@ It has these top-level messages:
VolumeCopyResponse
CopyFileRequest
CopyFileResponse
VolumeStreamFollowRequest
VolumeStreamFollowResponse
VolumeTailRequest
VolumeTailResponse
ReadVolumeFileStatusRequest
ReadVolumeFileStatusResponse
DiskStatus
@ -636,56 +636,56 @@ func (m *CopyFileResponse) GetFileContent() []byte {
return nil
}
type VolumeStreamFollowRequest struct {
type VolumeTailRequest struct {
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
SinceNs uint64 `protobuf:"varint,2,opt,name=since_ns,json=sinceNs" json:"since_ns,omitempty"`
DrainingSeconds uint32 `protobuf:"varint,3,opt,name=drainingSeconds" json:"drainingSeconds,omitempty"`
}
func (m *VolumeStreamFollowRequest) Reset() { *m = VolumeStreamFollowRequest{} }
func (m *VolumeStreamFollowRequest) String() string { return proto.CompactTextString(m) }
func (*VolumeStreamFollowRequest) ProtoMessage() {}
func (*VolumeStreamFollowRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{30} }
func (m *VolumeTailRequest) Reset() { *m = VolumeTailRequest{} }
func (m *VolumeTailRequest) String() string { return proto.CompactTextString(m) }
func (*VolumeTailRequest) ProtoMessage() {}
func (*VolumeTailRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{30} }
func (m *VolumeStreamFollowRequest) GetVolumeId() uint32 {
func (m *VolumeTailRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumeId
}
return 0
}
func (m *VolumeStreamFollowRequest) GetSinceNs() uint64 {
func (m *VolumeTailRequest) GetSinceNs() uint64 {
if m != nil {
return m.SinceNs
}
return 0
}
func (m *VolumeStreamFollowRequest) GetDrainingSeconds() uint32 {
func (m *VolumeTailRequest) GetDrainingSeconds() uint32 {
if m != nil {
return m.DrainingSeconds
}
return 0
}
type VolumeStreamFollowResponse struct {
type VolumeTailResponse struct {
NeedleHeader []byte `protobuf:"bytes,1,opt,name=needle_header,json=needleHeader,proto3" json:"needle_header,omitempty"`
NeedleBody []byte `protobuf:"bytes,2,opt,name=needle_body,json=needleBody,proto3" json:"needle_body,omitempty"`
}
func (m *VolumeStreamFollowResponse) Reset() { *m = VolumeStreamFollowResponse{} }
func (m *VolumeStreamFollowResponse) String() string { return proto.CompactTextString(m) }
func (*VolumeStreamFollowResponse) ProtoMessage() {}
func (*VolumeStreamFollowResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{31} }
func (m *VolumeTailResponse) Reset() { *m = VolumeTailResponse{} }
func (m *VolumeTailResponse) String() string { return proto.CompactTextString(m) }
func (*VolumeTailResponse) ProtoMessage() {}
func (*VolumeTailResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{31} }
func (m *VolumeStreamFollowResponse) GetNeedleHeader() []byte {
func (m *VolumeTailResponse) GetNeedleHeader() []byte {
if m != nil {
return m.NeedleHeader
}
return nil
}
func (m *VolumeStreamFollowResponse) GetNeedleBody() []byte {
func (m *VolumeTailResponse) GetNeedleBody() []byte {
if m != nil {
return m.NeedleBody
}
@ -899,8 +899,8 @@ func init() {
proto.RegisterType((*VolumeCopyResponse)(nil), "volume_server_pb.VolumeCopyResponse")
proto.RegisterType((*CopyFileRequest)(nil), "volume_server_pb.CopyFileRequest")
proto.RegisterType((*CopyFileResponse)(nil), "volume_server_pb.CopyFileResponse")
proto.RegisterType((*VolumeStreamFollowRequest)(nil), "volume_server_pb.VolumeStreamFollowRequest")
proto.RegisterType((*VolumeStreamFollowResponse)(nil), "volume_server_pb.VolumeStreamFollowResponse")
proto.RegisterType((*VolumeTailRequest)(nil), "volume_server_pb.VolumeTailRequest")
proto.RegisterType((*VolumeTailResponse)(nil), "volume_server_pb.VolumeTailResponse")
proto.RegisterType((*ReadVolumeFileStatusRequest)(nil), "volume_server_pb.ReadVolumeFileStatusRequest")
proto.RegisterType((*ReadVolumeFileStatusResponse)(nil), "volume_server_pb.ReadVolumeFileStatusResponse")
proto.RegisterType((*DiskStatus)(nil), "volume_server_pb.DiskStatus")
@ -935,7 +935,7 @@ type VolumeServerClient interface {
VolumeCopy(ctx context.Context, in *VolumeCopyRequest, opts ...grpc.CallOption) (*VolumeCopyResponse, error)
ReadVolumeFileStatus(ctx context.Context, in *ReadVolumeFileStatusRequest, opts ...grpc.CallOption) (*ReadVolumeFileStatusResponse, error)
CopyFile(ctx context.Context, in *CopyFileRequest, opts ...grpc.CallOption) (VolumeServer_CopyFileClient, error)
VolumeStreamFollow(ctx context.Context, in *VolumeStreamFollowRequest, opts ...grpc.CallOption) (VolumeServer_VolumeStreamFollowClient, error)
VolumeTail(ctx context.Context, in *VolumeTailRequest, opts ...grpc.CallOption) (VolumeServer_VolumeTailClient, error)
}
type volumeServerClient struct {
@ -1127,12 +1127,12 @@ func (x *volumeServerCopyFileClient) Recv() (*CopyFileResponse, error) {
return m, nil
}
func (c *volumeServerClient) VolumeStreamFollow(ctx context.Context, in *VolumeStreamFollowRequest, opts ...grpc.CallOption) (VolumeServer_VolumeStreamFollowClient, error) {
stream, err := grpc.NewClientStream(ctx, &_VolumeServer_serviceDesc.Streams[2], c.cc, "/volume_server_pb.VolumeServer/VolumeStreamFollow", opts...)
func (c *volumeServerClient) VolumeTail(ctx context.Context, in *VolumeTailRequest, opts ...grpc.CallOption) (VolumeServer_VolumeTailClient, error) {
stream, err := grpc.NewClientStream(ctx, &_VolumeServer_serviceDesc.Streams[2], c.cc, "/volume_server_pb.VolumeServer/VolumeTail", opts...)
if err != nil {
return nil, err
}
x := &volumeServerVolumeStreamFollowClient{stream}
x := &volumeServerVolumeTailClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
@ -1142,17 +1142,17 @@ func (c *volumeServerClient) VolumeStreamFollow(ctx context.Context, in *VolumeS
return x, nil
}
type VolumeServer_VolumeStreamFollowClient interface {
Recv() (*VolumeStreamFollowResponse, error)
type VolumeServer_VolumeTailClient interface {
Recv() (*VolumeTailResponse, error)
grpc.ClientStream
}
type volumeServerVolumeStreamFollowClient struct {
type volumeServerVolumeTailClient struct {
grpc.ClientStream
}
func (x *volumeServerVolumeStreamFollowClient) Recv() (*VolumeStreamFollowResponse, error) {
m := new(VolumeStreamFollowResponse)
func (x *volumeServerVolumeTailClient) Recv() (*VolumeTailResponse, error) {
m := new(VolumeTailResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
@ -1179,7 +1179,7 @@ type VolumeServerServer interface {
VolumeCopy(context.Context, *VolumeCopyRequest) (*VolumeCopyResponse, error)
ReadVolumeFileStatus(context.Context, *ReadVolumeFileStatusRequest) (*ReadVolumeFileStatusResponse, error)
CopyFile(*CopyFileRequest, VolumeServer_CopyFileServer) error
VolumeStreamFollow(*VolumeStreamFollowRequest, VolumeServer_VolumeStreamFollowServer) error
VolumeTail(*VolumeTailRequest, VolumeServer_VolumeTailServer) error
}
func RegisterVolumeServerServer(s *grpc.Server, srv VolumeServerServer) {
@ -1462,24 +1462,24 @@ func (x *volumeServerCopyFileServer) Send(m *CopyFileResponse) error {
return x.ServerStream.SendMsg(m)
}
func _VolumeServer_VolumeStreamFollow_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(VolumeStreamFollowRequest)
func _VolumeServer_VolumeTail_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(VolumeTailRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(VolumeServerServer).VolumeStreamFollow(m, &volumeServerVolumeStreamFollowServer{stream})
return srv.(VolumeServerServer).VolumeTail(m, &volumeServerVolumeTailServer{stream})
}
type VolumeServer_VolumeStreamFollowServer interface {
Send(*VolumeStreamFollowResponse) error
type VolumeServer_VolumeTailServer interface {
Send(*VolumeTailResponse) error
grpc.ServerStream
}
type volumeServerVolumeStreamFollowServer struct {
type volumeServerVolumeTailServer struct {
grpc.ServerStream
}
func (x *volumeServerVolumeStreamFollowServer) Send(m *VolumeStreamFollowResponse) error {
func (x *volumeServerVolumeTailServer) Send(m *VolumeTailResponse) error {
return x.ServerStream.SendMsg(m)
}
@ -1552,8 +1552,8 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
ServerStreams: true,
},
{
StreamName: "VolumeStreamFollow",
Handler: _VolumeServer_VolumeStreamFollow_Handler,
StreamName: "VolumeTail",
Handler: _VolumeServer_VolumeTail_Handler,
ServerStreams: true,
},
},
@ -1563,86 +1563,86 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("volume_server.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 1291 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x58, 0x5b, 0x6f, 0xdc, 0x44,
0x14, 0x8e, 0xbb, 0x9b, 0xdb, 0xd9, 0xdd, 0x76, 0x3b, 0x49, 0x9b, 0x8d, 0x7b, 0x61, 0x3b, 0x2d,
0xed, 0xf6, 0x42, 0x0a, 0xad, 0x80, 0xc2, 0x13, 0x34, 0xa1, 0x22, 0x0f, 0x6d, 0x25, 0x87, 0x56,
0x20, 0x90, 0xac, 0x59, 0x7b, 0x92, 0x58, 0xb1, 0x3d, 0x5b, 0xcf, 0x38, 0x6d, 0x10, 0xfc, 0x1a,
0xde, 0x78, 0xe0, 0x1f, 0xf0, 0xcb, 0x90, 0x10, 0xf2, 0xcc, 0xd8, 0xf1, 0x35, 0xeb, 0xd2, 0xb7,
0xf1, 0xe7, 0x73, 0xbe, 0x73, 0xce, 0xf8, 0xcc, 0x99, 0x6f, 0x17, 0xd6, 0x8e, 0x99, 0x1f, 0x07,
0xd4, 0xe6, 0x34, 0x3a, 0xa6, 0xd1, 0xd6, 0x2c, 0x62, 0x82, 0xa1, 0x61, 0x01, 0xb4, 0x67, 0x53,
0xfc, 0x10, 0xd0, 0x53, 0x22, 0x9c, 0xc3, 0x1d, 0xea, 0x53, 0x41, 0x2d, 0xfa, 0x26, 0xa6, 0x5c,
0xa0, 0x4d, 0x58, 0xd9, 0xf7, 0x7c, 0x6a, 0x7b, 0x2e, 0x1f, 0x19, 0xe3, 0xce, 0x64, 0xd5, 0x5a,
0x4e, 0x9e, 0x77, 0x5d, 0x8e, 0x5f, 0xc2, 0x5a, 0xc1, 0x81, 0xcf, 0x58, 0xc8, 0x29, 0x7a, 0x02,
0xcb, 0x11, 0xe5, 0xb1, 0x2f, 0x94, 0x43, 0xef, 0xd1, 0xf5, 0xad, 0x72, 0xac, 0xad, 0xcc, 0x25,
0xf6, 0x85, 0x95, 0x9a, 0x63, 0x0f, 0xfa, 0xf9, 0x17, 0x68, 0x03, 0x96, 0x75, 0xec, 0x91, 0x31,
0x36, 0x26, 0xab, 0xd6, 0x92, 0x0a, 0x8d, 0x2e, 0xc3, 0x12, 0x17, 0x44, 0xc4, 0x7c, 0x74, 0x6e,
0x6c, 0x4c, 0x16, 0x2d, 0xfd, 0x84, 0xd6, 0x61, 0x91, 0x46, 0x11, 0x8b, 0x46, 0x1d, 0x69, 0xae,
0x1e, 0x10, 0x82, 0x2e, 0xf7, 0x7e, 0xa5, 0xa3, 0xee, 0xd8, 0x98, 0x0c, 0x2c, 0xb9, 0xc6, 0xcb,
0xb0, 0xf8, 0x5d, 0x30, 0x13, 0x27, 0xf8, 0x4b, 0x18, 0xbd, 0x26, 0x4e, 0x1c, 0x07, 0xaf, 0x65,
0x8e, 0xdb, 0x87, 0xd4, 0x39, 0x4a, 0x6b, 0xbf, 0x02, 0xab, 0x3a, 0x73, 0x9d, 0xc1, 0xc0, 0x5a,
0x51, 0xc0, 0xae, 0x8b, 0xbf, 0x81, 0xcd, 0x1a, 0x47, 0xbd, 0x07, 0x37, 0x61, 0x70, 0x40, 0xa2,
0x29, 0x39, 0xa0, 0x76, 0x44, 0x84, 0xc7, 0xa4, 0xb7, 0x61, 0xf5, 0x35, 0x68, 0x25, 0x18, 0xfe,
0x19, 0xcc, 0x02, 0x03, 0x0b, 0x66, 0xc4, 0x11, 0x6d, 0x82, 0xa3, 0x31, 0xf4, 0x66, 0x11, 0x25,
0xbe, 0xcf, 0x1c, 0x22, 0xa8, 0xdc, 0x85, 0x8e, 0x95, 0x87, 0xf0, 0x35, 0xb8, 0x52, 0x4b, 0xae,
0x12, 0xc4, 0x4f, 0x4a, 0xd9, 0xb3, 0x20, 0xf0, 0x5a, 0x85, 0xc6, 0x57, 0x2b, 0x59, 0x4b, 0x4f,
0xcd, 0xfb, 0x55, 0xe9, 0xad, 0x4f, 0x49, 0x18, 0xcf, 0x5a, 0x11, 0x97, 0x33, 0x4e, 0x5d, 0x33,
0xe6, 0x0d, 0xd5, 0x1c, 0xdb, 0xcc, 0xf7, 0xa9, 0x23, 0x3c, 0x16, 0xa6, 0xb4, 0xd7, 0x01, 0x9c,
0x0c, 0xd4, 0xad, 0x92, 0x43, 0xb0, 0x09, 0xa3, 0xaa, 0xab, 0xa6, 0xfd, 0xd3, 0x80, 0x4b, 0xdf,
0xea, 0x4d, 0x53, 0x81, 0x5b, 0x7d, 0x80, 0x62, 0xc8, 0x73, 0xe5, 0x90, 0xe5, 0x0f, 0xd4, 0xa9,
0x7c, 0xa0, 0xc4, 0x22, 0xa2, 0x33, 0xdf, 0x73, 0x88, 0xa4, 0xe8, 0x4a, 0x8a, 0x3c, 0x84, 0x86,
0xd0, 0x11, 0xc2, 0x1f, 0x2d, 0xca, 0x37, 0xc9, 0x12, 0x8f, 0xe0, 0x72, 0x39, 0x57, 0x5d, 0xc6,
0x17, 0xb0, 0xa1, 0x90, 0xbd, 0x93, 0xd0, 0xd9, 0x93, 0xa7, 0xa1, 0xd5, 0xa6, 0xff, 0x63, 0xc0,
0xa8, 0xea, 0xa8, 0xbb, 0xf8, 0x43, 0x77, 0xe0, 0x7d, 0xeb, 0x43, 0x1f, 0x41, 0x4f, 0x10, 0xcf,
0xb7, 0xd9, 0xfe, 0x3e, 0xa7, 0x62, 0xb4, 0x34, 0x36, 0x26, 0x5d, 0x0b, 0x12, 0xe8, 0xa5, 0x44,
0xd0, 0x5d, 0x18, 0x3a, 0xaa, 0x93, 0xed, 0x88, 0x1e, 0x7b, 0x3c, 0x61, 0x5e, 0x96, 0x89, 0x5d,
0x70, 0xd2, 0x0e, 0x57, 0x30, 0xc2, 0x30, 0xf0, 0xdc, 0x77, 0xb6, 0x1c, 0x20, 0xf2, 0xf8, 0xaf,
0x48, 0xb6, 0x9e, 0xe7, 0xbe, 0x7b, 0xe6, 0xf9, 0x74, 0x2f, 0x99, 0x02, 0xaf, 0xe1, 0xaa, 0x2a,
0x7e, 0x37, 0x74, 0x22, 0x1a, 0xd0, 0x50, 0x10, 0x7f, 0x9b, 0xcd, 0x4e, 0x5a, 0xb5, 0xc0, 0x26,
0xac, 0x70, 0x2f, 0x74, 0xa8, 0x1d, 0xaa, 0x31, 0xd4, 0xb5, 0x96, 0xe5, 0xf3, 0x0b, 0x8e, 0x9f,
0xc2, 0xb5, 0x06, 0x5e, 0xbd, 0xb3, 0x37, 0xa0, 0x2f, 0x13, 0x73, 0x58, 0x28, 0x68, 0x28, 0x24,
0x77, 0xdf, 0xea, 0x25, 0xd8, 0xb6, 0x82, 0xf0, 0x67, 0x80, 0x14, 0xc7, 0x73, 0x16, 0x87, 0xed,
0x8e, 0xe6, 0x25, 0x58, 0x2b, 0xb8, 0xe8, 0xde, 0x78, 0x0c, 0xeb, 0x0a, 0x7e, 0x15, 0x06, 0xad,
0xb9, 0x36, 0xe0, 0x52, 0xc9, 0x49, 0xb3, 0x3d, 0x4a, 0x83, 0x14, 0xef, 0x89, 0x33, 0xc9, 0x2e,
0xa7, 0x19, 0x14, 0xaf, 0x0a, 0xfc, 0x97, 0x01, 0x17, 0xd3, 0x31, 0xd2, 0x72, 0xd7, 0xdf, 0xb3,
0xed, 0x3a, 0x8d, 0x6d, 0xd7, 0x3d, 0x6d, 0xbb, 0x09, 0x0c, 0x39, 0x8b, 0x23, 0x87, 0xda, 0x2e,
0x11, 0xc4, 0x0e, 0x99, 0x4b, 0x75, 0x57, 0x9e, 0x57, 0xf8, 0x0e, 0x11, 0xe4, 0x05, 0x73, 0x29,
0x5e, 0x4f, 0x3f, 0x4a, 0xfe, 0x6b, 0xe2, 0x10, 0x2e, 0x24, 0xcf, 0x49, 0x5b, 0xb5, 0xac, 0xa1,
0xe7, 0x71, 0x3b, 0xed, 0x4e, 0x59, 0xc4, 0x8a, 0xb5, 0xea, 0xf1, 0x5d, 0xd5, 0x9a, 0xfa, 0xbd,
0x4b, 0x84, 0x7a, 0xdf, 0x49, 0xdf, 0xef, 0x10, 0x91, 0xbc, 0xc7, 0x9f, 0xc3, 0xf0, 0x34, 0x5e,
0xfb, 0x8e, 0xfa, 0x1d, 0x36, 0xf5, 0x51, 0x17, 0x11, 0x25, 0xc1, 0x33, 0xe6, 0xfb, 0xec, 0xed,
0x07, 0xb6, 0x3a, 0x9a, 0xc0, 0x05, 0x37, 0x22, 0x5e, 0xe8, 0x85, 0x07, 0x7b, 0xd4, 0x61, 0xa1,
0xcb, 0x65, 0xbe, 0x03, 0xab, 0x0c, 0xe3, 0x29, 0x98, 0x75, 0xe1, 0x4f, 0x6f, 0xcc, 0x90, 0x52,
0xd7, 0xa7, 0xf6, 0x21, 0x25, 0x2e, 0x8d, 0x74, 0x01, 0x7d, 0x05, 0x7e, 0x2f, 0xb1, 0x64, 0x3e,
0x68, 0xa3, 0x29, 0x73, 0x4f, 0x64, 0x2a, 0x7d, 0x0b, 0x14, 0xf4, 0x94, 0xb9, 0x27, 0xf8, 0x6b,
0xb8, 0x62, 0x51, 0xe2, 0xaa, 0x38, 0xf2, 0x98, 0xb7, 0x1f, 0x85, 0xff, 0x1a, 0x70, 0xb5, 0xde,
0xb9, 0xcd, 0x38, 0x7c, 0x00, 0x28, 0x1b, 0x37, 0xc2, 0x0b, 0x28, 0x17, 0x24, 0x98, 0xe9, 0xcd,
0x1a, 0xea, 0x99, 0xf3, 0x43, 0x8a, 0x57, 0x87, 0x53, 0xa7, 0x32, 0x9c, 0x12, 0xc6, 0xb4, 0x05,
0x72, 0x8c, 0x5d, 0xc5, 0xe8, 0xaa, 0x56, 0x28, 0x30, 0x66, 0xd6, 0x92, 0x71, 0x51, 0x31, 0x6a,
0x43, 0xc9, 0x78, 0x0d, 0x40, 0xf7, 0x48, 0x1c, 0xa6, 0xd3, 0x75, 0x55, 0x75, 0x48, 0x1c, 0x0a,
0xfc, 0x23, 0xc0, 0x8e, 0xc7, 0x8f, 0x54, 0xd5, 0xc9, 0x31, 0x71, 0xbd, 0x48, 0xdf, 0xa6, 0xc9,
0x32, 0x41, 0x88, 0xef, 0xeb, 0x9a, 0x92, 0x65, 0xa2, 0xac, 0x62, 0x4e, 0x5d, 0x9d, 0xbd, 0x5c,
0x27, 0xd8, 0x7e, 0x44, 0xa9, 0x4e, 0x54, 0xae, 0xf1, 0x1f, 0x06, 0xac, 0x3e, 0xa7, 0x81, 0x66,
0xbe, 0x0e, 0x70, 0xc0, 0x22, 0x16, 0x0b, 0x2f, 0xa4, 0x5c, 0x06, 0x58, 0xb4, 0x72, 0xc8, 0xff,
0x8f, 0x23, 0x95, 0x1e, 0xf5, 0xf7, 0x75, 0xed, 0x72, 0x9d, 0x60, 0x87, 0x94, 0xcc, 0x74, 0xb9,
0x72, 0x9d, 0xe8, 0x44, 0x2e, 0x88, 0x73, 0x24, 0xef, 0x8e, 0xae, 0xa5, 0x1e, 0x1e, 0xfd, 0x3d,
0x80, 0xbe, 0xee, 0x50, 0x29, 0x54, 0xd1, 0x2f, 0xd0, 0xcb, 0x09, 0x5c, 0x74, 0xab, 0xaa, 0x63,
0xab, 0x82, 0xd9, 0xfc, 0x78, 0x8e, 0x95, 0x9e, 0x19, 0x0b, 0x28, 0x84, 0x8b, 0x15, 0x01, 0x89,
0xee, 0x55, 0xbd, 0x9b, 0xe4, 0xa9, 0x79, 0xbf, 0x95, 0x6d, 0x16, 0x4f, 0xc0, 0x5a, 0x8d, 0x22,
0x44, 0x0f, 0xe6, 0xb0, 0x14, 0x54, 0xa9, 0xf9, 0x49, 0x4b, 0xeb, 0x2c, 0xea, 0x1b, 0x40, 0x55,
0xb9, 0x88, 0xee, 0xcf, 0xa5, 0x39, 0x95, 0xa3, 0xe6, 0x83, 0x76, 0xc6, 0x8d, 0x85, 0x2a, 0x21,
0x39, 0xb7, 0xd0, 0x82, 0x54, 0x9d, 0x5b, 0x68, 0x49, 0x9d, 0x2e, 0xa0, 0x23, 0x18, 0x96, 0x45,
0x26, 0xba, 0xdb, 0xf4, 0xcb, 0xa7, 0xa2, 0x61, 0xcd, 0x7b, 0x6d, 0x4c, 0xb3, 0x60, 0x14, 0xce,
0x17, 0x85, 0x20, 0xba, 0x53, 0xf5, 0xaf, 0x95, 0xb5, 0xe6, 0x64, 0xbe, 0x61, 0xbe, 0xa6, 0xb2,
0x38, 0xac, 0xab, 0xa9, 0x41, 0x79, 0xd6, 0xd5, 0xd4, 0xa4, 0x35, 0xf1, 0x02, 0xfa, 0x2d, 0x55,
0x1c, 0x25, 0xd1, 0x84, 0xb6, 0x9a, 0x68, 0xea, 0x55, 0x9b, 0xf9, 0xb0, 0xb5, 0x7d, 0x1a, 0xfb,
0x53, 0x23, 0x39, 0xeb, 0x39, 0xed, 0x54, 0x77, 0xd6, 0xab, 0x6a, 0xac, 0xee, 0xac, 0xd7, 0x09,
0xb0, 0x05, 0x34, 0x85, 0x41, 0x41, 0x4d, 0xa1, 0xdb, 0x4d, 0x9e, 0x45, 0x8d, 0x66, 0xde, 0x99,
0x6b, 0x97, 0xc5, 0xb0, 0xd3, 0xe9, 0xa5, 0xc7, 0x55, 0x63, 0x72, 0xc5, 0x79, 0x75, 0x7b, 0x9e,
0x59, 0x16, 0xe0, 0x27, 0x80, 0x53, 0xf1, 0x83, 0x6e, 0x36, 0xf9, 0xe5, 0x3f, 0xc5, 0xad, 0xb3,
0x8d, 0x32, 0xea, 0xb7, 0xb0, 0x5e, 0x77, 0xf5, 0xa2, 0x9a, 0x53, 0x78, 0xc6, 0xfd, 0x6e, 0x6e,
0xb5, 0x35, 0xcf, 0x02, 0xbf, 0x82, 0x95, 0x54, 0x4a, 0xa1, 0x1b, 0x55, 0xef, 0x92, 0xac, 0x33,
0xf1, 0x59, 0x26, 0xb9, 0x6e, 0xe2, 0xa9, 0x4e, 0xcc, 0x6b, 0x9d, 0xda, 0xa9, 0xd7, 0x24, 0xc8,
0x6a, 0xa7, 0x5e, 0xa3, 0x7c, 0x4a, 0x82, 0x4e, 0x97, 0xe4, 0x3f, 0x3b, 0x8f, 0xff, 0x0b, 0x00,
0x00, 0xff, 0xff, 0x9b, 0xf8, 0x72, 0xd3, 0xf0, 0x11, 0x00, 0x00,
// 1282 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x58, 0xdd, 0x73, 0xdb, 0x44,
0x10, 0x8f, 0x6a, 0x3b, 0xb6, 0xd7, 0x76, 0xeb, 0x5e, 0xd2, 0xc6, 0x55, 0x93, 0xe0, 0x5e, 0x4b,
0xeb, 0xb6, 0x21, 0x85, 0x74, 0x80, 0xc2, 0x13, 0x24, 0x81, 0x21, 0x0f, 0x6d, 0x67, 0x94, 0x36,
0x03, 0x94, 0x19, 0xcd, 0x45, 0xba, 0x24, 0x9a, 0xc8, 0x92, 0x2b, 0x9d, 0x42, 0xc3, 0xf0, 0xdf,
0x30, 0xbc, 0xf0, 0xc0, 0x3f, 0xc8, 0x0c, 0xc3, 0xdc, 0x87, 0x14, 0x7d, 0xc6, 0x2a, 0x7d, 0x3b,
0xed, 0xed, 0xfe, 0xf6, 0xe3, 0xf6, 0xf6, 0x7e, 0x36, 0x2c, 0x9d, 0xf9, 0x6e, 0x34, 0xa5, 0x66,
0x48, 0x83, 0x33, 0x1a, 0x6c, 0xce, 0x02, 0x9f, 0xf9, 0x68, 0x98, 0x11, 0x9a, 0xb3, 0x43, 0xfc,
0x04, 0xd0, 0x36, 0x61, 0xd6, 0xc9, 0x2e, 0x75, 0x29, 0xa3, 0x06, 0x7d, 0x1b, 0xd1, 0x90, 0xa1,
0x5b, 0xd0, 0x39, 0x72, 0x5c, 0x6a, 0x3a, 0x76, 0x38, 0xd2, 0xc6, 0x8d, 0x49, 0xd7, 0x68, 0xf3,
0xef, 0x3d, 0x3b, 0xc4, 0x2f, 0x61, 0x29, 0x63, 0x10, 0xce, 0x7c, 0x2f, 0xa4, 0xe8, 0x19, 0xb4,
0x03, 0x1a, 0x46, 0x2e, 0x93, 0x06, 0xbd, 0xad, 0xf5, 0xcd, 0xbc, 0xaf, 0xcd, 0xc4, 0x24, 0x72,
0x99, 0x11, 0xab, 0x63, 0x07, 0xfa, 0xe9, 0x0d, 0xb4, 0x02, 0x6d, 0xe5, 0x7b, 0xa4, 0x8d, 0xb5,
0x49, 0xd7, 0x58, 0x94, 0xae, 0xd1, 0x4d, 0x58, 0x0c, 0x19, 0x61, 0x51, 0x38, 0xba, 0x32, 0xd6,
0x26, 0x2d, 0x43, 0x7d, 0xa1, 0x65, 0x68, 0xd1, 0x20, 0xf0, 0x83, 0x51, 0x43, 0xa8, 0xcb, 0x0f,
0x84, 0xa0, 0x19, 0x3a, 0xbf, 0xd1, 0x51, 0x73, 0xac, 0x4d, 0x06, 0x86, 0x58, 0xe3, 0x36, 0xb4,
0xbe, 0x9b, 0xce, 0xd8, 0x39, 0xfe, 0x12, 0x46, 0x07, 0xc4, 0x8a, 0xa2, 0xe9, 0x81, 0x88, 0x71,
0xe7, 0x84, 0x5a, 0xa7, 0x71, 0xee, 0xb7, 0xa1, 0xab, 0x22, 0x57, 0x11, 0x0c, 0x8c, 0x8e, 0x14,
0xec, 0xd9, 0xf8, 0x1b, 0xb8, 0x55, 0x62, 0xa8, 0x6a, 0x70, 0x17, 0x06, 0xc7, 0x24, 0x38, 0x24,
0xc7, 0xd4, 0x0c, 0x08, 0x73, 0x7c, 0x61, 0xad, 0x19, 0x7d, 0x25, 0x34, 0xb8, 0x0c, 0xbf, 0x01,
0x3d, 0x83, 0xe0, 0x4f, 0x67, 0xc4, 0x62, 0x75, 0x9c, 0xa3, 0x31, 0xf4, 0x66, 0x01, 0x25, 0xae,
0xeb, 0x5b, 0x84, 0x51, 0x51, 0x85, 0x86, 0x91, 0x16, 0xe1, 0x35, 0xb8, 0x5d, 0x0a, 0x2e, 0x03,
0xc4, 0xcf, 0x72, 0xd1, 0xfb, 0xd3, 0xa9, 0x53, 0xcb, 0x35, 0x5e, 0x2d, 0x44, 0x2d, 0x2c, 0x15,
0xee, 0x57, 0xb9, 0x5d, 0x97, 0x12, 0x2f, 0x9a, 0xd5, 0x02, 0xce, 0x47, 0x1c, 0x9b, 0x26, 0xc8,
0x2b, 0xb2, 0x39, 0x76, 0x7c, 0xd7, 0xa5, 0x16, 0x73, 0x7c, 0x2f, 0x86, 0x5d, 0x07, 0xb0, 0x12,
0xa1, 0x6a, 0x95, 0x94, 0x04, 0xeb, 0x30, 0x2a, 0x9a, 0x2a, 0xd8, 0xbf, 0x34, 0xb8, 0xf1, 0xad,
0x2a, 0x9a, 0x74, 0x5c, 0xeb, 0x00, 0xb2, 0x2e, 0xaf, 0xe4, 0x5d, 0xe6, 0x0f, 0xa8, 0x51, 0x38,
0x20, 0xae, 0x11, 0xd0, 0x99, 0xeb, 0x58, 0x44, 0x40, 0x34, 0x05, 0x44, 0x5a, 0x84, 0x86, 0xd0,
0x60, 0xcc, 0x1d, 0xb5, 0xc4, 0x0e, 0x5f, 0xe2, 0x11, 0xdc, 0xcc, 0xc7, 0xaa, 0xd2, 0xf8, 0x02,
0x56, 0xa4, 0x64, 0xff, 0xdc, 0xb3, 0xf6, 0xc5, 0x6d, 0xa8, 0x55, 0xf4, 0x7f, 0x34, 0x18, 0x15,
0x0d, 0x55, 0x17, 0x7f, 0x68, 0x05, 0xde, 0x37, 0x3f, 0xf4, 0x11, 0xf4, 0x18, 0x71, 0x5c, 0xd3,
0x3f, 0x3a, 0x0a, 0x29, 0x1b, 0x2d, 0x8e, 0xb5, 0x49, 0xd3, 0x00, 0x2e, 0x7a, 0x29, 0x24, 0xe8,
0x21, 0x0c, 0x2d, 0xd9, 0xc9, 0x66, 0x40, 0xcf, 0x9c, 0x90, 0x23, 0xb7, 0x45, 0x60, 0xd7, 0xac,
0xb8, 0xc3, 0xa5, 0x18, 0x61, 0x18, 0x38, 0xf6, 0x3b, 0x53, 0x0c, 0x10, 0x71, 0xfd, 0x3b, 0x02,
0xad, 0xe7, 0xd8, 0xef, 0xbe, 0x77, 0x5c, 0xba, 0xcf, 0xa7, 0xc0, 0x01, 0xac, 0xca, 0xe4, 0xf7,
0x3c, 0x2b, 0xa0, 0x53, 0xea, 0x31, 0xe2, 0xee, 0xf8, 0xb3, 0xf3, 0x5a, 0x2d, 0x70, 0x0b, 0x3a,
0xa1, 0xe3, 0x59, 0xd4, 0xf4, 0xe4, 0x18, 0x6a, 0x1a, 0x6d, 0xf1, 0xfd, 0x22, 0xc4, 0xdb, 0xb0,
0x56, 0x81, 0xab, 0x2a, 0x7b, 0x07, 0xfa, 0x22, 0x30, 0xcb, 0xf7, 0x18, 0xf5, 0x98, 0xc0, 0xee,
0x1b, 0x3d, 0x2e, 0xdb, 0x91, 0x22, 0xfc, 0x19, 0x20, 0x89, 0xf1, 0xdc, 0x8f, 0xbc, 0x7a, 0x57,
0xf3, 0x06, 0x2c, 0x65, 0x4c, 0x54, 0x6f, 0x3c, 0x85, 0x65, 0x29, 0x7e, 0xed, 0x4d, 0x6b, 0x63,
0xad, 0xc0, 0x8d, 0x9c, 0x91, 0x42, 0xdb, 0x8a, 0x9d, 0x64, 0xdf, 0x89, 0x4b, 0xc1, 0x6e, 0xc6,
0x11, 0x64, 0x9f, 0x0a, 0xfc, 0xb7, 0x06, 0xd7, 0xe3, 0x31, 0x52, 0xb3, 0xea, 0xef, 0xd9, 0x76,
0x8d, 0xca, 0xb6, 0x6b, 0x5e, 0xb4, 0xdd, 0x04, 0x86, 0xa1, 0x1f, 0x05, 0x16, 0x35, 0x6d, 0xc2,
0x88, 0xe9, 0xf9, 0x36, 0x55, 0x5d, 0x79, 0x55, 0xca, 0x77, 0x09, 0x23, 0x2f, 0x7c, 0x9b, 0xe2,
0xe5, 0xf8, 0x50, 0xd2, 0xa7, 0x89, 0x3d, 0xb8, 0xc6, 0xbf, 0x79, 0x5b, 0xd5, 0xcc, 0xa1, 0xe7,
0x84, 0x66, 0xdc, 0x9d, 0x22, 0x89, 0x8e, 0xd1, 0x75, 0xc2, 0x3d, 0xd9, 0x9a, 0x6a, 0xdf, 0x26,
0x4c, 0xee, 0x37, 0xe2, 0xfd, 0x5d, 0xc2, 0xf8, 0x3e, 0xfe, 0x1c, 0x86, 0x17, 0xfe, 0xea, 0x77,
0x54, 0x14, 0x17, 0xfb, 0x15, 0x71, 0xdc, 0x0f, 0x6c, 0x71, 0x34, 0x81, 0x6b, 0x76, 0x40, 0x1c,
0xcf, 0xf1, 0x8e, 0xf7, 0xa9, 0xe5, 0x7b, 0x76, 0x28, 0xe2, 0x1c, 0x18, 0x79, 0x31, 0xfe, 0x39,
0xae, 0x99, 0x74, 0x7b, 0xf1, 0x42, 0x7a, 0x94, 0xda, 0x2e, 0x35, 0x4f, 0x28, 0xb1, 0x69, 0xa0,
0x02, 0xee, 0x4b, 0xe1, 0x0f, 0x42, 0xc6, 0xe7, 0x81, 0x52, 0x3a, 0xf4, 0xed, 0x73, 0x11, 0x42,
0xdf, 0x00, 0x29, 0xda, 0xf6, 0xed, 0x73, 0xfc, 0x35, 0xdc, 0x36, 0x28, 0xb1, 0x25, 0xbe, 0xb8,
0xd6, 0xf5, 0x47, 0xdf, 0xbf, 0x1a, 0xac, 0x96, 0x1b, 0xd7, 0x19, 0x7f, 0x1b, 0x80, 0x92, 0xf1,
0xc2, 0x9c, 0x29, 0x0d, 0x19, 0x99, 0xce, 0x54, 0x91, 0x86, 0x6a, 0xc6, 0xbc, 0x8a, 0xe5, 0xc5,
0x61, 0xd4, 0x28, 0x0c, 0x23, 0x8e, 0x18, 0x1f, 0x79, 0x0a, 0xb1, 0x29, 0x11, 0x6d, 0x79, 0xf4,
0x19, 0xc4, 0x44, 0x5b, 0x20, 0xb6, 0x24, 0xa2, 0x52, 0x14, 0x88, 0x6b, 0x00, 0xaa, 0x27, 0x22,
0x2f, 0x9e, 0xa6, 0x5d, 0xd9, 0x11, 0x91, 0xc7, 0xf0, 0x8f, 0x00, 0xbb, 0x4e, 0x78, 0x2a, 0xb3,
0xe6, 0xd7, 0xc2, 0x76, 0x02, 0xf5, 0x7a, 0xf2, 0x25, 0x97, 0x10, 0xd7, 0x55, 0x39, 0xf1, 0x25,
0x67, 0x52, 0x51, 0x48, 0x6d, 0x15, 0xbd, 0x58, 0x73, 0xd9, 0x51, 0x40, 0xa9, 0x0a, 0x54, 0xac,
0xf1, 0x1f, 0x1a, 0x74, 0x9f, 0xd3, 0xa9, 0x42, 0x5e, 0x07, 0x38, 0xf6, 0x03, 0x3f, 0x62, 0x8e,
0x47, 0x43, 0xe1, 0xa0, 0x65, 0xa4, 0x24, 0xff, 0xdf, 0x8f, 0x60, 0x76, 0xd4, 0x3d, 0x52, 0xb9,
0x8b, 0x35, 0x97, 0x9d, 0x50, 0x32, 0x53, 0xe9, 0x8a, 0x35, 0xe7, 0x85, 0x21, 0x23, 0xd6, 0xa9,
0x78, 0x2b, 0x9a, 0x86, 0xfc, 0xd8, 0xfa, 0x73, 0x00, 0x7d, 0xf5, 0xf6, 0x09, 0x62, 0x8a, 0x7e,
0x81, 0x5e, 0x8a, 0xd0, 0xa2, 0x7b, 0x45, 0xde, 0x5a, 0x24, 0xc8, 0xfa, 0xc7, 0x73, 0xb4, 0xd4,
0x8c, 0x58, 0x40, 0x1e, 0x5c, 0x2f, 0x10, 0x46, 0xf4, 0xa8, 0x68, 0x5d, 0x45, 0x47, 0xf5, 0xc7,
0xb5, 0x74, 0x13, 0x7f, 0x0c, 0x96, 0x4a, 0x18, 0x20, 0xda, 0x98, 0x83, 0x92, 0x61, 0xa1, 0xfa,
0x27, 0x35, 0xb5, 0x13, 0xaf, 0x6f, 0x01, 0x15, 0xe9, 0x21, 0x7a, 0x3c, 0x17, 0xe6, 0x82, 0x7e,
0xea, 0x1b, 0xf5, 0x94, 0x2b, 0x13, 0x95, 0xc4, 0x71, 0x6e, 0xa2, 0x19, 0x6a, 0x3a, 0x37, 0xd1,
0x1c, 0x1b, 0x5d, 0x40, 0xa7, 0x30, 0xcc, 0x93, 0x4a, 0xf4, 0xb0, 0xea, 0x97, 0x4e, 0x81, 0xb3,
0xea, 0x8f, 0xea, 0xa8, 0x26, 0xce, 0x28, 0x5c, 0xcd, 0x12, 0x3f, 0xf4, 0xa0, 0x68, 0x5f, 0x4a,
0x63, 0xf5, 0xc9, 0x7c, 0xc5, 0x74, 0x4e, 0x79, 0x32, 0x58, 0x96, 0x53, 0x05, 0xd3, 0x2c, 0xcb,
0xa9, 0x8a, 0x5b, 0xe2, 0x05, 0xf4, 0x7b, 0xcc, 0x30, 0x72, 0x24, 0x09, 0x6d, 0x56, 0xc1, 0x94,
0xb3, 0x34, 0xfd, 0x49, 0x6d, 0xfd, 0xd8, 0xf7, 0xa7, 0x1a, 0xbf, 0xeb, 0x29, 0xae, 0x54, 0x76,
0xd7, 0x8b, 0xec, 0xab, 0xec, 0xae, 0x97, 0x11, 0xae, 0x05, 0x74, 0x08, 0x83, 0x0c, 0x7b, 0x42,
0xf7, 0xab, 0x2c, 0xb3, 0x9c, 0x4c, 0x7f, 0x30, 0x57, 0x2f, 0xf1, 0x61, 0xc6, 0xd3, 0x4b, 0x8d,
0xab, 0xca, 0xe0, 0xb2, 0xf3, 0xea, 0xfe, 0x3c, 0xb5, 0xc4, 0xc1, 0x4f, 0x00, 0x17, 0x64, 0x07,
0xdd, 0xad, 0xb2, 0x4b, 0x1f, 0xc5, 0xbd, 0xcb, 0x95, 0x12, 0xe8, 0x5f, 0x61, 0xb9, 0xec, 0xe9,
0x45, 0x25, 0xb7, 0xf0, 0x92, 0xf7, 0x5d, 0xdf, 0xac, 0xab, 0x9e, 0x38, 0x7e, 0x0d, 0x9d, 0x98,
0x3a, 0xa1, 0x3b, 0x45, 0xeb, 0x1c, 0x8d, 0xd3, 0xf1, 0x65, 0x2a, 0xa9, 0x6e, 0x7a, 0x13, 0x97,
0x8a, 0x73, 0x9c, 0xea, 0x52, 0xa5, 0x88, 0x57, 0x75, 0xa9, 0xd2, 0x34, 0x89, 0x83, 0x1f, 0x2e,
0x8a, 0x7f, 0x6c, 0x9e, 0xfe, 0x17, 0x00, 0x00, 0xff, 0xff, 0x07, 0xe0, 0xca, 0x7c, 0xc8, 0x11,
0x00, 0x00,
}

View file

@ -9,57 +9,63 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
)
func (vs *VolumeServer) VolumeStreamFollow(req *volume_server_pb.VolumeStreamFollowRequest, stream volume_server_pb.VolumeServer_VolumeStreamFollowServer) error {
func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error {
v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
defer glog.V(1).Infof("tailing volume %d finished", v.Id)
lastTimestampNs := req.SinceNs
drainingSeconds := req.DrainingSeconds
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
if err != nil {
return fmt.Errorf("streamFollow: %v", err)
}
if req.DrainingSeconds == 0 {
continue
}
if lastProcessedTimestampNs == lastTimestampNs {
drainingSeconds--
if drainingSeconds <= 0 {
return nil
}
glog.V(0).Infof("volume %d drains requests with %d seconds remaining ...", v.Id, drainingSeconds)
} else {
drainingSeconds = req.DrainingSeconds
glog.V(0).Infof("volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
}
lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
if err != nil {
glog.Infof("sendNeedlesSince: %v", err)
return fmt.Errorf("streamFollow: %v", err)
}
time.Sleep(2 * time.Second)
if req.DrainingSeconds == 0 {
continue
}
if lastProcessedTimestampNs == lastTimestampNs {
drainingSeconds--
if drainingSeconds <= 0 {
return nil
}
glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
} else {
lastTimestampNs = lastProcessedTimestampNs
drainingSeconds = req.DrainingSeconds
glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
}
}
}
func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeStreamFollowServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
if err != nil {
return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
}
// log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)
if isLastOne {
return lastTimestampNs, nil
// need to heart beat to the client to ensure the connection health
sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{})
return lastTimestampNs, sendErr
}
err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
sendErr := stream.Send(&volume_server_pb.VolumeStreamFollowResponse{
sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{
NeedleHeader: needleHeader,
NeedleBody: needleBody,
})