refactoring

This commit is contained in:
Chris Lu 2019-04-18 20:04:44 -07:00
parent fa176fe80f
commit 072644969e

View file

@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"flag" "flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@ -11,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/spf13/viper" "github.com/spf13/viper"
"golang.org/x/tools/godoc/util" "golang.org/x/tools/godoc/util"
"google.golang.org/grpc"
"io" "io"
"log" "log"
@ -31,16 +33,50 @@ func main() {
vid := storage.VolumeId(*volumeId) vid := storage.VolumeId(*volumeId)
// find volume location, replication, ttl info err := TailVolume(*master, grpcDialOption, vid, func(n *storage.Needle) (err error) {
lookup, err := operation.Lookup(*master, vid.String()) if n.Size == 0 {
if err != nil { println("-", n.String())
log.Printf("Error looking up volume %d: %v", vid, err) return nil
return } else {
println("+", n.String())
} }
volumeServer := lookup.Locations[0].Url
log.Printf("volume %d is on volume server %s", vid, volumeServer)
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { if *showTextFile {
data := n.Data
if n.IsGzipped() {
if data, err = operation.UnGzipData(data); err != nil {
return err
}
}
if util.IsText(data) {
println(string(data))
}
println("-", n.String(), "compressed", n.IsGzipped(), "original size", len(data))
}
return nil
})
if err != nil {
log.Printf("Error VolumeTail volume %d: %v", vid, err)
}
}
func TailVolume(master string, grpcDialOption grpc.DialOption, vid storage.VolumeId, fn func(n *storage.Needle) error) error {
// find volume location, replication, ttl info
lookup, err := operation.Lookup(master, vid.String())
if err != nil {
return fmt.Errorf("Error looking up volume %d: %v", vid, err)
}
if len(lookup.Locations) == 0 {
return fmt.Errorf("unable to locate volume %d", vid)
}
volumeServer := lookup.Locations[0].Url
return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{ stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{
VolumeId: uint32(vid), VolumeId: uint32(vid),
@ -84,36 +120,13 @@ func main() {
n.ParseNeedleHeader(needleHeader) n.ParseNeedleHeader(needleHeader)
n.ReadNeedleBodyBytes(needleBody, storage.CurrentVersion) n.ReadNeedleBodyBytes(needleBody, storage.CurrentVersion)
if n.Size == 0 { err = fn(n)
println("-", n.String())
continue
} else {
println("+", n.String())
}
if *showTextFile {
data := n.Data
if n.IsGzipped() {
if data, err = operation.UnGzipData(data); err != nil {
continue
}
}
if util.IsText(data) {
println(string(data))
}
println("-", n.String(), "compressed", n.IsGzipped(), "original size", len(data))
}
}
return nil
})
if err != nil { if err != nil {
log.Printf("Error VolumeTail volume %d: %v", vid, err) return err
} }
}
return nil
})
} }