From 99f037b958b5952628d281461d3bfb76fa433d8c Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 23 Aug 2023 00:31:33 -0700 Subject: [PATCH] streaming assign file ids --- weed/operation/assign_file_id.go | 105 +++++++++++++++++++++++++- weed/operation/assign_file_id_test.go | 68 +++++++++++++++++ weed/server/filer_server.go | 7 +- 3 files changed, 176 insertions(+), 4 deletions(-) create mode 100644 weed/operation/assign_file_id_test.go diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index c2f5a806d..a4753d234 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -4,11 +4,10 @@ import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/storage/needle" - "google.golang.org/grpc" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" ) type VolumeAssignRequest struct { @@ -34,6 +33,106 @@ type AssignResult struct { Replicas []Location `json:"replicas,omitempty"` } +// This is a proxy to the master server, only for assigning volume ids. +// It runs via grpc to the master server in streaming mode. +// The connection to the master would only be re-established when the last connection has error. +type AssignProxy struct { + grpcConnection *grpc.ClientConn + pool chan *singleThreadAssignProxy +} + +func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error) { + ap = &AssignProxy{ + pool: make(chan *singleThreadAssignProxy, concurrency), + } + ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption) + if err != nil { + return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err) + } + for i := 0; i < concurrency; i++ { + ap.pool <- &singleThreadAssignProxy{} + } + return ap, nil +} + +func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) { + p := <-ap.pool + defer func() { + ap.pool <- p + }() + + return p.doAssign(ap.grpcConnection, primaryRequest, alternativeRequests...) +} + +type singleThreadAssignProxy struct { + assignClient master_pb.Seaweed_StreamAssignClient +} + +func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) { + if ap.assignClient == nil { + client := master_pb.NewSeaweedClient(grpcConnection) + ap.assignClient, err = client.StreamAssign(context.Background()) + if err != nil { + ap.assignClient = nil + return nil, fmt.Errorf("fail to create stream assign client: %v", err) + } + } + + var requests []*VolumeAssignRequest + requests = append(requests, primaryRequest) + requests = append(requests, alternativeRequests...) + ret = &AssignResult{} + + for _, request := range requests { + if request == nil { + continue + } + req := &master_pb.AssignRequest{ + Count: request.Count, + Replication: request.Replication, + Collection: request.Collection, + Ttl: request.Ttl, + DiskType: request.DiskType, + DataCenter: request.DataCenter, + Rack: request.Rack, + DataNode: request.DataNode, + WritableVolumeCount: request.WritableVolumeCount, + } + if err = ap.assignClient.Send(req); err != nil { + return nil, fmt.Errorf("StreamAssignSend: %v", err) + } + resp, grpcErr := ap.assignClient.Recv() + if grpcErr != nil { + return nil, grpcErr + } + if resp.Error != "" { + return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error) + } + + ret.Count = resp.Count + ret.Fid = resp.Fid + ret.Url = resp.Location.Url + ret.PublicUrl = resp.Location.PublicUrl + ret.GrpcPort = int(resp.Location.GrpcPort) + ret.Error = resp.Error + ret.Auth = security.EncodedJwt(resp.Auth) + for _, r := range resp.Replicas { + ret.Replicas = append(ret.Replicas, Location{ + Url: r.Url, + PublicUrl: r.PublicUrl, + DataCenter: r.DataCenter, + }) + } + + if ret.Count <= 0 { + continue + } + break + } + + return +} + func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { var requests []*VolumeAssignRequest diff --git a/weed/operation/assign_file_id_test.go b/weed/operation/assign_file_id_test.go new file mode 100644 index 000000000..f6362dceb --- /dev/null +++ b/weed/operation/assign_file_id_test.go @@ -0,0 +1,68 @@ +package operation + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "google.golang.org/grpc" + "testing" + "time" +) + +func BenchmarkWithConcurrency(b *testing.B) { + concurrencyLevels := []int{1, 10, 100, 1000} + + ap, _ := NewAssignProxy(func() pb.ServerAddress { + return pb.ServerAddress("localhost:9333") + }, grpc.WithInsecure(), 16) + + for _, concurrency := range concurrencyLevels { + b.Run( + fmt.Sprintf("Concurrency-%d", concurrency), + func(b *testing.B) { + for i := 0; i < b.N; i++ { + done := make(chan struct{}) + startTime := time.Now() + + for j := 0; j < concurrency; j++ { + go func() { + + ap.Assign(&VolumeAssignRequest{ + Count: 1, + }) + + done <- struct{}{} + }() + } + + for j := 0; j < concurrency; j++ { + <-done + } + + duration := time.Since(startTime) + b.Logf("Concurrency: %d, Duration: %v", concurrency, duration) + } + }, + ) + } +} + +func BenchmarkStreamAssign(b *testing.B) { + ap, _ := NewAssignProxy(func() pb.ServerAddress { + return pb.ServerAddress("localhost:9333") + }, grpc.WithInsecure(), 16) + for i := 0; i < b.N; i++ { + ap.Assign(&VolumeAssignRequest{ + Count: 1, + }) + } +} + +func BenchmarkUnaryAssign(b *testing.B) { + for i := 0; i < b.N; i++ { + Assign(func() pb.ServerAddress { + return pb.ServerAddress("localhost:9333") + }, grpc.WithInsecure(), &VolumeAssignRequest{ + Count: 1, + }) + } +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 8a6d341bb..8e40b2145 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -94,6 +94,9 @@ type FilerServer struct { // track known metadata listeners knownListenersLock sync.Mutex knownListeners map[int32]int32 + + // client to assign file id + assignProxy *operation.AssignProxy } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { @@ -131,6 +134,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec) go fs.filer.KeepMasterClientConnected() + fs.assignProxy, err = operation.NewAssignProxy(fs.filer.GetMaster, fs.grpcDialOption, 16) + if !util.LoadConfiguration("filer", false) { v.SetDefault("leveldb2.enabled", true) v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir) @@ -183,7 +188,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot) - return fs, nil + return fs, err } func (fs *FilerServer) checkWithMaster() {