seaweedfs/weed/operation/tail_volume.go

90 lines
2.2 KiB
Go
Raw Normal View History

2019-04-19 04:43:36 +00:00
package operation
2019-04-19 04:17:43 +00:00
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
2019-04-19 04:17:43 +00:00
"io"
"google.golang.org/grpc"
2019-04-19 04:17:43 +00:00
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
2019-04-19 04:43:36 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2019-04-19 04:17:43 +00:00
)
func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
2019-04-19 04:17:43 +00:00
// find volume location, replication, ttl info
lookup, err := LookupVolumeId(masterFn, grpcDialOption, vid.String())
2019-04-19 04:17:43 +00:00
if err != nil {
return fmt.Errorf("look up volume %d: %v", vid, err)
}
if len(lookup.Locations) == 0 {
return fmt.Errorf("unable to locate volume %d", vid)
}
volumeServer := lookup.Locations[0].ServerAddress()
2019-04-19 04:17:43 +00:00
2019-04-20 19:05:28 +00:00
return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn)
2019-04-20 18:35:20 +00:00
}
func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
2019-04-19 04:17:43 +00:00
stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
2019-04-20 19:05:28 +00:00
VolumeId: uint32(vid),
SinceNs: sinceNs,
IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
2019-04-19 04:17:43 +00:00
})
if err != nil {
return err
}
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return recvErr
}
}
needleHeader := resp.NeedleHeader
needleBody := resp.NeedleBody
if len(needleHeader) == 0 {
continue
}
for !resp.IsLastChunk {
resp, recvErr = stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return recvErr
}
}
needleBody = append(needleBody, resp.NeedleBody...)
}
2019-04-19 04:43:36 +00:00
n := new(needle.Needle)
2019-04-19 04:17:43 +00:00
n.ParseNeedleHeader(needleHeader)
2021-08-10 09:50:28 +00:00
err = n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion)
if err != nil {
return err
}
2019-04-19 04:17:43 +00:00
err = fn(n)
if err != nil {
return err
}
}
return nil
})
}