From 91e4eca1e98cb5195346d90a2cc1fb9c92557213 Mon Sep 17 00:00:00 2001
From: James Hartig
Date: Mon, 21 Sep 2020 22:41:38 -0400
Subject: [PATCH 1/5] Fix deadlock with KeepConnected and SendHeartbeat

There's a potential deadlock: we can be writing to a clientConn that has
gone away, stuck holding a read lock on clientChansLock. That prevents
KeepConnected from removing the client, since removal requires a write
lock on clientChansLock, and SendHeartbeat then backs up because it
can't get a read lock either.
---
 weed/server/master_grpc_server.go | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index f3a2ee013..692909a29 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -187,7 +187,8 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
 
 	peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
 
-	stopChan := make(chan bool)
+	// buffer by 1 so we don't end up getting stuck writing to stopChan forever
+	stopChan := make(chan bool, 1)
 
 	clientName, messageChan := ms.addClient(req.Name, peerAddress)
 
@@ -247,7 +248,12 @@ func (ms *MasterServer) addClient(clientType string, clientAddress string) (clie
 	clientName = clientType + "@" + clientAddress
 	glog.V(0).Infof("+ client %v", clientName)
 
-	messageChan = make(chan *master_pb.VolumeLocation)
+	// we buffer this because otherwise we end up in a potential deadlock where
+	// the KeepConnected loop is no longer listening on this channel but we're
+	// trying to send to it in SendHeartbeat and so we can't lock the
+	// clientChansLock to remove the channel and we're stuck writing to it
+	// 100 is probably overkill
+	messageChan = make(chan *master_pb.VolumeLocation, 100)
 
 	ms.clientChansLock.Lock()
 	ms.clientChans[clientName] = messageChan
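The failure mode is easier to see in isolation. Below is a minimal, self-contained sketch — hypothetical names like broadcaster and Publish, not the SeaweedFS code — of why a blocking send under a read lock deadlocks against removal, and how buffering (the patch uses a capacity of 100) plus a non-blocking send avoids it:

package main

import (
	"fmt"
	"sync"
)

// broadcaster is a stand-in for the master's clientChans map:
// a set of per-client channels guarded by a RWMutex.
type broadcaster struct {
	mu    sync.RWMutex
	chans map[string]chan string
}

// Publish sends msg to every registered client while holding the read lock,
// mirroring SendHeartbeat's use of clientChansLock. With an unbuffered
// channel, a send to a client that stopped reading would park here forever,
// and Remove below could never acquire the write lock.
func (b *broadcaster) Publish(msg string) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for name, ch := range b.chans {
		select {
		case ch <- msg: // succeeds immediately while the buffer has room
		default:
			// drop rather than block; with a buffer of 100 this branch is rare
			fmt.Printf("dropping message for slow client %s\n", name)
		}
	}
}

// Remove needs the write lock, so it deadlocks if Publish is parked on a send.
func (b *broadcaster) Remove(name string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.chans, name)
}

func main() {
	b := &broadcaster{chans: map[string]chan string{
		// buffered, as in the patch; a capacity of 0 re-creates the bug
		"volume1": make(chan string, 100),
	}}
	b.Publish("heartbeat")
	b.Remove("volume1") // completes because Publish never blocked under RLock
	fmt.Println("no deadlock")
}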
From 658fc2e5b619c453ddecc805bc363a25f75ff22d Mon Sep 17 00:00:00 2001
From: James Hartig
Date: Mon, 21 Sep 2020 22:43:10 -0400
Subject: [PATCH 2/5] Allow option to enable volume pprof on server

---
 weed/command/server.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/weed/command/server.go b/weed/command/server.go
index 6f40263bb..aee62290a 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -99,7 +99,7 @@ func init() {
 	serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory")
 	serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
 	serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
-	serverOptions.v.pprof = &False
+	serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
 
 	s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
 	s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}")
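For readers who haven't used such a flag: a sketch of the kind of wiring this option typically gates (a hypothetical standalone program, not the SeaweedFS server wiring). Importing net/http/pprof registers the /debug/pprof/* handlers; since the HTTP handlers and the file-based --cpuprofile/--memprofile paths both drive the same runtime profiler, enabling one typically precludes the other, as the flag text notes.

package main

import (
	"flag"
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* on http.DefaultServeMux
)

func main() {
	// hypothetical flag, analogous in spirit to -volume.pprof
	enablePprof := flag.Bool("pprof", false, "enable pprof http handlers")
	flag.Parse()

	if *enablePprof {
		// profiles become available at e.g. http://localhost:6060/debug/pprof/heap
		go func() {
			log.Println(http.ListenAndServe("localhost:6060", nil))
		}()
	}

	select {} // stand-in for the server's real work
}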
"Readonly", "Oversized") + indicator stateIndicator // indicate whether the volumes should be marked as `name` + copyMap map[needle.VolumeId]*VolumeLocationList +} + +func NewVolumesBinaryState(name volumeState, copyCount int, indicator stateIndicator) *volumesBinaryState { + return &volumesBinaryState{ + copyCount: copyCount, + name: name, + indicator: indicator, + copyMap: make(map[needle.VolumeId]*VolumeLocationList), + } +} + +func (v *volumesBinaryState) Dump() (res []uint32) { + for vid, list := range v.copyMap { + if v.indicator(v.copyState(list)) { + res = append(res, uint32(vid)) + } + } + return +} + +func (v *volumesBinaryState) IsTrue(vid needle.VolumeId) bool { + list, _ := v.copyMap[vid] + return v.indicator(v.copyState(list)) +} + +func (v *volumesBinaryState) Add(vid needle.VolumeId, dn *DataNode) { + list, _ := v.copyMap[vid] + if list != nil { + list.Set(dn) + return + } + list = NewVolumeLocationList() + list.Set(dn) + v.copyMap[vid] = list +} + +func (v *volumesBinaryState) Remove(vid needle.VolumeId, dn *DataNode) { + list, _ := v.copyMap[vid] + if list != nil { + list.Remove(dn) + if list.Length() == 0 { + delete(v.copyMap, vid) + } + } +} + +func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState { + if list == nil { + return noCopies + } + if list.Length() < v.copyCount { + return insufficientCopies + } + return enoughCopies +} + // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *super_block.ReplicaPlacement ttl *needle.TTL vid2location map[needle.VolumeId]*VolumeLocationList - writables []needle.VolumeId // transient array of writable volume id - readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes - oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes + writables []needle.VolumeId // transient array of writable volume id + readonlyVolumes *volumesBinaryState // readonly volumes + oversizedVolumes *volumesBinaryState // oversized volumes volumeSizeLimit uint64 replicationAsMin bool accessLock sync.RWMutex @@ -38,8 +124,8 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi ttl: ttl, vid2location: make(map[needle.VolumeId]*VolumeLocationList), writables: *new([]needle.VolumeId), - readonlyVolumes: make(map[needle.VolumeId]bool), - oversizedVolumes: make(map[needle.VolumeId]bool), + readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp.GetCopyCount(), ExistCopies()), + oversizedVolumes: NewVolumesBinaryState(oversizedState, rp.GetCopyCount(), ExistCopies()), volumeSizeLimit: volumeSizeLimit, replicationAsMin: replicationAsMin, } @@ -54,7 +140,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { defer vl.accessLock.Unlock() defer vl.ensureCorrectWritables(v) - defer vl.rememberOversizedVolume(v) + defer vl.rememberOversizedVolume(v, dn) if _, ok := vl.vid2location[v.Id]; !ok { vl.vid2location[v.Id] = NewVolumeLocationList() @@ -66,24 +152,26 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if vInfo.ReadOnly { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - vl.readonlyVolumes[v.Id] = true + vl.readonlyVolumes.Add(v.Id, dn) return } else { - delete(vl.readonlyVolumes, v.Id) + vl.readonlyVolumes.Remove(v.Id, dn) } } else { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - delete(vl.readonlyVolumes, v.Id) + vl.readonlyVolumes.Remove(v.Id, dn) return } } } -func (vl *VolumeLayout) 
From 4a1fe4b8e25c53c871c50edb624823435fd3e948 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Tue, 22 Sep 2020 09:16:07 -0700
Subject: [PATCH 4/5] add error logs

---
 weed/filesys/dir_rename.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index b9e9e300b..3f73d0eb6 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -23,7 +23,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
 	// find local old entry
 	oldEntry, err := dir.wfs.metaCache.FindEntry(context.Background(), oldPath)
 	if err != nil {
-		glog.V(0).Infof("dir Rename can not find source %s : %v", oldPath, err)
+		glog.Errorf("dir Rename can not find source %s : %v", oldPath, err)
 		return fuse.ENOENT
 	}
 
@@ -41,6 +41,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
 
 		_, err := client.AtomicRenameEntry(ctx, request)
 		if err != nil {
+			glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
 			return fuse.EIO
 		}
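The change is small, but the pattern deserves a note: FUSE can only return a coarse errno to the kernel, so the detailed cause has to be logged at the exact point where it is flattened into fuse.EIO. A sketch of the same shape, with hypothetical stand-ins rather than the wfs code:

package main

import (
	"errors"
	"fmt"
	"log"
)

var errEIO = errors.New("EIO") // stand-in for fuse.EIO

// rename keeps the rich error in the log and returns the coarse errno,
// mirroring what patch 4 does around AtomicRenameEntry.
func rename(oldPath, newPath string) error {
	if err := atomicRenameEntry(oldPath, newPath); err != nil {
		// the %v detail lives only here; the caller sees just EIO
		log.Printf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
		return errEIO
	}
	return nil
}

func atomicRenameEntry(oldPath, newPath string) error {
	return fmt.Errorf("filer unreachable") // simulated failure
}

func main() {
	if err := rename("/a", "/b"); err != nil {
		fmt.Println("kernel sees:", err)
	}
}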
From 3e52329cee39f3a83f58d3896514547093ff0cf8 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Tue, 22 Sep 2020 15:05:37 -0700
Subject: [PATCH 5/5] Revert "Merge pull request #1479 from LIBA-S/fix_oversized"

This reverts commit bd11f0b3e4c14e91c7c9c19cd19db454ff02b4d6, reversing
changes made to ec5b9f1e91a8609d0e70bf9d26dc0840774153c4.
--- weed/topology/topology_vacuum.go | 4 +- weed/topology/volume_layout.go | 116 ++++------------------------ weed/topology/volume_layout_test.go | 113 --------------------------- 3 files changed, 15 insertions(+), 218 deletions(-) delete mode 100644 weed/topology/volume_layout_test.go diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index ecb5ebd0d..789a01330 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -172,10 +172,10 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL for vid, locationList := range tmpMap { volumeLayout.accessLock.RLock() - isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid) + isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid] volumeLayout.accessLock.RUnlock() - if isReadOnly { + if hasValue && isReadOnly { continue } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index e3a70ef25..9e84fd2da 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -13,100 +13,14 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) -type copyState int - -const ( - noCopies copyState = 0 + iota - insufficientCopies - enoughCopies -) - -type volumeState string - -const ( - readOnlyState volumeState = "ReadOnly" - oversizedState = "Oversized" -) - -type stateIndicator func(copyState) bool - -func ExistCopies() stateIndicator { - return func(state copyState) bool { return state != noCopies } -} - -func NoCopies() stateIndicator { - return func(state copyState) bool { return state == noCopies } -} - -type volumesBinaryState struct { - copyCount int - name volumeState // the name for volume state (eg. "Readonly", "Oversized") - indicator stateIndicator // indicate whether the volumes should be marked as `name` - copyMap map[needle.VolumeId]*VolumeLocationList -} - -func NewVolumesBinaryState(name volumeState, copyCount int, indicator stateIndicator) *volumesBinaryState { - return &volumesBinaryState{ - copyCount: copyCount, - name: name, - indicator: indicator, - copyMap: make(map[needle.VolumeId]*VolumeLocationList), - } -} - -func (v *volumesBinaryState) Dump() (res []uint32) { - for vid, list := range v.copyMap { - if v.indicator(v.copyState(list)) { - res = append(res, uint32(vid)) - } - } - return -} - -func (v *volumesBinaryState) IsTrue(vid needle.VolumeId) bool { - list, _ := v.copyMap[vid] - return v.indicator(v.copyState(list)) -} - -func (v *volumesBinaryState) Add(vid needle.VolumeId, dn *DataNode) { - list, _ := v.copyMap[vid] - if list != nil { - list.Set(dn) - return - } - list = NewVolumeLocationList() - list.Set(dn) - v.copyMap[vid] = list -} - -func (v *volumesBinaryState) Remove(vid needle.VolumeId, dn *DataNode) { - list, _ := v.copyMap[vid] - if list != nil { - list.Remove(dn) - if list.Length() == 0 { - delete(v.copyMap, vid) - } - } -} - -func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState { - if list == nil { - return noCopies - } - if list.Length() < v.copyCount { - return insufficientCopies - } - return enoughCopies -} - // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *super_block.ReplicaPlacement ttl *needle.TTL vid2location map[needle.VolumeId]*VolumeLocationList - writables []needle.VolumeId // transient array of writable volume id - readonlyVolumes *volumesBinaryState // readonly volumes - oversizedVolumes *volumesBinaryState // oversized volumes + writables []needle.VolumeId // transient array of 
writable volume id + readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes + oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes volumeSizeLimit uint64 replicationAsMin bool accessLock sync.RWMutex @@ -124,8 +38,8 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi ttl: ttl, vid2location: make(map[needle.VolumeId]*VolumeLocationList), writables: *new([]needle.VolumeId), - readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp.GetCopyCount(), ExistCopies()), - oversizedVolumes: NewVolumesBinaryState(oversizedState, rp.GetCopyCount(), ExistCopies()), + readonlyVolumes: make(map[needle.VolumeId]bool), + oversizedVolumes: make(map[needle.VolumeId]bool), volumeSizeLimit: volumeSizeLimit, replicationAsMin: replicationAsMin, } @@ -140,7 +54,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { defer vl.accessLock.Unlock() defer vl.ensureCorrectWritables(v) - defer vl.rememberOversizedVolume(v, dn) + defer vl.rememberOversizedVolume(v) if _, ok := vl.vid2location[v.Id]; !ok { vl.vid2location[v.Id] = NewVolumeLocationList() @@ -152,26 +66,24 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if vInfo.ReadOnly { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - vl.readonlyVolumes.Add(v.Id, dn) + vl.readonlyVolumes[v.Id] = true return } else { - vl.readonlyVolumes.Remove(v.Id, dn) + delete(vl.readonlyVolumes, v.Id) } } else { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - vl.readonlyVolumes.Remove(v.Id, dn) + delete(vl.readonlyVolumes, v.Id) return } } } -func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataNode) { +func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { if vl.isOversized(v) { - vl.oversizedVolumes.Add(v.Id, dn) - } else { - vl.oversizedVolumes.Remove(v.Id, dn) + vl.oversizedVolumes[v.Id] = true } } @@ -187,8 +99,6 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if location.Remove(dn) { - vl.readonlyVolumes.Remove(v.Id, dn) - vl.oversizedVolumes.Remove(v.Id, dn) vl.ensureCorrectWritables(v) if location.Length() == 0 { @@ -200,7 +110,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { if vl.enoughCopies(v.Id) && vl.isWritable(v) { - if vl.oversizedVolumes.IsTrue(v.Id) { + if _, ok := vl.oversizedVolumes[v.Id]; !ok { vl.setVolumeWritable(v.Id) } } else { @@ -405,7 +315,7 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats { size, fileCount := vll.Stats(vid, freshThreshold) ret.FileCount += uint64(fileCount) ret.UsedSize += size - if vl.readonlyVolumes.IsTrue(vid) { + if vl.readonlyVolumes[vid] { ret.TotalSize += size } else { ret.TotalSize += vl.volumeSizeLimit diff --git a/weed/topology/volume_layout_test.go b/weed/topology/volume_layout_test.go deleted file mode 100644 index 16232fda7..000000000 --- a/weed/topology/volume_layout_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package topology - -import ( - "testing" - - "github.com/chrislusf/seaweedfs/weed/storage/needle" -) - -func TestVolumesBinaryState(t *testing.T) { - vids := []needle.VolumeId{ - needle.VolumeId(1), - needle.VolumeId(2), - needle.VolumeId(3), - needle.VolumeId(4), - needle.VolumeId(5), - } - - dns := []*DataNode{ - &DataNode{ - Ip: "127.0.0.1", - Port: 8081, - }, - &DataNode{ - Ip: "127.0.0.1", - Port: 8082, - }, - &DataNode{ - Ip: 
"127.0.0.1", - Port: 8083, - }, - } - - state_exist := NewVolumesBinaryState(readOnlyState, 3, ExistCopies()) - state_exist.Add(vids[0], dns[0]) - state_exist.Add(vids[0], dns[1]) - state_exist.Add(vids[1], dns[2]) - state_exist.Add(vids[2], dns[1]) - state_exist.Add(vids[4], dns[1]) - state_exist.Add(vids[4], dns[2]) - - state_no := NewVolumesBinaryState(readOnlyState, 3, NoCopies()) - state_no.Add(vids[0], dns[0]) - state_no.Add(vids[0], dns[1]) - state_no.Add(vids[3], dns[1]) - - tests := []struct { - name string - state *volumesBinaryState - expectResult []bool - update func() - expectResultAfterUpdate []bool - }{ - { - name: "mark true when exist copies", - state: state_exist, - expectResult: []bool{true, true, true, false, true}, - update: func() { - state_exist.Remove(vids[0], dns[2]) - state_exist.Remove(vids[1], dns[2]) - state_exist.Remove(vids[3], dns[2]) - state_exist.Remove(vids[4], dns[1]) - state_exist.Remove(vids[4], dns[2]) - }, - expectResultAfterUpdate: []bool{true, false, true, false, false}, - }, - { - name: "mark true when inexist copies", - state: state_no, - expectResult: []bool{false, true, true, false, true}, - update: func() { - state_no.Remove(vids[0], dns[2]) - state_no.Remove(vids[1], dns[2]) - state_no.Add(vids[2], dns[1]) - state_no.Remove(vids[3], dns[1]) - state_no.Remove(vids[4], dns[2]) - }, - expectResultAfterUpdate: []bool{false, true, false, true, true}, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - var result []bool - for index, _ := range vids { - result = append(result, test.state.IsTrue(vids[index])) - } - if len(result) != len(test.expectResult) { - t.Fatalf("len(result) != len(expectResult), got %d, expected %d\n", - len(result), len(test.expectResult)) - } - for index, val := range result { - if val != test.expectResult[index] { - t.Fatalf("result not matched, index %d, got %v, expect %v\n", - index, val, test.expectResult[index]) - } - } - test.update() - var updateResult []bool - for index, _ := range vids { - updateResult = append(updateResult, test.state.IsTrue(vids[index])) - } - if len(updateResult) != len(test.expectResultAfterUpdate) { - t.Fatalf("len(updateResult) != len(expectResultAfterUpdate), got %d, expected %d\n", - len(updateResult), len(test.expectResultAfterUpdate)) - } - for index, val := range updateResult { - if val != test.expectResultAfterUpdate[index] { - t.Fatalf("update result not matched, index %d, got %v, expect %v\n", - index, val, test.expectResultAfterUpdate[index]) - } - } - }) - } -}