diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java index 51053becd..19ae78277 100644 --- a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java +++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java @@ -10,7 +10,7 @@ import java.util.List; public class ByteBufferPool { - private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024; + private static final int MIN_BUFFER_SIZE = 1 * 1024 * 1024; private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class); private static final List bufferList = new ArrayList<>(); diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go index c4c90fb63..14e8cab1e 100644 --- a/weed/filer/read_write.go +++ b/weed/filer/read_write.go @@ -2,13 +2,9 @@ package filer import ( "bytes" - "fmt" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" - "io/ioutil" "math" - "net/http" "time" ) @@ -31,50 +27,17 @@ func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.Seaweed } -func ReadContent(filerAddress string, dir, name string) ([]byte, error) { - - target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name) - - data, _, err := util.Get(target) - - return data, err -} - -func SaveAs(host string, port int, dir, name string, contentType string, byteBuffer *bytes.Buffer) error { - var target string - if port == 0 { - target = fmt.Sprintf("http://%s%s/%s", host, dir, name) - } else { - target = fmt.Sprintf("http://%s:%d%s/%s", host, port, dir, name) +func ReadInsideFiler(filerClient filer_pb.SeaweedFilerClient, dir, name string) (content []byte, err error) { + request := &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, } - - // set the HTTP method, url, and request body - req, err := http.NewRequest(http.MethodPut, target, byteBuffer) + respLookupEntry, err := filer_pb.LookupEntry(filerClient, request) if err != nil { - return err + return } - - // set the request header Content-Type for json - if contentType != "" { - req.Header.Set("Content-Type", contentType) - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer util.CloseResponse(resp) - - b, err := ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - - if resp.StatusCode >= 400 { - return fmt.Errorf("%s: %s %v", target, resp.Status, string(b)) - } - - return nil - + content = respLookupEntry.Entry.Content + return } func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error { diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go index 92387fb09..55c976915 100644 --- a/weed/filer/s3iam_conf.go +++ b/weed/filer/s3iam_conf.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/chrislusf/seaweedfs/weed/pb/iam_pb" "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" "io" ) @@ -14,7 +15,7 @@ func ParseS3ConfigurationFromBytes(content []byte, config *iam_pb.S3ApiConfigura return nil } -func S3ConfigurationToText(writer io.Writer, config *iam_pb.S3ApiConfiguration) error { +func ProtoToText(writer io.Writer, config proto.Message) error { m := jsonpb.Marshaler{ EmitDefaults: false, diff --git a/weed/filer/s3iam_conf_test.go b/weed/filer/s3iam_conf_test.go index 65cc49840..da7d9c9f1 100644 --- a/weed/filer/s3iam_conf_test.go +++ b/weed/filer/s3iam_conf_test.go @@ -44,7 +44,7 @@ func TestS3Conf(t *testing.T) { }, } var buf bytes.Buffer - err := S3ConfigurationToText(&buf, s3Conf) + err := ProtoToText(&buf, s3Conf) assert.Equal(t, err, nil) s3ConfSaved := &iam_pb.S3ApiConfiguration{} err = ParseS3ConfigurationFromBytes(buf.Bytes(), s3ConfSaved) diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go index eb18e996d..037594165 100644 --- a/weed/iamapi/iamapi_server.go +++ b/weed/iamapi/iamapi_server.go @@ -96,8 +96,8 @@ func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { buf := bytes.Buffer{} - if err := filer.S3ConfigurationToText(&buf, s3cfg); err != nil { - return fmt.Errorf("S3ConfigurationToText: %s", err) + if err := filer.ProtoToText(&buf, s3cfg); err != nil { + return fmt.Errorf("ProtoToText: %s", err) } return pb.WithGrpcFilerClient( iam.option.FilerGrpcAddress, diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 3439b40df..44c3f7aa7 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/iam_pb" xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" @@ -51,8 +53,12 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag return iam } -func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) error { - content, err := filer.ReadContent(option.Filer, filer.IamConfigDirecotry, filer.IamIdentityFile) +func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) { + var content []byte + err = pb.WithFilerClient(option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + content, err = filer.ReadInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile) + return err + }) if err != nil { return fmt.Errorf("read S3 config: %v", err) } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 624069b7e..3fdac1b26 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -2,7 +2,6 @@ package weed_server import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "strings" "time" @@ -12,6 +11,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" +) + +const ( + // MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered + MaxUnsyncedEvents = 1e3 ) func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error { @@ -25,7 +30,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -87,7 +92,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -152,12 +157,25 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati } } -func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { - return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { +func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + filtered := 0 + return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + defer func() { + if filtered > MaxUnsyncedEvents { + if err := stream.Send(&filer_pb.SubscribeMetadataResponse{ + EventNotification: &filer_pb.EventNotification{}, + TsNs: tsNs, + }); err == nil { + filtered = 0 + } + } + }() + + filtered++ foundSelf := false for _, sig := range eventNotification.Signatures { - if sig == clientSignature && clientSignature != 0 { + if sig == req.Signature && req.Signature != 0 { return nil } if sig == fs.filer.Signature { @@ -204,6 +222,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe glog.V(0).Infof("=> client %v: %+v", clientName, err) return err } + filtered = 0 return nil } } diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index e28309c6a..395852517 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -70,7 +70,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque return nil, md5Hash, 0, err, nil } if chunkOffset == 0 && !isAppend(r) { - if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && dataSize < 4*1024 { + if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) { chunkOffset += dataSize smallContent = make([]byte, dataSize) bytesBuffer.Read(smallContent) diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index 52fcae1c6..a7e601bec 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -120,7 +120,9 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io if *apply { - if err := filer.SaveAs(commandEnv.option.FilerHost, int(commandEnv.option.FilerPort), filer.DirectoryEtcSeaweedFS, filer.FilerConfName, "text/plain; charset=utf-8", &buf); err != nil { + if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf.Bytes()) + }); err != nil && err != filer_pb.ErrNotFound { return err } diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go index ca51ef72f..5eab2ebd0 100644 --- a/weed/shell/command_s3_configure.go +++ b/weed/shell/command_s3_configure.go @@ -164,7 +164,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io } buf.Reset() - filer.S3ConfigurationToText(&buf, s3cfg) + filer.ProtoToText(&buf, s3cfg) fmt.Fprintf(writer, string(buf.Bytes())) fmt.Fprintln(writer) diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 64389fdb5..0b40cfb7c 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -206,7 +206,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u if verbose { for _, fid := range orphanFileIds { - fmt.Fprintf(writer, "%sxxxxxxxx\n", fid) + fmt.Fprintf(writer, "%s\n", fid) } } @@ -410,7 +410,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri var orphanFileCount uint64 db.AscendingVisit(func(n needle_map.NeedleValue) error { // fmt.Printf("%d,%x\n", volumeId, n.Key) - orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s", volumeId, n.Key.String())) + orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String())) orphanFileCount++ orphanDataSize += uint64(n.Size) return nil diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go index 5b5fa2704..0fa138496 100644 --- a/weed/wdclient/exclusive_locks/exclusive_locker.go +++ b/weed/wdclient/exclusive_locks/exclusive_locker.go @@ -18,10 +18,10 @@ const ( ) type ExclusiveLocker struct { - masterClient *wdclient.MasterClient token int64 lockTsNs int64 isLocking bool + masterClient *wdclient.MasterClient } func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {