From 79762385bdda8fd6bacc747922b557ccf8c45e74 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 21 Jul 2019 21:49:10 -0700 Subject: [PATCH] master: ensure only one exclusive vacuum process fix https://github.com/chrislusf/seaweedfs/issues/1011 --- weed/topology/topology.go | 2 ++ weed/topology/topology_vacuum.go | 11 +++++++++++ 2 files changed, 13 insertions(+) diff --git a/weed/topology/topology.go b/weed/topology/topology.go index aa01190c9..7cbf80b4d 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -16,6 +16,7 @@ import ( ) type Topology struct { + vacuumLockCounter int64 NodeImpl collectionMap *util.ConcurrentReadMap @@ -33,6 +34,7 @@ type Topology struct { Configuration *Configuration RaftServer raft.Server + } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology { diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 438ca8de9..37a6a30b9 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -2,6 +2,7 @@ package topology import ( "context" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -121,6 +122,16 @@ func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, } func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int { + + // if there is vacuum going on, return immediately + swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1) + if !swapped { + return 0 + } + defer atomic.StoreInt64(&t.vacuumLockCounter, 0) + + // now only one vacuum process going on + glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold) for _, col := range t.collectionMap.Items() { c := col.(*Collection)