Volume filter function added.

This commit is contained in:
user 2021-11-11 17:36:26 +09:00
parent c387fe957b
commit dbb8003ce3

View file

@ -55,7 +55,6 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
if err != nil { if err != nil {
return fmt.Errorf("replication format: %v", err) return fmt.Errorf("replication format: %v", err)
} }
replicaPlacementInt32 := uint32(replicaPlacement.Byte())
// collect topology information // collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv) topologyInfo, _, err := collectTopologyInfo(commandEnv)
@ -64,6 +63,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
} }
vid := needle.VolumeId(*volumeIdInt) vid := needle.VolumeId(*volumeIdInt)
volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid))
// find all data nodes with volumes that needs replication change // find all data nodes with volumes that needs replication change
var allLocations []location var allLocations []location
@ -71,7 +71,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
loc := newLocation(dc, string(rack), dn) loc := newLocation(dc, string(rack), dn)
for _, diskInfo := range dn.DiskInfos { for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos { for _, v := range diskInfo.VolumeInfos {
if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 { if volumeFilter(v) {
allLocations = append(allLocations, loc) allLocations = append(allLocations, loc)
continue continue
} }
@ -106,3 +106,10 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
return nil return nil
} }
func getVolumeFilter(replicaPlacement *super_block.ReplicaPlacement, volumeId uint32) func(message *master_pb.VolumeInformationMessage) bool {
replicaPlacementInt32 := uint32(replicaPlacement.Byte())
return func(v *master_pb.VolumeInformationMessage) bool {
return v.Id == volumeId && v.ReplicaPlacement != replicaPlacementInt32
}
}