diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 789a01330..ecb5ebd0d 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -172,10 +172,10 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL for vid, locationList := range tmpMap { volumeLayout.accessLock.RLock() - isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid] + isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid) volumeLayout.accessLock.RUnlock() - if hasValue && isReadOnly { + if isReadOnly { continue } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 9e84fd2da..e7659e0eb 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -13,14 +13,100 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) +type copyState int + +const ( + noCopies copyState = 0 + iota + insufficientCopies + enoughCopies +) + +type volumeState string + +const ( + readOnlyState volumeState = "ReadOnly" + oversizedState = "Oversized" +) + +type stateIndicator func(copyState) bool + +func ExistCopies() stateIndicator { + return func(state copyState) bool { return state != noCopies } +} + +func NoCopies() stateIndicator { + return func(state copyState) bool { return state == noCopies } +} + +type volumesBinaryState struct { + rp *super_block.ReplicaPlacement + name volumeState // the name for volume state (eg. "Readonly", "Oversized") + indicator stateIndicator // indicate whether the volumes should be marked as `name` + copyMap map[needle.VolumeId]*VolumeLocationList +} + +func NewVolumesBinaryState(name volumeState, rp *super_block.ReplicaPlacement, indicator stateIndicator) *volumesBinaryState { + return &volumesBinaryState{ + rp: rp, + name: name, + indicator: indicator, + copyMap: make(map[needle.VolumeId]*VolumeLocationList), + } +} + +func (v *volumesBinaryState) Dump() (res []uint32) { + for vid, list := range v.copyMap { + if v.indicator(v.copyState(list)) { + res = append(res, uint32(vid)) + } + } + return +} + +func (v *volumesBinaryState) IsTrue(vid needle.VolumeId) bool { + list, _ := v.copyMap[vid] + return v.indicator(v.copyState(list)) +} + +func (v *volumesBinaryState) Add(vid needle.VolumeId, dn *DataNode) { + list, _ := v.copyMap[vid] + if list != nil { + list.Set(dn) + return + } + list = NewVolumeLocationList() + list.Set(dn) + v.copyMap[vid] = list +} + +func (v *volumesBinaryState) Remove(vid needle.VolumeId, dn *DataNode) { + list, _ := v.copyMap[vid] + if list != nil { + list.Remove(dn) + if list.Length() == 0 { + delete(v.copyMap, vid) + } + } +} + +func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState { + if list == nil { + return noCopies + } + if list.Length() < v.rp.GetCopyCount() { + return insufficientCopies + } + return enoughCopies +} + // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *super_block.ReplicaPlacement ttl *needle.TTL vid2location map[needle.VolumeId]*VolumeLocationList - writables []needle.VolumeId // transient array of writable volume id - readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes - oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes + writables []needle.VolumeId // transient array of writable volume id + readonlyVolumes *volumesBinaryState // readonly volumes + oversizedVolumes *volumesBinaryState // oversized volumes volumeSizeLimit uint64 replicationAsMin bool accessLock sync.RWMutex @@ -38,8 +124,8 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi ttl: ttl, vid2location: make(map[needle.VolumeId]*VolumeLocationList), writables: *new([]needle.VolumeId), - readonlyVolumes: make(map[needle.VolumeId]bool), - oversizedVolumes: make(map[needle.VolumeId]bool), + readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()), + oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()), volumeSizeLimit: volumeSizeLimit, replicationAsMin: replicationAsMin, } @@ -54,7 +140,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { defer vl.accessLock.Unlock() defer vl.ensureCorrectWritables(v) - defer vl.rememberOversizedVolume(v) + defer vl.rememberOversizedVolume(v, dn) if _, ok := vl.vid2location[v.Id]; !ok { vl.vid2location[v.Id] = NewVolumeLocationList() @@ -66,24 +152,26 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if vInfo.ReadOnly { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - vl.readonlyVolumes[v.Id] = true + vl.readonlyVolumes.Add(v.Id, dn) return } else { - delete(vl.readonlyVolumes, v.Id) + vl.readonlyVolumes.Remove(v.Id, dn) } } else { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - delete(vl.readonlyVolumes, v.Id) + vl.readonlyVolumes.Remove(v.Id, dn) return } } } -func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { +func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataNode) { if vl.isOversized(v) { - vl.oversizedVolumes[v.Id] = true + vl.oversizedVolumes.Add(v.Id, dn) + } else { + vl.oversizedVolumes.Remove(v.Id, dn) } } @@ -99,6 +187,8 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if location.Remove(dn) { + vl.readonlyVolumes.Remove(v.Id, dn) + vl.oversizedVolumes.Remove(v.Id, dn) vl.ensureCorrectWritables(v) if location.Length() == 0 { @@ -110,7 +200,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { if vl.enoughCopies(v.Id) && vl.isWritable(v) { - if _, ok := vl.oversizedVolumes[v.Id]; !ok { + if !vl.oversizedVolumes.IsTrue(v.Id) { vl.setVolumeWritable(v.Id) } } else { @@ -251,6 +341,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) if location, ok := vl.vid2location[vid]; ok { if location.Remove(dn) { + vl.readonlyVolumes.Remove(vid, dn) + vl.oversizedVolumes.Remove(vid, dn) if location.Length() < vl.rp.GetCopyCount() { glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount()) return vl.removeFromWritable(vid) @@ -315,7 +407,7 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats { size, fileCount := vll.Stats(vid, freshThreshold) ret.FileCount += uint64(fileCount) ret.UsedSize += size - if vl.readonlyVolumes[vid] { + if vl.readonlyVolumes.IsTrue(vid) { ret.TotalSize += size } else { ret.TotalSize += vl.volumeSizeLimit diff --git a/weed/topology/volume_layout_test.go b/weed/topology/volume_layout_test.go new file mode 100644 index 000000000..e148d6107 --- /dev/null +++ b/weed/topology/volume_layout_test.go @@ -0,0 +1,116 @@ +package topology + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" +) + +func TestVolumesBinaryState(t *testing.T) { + vids := []needle.VolumeId{ + needle.VolumeId(1), + needle.VolumeId(2), + needle.VolumeId(3), + needle.VolumeId(4), + needle.VolumeId(5), + } + + dns := []*DataNode{ + &DataNode{ + Ip: "127.0.0.1", + Port: 8081, + }, + &DataNode{ + Ip: "127.0.0.1", + Port: 8082, + }, + &DataNode{ + Ip: "127.0.0.1", + Port: 8083, + }, + } + + rp, _ := super_block.NewReplicaPlacementFromString("002") + + state_exist := NewVolumesBinaryState(readOnlyState, rp, ExistCopies()) + state_exist.Add(vids[0], dns[0]) + state_exist.Add(vids[0], dns[1]) + state_exist.Add(vids[1], dns[2]) + state_exist.Add(vids[2], dns[1]) + state_exist.Add(vids[4], dns[1]) + state_exist.Add(vids[4], dns[2]) + + state_no := NewVolumesBinaryState(readOnlyState, rp, NoCopies()) + state_no.Add(vids[0], dns[0]) + state_no.Add(vids[0], dns[1]) + state_no.Add(vids[3], dns[1]) + + tests := []struct { + name string + state *volumesBinaryState + expectResult []bool + update func() + expectResultAfterUpdate []bool + }{ + { + name: "mark true when exist copies", + state: state_exist, + expectResult: []bool{true, true, true, false, true}, + update: func() { + state_exist.Remove(vids[0], dns[2]) + state_exist.Remove(vids[1], dns[2]) + state_exist.Remove(vids[3], dns[2]) + state_exist.Remove(vids[4], dns[1]) + state_exist.Remove(vids[4], dns[2]) + }, + expectResultAfterUpdate: []bool{true, false, true, false, false}, + }, + { + name: "mark true when inexist copies", + state: state_no, + expectResult: []bool{false, true, true, false, true}, + update: func() { + state_no.Remove(vids[0], dns[2]) + state_no.Remove(vids[1], dns[2]) + state_no.Add(vids[2], dns[1]) + state_no.Remove(vids[3], dns[1]) + state_no.Remove(vids[4], dns[2]) + }, + expectResultAfterUpdate: []bool{false, true, false, true, true}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var result []bool + for index, _ := range vids { + result = append(result, test.state.IsTrue(vids[index])) + } + if len(result) != len(test.expectResult) { + t.Fatalf("len(result) != len(expectResult), got %d, expected %d\n", + len(result), len(test.expectResult)) + } + for index, val := range result { + if val != test.expectResult[index] { + t.Fatalf("result not matched, index %d, got %v, expect %v\n", + index, val, test.expectResult[index]) + } + } + test.update() + var updateResult []bool + for index, _ := range vids { + updateResult = append(updateResult, test.state.IsTrue(vids[index])) + } + if len(updateResult) != len(test.expectResultAfterUpdate) { + t.Fatalf("len(updateResult) != len(expectResultAfterUpdate), got %d, expected %d\n", + len(updateResult), len(test.expectResultAfterUpdate)) + } + for index, val := range updateResult { + if val != test.expectResultAfterUpdate[index] { + t.Fatalf("update result not matched, index %d, got %v, expect %v\n", + index, val, test.expectResultAfterUpdate[index]) + } + } + }) + } +}