Merge branch 'track-mount-e2e' of https://github.com/seaweedfs/seaweedfs into track-mount-e2e

This commit is contained in:
chrislu 2023-09-22 21:54:08 -07:00
commit 900abd44c1
23 changed files with 275 additions and 52 deletions

View file

@ -53,7 +53,7 @@ jobs:
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
build_flags: -tags 5BytesOffset # optional, default is build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed-large-disk binary_name: weed-large-disk
@ -68,7 +68,7 @@ jobs:
release_tag: dev release_tag: dev
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed-normal-disk binary_name: weed-normal-disk
@ -102,7 +102,7 @@ jobs:
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
build_flags: -tags 5BytesOffset # optional, default is build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed-large-disk binary_name: weed-large-disk
@ -117,7 +117,7 @@ jobs:
release_tag: dev release_tag: dev
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed-normal-disk binary_name: weed-normal-disk

View file

@ -38,7 +38,7 @@ jobs:
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
# build_flags: -tags 5BytesOffset # optional, default is # build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed binary_name: weed
@ -52,7 +52,7 @@ jobs:
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
build_flags: -tags 5BytesOffset # optional, default is build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed binary_name: weed

View file

@ -38,7 +38,7 @@ jobs:
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
# build_flags: -tags 5BytesOffset # optional, default is # build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed binary_name: weed
@ -52,7 +52,7 @@ jobs:
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
build_flags: -tags 5BytesOffset # optional, default is build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed binary_name: weed

View file

@ -38,7 +38,7 @@ jobs:
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
# build_flags: -tags 5BytesOffset # optional, default is # build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed binary_name: weed
@ -52,7 +52,7 @@ jobs:
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
build_flags: -tags 5BytesOffset # optional, default is build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed binary_name: weed

View file

@ -38,7 +38,7 @@ jobs:
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
# build_flags: -tags 5BytesOffset # optional, default is # build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed binary_name: weed
@ -52,7 +52,7 @@ jobs:
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
build_flags: -tags 5BytesOffset # optional, default is build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed binary_name: weed

View file

@ -39,7 +39,7 @@ jobs:
build_flags: -tags elastic,ydb,gocdk,tikv build_flags: -tags elastic,ydb,gocdk,tikv
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
# build_flags: -tags 5BytesOffset # optional, default is # build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed binary_name: weed
@ -53,7 +53,7 @@ jobs:
overwrite: true overwrite: true
pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0
build_flags: -tags 5BytesOffset,elastic,ydb,gocdk,tikv build_flags: -tags 5BytesOffset,elastic,ydb,gocdk,tikv
ldflags: -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .` # Where to run `go build .`
project_path: weed project_path: weed
binary_name: weed binary_name: weed

View file

@ -21,7 +21,7 @@ jobs:
e2e: e2e:
name: FUSE Mount name: FUSE Mount
runs-on: ubuntu-22.04 runs-on: ubuntu-22.04
timeout-minutes: 15 timeout-minutes: 30
steps: steps:
- name: Set up Go 1.x - name: Set up Go 1.x
uses: actions/setup-go@93397bea11091df50f3d7e59dc26a7711a8bcfbe # v2 uses: actions/setup-go@93397bea11091df50f3d7e59dc26a7711a8bcfbe # v2
@ -47,12 +47,12 @@ jobs:
echo "Starting FIO at: $(date)" echo "Starting FIO at: $(date)"
# Concurrent r/w # Concurrent r/w
echo 'Run randrw with size=16M bs=4k' echo 'Run randrw with size=16M bs=4k'
docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 60 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randrw --bs=4k --direct=1 --numjobs=8 --iodepth=32 --group_reporting --runtime=30 --time_based=1 docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 600 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randrw --bs=4k --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1
echo "Verify FIO at: $(date)" echo "Verify FIO at: $(date)"
# Verified write # Verified write
echo 'Run randwrite with size=16M bs=4k' echo 'Run randwrite with size=16M bs=4k'
docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 60 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randwrite --bs=4k --direct=1 --numjobs=8 --iodepth=32 --group_reporting --runtime=30 --time_based=1 --do_verify=0 --verify=crc32c --verify_backlog=1 docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 600 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randwrite --bs=4k --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1 --do_verify=0 --verify=crc32c --verify_backlog=1
- name: Run FIO bs 128k - name: Run FIO bs 128k
timeout-minutes: 15 timeout-minutes: 15

View file

@ -8,7 +8,7 @@ cgo ?= 0
binary: binary:
export SWCOMMIT=$(shell git rev-parse --short HEAD) export SWCOMMIT=$(shell git rev-parse --short HEAD)
export SWLDFLAGS="-X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=$(SWCOMMIT)" export SWLDFLAGS="-X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=$(SWCOMMIT)"
cd ../weed && CGO_ENABLED=$(cgo) GOOS=linux go build $(options) -tags "$(tags)" -ldflags "-extldflags -static $(SWLDFLAGS)" && mv weed ../docker/ cd ../weed && CGO_ENABLED=$(cgo) GOOS=linux go build $(options) -tags "$(tags)" -ldflags "-s -w -extldflags -static $(SWLDFLAGS)" && mv weed ../docker/
binary_race: options = -race binary_race: options = -race
binary_race: cgo = 1 binary_race: cgo = 1

View file

@ -7,7 +7,7 @@ all: install
.PHONY : clean debug_mount .PHONY : clean debug_mount
install: install:
go install go install -ldflags="-s -w"
clean: clean:
go clean $(SOURCE_DIR) go clean $(SOURCE_DIR)

View file

@ -21,14 +21,16 @@ import (
) )
const ( const (
charsetUpper = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" charsetUpper = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
charset = charsetUpper + "abcdefghijklmnopqrstuvwxyz/" charset = charsetUpper + "abcdefghijklmnopqrstuvwxyz/"
policyDocumentVersion = "2012-10-17" policyDocumentVersion = "2012-10-17"
StatementActionAdmin = "*" StatementActionAdmin = "*"
StatementActionWrite = "Put*" StatementActionWrite = "Put*"
StatementActionRead = "Get*" StatementActionWriteAcp = "PutBucketAcl"
StatementActionList = "List*" StatementActionRead = "Get*"
StatementActionTagging = "Tagging*" StatementActionReadAcp = "GetBucketAcl"
StatementActionList = "List*"
StatementActionTagging = "Tagging*"
) )
var ( var (
@ -44,8 +46,12 @@ func MapToStatementAction(action string) string {
return s3_constants.ACTION_ADMIN return s3_constants.ACTION_ADMIN
case StatementActionWrite: case StatementActionWrite:
return s3_constants.ACTION_WRITE return s3_constants.ACTION_WRITE
case StatementActionWriteAcp:
return s3_constants.ACTION_WRITE_ACP
case StatementActionRead: case StatementActionRead:
return s3_constants.ACTION_READ return s3_constants.ACTION_READ
case StatementActionReadAcp:
return s3_constants.ACTION_READ_ACP
case StatementActionList: case StatementActionList:
return s3_constants.ACTION_LIST return s3_constants.ACTION_LIST
case StatementActionTagging: case StatementActionTagging:
@ -61,8 +67,12 @@ func MapToIdentitiesAction(action string) string {
return StatementActionAdmin return StatementActionAdmin
case s3_constants.ACTION_WRITE: case s3_constants.ACTION_WRITE:
return StatementActionWrite return StatementActionWrite
case s3_constants.ACTION_WRITE_ACP:
return StatementActionWriteAcp
case s3_constants.ACTION_READ: case s3_constants.ACTION_READ:
return StatementActionRead return StatementActionRead
case s3_constants.ACTION_READ_ACP:
return StatementActionReadAcp
case s3_constants.ACTION_LIST: case s3_constants.ACTION_LIST:
return StatementActionList return StatementActionList
case s3_constants.ACTION_TAGGING: case s3_constants.ACTION_TAGGING:

View file

@ -27,7 +27,6 @@ type FileHandle struct {
dirtyPages *PageWriter dirtyPages *PageWriter
reader *filer.ChunkReadAt reader *filer.ChunkReadAt
contentType string contentType string
sync.RWMutex
isDeleted bool isDeleted bool
@ -102,8 +101,9 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
} }
func (fh *FileHandle) ReleaseHandle() { func (fh *FileHandle) ReleaseHandle() {
fh.Lock()
defer fh.Unlock() fhActiveLock := fh.wfs.fhLockTable.AcquireLock("ReleaseHandle", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
fh.entryLock.Lock() fh.entryLock.Lock()
defer fh.entryLock.Unlock() defer fh.entryLock.Unlock()

View file

@ -78,6 +78,7 @@ type WFS struct {
dhmap *DirectoryHandleToInode dhmap *DirectoryHandleToInode
fuseServer *fuse.Server fuseServer *fuse.Server
IsOverQuota bool IsOverQuota bool
fhLockTable *util.LockTable[FileHandleId]
} }
func NewSeaweedFileSystem(option *Option) *WFS { func NewSeaweedFileSystem(option *Option) *WFS {
@ -88,6 +89,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)), inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
fhmap: NewFileHandleToInode(), fhmap: NewFileHandleToInode(),
dhmap: NewDirectoryHandleToInode(), dhmap: NewDirectoryHandleToInode(),
fhLockTable: util.NewLockTable[FileHandleId](),
} }
wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses))) wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))

View file

@ -1,6 +1,7 @@
package mount package mount
import ( import (
"github.com/seaweedfs/seaweedfs/weed/util"
"net/http" "net/http"
"time" "time"
@ -44,16 +45,16 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
} }
// lock source and target file handles // lock source and target file handles
fhOut.Lock() fhOutActiveLock := fhOut.wfs.fhLockTable.AcquireLock("CopyFileRange", fhOut.fh, util.ExclusiveLock)
defer fhOut.Unlock() defer fhOut.wfs.fhLockTable.ReleaseLock(fhOut.fh, fhOutActiveLock)
if fhOut.entry == nil { if fhOut.entry == nil {
return 0, fuse.ENOENT return 0, fuse.ENOENT
} }
if fhIn.fh != fhOut.fh { if fhIn.fh != fhOut.fh {
fhIn.RLock() fhInActiveLock := fhIn.wfs.fhLockTable.AcquireLock("CopyFileRange", fhIn.fh, util.ExclusiveLock)
defer fhIn.RUnlock() defer fhIn.wfs.fhLockTable.ReleaseLock(fhIn.fh, fhInActiveLock)
} }
// directories are not supported // directories are not supported

View file

@ -1,6 +1,7 @@
package mount package mount
import ( import (
"github.com/seaweedfs/seaweedfs/weed/util"
"syscall" "syscall"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
@ -35,8 +36,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
} }
// lock the file until the proper offset was calculated // lock the file until the proper offset was calculated
fh.RLock() fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Lseek", fh.fh, util.SharedLock)
defer fh.RUnlock() defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
fh.entryLock.RLock() fh.entryLock.RLock()
defer fh.entryLock.RUnlock() defer fh.entryLock.RUnlock()

View file

@ -3,6 +3,7 @@ package mount
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/util"
"io" "io"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
@ -41,8 +42,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.ENOENT return nil, fuse.ENOENT
} }
fh.RLock() fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Read", fh.fh, util.SharedLock)
defer fh.RUnlock() defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
offset := int64(in.Offset) offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset) totalRead, err := readDataByFileHandle(buff, fh, offset)

View file

@ -7,6 +7,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"syscall" "syscall"
"time" "time"
) )
@ -89,8 +90,6 @@ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Statu
} }
func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
fh.Lock()
defer fh.Unlock()
// flush works at fh level // flush works at fh level
fileFullPath := fh.FullPath() fileFullPath := fh.FullPath()
@ -105,6 +104,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
} }
} }
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("doFlush", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
if !fh.dirtyMetadata { if !fh.dirtyMetadata {
return fuse.OK return fuse.OK
} }

View file

@ -2,6 +2,7 @@ package mount
import ( import (
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/util"
"net/http" "net/http"
"syscall" "syscall"
"time" "time"
@ -48,8 +49,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
tsNs := time.Now().UnixNano() tsNs := time.Now().UnixNano()
fh.Lock() fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Write", fh.fh, util.ExclusiveLock)
defer fh.Unlock() defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
entry := fh.GetEntry() entry := fh.GetEntry()
if entry == nil { if entry == nil {

View file

@ -89,10 +89,13 @@ func TestCanDo(t *testing.T) {
Actions: []Action{ Actions: []Action{
"Read:bucket1", "Read:bucket1",
"Write:bucket1/*", "Write:bucket1/*",
"WriteAcp:bucket1",
}, },
} }
assert.Equal(t, true, ident2.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt")) assert.Equal(t, true, ident2.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt"))
assert.Equal(t, true, ident2.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt")) assert.Equal(t, true, ident2.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt"))
assert.Equal(t, true, ident2.canDo(ACTION_WRITE_ACP, "bucket1", ""))
assert.Equal(t, false, ident2.canDo(ACTION_READ_ACP, "bucket1", ""))
assert.Equal(t, false, ident2.canDo(ACTION_LIST, "bucket1", "/a/b/c/d.txt")) assert.Equal(t, false, ident2.canDo(ACTION_LIST, "bucket1", "/a/b/c/d.txt"))
// across buckets // across buckets
@ -106,15 +109,18 @@ func TestCanDo(t *testing.T) {
assert.Equal(t, true, ident3.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt")) assert.Equal(t, true, ident3.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt"))
assert.Equal(t, true, ident3.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt")) assert.Equal(t, true, ident3.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt"))
assert.Equal(t, false, ident3.canDo(ACTION_LIST, "bucket1", "/a/b/other/some")) assert.Equal(t, false, ident3.canDo(ACTION_LIST, "bucket1", "/a/b/other/some"))
assert.Equal(t, false, ident3.canDo(ACTION_WRITE_ACP, "bucket1", ""))
// partial buckets // partial buckets
ident4 := &Identity{ ident4 := &Identity{
Name: "anything", Name: "anything",
Actions: []Action{ Actions: []Action{
"Read:special_*", "Read:special_*",
"ReadAcp:special_*",
}, },
} }
assert.Equal(t, true, ident4.canDo(ACTION_READ, "special_bucket", "/a/b/c/d.txt")) assert.Equal(t, true, ident4.canDo(ACTION_READ, "special_bucket", "/a/b/c/d.txt"))
assert.Equal(t, true, ident4.canDo(ACTION_READ_ACP, "special_bucket", ""))
assert.Equal(t, false, ident4.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt")) assert.Equal(t, false, ident4.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt"))
// admin buckets // admin buckets
@ -125,7 +131,9 @@ func TestCanDo(t *testing.T) {
}, },
} }
assert.Equal(t, true, ident5.canDo(ACTION_READ, "special_bucket", "/a/b/c/d.txt")) assert.Equal(t, true, ident5.canDo(ACTION_READ, "special_bucket", "/a/b/c/d.txt"))
assert.Equal(t, true, ident5.canDo(ACTION_READ_ACP, "special_bucket", ""))
assert.Equal(t, true, ident5.canDo(ACTION_WRITE, "special_bucket", "/a/b/c/d.txt")) assert.Equal(t, true, ident5.canDo(ACTION_WRITE, "special_bucket", "/a/b/c/d.txt"))
assert.Equal(t, true, ident5.canDo(ACTION_WRITE_ACP, "special_bucket", ""))
// anonymous buckets // anonymous buckets
ident6 := &Identity{ ident6 := &Identity{

View file

@ -1,11 +1,13 @@
package s3_constants package s3_constants
const ( const (
ACTION_READ = "Read" ACTION_READ = "Read"
ACTION_WRITE = "Write" ACTION_READ_ACP = "ReadAcp"
ACTION_ADMIN = "Admin" ACTION_WRITE = "Write"
ACTION_TAGGING = "Tagging" ACTION_WRITE_ACP = "WriteAcp"
ACTION_LIST = "List" ACTION_ADMIN = "Admin"
ACTION_TAGGING = "Tagging"
ACTION_LIST = "List"
SeaweedStorageDestinationHeader = "x-seaweedfs-destination" SeaweedStorageDestinationHeader = "x-seaweedfs-destination"
MultipartUploadsFolder = ".uploads" MultipartUploadsFolder = ".uploads"

View file

@ -7,7 +7,7 @@ import (
var ( var (
CircuitBreakerConfigDir = "/etc/s3" CircuitBreakerConfigDir = "/etc/s3"
CircuitBreakerConfigFile = "circuit_breaker.json" CircuitBreakerConfigFile = "circuit_breaker.json"
AllowedActions = []string{ACTION_READ, ACTION_WRITE, ACTION_LIST, ACTION_TAGGING, ACTION_ADMIN} AllowedActions = []string{ACTION_READ, ACTION_READ_ACP, ACTION_WRITE, ACTION_WRITE_ACP, ACTION_LIST, ACTION_TAGGING, ACTION_ADMIN}
LimitTypeCount = "Count" LimitTypeCount = "Count"
LimitTypeBytes = "MB" LimitTypeBytes = "MB"
Separator = ":" Separator = ":"

View file

@ -147,7 +147,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING)), "DELETE")).Queries("tagging", "") bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING)), "DELETE")).Queries("tagging", "")
// PutObjectACL // PutObjectACL
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "") bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectAclHandler, ACTION_WRITE_ACP)), "PUT")).Queries("acl", "")
// PutObjectRetention // PutObjectRetention
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectRetentionHandler, ACTION_WRITE)), "PUT")).Queries("retention", "") bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectRetentionHandler, ACTION_WRITE)), "PUT")).Queries("retention", "")
// PutObjectLegalHold // PutObjectLegalHold
@ -156,7 +156,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("object-lock", "") bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("object-lock", "")
// GetObjectACL // GetObjectACL
bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectAclHandler, ACTION_READ)), "GET")).Queries("acl", "") bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectAclHandler, ACTION_READ_ACP)), "GET")).Queries("acl", "")
// objects with query // objects with query
@ -183,9 +183,9 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE)), "DELETE")).Queries("delete", "") bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE)), "DELETE")).Queries("delete", "")
// GetBucketACL // GetBucketACL
bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketAclHandler, ACTION_READ)), "GET")).Queries("acl", "") bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketAclHandler, ACTION_READ_ACP)), "GET")).Queries("acl", "")
// PutBucketACL // PutBucketACL
bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "") bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketAclHandler, ACTION_WRITE_ACP)), "PUT")).Queries("acl", "")
// GetBucketPolicy // GetBucketPolicy
bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketPolicyHandler, ACTION_READ)), "GET")).Queries("policy", "") bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketPolicyHandler, ACTION_READ)), "GET")).Queries("policy", "")

153
weed/util/lock_table.go Normal file
View file

@ -0,0 +1,153 @@
package util
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"sync"
"sync/atomic"
)
// LockTable is a table of locks that can be acquired.
// Locks are acquired in order of request.
type LockTable[T comparable] struct {
mu sync.Mutex
locks map[T]*LockEntry
lockIdSeq int64
}
type LockEntry struct {
mu sync.Mutex
waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks
activeLockOwnerCount int32
lockType LockType
cond *sync.Cond
}
type LockType int
const (
SharedLock LockType = iota
ExclusiveLock
)
type ActiveLock struct {
ID int64
isDeleted bool
intention string // for debugging
}
func NewLockTable[T comparable]() *LockTable[T] {
return &LockTable[T]{
locks: make(map[T]*LockEntry),
}
}
func (lt *LockTable[T]) NewActiveLock(intention string) *ActiveLock {
id := atomic.AddInt64(&lt.lockIdSeq, 1)
l := &ActiveLock{ID: id, intention: intention}
return l
}
func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) {
lt.mu.Lock()
// Get or create the lock entry for the key
entry, exists := lt.locks[key]
if !exists {
entry = &LockEntry{}
entry.cond = sync.NewCond(&entry.mu)
lt.locks[key] = entry
}
lt.mu.Unlock()
lock = lt.NewActiveLock(intention)
// If the lock is held exclusively, wait
entry.mu.Lock()
if len(entry.waiters) > 0 || lockType == ExclusiveLock {
if glog.V(4) {
fmt.Printf("ActiveLock %d %s wait for %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
if len(entry.waiters) > 0 {
for _, waiter := range entry.waiters {
fmt.Printf(" %d", waiter.ID)
}
fmt.Printf("\n")
}
}
entry.waiters = append(entry.waiters, lock)
if lockType == ExclusiveLock {
for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeLockOwnerCount > 0) {
entry.cond.Wait()
}
} else {
for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) {
entry.cond.Wait()
}
}
// Remove the transaction from the waiters list
if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID {
entry.waiters = entry.waiters[1:]
entry.cond.Broadcast()
}
}
entry.activeLockOwnerCount++
// Otherwise, grant the lock
entry.lockType = lockType
if glog.V(4) {
fmt.Printf("ActiveLock %d %s locked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
if len(entry.waiters) > 0 {
for _, waiter := range entry.waiters {
fmt.Printf(" %d", waiter.ID)
}
fmt.Printf("\n")
}
}
entry.mu.Unlock()
return lock
}
func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) {
lt.mu.Lock()
defer lt.mu.Unlock()
entry, exists := lt.locks[key]
if !exists {
return
}
entry.mu.Lock()
defer entry.mu.Unlock()
// Remove the transaction from the waiters list
for i, waiter := range entry.waiters {
if waiter == lock {
waiter.isDeleted = true
entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...)
break
}
}
// If there are no waiters, release the lock
if len(entry.waiters) == 0 {
delete(lt.locks, key)
}
if glog.V(4) {
fmt.Printf("ActiveLock %d %s unlocked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeLockOwnerCount)
if len(entry.waiters) > 0 {
for _, waiter := range entry.waiters {
fmt.Printf(" %d", waiter.ID)
}
fmt.Printf("\n")
}
}
entry.activeLockOwnerCount--
// Notify the next waiter
entry.cond.Broadcast()
}
func main() {
}

View file

@ -0,0 +1,42 @@
package util
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"
)
func TestOrderedLock(t *testing.T) {
lt := NewLockTable[string]()
var wg sync.WaitGroup
// Simulate transactions requesting locks
for i := 1; i <= 50; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := "resource"
lockType := SharedLock
if i%5 == 0 {
lockType = ExclusiveLock
}
// Simulate attempting to acquire the lock
lock := lt.AcquireLock("", key, lockType)
// Lock acquired, perform some work
fmt.Printf("ActiveLock %d acquired lock %v\n", lock.ID, lockType)
// Simulate some work
time.Sleep(time.Duration(rand.Int31n(10)*10) * time.Millisecond)
// Release the lock
lt.ReleaseLock(key, lock)
fmt.Printf("ActiveLock %d released lock %v\n", lock.ID, lockType)
}(i)
}
wg.Wait()
}