seaweedfs/weed/server/volume_grpc_tail.go

136 lines
4 KiB
Go
Raw Permalink Normal View History

package weed_server
import (
2019-04-20 18:35:20 +00:00
"context"
"fmt"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
2019-04-20 18:35:20 +00:00
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
2019-04-19 04:43:36 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
2019-04-20 18:35:20 +00:00
func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
2019-04-19 04:43:36 +00:00
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
2019-04-18 18:05:02 +00:00
defer glog.V(1).Infof("tailing volume %d finished", v.Id)
lastTimestampNs := req.SinceNs
2019-04-20 19:05:28 +00:00
drainingSeconds := req.IdleTimeoutSeconds
for {
2019-04-18 18:05:02 +00:00
lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
if err != nil {
glog.Infof("sendNeedlesSince: %v", err)
return fmt.Errorf("streamFollow: %v", err)
}
time.Sleep(2 * time.Second)
2019-04-20 19:05:28 +00:00
if req.IdleTimeoutSeconds == 0 {
2019-04-19 02:22:13 +00:00
lastTimestampNs = lastProcessedTimestampNs
2019-04-18 18:05:02 +00:00
continue
}
if lastProcessedTimestampNs == lastTimestampNs {
drainingSeconds--
if drainingSeconds <= 0 {
return nil
}
2019-04-18 18:05:02 +00:00
glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
} else {
lastTimestampNs = lastProcessedTimestampNs
2019-04-20 19:05:28 +00:00
drainingSeconds = req.IdleTimeoutSeconds
2019-04-18 18:05:02 +00:00
glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
}
2019-04-18 18:05:02 +00:00
}
}
2019-04-20 18:35:20 +00:00
func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
if err != nil {
return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
}
2019-04-18 18:05:02 +00:00
// log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)
if isLastOne {
2019-04-18 18:05:02 +00:00
// need to heart beat to the client to ensure the connection health
2019-04-20 18:35:20 +00:00
sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true})
2019-04-18 18:05:02 +00:00
return lastTimestampNs, sendErr
}
2019-10-22 07:50:30 +00:00
scanner := &VolumeFileScanner4Tailing{
2019-10-22 07:50:50 +00:00
stream: stream,
2019-10-22 07:50:30 +00:00
}
err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToAcutalOffset(), scanner)
2019-10-22 07:50:30 +00:00
return scanner.lastProcessedTimestampNs, err
}
2019-04-20 18:35:20 +00:00
func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
resp := &volume_server_pb.VolumeTailReceiverResponse{}
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return resp, fmt.Errorf("receiver not found volume id %d", req.VolumeId)
}
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
2019-04-20 19:05:28 +00:00
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
2019-07-18 06:43:48 +00:00
_, _, err := vs.store.WriteVolumeNeedle(v.Id, n)
2019-04-20 18:35:20 +00:00
return err
})
}
2019-10-22 07:50:30 +00:00
// generate the volume idx
type VolumeFileScanner4Tailing struct {
2019-10-22 07:50:50 +00:00
stream volume_server_pb.VolumeServer_VolumeTailSenderServer
2019-10-22 07:50:30 +00:00
lastProcessedTimestampNs uint64
}
// VisitSuperBlock ignores the super block and always returns nil; tailing
// only forwards needles.
func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock storage.SuperBlock) error {
return nil
}
// ReadNeedleBody always reports true. VisitNeedle forwards the needle body to
// the stream, so the body bytes are needed.
// NOTE(review): presumably consulted by storage.ScanVolumeFileFrom to decide
// whether to load needle bodies — confirm against the storage package.
func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool {
return true
}
// VisitNeedle forwards one needle to the tail stream. The body is cut into
// pieces of at most BufferSizeLimit bytes; every message repeats the needle
// header, and only the final piece has IsLastChunk set.
//
// The first Send error aborts the needle and is returned unchanged. On
// success the needle's AppendAtNs is recorded as the last processed
// timestamp. An empty needleBody produces no message at all, but the
// timestamp is still recorded.
func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
	total := len(needleBody)

	for start := 0; start < total; start += BufferSizeLimit {
		end := start + BufferSizeLimit
		last := end >= total
		if last {
			// Final piece: clamp to the end of the body.
			end = total
		}

		chunk := &volume_server_pb.VolumeTailSenderResponse{
			NeedleHeader: needleHeader,
			NeedleBody:   needleBody[start:end],
			IsLastChunk:  last,
		}
		if err := scanner.stream.Send(chunk); err != nil {
			return err
		}
	}

	scanner.lastProcessedTimestampNs = n.AppendAtNs
	return nil
}