mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
use the same context object in order to retry
This commit is contained in:
parent
0c298ef890
commit
72a64a5cf8
|
@ -466,7 +466,7 @@ func detectMimeType(f *os.File) string {
|
||||||
|
|
||||||
func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
|
func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
|
||||||
|
|
||||||
return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error {
|
return util.WithCachedGrpcClient(ctx, func(ctx context.Context, clientConn *grpc.ClientConn) error {
|
||||||
client := filer_pb.NewSeaweedFilerClient(clientConn)
|
client := filer_pb.NewSeaweedFilerClient(clientConn)
|
||||||
return fn(client)
|
return fn(client)
|
||||||
}, filerAddress, grpcDialOption)
|
}, filerAddress, grpcDialOption)
|
||||||
|
|
|
@ -22,7 +22,7 @@ func VolumeId(fileId string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
type FilerClient interface {
|
type FilerClient interface {
|
||||||
WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error
|
WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
|
func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
|
||||||
|
@ -33,7 +33,7 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath F
|
||||||
|
|
||||||
vid2Locations := make(map[string]*filer_pb.Locations)
|
vid2Locations := make(map[string]*filer_pb.Locations)
|
||||||
|
|
||||||
err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
|
glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
|
||||||
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
|
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
|
||||||
|
@ -97,7 +97,7 @@ func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath FullPat
|
||||||
|
|
||||||
dir, name := fullFilePath.DirAndName()
|
dir, name := fullFilePath.DirAndName()
|
||||||
|
|
||||||
err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.LookupDirectoryEntryRequest{
|
request := &filer_pb.LookupDirectoryEntryRequest{
|
||||||
Directory: dir,
|
Directory: dir,
|
||||||
|
@ -128,7 +128,7 @@ func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath FullPat
|
||||||
|
|
||||||
func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
|
func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
|
||||||
|
|
||||||
err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
lastEntryName := ""
|
lastEntryName := ""
|
||||||
|
|
||||||
|
|
|
@ -128,7 +128,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
|
||||||
}
|
}
|
||||||
glog.V(1).Infof("create: %v", req.String())
|
glog.V(1).Infof("create: %v", req.String())
|
||||||
|
|
||||||
if err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
if err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
|
if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
|
||||||
if strings.Contains(err.Error(), "EEXIST") {
|
if strings.Contains(err.Error(), "EEXIST") {
|
||||||
return fuse.EEXIST
|
return fuse.EEXIST
|
||||||
|
@ -139,11 +139,13 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
node := dir.newFile(req.Name, request.Entry)
|
var node fs.Node
|
||||||
if request.Entry.IsDirectory {
|
if request.Entry.IsDirectory {
|
||||||
|
node = dir.newDirectory(filer2.NewFullPath(dir.Path, req.Name), request.Entry)
|
||||||
return node, nil, nil
|
return node, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
node = dir.newFile(req.Name, request.Entry)
|
||||||
file := node.(*File)
|
file := node.(*File)
|
||||||
file.isOpen++
|
file.isOpen++
|
||||||
fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
|
fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
|
||||||
|
@ -165,7 +167,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.CreateEntryRequest{
|
request := &filer_pb.CreateEntryRequest{
|
||||||
Directory: dir.Path,
|
Directory: dir.Path,
|
||||||
|
@ -279,7 +281,7 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro
|
||||||
|
|
||||||
dir.wfs.cacheDelete(filePath)
|
dir.wfs.cacheDelete(filePath)
|
||||||
|
|
||||||
return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.DeleteEntryRequest{
|
request := &filer_pb.DeleteEntryRequest{
|
||||||
Directory: dir.Path,
|
Directory: dir.Path,
|
||||||
|
@ -303,7 +305,7 @@ func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error
|
||||||
|
|
||||||
dir.wfs.cacheDelete(filer2.NewFullPath(dir.Path, req.Name))
|
dir.wfs.cacheDelete(filer2.NewFullPath(dir.Path, req.Name))
|
||||||
|
|
||||||
return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.DeleteEntryRequest{
|
request := &filer_pb.DeleteEntryRequest{
|
||||||
Directory: dir.Path,
|
Directory: dir.Path,
|
||||||
|
@ -427,7 +429,7 @@ func (dir *Dir) saveEntry(ctx context.Context) error {
|
||||||
|
|
||||||
parentDir, name := filer2.FullPath(dir.Path).DirAndName()
|
parentDir, name := filer2.FullPath(dir.Path).DirAndName()
|
||||||
|
|
||||||
return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.UpdateEntryRequest{
|
request := &filer_pb.UpdateEntryRequest{
|
||||||
Directory: parentDir,
|
Directory: parentDir,
|
||||||
|
|
|
@ -35,7 +35,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
|
if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
|
||||||
glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err)
|
glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err)
|
||||||
return fuse.EIO
|
return fuse.EIO
|
||||||
|
|
|
@ -15,7 +15,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
|
||||||
newDir := newDirectory.(*Dir)
|
newDir := newDirectory.(*Dir)
|
||||||
glog.V(4).Infof("dir Rename %s/%s => %s/%s", dir.Path, req.OldName, newDir.Path, req.NewName)
|
glog.V(4).Infof("dir Rename %s/%s => %s/%s", dir.Path, req.OldName, newDir.Path, req.NewName)
|
||||||
|
|
||||||
err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.AtomicRenameEntryRequest{
|
request := &filer_pb.AtomicRenameEntryRequest{
|
||||||
OldDirectory: dir.Path,
|
OldDirectory: dir.Path,
|
||||||
|
|
|
@ -140,7 +140,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, reader io.
|
||||||
var fileId, host string
|
var fileId, host string
|
||||||
var auth security.EncodedJwt
|
var auth security.EncodedJwt
|
||||||
|
|
||||||
if err := pages.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
if err := pages.f.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.AssignVolumeRequest{
|
request := &filer_pb.AssignVolumeRequest{
|
||||||
Count: 1,
|
Count: 1,
|
||||||
|
|
|
@ -257,7 +257,7 @@ func (file *File) setEntry(entry *filer_pb.Entry) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (file *File) saveEntry(ctx context.Context) error {
|
func (file *File) saveEntry(ctx context.Context) error {
|
||||||
return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
return file.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.UpdateEntryRequest{
|
request := &filer_pb.UpdateEntryRequest{
|
||||||
Directory: file.dir.Path,
|
Directory: file.dir.Path,
|
||||||
|
|
|
@ -169,7 +169,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err = fh.f.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
if fh.f.entry.Attributes != nil {
|
if fh.f.entry.Attributes != nil {
|
||||||
fh.f.entry.Attributes.Mime = fh.contentType
|
fh.f.entry.Attributes.Mime = fh.contentType
|
||||||
|
|
|
@ -88,22 +88,22 @@ func (wfs *WFS) Root() (fs.Node, error) {
|
||||||
return wfs.root, nil
|
return wfs.root, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
|
func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
|
||||||
|
|
||||||
err := util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
|
err := util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
|
||||||
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
||||||
return fn(client)
|
return fn(ctx2, client)
|
||||||
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
|
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if strings.Contains(err.Error(), "context canceled") {
|
if strings.Contains(err.Error(), "context canceled") {
|
||||||
time.Sleep(1337 * time.Millisecond)
|
time.Sleep(3337 * time.Millisecond)
|
||||||
glog.V(2).Infoln("retry context canceled request...")
|
glog.V(2).Infoln("retry context canceled request...")
|
||||||
return util.WithCachedGrpcClient(context.Background(), func(grpcConnection *grpc.ClientConn) error {
|
return util.WithCachedGrpcClient(context.Background(), func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
|
||||||
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
||||||
return fn(client)
|
return fn(ctx2, client)
|
||||||
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
|
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -163,7 +163,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
|
||||||
|
|
||||||
if wfs.stats.lastChecked < time.Now().Unix()-20 {
|
if wfs.stats.lastChecked < time.Now().Unix()-20 {
|
||||||
|
|
||||||
err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err := wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.StatisticsRequest{
|
request := &filer_pb.StatisticsRequest{
|
||||||
Collection: wfs.option.Collection,
|
Collection: wfs.option.Collection,
|
||||||
|
|
|
@ -20,7 +20,7 @@ func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChu
|
||||||
fileIds = append(fileIds, chunk.GetFileIdString())
|
fileIds = append(fileIds, chunk.GetFileIdString())
|
||||||
}
|
}
|
||||||
|
|
||||||
wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
deleteFileIds(ctx, wfs.option.GrpcDialOption, client, fileIds)
|
deleteFileIds(ctx, wfs.option.GrpcDialOption, client, fileIds)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -117,7 +117,7 @@ func (wfs *WFS) maybeLoadEntry(ctx context.Context, dir, name string) (entry *fi
|
||||||
}
|
}
|
||||||
// glog.V(3).Infof("read entry cache miss %s", fullpath)
|
// glog.V(3).Infof("read entry cache miss %s", fullpath)
|
||||||
|
|
||||||
err = wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err = wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.LookupDirectoryEntryRequest{
|
request := &filer_pb.LookupDirectoryEntryRequest{
|
||||||
Name: name,
|
Name: name,
|
||||||
|
|
|
@ -44,7 +44,7 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
lastError = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
|
||||||
|
|
||||||
req := &master_pb.AssignRequest{
|
req := &master_pb.AssignRequest{
|
||||||
Count: primaryRequest.Count,
|
Count: primaryRequest.Count,
|
||||||
|
|
|
@ -117,7 +117,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
|
||||||
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
|
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
|
||||||
func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
|
func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
|
||||||
|
|
||||||
err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err = WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
|
|
||||||
req := &volume_server_pb.BatchDeleteRequest{
|
req := &volume_server_pb.BatchDeleteRequest{
|
||||||
FileIds: fileIds,
|
FileIds: fileIds,
|
||||||
|
|
|
@ -12,7 +12,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
|
func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(context.Context, volume_server_pb.VolumeServerClient) error) error {
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
@ -21,9 +21,9 @@ func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption,
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
|
return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
|
||||||
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
|
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
|
||||||
return fn(client)
|
return fn(ctx2, client)
|
||||||
}, grpcAddress, grpcDialOption)
|
}, grpcAddress, grpcDialOption)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,7 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
|
||||||
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
|
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
|
func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(ctx2 context.Context, masterClient master_pb.SeaweedClient) error) error {
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
@ -47,9 +47,9 @@ func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption,
|
||||||
return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr)
|
return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
|
return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
|
||||||
client := master_pb.NewSeaweedClient(grpcConnection)
|
client := master_pb.NewSeaweedClient(grpcConnection)
|
||||||
return fn(client)
|
return fn(ctx2, client)
|
||||||
}, masterGrpcAddress, grpcDialOption)
|
}, masterGrpcAddress, grpcDialOption)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,12 +99,12 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin
|
||||||
|
|
||||||
//only query unknown_vids
|
//only query unknown_vids
|
||||||
|
|
||||||
err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
err := WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
|
||||||
|
|
||||||
req := &master_pb.LookupVolumeRequest{
|
req := &master_pb.LookupVolumeRequest{
|
||||||
VolumeIds: unknown_vids,
|
VolumeIds: unknown_vids,
|
||||||
}
|
}
|
||||||
resp, grpcErr := masterClient.LookupVolume(context.Background(), req)
|
resp, grpcErr := masterClient.LookupVolume(ctx, req)
|
||||||
if grpcErr != nil {
|
if grpcErr != nil {
|
||||||
return grpcErr
|
return grpcErr
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,9 +9,9 @@ import (
|
||||||
|
|
||||||
func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
|
func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
|
||||||
|
|
||||||
err = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
err = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
|
||||||
|
|
||||||
grpcResponse, grpcErr := masterClient.Statistics(context.Background(), req)
|
grpcResponse, grpcErr := masterClient.Statistics(ctx, req)
|
||||||
if grpcErr != nil {
|
if grpcErr != nil {
|
||||||
return grpcErr
|
return grpcErr
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,9 +8,9 @@ import (
|
||||||
|
|
||||||
func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
|
func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
|
||||||
|
|
||||||
WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
WithVolumeServerClient(server, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
|
|
||||||
resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{
|
resp, err = client.VolumeSyncStatus(ctx, &volume_server_pb.VolumeSyncStatusRequest{
|
||||||
VolumeId: vid,
|
VolumeId: vid,
|
||||||
})
|
})
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -26,9 +26,9 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume
|
||||||
}
|
}
|
||||||
|
|
||||||
func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
|
func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
|
||||||
return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
return WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
|
|
||||||
stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{
|
stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
|
||||||
VolumeId: uint32(vid),
|
VolumeId: uint32(vid),
|
||||||
SinceNs: sinceNs,
|
SinceNs: sinceNs,
|
||||||
IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
|
IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
|
||||||
|
|
|
@ -63,7 +63,7 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi
|
||||||
var host string
|
var host string
|
||||||
var auth security.EncodedJwt
|
var auth security.EncodedJwt
|
||||||
|
|
||||||
if err := fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
if err := fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.AssignVolumeRequest{
|
request := &filer_pb.AssignVolumeRequest{
|
||||||
Count: 1,
|
Count: 1,
|
||||||
|
@ -104,11 +104,11 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerSink) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
|
func (fs *FilerSink) withFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
|
||||||
|
|
||||||
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
|
return util.WithCachedGrpcClient(ctx, func(ctx context.Context, grpcConnection *grpc.ClientConn) error {
|
||||||
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
||||||
return fn(client)
|
return fn(ctx, client)
|
||||||
}, fs.grpcAddress, fs.grpcDialOption)
|
}, fs.grpcAddress, fs.grpcDialOption)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,7 +64,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error {
|
func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error {
|
||||||
return fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
return fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
dir, name := filer2.FullPath(key).DirAndName()
|
dir, name := filer2.FullPath(key).DirAndName()
|
||||||
|
|
||||||
|
@ -87,7 +87,7 @@ func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, d
|
||||||
|
|
||||||
func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error {
|
func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error {
|
||||||
|
|
||||||
return fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
return fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
dir, name := filer2.FullPath(key).DirAndName()
|
dir, name := filer2.FullPath(key).DirAndName()
|
||||||
|
|
||||||
|
@ -139,7 +139,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file
|
||||||
|
|
||||||
// read existing entry
|
// read existing entry
|
||||||
var existingEntry *filer_pb.Entry
|
var existingEntry *filer_pb.Entry
|
||||||
err = fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err = fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.LookupDirectoryEntryRequest{
|
request := &filer_pb.LookupDirectoryEntryRequest{
|
||||||
Directory: dir,
|
Directory: dir,
|
||||||
|
@ -191,7 +191,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file
|
||||||
}
|
}
|
||||||
|
|
||||||
// save updated meta data
|
// save updated meta data
|
||||||
return true, fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
return true, fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.UpdateEntryRequest{
|
request := &filer_pb.UpdateEntryRequest{
|
||||||
Directory: newParentPath,
|
Directory: newParentPath,
|
||||||
|
|
|
@ -45,7 +45,7 @@ func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl s
|
||||||
|
|
||||||
vid := volumeId(part)
|
vid := volumeId(part)
|
||||||
|
|
||||||
err = fs.withFilerClient(ctx, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
err = fs.withFilerClient(ctx, fs.grpcDialOption, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
glog.V(4).Infof("read lookup volume id locations: %v", vid)
|
glog.V(4).Infof("read lookup volume id locations: %v", vid)
|
||||||
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
|
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
|
||||||
|
@ -89,11 +89,11 @@ func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename stri
|
||||||
return filename, header, readCloser, err
|
return filename, header, readCloser, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
|
func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
|
||||||
|
|
||||||
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
|
return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
|
||||||
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
||||||
return fn(client)
|
return fn(ctx2, client)
|
||||||
}, fs.grpcAddress, fs.grpcDialOption)
|
}, fs.grpcAddress, fs.grpcDialOption)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ func encodeResponse(response interface{}) []byte {
|
||||||
|
|
||||||
func (s3a *S3ApiServer) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
|
func (s3a *S3ApiServer) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
|
||||||
|
|
||||||
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
|
return util.WithCachedGrpcClient(ctx, func(ctx context.Context, grpcConnection *grpc.ClientConn) error {
|
||||||
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
||||||
return fn(client)
|
return fn(client)
|
||||||
}, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
|
}, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
|
||||||
|
|
|
@ -124,8 +124,8 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
|
func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
|
||||||
err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
|
||||||
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
|
resp, err := masterClient.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err)
|
return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,8 +57,8 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, server := range collection.ListVolumeServers() {
|
for _, server := range collection.ListVolumeServers() {
|
||||||
err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
|
_, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{
|
||||||
Collection: collectionName,
|
Collection: collectionName,
|
||||||
})
|
})
|
||||||
return deleteErr
|
return deleteErr
|
||||||
|
@ -77,8 +77,8 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error {
|
||||||
listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName)
|
listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName)
|
||||||
|
|
||||||
for _, server := range listOfEcServers {
|
for _, server := range listOfEcServers {
|
||||||
err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
|
_, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{
|
||||||
Collection: collectionName,
|
Collection: collectionName,
|
||||||
})
|
})
|
||||||
return deleteErr
|
return deleteErr
|
||||||
|
|
|
@ -25,8 +25,8 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, server := range collection.ListVolumeServers() {
|
for _, server := range collection.ListVolumeServers() {
|
||||||
err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
|
_, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{
|
||||||
Collection: collection.Name,
|
Collection: collection.Name,
|
||||||
})
|
})
|
||||||
return deleteErr
|
return deleteErr
|
||||||
|
|
|
@ -41,7 +41,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||||
// confirm size and timestamp
|
// confirm size and timestamp
|
||||||
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
|
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
|
||||||
var volumeFileName, idxFileName, datFileName string
|
var volumeFileName, idxFileName, datFileName string
|
||||||
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
var err error
|
var err error
|
||||||
volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
|
volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
|
||||||
&volume_server_pb.ReadVolumeFileStatusRequest{
|
&volume_server_pb.ReadVolumeFileStatusRequest{
|
||||||
|
|
|
@ -106,7 +106,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
|
||||||
|
|
||||||
baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
|
baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
|
||||||
|
|
||||||
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
|
|
||||||
// copy ec data slices
|
// copy ec data slices
|
||||||
for _, shardId := range req.ShardIds {
|
for _, shardId := range req.ShardIds {
|
||||||
|
|
|
@ -98,11 +98,11 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
|
func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
|
||||||
|
|
||||||
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
|
return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
|
||||||
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
||||||
return fn(client)
|
return fn(ctx2, client)
|
||||||
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
|
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -137,7 +137,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
|
||||||
return os.ErrExist
|
return os.ErrExist
|
||||||
}
|
}
|
||||||
|
|
||||||
return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
return fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
dir, name := filer2.FullPath(fullDirPath).DirAndName()
|
dir, name := filer2.FullPath(fullDirPath).DirAndName()
|
||||||
request := &filer_pb.CreateEntryRequest{
|
request := &filer_pb.CreateEntryRequest{
|
||||||
Directory: dir,
|
Directory: dir,
|
||||||
|
@ -186,7 +186,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
|
||||||
}
|
}
|
||||||
|
|
||||||
dir, name := filer2.FullPath(fullFilePath).DirAndName()
|
dir, name := filer2.FullPath(fullFilePath).DirAndName()
|
||||||
err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err = fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{
|
if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{
|
||||||
Directory: dir,
|
Directory: dir,
|
||||||
Entry: &filer_pb.Entry{
|
Entry: &filer_pb.Entry{
|
||||||
|
@ -251,7 +251,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string)
|
||||||
//_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath)
|
//_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath)
|
||||||
}
|
}
|
||||||
dir, name := filer2.FullPath(fullFilePath).DirAndName()
|
dir, name := filer2.FullPath(fullFilePath).DirAndName()
|
||||||
err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err = fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.DeleteEntryRequest{
|
request := &filer_pb.DeleteEntryRequest{
|
||||||
Directory: dir,
|
Directory: dir,
|
||||||
|
@ -310,7 +310,7 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string)
|
||||||
oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName()
|
oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName()
|
||||||
newDir, newBaseName := filer2.FullPath(newName).DirAndName()
|
newDir, newBaseName := filer2.FullPath(newName).DirAndName()
|
||||||
|
|
||||||
return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
return fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.AtomicRenameEntryRequest{
|
request := &filer_pb.AtomicRenameEntryRequest{
|
||||||
OldDirectory: oldDir,
|
OldDirectory: oldDir,
|
||||||
|
@ -385,7 +385,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
|
||||||
var fileId, host string
|
var fileId, host string
|
||||||
var auth security.EncodedJwt
|
var auth security.EncodedJwt
|
||||||
|
|
||||||
if err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
if err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.AssignVolumeRequest{
|
request := &filer_pb.AssignVolumeRequest{
|
||||||
Count: 1,
|
Count: 1,
|
||||||
|
@ -429,7 +429,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
|
||||||
f.entry.Chunks = append(f.entry.Chunks, chunk)
|
f.entry.Chunks = append(f.entry.Chunks, chunk)
|
||||||
dir, _ := filer2.FullPath(f.name).DirAndName()
|
dir, _ := filer2.FullPath(f.name).DirAndName()
|
||||||
|
|
||||||
err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
f.entry.Attributes.Mtime = time.Now().Unix()
|
f.entry.Attributes.Mtime = time.Now().Unix()
|
||||||
|
|
||||||
request := &filer_pb.UpdateEntryRequest{
|
request := &filer_pb.UpdateEntryRequest{
|
||||||
|
|
|
@ -56,7 +56,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
|
||||||
|
|
||||||
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
|
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
|
||||||
|
|
||||||
err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
|
|
||||||
if targetServer.info.Id != existingLocation {
|
if targetServer.info.Id != existingLocation {
|
||||||
|
|
||||||
|
@ -216,7 +216,7 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt
|
||||||
|
|
||||||
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
|
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
|
||||||
|
|
||||||
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
|
_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
|
@ -232,7 +232,7 @@ func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
|
||||||
|
|
||||||
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
|
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
|
||||||
|
|
||||||
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
|
_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
ShardIds: toBeUnmountedhardIds,
|
ShardIds: toBeUnmountedhardIds,
|
||||||
|
@ -246,7 +246,7 @@ func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
|
||||||
|
|
||||||
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
|
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
|
||||||
|
|
||||||
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
|
_, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
|
|
|
@ -99,7 +99,7 @@ func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb
|
||||||
func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
|
func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
|
||||||
|
|
||||||
// mount volume
|
// mount volume
|
||||||
if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
|
_, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
|
||||||
VolumeId: uint32(vid),
|
VolumeId: uint32(vid),
|
||||||
})
|
})
|
||||||
|
@ -132,7 +132,7 @@ func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, v
|
||||||
|
|
||||||
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
|
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
|
||||||
|
|
||||||
err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{
|
_, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{
|
||||||
VolumeId: uint32(vid),
|
VolumeId: uint32(vid),
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
|
@ -170,7 +170,7 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
|
|
||||||
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
|
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
|
||||||
|
|
||||||
|
|
|
@ -120,7 +120,7 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol
|
||||||
|
|
||||||
for _, location := range locations {
|
for _, location := range locations {
|
||||||
|
|
||||||
err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
_, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
})
|
})
|
||||||
|
@ -138,7 +138,7 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol
|
||||||
|
|
||||||
func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
|
func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
|
||||||
|
|
||||||
err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
|
_, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
|
|
|
@ -170,7 +170,7 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *
|
||||||
func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption,
|
func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption,
|
||||||
collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
|
collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
|
||||||
|
|
||||||
err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{
|
resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
|
@ -209,7 +209,7 @@ func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder
|
||||||
|
|
||||||
var copyErr error
|
var copyErr error
|
||||||
if applyBalancing {
|
if applyBalancing {
|
||||||
copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
|
_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
|
|
|
@ -46,7 +46,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write
|
||||||
|
|
||||||
dir, name := filer2.FullPath(path).DirAndName()
|
dir, name := filer2.FullPath(path).DirAndName()
|
||||||
|
|
||||||
return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
|
return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.LookupDirectoryEntryRequest{
|
request := &filer_pb.LookupDirectoryEntryRequest{
|
||||||
Name: name,
|
Name: name,
|
||||||
|
|
|
@ -82,12 +82,12 @@ func duTraverseDirectory(ctx context.Context, writer io.Writer, filerClient file
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error {
|
func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
|
||||||
|
|
||||||
filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000)
|
filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000)
|
||||||
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
|
return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
|
||||||
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
||||||
return fn(client)
|
return fn(ctx2, client)
|
||||||
}, filerGrpcAddress, env.option.GrpcDialOption)
|
}, filerGrpcAddress, env.option.GrpcDialOption)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -105,6 +105,6 @@ func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *comm
|
||||||
filerPort: filerPort,
|
filerPort: filerPort,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (c *commandFilerClient) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
|
func (c *commandFilerClient) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
|
||||||
return c.env.withFilerClient(ctx, c.filerServer, c.filerPort, fn)
|
return c.env.withFilerClient(ctx, c.filerServer, c.filerPort, fn)
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
|
||||||
|
|
||||||
dir, name := filer2.FullPath(path).DirAndName()
|
dir, name := filer2.FullPath(path).DirAndName()
|
||||||
|
|
||||||
return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
|
return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.LookupDirectoryEntryRequest{
|
request := &filer_pb.LookupDirectoryEntryRequest{
|
||||||
Name: name,
|
Name: name,
|
||||||
|
|
|
@ -55,7 +55,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
|
err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
sizeBuf := make([]byte, 4)
|
sizeBuf := make([]byte, 4)
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer
|
||||||
|
|
||||||
destinationDir, destinationName := filer2.FullPath(destinationPath).DirAndName()
|
destinationDir, destinationName := filer2.FullPath(destinationPath).DirAndName()
|
||||||
|
|
||||||
return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
|
return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
// collect destination entry info
|
// collect destination entry info
|
||||||
destinationRequest := &filer_pb.LookupDirectoryEntryRequest{
|
destinationRequest := &filer_pb.LookupDirectoryEntryRequest{
|
||||||
|
|
|
@ -113,7 +113,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
|
_, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
|
||||||
VolumeId: volumeInfo.Id,
|
VolumeId: volumeInfo.Id,
|
||||||
SourceDataNode: sourceNode.dataNode.Id,
|
SourceDataNode: sourceNode.dataNode.Id,
|
||||||
|
|
|
@ -51,7 +51,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io
|
||||||
}
|
}
|
||||||
|
|
||||||
func mountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
|
func mountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
|
||||||
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
|
_, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
})
|
})
|
||||||
|
|
|
@ -88,7 +88,7 @@ func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeI
|
||||||
|
|
||||||
func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
|
func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
|
||||||
|
|
||||||
err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
resp, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
|
resp, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
SourceDataNode: sourceVolumeServer,
|
SourceDataNode: sourceVolumeServer,
|
||||||
|
@ -104,7 +104,7 @@ func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne
|
||||||
|
|
||||||
func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
|
func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
|
||||||
|
|
||||||
return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, replicateErr := volumeServerClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{
|
_, replicateErr := volumeServerClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
SinceNs: lastAppendAtNs,
|
SinceNs: lastAppendAtNs,
|
||||||
|
@ -117,7 +117,7 @@ func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
|
func deleteVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
|
||||||
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, deleteErr := volumeServerClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
|
_, deleteErr := volumeServerClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
})
|
})
|
||||||
|
|
|
@ -118,7 +118,7 @@ func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io
|
||||||
|
|
||||||
func downloadDatFromRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error {
|
func downloadDatFromRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error {
|
||||||
|
|
||||||
err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(ctx, &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
|
stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(ctx, &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
|
|
|
@ -114,7 +114,7 @@ func doVolumeTierUpload(ctx context.Context, commandEnv *CommandEnv, writer io.W
|
||||||
|
|
||||||
func uploadDatToRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error {
|
func uploadDatToRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error {
|
||||||
|
|
||||||
err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(ctx, &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
|
stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(ctx, &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
|
|
|
@ -51,7 +51,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer
|
||||||
}
|
}
|
||||||
|
|
||||||
func unmountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
|
func unmountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
|
||||||
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, unmountErr := volumeServerClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{
|
_, unmountErr := volumeServerClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
})
|
})
|
||||||
|
|
|
@ -70,7 +70,7 @@ func (ce *CommandEnv) checkDirectory(ctx context.Context, filerServer string, fi
|
||||||
|
|
||||||
dir, name := filer2.FullPath(path).DirAndName()
|
dir, name := filer2.FullPath(path).DirAndName()
|
||||||
|
|
||||||
return ce.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
|
return ce.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
resp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
|
resp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
|
||||||
Directory: dir,
|
Directory: dir,
|
||||||
|
|
|
@ -230,7 +230,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
|
||||||
|
|
||||||
glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
|
glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
|
||||||
|
|
||||||
err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
|
||||||
req := &master_pb.LookupEcVolumeRequest{
|
req := &master_pb.LookupEcVolumeRequest{
|
||||||
VolumeId: uint32(ecVolume.VolumeId),
|
VolumeId: uint32(ecVolume.VolumeId),
|
||||||
}
|
}
|
||||||
|
@ -278,7 +278,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
|
||||||
|
|
||||||
func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
|
func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
|
||||||
|
|
||||||
err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
|
|
||||||
// copy data slice
|
// copy data slice
|
||||||
shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
|
shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
|
||||||
|
|
|
@ -87,7 +87,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar
|
||||||
|
|
||||||
func (s *Store) doDeleteNeedleFromRemoteEcShard(ctx context.Context, sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
|
func (s *Store) doDeleteNeedleFromRemoteEcShard(ctx context.Context, sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
|
||||||
|
|
||||||
return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
|
|
||||||
// copy data slice
|
// copy data slice
|
||||||
_, err := client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{
|
_, err := client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{
|
||||||
|
|
|
@ -64,8 +64,6 @@ update needle map when receiving new .dat bytes. But seems not necessary now.)
|
||||||
|
|
||||||
func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error {
|
func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error {
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
startFromOffset, _, _ := v.FileStat()
|
startFromOffset, _, _ := v.FileStat()
|
||||||
appendAtNs, err := v.findLastAppendAtNs()
|
appendAtNs, err := v.findLastAppendAtNs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -74,7 +72,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
|
||||||
|
|
||||||
writeOffset := int64(startFromOffset)
|
writeOffset := int64(startFromOffset)
|
||||||
|
|
||||||
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
|
|
||||||
stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{
|
stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{
|
||||||
VolumeId: uint32(v.Id),
|
VolumeId: uint32(v.Id),
|
||||||
|
|
|
@ -15,7 +15,7 @@ type AllocateVolumeResult struct {
|
||||||
|
|
||||||
func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error {
|
func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error {
|
||||||
|
|
||||||
return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
|
||||||
|
|
||||||
_, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
|
_, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
|
||||||
VolumeId: uint32(vid),
|
VolumeId: uint32(vid),
|
||||||
|
|
|
@ -19,8 +19,8 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
|
||||||
errCount := int32(0)
|
errCount := int32(0)
|
||||||
for index, dn := range locationlist.list {
|
for index, dn := range locationlist.list {
|
||||||
go func(index int, url string, vid needle.VolumeId) {
|
go func(index int, url string, vid needle.VolumeId) {
|
||||||
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
|
resp, err := volumeServerClient.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{
|
||||||
VolumeId: uint32(vid),
|
VolumeId: uint32(vid),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -63,8 +63,8 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout,
|
||||||
for index, dn := range locationlist.list {
|
for index, dn := range locationlist.list {
|
||||||
go func(index int, url string, vid needle.VolumeId) {
|
go func(index int, url string, vid needle.VolumeId) {
|
||||||
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
|
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
|
||||||
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
|
_, err := volumeServerClient.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{
|
||||||
VolumeId: uint32(vid),
|
VolumeId: uint32(vid),
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
|
@ -93,8 +93,8 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v
|
||||||
isCommitSuccess := true
|
isCommitSuccess := true
|
||||||
for _, dn := range locationlist.list {
|
for _, dn := range locationlist.list {
|
||||||
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
|
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
|
||||||
err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
|
_, err := volumeServerClient.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{
|
||||||
VolumeId: uint32(vid),
|
VolumeId: uint32(vid),
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
|
@ -114,8 +114,8 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v
|
||||||
func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
|
func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
|
||||||
for _, dn := range locationlist.list {
|
for _, dn := range locationlist.list {
|
||||||
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
|
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
|
||||||
err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
|
_, err := volumeServerClient.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{
|
||||||
VolumeId: uint32(vid),
|
VolumeId: uint32(vid),
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -57,14 +57,14 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr
|
||||||
return grpc.DialContext(ctx, address, options...)
|
return grpc.DialContext(ctx, address, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithCachedGrpcClient(ctx context.Context, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
|
func WithCachedGrpcClient(ctx context.Context, fn func(context.Context, *grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
|
||||||
|
|
||||||
grpcClientsLock.Lock()
|
grpcClientsLock.Lock()
|
||||||
|
|
||||||
existingConnection, found := grpcClients[address]
|
existingConnection, found := grpcClients[address]
|
||||||
if found {
|
if found {
|
||||||
grpcClientsLock.Unlock()
|
grpcClientsLock.Unlock()
|
||||||
err := fn(existingConnection)
|
err := fn(ctx, existingConnection)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
grpcClientsLock.Lock()
|
grpcClientsLock.Lock()
|
||||||
delete(grpcClients, address)
|
delete(grpcClients, address)
|
||||||
|
@ -83,7 +83,7 @@ func WithCachedGrpcClient(ctx context.Context, fn func(*grpc.ClientConn) error,
|
||||||
grpcClients[address] = grpcConnection
|
grpcClients[address] = grpcConnection
|
||||||
grpcClientsLock.Unlock()
|
grpcClientsLock.Unlock()
|
||||||
|
|
||||||
err = fn(grpcConnection)
|
err = fn(ctx, grpcConnection)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
grpcClientsLock.Lock()
|
grpcClientsLock.Lock()
|
||||||
delete(grpcClients, address)
|
delete(grpcClients, address)
|
||||||
|
|
|
@ -125,9 +125,9 @@ func withMasterClient(ctx context.Context, master string, grpcDialOption grpc.Di
|
||||||
return fmt.Errorf("failed to parse master grpc %v: %v", master, parseErr)
|
return fmt.Errorf("failed to parse master grpc %v: %v", master, parseErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
|
return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
|
||||||
client := master_pb.NewSeaweedClient(grpcConnection)
|
client := master_pb.NewSeaweedClient(grpcConnection)
|
||||||
return fn(ctx, client)
|
return fn(ctx2, client)
|
||||||
}, masterGrpcAddress, grpcDialOption)
|
}, masterGrpcAddress, grpcDialOption)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue