delegating old chunk purging to CreateEntry

1. filer add file user id and group id
2. auto-chunking set the file name correctly
3. delegating old chunk purging to CreateEntry
This commit is contained in:
Chris Lu 2018-07-21 17:47:59 -07:00
parent 852af28f91
commit 702d7ac424
2 changed files with 22 additions and 27 deletions

View file

@ -15,6 +15,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"os"
)
var (
OS_UID = uint32(os.Getuid())
OS_GID = uint32(os.Getgid())
) )
type FilerPostResult struct { type FilerPostResult struct {
@ -104,16 +110,16 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
/* /*
var path string var path string
if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data; boundary=") { if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data; boundary=") {
path, err = fs.multipartUploadAnalyzer(w, r, replication, collection, dataCenter) path, err = fs.multipartUploadAnalyzer(w, r, replication, collection, dataCenter)
} else { } else {
path, err = fs.monolithicUploadAnalyzer(w, r, replication, collection, dataCenter) path, err = fs.monolithicUploadAnalyzer(w, r, replication, collection, dataCenter)
} }
*/ */
fileId, urlLocation, err := fs.queryFileInfoByPath(w, r, r.URL.Path) fileId, urlLocation, err := fs.queryFileInfoByPath(w, r, r.URL.Path)
if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, r.URL.Path); err == nil && fileId == "" { if err == nil && fileId == "" {
fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter) fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
} }
if err != nil || fileId == "" || urlLocation == "" { if err != nil || fileId == "" || urlLocation == "" {
@ -184,16 +190,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
// also delete the old fid unless PUT operation
if r.Method != "PUT" {
if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil {
oldFid := entry.Chunks[0].FileId
operation.DeleteFile(fs.filer.GetMaster(), oldFid, fs.jwt(oldFid))
} else if err != nil && err != filer2.ErrNotFound {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
}
}
glog.V(4).Infoln("saving", path, "=>", fileId) glog.V(4).Infoln("saving", path, "=>", fileId)
entry := &filer2.Entry{ entry := &filer2.Entry{
FullPath: filer2.FullPath(path), FullPath: filer2.FullPath(path),
@ -201,13 +197,15 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
Mtime: time.Now(), Mtime: time.Now(),
Crtime: time.Now(), Crtime: time.Now(),
Mode: 0660, Mode: 0660,
Uid: OS_UID,
Gid: OS_GID,
Replication: replication, Replication: replication,
Collection: collection, Collection: collection,
TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)), TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)),
}, },
Chunks: []*filer_pb.FileChunk{{ Chunks: []*filer_pb.FileChunk{{
FileId: fileId, FileId: fileId,
Size: uint64(r.ContentLength), Size: uint64(ret.Size),
Mtime: time.Now().UnixNano(), Mtime: time.Now().UnixNano(),
}}, }},
} }

View file

@ -7,6 +7,7 @@ import (
"net/http" "net/http"
"path" "path"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
@ -143,15 +144,9 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
} }
path := r.URL.Path path := r.URL.Path
// also delete the old fid unless PUT operation if strings.HasSuffix(path, "/") {
if r.Method != "PUT" { if fileName != "" {
if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil { path += fileName
for _, chunk := range entry.Chunks {
oldFid := chunk.FileId
operation.DeleteFile(fs.filer.GetMaster(), oldFid, fs.jwt(oldFid))
}
} else if err != nil {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
} }
} }
@ -162,6 +157,8 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
Mtime: time.Now(), Mtime: time.Now(),
Crtime: time.Now(), Crtime: time.Now(),
Mode: 0660, Mode: 0660,
Uid: OS_UID,
Gid: OS_GID,
Replication: replication, Replication: replication,
Collection: collection, Collection: collection,
TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)), TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)),