package weed_server

import (
	"fmt"
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
)

// VolumeTail streams the needles of volume req.VolumeId that come after
// req.SinceNs to the client, then keeps polling the volume for newer needles.
//
// When req.DrainingSeconds is zero the tail runs until a send/scan error
// occurs or the stream context is done. Otherwise it returns nil once no new
// needle has been observed for roughly req.DrainingSeconds seconds; every
// observed needle resets that idle countdown.
//
// It returns an error when the volume is not found, when sendNeedlesSince
// fails, or when the stream context ends while waiting between polls.
func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error {
	// pollSeconds is the pause between two scans of the volume. The idle
	// countdown below is decremented by the same amount so that
	// drainingSeconds really counts seconds rather than poll iterations.
	const pollSeconds = 2

	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
	if v == nil {
		return fmt.Errorf("not found volume id %d", req.VolumeId)
	}

	defer glog.V(1).Infof("tailing volume %d finished", v.Id)

	ctx := stream.Context()
	lastTimestampNs := req.SinceNs
	drainingSeconds := req.DrainingSeconds

	for {
		lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
		if err != nil {
			glog.Infof("sendNeedlesSince: %v", err)
			return fmt.Errorf("streamFollow: %v", err)
		}

		// Wait before polling again, but stop right away if the client has
		// gone away instead of sleeping through the cancellation.
		timer := time.NewTimer(pollSeconds * time.Second)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}

		if req.DrainingSeconds == 0 {
			// No idle timeout requested: follow the volume indefinitely.
			lastTimestampNs = lastProcessedTimestampNs
			continue
		}

		if lastProcessedTimestampNs != lastTimestampNs {
			// Progress was made: restart the idle countdown.
			lastTimestampNs = lastProcessedTimestampNs
			drainingSeconds = req.DrainingSeconds
			glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
			continue
		}

		// Nothing new during this poll: consume the elapsed interval from the
		// idle budget. Compare before subtracting so the counter can never
		// wrap below zero.
		if drainingSeconds <= pollSeconds {
			return nil
		}
		drainingSeconds -= pollSeconds
		glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
	}
}

// sendNeedlesSince sends to stream every needle that the volume scan yields
// starting from the offset located for lastTimestampNs, and returns the
// append timestamp of the last needle handed to the callback.
//
// If the lookup reports that lastTimestampNs is already the last entry, a
// single heartbeat response (IsLastChunk set, no payload) is sent and
// lastTimestampNs is returned unchanged. If the scan visits no needle at all,
// lastTimestampNs is likewise returned unchanged so the caller does not rewind
// its position to zero.
//
// On error the returned timestamp should not be relied upon.
func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
	// needle bodies are streamed in chunks of at most this many bytes
	const blockSizeLimit = 1024 * 1024 * 2

	foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
	if err != nil {
		return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
	}

	// log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)

	if isLastOne {
		// need to heart beat to the client to ensure the connection health
		sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{IsLastChunk: true})
		return lastTimestampNs, sendErr
	}

	// Start from the caller's position; it only advances when a needle is
	// actually processed by the callback below.
	lastProcessedTimestampNs = lastTimestampNs

	err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
		// need to send body by chunks; the header accompanies every chunk and
		// IsLastChunk marks the final chunk of this needle.
		// NOTE(review): a zero-length needleBody results in no message being
		// sent for that needle — confirm the scanner never yields one.
		for i := 0; i < len(needleBody); i += blockSizeLimit {
			stopOffset := i + blockSizeLimit
			isLastChunk := false
			if stopOffset >= len(needleBody) {
				isLastChunk = true
				stopOffset = len(needleBody)
			}

			sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{
				NeedleHeader: needleHeader,
				NeedleBody:   needleBody[i:stopOffset],
				IsLastChunk:  isLastChunk,
			})
			if sendErr != nil {
				return sendErr
			}
		}

		lastProcessedTimestampNs = needleAppendAtNs
		return nil
	})

	return lastProcessedTimestampNs, err
}