filer: mysql2, postgres2 trigger actions on bucket creation and deletion

fix https://github.com/chrislusf/seaweedfs/issues/1877
This commit is contained in:
Chris Lu 2021-03-13 22:07:39 -08:00
parent cb423312a4
commit 6d3a96eb56
4 changed files with 66 additions and 0 deletions

View file

@ -32,6 +32,25 @@ type AbstractSqlStore struct {
dbsLock sync.Mutex dbsLock sync.Mutex
} }
func (store *AbstractSqlStore) OnBucketCreation(bucket string) {
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
if store.dbs == nil {
return
}
store.dbs[bucket] = true
}
func (store *AbstractSqlStore) OnBucketDeletion(bucket string) {
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
if store.dbs == nil {
return
}
delete(store.dbs, bucket)
}
const ( const (
DEFAULT_TABLE = "filemeta" DEFAULT_TABLE = "filemeta"
) )

View file

@ -12,6 +12,24 @@ import (
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers // onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) { func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) {
f.maybeReloadFilerConfiguration(event) f.maybeReloadFilerConfiguration(event)
f.onBucketEvents(event)
}
func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) {
message := event.EventNotification
for _, sig := range message.Signatures {
if sig == f.Signature {
return
}
}
if f.DirBucketsPath == event.Directory {
if message.OldEntry == nil && message.NewEntry != nil {
f.Store.OnBucketCreation(message.NewEntry.Name)
}
if message.OldEntry != nil && message.NewEntry == nil {
f.Store.OnBucketDeletion(message.OldEntry.Name)
}
}
} }
func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) { func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) {

View file

@ -39,3 +39,9 @@ type FilerStore interface {
Shutdown() Shutdown()
} }
type BucketAware interface {
OnBucketCreation(bucket string)
OnBucketDeletion(bucket string)
}

View file

@ -21,6 +21,8 @@ type VirtualFilerStore interface {
DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
DeleteOneEntry(ctx context.Context, entry *Entry) error DeleteOneEntry(ctx context.Context, entry *Entry) error
AddPathSpecificStore(path string, storeId string, store FilerStore) AddPathSpecificStore(path string, storeId string, store FilerStore)
OnBucketCreation(bucket string)
OnBucketDeletion(bucket string)
} }
type FilerStoreWrapper struct { type FilerStoreWrapper struct {
@ -40,6 +42,27 @@ func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
} }
} }
func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string) {
for _, store := range fsw.storeIdToStore {
if ba, ok := store.(BucketAware); ok {
ba.OnBucketCreation(bucket)
}
}
if ba, ok := fsw.defaultStore.(BucketAware); ok {
ba.OnBucketCreation(bucket)
}
}
func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string) {
for _, store := range fsw.storeIdToStore {
if ba, ok := store.(BucketAware); ok {
ba.OnBucketDeletion(bucket)
}
}
if ba, ok := fsw.defaultStore.(BucketAware); ok {
ba.OnBucketDeletion(bucket)
}
}
func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) { func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store) fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store)
err := fsw.pathToStore.Put([]byte(path), storeId) err := fsw.pathToStore.Put([]byte(path), storeId)