From 6d3a96eb562f13336fac7e52ccee905fd696c426 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 13 Mar 2021 22:07:39 -0800 Subject: [PATCH] filer: mysql2, postgres2 trigger actions on bucket creation and deletion fix https://github.com/chrislusf/seaweedfs/issues/1877 --- weed/filer/abstract_sql/abstract_sql_store.go | 19 +++++++++++++++ weed/filer/filer_on_meta_event.go | 18 +++++++++++++++ weed/filer/filerstore.go | 6 +++++ weed/filer/filerstore_wrapper.go | 23 +++++++++++++++++++ 4 files changed, 66 insertions(+) diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 07ce56145..6f37a118b 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -32,6 +32,25 @@ type AbstractSqlStore struct { dbsLock sync.Mutex } +func (store *AbstractSqlStore) OnBucketCreation(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if store.dbs == nil { + return + } + store.dbs[bucket] = true +} +func (store *AbstractSqlStore) OnBucketDeletion(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if store.dbs == nil { + return + } + delete(store.dbs, bucket) +} + const ( DEFAULT_TABLE = "filemeta" ) diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index b29324b61..c9f75a5ca 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -12,6 +12,24 @@ import ( // onMetadataChangeEvent is triggered after filer processed change events from local or remote filers func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) { f.maybeReloadFilerConfiguration(event) + f.onBucketEvents(event) +} + +func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) { + message := event.EventNotification + for _, sig := range message.Signatures { + if sig == f.Signature { + return + } + } + if f.DirBucketsPath == event.Directory { + if message.OldEntry == nil && message.NewEntry != nil { + f.Store.OnBucketCreation(message.NewEntry.Name) + } + if message.OldEntry != nil && message.NewEntry == nil { + f.Store.OnBucketDeletion(message.OldEntry.Name) + } + } } func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) { diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index 8955a25c7..069920f2d 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -39,3 +39,9 @@ type FilerStore interface { Shutdown() } + +type BucketAware interface { + OnBucketCreation(bucket string) + OnBucketDeletion(bucket string) +} + diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index 64baac371..95848e61b 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -21,6 +21,8 @@ type VirtualFilerStore interface { DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error DeleteOneEntry(ctx context.Context, entry *Entry) error AddPathSpecificStore(path string, storeId string, store FilerStore) + OnBucketCreation(bucket string) + OnBucketDeletion(bucket string) } type FilerStoreWrapper struct { @@ -40,6 +42,27 @@ func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { } } +func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string) { + for _, store := range fsw.storeIdToStore { + if ba, ok := store.(BucketAware); ok { + ba.OnBucketCreation(bucket) + } + } + if ba, ok := fsw.defaultStore.(BucketAware); ok { + ba.OnBucketCreation(bucket) + } +} +func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string) { + for _, store := range fsw.storeIdToStore { + if ba, ok := store.(BucketAware); ok { + ba.OnBucketDeletion(bucket) + } + } + if ba, ok := fsw.defaultStore.(BucketAware); ok { + ba.OnBucketDeletion(bucket) + } +} + func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) { fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store) err := fsw.pathToStore.Put([]byte(path), storeId)