mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
file store adds memdb implementation
This commit is contained in:
parent
a808df5019
commit
81af1bafba
|
@ -2,6 +2,7 @@ package embedded
|
|||
|
||||
import (
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
)
|
||||
|
||||
type EmbeddedStore struct {
|
||||
|
@ -16,10 +17,26 @@ func NewEmbeddedStore(dir string) (filer *EmbeddedStore, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (filer *EmbeddedStore) CreateFile(filePath string, fid string) (err error) {
|
||||
func (filer *EmbeddedStore) InsertEntry(entry *filer2.Entry) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (filer *EmbeddedStore) Mkdir(filePath string) (err error) {
|
||||
func (filer *EmbeddedStore) AddDirectoryLink(directory *filer2.Entry, delta int32) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (filer *EmbeddedStore) AppendFileChunk(fullpath filer2.FullPath, fileChunk filer2.FileChunk) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (filer *EmbeddedStore) FindEntry(fullpath filer2.FullPath) (found bool, entry *filer2.Entry, err error) {
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
func (filer *EmbeddedStore) DeleteEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (filer *EmbeddedStore) ListDirectoryEntries(fullpath filer2.FullPath) (entries []*filer2.Entry, err error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -3,9 +3,11 @@ package filer2
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2/embedded"
|
||||
"github.com/karlseguin/ccache"
|
||||
"strings"
|
||||
"path/filepath"
|
||||
"time"
|
||||
"os"
|
||||
)
|
||||
|
||||
type Filer struct {
|
||||
|
@ -21,78 +23,132 @@ func NewFiler(master string) *Filer {
|
|||
}
|
||||
}
|
||||
|
||||
func NewEmbeddedFiler(master string, dir string) (*Filer, error) {
|
||||
_, err := embedded.NewEmbeddedStore(dir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create embedded filer store: %v", err)
|
||||
}
|
||||
return &Filer{
|
||||
master: master,
|
||||
// store: store,
|
||||
}, nil
|
||||
func (f *Filer) SetStore(store FilerStore) () {
|
||||
f.store = store
|
||||
}
|
||||
|
||||
func (f *Filer) CreateEntry(entry Entry) (error) {
|
||||
/*
|
||||
1. recursively ensure parent directory is created.
|
||||
2. get current parent directory, add link,
|
||||
3. add the file entry
|
||||
*/
|
||||
func (f *Filer) DisableDirectoryCache() () {
|
||||
f.directoryCache = nil
|
||||
}
|
||||
|
||||
recursivelyEnsureDirectory(entry.Dir, func(parent, name string) error {
|
||||
return nil
|
||||
})
|
||||
func (f *Filer) CreateEntry(entry *Entry) (error) {
|
||||
|
||||
return f.store.CreateEntry(entry)
|
||||
dirParts := strings.Split(string(entry.FullPath), "/")
|
||||
|
||||
// fmt.Printf("directory parts: %+v\n", dirParts)
|
||||
|
||||
var lastDirectoryEntry *Entry
|
||||
|
||||
for i := 1; i < len(dirParts); i++ {
|
||||
dirPath := "/" + filepath.Join(dirParts[:i]...)
|
||||
// fmt.Printf("%d directory: %+v\n", i, dirPath)
|
||||
|
||||
dirFound := false
|
||||
|
||||
// first check local cache
|
||||
dirEntry := f.cacheGetDirectory(dirPath)
|
||||
|
||||
// not found, check the store directly
|
||||
if dirEntry == nil {
|
||||
var dirFindErr error
|
||||
dirFound, dirEntry, dirFindErr = f.FindEntry(FullPath(dirPath))
|
||||
if dirFindErr != nil {
|
||||
return fmt.Errorf("findDirectory %s: %v", dirPath, dirFindErr)
|
||||
}
|
||||
}
|
||||
|
||||
// no such existing directory
|
||||
if !dirFound {
|
||||
|
||||
// create the directory
|
||||
now := time.Now()
|
||||
|
||||
dirEntry = &Entry{
|
||||
FullPath: FullPath(dirPath),
|
||||
Attr: Attr{
|
||||
Mtime: now,
|
||||
Crtime: now,
|
||||
Mode: os.ModeDir | 0660,
|
||||
Uid: entry.Uid,
|
||||
Gid: entry.Gid,
|
||||
Nlink: 2,
|
||||
},
|
||||
}
|
||||
|
||||
mkdirErr := f.store.InsertEntry(dirEntry)
|
||||
if mkdirErr != nil {
|
||||
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
|
||||
}
|
||||
}
|
||||
|
||||
// cache the directory entry
|
||||
f.cacheSetDirectory(dirPath, dirEntry, i)
|
||||
|
||||
// remember the direct parent directory entry
|
||||
if i == len(dirParts)-1 {
|
||||
lastDirectoryEntry = dirEntry
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if lastDirectoryEntry == nil {
|
||||
return fmt.Errorf("parent folder not found: %v", entry.FullPath)
|
||||
}
|
||||
|
||||
if !hasWritePermission(lastDirectoryEntry, entry) {
|
||||
return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath)
|
||||
}
|
||||
|
||||
if err := f.store.InsertEntry(entry); err != nil {
|
||||
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
|
||||
}
|
||||
if err := f.store.AddDirectoryLink(lastDirectoryEntry, 1); err != nil {
|
||||
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *Filer) AppendFileChunk(p FullPath, c FileChunk) (err error) {
|
||||
return f.store.AppendFileChunk(p, c)
|
||||
}
|
||||
|
||||
func (f *Filer) FindEntry(p FullPath) (found bool, fileEntry Entry, err error) {
|
||||
func (f *Filer) FindEntry(p FullPath) (found bool, entry *Entry, err error) {
|
||||
return f.store.FindEntry(p)
|
||||
}
|
||||
|
||||
func (f *Filer) DeleteEntry(p FullPath) (fileEntry Entry, err error) {
|
||||
func (f *Filer) DeleteEntry(p FullPath) (fileEntry *Entry, err error) {
|
||||
return f.store.DeleteEntry(p)
|
||||
}
|
||||
|
||||
func (f *Filer) ListDirectoryEntries(p FullPath) ([]Entry, error) {
|
||||
func (f *Filer) ListDirectoryEntries(p FullPath) ([]*Entry, error) {
|
||||
if strings.HasSuffix(string(p), "/") {
|
||||
p = p[0:len(p)-1]
|
||||
}
|
||||
return f.store.ListDirectoryEntries(p)
|
||||
}
|
||||
|
||||
func (f *Filer) UpdateEntry(entry Entry) (error) {
|
||||
return f.store.UpdateEntry(entry)
|
||||
}
|
||||
|
||||
func recursivelyEnsureDirectory(fullPath string, fn func(parent, name string) error) (error) {
|
||||
if strings.HasSuffix(fullPath, "/") {
|
||||
fullPath = fullPath[0:len(fullPath)-1]
|
||||
}
|
||||
nextPathEnd := strings.LastIndex(fullPath, "/")
|
||||
if nextPathEnd < 0 {
|
||||
func (f *Filer) cacheGetDirectory(dirpath string) (*Entry) {
|
||||
if f.directoryCache == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
dirName := fullPath[nextPathEnd+1:]
|
||||
parentDirPath := fullPath[0:nextPathEnd]
|
||||
|
||||
if parentDirPath == "" {
|
||||
parentDirPath = "/"
|
||||
item := f.directoryCache.Get(dirpath)
|
||||
if item == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := recursivelyEnsureDirectory(parentDirPath, fn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := fn(parentDirPath, dirName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return item.Value().(*Entry)
|
||||
}
|
||||
|
||||
func (f *Filer) cacheGetDirectory(dirpath string) (error) {
|
||||
return nil
|
||||
func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
|
||||
|
||||
if f.directoryCache == nil {
|
||||
return
|
||||
}
|
||||
|
||||
minutes := 60
|
||||
if level < 10 {
|
||||
minutes -= level * 6
|
||||
}
|
||||
|
||||
f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute)
|
||||
}
|
||||
|
|
|
@ -4,28 +4,35 @@ import (
|
|||
"errors"
|
||||
"os"
|
||||
"time"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
type FileId string //file id in SeaweedFS
|
||||
type FullPath struct {
|
||||
Dir string //full path of the parent dir
|
||||
Name string //file name without path
|
||||
type FullPath string
|
||||
|
||||
func (fp FullPath) DirAndName() (string, string) {
|
||||
dir, name := filepath.Split(string(fp))
|
||||
if dir == "/" {
|
||||
return dir, name
|
||||
}
|
||||
if len(dir) < 1 {
|
||||
return "/", ""
|
||||
}
|
||||
return dir[:len(dir)-1], name
|
||||
}
|
||||
|
||||
type Attr struct {
|
||||
Mtime time.Time // time of last modification
|
||||
Crtime time.Time // time of creation (OS X only)
|
||||
Mode os.FileMode // file mode
|
||||
Uid uint32 // owner uid
|
||||
Gid uint32 // group gid
|
||||
IsDirectory bool
|
||||
Size uint64 // total size in bytes
|
||||
Nlink uint32 // number of links (usually 1)
|
||||
Mtime time.Time // time of last modification
|
||||
Crtime time.Time // time of creation (OS X only)
|
||||
Mode os.FileMode // file mode
|
||||
Uid uint32 // owner uid
|
||||
Gid uint32 // group gid
|
||||
Size uint64 // total size in bytes
|
||||
Nlink uint32 // number of links (usually 1)
|
||||
}
|
||||
|
||||
type Entry struct {
|
||||
Dir string `json:"dir,omitempty"` //full path of the parent dir
|
||||
Name string `json:"name,omitempty"` //file name without path
|
||||
FullPath
|
||||
|
||||
Attr
|
||||
|
||||
|
@ -40,23 +47,23 @@ type FileChunk struct {
|
|||
}
|
||||
|
||||
type AbstractFiler interface {
|
||||
CreateEntry(Entry) (error)
|
||||
CreateEntry(*Entry) (error)
|
||||
AppendFileChunk(FullPath, FileChunk) (err error)
|
||||
FindEntry(FullPath) (found bool, fileEntry Entry, err error)
|
||||
DeleteEntry(FullPath) (fileEntry Entry, err error)
|
||||
FindEntry(FullPath) (found bool, fileEntry *Entry, err error)
|
||||
DeleteEntry(FullPath) (fileEntry *Entry, err error)
|
||||
|
||||
ListDirectoryEntries(dirPath FullPath) ([]Entry, error)
|
||||
UpdateEntry(Entry) (error)
|
||||
ListDirectoryEntries(dirPath FullPath) ([]*Entry, error)
|
||||
UpdateEntry(*Entry) (error)
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("filer: no entry is found in filer store")
|
||||
|
||||
type FilerStore interface {
|
||||
CreateEntry(Entry) (error)
|
||||
InsertEntry(*Entry) (error)
|
||||
AddDirectoryLink(directory *Entry, delta int32) (err error)
|
||||
AppendFileChunk(FullPath, FileChunk) (err error)
|
||||
FindEntry(FullPath) (found bool, fileEntry Entry, err error)
|
||||
DeleteEntry(FullPath) (fileEntry Entry, err error)
|
||||
FindEntry(FullPath) (found bool, entry *Entry, err error)
|
||||
DeleteEntry(FullPath) (fileEntry *Entry, err error)
|
||||
|
||||
ListDirectoryEntries(dirPath FullPath) ([]Entry, error)
|
||||
UpdateEntry(Entry) (error)
|
||||
ListDirectoryEntries(dirPath FullPath) ([]*Entry, error)
|
||||
}
|
||||
|
|
|
@ -1,30 +0,0 @@
|
|||
package filer2
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"path/filepath"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func TestRecursion(t *testing.T) {
|
||||
fullPath := "/home/chris/some/file/abc.jpg"
|
||||
expected := []string{
|
||||
"/", "home",
|
||||
"/home", "chris",
|
||||
"/home/chris", "some",
|
||||
"/home/chris/some", "file",
|
||||
}
|
||||
|
||||
dir, _ := filepath.Split(fullPath)
|
||||
|
||||
i := 0
|
||||
|
||||
recursivelyEnsureDirectory(dir, func(parent, name string) error {
|
||||
if parent != expected[i] || name != expected[i+1] {
|
||||
t.Errorf("recursive directory is wrong! parent=%s dirName=%s", parent, name)
|
||||
}
|
||||
fmt.Printf("processing folder %s \t parent=%s\n", name, parent)
|
||||
i += 2
|
||||
return nil
|
||||
})
|
||||
}
|
94
weed/filer2/memdb/memdb_store.go
Normal file
94
weed/filer2/memdb/memdb_store.go
Normal file
|
@ -0,0 +1,94 @@
|
|||
package memdb
|
||||
|
||||
import (
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
"github.com/google/btree"
|
||||
"strings"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type MemDbStore struct {
|
||||
tree *btree.BTree
|
||||
}
|
||||
|
||||
type Entry struct {
|
||||
*filer2.Entry
|
||||
}
|
||||
|
||||
func (a Entry) Less(b btree.Item) bool {
|
||||
return strings.Compare(string(a.FullPath), string(b.(Entry).FullPath)) < 0
|
||||
}
|
||||
|
||||
func NewMemDbStore() (filer *MemDbStore) {
|
||||
filer = &MemDbStore{}
|
||||
filer.tree = btree.New(8)
|
||||
return
|
||||
}
|
||||
|
||||
func (filer *MemDbStore) InsertEntry(entry *filer2.Entry) (err error) {
|
||||
// println("inserting", entry.FullPath)
|
||||
filer.tree.ReplaceOrInsert(Entry{entry})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (filer *MemDbStore) AddDirectoryLink(directory *filer2.Entry, delta int32) (err error) {
|
||||
directory.Nlink = uint32(int32(directory.Nlink) + delta)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (filer *MemDbStore) AppendFileChunk(fullpath filer2.FullPath, fileChunk filer2.FileChunk) (err error) {
|
||||
found, entry, err := filer.FindEntry(fullpath)
|
||||
if !found {
|
||||
return fmt.Errorf("No such file: %s", fullpath)
|
||||
}
|
||||
entry.Chunks = append(entry.Chunks, fileChunk)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (filer *MemDbStore) FindEntry(fullpath filer2.FullPath) (found bool, entry *filer2.Entry, err error) {
|
||||
item := filer.tree.Get(Entry{&filer2.Entry{FullPath: fullpath}})
|
||||
if item == nil {
|
||||
return false, nil, nil
|
||||
}
|
||||
entry = item.(Entry).Entry
|
||||
return true, entry, nil
|
||||
}
|
||||
|
||||
func (filer *MemDbStore) DeleteEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
|
||||
item := filer.tree.Delete(Entry{&filer2.Entry{FullPath: fullpath}})
|
||||
if item == nil {
|
||||
return nil, nil
|
||||
}
|
||||
entry = item.(Entry).Entry
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func (filer *MemDbStore) ListDirectoryEntries(fullpath filer2.FullPath) (entries []*filer2.Entry, err error) {
|
||||
filer.tree.AscendGreaterOrEqual(Entry{&filer2.Entry{FullPath: fullpath}},
|
||||
func(item btree.Item) bool {
|
||||
entry := item.(Entry).Entry
|
||||
// println("checking", entry.FullPath)
|
||||
if entry.FullPath == fullpath {
|
||||
// skipping the current directory
|
||||
// println("skipping the folder", entry.FullPath)
|
||||
return true
|
||||
}
|
||||
dir, _ := entry.FullPath.DirAndName()
|
||||
if !strings.HasPrefix(dir, string(fullpath)) {
|
||||
// println("directory is:", dir, "fullpath:", fullpath)
|
||||
// println("breaking from", entry.FullPath)
|
||||
return false
|
||||
}
|
||||
if dir != string(fullpath) {
|
||||
// this could be items in deeper directories
|
||||
// println("skipping deeper folder", entry.FullPath)
|
||||
return true
|
||||
}
|
||||
// now process the directory items
|
||||
// println("adding entry", entry.FullPath)
|
||||
entries = append(entries, entry)
|
||||
return true
|
||||
},
|
||||
)
|
||||
return entries, nil
|
||||
}
|
96
weed/filer2/memdb/memdb_store_test.go
Normal file
96
weed/filer2/memdb/memdb_store_test.go
Normal file
|
@ -0,0 +1,96 @@
|
|||
package memdb
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||
)
|
||||
|
||||
func TestCreateAndFind(t *testing.T) {
|
||||
filer := filer2.NewFiler("")
|
||||
filer.SetStore(NewMemDbStore())
|
||||
filer.DisableDirectoryCache()
|
||||
|
||||
fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
|
||||
|
||||
entry1 := &filer2.Entry{
|
||||
FullPath: fullpath,
|
||||
Attr: filer2.Attr{
|
||||
Mode: 0440,
|
||||
Uid: 1234,
|
||||
Gid: 5678,
|
||||
},
|
||||
}
|
||||
|
||||
if err := filer.CreateEntry(entry1); err != nil {
|
||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||
return
|
||||
}
|
||||
|
||||
found, entry, err := filer.FindEntry(fullpath)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("find entry: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if !found {
|
||||
t.Errorf("Failed to find newly created file")
|
||||
return
|
||||
}
|
||||
|
||||
if entry.FullPath != entry1.FullPath {
|
||||
t.Errorf("find wrong entry: %v", entry.FullPath)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestCreateFileAndList(t *testing.T) {
|
||||
filer := filer2.NewFiler("")
|
||||
filer.SetStore(NewMemDbStore())
|
||||
filer.DisableDirectoryCache()
|
||||
|
||||
entry1 := &filer2.Entry{
|
||||
FullPath: filer2.FullPath("/home/chris/this/is/one/file1.jpg"),
|
||||
Attr: filer2.Attr{
|
||||
Mode: 0440,
|
||||
Uid: 1234,
|
||||
Gid: 5678,
|
||||
},
|
||||
}
|
||||
|
||||
entry2 := &filer2.Entry{
|
||||
FullPath: filer2.FullPath("/home/chris/this/is/one/file2.jpg"),
|
||||
Attr: filer2.Attr{
|
||||
Mode: 0440,
|
||||
Uid: 1234,
|
||||
Gid: 5678,
|
||||
},
|
||||
}
|
||||
|
||||
filer.CreateEntry(entry1)
|
||||
filer.CreateEntry(entry2)
|
||||
|
||||
entries, err := filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is/one/"))
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("list entries: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(entries) != 2 {
|
||||
t.Errorf("list entries count: %v", len(entries))
|
||||
return
|
||||
}
|
||||
|
||||
if entries[0].FullPath != entry1.FullPath {
|
||||
t.Errorf("find wrong entry 1: %v", entries[0].FullPath)
|
||||
return
|
||||
}
|
||||
|
||||
if entries[1].FullPath != entry2.FullPath {
|
||||
t.Errorf("find wrong entry 2: %v", entries[1].FullPath)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
22
weed/filer2/permission.go
Normal file
22
weed/filer2/permission.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package filer2
|
||||
|
||||
func hasWritePermission(dir *Entry, entry *Entry) bool {
|
||||
|
||||
if dir == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if dir.Uid == entry.Uid && dir.Mode&0200 > 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
if dir.Gid == entry.Gid && dir.Mode&0020 > 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
if dir.Mode&0002 > 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
Loading…
Reference in a new issue