From 332d49432dfc5328b803ddbe540704d303500cfd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 5 Oct 2021 01:58:30 -0700 Subject: [PATCH] reduce concurrent volume grow requests --- weed/server/master_grpc_server_volume.go | 4 +++- weed/server/master_server_handlers.go | 3 ++- weed/topology/volume_layout.go | 17 +++++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 95fdc3fd1..551e59990 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -54,6 +54,7 @@ func (ms *MasterServer) ProcessGrowRequest() { start := time.Now() _, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start)) + vl.DoneGrowRequest() if req.ErrCh != nil { req.ErrCh <- err @@ -135,10 +136,11 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) - if vl.ShouldGrowVolumes(option) { + if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { if ms.Topo.AvailableSpaceFor(option) <= 0 { return nil, fmt.Errorf("no free volumes left for " + option.String()) } + vl.AddGrowRequest() ms.vgCh <- &topology.VolumeGrowRequest{ Option: option, Count: int(req.WritableVolumeCount), diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 8187ca21f..50a3f12f6 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -115,13 +115,14 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) - if vl.ShouldGrowVolumes(option) { + if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr) if ms.Topo.AvailableSpaceFor(option) <= 0 { writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()}) return } errCh := make(chan error, 1) + vl.AddGrowRequest() ms.vgCh <- &topology.VolumeGrowRequest{ Option: option, Count: writableVolumeCount, diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 3e487fb2f..dfc3957f8 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -114,6 +114,8 @@ type VolumeLayout struct { volumeSizeLimit uint64 replicationAsMin bool accessLock sync.RWMutex + growRequestCount int + growRequestTime time.Time } type VolumeLayoutStats struct { @@ -310,6 +312,21 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n return &vid, count, locationList, nil } +func (vl *VolumeLayout) HasGrowRequest() bool { + if vl.growRequestCount > 0 && vl.growRequestTime.Add(time.Minute).After(time.Now()) { + return true + } + return false +} +func (vl *VolumeLayout) AddGrowRequest() { + vl.growRequestTime = time.Now() + vl.growRequestCount++ +} +func (vl *VolumeLayout) DoneGrowRequest() { + vl.growRequestTime = time.Unix(0,0) + vl.growRequestCount = 0 +} + func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool { active, crowded := vl.GetActiveVolumeCount(option) //glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high)