diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go index b90517009..b234f5c4d 100644 --- a/unmaintained/volume_tailer/volume_tailer.go +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -1,26 +1,22 @@ package main import ( - "context" "flag" - "fmt" + "log" + "time" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/security" weed_server "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/spf13/viper" "golang.org/x/tools/godoc/util" - "google.golang.org/grpc" - - "io" - "log" ) var ( master = flag.String("master", "localhost:9333", "master server host and port") volumeId = flag.Int("volumeId", -1, "a volume id") + rewindDuration = flag.Duration("rewind", -1, "rewind back in time. -1 means from the first entry. 0 means from now.") timeoutSeconds = flag.Int("timeoutSeconds", 0, "disconnect if no activity after these seconds") showTextFile = flag.Bool("showTextFile", false, "display textual file content") ) @@ -33,7 +29,16 @@ func main() { vid := storage.VolumeId(*volumeId) - err := TailVolume(*master, grpcDialOption, vid, func(n *storage.Needle) (err error) { + var sinceTimeNs int64 + if *rewindDuration == 0 { + sinceTimeNs = time.Now().UnixNano() + } else if *rewindDuration == -1 { + sinceTimeNs = 0 + } else if *rewindDuration > 0 { + sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano() + } + + err := storage.TailVolume(*master, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *storage.Needle) (err error) { if n.Size == 0 { println("-", n.String()) return nil @@ -63,70 +68,3 @@ func main() { } } - -func TailVolume(master string, grpcDialOption grpc.DialOption, vid storage.VolumeId, fn func(n *storage.Needle) error) error { - // find volume location, replication, ttl info - lookup, err := operation.Lookup(master, vid.String()) - if err != nil { - return fmt.Errorf("Error looking up volume %d: %v", vid, err) - } - if len(lookup.Locations) == 0 { - return fmt.Errorf("unable to locate volume %d", vid) - } - - volumeServer := lookup.Locations[0].Url - - return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - - stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{ - VolumeId: uint32(vid), - SinceNs: 0, - DrainingSeconds: uint32(*timeoutSeconds), - }) - if err != nil { - return err - } - - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - break - } else { - return recvErr - } - } - - needleHeader := resp.NeedleHeader - needleBody := resp.NeedleBody - - if len(needleHeader) == 0 { - continue - } - - for !resp.IsLastChunk { - resp, recvErr = stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - break - } else { - return recvErr - } - } - needleBody = append(needleBody, resp.NeedleBody...) - } - - n := new(storage.Needle) - n.ParseNeedleHeader(needleHeader) - n.ReadNeedleBodyBytes(needleBody, storage.CurrentVersion) - - err = fn(n) - - if err != nil { - return err - } - - } - return nil - }) -} diff --git a/weed/storage/tail_volume.go.go b/weed/storage/tail_volume.go.go new file mode 100644 index 000000000..31ad058b1 --- /dev/null +++ b/weed/storage/tail_volume.go.go @@ -0,0 +1,78 @@ +package storage + +import ( + "context" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "google.golang.org/grpc" +) + +func TailVolume(master string, grpcDialOption grpc.DialOption, vid VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *Needle) error) error { + // find volume location, replication, ttl info + lookup, err := operation.Lookup(master, vid.String()) + if err != nil { + return fmt.Errorf("look up volume %d: %v", vid, err) + } + if len(lookup.Locations) == 0 { + return fmt.Errorf("unable to locate volume %d", vid) + } + + volumeServer := lookup.Locations[0].Url + + return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + + stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{ + VolumeId: uint32(vid), + SinceNs: sinceNs, + DrainingSeconds: uint32(timeoutSeconds), + }) + if err != nil { + return err + } + + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + + needleHeader := resp.NeedleHeader + needleBody := resp.NeedleBody + + if len(needleHeader) == 0 { + continue + } + + for !resp.IsLastChunk { + resp, recvErr = stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + needleBody = append(needleBody, resp.NeedleBody...) + } + + n := new(Needle) + n.ParseNeedleHeader(needleHeader) + n.ReadNeedleBodyBytes(needleBody, CurrentVersion) + + err = fn(n) + + if err != nil { + return err + } + + } + return nil + }) +}