master: broadcast new volume locations to clients to avoid possible racing condition

fix https://github.com/chrislusf/seaweedfs/issues/3220
This commit is contained in:
chrislu 2022-06-23 00:41:33 -07:00
parent 52c44d646e
commit 96496d5286
2 changed files with 30 additions and 16 deletions

View file

@ -52,8 +52,13 @@ func (ms *MasterServer) ProcessGrowRequest() {
go func() { go func() {
glog.V(1).Infoln("starting automatic volume grow") glog.V(1).Infoln("starting automatic volume grow")
start := time.Now() start := time.Now()
_, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start)) glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start))
if err == nil {
for _, newVidLocation := range newVidLocations {
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation})
}
}
vl.DoneGrowRequest() vl.DoneGrowRequest()
if req.ErrCh != nil { if req.ErrCh != nil {

View file

@ -3,6 +3,7 @@ package topology
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"math/rand" "math/rand"
"sync" "sync"
@ -77,42 +78,50 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
return return
} }
func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (count int, err error) { func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (result []*master_pb.VolumeLocation, err error) {
if targetCount == 0 { if targetCount == 0 {
targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()) targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount())
} }
count, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo) result, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 { if len(result) > 0 && len(result)%option.ReplicaPlacement.GetCopyCount() == 0 {
return count, nil return result, nil
} }
return count, err return result, err
} }
func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) { func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (result []*master_pb.VolumeLocation, err error) {
vg.accessLock.Lock() vg.accessLock.Lock()
defer vg.accessLock.Unlock() defer vg.accessLock.Unlock()
for i := 0; i < targetCount; i++ { for i := 0; i < targetCount; i++ {
if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil { if res, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
counter += c result = append(result, res...)
} else { } else {
glog.V(0).Infof("create %d volume, created %d: %v", targetCount, counter, e) glog.V(0).Infof("create %d volume, created %d: %v", targetCount, len(result), e)
return counter, e return result, e
} }
} }
return return
} }
func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) { func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) {
servers, e := vg.findEmptySlotsForOneVolume(topo, option) servers, e := vg.findEmptySlotsForOneVolume(topo, option)
if e != nil { if e != nil {
return 0, e return nil, e
} }
vid, raftErr := topo.NextVolumeId() vid, raftErr := topo.NextVolumeId()
if raftErr != nil { if raftErr != nil {
return 0, raftErr return nil, raftErr
} }
err := vg.grow(grpcDialOption, topo, vid, option, servers...) if err = vg.grow(grpcDialOption, topo, vid, option, servers...); err == nil {
return len(servers), err for _, server := range servers {
result = append(result, &master_pb.VolumeLocation{
Url: server.Url(),
PublicUrl: server.PublicUrl,
NewVids: []uint32{uint32(vid)},
})
}
}
return
} }
// 1. find the main data node // 1. find the main data node