diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index ca626e973..9d964489d 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -5,9 +5,10 @@ import ( "sync/atomic" "time" - "github.com/chrislusf/seaweedfs/weed/storage/needle" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -105,7 +106,9 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v } else { glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url()) } - if isCommitSuccess { + } + if isCommitSuccess { + for _, dn := range locationlist.list { vl.SetVolumeAvailable(dn, vid) } } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 7633b28be..a0c66516d 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -51,6 +51,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() + defer vl.ensureCorrectWritables(v) + defer vl.rememberOversizedVolume(v) + if _, ok := vl.vid2location[v.Id]; !ok { vl.vid2location[v.Id] = NewVolumeLocationList() } @@ -74,9 +77,6 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } } - vl.rememberOversizedVolume(v) - vl.ensureCorrectWritables(v) - } func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { @@ -109,22 +109,13 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { if _, ok := vl.oversizedVolumes[v.Id]; !ok { - vl.addToWritable(v.Id) + vl.setVolumeWritable(v.Id) } } else { vl.removeFromWritable(v.Id) } } -func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) { - for _, id := range vl.writables { - if vid == id { - return - } - } - vl.writables = append(vl.writables, vid) -} - func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { return uint64(v.Size) >= vl.volumeSizeLimit } @@ -270,7 +261,17 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bo vl.accessLock.Lock() defer vl.accessLock.Unlock() + vInfo, err := dn.GetVolumesById(v.Id) + if err != nil { + return false + } + vl.vid2location[vid].Set(dn) + + if vInfo.ReadOnly { + return false + } + if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() { return vl.setVolumeWritable(vid) }