mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
filer read write all via locations from MasterClient
This commit is contained in:
parent
1d779389cb
commit
888eb2abb5
|
@ -206,9 +206,17 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
|
||||||
|
|
||||||
func (f *Filer) deleteChunks(chunks []*filer_pb.FileChunk) {
|
func (f *Filer) deleteChunks(chunks []*filer_pb.FileChunk) {
|
||||||
for _, chunk := range chunks {
|
for _, chunk := range chunks {
|
||||||
if err := operation.DeleteFile(f.GetMaster(), chunk.FileId, ""); err != nil {
|
f.DeleteFileByFileId(chunk.FileId)
|
||||||
glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *Filer) DeleteFileByFileId(fileId string) {
|
||||||
|
fileUrlOnVolume, err := f.MasterClient.LookupFileId(fileId)
|
||||||
|
if err != nil {
|
||||||
|
glog.V(0).Infof("can not find file %s: %v", fileId, err)
|
||||||
|
}
|
||||||
|
if err := operation.DeleteFromVolumeServer(fileUrlOnVolume, ""); err != nil {
|
||||||
|
glog.V(0).Infof("deleting file %s: %v", fileId, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,14 @@ type DeleteResult struct {
|
||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func DeleteFromVolumeServer(fileUrlOnVolume string, jwt security.EncodedJwt) error {
|
||||||
|
err = util.Delete(fileUrlOnVolume, jwt)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to delete %s:%v", fileUrlOnVolume, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
|
func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
|
||||||
fileUrl, err := LookupFileId(master, fileId)
|
fileUrl, err := LookupFileId(master, fileId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -176,11 +176,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
|
||||||
if err = fs.filer.UpdateEntry(newEntry); err == nil {
|
if err = fs.filer.UpdateEntry(newEntry); err == nil {
|
||||||
for _, garbage := range unusedChunks {
|
for _, garbage := range unusedChunks {
|
||||||
glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
|
glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
|
||||||
operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
|
fs.filer.DeleteFileByFileId(garbage.FileId)
|
||||||
}
|
}
|
||||||
for _, garbage := range garbages {
|
for _, garbage := range garbages {
|
||||||
glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
|
glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
|
||||||
operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
|
fs.filer.DeleteFileByFileId(garbage.FileId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer2"
|
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"mime"
|
"mime"
|
||||||
"mime/multipart"
|
"mime/multipart"
|
||||||
|
@ -63,7 +62,7 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request,
|
||||||
|
|
||||||
fileId := entry.Chunks[0].FileId
|
fileId := entry.Chunks[0].FileId
|
||||||
|
|
||||||
urlString, err := operation.LookupFileId(fs.filer.GetMaster(), fileId)
|
urlString, err := fs.filer.MasterClient.LookupFileId(fileId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
|
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
@ -223,7 +222,7 @@ func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int
|
||||||
|
|
||||||
for _, chunkView := range chunkViews {
|
for _, chunkView := range chunkViews {
|
||||||
|
|
||||||
urlString, err := operation.LookupFileId(fs.filer.GetMaster(), chunkView.FileId)
|
urlString, err := fs.filer.MasterClient.LookupFileId(chunkView.FileId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
|
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -49,7 +49,7 @@ func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Reques
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
} else {
|
} else {
|
||||||
fileId = entry.Chunks[0].FileId
|
fileId = entry.Chunks[0].FileId
|
||||||
urlLocation, err = operation.LookupFileId(fs.filer.GetMaster(), fileId)
|
urlLocation, err = fs.filer.MasterClient.LookupFileId(fileId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(1).Infof("operation LookupFileId %s failed, err is %s", fileId, err.Error())
|
glog.V(1).Infof("operation LookupFileId %s failed, err is %s", fileId, err.Error())
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
@ -176,7 +176,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
if ret.Name != "" {
|
if ret.Name != "" {
|
||||||
path += ret.Name
|
path += ret.Name
|
||||||
} else {
|
} else {
|
||||||
operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up
|
fs.filer.DeleteFileByFileId(fileId)
|
||||||
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
|
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
|
||||||
writeJsonError(w, r, http.StatusInternalServerError,
|
writeJsonError(w, r, http.StatusInternalServerError,
|
||||||
errors.New("Can not to write to folder "+path+" without a file name"))
|
errors.New("Can not to write to folder "+path+" without a file name"))
|
||||||
|
@ -205,7 +205,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}},
|
}},
|
||||||
}
|
}
|
||||||
if db_err := fs.filer.CreateEntry(entry); db_err != nil {
|
if db_err := fs.filer.CreateEntry(entry); db_err != nil {
|
||||||
operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up
|
fs.filer.DeleteFileByFileId(fileId)
|
||||||
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
|
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, db_err)
|
writeJsonError(w, r, http.StatusInternalServerError, db_err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -2,6 +2,12 @@ package wdclient
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"strings"
|
||||||
|
"math/rand"
|
||||||
|
"errors"
|
||||||
|
"strconv"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Location struct {
|
type Location struct {
|
||||||
|
@ -14,6 +20,33 @@ type VidMap struct {
|
||||||
vid2Locations map[uint32][]Location
|
vid2Locations map[uint32][]Location
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (vc *VidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) {
|
||||||
|
id, err := strconv.Atoi(vid)
|
||||||
|
if err != nil {
|
||||||
|
glog.V(1).Infof("Unknown volume id %s", vid)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
locations := vc.GetLocations(uint32(id))
|
||||||
|
if len(locations) == 0 {
|
||||||
|
return "", fmt.Errorf("volume %d not found", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return locations[rand.Intn(len(locations))].Url, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (vc *VidMap) LookupFileId(fileId string) (fullUrl string, err error) {
|
||||||
|
parts := strings.Split(fileId, ",")
|
||||||
|
if len(parts) != 2 {
|
||||||
|
return "", errors.New("Invalid fileId " + fileId)
|
||||||
|
}
|
||||||
|
serverUrl, lookupError := LookupVolumeServerUrl(parts[0])
|
||||||
|
if lookupError != nil {
|
||||||
|
return "", lookupError
|
||||||
|
}
|
||||||
|
return "http://" + serverUrl + "/" + fileId, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (vc *VidMap) GetLocations(vid uint32) (locations []Location) {
|
func (vc *VidMap) GetLocations(vid uint32) (locations []Location) {
|
||||||
vc.RLock()
|
vc.RLock()
|
||||||
defer vc.RUnlock()
|
defer vc.RUnlock()
|
||||||
|
|
Loading…
Reference in a new issue