mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
filer: DeleteFolderChildren for deleting large folders
This commit is contained in:
parent
0fa1269bc7
commit
bbb6ebc3c0
|
@ -59,15 +59,6 @@ const (
|
|||
# $HOME/.seaweedfs/filer.toml
|
||||
# /etc/seaweedfs/filer.toml
|
||||
|
||||
[memory]
|
||||
# local in memory, mostly for testing purpose
|
||||
enabled = false
|
||||
|
||||
[leveldb]
|
||||
# local on disk, mostly for simple single-machine setup, fairly scalable
|
||||
enabled = false
|
||||
dir = "." # directory to store level db files
|
||||
|
||||
[leveldb2]
|
||||
# local on disk, mostly for simple single-machine setup, fairly scalable
|
||||
# faster than previous leveldb, recommended.
|
||||
|
|
|
@ -15,6 +15,7 @@ type AbstractSqlStore struct {
|
|||
SqlUpdate string
|
||||
SqlFind string
|
||||
SqlDelete string
|
||||
SqlDeleteFolderChildren string
|
||||
SqlListExclusive string
|
||||
SqlListInclusive string
|
||||
}
|
||||
|
@ -132,6 +133,21 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.
|
|||
return nil
|
||||
}
|
||||
|
||||
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
|
||||
|
||||
res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, hashToLong(string(fullpath)), fullpath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
|
||||
}
|
||||
|
||||
_, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
sqlText := store.SqlListExclusive
|
||||
|
|
|
@ -112,6 +112,17 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.Fu
|
|||
return nil
|
||||
}
|
||||
|
||||
func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
|
||||
|
||||
if err := store.session.Query(
|
||||
"DELETE FROM filemeta WHERE directory=?",
|
||||
fullpath).Exec(); err != nil {
|
||||
return fmt.Errorf("delete %s : %v", fullpath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
|
||||
limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
|
|
|
@ -123,6 +123,16 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat
|
|||
return nil
|
||||
}
|
||||
|
||||
func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
|
||||
|
||||
if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {
|
||||
return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *EtcdStore) ListDirectoryEntries(
|
||||
ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int,
|
||||
) (entries []*filer2.Entry, err error) {
|
||||
|
|
|
@ -3,7 +3,6 @@ package filer2
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
@ -207,67 +206,6 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er
|
|||
return f.store.FindEntry(ctx, p)
|
||||
}
|
||||
|
||||
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
|
||||
entry, err := f.FindEntry(ctx, p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if entry.IsDirectory() {
|
||||
limit := int(1)
|
||||
if isRecursive {
|
||||
limit = math.MaxInt32
|
||||
}
|
||||
lastFileName := ""
|
||||
includeLastFile := false
|
||||
for limit > 0 {
|
||||
entries, err := f.ListDirectoryEntries(ctx, p, lastFileName, includeLastFile, PaginationSize)
|
||||
if err != nil {
|
||||
glog.Errorf("list folder %s: %v", p, err)
|
||||
return fmt.Errorf("list folder %s: %v", p, err)
|
||||
}
|
||||
|
||||
if len(entries) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
if isRecursive {
|
||||
for _, sub := range entries {
|
||||
lastFileName = sub.Name()
|
||||
err = f.DeleteEntryMetaAndData(ctx, sub.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
|
||||
if err != nil && !ignoreRecursiveError {
|
||||
return err
|
||||
}
|
||||
limit--
|
||||
if limit <= 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(entries) < PaginationSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
f.cacheDelDirectory(string(p))
|
||||
|
||||
}
|
||||
|
||||
if shouldDeleteChunks {
|
||||
f.DeleteChunks(p, entry.Chunks)
|
||||
}
|
||||
|
||||
if p == "/" {
|
||||
return nil
|
||||
}
|
||||
glog.V(3).Infof("deleting entry %v", p)
|
||||
|
||||
f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
|
||||
|
||||
return f.store.DeleteEntry(ctx, p)
|
||||
}
|
||||
|
||||
func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
|
||||
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
|
||||
p = p[0 : len(p)-1]
|
||||
|
|
|
@ -3,6 +3,8 @@ package filer2
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
|
@ -124,35 +126,42 @@ func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string)
|
|||
return
|
||||
}
|
||||
|
||||
func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) {
|
||||
func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
|
||||
|
||||
err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
lastEntryName := ""
|
||||
|
||||
for {
|
||||
|
||||
request := &filer_pb.ListEntriesRequest{
|
||||
Directory: fullDirPath,
|
||||
Prefix: prefix,
|
||||
StartFromFileName: lastEntryName,
|
||||
Limit: PaginationSize,
|
||||
Limit: math.MaxUint32,
|
||||
}
|
||||
|
||||
glog.V(3).Infof("read directory: %v", request)
|
||||
resp, err := client.ListEntries(ctx, request)
|
||||
stream, err := client.ListEntries(ctx, request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list %s: %v", fullDirPath, err)
|
||||
}
|
||||
|
||||
for _, entry := range resp.Entries {
|
||||
fn(entry)
|
||||
lastEntryName = entry.Name
|
||||
var prevEntry *filer_pb.Entry
|
||||
for {
|
||||
resp, recvErr := stream.Recv()
|
||||
if recvErr != nil {
|
||||
if recvErr == io.EOF {
|
||||
if prevEntry != nil {
|
||||
fn(prevEntry, true)
|
||||
}
|
||||
|
||||
if len(resp.Entries) < PaginationSize {
|
||||
break
|
||||
} else {
|
||||
return recvErr
|
||||
}
|
||||
|
||||
}
|
||||
if prevEntry != nil {
|
||||
fn(prevEntry, false)
|
||||
}
|
||||
prevEntry = resp.Entry
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
102
weed/filer2/filer_delete_entry.go
Normal file
102
weed/filer2/filer_delete_entry.go
Normal file
|
@ -0,0 +1,102 @@
|
|||
package filer2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
)
|
||||
|
||||
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
|
||||
if p == "/" {
|
||||
return nil
|
||||
}
|
||||
|
||||
entry, findErr := f.FindEntry(ctx, p)
|
||||
if findErr != nil {
|
||||
return findErr
|
||||
}
|
||||
|
||||
var chunks []*filer_pb.FileChunk
|
||||
chunks = append(chunks, entry.Chunks...)
|
||||
if entry.IsDirectory() {
|
||||
// delete the folder children, not including the folder itself
|
||||
var dirChunks []*filer_pb.FileChunk
|
||||
dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete directory %s: %v", p, err)
|
||||
}
|
||||
chunks = append(chunks, dirChunks...)
|
||||
f.cacheDelDirectory(string(p))
|
||||
}
|
||||
// delete the file or folder
|
||||
err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete file %s: %v", p, err)
|
||||
}
|
||||
|
||||
if shouldDeleteChunks {
|
||||
go f.DeleteChunks(chunks)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (chunks []*filer_pb.FileChunk, err error) {
|
||||
|
||||
lastFileName := ""
|
||||
includeLastFile := false
|
||||
for {
|
||||
entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize)
|
||||
if err != nil {
|
||||
glog.Errorf("list folder %s: %v", entry.FullPath, err)
|
||||
return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
|
||||
}
|
||||
if lastFileName == "" && !isRecursive && len(entries) > 0 {
|
||||
// only for first iteration in the loop
|
||||
return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
|
||||
}
|
||||
|
||||
for _, sub := range entries {
|
||||
lastFileName = sub.Name()
|
||||
var dirChunks []*filer_pb.FileChunk
|
||||
if sub.IsDirectory() {
|
||||
dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
|
||||
}
|
||||
if err != nil && !ignoreRecursiveError {
|
||||
return nil, err
|
||||
}
|
||||
if shouldDeleteChunks {
|
||||
chunks = append(chunks, dirChunks...)
|
||||
}
|
||||
}
|
||||
|
||||
if len(entries) < PaginationSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
f.cacheDelDirectory(string(entry.FullPath))
|
||||
|
||||
glog.V(3).Infof("deleting directory %v", entry.FullPath)
|
||||
|
||||
if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
|
||||
return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
|
||||
}
|
||||
f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
|
||||
|
||||
return chunks, nil
|
||||
}
|
||||
|
||||
func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) {
|
||||
|
||||
glog.V(3).Infof("deleting entry %v", entry.FullPath)
|
||||
|
||||
if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
|
||||
return fmt.Errorf("filer store delete: %v", storeDeletionErr)
|
||||
}
|
||||
f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
|
||||
|
||||
return nil
|
||||
}
|
|
@ -51,9 +51,8 @@ func (f *Filer) loopProcessingDeletion() {
|
|||
}
|
||||
}
|
||||
|
||||
func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) {
|
||||
func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
|
||||
for _, chunk := range chunks {
|
||||
glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String())
|
||||
f.fileIdDeletionChan <- chunk.GetFileIdString()
|
||||
}
|
||||
}
|
||||
|
@ -70,7 +69,7 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
|
|||
return
|
||||
}
|
||||
if newEntry == nil {
|
||||
f.DeleteChunks(oldEntry.FullPath, oldEntry.Chunks)
|
||||
f.DeleteChunks(oldEntry.Chunks)
|
||||
}
|
||||
|
||||
var toDelete []*filer_pb.FileChunk
|
||||
|
@ -84,5 +83,5 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
|
|||
toDelete = append(toDelete, oldChunk)
|
||||
}
|
||||
}
|
||||
f.DeleteChunks(oldEntry.FullPath, toDelete)
|
||||
f.DeleteChunks(toDelete)
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ type FilerStore interface {
|
|||
// err == filer2.ErrNotFound if not found
|
||||
FindEntry(context.Context, FullPath) (entry *Entry, err error)
|
||||
DeleteEntry(context.Context, FullPath) (err error)
|
||||
DeleteFolderChildren(context.Context, FullPath) (err error)
|
||||
ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
|
||||
|
||||
BeginTransaction(ctx context.Context) (context.Context, error)
|
||||
|
@ -97,6 +98,16 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err
|
|||
return fsw.actualStore.DeleteEntry(ctx, fp)
|
||||
}
|
||||
|
||||
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) {
|
||||
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc()
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
return fsw.actualStore.DeleteFolderChildren(ctx, fp)
|
||||
}
|
||||
|
||||
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
|
||||
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
|
||||
start := time.Now()
|
||||
|
|
|
@ -5,12 +5,13 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
weed_util "github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
weed_util "github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -123,6 +124,34 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.Full
|
|||
return nil
|
||||
}
|
||||
|
||||
func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
|
||||
batch := new(leveldb.Batch)
|
||||
|
||||
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
|
||||
iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
|
||||
for iter.Next() {
|
||||
key := iter.Key()
|
||||
if !bytes.HasPrefix(key, directoryPrefix) {
|
||||
break
|
||||
}
|
||||
fileName := getNameFromKey(key)
|
||||
if fileName == "" {
|
||||
continue
|
||||
}
|
||||
batch.Delete([]byte(genKey(string(fullpath), fileName)))
|
||||
}
|
||||
iter.Release()
|
||||
|
||||
err = store.db.Write(batch, nil)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete %s : %v", fullpath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
|
||||
limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
|
|
|
@ -8,12 +8,13 @@ import (
|
|||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
weed_util "github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
weed_util "github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -134,6 +135,34 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.Ful
|
|||
return nil
|
||||
}
|
||||
|
||||
func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
|
||||
|
||||
batch := new(leveldb.Batch)
|
||||
|
||||
iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
|
||||
for iter.Next() {
|
||||
key := iter.Key()
|
||||
if !bytes.HasPrefix(key, directoryPrefix) {
|
||||
break
|
||||
}
|
||||
fileName := getNameFromKey(key)
|
||||
if fileName == "" {
|
||||
continue
|
||||
}
|
||||
batch.Delete(append(directoryPrefix, []byte(fileName)...))
|
||||
}
|
||||
iter.Release()
|
||||
|
||||
err = store.dbs[partitionId].Write(batch, nil)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete %s : %v", fullpath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
|
||||
limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
|
|
|
@ -1,132 +0,0 @@
|
|||
package memdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/google/btree"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func init() {
|
||||
filer2.Stores = append(filer2.Stores, &MemDbStore{})
|
||||
}
|
||||
|
||||
type MemDbStore struct {
|
||||
tree *btree.BTree
|
||||
treeLock sync.Mutex
|
||||
}
|
||||
|
||||
type entryItem struct {
|
||||
*filer2.Entry
|
||||
}
|
||||
|
||||
func (a entryItem) Less(b btree.Item) bool {
|
||||
return strings.Compare(string(a.FullPath), string(b.(entryItem).FullPath)) < 0
|
||||
}
|
||||
|
||||
func (store *MemDbStore) GetName() string {
|
||||
return "memory"
|
||||
}
|
||||
|
||||
func (store *MemDbStore) Initialize(configuration util.Configuration) (err error) {
|
||||
store.tree = btree.New(8)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *MemDbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
|
||||
return ctx, nil
|
||||
}
|
||||
func (store *MemDbStore) CommitTransaction(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
func (store *MemDbStore) RollbackTransaction(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
|
||||
// println("inserting", entry.FullPath)
|
||||
store.treeLock.Lock()
|
||||
store.tree.ReplaceOrInsert(entryItem{entry})
|
||||
store.treeLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *MemDbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
|
||||
if _, err = store.FindEntry(ctx, entry.FullPath); err != nil {
|
||||
return fmt.Errorf("no such file %s : %v", entry.FullPath, err)
|
||||
}
|
||||
store.treeLock.Lock()
|
||||
store.tree.ReplaceOrInsert(entryItem{entry})
|
||||
store.treeLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *MemDbStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
|
||||
item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}})
|
||||
if item == nil {
|
||||
return nil, filer2.ErrNotFound
|
||||
}
|
||||
entry = item.(entryItem).Entry
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func (store *MemDbStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
store.treeLock.Lock()
|
||||
store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}})
|
||||
store.treeLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *MemDbStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
startFrom := string(fullpath)
|
||||
if startFileName != "" {
|
||||
startFrom = startFrom + "/" + startFileName
|
||||
}
|
||||
|
||||
store.tree.AscendGreaterOrEqual(entryItem{&filer2.Entry{FullPath: filer2.FullPath(startFrom)}},
|
||||
func(item btree.Item) bool {
|
||||
if limit <= 0 {
|
||||
return false
|
||||
}
|
||||
entry := item.(entryItem).Entry
|
||||
// println("checking", entry.FullPath)
|
||||
|
||||
if entry.FullPath == fullpath {
|
||||
// skipping the current directory
|
||||
// println("skipping the folder", entry.FullPath)
|
||||
return true
|
||||
}
|
||||
|
||||
dir, name := entry.FullPath.DirAndName()
|
||||
if name == startFileName {
|
||||
if inclusive {
|
||||
limit--
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// only iterate the same prefix
|
||||
if !strings.HasPrefix(string(entry.FullPath), string(fullpath)) {
|
||||
// println("breaking from", entry.FullPath)
|
||||
return false
|
||||
}
|
||||
|
||||
if dir != string(fullpath) {
|
||||
// this could be items in deeper directories
|
||||
// println("skipping deeper folder", entry.FullPath)
|
||||
return true
|
||||
}
|
||||
// now process the directory items
|
||||
// println("adding entry", entry.FullPath)
|
||||
limit--
|
||||
entries = append(entries, entry)
|
||||
return true
|
||||
},
|
||||
)
|
||||
return entries, nil
|
||||
}
|
|
@ -1,149 +0,0 @@
|
|||
package memdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCreateAndFind(t *testing.T) {
|
||||
filer := filer2.NewFiler(nil, nil)
|
||||
store := &MemDbStore{}
|
||||
store.Initialize(nil)
|
||||
filer.SetStore(store)
|
||||
filer.DisableDirectoryCache()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
|
||||
|
||||
entry1 := &filer2.Entry{
|
||||
FullPath: fullpath,
|
||||
Attr: filer2.Attr{
|
||||
Mode: 0440,
|
||||
Uid: 1234,
|
||||
Gid: 5678,
|
||||
},
|
||||
}
|
||||
|
||||
if err := filer.CreateEntry(ctx, entry1); err != nil {
|
||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||
return
|
||||
}
|
||||
|
||||
entry, err := filer.FindEntry(ctx, fullpath)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("find entry: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if entry.FullPath != entry1.FullPath {
|
||||
t.Errorf("find wrong entry: %v", entry.FullPath)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestCreateFileAndList(t *testing.T) {
|
||||
filer := filer2.NewFiler(nil, nil)
|
||||
store := &MemDbStore{}
|
||||
store.Initialize(nil)
|
||||
filer.SetStore(store)
|
||||
filer.DisableDirectoryCache()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
entry1 := &filer2.Entry{
|
||||
FullPath: filer2.FullPath("/home/chris/this/is/one/file1.jpg"),
|
||||
Attr: filer2.Attr{
|
||||
Mode: 0440,
|
||||
Uid: 1234,
|
||||
Gid: 5678,
|
||||
},
|
||||
}
|
||||
|
||||
entry2 := &filer2.Entry{
|
||||
FullPath: filer2.FullPath("/home/chris/this/is/one/file2.jpg"),
|
||||
Attr: filer2.Attr{
|
||||
Mode: 0440,
|
||||
Uid: 1234,
|
||||
Gid: 5678,
|
||||
},
|
||||
}
|
||||
|
||||
filer.CreateEntry(ctx, entry1)
|
||||
filer.CreateEntry(ctx, entry2)
|
||||
|
||||
// checking the 2 files
|
||||
entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "", false, 100)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("list entries: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(entries) != 2 {
|
||||
t.Errorf("list entries count: %v", len(entries))
|
||||
return
|
||||
}
|
||||
|
||||
if entries[0].FullPath != entry1.FullPath {
|
||||
t.Errorf("find wrong entry 1: %v", entries[0].FullPath)
|
||||
return
|
||||
}
|
||||
|
||||
if entries[1].FullPath != entry2.FullPath {
|
||||
t.Errorf("find wrong entry 2: %v", entries[1].FullPath)
|
||||
return
|
||||
}
|
||||
|
||||
// checking the offset
|
||||
entries, err = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "file1.jpg", false, 100)
|
||||
if len(entries) != 1 {
|
||||
t.Errorf("list entries count: %v", len(entries))
|
||||
return
|
||||
}
|
||||
|
||||
// checking one upper directory
|
||||
entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
|
||||
if len(entries) != 1 {
|
||||
t.Errorf("list entries count: %v", len(entries))
|
||||
return
|
||||
}
|
||||
|
||||
// checking root directory
|
||||
entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
|
||||
if len(entries) != 1 {
|
||||
t.Errorf("list entries count: %v", len(entries))
|
||||
return
|
||||
}
|
||||
|
||||
// add file3
|
||||
file3Path := filer2.FullPath("/home/chris/this/is/file3.jpg")
|
||||
entry3 := &filer2.Entry{
|
||||
FullPath: file3Path,
|
||||
Attr: filer2.Attr{
|
||||
Mode: 0440,
|
||||
Uid: 1234,
|
||||
Gid: 5678,
|
||||
},
|
||||
}
|
||||
filer.CreateEntry(ctx, entry3)
|
||||
|
||||
// checking one upper directory
|
||||
entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
|
||||
if len(entries) != 2 {
|
||||
t.Errorf("list entries count: %v", len(entries))
|
||||
return
|
||||
}
|
||||
|
||||
// delete file and count
|
||||
filer.DeleteEntryMetaAndData(ctx, file3Path, false, false, false)
|
||||
entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
|
||||
if len(entries) != 1 {
|
||||
t.Errorf("list entries count: %v", len(entries))
|
||||
return
|
||||
}
|
||||
|
||||
}
|
|
@ -46,6 +46,7 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d
|
|||
store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
|
||||
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
|
||||
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
|
||||
store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
|
||||
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? ORDER BY NAME ASC LIMIT ?"
|
||||
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? ORDER BY NAME ASC LIMIT ?"
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int
|
|||
store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
|
||||
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
|
||||
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
|
||||
store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
|
||||
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
|
||||
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
|
||||
|
||||
|
|
|
@ -99,6 +99,24 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath file
|
|||
return nil
|
||||
}
|
||||
|
||||
func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
|
||||
members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete folder %s : %v", fullpath, err)
|
||||
}
|
||||
|
||||
for _, fileName := range members {
|
||||
path := filer2.NewFullPath(string(fullpath), fileName)
|
||||
_, err = store.Client.Del(string(path)).Result()
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
|
||||
limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
|
|
|
@ -141,6 +141,38 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat
|
|||
return nil
|
||||
}
|
||||
|
||||
func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
|
||||
|
||||
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
|
||||
|
||||
tx := store.getTx(ctx)
|
||||
|
||||
iter, err := tx.Iter(directoryPrefix, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err)
|
||||
}
|
||||
defer iter.Close()
|
||||
for iter.Valid() {
|
||||
key := iter.Key()
|
||||
if !bytes.HasPrefix(key, directoryPrefix) {
|
||||
break
|
||||
}
|
||||
fileName := getNameFromKey(key)
|
||||
if fileName == "" {
|
||||
iter.Next()
|
||||
continue
|
||||
}
|
||||
|
||||
if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil {
|
||||
return fmt.Errorf("delete %s : %v", fullpath, err)
|
||||
}
|
||||
|
||||
iter.Next()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
|
||||
limit int) (entries []*filer2.Entry, err error) {
|
||||
|
||||
|
|
|
@ -1,12 +1,6 @@
|
|||
package s3api
|
||||
|
||||
import (
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
|
||||
"github.com/gorilla/mux"
|
||||
"google.golang.org/grpc"
|
||||
"net/http"
|
||||
|
|
|
@ -140,7 +140,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
|
|||
})
|
||||
|
||||
if err == nil {
|
||||
fs.filer.DeleteChunks(fullpath, garbages)
|
||||
fs.filer.DeleteChunks(garbages)
|
||||
}
|
||||
|
||||
return &filer_pb.CreateEntryResponse{}, err
|
||||
|
@ -189,8 +189,8 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
|
|||
}
|
||||
|
||||
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
|
||||
fs.filer.DeleteChunks(entry.FullPath, unusedChunks)
|
||||
fs.filer.DeleteChunks(entry.FullPath, garbages)
|
||||
fs.filer.DeleteChunks(unusedChunks)
|
||||
fs.filer.DeleteChunks(garbages)
|
||||
}
|
||||
|
||||
fs.filer.NotifyUpdateEvent(entry, newEntry, true)
|
||||
|
|
|
@ -18,7 +18,6 @@ import (
|
|||
_ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
|
||||
|
|
|
@ -194,7 +194,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
|
|||
}
|
||||
// glog.V(4).Infof("saving %s => %+v", path, entry)
|
||||
if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
|
||||
fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
|
||||
fs.filer.DeleteChunks(entry.Chunks)
|
||||
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
|
||||
writeJsonError(w, r, http.StatusInternalServerError, dbErr)
|
||||
err = dbErr
|
||||
|
|
|
@ -177,7 +177,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
|
|||
Chunks: fileChunks,
|
||||
}
|
||||
if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
|
||||
fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
|
||||
fs.filer.DeleteChunks(entry.Chunks)
|
||||
replyerr = dbErr
|
||||
filerResult.Error = dbErr.Error()
|
||||
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
|
||||
|
|
Loading…
Reference in a new issue