seaweedfs/weed/operation/tail_volume.go

83 lines
2 KiB
Go
Raw Permalink Normal View History

2019-04-19 04:43:36 +00:00
package operation
2019-04-19 04:17:43 +00:00
import (
"context"
"fmt"
"io"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
2019-04-19 04:43:36 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2019-04-19 04:17:43 +00:00
"google.golang.org/grpc"
)
2019-04-19 04:43:36 +00:00
func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
2019-04-19 04:17:43 +00:00
// find volume location, replication, ttl info
2019-04-19 04:43:36 +00:00
lookup, err := Lookup(master, vid.String())
2019-04-19 04:17:43 +00:00
if err != nil {
return fmt.Errorf("look up volume %d: %v", vid, err)
}
if len(lookup.Locations) == 0 {
return fmt.Errorf("unable to locate volume %d", vid)
}
volumeServer := lookup.Locations[0].Url
2019-04-20 19:05:28 +00:00
return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn)
2019-04-20 18:35:20 +00:00
}
2019-04-20 19:05:28 +00:00
func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
return WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
2019-04-19 04:17:43 +00:00
stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
2019-04-20 19:05:28 +00:00
VolumeId: uint32(vid),
SinceNs: sinceNs,
IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
2019-04-19 04:17:43 +00:00
})
if err != nil {
return err
}
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return recvErr
}
}
needleHeader := resp.NeedleHeader
needleBody := resp.NeedleBody
if len(needleHeader) == 0 {
continue
}
for !resp.IsLastChunk {
resp, recvErr = stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return recvErr
}
}
needleBody = append(needleBody, resp.NeedleBody...)
}
2019-04-19 04:43:36 +00:00
n := new(needle.Needle)
2019-04-19 04:17:43 +00:00
n.ParseNeedleHeader(needleHeader)
2019-04-19 04:43:36 +00:00
n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion)
2019-04-19 04:17:43 +00:00
err = fn(n)
if err != nil {
return err
}
}
return nil
})
}