diff --git a/weed/cluster/master_client.go b/weed/cluster/master_client.go index d1e769c1d..15009e132 100644 --- a/weed/cluster/master_client.go +++ b/weed/cluster/master_client.go @@ -2,6 +2,7 @@ package cluster import ( "context" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -10,7 +11,7 @@ import ( func ListExistingPeerUpdates(master pb.ServerAddress, grpcDialOption grpc.DialOption, filerGroup string, clientType string) (existingNodes []*master_pb.ClusterNodeUpdate) { - if grpcErr := pb.WithMasterClient(false, master, grpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, master, grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: clientType, FilerGroup: filerGroup, diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 977e2a6b8..db2d8c42c 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -572,7 +572,7 @@ func (worker *FileCopyWorker) WithFilerClient(streamingMode bool, fn func(filer_ err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, filerGrpcAddress, worker.options.grpcDialOption) + }, filerGrpcAddress, false, worker.options.grpcDialOption) return } diff --git a/weed/command/upload.go b/weed/command/upload.go index 389a72552..1f03f7b5a 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -4,11 +4,12 @@ import ( "context" "encoding/json" "fmt" + "os" + "path/filepath" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "google.golang.org/grpc" - "os" - "path/filepath" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/security" @@ -130,7 +131,7 @@ func runUpload(cmd *Command, args []string) bool { } func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) { - err = pb.WithMasterClient(false, masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithMasterClient(false, masterAddress, grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", masterAddress, err) diff --git a/weed/mount/wfs_filer_client.go b/weed/mount/wfs_filer_client.go index 020970df7..e991d8b39 100644 --- a/weed/mount/wfs_filer_client.go +++ b/weed/mount/wfs_filer_client.go @@ -25,7 +25,7 @@ func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFile err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, filerGrpcAddress, wfs.option.GrpcDialOption) + }, filerGrpcAddress, false, wfs.option.GrpcDialOption) if err != nil { glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err) diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index c107dbe45..89afb6e4d 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -1,11 +1,12 @@ package broker import ( + "time" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/wdclient" "google.golang.org/grpc" - "time" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -100,7 +101,7 @@ func (broker *MessageQueueBroker) GetDataCenter() string { func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { - return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, false, func(client master_pb.SeaweedClient) error { return fn(client) }) diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index c06e501a5..c1f2bba82 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -13,7 +13,7 @@ func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, g return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) - }, volumeServer.ToGrpcAddress(), grpcDialOption) + }, volumeServer.ToGrpcAddress(), false, grpcDialOption) } @@ -22,6 +22,6 @@ func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, g return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - }, masterServer.ToGrpcAddress(), grpcDialOption) + }, masterServer.ToGrpcAddress(), false, grpcDialOption) } diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index a78ed0ca4..f3cca7fba 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -3,9 +3,6 @@ package pb import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "github.com/seaweedfs/seaweedfs/weed/util" "math/rand" "net/http" "strconv" @@ -13,6 +10,10 @@ import ( "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -65,15 +66,17 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server { return grpc.NewServer(options...) } -func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func GrpcDial(ctx context.Context, address string, waitForReady bool, opts ...grpc.DialOption) (*grpc.ClientConn, error) { // opts = append(opts, grpc.WithBlock()) // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second))) var options []grpc.DialOption + options = append(options, // grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(Max_Message_Size), grpc.MaxCallRecvMsgSize(Max_Message_Size), + grpc.WaitForReady(waitForReady), ), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 30 * time.Second, // client ping server if no activity for this long @@ -88,7 +91,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr return grpc.DialContext(ctx, address, options...) } -func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedGrpcClient, error) { +func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialOption) (*versionedGrpcClient, error) { grpcClientsLock.Lock() defer grpcClientsLock.Unlock() @@ -99,7 +102,7 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG } ctx := context.Background() - grpcConnection, err := GrpcDial(ctx, address, opts...) + grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...) if err != nil { return nil, fmt.Errorf("fail to dial %s: %v", address, err) } @@ -115,10 +118,10 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG } // WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection. -func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { +func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error { if !streamingMode { - vgc, err := getOrCreateConnection(address, opts...) + vgc, err := getOrCreateConnection(address, waitForReady, opts...) if err != nil { return fmt.Errorf("getOrCreateConnection %s: %v", address, err) } @@ -138,7 +141,7 @@ func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address } return executionErr } else { - grpcConnection, err := GrpcDial(context.Background(), address, opts...) + grpcConnection, err := GrpcDial(context.Background(), address, waitForReady, opts...) if err != nil { return fmt.Errorf("fail to dial %s: %v", address, err) } @@ -200,11 +203,11 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) { return util.JoinHostPort(host, port) } -func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { +func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, waitForReady bool, fn func(client master_pb.SeaweedClient) error) error { return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - }, master.ToGrpcAddress(), grpcDialOption) + }, master.ToGrpcAddress(), waitForReady, grpcDialOption) } @@ -212,7 +215,7 @@ func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpc return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) - }, volumeServer.ToGrpcAddress(), grpcDialOption) + }, volumeServer.ToGrpcAddress(), false, grpcDialOption) } @@ -220,7 +223,7 @@ func WithBrokerClient(streamingMode bool, broker ServerAddress, grpcDialOption g return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := mq_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) - }, broker.ToGrpcAddress(), grpcDialOption) + }, broker.ToGrpcAddress(), false, grpcDialOption) } @@ -230,7 +233,7 @@ func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[stri err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - }, masterGrpcAddress.ToGrpcAddress(), grpcDialOption) + }, masterGrpcAddress.ToGrpcAddress(), false, grpcDialOption) if err == nil { return nil } @@ -244,7 +247,7 @@ func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDial return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := mq_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) - }, brokerGrpcAddress, grpcDialOption) + }, brokerGrpcAddress, false, grpcDialOption) } @@ -259,7 +262,7 @@ func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grp return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, filerGrpcAddress.ToGrpcAddress(), grpcDialOption) + }, filerGrpcAddress.ToGrpcAddress(), false, grpcDialOption) } @@ -269,7 +272,7 @@ func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddres err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, filerAddress.ToGrpcAddress(), grpcDialOption) + }, filerAddress.ToGrpcAddress(), false, grpcDialOption) if err == nil { return nil } diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index d1a5d7ebd..c0321039b 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -2,9 +2,10 @@ package filersink import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/util" "sync" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -113,7 +114,7 @@ func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.Seawee return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, fs.grpcAddress, fs.grpcDialOption) + }, fs.grpcAddress, false, fs.grpcDialOption) } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 6c69b735c..2da883ba6 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -131,7 +131,7 @@ func (fs *FilerSource) WithFilerClient(streamingMode bool, fn func(filer_pb.Seaw return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, fs.grpcAddress, fs.grpcDialOption) + }, fs.grpcAddress, false, fs.grpcDialOption) } diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index 4e1dd09a7..b85ff485d 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -3,9 +3,10 @@ package s3api import ( "encoding/base64" "fmt" + "net/http" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "google.golang.org/grpc" - "net/http" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -18,7 +19,7 @@ func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.Sea return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, s3a.option.Filer.ToGrpcAddress(), s3a.option.GrpcDialOption) + }, s3a.option.Filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption) } diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go index 50c52c650..32cb2830d 100644 --- a/weed/server/filer_grpc_server_admin.go +++ b/weed/server/filer_grpc_server_admin.go @@ -3,6 +3,8 @@ package weed_server import ( "context" "fmt" + "time" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -10,7 +12,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "time" ) func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) { @@ -66,7 +67,7 @@ func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (res }) } if req.TargetType == cluster.MasterType { - pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go index 1dd89ad60..fb2c5bd50 100644 --- a/weed/server/master_grpc_server_admin.go +++ b/weed/server/master_grpc_server_admin.go @@ -3,15 +3,16 @@ package weed_server import ( "context" "fmt" + "math/rand" + "sync" + "time" + "github.com/seaweedfs/raft" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "math/rand" - "sync" - "time" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) @@ -175,7 +176,7 @@ func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (r }) } if req.TargetType == cluster.MasterType { - pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, false, func(client master_pb.SeaweedClient) error { pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/master_server.go b/weed/server/master_server.go index fbc27e610..9adcafc6f 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,8 +1,8 @@ package weed_server import ( + "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/stats" "net/http" "net/http/httputil" "net/url" @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -242,7 +244,6 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { } func (ms *MasterServer) startAdminScripts() { - v := util.GetViper() adminScripts := v.GetString("master.maintenance.scripts") if adminScripts == "" { @@ -342,8 +343,10 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF peerAddress := pb.ServerAddress(update.Address) peerName := string(peerAddress) - isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader - if update.IsAdd && isLeader { + if ms.Topo.HashicorpRaft.State() != hashicorpRaft.Leader { + return + } + if update.IsAdd { raftServerFound := false for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { if string(server.ID) == peerName { @@ -356,5 +359,27 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF hashicorpRaft.ServerID(peerName), hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } + } else { + pb.WithMasterClient(false, peerAddress, ms.grpcDialOption, true, func(client master_pb.SeaweedClient) error { + ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*72) + defer cancel() + if _, err := client.Ping(ctx, &master_pb.PingRequest{Target: string(peerAddress), TargetType: cluster.MasterType}); err != nil { + glog.V(0).Infof("master %s didn't respond to pings. remove raft server", peerName) + if err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: peerName, + Force: false, + }) + return err + }); err != nil { + glog.Warningf("failed removing old raft server: %v", err) + return err + } + } else { + glog.V(0).Infof("master %s successfully responded to ping", peerName) + } + + return nil + }) } } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index c570ae2df..aace63fd8 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -3,13 +3,14 @@ package weed_server import ( "context" "fmt" + "path/filepath" + "time" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "path/filepath" - "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" @@ -280,7 +281,7 @@ func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequ }) } if req.TargetType == cluster.MasterType { - pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index c00524577..e55d821a8 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -94,7 +94,7 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti ctx, cancel := context.WithCancel(context.Background()) defer cancel() - grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), grpcDialOption) + grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), false, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err) } diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index d9928ed18..be152d246 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -3,13 +3,14 @@ package weed_server import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "github.com/seaweedfs/seaweedfs/weed/storage/backend" "io" "math" "os" "time" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/backend" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -81,7 +82,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre }() var preallocateSize int64 - if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index eaa373dd0..b874ee9a2 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -127,7 +127,7 @@ func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption) + }, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption) } func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string { diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go index fef371b49..2cabf91b8 100644 --- a/weed/shell/command_cluster_check.go +++ b/weed/shell/command_cluster_check.go @@ -4,12 +4,13 @@ import ( "context" "flag" "fmt" + "io" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "io" ) func init() { @@ -97,7 +98,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i for _, master := range masters { for _, volumeServer := range volumeServers { fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer)) - err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error { + err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, false, func(client master_pb.SeaweedClient) error { pong, err := client.Ping(context.Background(), &master_pb.PingRequest{ Target: string(volumeServer), TargetType: cluster.VolumeServerType, @@ -120,7 +121,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i continue } fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster)) - err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error { + err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, false, func(client master_pb.SeaweedClient) error { pong, err := client.Ping(context.Background(), &master_pb.PingRequest{ Target: string(targetMaster), TargetType: cluster.MasterType, diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go index 78a5b7157..936f35b46 100644 --- a/weed/shell/command_collection_delete.go +++ b/weed/shell/command_collection_delete.go @@ -4,8 +4,9 @@ import ( "context" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "io" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) func init() { diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 391dd9199..2583bda80 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -3,10 +3,11 @@ package wdclient import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/stats" "math/rand" "time" + "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" @@ -52,7 +53,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri if err == nil && len(fullUrls) > 0 { return } - err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ VolumeOrFileIds: []string{fileId}, }) @@ -114,7 +115,7 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres if master == myMasterAddress { continue } - if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond) defer cancel() resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{}) @@ -150,7 +151,7 @@ func (mc *MasterClient) tryAllMasters() { func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master) stats.MasterClientConnectCounter.WithLabelValues("total").Inc() - gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -271,7 +272,7 @@ func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb. for mc.currentMaster == "" { time.Sleep(3 * time.Second) } - return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { return fn(client) }) })