retry context canceled request

This commit is contained in:
Chris Lu 2020-01-24 01:40:51 -08:00
parent 2f75264ec7
commit 107e8a56ea
2 changed files with 27 additions and 6 deletions

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"math" "math"
"os" "os"
"strings"
"sync" "sync"
"time" "time"
@ -47,8 +48,8 @@ type WFS struct {
listDirectoryEntriesCache *ccache.Cache listDirectoryEntriesCache *ccache.Cache
// contains all open handles, protected by handlesLock // contains all open handles, protected by handlesLock
handlesLock sync.Mutex handlesLock sync.Mutex
handles []*FileHandle handles []*FileHandle
pathToHandleIndex map[filer2.FullPath]int pathToHandleIndex map[filer2.FullPath]int
bufPool sync.Pool bufPool sync.Pool
@ -89,11 +90,24 @@ func (wfs *WFS) Root() (fs.Node, error) {
func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { err := util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection) client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client) return fn(client)
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
if err == nil {
return nil
}
if strings.Contains(err.Error(), "context canceled") {
time.Sleep(1337 * time.Millisecond)
glog.V(2).Infoln("retry context canceled request...")
return util.WithCachedGrpcClient(context.Background(), func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
}
return err
} }
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
@ -116,7 +130,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
wfs.handles[i] = fileHandle wfs.handles[i] = fileHandle
fileHandle.handle = uint64(i) fileHandle.handle = uint64(i)
wfs.pathToHandleIndex[fullpath] = i wfs.pathToHandleIndex[fullpath] = i
glog.V(4).Infof( "%s reuse fh %d", fullpath,fileHandle.handle) glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle)
return return
} }
} }
@ -124,7 +138,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
wfs.handles = append(wfs.handles, fileHandle) wfs.handles = append(wfs.handles, fileHandle)
fileHandle.handle = uint64(len(wfs.handles) - 1) fileHandle.handle = uint64(len(wfs.handles) - 1)
wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle) wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
glog.V(4).Infof( "%s new fh %d", fullpath,fileHandle.handle) glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
return return
} }

View file

@ -64,7 +64,14 @@ func WithCachedGrpcClient(ctx context.Context, fn func(*grpc.ClientConn) error,
existingConnection, found := grpcClients[address] existingConnection, found := grpcClients[address]
if found { if found {
grpcClientsLock.Unlock() grpcClientsLock.Unlock()
return fn(existingConnection) err := fn(existingConnection)
if err != nil {
grpcClientsLock.Lock()
delete(grpcClients, address)
grpcClientsLock.Unlock()
existingConnection.Close()
}
return err
} }
grpcConnection, err := GrpcDial(ctx, address, opts...) grpcConnection, err := GrpcDial(ctx, address, opts...)