diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go index 81bad7da5..c65e53217 100644 --- a/weed-fs/src/pkg/topology/node.go +++ b/weed-fs/src/pkg/topology/node.go @@ -165,7 +165,7 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi for _, v := range dn.volumes { if uint64(v.Size) >= volumeSizeLimit { //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) - n.GetTopology().chanFullVolumes <- &v + n.GetTopology().chanFullVolumes <- v } } } diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index 52d13135f..adaa16691 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -23,7 +23,7 @@ type Topology struct { chanDeadDataNodes chan *DataNode chanRecoveredDataNodes chan *DataNode - chanFullVolumes chan *storage.VolumeInfo + chanFullVolumes chan storage.VolumeInfo configuration *Configuration } @@ -42,7 +42,7 @@ func NewTopology(id string, confFile string, dirname string, sequenceFilename st t.chanDeadDataNodes = make(chan *DataNode) t.chanRecoveredDataNodes = make(chan *DataNode) - t.chanFullVolumes = make(chan *storage.VolumeInfo) + t.chanFullVolumes = make(chan storage.VolumeInfo) t.loadConfiguration(confFile) diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go index 9d7999f93..4ff1a0bcc 100644 --- a/weed-fs/src/pkg/topology/topology_event_handling.go +++ b/weed-fs/src/pkg/topology/topology_event_handling.go @@ -26,7 +26,6 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { select { case v := <-t.chanFullVolumes: t.SetVolumeCapacityFull(v) - fmt.Println("Volume", v, "is full!") case dn := <-t.chanRecoveredDataNodes: t.RegisterRecoveredDataNode(dn) fmt.Println("DataNode", dn, "is back alive!") @@ -37,12 +36,15 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { } }() } -func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) { +func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { vl := t.GetVolumeLayout(volumeInfo.RepType) - vl.SetVolumeCapacityFull(volumeInfo.Id) + if !vl.SetVolumeCapacityFull(volumeInfo.Id) { + return false + } for _, dn := range vl.vid2location[volumeInfo.Id].list { dn.UpAdjustActiveVolumeCountDelta(-1) } + return true } func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.volumes { diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go index 2dee803d4..535effde6 100644 --- a/weed-fs/src/pkg/topology/volume_layout.go +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -38,8 +38,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } } -func (vl *VolumeLayout) Lookup(vid storage.VolumeId) (*[]*DataNode) { - return &vl.vid2location[vid].list +func (vl *VolumeLayout) Lookup(vid storage.VolumeId) *[]*DataNode { + return &vl.vid2location[vid].list } func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) { @@ -63,6 +63,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount() int { func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { for i, v := range vl.writables { if v == vid { + fmt.Println("Volume", vid, "becomes unwritable") vl.writables = append(vl.writables[:i], vl.writables[i+1:]...) return true } @@ -75,6 +76,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { return false } } + fmt.Println("Volume", vid, "becomes writable") vl.writables = append(vl.writables, vid) return true } @@ -91,7 +93,6 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { if vl.vid2location[vid].Add(dn) { if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() { - fmt.Println("Volume", vid, "becomes writable") return vl.setVolumeWritable(vid) } }