properly working filer

This commit is contained in:
Chris Lu 2018-05-13 23:56:16 -07:00
parent f01d5616b3
commit c5cf9bd290
11 changed files with 203 additions and 186 deletions

View file

@ -91,14 +91,14 @@ func runCopy(cmd *Command, args []string) bool {
func doEachCopy(fileOrDir string, host string, path string) bool { func doEachCopy(fileOrDir string, host string, path string) bool {
f, err := os.Open(fileOrDir) f, err := os.Open(fileOrDir)
if err != nil { if err != nil {
fmt.Printf("Failed to open file %s: %v", fileOrDir, err) fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
return false return false
} }
defer f.Close() defer f.Close()
fi, err := f.Stat() fi, err := f.Stat()
if err != nil { if err != nil {
fmt.Printf("Failed to get stat for file %s: %v", fileOrDir, err) fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err)
return false return false
} }
@ -122,22 +122,22 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
parts, err := operation.NewFileParts([]string{fileOrDir}) parts, err := operation.NewFileParts([]string{fileOrDir})
if err != nil { if err != nil {
fmt.Printf("Failed to read file %s: %v", fileOrDir, err) fmt.Printf("Failed to read file %s: %v\n", fileOrDir, err)
} }
results, err := operation.SubmitFiles(*copy.master, parts, results, err := operation.SubmitFiles(*copy.master, parts,
*copy.replication, *copy.collection, "", *copy.replication, *copy.collection, "",
*copy.ttl, *copy.maxMB, copy.secret) *copy.ttl, *copy.maxMB, copy.secret)
if err != nil { if err != nil {
fmt.Printf("Failed to submit file %s: %v", fileOrDir, err) fmt.Printf("Failed to submit file %s: %v\n", fileOrDir, err)
} }
if strings.HasSuffix(path, "/") { if strings.HasSuffix(path, "/") {
path = path + fi.Name() path = path + fi.Name()
} }
if err = filer_operation.RegisterFile(host, path, results[0].Fid, copy.secret); err != nil { if err = filer_operation.RegisterFile(host, path, results[0].Fid, parts[0].FileSize, copy.secret); err != nil {
fmt.Printf("Failed to register file %s on %s: %v", fileOrDir, host, err) fmt.Printf("Failed to register file %s on %s: %v\n", fileOrDir, host, err)
return false return false
} }

View file

@ -114,11 +114,24 @@ func (f *Filer) FindEntry(p FullPath) (found bool, entry *Entry, err error) {
} }
func (f *Filer) DeleteEntry(p FullPath) (fileEntry *Entry, err error) { func (f *Filer) DeleteEntry(p FullPath) (fileEntry *Entry, err error) {
found, entry, err := f.FindEntry(p)
if err != nil || !found {
return nil, err
}
if entry.IsDirectory() {
entries, err := f.ListDirectoryEntries(p, "", false, 1)
if err != nil {
return nil, fmt.Errorf("list folder %s: %v", p, err)
}
if len(entries) > 0 {
return nil, fmt.Errorf("folder %s is not empty", p)
}
}
return f.store.DeleteEntry(p) return f.store.DeleteEntry(p)
} }
func (f *Filer) ListDirectoryEntries(p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { func (f *Filer) ListDirectoryEntries(p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
if strings.HasSuffix(string(p), "/") { if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0:len(p)-1] p = p[0:len(p)-1]
} }
return f.store.ListDirectoryEntries(p, startFileName, inclusive, limit) return f.store.ListDirectoryEntries(p, startFileName, inclusive, limit)

View file

@ -21,6 +21,11 @@ func (fp FullPath) DirAndName() (string, string) {
return dir[:len(dir)-1], name return dir[:len(dir)-1], name
} }
func (fp FullPath) Name() (string) {
_, name := filepath.Split(string(fp))
return name
}
type Attr struct { type Attr struct {
Mtime time.Time // time of last modification Mtime time.Time // time of last modification
Crtime time.Time // time of creation (OS X only) Crtime time.Time // time of creation (OS X only)
@ -29,6 +34,10 @@ type Attr struct {
Gid uint32 // group gid Gid uint32 // group gid
} }
func (attr Attr) IsDirectory() (bool) {
return attr.Mode & os.ModeDir > 0
}
type Entry struct { type Entry struct {
FullPath FullPath

View file

@ -108,6 +108,14 @@ func TestCreateFileAndList(t *testing.T) {
return return
} }
// checking root directory
entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// add file3
file3Path := filer2.FullPath("/home/chris/this/is/file3.jpg") file3Path := filer2.FullPath("/home/chris/this/is/file3.jpg")
entry3 := &filer2.Entry{ entry3 := &filer2.Entry{
FullPath: file3Path, FullPath: file3Path,

View file

@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"strconv"
) )
type SubmitResult struct { type SubmitResult struct {
@ -16,13 +17,14 @@ type SubmitResult struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
func RegisterFile(filer string, path string, fileId string, secret security.Secret) error { func RegisterFile(filer string, path string, fileId string, fileSize int64, secret security.Secret) error {
// TODO: jwt need to be used // TODO: jwt need to be used
_ = security.GenJwt(secret, fileId) _ = security.GenJwt(secret, fileId)
values := make(url.Values) values := make(url.Values)
values.Add("path", path) values.Add("path", path)
values.Add("fileId", fileId) values.Add("fileId", fileId)
values.Add("fileSize", strconv.FormatInt(fileSize, 10))
_, err := util.Post("http://"+filer+"/admin/register", values) _, err := util.Post("http://"+filer+"/admin/register", values)
if err != nil { if err != nil {
return fmt.Errorf("Failed to register path %s on filer %s to file id %s : %v", path, filer, fileId, err) return fmt.Errorf("Failed to register path %s on filer %s to file id %s : %v", path, filer, fileId, err)

View file

@ -8,11 +8,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"path/filepath"
"github.com/chrislusf/seaweedfs/weed/glog"
) )
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
found, fileId, err := fs.filer.LookupDirectoryEntry(req.Directory, req.Name) found, entry, err := fs.filer.FindEntry(filer2.FullPath(filepath.Join(req.Directory, req.Name)))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -20,10 +23,15 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
return nil, fmt.Errorf("%s not found under %s", req.Name, req.Directory) return nil, fmt.Errorf("%s not found under %s", req.Name, req.Directory)
} }
var fileId string
if !entry.IsDirectory() && len(entry.Chunks) > 0 {
fileId = string(entry.Chunks[0].Fid)
}
return &filer_pb.LookupDirectoryEntryResponse{ return &filer_pb.LookupDirectoryEntryResponse{
Entry: &filer_pb.Entry{ Entry: &filer_pb.Entry{
Name: req.Name, Name: req.Name,
IsDirectory: fileId == "", IsDirectory: entry.IsDirectory(),
FileId: fileId, FileId: fileId,
}, },
}, nil }, nil
@ -31,27 +39,21 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntriesRequest) (*filer_pb.ListEntriesResponse, error) { func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntriesRequest) (*filer_pb.ListEntriesResponse, error) {
directoryNames, err := fs.filer.ListDirectories(req.Directory) entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(req.Directory), "", false, 1000)
if err != nil {
return nil, err
}
files, err := fs.filer.ListFiles(req.Directory, "", 1000)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp := &filer_pb.ListEntriesResponse{} resp := &filer_pb.ListEntriesResponse{}
for _, dir := range directoryNames { for _, entry := range entries {
var fileId string
if !entry.IsDirectory() && len(entry.Chunks) > 0 {
fileId = string(entry.Chunks[0].Fid)
}
resp.Entries = append(resp.Entries, &filer_pb.Entry{ resp.Entries = append(resp.Entries, &filer_pb.Entry{
Name: string(dir), Name: entry.Name(),
IsDirectory: true, IsDirectory: entry.IsDirectory(),
}) FileId: fileId,
}
for _, fileEntry := range files {
resp.Entries = append(resp.Entries, &filer_pb.Entry{
Name: fileEntry.Name,
IsDirectory: false,
FileId: string(fileEntry.Id),
}) })
} }
@ -97,12 +99,13 @@ func (fs *FilerServer) GetFileContent(ctx context.Context, req *filer_pb.GetFile
} }
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) { func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
if req.IsDirectory { entry, err := fs.filer.DeleteEntry(filer2.FullPath(filepath.Join(req.Directory, req.Name)))
err = fs.filer.DeleteDirectory(req.Directory+req.Name, false) if err == nil {
} else { for _, chunk := range entry.Chunks {
fid, err := fs.filer.DeleteFile(req.Directory + req.Name) fid := string(chunk.Fid)
if err == nil && fid != "" { if err = operation.DeleteFile(fs.getMasterNode(), fid, fs.jwt(fid)); err != nil {
err = operation.DeleteFile(fs.getMasterNode(), fid, fs.jwt(fid)) glog.V(0).Infof("deleting file %s: %v", fid, err)
}
} }
} }
return nil, err return nil, err

View file

@ -9,21 +9,18 @@ import (
"sync" "sync"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/cassandra_store"
"github.com/chrislusf/seaweedfs/weed/filer/embedded_filer"
"github.com/chrislusf/seaweedfs/weed/filer/flat_namespace"
"github.com/chrislusf/seaweedfs/weed/filer/mysql_store" "github.com/chrislusf/seaweedfs/weed/filer/mysql_store"
"github.com/chrislusf/seaweedfs/weed/filer/postgres_store" "github.com/chrislusf/seaweedfs/weed/filer/postgres_store"
"github.com/chrislusf/seaweedfs/weed/filer/redis_store"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/filer2/memdb"
) )
type filerConf struct { type filerConf struct {
MysqlConf []mysql_store.MySqlConf `json:"mysql"` MysqlConf []mysql_store.MySqlConf `json:"mysql"`
mysql_store.ShardingConf mysql_store.ShardingConf
PostgresConf *postgres_store.PostgresConf `json:"postgres"` PostgresConf *postgres_store.PostgresConf `json:"postgres"`
} }
@ -52,7 +49,7 @@ type FilerServer struct {
redirectOnRead bool redirectOnRead bool
disableDirListing bool disableDirListing bool
secret security.Secret secret security.Secret
filer filer.Filer filer *filer2.Filer
maxMB int maxMB int
masterNodes *storage.MasterNodes masterNodes *storage.MasterNodes
} }
@ -86,28 +83,31 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int,
} }
if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 { if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 {
mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardCount) // mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardCount)
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store) // fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store)
} else if setting.PostgresConf != nil { } else if setting.PostgresConf != nil {
fs.filer = postgres_store.NewPostgresStore(master, *setting.PostgresConf) // fs.filer = postgres_store.NewPostgresStore(master, *setting.PostgresConf)
} else if cassandra_server != "" { } else if cassandra_server != "" {
cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server) // cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
if err != nil { // if err != nil {
glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err) // glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)
} // }
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, cassandra_store) // fs.filer = flat_namespace.NewFlatNamespaceFiler(master, cassandra_store)
} else if redis_server != "" { } else if redis_server != "" {
redis_store := redis_store.NewRedisStore(redis_server, redis_password, redis_database) // redis_store := redis_store.NewRedisStore(redis_server, redis_password, redis_database)
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, redis_store) // fs.filer = flat_namespace.NewFlatNamespaceFiler(master, redis_store)
} else { } else {
/*
if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil { if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
glog.Fatalf("Can not start filer in dir %s : %v", dir, err) glog.Fatalf("Can not start filer in dir %s : %v", dir, err)
return return
} }
*/
defaultMux.HandleFunc("/admin/mv", fs.moveHandler)
} }
fs.filer = filer2.NewFiler(master)
fs.filer.SetStore(memdb.NewMemDbStore())
defaultMux.HandleFunc("/admin/register", fs.registerHandler) defaultMux.HandleFunc("/admin/register", fs.registerHandler)
defaultMux.HandleFunc("/", fs.filerHandler) defaultMux.HandleFunc("/", fs.filerHandler)
if defaultMux != readonlyMux { if defaultMux != readonlyMux {

View file

@ -4,34 +4,30 @@ import (
"net/http" "net/http"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/filer2"
"strconv"
) )
/*
Move a folder or a file, with 4 Use cases:
mv fromDir toNewDir
mv fromDir toOldDir
mv fromFile toDir
mv fromFile toFile
Wildcard is not supported.
*/
func (fs *FilerServer) moveHandler(w http.ResponseWriter, r *http.Request) {
from := r.FormValue("from")
to := r.FormValue("to")
err := fs.filer.Move(from, to)
if err != nil {
glog.V(4).Infoln("moving", from, "->", to, err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
w.WriteHeader(http.StatusOK)
}
}
func (fs *FilerServer) registerHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) registerHandler(w http.ResponseWriter, r *http.Request) {
path := r.FormValue("path") path := r.FormValue("path")
fileId := r.FormValue("fileId") fileId := r.FormValue("fileId")
err := fs.filer.CreateFile(path, fileId) fileSize, err := strconv.ParseUint(r.FormValue("fileSize"), 10, 64)
if err != nil {
glog.V(4).Infof("register %s to %s parse fileSize %s: %v", fileId, path, r.FormValue("fileSize"), err)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
entry := &filer2.Entry{
FullPath: filer2.FullPath(path),
Attr: filer2.Attr{
Mode: 0660,
},
Chunks: []filer2.FileChunk{{
Fid: filer2.FileId(fileId),
Size: fileSize,
}},
}
err = fs.filer.CreateEntry(entry)
if err != nil { if err != nil {
glog.V(4).Infof("register %s to %s error: %v", fileId, path, err) glog.V(4).Infof("register %s to %s error: %v", fileId, path, err)
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)

View file

@ -7,12 +7,11 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui" ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb" "github.com/chrislusf/seaweedfs/weed/filer2"
) )
// listDirectoryHandler lists directories and folers under a directory // listDirectoryHandler lists directories and folers under a directory
@ -20,56 +19,40 @@ import (
// sub directories are listed on the first page, when "lastFileName" // sub directories are listed on the first page, when "lastFileName"
// is empty. // is empty.
func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) {
if !strings.HasSuffix(r.URL.Path, "/") { path := r.URL.Path
return if strings.HasSuffix(path, "/") && len(path) > 1 {
path = path[:len(path)-1]
} }
limit, limit_err := strconv.Atoi(r.FormValue("limit")) limit, limit_err := strconv.Atoi(r.FormValue("limit"))
if limit_err != nil { if limit_err != nil {
limit = 100 limit = 100
} }
lastFileName := r.FormValue("lastFileName") lastFileName := r.FormValue("lastFileName")
files, err := fs.filer.ListFiles(r.URL.Path, lastFileName, limit)
if err == leveldb.ErrNotFound { entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(path), lastFileName, false, limit)
glog.V(0).Infof("Error %s", err)
if err != nil {
glog.V(0).Infof("listDirectory %s %s $d: %s", path, lastFileName, limit, err)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
} }
directories, err2 := fs.filer.ListDirectories(r.URL.Path) shouldDisplayLoadMore := len(entries) == limit
if err2 == leveldb.ErrNotFound { if path == "/" {
glog.V(0).Infof("Error %s", err) path = ""
w.WriteHeader(http.StatusNotFound)
return
}
shouldDisplayLoadMore := len(files) > 0
lastFileName = ""
if len(files) > 0 {
lastFileName = files[len(files)-1].Name
files2, err3 := fs.filer.ListFiles(r.URL.Path, lastFileName, limit)
if err3 == leveldb.ErrNotFound {
glog.V(0).Infof("Error %s", err)
w.WriteHeader(http.StatusNotFound)
return
}
shouldDisplayLoadMore = len(files2) > 0
} }
args := struct { args := struct {
Path string Path string
Files interface{} Entries interface{}
Directories interface{}
Limit int Limit int
LastFileName string LastFileName string
ShouldDisplayLoadMore bool ShouldDisplayLoadMore bool
}{ }{
r.URL.Path, path,
files, entries,
directories,
limit, limit,
lastFileName, lastFileName,
shouldDisplayLoadMore, shouldDisplayLoadMore,
@ -83,7 +66,19 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
} }
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
if strings.HasSuffix(r.URL.Path, "/") { path := r.URL.Path
if strings.HasSuffix(path, "/") && len(path) > 1 {
path = path[:len(path)-1]
}
found, entry, err := fs.filer.FindEntry(filer2.FullPath(path))
if !found || err != nil {
glog.V(3).Infof("Not found %s: %v", path, err)
w.WriteHeader(http.StatusNotFound)
return
}
if entry.IsDirectory() {
if fs.disableDirListing { if fs.disableDirListing {
w.WriteHeader(http.StatusMethodNotAllowed) w.WriteHeader(http.StatusMethodNotAllowed)
return return
@ -92,13 +87,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return return
} }
fileId, err := fs.filer.FindFile(r.URL.Path) if len(entry.Chunks) == 0 {
if err == filer.ErrNotFound { glog.V(3).Infof("Empty %s: %v", path)
glog.V(3).Infoln("Not found in db", r.URL.Path) w.WriteHeader(http.StatusNoContent)
w.WriteHeader(http.StatusNotFound)
return return
} }
// FIXME pick the right fid
fileId := string(entry.Chunks[0].Fid)
urlLocation, err := operation.LookupFileId(fs.getMasterNode(), fileId) urlLocation, err := operation.LookupFileId(fs.getMasterNode(), fileId)
if err != nil { if err != nil {
glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())

View file

@ -22,6 +22,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer2"
) )
type FilerPostResult struct { type FilerPostResult struct {
@ -73,16 +74,19 @@ func makeFormData(filename, mimeType string, content io.Reader) (formData io.Rea
} }
func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) { func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) {
if fileId, err = fs.filer.FindFile(path); err != nil && err != filer.ErrNotFound { var found bool
var entry *filer2.Entry
if found, entry, err = fs.filer.FindEntry(filer2.FullPath(path)); err != nil {
glog.V(0).Infoln("failing to find path in filer store", path, err.Error()) glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
} else if fileId != "" && err == nil { } else if found {
fileId = string(entry.Chunks[0].Fid)
urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId) urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId)
if err != nil { if err != nil {
glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
} }
} else if fileId == "" && err == filer.ErrNotFound { } else {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
} }
return return
@ -313,7 +317,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
// also delete the old fid unless PUT operation // also delete the old fid unless PUT operation
if r.Method != "PUT" { if r.Method != "PUT" {
if oldFid, err := fs.filer.FindFile(path); err == nil { if found, entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil && found {
oldFid := string(entry.Chunks[0].Fid)
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
} else if err != nil && err != filer.ErrNotFound { } else if err != nil && err != filer.ErrNotFound {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
@ -321,7 +326,17 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
glog.V(4).Infoln("saving", path, "=>", fileId) glog.V(4).Infoln("saving", path, "=>", fileId)
if db_err := fs.filer.CreateFile(path, fileId); db_err != nil { entry := &filer2.Entry{
FullPath: filer2.FullPath(path),
Attr: filer2.Attr{
Mode: 0660,
},
Chunks: []filer2.FileChunk{{
Fid: filer2.FileId(fileId),
Size: uint64(r.ContentLength),
}},
}
if db_err := fs.filer.CreateEntry(entry); db_err != nil {
operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
writeJsonError(w, r, http.StatusInternalServerError, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err)
@ -400,13 +415,7 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
fileName = path.Base(fileName) fileName = path.Base(fileName)
} }
chunks := (int64(contentLength) / int64(chunkSize)) + 1 var fileChunks []filer2.FileChunk
cm := operation.ChunkManifest{
Name: fileName,
Size: 0, // don't know yet
Mime: "application/octet-stream",
Chunks: make([]*operation.ChunkInfo, 0, chunks),
}
totalBytesRead := int64(0) totalBytesRead := int64(0)
tmpBufferSize := int32(1024 * 1024) tmpBufferSize := int32(1024 * 1024)
@ -438,18 +447,18 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
} }
// upload the chunk to the volume server // upload the chunk to the volume server
chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10) chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10)
uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId) uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId)
if uploadErr != nil { if uploadErr != nil {
return nil, uploadErr return nil, uploadErr
} }
// Save to chunk manifest structure // Save to chunk manifest structure
cm.Chunks = append(cm.Chunks, fileChunks = append(fileChunks,
&operation.ChunkInfo{ filer2.FileChunk{
Fid: filer2.FileId(fileId),
Offset: chunkOffset, Offset: chunkOffset,
Size: int64(chunkBufOffset), Size: uint64(chunkBufOffset),
Fid: fileId,
}, },
) )
@ -469,47 +478,30 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
} }
} }
cm.Size = totalBytesRead
manifestBuf, marshalErr := cm.Marshal()
if marshalErr != nil {
return nil, marshalErr
}
manifestStr := string(manifestBuf)
glog.V(4).Infoln("Generated chunk manifest: ", manifestStr)
manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection)
if manifestAssignmentErr != nil {
return nil, manifestAssignmentErr
}
glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId)
filerResult.Fid = manifestFileId
u, _ := url.Parse(manifestUrlLocation)
q := u.Query()
q.Set("cm", "true")
u.RawQuery = q.Encode()
manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId)
if manifestUploadErr != nil {
return nil, manifestUploadErr
}
path := r.URL.Path path := r.URL.Path
// also delete the old fid unless PUT operation // also delete the old fid unless PUT operation
if r.Method != "PUT" { if r.Method != "PUT" {
if oldFid, err := fs.filer.FindFile(path); err == nil { if found, entry, err := fs.filer.FindEntry(filer2.FullPath(path)); found && err == nil {
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) for _, chunk := range entry.Chunks {
} else if err != nil && err != filer.ErrNotFound { oldFid := string(chunk.Fid)
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
}
} else if err != nil {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
} }
} }
glog.V(4).Infoln("saving", path, "=>", manifestFileId) glog.V(4).Infoln("saving", path)
if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil { entry := &filer2.Entry{
FullPath: filer2.FullPath(path),
Attr: filer2.Attr{
Mode: 0660,
},
Chunks: fileChunks,
}
if db_err := fs.filer.CreateEntry(entry); db_err != nil {
replyerr = db_err replyerr = db_err
filerResult.Error = db_err.Error() filerResult.Error = db_err.Error()
operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
return return
} }
@ -532,23 +524,21 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
} }
// curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to
// curl -X DELETE http://localhost:8888/path/to/?recursive=true
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
var err error
var fid string entry, err := fs.filer.DeleteEntry(filer2.FullPath(r.URL.Path))
if strings.HasSuffix(r.URL.Path, "/") { if err != nil {
isRecursive := r.FormValue("recursive") == "true"
err = fs.filer.DeleteDirectory(r.URL.Path, isRecursive)
} else {
fid, err = fs.filer.DeleteFile(r.URL.Path)
if err == nil && fid != "" {
err = operation.DeleteFile(fs.getMasterNode(), fid, fs.jwt(fid))
}
}
if err == nil {
writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
} else {
glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error()) glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
return
} }
if entry != nil && !entry.IsDirectory() {
for _, chunk := range entry.Chunks {
oldFid := string(chunk.Fid)
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
}
}
writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
} }

View file

@ -26,22 +26,21 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<div class="row"> <div class="row">
<ul> <ul>
{{$path := .Path }} {{$path := .Path }}
{{ range $dirs_index, $dir := .Directories }} {{ range $entry_index, $entry := .Entries }}
<li> <li>
<img src="https://www.w3.org/TR/WWWicn/folder.gif" width="20" height="23"> {{if $entry.IsDirectory}}
<a href={{ print $path $dir "/"}} > <img src="https://www.w3.org/TR/WWWicn/folder.gif" width="20" height="23">
{{ $dir }} <a href={{ print $path "/" $entry.Name "/"}} >
</a> {{ $entry.Name }}
</a>
{{else}}
<a href={{ print $path "/" $entry.Name }} >
{{ $entry.Name }}
</a>
{{end}}
</li> </li>
{{ end }} {{ end }}
{{ range $file_index, $file := .Files }}
<li>
<a href={{ print $path $file.Name}} >
{{ $file.Name }}
</a>
</li>
{{ end }}
</ul> </ul>
</div> </div>