diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go index 8a662ef5e..b90517009 100644 --- a/unmaintained/volume_tailer/volume_tailer.go +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -3,6 +3,7 @@ package main import ( "context" "flag" + "fmt" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -11,6 +12,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/spf13/viper" "golang.org/x/tools/godoc/util" + "google.golang.org/grpc" "io" "log" @@ -31,16 +33,50 @@ func main() { vid := storage.VolumeId(*volumeId) - // find volume location, replication, ttl info - lookup, err := operation.Lookup(*master, vid.String()) - if err != nil { - log.Printf("Error looking up volume %d: %v", vid, err) - return - } - volumeServer := lookup.Locations[0].Url - log.Printf("volume %d is on volume server %s", vid, volumeServer) + err := TailVolume(*master, grpcDialOption, vid, func(n *storage.Needle) (err error) { + if n.Size == 0 { + println("-", n.String()) + return nil + } else { + println("+", n.String()) + } - err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + if *showTextFile { + + data := n.Data + if n.IsGzipped() { + if data, err = operation.UnGzipData(data); err != nil { + return err + } + } + if util.IsText(data) { + println(string(data)) + } + + println("-", n.String(), "compressed", n.IsGzipped(), "original size", len(data)) + } + return nil + }) + + if err != nil { + log.Printf("Error VolumeTail volume %d: %v", vid, err) + } + +} + +func TailVolume(master string, grpcDialOption grpc.DialOption, vid storage.VolumeId, fn func(n *storage.Needle) error) error { + // find volume location, replication, ttl info + lookup, err := operation.Lookup(master, vid.String()) + if err != nil { + return fmt.Errorf("Error looking up volume %d: %v", vid, err) + } + if len(lookup.Locations) == 0 { + return fmt.Errorf("unable to locate volume %d", vid) + } + + volumeServer := lookup.Locations[0].Url + + return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{ VolumeId: uint32(vid), @@ -84,36 +120,13 @@ func main() { n.ParseNeedleHeader(needleHeader) n.ReadNeedleBodyBytes(needleBody, storage.CurrentVersion) - if n.Size == 0 { - println("-", n.String()) - continue - } else { - println("+", n.String()) - } + err = fn(n) - if *showTextFile { - - data := n.Data - if n.IsGzipped() { - if data, err = operation.UnGzipData(data); err != nil { - continue - } - } - if util.IsText(data) { - println(string(data)) - } - - println("-", n.String(), "compressed", n.IsGzipped(), "original size", len(data)) + if err != nil { + return err } } - return nil - }) - - if err != nil { - log.Printf("Error VolumeTail volume %d: %v", vid, err) - } - }