From cae998eda1252aa4df754b75fef7c08d438e705b Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 26 Mar 2022 13:00:16 -0700 Subject: [PATCH] Revert "Revert "remove duplicated metadata subscription in filer"" This reverts commit 34b743c481e65b25c052512603d1794eddfe839c. --- weed/filer/meta_aggregator.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 13c2239f0..1e8b89ad5 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -25,7 +25,7 @@ type MetaAggregator struct { isLeader bool grpcDialOption grpc.DialOption MetaLogBuffer *log_buffer.LogBuffer - peerStatues map[pb.ServerAddress]struct{} + peerStatues map[pb.ServerAddress]int peerStatuesLock sync.Mutex // notifying clients ListenersLock sync.Mutex @@ -39,7 +39,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. filer: filer, self: self, grpcDialOption: grpcDialOption, - peerStatues: make(map[pb.ServerAddress]struct{}), + peerStatues: make(map[pb.ServerAddress]int), } t.ListenersCond = sync.NewCond(&t.ListenersLock) t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() { @@ -56,27 +56,40 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { address := pb.ServerAddress(update.Address) if update.IsAdd { // every filer should subscribe to a new filer - ma.setActive(address, true) - go ma.subscribeToOneFiler(ma.filer, ma.self, address) + if ma.setActive(address, true) { + go ma.subscribeToOneFiler(ma.filer, ma.self, address) + } } else { ma.setActive(address, false) } } -func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) { +func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) { ma.peerStatuesLock.Lock() defer ma.peerStatuesLock.Unlock() if isActive { - ma.peerStatues[address] = struct{}{} + if _, found := ma.peerStatues[address]; found { + ma.peerStatues[address] += 1 + } else { + ma.peerStatues[address] = 1 + notDuplicated = true + } } else { - delete(ma.peerStatues, address) + if _, found := ma.peerStatues[address]; found { + ma.peerStatues[address] -= 1 + } + if ma.peerStatues[address] <= 0 { + delete(ma.peerStatues, address) + } } + return } func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { ma.peerStatuesLock.Lock() defer ma.peerStatuesLock.Unlock() - _, isActive = ma.peerStatues[address] - return + var count int + count, isActive = ma.peerStatues[address] + return count > 0 && isActive } func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) {