refactor: prepare for snapshotting

This commit is contained in:
Chris Lu 2019-04-16 09:55:37 -07:00
parent af49aea0c6
commit 338e6d60a5
5 changed files with 9 additions and 8 deletions

View file

@ -246,7 +246,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecurs
} }
if shouldDeleteChunks { if shouldDeleteChunks {
f.DeleteChunks(entry.Chunks) f.DeleteChunks(p, entry.Chunks)
} }
if p == "/" { if p == "/" {

View file

@ -51,7 +51,7 @@ func (f *Filer) loopProcessingDeletion() {
} }
} }
func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks { for _, chunk := range chunks {
f.fileIdDeletionChan <- chunk.FileId f.fileIdDeletionChan <- chunk.FileId
} }
@ -67,7 +67,7 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
return return
} }
if newEntry == nil { if newEntry == nil {
f.DeleteChunks(oldEntry.Chunks) f.DeleteChunks(oldEntry.FullPath, oldEntry.Chunks)
} }
var toDelete []*filer_pb.FileChunk var toDelete []*filer_pb.FileChunk
@ -84,5 +84,5 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
toDelete = append(toDelete, oldChunk) toDelete = append(toDelete, oldChunk)
} }
} }
f.DeleteChunks(toDelete) f.DeleteChunks(oldEntry.FullPath, toDelete)
} }

View file

@ -114,7 +114,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))) fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name)))
chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
fs.filer.DeleteChunks(garbages) fs.filer.DeleteChunks(fullpath, garbages)
if req.Entry.Attributes == nil { if req.Entry.Attributes == nil {
return nil, fmt.Errorf("can not create entry with empty attributes") return nil, fmt.Errorf("can not create entry with empty attributes")
@ -175,8 +175,8 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
} }
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil { if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
fs.filer.DeleteChunks(unusedChunks) fs.filer.DeleteChunks(entry.FullPath, unusedChunks)
fs.filer.DeleteChunks(garbages) fs.filer.DeleteChunks(entry.FullPath, garbages)
} }
fs.filer.NotifyUpdateEvent(entry, newEntry, true) fs.filer.NotifyUpdateEvent(entry, newEntry, true)

View file

@ -206,7 +206,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
// glog.V(4).Infof("saving %s => %+v", path, entry) // glog.V(4).Infof("saving %s => %+v", path, entry)
if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil { if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil {
fs.filer.DeleteFileByFileId(fileId) fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
writeJsonError(w, r, http.StatusInternalServerError, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err)
return return

View file

@ -170,6 +170,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
Chunks: fileChunks, Chunks: fileChunks,
} }
if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil { if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil {
fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
replyerr = db_err replyerr = db_err
filerResult.Error = db_err.Error() filerResult.Error = db_err.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)