diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index 9500bd9db..72da0ca64 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -57,11 +57,10 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } - var clientEpoch int32 metadataFollowOption := &pb.MetadataFollowOption{ ClientName: "mount", ClientId: selfSignature, - ClientEpoch: clientEpoch, + ClientEpoch: 1, SelfSignature: selfSignature, PathPrefix: dir, AdditionalPathPrefixes: nil, @@ -71,7 +70,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil EventErrorType: pb.FatalOnError, } util.RetryUntil("followMetaUpdates", func() error { - clientEpoch++ + metadataFollowOption.ClientEpoch++ return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn) }, func(err error) bool { glog.Errorf("follow metadata updates: %v", err) diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index 766408909..1f6b30312 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -33,11 +33,10 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, p return nil } - var clientEpoch int32 metadataFollowOption := &pb.MetadataFollowOption{ ClientName: clientName, ClientId: s3a.randomClientId, - ClientEpoch: clientEpoch, + ClientEpoch: 1, SelfSignature: 0, PathPrefix: prefix, AdditionalPathPrefixes: nil, @@ -47,7 +46,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, p EventErrorType: pb.FatalOnError, } util.RetryUntil("followIamChanges", func() error { - clientEpoch++ + metadataFollowOption.ClientEpoch++ return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn) }, func(err error) bool { glog.V(0).Infof("iam follow metadata changes: %v", err) diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index bd24ab54f..eb69a6aeb 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -24,11 +24,13 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, peerAddress := findClientAddress(stream.Context(), 0) - alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId, req.ClientEpoch) - if alreadyKnown { + isReplacing, alreadyKnown, clientName := fs.addClient("", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch) + if isReplacing { + fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting + } else if alreadyKnown { return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId) } - defer fs.deleteClient(clientName, req.ClientId, req.ClientEpoch) + defer fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch) lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -64,6 +66,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() + if !fs.hasClient(req.ClientId, req.ClientEpoch) { + return false + } return true }, eachLogEntryFn) if readInMemoryLogErr != nil { @@ -78,6 +83,10 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, if isDone { return nil } + if !fs.hasClient(req.ClientId, req.ClientEpoch) { + glog.V(0).Infof("client %v is closed", clientName) + return nil + } time.Sleep(1127 * time.Millisecond) } @@ -93,13 +102,15 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq // use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata() req.ClientId = -req.ClientId - alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId, req.ClientEpoch) - if alreadyKnown { + isReplacing, alreadyKnown, clientName := fs.addClient("local", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch) + if isReplacing { + fs.listenersCond.Broadcast() // nudges the subscribers that are waiting + } else if alreadyKnown { return fmt.Errorf("duplicated local subscription detected for client %s clientId:%d", clientName, req.ClientId) } defer func() { - glog.V(0).Infof(" - %v local subscribe %s clientId:%d", clientName, req.PathPrefix, req.ClientId) - fs.deleteClient(clientName, req.ClientId, req.ClientEpoch) + glog.V(0).Infof("disconnect %v local subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId) + fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch) }() lastReadTime := time.Unix(0, req.SinceNs) @@ -141,6 +152,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock() + if !fs.hasClient(req.ClientId, req.ClientEpoch) { + return false + } return true }, eachLogEntryFn) if readInMemoryLogErr != nil { @@ -155,6 +169,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq if isDone { return nil } + if !fs.hasClient(req.ClientId, req.ClientEpoch) { + return nil + } } return readInMemoryLogErr @@ -274,15 +291,16 @@ func matchByDirectory(dirPath string, directories []string) bool { return false } -func (fs *FilerServer) addClient(clientType string, clientAddress string, clientId int32, clientEpoch int32) (alreadyKnown bool, clientName string) { +func (fs *FilerServer) addClient(prefix string, clientType string, clientAddress string, clientId int32, clientEpoch int32) (isReplacing, alreadyKnown bool, clientName string) { clientName = clientType + "@" + clientAddress - glog.V(0).Infof("+ listener %v", clientName) + glog.V(0).Infof("+ %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch) if clientId != 0 { fs.knownListenersLock.Lock() defer fs.knownListenersLock.Unlock() epoch, found := fs.knownListeners[clientId] if !found || epoch < clientEpoch { fs.knownListeners[clientId] = clientEpoch + isReplacing = true } else { alreadyKnown = true } @@ -290,8 +308,8 @@ func (fs *FilerServer) addClient(clientType string, clientAddress string, client return } -func (fs *FilerServer) deleteClient(clientName string, clientId int32, clientEpoch int32) { - glog.V(0).Infof("- listener %v", clientName) +func (fs *FilerServer) deleteClient(prefix string, clientName string, clientId int32, clientEpoch int32) { + glog.V(0).Infof("- %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch) if clientId != 0 { fs.knownListenersLock.Lock() defer fs.knownListenersLock.Unlock() @@ -301,3 +319,15 @@ func (fs *FilerServer) deleteClient(clientName string, clientId int32, clientEpo } } } + +func (fs *FilerServer) hasClient(clientId int32, clientEpoch int32) bool { + if clientId != 0 { + fs.knownListenersLock.Lock() + defer fs.knownListenersLock.Unlock() + epoch, found := fs.knownListeners[clientId] + if found && epoch <= clientEpoch { + return true + } + } + return false +}