From dc1309f084bf9f420629bdf5c2cbe88c07400930 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 21 May 2021 01:28:00 -0700 Subject: [PATCH] FUSE mount: support multiple filers fix https://github.com/chrislusf/seaweedfs/issues/2015 fix https://github.com/chrislusf/seaweedfs/issues/1531 --- weed/command/mount_std.go | 20 ++++++++-------- weed/filesys/wfs.go | 13 +++++++---- weed/filesys/wfs_filer_client.go | 39 +++++++++++++++++++++++--------- weed/filesys/wfs_write.go | 2 +- weed/pb/grpc_client_server.go | 25 ++++++++++++++++++++ 5 files changed, 72 insertions(+), 27 deletions(-) diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 2474cf7dd..e72a2f2cf 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -51,9 +51,9 @@ func runMount(cmd *Command, args []string) bool { func RunMount(option *MountOptions, umask os.FileMode) bool { - filer := *option.filer + filers := strings.Split(*option.filer, ",") // parse filer grpc address - filerGrpcAddress, err := pb.ParseServerToGrpcAddress(filer) + filerGrpcAddresses, err := pb.ParseServersToGrpcAddresses(filers) if err != nil { glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) return true @@ -64,22 +64,22 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var cipher bool for i := 0; i < 10; i++ { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithOneOfGrpcFilerClients(filerGrpcAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err) + return fmt.Errorf("get filer grpc address %v configuration: %v", filerGrpcAddresses, err) } cipher = resp.Cipher return nil }) if err != nil { - glog.V(0).Infof("failed to talk to filer %s: %v", filerGrpcAddress, err) + glog.V(0).Infof("failed to talk to filer %v: %v", filerGrpcAddresses, err) glog.V(0).Infof("wait for %d seconds ...", i+1) time.Sleep(time.Duration(i+1) * time.Second) } } if err != nil { - glog.Errorf("failed to talk to filer %s: %v", filerGrpcAddress, err) + glog.Errorf("failed to talk to filer %v: %v", filerGrpcAddresses, err) return true } @@ -145,7 +145,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { options := []fuse.MountOption{ fuse.VolumeName(mountName), - fuse.FSName(filer + ":" + filerMountRootPath), + fuse.FSName(*option.filer + ":" + filerMountRootPath), fuse.Subtype("seaweedfs"), // fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders fuse.NoAppleXattr(), @@ -181,8 +181,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{ MountDirectory: dir, - FilerAddress: filer, - FilerGrpcAddress: filerGrpcAddress, + FilerAddresses: filers, + FilerGrpcAddresses: filerGrpcAddresses, GrpcDialOption: grpcDialOption, FilerMountRootPath: mountRoot, Collection: *option.collection, @@ -218,7 +218,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { c.Close() }) - glog.V(0).Infof("mounted %s%s to %s", filer, mountRoot, dir) + glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir) server := fs.New(c, nil) seaweedFileSystem.Server = server err = server.Serve(seaweedFileSystem) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 4096d3595..b634420d6 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -28,8 +28,9 @@ import ( type Option struct { MountDirectory string - FilerAddress string - FilerGrpcAddress string + FilerAddresses []string + filerIndex int + FilerGrpcAddresses []string GrpcDialOption grpc.DialOption FilerMountRootPath string Collection string @@ -95,7 +96,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { }, signature: util.RandomInt32(), } - cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8] + cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8] cacheDir := path.Join(option.CacheDir, cacheUniqueId) if option.CacheSizeMB > 0 { os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask) @@ -259,11 +260,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) { func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { if wfs.option.VolumeServerAccess == "filerProxy" { return func(fileId string) (targetUrls []string, err error) { - return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil + return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil } } return filer.LookupFn(wfs) - +} +func (wfs *WFS) getCurrentFiler() string { + return wfs.option.FilerAddresses[wfs.option.filerIndex] } type NodeWithId uint64 diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go index 671d20ba2..95ebdb9b8 100644 --- a/weed/filesys/wfs_filer_client.go +++ b/weed/filesys/wfs_filer_client.go @@ -1,6 +1,7 @@ package filesys import ( + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" @@ -10,20 +11,36 @@ import ( var _ = filer_pb.FilerClient(&WFS{}) -func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { - err := util.Retry("filer grpc "+wfs.option.FilerGrpcAddress, func() error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) + return util.Retry("filer grpc", func() error { + + i := wfs.option.filerIndex + n := len(wfs.option.FilerGrpcAddresses) + for x := 0; x < n; x++ { + + filerGrpcAddress := wfs.option.FilerGrpcAddresses[i] + err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filerGrpcAddress, wfs.option.GrpcDialOption) + + if err != nil { + glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err) + } else { + wfs.option.filerIndex = i + return nil + } + + i++ + if i >= n { + i = 0 + } + + } + return err }) - if err == nil { - return nil - } - return err - } func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string { diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index 730578202..42c13cfd0 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) if wfs.option.VolumeServerAccess == "filerProxy" { - fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId) + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId) } uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) if err != nil { diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 9efcd9bdc..cdac0ba99 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -111,6 +111,16 @@ func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) { return ParseServerAddress(server, 10000) } +func ParseServersToGrpcAddresses(servers []string) (serverGrpcAddresses []string, err error) { + for _, server := range servers { + if serverGrpcAddress, parseErr := ParseServerToGrpcAddress(server); parseErr == nil { + serverGrpcAddresses = append(serverGrpcAddresses, serverGrpcAddress) + } else { + return nil, parseErr + } + } + return +} func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) { @@ -202,3 +212,18 @@ func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption }, filerGrpcAddress, grpcDialOption) } + +func WithOneOfGrpcFilerClients(filerGrpcAddresses []string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { + + for _, filerGrpcAddress := range filerGrpcAddresses { + err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filerGrpcAddress, grpcDialOption) + if err == nil { + return nil + } + } + + return err +}