mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #1 from chrislusf/master
This commit is contained in:
commit
272c7836c3
|
@ -10,7 +10,7 @@ import java.util.List;
|
||||||
|
|
||||||
public class ByteBufferPool {
|
public class ByteBufferPool {
|
||||||
|
|
||||||
private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024;
|
private static final int MIN_BUFFER_SIZE = 1 * 1024 * 1024;
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
|
||||||
|
|
||||||
private static final List<ByteBuffer> bufferList = new ArrayList<>();
|
private static final List<ByteBuffer> bufferList = new ArrayList<>();
|
||||||
|
|
|
@ -2,13 +2,9 @@ package filer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||||
"io/ioutil"
|
|
||||||
"math"
|
"math"
|
||||||
"net/http"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -31,50 +27,17 @@ func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.Seaweed
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReadContent(filerAddress string, dir, name string) ([]byte, error) {
|
func ReadInsideFiler(filerClient filer_pb.SeaweedFilerClient, dir, name string) (content []byte, err error) {
|
||||||
|
request := &filer_pb.LookupDirectoryEntryRequest{
|
||||||
target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name)
|
Directory: dir,
|
||||||
|
Name: name,
|
||||||
data, _, err := util.Get(target)
|
|
||||||
|
|
||||||
return data, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func SaveAs(host string, port int, dir, name string, contentType string, byteBuffer *bytes.Buffer) error {
|
|
||||||
var target string
|
|
||||||
if port == 0 {
|
|
||||||
target = fmt.Sprintf("http://%s%s/%s", host, dir, name)
|
|
||||||
} else {
|
|
||||||
target = fmt.Sprintf("http://%s:%d%s/%s", host, port, dir, name)
|
|
||||||
}
|
}
|
||||||
|
respLookupEntry, err := filer_pb.LookupEntry(filerClient, request)
|
||||||
// set the HTTP method, url, and request body
|
|
||||||
req, err := http.NewRequest(http.MethodPut, target, byteBuffer)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return
|
||||||
}
|
}
|
||||||
|
content = respLookupEntry.Entry.Content
|
||||||
// set the request header Content-Type for json
|
return
|
||||||
if contentType != "" {
|
|
||||||
req.Header.Set("Content-Type", contentType)
|
|
||||||
}
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer util.CloseResponse(resp)
|
|
||||||
|
|
||||||
b, err := ioutil.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode >= 400 {
|
|
||||||
return fmt.Errorf("%s: %s %v", target, resp.Status, string(b))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error {
|
func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
|
||||||
"github.com/golang/protobuf/jsonpb"
|
"github.com/golang/protobuf/jsonpb"
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -14,7 +15,7 @@ func ParseS3ConfigurationFromBytes(content []byte, config *iam_pb.S3ApiConfigura
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func S3ConfigurationToText(writer io.Writer, config *iam_pb.S3ApiConfiguration) error {
|
func ProtoToText(writer io.Writer, config proto.Message) error {
|
||||||
|
|
||||||
m := jsonpb.Marshaler{
|
m := jsonpb.Marshaler{
|
||||||
EmitDefaults: false,
|
EmitDefaults: false,
|
||||||
|
|
|
@ -44,7 +44,7 @@ func TestS3Conf(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
err := S3ConfigurationToText(&buf, s3Conf)
|
err := ProtoToText(&buf, s3Conf)
|
||||||
assert.Equal(t, err, nil)
|
assert.Equal(t, err, nil)
|
||||||
s3ConfSaved := &iam_pb.S3ApiConfiguration{}
|
s3ConfSaved := &iam_pb.S3ApiConfiguration{}
|
||||||
err = ParseS3ConfigurationFromBytes(buf.Bytes(), s3ConfSaved)
|
err = ParseS3ConfigurationFromBytes(buf.Bytes(), s3ConfSaved)
|
||||||
|
|
|
@ -96,8 +96,8 @@ func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat
|
||||||
|
|
||||||
func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
|
func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
|
||||||
buf := bytes.Buffer{}
|
buf := bytes.Buffer{}
|
||||||
if err := filer.S3ConfigurationToText(&buf, s3cfg); err != nil {
|
if err := filer.ProtoToText(&buf, s3cfg); err != nil {
|
||||||
return fmt.Errorf("S3ConfigurationToText: %s", err)
|
return fmt.Errorf("ProtoToText: %s", err)
|
||||||
}
|
}
|
||||||
return pb.WithGrpcFilerClient(
|
return pb.WithGrpcFilerClient(
|
||||||
iam.option.FilerGrpcAddress,
|
iam.option.FilerGrpcAddress,
|
||||||
|
|
|
@ -4,6 +4,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
|
||||||
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
|
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
|
||||||
"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
|
"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
|
||||||
|
@ -51,8 +53,12 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag
|
||||||
return iam
|
return iam
|
||||||
}
|
}
|
||||||
|
|
||||||
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) error {
|
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) {
|
||||||
content, err := filer.ReadContent(option.Filer, filer.IamConfigDirecotry, filer.IamIdentityFile)
|
var content []byte
|
||||||
|
err = pb.WithFilerClient(option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
content, err = filer.ReadInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile)
|
||||||
|
return err
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("read S3 config: %v", err)
|
return fmt.Errorf("read S3 config: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package weed_server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -12,6 +11,12 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered
|
||||||
|
MaxUnsyncedEvents = 1e3
|
||||||
)
|
)
|
||||||
|
|
||||||
func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
|
func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
|
||||||
|
@ -25,7 +30,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
|
||||||
lastReadTime := time.Unix(0, req.SinceNs)
|
lastReadTime := time.Unix(0, req.SinceNs)
|
||||||
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
||||||
|
|
||||||
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
|
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
|
||||||
|
|
||||||
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
||||||
|
|
||||||
|
@ -87,7 +92,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
|
||||||
lastReadTime := time.Unix(0, req.SinceNs)
|
lastReadTime := time.Unix(0, req.SinceNs)
|
||||||
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
||||||
|
|
||||||
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
|
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
|
||||||
|
|
||||||
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
||||||
|
|
||||||
|
@ -152,12 +157,25 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
|
func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
|
||||||
return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
|
filtered := 0
|
||||||
|
|
||||||
|
return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
|
||||||
|
defer func() {
|
||||||
|
if filtered > MaxUnsyncedEvents {
|
||||||
|
if err := stream.Send(&filer_pb.SubscribeMetadataResponse{
|
||||||
|
EventNotification: &filer_pb.EventNotification{},
|
||||||
|
TsNs: tsNs,
|
||||||
|
}); err == nil {
|
||||||
|
filtered = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
filtered++
|
||||||
foundSelf := false
|
foundSelf := false
|
||||||
for _, sig := range eventNotification.Signatures {
|
for _, sig := range eventNotification.Signatures {
|
||||||
if sig == clientSignature && clientSignature != 0 {
|
if sig == req.Signature && req.Signature != 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if sig == fs.filer.Signature {
|
if sig == fs.filer.Signature {
|
||||||
|
@ -204,6 +222,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
|
||||||
glog.V(0).Infof("=> client %v: %+v", clientName, err)
|
glog.V(0).Infof("=> client %v: %+v", clientName, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
filtered = 0
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
|
||||||
return nil, md5Hash, 0, err, nil
|
return nil, md5Hash, 0, err, nil
|
||||||
}
|
}
|
||||||
if chunkOffset == 0 && !isAppend(r) {
|
if chunkOffset == 0 && !isAppend(r) {
|
||||||
if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && dataSize < 4*1024 {
|
if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) {
|
||||||
chunkOffset += dataSize
|
chunkOffset += dataSize
|
||||||
smallContent = make([]byte, dataSize)
|
smallContent = make([]byte, dataSize)
|
||||||
bytesBuffer.Read(smallContent)
|
bytesBuffer.Read(smallContent)
|
||||||
|
|
|
@ -120,7 +120,9 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
|
||||||
|
|
||||||
if *apply {
|
if *apply {
|
||||||
|
|
||||||
if err := filer.SaveAs(commandEnv.option.FilerHost, int(commandEnv.option.FilerPort), filer.DirectoryEtcSeaweedFS, filer.FilerConfName, "text/plain; charset=utf-8", &buf); err != nil {
|
if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf.Bytes())
|
||||||
|
}); err != nil && err != filer_pb.ErrNotFound {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -164,7 +164,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
|
||||||
}
|
}
|
||||||
|
|
||||||
buf.Reset()
|
buf.Reset()
|
||||||
filer.S3ConfigurationToText(&buf, s3cfg)
|
filer.ProtoToText(&buf, s3cfg)
|
||||||
|
|
||||||
fmt.Fprintf(writer, string(buf.Bytes()))
|
fmt.Fprintf(writer, string(buf.Bytes()))
|
||||||
fmt.Fprintln(writer)
|
fmt.Fprintln(writer)
|
||||||
|
|
|
@ -206,7 +206,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u
|
||||||
|
|
||||||
if verbose {
|
if verbose {
|
||||||
for _, fid := range orphanFileIds {
|
for _, fid := range orphanFileIds {
|
||||||
fmt.Fprintf(writer, "%sxxxxxxxx\n", fid)
|
fmt.Fprintf(writer, "%s\n", fid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,7 +410,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri
|
||||||
var orphanFileCount uint64
|
var orphanFileCount uint64
|
||||||
db.AscendingVisit(func(n needle_map.NeedleValue) error {
|
db.AscendingVisit(func(n needle_map.NeedleValue) error {
|
||||||
// fmt.Printf("%d,%x\n", volumeId, n.Key)
|
// fmt.Printf("%d,%x\n", volumeId, n.Key)
|
||||||
orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s", volumeId, n.Key.String()))
|
orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String()))
|
||||||
orphanFileCount++
|
orphanFileCount++
|
||||||
orphanDataSize += uint64(n.Size)
|
orphanDataSize += uint64(n.Size)
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -18,10 +18,10 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
type ExclusiveLocker struct {
|
type ExclusiveLocker struct {
|
||||||
masterClient *wdclient.MasterClient
|
|
||||||
token int64
|
token int64
|
||||||
lockTsNs int64
|
lockTsNs int64
|
||||||
isLocking bool
|
isLocking bool
|
||||||
|
masterClient *wdclient.MasterClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
|
func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
|
||||||
|
|
Loading…
Reference in a new issue