Merge pull request #1479 from LIBA-S/fix_oversized

Fix: remove the oversized state after compaction
This commit is contained in:
Chris Lu 2020-09-22 08:03:30 -07:00 committed by GitHub
commit bd11f0b3e4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 218 additions and 15 deletions

View file

@ -172,10 +172,10 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL
for vid, locationList := range tmpMap { for vid, locationList := range tmpMap {
volumeLayout.accessLock.RLock() volumeLayout.accessLock.RLock()
isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid] isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid)
volumeLayout.accessLock.RUnlock() volumeLayout.accessLock.RUnlock()
if hasValue && isReadOnly { if isReadOnly {
continue continue
} }

View file

@ -13,14 +13,100 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/super_block"
) )
type copyState int
const (
noCopies copyState = 0 + iota
insufficientCopies
enoughCopies
)
type volumeState string
const (
readOnlyState volumeState = "ReadOnly"
oversizedState = "Oversized"
)
type stateIndicator func(copyState) bool
func ExistCopies() stateIndicator {
return func(state copyState) bool { return state != noCopies }
}
func NoCopies() stateIndicator {
return func(state copyState) bool { return state == noCopies }
}
type volumesBinaryState struct {
copyCount int
name volumeState // the name for volume state (eg. "Readonly", "Oversized")
indicator stateIndicator // indicate whether the volumes should be marked as `name`
copyMap map[needle.VolumeId]*VolumeLocationList
}
func NewVolumesBinaryState(name volumeState, copyCount int, indicator stateIndicator) *volumesBinaryState {
return &volumesBinaryState{
copyCount: copyCount,
name: name,
indicator: indicator,
copyMap: make(map[needle.VolumeId]*VolumeLocationList),
}
}
func (v *volumesBinaryState) Dump() (res []uint32) {
for vid, list := range v.copyMap {
if v.indicator(v.copyState(list)) {
res = append(res, uint32(vid))
}
}
return
}
func (v *volumesBinaryState) IsTrue(vid needle.VolumeId) bool {
list, _ := v.copyMap[vid]
return v.indicator(v.copyState(list))
}
func (v *volumesBinaryState) Add(vid needle.VolumeId, dn *DataNode) {
list, _ := v.copyMap[vid]
if list != nil {
list.Set(dn)
return
}
list = NewVolumeLocationList()
list.Set(dn)
v.copyMap[vid] = list
}
func (v *volumesBinaryState) Remove(vid needle.VolumeId, dn *DataNode) {
list, _ := v.copyMap[vid]
if list != nil {
list.Remove(dn)
if list.Length() == 0 {
delete(v.copyMap, vid)
}
}
}
func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState {
if list == nil {
return noCopies
}
if list.Length() < v.copyCount {
return insufficientCopies
}
return enoughCopies
}
// mapping from volume to its locations, inverted from server to volume // mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct { type VolumeLayout struct {
rp *super_block.ReplicaPlacement rp *super_block.ReplicaPlacement
ttl *needle.TTL ttl *needle.TTL
vid2location map[needle.VolumeId]*VolumeLocationList vid2location map[needle.VolumeId]*VolumeLocationList
writables []needle.VolumeId // transient array of writable volume id writables []needle.VolumeId // transient array of writable volume id
readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes readonlyVolumes *volumesBinaryState // readonly volumes
oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes oversizedVolumes *volumesBinaryState // oversized volumes
volumeSizeLimit uint64 volumeSizeLimit uint64
replicationAsMin bool replicationAsMin bool
accessLock sync.RWMutex accessLock sync.RWMutex
@ -38,8 +124,8 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi
ttl: ttl, ttl: ttl,
vid2location: make(map[needle.VolumeId]*VolumeLocationList), vid2location: make(map[needle.VolumeId]*VolumeLocationList),
writables: *new([]needle.VolumeId), writables: *new([]needle.VolumeId),
readonlyVolumes: make(map[needle.VolumeId]bool), readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp.GetCopyCount(), ExistCopies()),
oversizedVolumes: make(map[needle.VolumeId]bool), oversizedVolumes: NewVolumesBinaryState(oversizedState, rp.GetCopyCount(), ExistCopies()),
volumeSizeLimit: volumeSizeLimit, volumeSizeLimit: volumeSizeLimit,
replicationAsMin: replicationAsMin, replicationAsMin: replicationAsMin,
} }
@ -54,7 +140,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
defer vl.accessLock.Unlock() defer vl.accessLock.Unlock()
defer vl.ensureCorrectWritables(v) defer vl.ensureCorrectWritables(v)
defer vl.rememberOversizedVolume(v) defer vl.rememberOversizedVolume(v, dn)
if _, ok := vl.vid2location[v.Id]; !ok { if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList() vl.vid2location[v.Id] = NewVolumeLocationList()
@ -66,24 +152,26 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if vInfo.ReadOnly { if vInfo.ReadOnly {
glog.V(1).Infof("vid %d removed from writable", v.Id) glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id) vl.removeFromWritable(v.Id)
vl.readonlyVolumes[v.Id] = true vl.readonlyVolumes.Add(v.Id, dn)
return return
} else { } else {
delete(vl.readonlyVolumes, v.Id) vl.readonlyVolumes.Remove(v.Id, dn)
} }
} else { } else {
glog.V(1).Infof("vid %d removed from writable", v.Id) glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id) vl.removeFromWritable(v.Id)
delete(vl.readonlyVolumes, v.Id) vl.readonlyVolumes.Remove(v.Id, dn)
return return
} }
} }
} }
func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataNode) {
if vl.isOversized(v) { if vl.isOversized(v) {
vl.oversizedVolumes[v.Id] = true vl.oversizedVolumes.Add(v.Id, dn)
} else {
vl.oversizedVolumes.Remove(v.Id, dn)
} }
} }
@ -99,6 +187,8 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if location.Remove(dn) { if location.Remove(dn) {
vl.readonlyVolumes.Remove(v.Id, dn)
vl.oversizedVolumes.Remove(v.Id, dn)
vl.ensureCorrectWritables(v) vl.ensureCorrectWritables(v)
if location.Length() == 0 { if location.Length() == 0 {
@ -110,7 +200,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
if vl.enoughCopies(v.Id) && vl.isWritable(v) { if vl.enoughCopies(v.Id) && vl.isWritable(v) {
if _, ok := vl.oversizedVolumes[v.Id]; !ok { if vl.oversizedVolumes.IsTrue(v.Id) {
vl.setVolumeWritable(v.Id) vl.setVolumeWritable(v.Id)
} }
} else { } else {
@ -315,7 +405,7 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats {
size, fileCount := vll.Stats(vid, freshThreshold) size, fileCount := vll.Stats(vid, freshThreshold)
ret.FileCount += uint64(fileCount) ret.FileCount += uint64(fileCount)
ret.UsedSize += size ret.UsedSize += size
if vl.readonlyVolumes[vid] { if vl.readonlyVolumes.IsTrue(vid) {
ret.TotalSize += size ret.TotalSize += size
} else { } else {
ret.TotalSize += vl.volumeSizeLimit ret.TotalSize += vl.volumeSizeLimit

View file

@ -0,0 +1,113 @@
package topology
import (
"testing"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func TestVolumesBinaryState(t *testing.T) {
vids := []needle.VolumeId{
needle.VolumeId(1),
needle.VolumeId(2),
needle.VolumeId(3),
needle.VolumeId(4),
needle.VolumeId(5),
}
dns := []*DataNode{
&DataNode{
Ip: "127.0.0.1",
Port: 8081,
},
&DataNode{
Ip: "127.0.0.1",
Port: 8082,
},
&DataNode{
Ip: "127.0.0.1",
Port: 8083,
},
}
state_exist := NewVolumesBinaryState(readOnlyState, 3, ExistCopies())
state_exist.Add(vids[0], dns[0])
state_exist.Add(vids[0], dns[1])
state_exist.Add(vids[1], dns[2])
state_exist.Add(vids[2], dns[1])
state_exist.Add(vids[4], dns[1])
state_exist.Add(vids[4], dns[2])
state_no := NewVolumesBinaryState(readOnlyState, 3, NoCopies())
state_no.Add(vids[0], dns[0])
state_no.Add(vids[0], dns[1])
state_no.Add(vids[3], dns[1])
tests := []struct {
name string
state *volumesBinaryState
expectResult []bool
update func()
expectResultAfterUpdate []bool
}{
{
name: "mark true when exist copies",
state: state_exist,
expectResult: []bool{true, true, true, false, true},
update: func() {
state_exist.Remove(vids[0], dns[2])
state_exist.Remove(vids[1], dns[2])
state_exist.Remove(vids[3], dns[2])
state_exist.Remove(vids[4], dns[1])
state_exist.Remove(vids[4], dns[2])
},
expectResultAfterUpdate: []bool{true, false, true, false, false},
},
{
name: "mark true when inexist copies",
state: state_no,
expectResult: []bool{false, true, true, false, true},
update: func() {
state_no.Remove(vids[0], dns[2])
state_no.Remove(vids[1], dns[2])
state_no.Add(vids[2], dns[1])
state_no.Remove(vids[3], dns[1])
state_no.Remove(vids[4], dns[2])
},
expectResultAfterUpdate: []bool{false, true, false, true, true},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var result []bool
for index, _ := range vids {
result = append(result, test.state.IsTrue(vids[index]))
}
if len(result) != len(test.expectResult) {
t.Fatalf("len(result) != len(expectResult), got %d, expected %d\n",
len(result), len(test.expectResult))
}
for index, val := range result {
if val != test.expectResult[index] {
t.Fatalf("result not matched, index %d, got %v, expect %v\n",
index, val, test.expectResult[index])
}
}
test.update()
var updateResult []bool
for index, _ := range vids {
updateResult = append(updateResult, test.state.IsTrue(vids[index]))
}
if len(updateResult) != len(test.expectResultAfterUpdate) {
t.Fatalf("len(updateResult) != len(expectResultAfterUpdate), got %d, expected %d\n",
len(updateResult), len(test.expectResultAfterUpdate))
}
for index, val := range updateResult {
if val != test.expectResultAfterUpdate[index] {
t.Fatalf("update result not matched, index %d, got %v, expect %v\n",
index, val, test.expectResultAfterUpdate[index])
}
}
})
}
}