assign new file id even on filer PUT operations

This commit is contained in:
Chris Lu 2019-01-02 12:58:26 -08:00
parent d67f7ddfaf
commit 28a41fda3e

View file

@ -31,33 +31,6 @@ type FilerPostResult struct {
Url string `json:"url,omitempty"`
}
func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) {
var entry *filer2.Entry
entry, err = fs.filer.FindEntry(filer2.FullPath(path))
if err == filer2.ErrNotFound {
return "", "", nil
}
if err != nil {
glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
if len(entry.Chunks) == 0 {
glog.V(1).Infof("empty entry: %s", path)
w.WriteHeader(http.StatusNoContent)
} else {
fileId = entry.Chunks[0].FileId
urlLocation, err = fs.filer.MasterClient.LookupFileId(fileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err is %s", fileId, err.Error())
w.WriteHeader(http.StatusNotFound)
}
}
return
}
func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
ar := &operation.VolumeAssignRequest{
Count: 1,
@ -109,11 +82,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
fileId, urlLocation, err := fs.queryFileInfoByPath(w, r, r.URL.Path)
if err == nil && fileId == "" {
fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
}
fileId, urlLocation, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
if err != nil || fileId == "" || urlLocation == "" {
glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
return
}
@ -186,10 +158,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
// update metadata in filer store
glog.V(4).Infoln("saving", path, "=>", fileId)
existingEntry, err := fs.filer.FindEntry(filer2.FullPath(path))
crTime := time.Now()
if err == nil && existingEntry != nil {
// glog.V(4).Infof("existing %s => %+v", path, existingEntry)
if existingEntry.IsDirectory() {
path += "/" + ret.Name
} else {
@ -215,6 +187,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ETag: etag,
}},
}
// glog.V(4).Infof("saving %s => %+v", path, entry)
if db_err := fs.filer.CreateEntry(entry); db_err != nil {
fs.filer.DeleteFileByFileId(fileId)
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)