Revert "remove duplicated metadata subscription in filer"

This reverts commit 34742be029.

Related to https://github.com/chrislusf/seaweedfs/issues/2545
This commit is contained in:
chrislu 2022-03-26 12:33:45 -07:00
parent fba1cfc2d6
commit 34b743c481

View file

@ -25,7 +25,7 @@ type MetaAggregator struct {
isLeader bool isLeader bool
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
MetaLogBuffer *log_buffer.LogBuffer MetaLogBuffer *log_buffer.LogBuffer
peerStatues map[pb.ServerAddress]int peerStatues map[pb.ServerAddress]struct{}
peerStatuesLock sync.Mutex peerStatuesLock sync.Mutex
// notifying clients // notifying clients
ListenersLock sync.Mutex ListenersLock sync.Mutex
@ -39,7 +39,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.
filer: filer, filer: filer,
self: self, self: self,
grpcDialOption: grpcDialOption, grpcDialOption: grpcDialOption,
peerStatues: make(map[pb.ServerAddress]int), peerStatues: make(map[pb.ServerAddress]struct{}),
} }
t.ListenersCond = sync.NewCond(&t.ListenersLock) t.ListenersCond = sync.NewCond(&t.ListenersLock)
t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() { t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
@ -56,40 +56,27 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
address := pb.ServerAddress(update.Address) address := pb.ServerAddress(update.Address)
if update.IsAdd { if update.IsAdd {
// every filer should subscribe to a new filer // every filer should subscribe to a new filer
if ma.setActive(address, true) { ma.setActive(address, true)
go ma.subscribeToOneFiler(ma.filer, ma.self, address) go ma.subscribeToOneFiler(ma.filer, ma.self, address)
}
} else { } else {
ma.setActive(address, false) ma.setActive(address, false)
} }
} }
func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) { func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) {
ma.peerStatuesLock.Lock() ma.peerStatuesLock.Lock()
defer ma.peerStatuesLock.Unlock() defer ma.peerStatuesLock.Unlock()
if isActive { if isActive {
if _, found := ma.peerStatues[address]; found { ma.peerStatues[address] = struct{}{}
ma.peerStatues[address] += 1
} else {
ma.peerStatues[address] = 1
notDuplicated = true
}
} else { } else {
if _, found := ma.peerStatues[address]; found { delete(ma.peerStatues, address)
ma.peerStatues[address] -= 1
}
if ma.peerStatues[address] <= 0 {
delete(ma.peerStatues, address)
}
} }
return
} }
func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
ma.peerStatuesLock.Lock() ma.peerStatuesLock.Lock()
defer ma.peerStatuesLock.Unlock() defer ma.peerStatuesLock.Unlock()
var count int _, isActive = ma.peerStatues[address]
count, isActive = ma.peerStatues[address] return
return count > 0 && isActive
} }
func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) { func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) {