mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
commit
b3dd0ba446
4
go.mod
4
go.mod
|
@ -11,7 +11,7 @@ require (
|
||||||
github.com/aws/aws-sdk-go v1.33.5
|
github.com/aws/aws-sdk-go v1.33.5
|
||||||
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
|
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
|
||||||
github.com/cespare/xxhash v1.1.0
|
github.com/cespare/xxhash v1.1.0
|
||||||
github.com/chrislusf/raft v1.0.1
|
github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011
|
||||||
github.com/coreos/go-semver v0.3.0 // indirect
|
github.com/coreos/go-semver v0.3.0 // indirect
|
||||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||||
github.com/disintegration/imaging v1.6.2
|
github.com/disintegration/imaging v1.6.2
|
||||||
|
@ -27,6 +27,7 @@ require (
|
||||||
github.com/go-sql-driver/mysql v1.5.0
|
github.com/go-sql-driver/mysql v1.5.0
|
||||||
github.com/gocql/gocql v0.0.0-20190829130954-e163eff7a8c6
|
github.com/gocql/gocql v0.0.0-20190829130954-e163eff7a8c6
|
||||||
github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48 // indirect
|
github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48 // indirect
|
||||||
|
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6
|
||||||
github.com/golang/protobuf v1.4.2
|
github.com/golang/protobuf v1.4.2
|
||||||
github.com/google/btree v1.0.0
|
github.com/google/btree v1.0.0
|
||||||
github.com/google/uuid v1.1.1
|
github.com/google/uuid v1.1.1
|
||||||
|
@ -78,6 +79,7 @@ require (
|
||||||
gocloud.dev/pubsub/rabbitpubsub v0.16.0
|
gocloud.dev/pubsub/rabbitpubsub v0.16.0
|
||||||
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
|
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
|
||||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
|
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
|
||||||
|
golang.org/x/sync v0.0.0-20200930132711-30421366ff76 // indirect
|
||||||
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5
|
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5
|
||||||
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5
|
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5
|
||||||
google.golang.org/api v0.9.0
|
google.golang.org/api v0.9.0
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -69,6 +69,8 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||||
github.com/chrislusf/raft v1.0.1 h1:Wa4ffkmkysW7cX3T/gMC/Mk3PhnOXhsqOVwQJcMndhw=
|
github.com/chrislusf/raft v1.0.1 h1:Wa4ffkmkysW7cX3T/gMC/Mk3PhnOXhsqOVwQJcMndhw=
|
||||||
github.com/chrislusf/raft v1.0.1/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
|
github.com/chrislusf/raft v1.0.1/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
|
||||||
|
github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011 h1:vN1GvfLgDg8kIPCdhuVKAjlYpxG1B86jiKejB6MC/Q0=
|
||||||
|
github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
|
||||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||||
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y=
|
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y=
|
||||||
|
@ -613,6 +615,8 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ
|
||||||
golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
|
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
|
||||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20200930132711-30421366ff76 h1:JnxiSYT3Nm0BT2a8CyvYyM6cnrWpidecD1UuSYbhKm0=
|
||||||
|
golang.org/x/sync v0.0.0-20200930132711-30421366ff76/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
|
82
test/s3/basic/object_tagging_test.go
Normal file
82
test/s3/basic/object_tagging_test.go
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
package basic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestObjectTagging(t *testing.T) {
|
||||||
|
|
||||||
|
input := &s3.PutObjectInput{
|
||||||
|
Bucket: aws.String("theBucket"),
|
||||||
|
Key: aws.String("testDir/testObject"),
|
||||||
|
}
|
||||||
|
|
||||||
|
svc.PutObject(input)
|
||||||
|
|
||||||
|
printTags()
|
||||||
|
|
||||||
|
setTags()
|
||||||
|
|
||||||
|
printTags()
|
||||||
|
|
||||||
|
clearTags()
|
||||||
|
|
||||||
|
printTags()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func printTags() {
|
||||||
|
response, err := svc.GetObjectTagging(
|
||||||
|
&s3.GetObjectTaggingInput{
|
||||||
|
Bucket: aws.String("theBucket"),
|
||||||
|
Key: aws.String("testDir/testObject"),
|
||||||
|
})
|
||||||
|
|
||||||
|
fmt.Println("printTags")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(response.TagSet)
|
||||||
|
}
|
||||||
|
|
||||||
|
func setTags() {
|
||||||
|
|
||||||
|
response, err := svc.PutObjectTagging(&s3.PutObjectTaggingInput{
|
||||||
|
Bucket: aws.String("theBucket"),
|
||||||
|
Key: aws.String("testDir/testObject"),
|
||||||
|
Tagging: &s3.Tagging{
|
||||||
|
TagSet: []*s3.Tag{
|
||||||
|
{
|
||||||
|
Key: aws.String("kye2"),
|
||||||
|
Value: aws.String("value2"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
fmt.Println("setTags")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(response.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func clearTags() {
|
||||||
|
|
||||||
|
response, err := svc.DeleteObjectTagging(&s3.DeleteObjectTaggingInput{
|
||||||
|
Bucket: aws.String("theBucket"),
|
||||||
|
Key: aws.String("testDir/testObject"),
|
||||||
|
})
|
||||||
|
|
||||||
|
fmt.Println("clearTags")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(response.String())
|
||||||
|
}
|
|
@ -41,6 +41,7 @@ type MasterOptions struct {
|
||||||
disableHttp *bool
|
disableHttp *bool
|
||||||
metricsAddress *string
|
metricsAddress *string
|
||||||
metricsIntervalSec *int
|
metricsIntervalSec *int
|
||||||
|
raftResumeState *bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -59,6 +60,7 @@ func init() {
|
||||||
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
|
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
|
||||||
m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>")
|
m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>")
|
||||||
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
||||||
|
m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
|
||||||
}
|
}
|
||||||
|
|
||||||
var cmdMaster = &Command{
|
var cmdMaster = &Command{
|
||||||
|
@ -118,10 +120,10 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
|
||||||
glog.Fatalf("Master startup error: %v", e)
|
glog.Fatalf("Master startup error: %v", e)
|
||||||
}
|
}
|
||||||
// start raftServer
|
// start raftServer
|
||||||
raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
|
raftServer, err := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
|
||||||
peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, 5)
|
peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, 5, *masterOption.raftResumeState)
|
||||||
if raftServer == nil {
|
if raftServer == nil {
|
||||||
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder)
|
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
|
||||||
}
|
}
|
||||||
ms.SetRaftServer(raftServer)
|
ms.SetRaftServer(raftServer)
|
||||||
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
|
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
|
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
|
||||||
"os"
|
"os"
|
||||||
|
"os/user"
|
||||||
"path"
|
"path"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -92,6 +93,29 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
|
||||||
}
|
}
|
||||||
fileInfo, err := os.Stat(dir)
|
fileInfo, err := os.Stat(dir)
|
||||||
|
|
||||||
|
uid, gid := uint32(0), uint32(0)
|
||||||
|
mountMode := os.ModeDir | 0777
|
||||||
|
if err == nil {
|
||||||
|
mountMode = os.ModeDir | fileInfo.Mode()
|
||||||
|
uid, gid = util.GetFileUidGid(fileInfo)
|
||||||
|
fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, fileInfo.Mode())
|
||||||
|
} else {
|
||||||
|
fmt.Printf("can not stat %s\n", dir)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if uid == 0 {
|
||||||
|
if u, err := user.Current(); err == nil {
|
||||||
|
if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
|
||||||
|
uid = uint32(parsedId)
|
||||||
|
}
|
||||||
|
if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
|
||||||
|
gid = uint32(parsedId)
|
||||||
|
}
|
||||||
|
fmt.Printf("current uid=%d gid=%d\n", uid, gid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// mapping uid, gid
|
// mapping uid, gid
|
||||||
uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
|
uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -150,6 +174,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
|
||||||
CacheSizeMB: *option.cacheSizeMB,
|
CacheSizeMB: *option.cacheSizeMB,
|
||||||
DataCenter: *option.dataCenter,
|
DataCenter: *option.dataCenter,
|
||||||
EntryCacheTtl: 3 * time.Second,
|
EntryCacheTtl: 3 * time.Second,
|
||||||
|
MountUid: uid,
|
||||||
|
MountGid: gid,
|
||||||
|
MountMode: mountMode,
|
||||||
MountCtime: fileInfo.ModTime(),
|
MountCtime: fileInfo.ModTime(),
|
||||||
MountMtime: time.Now(),
|
MountMtime: time.Now(),
|
||||||
Umask: umask,
|
Umask: umask,
|
||||||
|
|
|
@ -81,6 +81,7 @@ func init() {
|
||||||
masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
|
masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
|
||||||
masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
|
masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
|
||||||
masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
||||||
|
masterOptions.raftResumeState = cmdServer.Flag.Bool("resumeState", false, "resume previous state on start master server")
|
||||||
|
|
||||||
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
|
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
|
||||||
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
|
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
|
||||||
|
|
|
@ -3,7 +3,9 @@ package filer
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/golang/groupcache/singleflight"
|
||||||
"io"
|
"io"
|
||||||
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
@ -19,7 +21,10 @@ type ChunkReadAt struct {
|
||||||
readerLock sync.Mutex
|
readerLock sync.Mutex
|
||||||
fileSize int64
|
fileSize int64
|
||||||
|
|
||||||
chunkCache chunk_cache.ChunkCache
|
fetchGroup singleflight.Group
|
||||||
|
lastChunkFileId string
|
||||||
|
lastChunkData []byte
|
||||||
|
chunkCache chunk_cache.ChunkCache
|
||||||
}
|
}
|
||||||
|
|
||||||
// var _ = io.ReaderAt(&ChunkReadAt{})
|
// var _ = io.ReaderAt(&ChunkReadAt{})
|
||||||
|
@ -27,28 +32,36 @@ type ChunkReadAt struct {
|
||||||
type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
|
type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
|
||||||
|
|
||||||
func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
|
func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
|
||||||
|
|
||||||
|
vidCache := make(map[string]*filer_pb.Locations)
|
||||||
return func(fileId string) (targetUrl string, err error) {
|
return func(fileId string) (targetUrl string, err error) {
|
||||||
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
vid := VolumeId(fileId)
|
||||||
vid := VolumeId(fileId)
|
locations, found := vidCache[vid]
|
||||||
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
|
|
||||||
VolumeIds: []string{vid},
|
if !found {
|
||||||
|
// println("looking up volume", vid)
|
||||||
|
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
|
||||||
|
VolumeIds: []string{vid},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
locations = resp.LocationsMap[vid]
|
||||||
|
if locations == nil || len(locations.Locations) == 0 {
|
||||||
|
glog.V(0).Infof("failed to locate %s", fileId)
|
||||||
|
return fmt.Errorf("failed to locate %s", fileId)
|
||||||
|
}
|
||||||
|
vidCache[vid] = locations
|
||||||
|
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
}
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
locations := resp.LocationsMap[vid]
|
volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[rand.Intn(len(locations.Locations))].Url)
|
||||||
if locations == nil || len(locations.Locations) == 0 {
|
targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
|
||||||
glog.V(0).Infof("failed to locate %s", fileId)
|
|
||||||
return fmt.Errorf("failed to locate %s", fileId)
|
|
||||||
}
|
|
||||||
|
|
||||||
volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
|
|
||||||
|
|
||||||
targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -76,10 +89,16 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
|
||||||
|
|
||||||
var buffer []byte
|
var buffer []byte
|
||||||
startOffset, remaining := offset, int64(len(p))
|
startOffset, remaining := offset, int64(len(p))
|
||||||
|
var nextChunk *ChunkView
|
||||||
for i, chunk := range c.chunkViews {
|
for i, chunk := range c.chunkViews {
|
||||||
if remaining <= 0 {
|
if remaining <= 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
if i+1 < len(c.chunkViews) {
|
||||||
|
nextChunk = c.chunkViews[i+1]
|
||||||
|
} else {
|
||||||
|
nextChunk = nil
|
||||||
|
}
|
||||||
if startOffset < chunk.LogicOffset {
|
if startOffset < chunk.LogicOffset {
|
||||||
gap := int(chunk.LogicOffset - startOffset)
|
gap := int(chunk.LogicOffset - startOffset)
|
||||||
glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap))
|
glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap))
|
||||||
|
@ -95,7 +114,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
|
glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
|
||||||
buffer, err = c.readFromWholeChunkData(chunk)
|
buffer, err = c.readFromWholeChunkData(chunk, nextChunk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
|
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
|
||||||
return
|
return
|
||||||
|
@ -123,27 +142,63 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) {
|
func (c *ChunkReadAt) readFromWholeChunkData(chunkView, nextChunkView *ChunkView) (chunkData []byte, err error) {
|
||||||
|
|
||||||
glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
|
if c.lastChunkFileId == chunkView.FileId {
|
||||||
|
return c.lastChunkData, nil
|
||||||
chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
|
|
||||||
if chunkData != nil {
|
|
||||||
glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData)))
|
|
||||||
} else {
|
|
||||||
glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId)
|
|
||||||
chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.chunkCache.SetChunk(chunkView.FileId, chunkData)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
v, doErr := c.readOneWholeChunk(chunkView)
|
||||||
|
|
||||||
|
if doErr != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
chunkData = v.([]byte)
|
||||||
|
|
||||||
|
c.lastChunkData = chunkData
|
||||||
|
c.lastChunkFileId = chunkView.FileId
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if c.chunkCache != nil && nextChunkView != nil {
|
||||||
|
c.readOneWholeChunk(nextChunkView)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
|
func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) {
|
||||||
|
|
||||||
return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped)
|
var err error
|
||||||
|
|
||||||
|
return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) {
|
||||||
|
|
||||||
|
glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
|
||||||
|
|
||||||
|
data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
|
||||||
|
if data != nil {
|
||||||
|
glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
|
||||||
|
} else {
|
||||||
|
var err error
|
||||||
|
data, err = c.doFetchFullChunkData(chunkView)
|
||||||
|
if err != nil {
|
||||||
|
return data, err
|
||||||
|
}
|
||||||
|
c.chunkCache.SetChunk(chunkView.FileId, data)
|
||||||
|
}
|
||||||
|
return data, err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) {
|
||||||
|
|
||||||
|
glog.V(2).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
|
||||||
|
|
||||||
|
data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
|
||||||
|
|
||||||
|
glog.V(2).Infof("- doFetchFullChunkData %s", chunkView.FileId)
|
||||||
|
|
||||||
|
return data, err
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,9 +82,9 @@ func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f
|
||||||
func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
|
func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
|
||||||
attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
|
attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
|
||||||
attr.Valid = time.Hour
|
attr.Valid = time.Hour
|
||||||
attr.Uid = dir.entry.Attributes.Uid
|
attr.Uid = dir.wfs.option.MountUid
|
||||||
attr.Gid = dir.entry.Attributes.Gid
|
attr.Gid = dir.wfs.option.MountGid
|
||||||
attr.Mode = os.FileMode(dir.entry.Attributes.FileMode)
|
attr.Mode = dir.wfs.option.MountMode
|
||||||
attr.Crtime = dir.wfs.option.MountCtime
|
attr.Crtime = dir.wfs.option.MountCtime
|
||||||
attr.Ctime = dir.wfs.option.MountCtime
|
attr.Ctime = dir.wfs.option.MountCtime
|
||||||
attr.Mtime = dir.wfs.option.MountMtime
|
attr.Mtime = dir.wfs.option.MountMtime
|
||||||
|
|
|
@ -37,6 +37,9 @@ type Option struct {
|
||||||
EntryCacheTtl time.Duration
|
EntryCacheTtl time.Duration
|
||||||
Umask os.FileMode
|
Umask os.FileMode
|
||||||
|
|
||||||
|
MountUid uint32
|
||||||
|
MountGid uint32
|
||||||
|
MountMode os.FileMode
|
||||||
MountCtime time.Time
|
MountCtime time.Time
|
||||||
MountMtime time.Time
|
MountMtime time.Time
|
||||||
|
|
||||||
|
|
104
weed/s3api/filer_util_tags.go
Normal file
104
weed/s3api/filer_util_tags.go
Normal file
|
@ -0,0 +1,104 @@
|
||||||
|
package s3api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
const(
|
||||||
|
S3TAG_PREFIX = "s3-"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (tags map[string]string, err error) {
|
||||||
|
|
||||||
|
err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
|
resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
|
||||||
|
Directory: parentDirectoryPath,
|
||||||
|
Name: entryName,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tags = make(map[string]string)
|
||||||
|
for k, v := range resp.Entry.Extended {
|
||||||
|
if strings.HasPrefix(k, S3TAG_PREFIX) {
|
||||||
|
tags[k[len(S3TAG_PREFIX):]] = string(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, tags map[string]string) (err error) {
|
||||||
|
|
||||||
|
return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
|
resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
|
||||||
|
Directory: parentDirectoryPath,
|
||||||
|
Name: entryName,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, _ := range resp.Entry.Extended {
|
||||||
|
if strings.HasPrefix(k, S3TAG_PREFIX) {
|
||||||
|
delete(resp.Entry.Extended, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.Entry.Extended == nil {
|
||||||
|
resp.Entry.Extended = make(map[string][]byte)
|
||||||
|
}
|
||||||
|
for k, v := range tags {
|
||||||
|
resp.Entry.Extended[S3TAG_PREFIX+k] = []byte(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return filer_pb.UpdateEntry(client, &filer_pb.UpdateEntryRequest{
|
||||||
|
Directory: parentDirectoryPath,
|
||||||
|
Entry: resp.Entry,
|
||||||
|
IsFromOtherCluster: false,
|
||||||
|
Signatures: nil,
|
||||||
|
})
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3a *S3ApiServer) rmTags(parentDirectoryPath string, entryName string) (err error) {
|
||||||
|
|
||||||
|
return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
|
resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
|
||||||
|
Directory: parentDirectoryPath,
|
||||||
|
Name: entryName,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
hasDeletion := false
|
||||||
|
for k, _ := range resp.Entry.Extended {
|
||||||
|
if strings.HasPrefix(k, S3TAG_PREFIX) {
|
||||||
|
delete(resp.Entry.Extended, k)
|
||||||
|
hasDeletion = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hasDeletion {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return filer_pb.UpdateEntry(client, &filer_pb.UpdateEntryRequest{
|
||||||
|
Directory: parentDirectoryPath,
|
||||||
|
Entry: resp.Entry,
|
||||||
|
IsFromOtherCluster: false,
|
||||||
|
Signatures: nil,
|
||||||
|
})
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
117
weed/s3api/s3api_object_tagging_handlers.go
Normal file
117
weed/s3api/s3api_object_tagging_handlers.go
Normal file
|
@ -0,0 +1,117 @@
|
||||||
|
package s3api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/xml"
|
||||||
|
"fmt"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetObjectTaggingHandler - GET object tagging
|
||||||
|
// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectTagging.html
|
||||||
|
func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
|
bucket, object := getBucketAndObject(r)
|
||||||
|
|
||||||
|
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
||||||
|
dir, name := target.DirAndName()
|
||||||
|
|
||||||
|
tags, err := s3a.getTags(dir, name)
|
||||||
|
if err != nil {
|
||||||
|
if err == filer_pb.ErrNotFound {
|
||||||
|
glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
|
||||||
|
writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
|
||||||
|
} else {
|
||||||
|
glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
|
||||||
|
writeErrorResponse(w, s3err.ErrInternalError, r.URL)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
writeSuccessResponseXML(w, encodeResponse(FromTags(tags)))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutObjectTaggingHandler Put object tagging
|
||||||
|
// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html
|
||||||
|
func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
|
bucket, object := getBucketAndObject(r)
|
||||||
|
|
||||||
|
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
||||||
|
dir, name := target.DirAndName()
|
||||||
|
|
||||||
|
tagging := &Tagging{}
|
||||||
|
input, err := ioutil.ReadAll(io.LimitReader(r.Body, r.ContentLength))
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("PutObjectTaggingHandler read input %s: %v", r.URL, err)
|
||||||
|
writeErrorResponse(w, s3err.ErrInternalError, r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = xml.Unmarshal(input, tagging); err != nil {
|
||||||
|
glog.Errorf("PutObjectTaggingHandler Unmarshal %s: %v", r.URL, err)
|
||||||
|
writeErrorResponse(w, s3err.ErrMalformedXML, r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tags := tagging.ToTags()
|
||||||
|
if len(tags) > 10 {
|
||||||
|
glog.Errorf("PutObjectTaggingHandler tags %s: %d tags more than 10", r.URL, len(tags))
|
||||||
|
writeErrorResponse(w, s3err.ErrInvalidTag, r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for k, v := range tags {
|
||||||
|
if len(k) > 128 {
|
||||||
|
glog.Errorf("PutObjectTaggingHandler tags %s: tag key %s longer than 128", r.URL, k)
|
||||||
|
writeErrorResponse(w, s3err.ErrInvalidTag, r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(v) > 256 {
|
||||||
|
glog.Errorf("PutObjectTaggingHandler tags %s: tag value %s longer than 256", r.URL, v)
|
||||||
|
writeErrorResponse(w, s3err.ErrInvalidTag, r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = s3a.setTags(dir, name, tagging.ToTags()); err != nil {
|
||||||
|
if err == filer_pb.ErrNotFound {
|
||||||
|
glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
|
||||||
|
writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
|
||||||
|
} else {
|
||||||
|
glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
|
||||||
|
writeErrorResponse(w, s3err.ErrInternalError, r.URL)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteObjectTaggingHandler Delete object tagging
|
||||||
|
// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjectTagging.html
|
||||||
|
func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
|
bucket, object := getBucketAndObject(r)
|
||||||
|
|
||||||
|
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
||||||
|
dir, name := target.DirAndName()
|
||||||
|
|
||||||
|
err := s3a.rmTags(dir, name)
|
||||||
|
if err != nil {
|
||||||
|
if err == filer_pb.ErrNotFound {
|
||||||
|
glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
|
||||||
|
writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
|
||||||
|
} else {
|
||||||
|
glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
|
||||||
|
writeErrorResponse(w, s3err.ErrInternalError, r.URL)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
}
|
|
@ -68,6 +68,13 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
|
||||||
// ListMultipartUploads
|
// ListMultipartUploads
|
||||||
bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListMultipartUploadsHandler, ACTION_WRITE), "GET")).Queries("uploads", "")
|
bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListMultipartUploadsHandler, ACTION_WRITE), "GET")).Queries("uploads", "")
|
||||||
|
|
||||||
|
// GetObjectTagging
|
||||||
|
bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectTaggingHandler, ACTION_WRITE), "GET")).Queries("tagging", "")
|
||||||
|
// PutObjectTagging
|
||||||
|
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectTaggingHandler, ACTION_WRITE), "PUT")).Queries("tagging", "")
|
||||||
|
// DeleteObjectTagging
|
||||||
|
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectTaggingHandler, ACTION_WRITE), "DELETE")).Queries("tagging", "")
|
||||||
|
|
||||||
// CopyObject
|
// CopyObject
|
||||||
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectHandler, ACTION_WRITE), "COPY"))
|
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectHandler, ACTION_WRITE), "COPY"))
|
||||||
// PutObject
|
// PutObject
|
||||||
|
|
|
@ -61,6 +61,7 @@ const (
|
||||||
ErrInternalError
|
ErrInternalError
|
||||||
ErrInvalidCopyDest
|
ErrInvalidCopyDest
|
||||||
ErrInvalidCopySource
|
ErrInvalidCopySource
|
||||||
|
ErrInvalidTag
|
||||||
ErrAuthHeaderEmpty
|
ErrAuthHeaderEmpty
|
||||||
ErrSignatureVersionNotSupported
|
ErrSignatureVersionNotSupported
|
||||||
ErrMalformedPOSTRequest
|
ErrMalformedPOSTRequest
|
||||||
|
@ -188,6 +189,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
|
||||||
Description: "Copy Source must mention the source bucket and key: sourcebucket/sourcekey.",
|
Description: "Copy Source must mention the source bucket and key: sourcebucket/sourcekey.",
|
||||||
HTTPStatusCode: http.StatusBadRequest,
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
},
|
},
|
||||||
|
ErrInvalidTag: {
|
||||||
|
Code: "InvalidArgument",
|
||||||
|
Description: "The Tag value you have provided is invalid",
|
||||||
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
|
},
|
||||||
ErrMalformedXML: {
|
ErrMalformedXML: {
|
||||||
Code: "MalformedXML",
|
Code: "MalformedXML",
|
||||||
Description: "The XML you provided was not well-formed or did not validate against our published schema.",
|
Description: "The XML you provided was not well-formed or did not validate against our published schema.",
|
||||||
|
|
38
weed/s3api/tags.go
Normal file
38
weed/s3api/tags.go
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
package s3api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/xml"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Tag struct {
|
||||||
|
Key string `xml:"Key"`
|
||||||
|
Value string `xml:"Value"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type TagSet struct {
|
||||||
|
Tag []Tag `xml:"Tag"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Tagging struct {
|
||||||
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Tagging"`
|
||||||
|
TagSet TagSet `xml:"TagSet"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tagging) ToTags() map[string]string {
|
||||||
|
output := make(map[string]string)
|
||||||
|
for _, tag := range t.TagSet.Tag {
|
||||||
|
output[tag.Key] = tag.Value
|
||||||
|
}
|
||||||
|
return output
|
||||||
|
}
|
||||||
|
|
||||||
|
func FromTags(tags map[string]string) (t *Tagging) {
|
||||||
|
t = &Tagging{}
|
||||||
|
for k, v := range tags {
|
||||||
|
t.TagSet.Tag = append(t.TagSet.Tag, Tag{
|
||||||
|
Key: k,
|
||||||
|
Value: v,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
50
weed/s3api/tags_test.go
Normal file
50
weed/s3api/tags_test.go
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
package s3api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/xml"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestXMLUnmarshall(t *testing.T) {
|
||||||
|
|
||||||
|
input := `<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
|
||||||
|
<TagSet>
|
||||||
|
<Tag>
|
||||||
|
<Key>key1</Key>
|
||||||
|
<Value>value1</Value>
|
||||||
|
</Tag>
|
||||||
|
</TagSet>
|
||||||
|
</Tagging>
|
||||||
|
`
|
||||||
|
|
||||||
|
tags := &Tagging{}
|
||||||
|
|
||||||
|
xml.Unmarshal([]byte(input), tags)
|
||||||
|
|
||||||
|
assert.Equal(t, len(tags.TagSet.Tag), 1)
|
||||||
|
assert.Equal(t, tags.TagSet.Tag[0].Key, "key1")
|
||||||
|
assert.Equal(t, tags.TagSet.Tag[0].Value, "value1")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestXMLMarshall(t *testing.T) {
|
||||||
|
tags := &Tagging{
|
||||||
|
TagSet: TagSet{
|
||||||
|
[]Tag{
|
||||||
|
{
|
||||||
|
Key: "key1",
|
||||||
|
Value: "value1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
actual := string(encodeResponse(tags))
|
||||||
|
|
||||||
|
expected := `<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><TagSet><Tag><Key>key1</Key><Value>value1</Value></Tag></TagSet></Tagging>`
|
||||||
|
assert.Equal(t, expected, actual)
|
||||||
|
|
||||||
|
}
|
|
@ -28,7 +28,31 @@ type RaftServer struct {
|
||||||
*raft.GrpcServer
|
*raft.GrpcServer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
|
type StateMachine struct {
|
||||||
|
raft.StateMachine
|
||||||
|
topo *topology.Topology
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s StateMachine) Save() ([]byte, error) {
|
||||||
|
state := topology.MaxVolumeIdCommand{
|
||||||
|
MaxVolumeId: s.topo.GetMaxVolumeId(),
|
||||||
|
}
|
||||||
|
glog.V(1).Infof("Save raft state %+v", state)
|
||||||
|
return json.Marshal(state)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s StateMachine) Recovery(data []byte) error {
|
||||||
|
state := topology.MaxVolumeIdCommand{}
|
||||||
|
err := json.Unmarshal(data, &state)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.V(1).Infof("Recovery raft state %+v", state)
|
||||||
|
s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int, raftResumeState bool) (*RaftServer, error) {
|
||||||
s := &RaftServer{
|
s := &RaftServer{
|
||||||
peers: peers,
|
peers: peers,
|
||||||
serverAddr: serverAddr,
|
serverAddr: serverAddr,
|
||||||
|
@ -46,26 +70,41 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
|
||||||
transporter := raft.NewGrpcTransporter(grpcDialOption)
|
transporter := raft.NewGrpcTransporter(grpcDialOption)
|
||||||
glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
|
glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
|
||||||
|
|
||||||
// always clear previous metadata
|
if !raftResumeState {
|
||||||
os.RemoveAll(path.Join(s.dataDir, "conf"))
|
// always clear previous metadata
|
||||||
os.RemoveAll(path.Join(s.dataDir, "log"))
|
os.RemoveAll(path.Join(s.dataDir, "conf"))
|
||||||
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
|
os.RemoveAll(path.Join(s.dataDir, "log"))
|
||||||
|
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
|
||||||
|
}
|
||||||
|
if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// Clear old cluster configurations if peers are changed
|
// Clear old cluster configurations if peers are changed
|
||||||
if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
|
if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
|
||||||
glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
|
glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "")
|
stateMachine := StateMachine{topo: topo}
|
||||||
|
s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
return nil
|
return nil, err
|
||||||
}
|
}
|
||||||
s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
|
s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
|
||||||
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
|
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
|
||||||
s.raftServer.Start()
|
if err := s.raftServer.LoadSnapshot(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := s.raftServer.Start(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
for _, peer := range s.peers {
|
for _, peer := range s.peers {
|
||||||
s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer))
|
if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
|
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
|
||||||
|
@ -81,13 +120,13 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
return nil
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
|
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
|
||||||
|
|
||||||
return s
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *RaftServer) Peers() (members []string) {
|
func (s *RaftServer) Peers() (members []string) {
|
||||||
|
|
|
@ -16,7 +16,7 @@ func CreateVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32
|
||||||
}
|
}
|
||||||
if preallocate != 0 {
|
if preallocate != 0 {
|
||||||
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
|
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
|
||||||
glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
|
glog.V(1).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
|
||||||
}
|
}
|
||||||
return NewDiskFile(file), nil
|
return NewDiskFile(file), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,7 +76,7 @@ func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byt
|
||||||
return data
|
return data
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if minSize <= c.onDiskCacheSizeLimit2 {
|
{
|
||||||
data = c.diskCaches[2].getChunk(fid.Key)
|
data = c.diskCaches[2].getChunk(fid.Key)
|
||||||
if len(data) >= int(minSize) {
|
if len(data) >= int(minSize) {
|
||||||
return data
|
return data
|
||||||
|
@ -115,7 +115,7 @@ func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {
|
||||||
c.diskCaches[0].setChunk(fid.Key, data)
|
c.diskCaches[0].setChunk(fid.Key, data)
|
||||||
} else if len(data) <= int(c.onDiskCacheSizeLimit1) {
|
} else if len(data) <= int(c.onDiskCacheSizeLimit1) {
|
||||||
c.diskCaches[1].setChunk(fid.Key, data)
|
c.diskCaches[1].setChunk(fid.Key, data)
|
||||||
} else if len(data) <= int(c.onDiskCacheSizeLimit2) {
|
} else {
|
||||||
c.diskCaches[2].setChunk(fid.Key, data)
|
c.diskCaches[2].setChunk(fid.Key, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue