mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #2761 from zhuqunzhou/master
filer: support uploading file without needEnsureParentDir
This commit is contained in:
commit
d2acde2a61
|
@ -151,7 +151,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error {
|
||||||
return f.Store.RollbackTransaction(ctx)
|
return f.Store.RollbackTransaction(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error {
|
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32, needEnsureParentDir bool) error {
|
||||||
|
|
||||||
if string(entry.FullPath) == "/" {
|
if string(entry.FullPath) == "/" {
|
||||||
return nil
|
return nil
|
||||||
|
@ -169,9 +169,11 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
|
||||||
|
|
||||||
if oldEntry == nil {
|
if oldEntry == nil {
|
||||||
|
|
||||||
dirParts := strings.Split(string(entry.FullPath), "/")
|
if needEnsureParentDir {
|
||||||
if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
|
dirParts := strings.Split(string(entry.FullPath), "/")
|
||||||
return err
|
if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
|
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
|
||||||
|
|
|
@ -43,7 +43,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
|
||||||
entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
|
entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
|
||||||
|
|
||||||
// update the entry
|
// update the entry
|
||||||
err = f.CreateEntry(context.Background(), entry, false, false, nil)
|
err = f.CreateEntry(context.Background(), entry, false, false, nil, true)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
|
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, true); err != nil {
|
||||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
|
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, true); err != nil {
|
||||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
|
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, true); err != nil {
|
||||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
|
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil,true); err != nil {
|
||||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,7 +148,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
|
||||||
newEntry := filer.FromPbEntry(req.Directory, req.Entry)
|
newEntry := filer.FromPbEntry(req.Directory, req.Entry)
|
||||||
newEntry.Chunks = chunks
|
newEntry.Chunks = chunks
|
||||||
|
|
||||||
createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures)
|
createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, true)
|
||||||
|
|
||||||
if createErr == nil {
|
if createErr == nil {
|
||||||
fs.filer.DeleteChunks(garbage)
|
fs.filer.DeleteChunks(garbage)
|
||||||
|
@ -271,7 +271,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
|
||||||
glog.V(0).Infof("MaybeManifestize: %v", err)
|
glog.V(0).Infof("MaybeManifestize: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil)
|
err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil, true)
|
||||||
|
|
||||||
return &filer_pb.AppendToEntryResponse{}, err
|
return &filer_pb.AppendToEntryResponse{}, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -173,7 +173,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.Seawee
|
||||||
Remote: entry.Remote,
|
Remote: entry.Remote,
|
||||||
Quota: entry.Quota,
|
Quota: entry.Quota,
|
||||||
}
|
}
|
||||||
if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil {
|
if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures, true); createErr != nil {
|
||||||
return createErr
|
return createErr
|
||||||
}
|
}
|
||||||
if stream != nil {
|
if stream != nil {
|
||||||
|
|
|
@ -43,7 +43,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
|
if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, true); dbErr != nil {
|
||||||
glog.V(0).Infof("failing to update %s tagging : %v", path, dbErr)
|
glog.V(0).Infof("failing to update %s tagging : %v", path, dbErr)
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
writeJsonError(w, r, http.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
|
@ -107,7 +107,7 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
|
if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, true); dbErr != nil {
|
||||||
glog.V(0).Infof("failing to delete %s tagging : %v", path, dbErr)
|
glog.V(0).Infof("failing to delete %s tagging : %v", path, dbErr)
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
writeJsonError(w, r, http.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -130,6 +130,10 @@ func isAppend(r *http.Request) bool {
|
||||||
return r.URL.Query().Get("op") == "append"
|
return r.URL.Query().Get("op") == "append"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func needEnsureParentDirEntry(r *http.Request) bool {
|
||||||
|
return r.URL.Query().Get("ensureParentDir") != "false"
|
||||||
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
|
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
|
||||||
|
|
||||||
// detect file mode
|
// detect file mode
|
||||||
|
@ -243,7 +247,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
|
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, needEnsureParentDirEntry(r)); dbErr != nil {
|
||||||
replyerr = dbErr
|
replyerr = dbErr
|
||||||
filerResult.Error = dbErr.Error()
|
filerResult.Error = dbErr.Error()
|
||||||
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
|
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
|
||||||
|
@ -320,7 +324,7 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
|
||||||
Name: util.FullPath(path).Name(),
|
Name: util.FullPath(path).Name(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
|
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, true); dbErr != nil {
|
||||||
replyerr = dbErr
|
replyerr = dbErr
|
||||||
filerResult.Error = dbErr.Error()
|
filerResult.Error = dbErr.Error()
|
||||||
glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr)
|
glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr)
|
||||||
|
|
|
@ -93,7 +93,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
|
||||||
Size: int64(pu.OriginalDataSize),
|
Size: int64(pu.OriginalDataSize),
|
||||||
}
|
}
|
||||||
|
|
||||||
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
|
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, true); dbErr != nil {
|
||||||
fs.filer.DeleteChunks(entry.Chunks)
|
fs.filer.DeleteChunks(entry.Chunks)
|
||||||
err = dbErr
|
err = dbErr
|
||||||
filerResult.Error = dbErr.Error()
|
filerResult.Error = dbErr.Error()
|
||||||
|
|
Loading…
Reference in a new issue