From eed87791b71c27b1d2fe42f5acb87aaf1af5811c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 7 Dec 2020 00:10:29 -0800 Subject: [PATCH] s3: subscribe to s3.configure changes --- weed/s3api/auth_credentials.go | 18 ++++--- weed/s3api/auth_credentials_subscribe.go | 69 ++++++++++++++++++++++++ weed/s3api/s3api_server.go | 4 ++ 3 files changed, 83 insertions(+), 8 deletions(-) create mode 100644 weed/s3api/auth_credentials_subscribe.go diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 9fcd016e2..452557619 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -1,18 +1,15 @@ package s3api import ( - "bytes" "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "io/ioutil" "net/http" - xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" - "github.com/chrislusf/seaweedfs/weed/s3api/s3err" - "github.com/golang/protobuf/jsonpb" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/iam_pb" + xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" ) type Action string @@ -50,7 +47,7 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag domain: option.DomainName, } if err := iam.loadS3ApiConfigurationFromFiler(option); err != nil { - glog.Warningf("fail to load config %v", err) + glog.Warningf("fail to load config: %v", err) } if len(iam.identities) == 0 && option.Config != "" { if err := iam.loadS3ApiConfigurationFromFile(option.Config); err != nil { @@ -72,6 +69,7 @@ func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3A if err := iam.loadS3ApiConfiguration(s3ApiConfiguration); err != nil { return fmt.Errorf("laod S3 config: %v", err) } + glog.V(0).Infof("loaded %d s3 identities", len(iam.identities)) return nil } @@ -84,7 +82,7 @@ func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName str } glog.V(1).Infof("load s3 config: %v", fileName) - if err := jsonpb.Unmarshal(bytes.NewReader(rawData), s3ApiConfiguration); err != nil { + if err := filer.ParseS3ConfigurationFromBytes(rawData, s3ApiConfiguration); err != nil { glog.Warningf("unmarshal error: %v", err) return fmt.Errorf("unmarshal %s error: %v", fileName, err) } @@ -95,6 +93,7 @@ func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName str } func (iam *IdentityAccessManagement) loadS3ApiConfiguration(config *iam_pb.S3ApiConfiguration) error { + var identities []*Identity for _, ident := range config.Identities { t := &Identity{ Name: ident.Name, @@ -110,8 +109,11 @@ func (iam *IdentityAccessManagement) loadS3ApiConfiguration(config *iam_pb.S3Api SecretKey: cred.SecretKey, }) } - iam.identities = append(iam.identities, t) + identities = append(identities, t) } + + // atomically switch + iam.identities = identities return nil } diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go new file mode 100644 index 000000000..f541628bb --- /dev/null +++ b/weed/s3api/auth_credentials_subscribe.go @@ -0,0 +1,69 @@ +package s3api + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "io" + "time" +) + +func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) error { + + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + + message := resp.EventNotification + if message.NewEntry == nil { + return nil + } + + dir := resp.Directory + + if message.NewParentPath != "" { + dir = message.NewParentPath + } + if dir == filer.IamConfigDirecotry && message.NewEntry.Name == filer.IamIdentityFile { + if err := s3a.iam.loadS3ApiConfigurationFromFiler(s3a.option); err != nil { + return err + } + } + + return nil + } + + for { + err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: clientName, + PathPrefix: prefix, + SinceNs: lastTsNs, + }) + if err != nil { + return fmt.Errorf("subscribe: %v", err) + } + + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + + if err := processEventFn(resp); err != nil { + glog.Fatalf("process %v: %v", resp, err) + } + lastTsNs = resp.TsNs + } + }) + if err != nil { + glog.Errorf("subscribing filer meta change: %v", err) + } + time.Sleep(time.Second) + } +} diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 850a02171..18f8b563f 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -2,8 +2,10 @@ package s3api import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" "net/http" "strings" + "time" "github.com/gorilla/mux" "google.golang.org/grpc" @@ -32,6 +34,8 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer s3ApiServer.registerRouter(router) + go s3ApiServer.subscribeMetaEvents("s3", filer.IamConfigDirecotry+"/"+filer.IamIdentityFile, time.Now().UnixNano()) + return s3ApiServer, nil }