passing full path when assign volume locations

This commit is contained in:
Chris Lu 2020-10-25 15:46:29 -07:00
parent f375b93aef
commit e219c57849
7 changed files with 16 additions and 17 deletions

View file

@ -219,7 +219,7 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
fileCopyTaskChan <- FileCopyTask{ fileCopyTaskChan <- FileCopyTask{
sourceLocation: fileOrDir, sourceLocation: fileOrDir,
destinationUrlPath: destPath, destinationUrlPath: destPath+fi.Name(),
fileSize: fi.Size(), fileSize: fi.Size(),
fileMode: fi.Mode(), fileMode: fi.Mode(),
uid: uid, uid: uid,
@ -405,7 +405,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
Replication: *worker.options.replication, Replication: *worker.options.replication,
Collection: *worker.options.collection, Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec, TtlSec: worker.options.ttlSec,
Path: task.destinationUrlPath, Path: task.destinationUrlPath+fileName,
} }
assignResult, assignError = client.AssignVolume(context.Background(), request) assignResult, assignError = client.AssignVolume(context.Background(), request)

View file

@ -110,10 +110,8 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
go func() { go func() {
defer pages.writeWaitGroup.Done() defer pages.writeWaitGroup.Done()
dir, _ := pages.f.fullpath().DirAndName()
reader = io.LimitReader(reader, size) reader = io.LimitReader(reader, size)
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset) chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil { if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
pages.chunkSaveErrChan <- err pages.chunkSaveErrChan <- err

View file

@ -256,7 +256,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks) manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks) chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks) chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
if manifestErr != nil { if manifestErr != nil {
// not good, but should be ok // not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", manifestErr) glog.V(0).Infof("MaybeManifestize: %v", manifestErr)

View file

@ -10,9 +10,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
) )
func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType { func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
var fileId, host string var fileId, host string
@ -26,7 +27,7 @@ func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType {
Collection: wfs.option.Collection, Collection: wfs.option.Collection,
TtlSec: wfs.option.TtlSec, TtlSec: wfs.option.TtlSec,
DataCenter: wfs.option.DataCenter, DataCenter: wfs.option.DataCenter,
Path: dir, Path: string(fullPath),
} }
resp, err := client.AssignVolume(context.Background(), request) resp, err := client.AssignVolume(context.Background(), request)

View file

@ -15,7 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
) )
func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir string) (replicatedChunks []*filer_pb.FileChunk, err error) { func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string) (replicatedChunks []*filer_pb.FileChunk, err error) {
if len(sourceChunks) == 0 { if len(sourceChunks) == 0 {
return return
} }
@ -27,7 +27,7 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir str
wg.Add(1) wg.Add(1)
go func(chunk *filer_pb.FileChunk, index int) { go func(chunk *filer_pb.FileChunk, index int) {
defer wg.Done() defer wg.Done()
replicatedChunk, e := fs.replicateOneChunk(chunk, dir) replicatedChunk, e := fs.replicateOneChunk(chunk, path)
if e != nil { if e != nil {
err = e err = e
} }
@ -39,9 +39,9 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir str
return return
} }
func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, dir string) (*filer_pb.FileChunk, error) { func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path string) (*filer_pb.FileChunk, error) {
fileId, err := fs.fetchAndWrite(sourceChunk, dir) fileId, err := fs.fetchAndWrite(sourceChunk, path)
if err != nil { if err != nil {
return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err) return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err)
} }
@ -58,7 +58,7 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, dir stri
}, nil }, nil
} }
func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) (fileId string, err error) { func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) (fileId string, err error) {
filename, header, resp, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString()) filename, header, resp, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString())
if err != nil { if err != nil {
@ -77,7 +77,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string)
Collection: fs.collection, Collection: fs.collection,
TtlSec: fs.ttlSec, TtlSec: fs.ttlSec,
DataCenter: fs.dataCenter, DataCenter: fs.dataCenter,
Path: dir, Path: path,
} }
resp, err := client.AssignVolume(context.Background(), request) resp, err := client.AssignVolume(context.Background(), request)

View file

@ -96,7 +96,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
} }
} }
replicatedChunks, err := fs.replicateChunks(entry.Chunks, dir) replicatedChunks, err := fs.replicateChunks(entry.Chunks, key)
if err != nil { if err != nil {
// only warning here since the source chunk may have been deleted already // only warning here since the source chunk may have been deleted already
@ -180,7 +180,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
} }
// replicate the chunks that are new in the source // replicate the chunks that are new in the source
replicatedChunks, err := fs.replicateChunks(newChunks, newParentPath) replicatedChunks, err := fs.replicateChunks(newChunks, key)
if err != nil { if err != nil {
return true, fmt.Errorf("replicte %s chunks error: %v", key, err) return true, fmt.Errorf("replicte %s chunks error: %v", key, err)
} }

View file

@ -387,7 +387,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
Count: 1, Count: 1,
Replication: "", Replication: "",
Collection: f.fs.option.Collection, Collection: f.fs.option.Collection,
Path: dir, Path: f.name,
} }
resp, err := client.AssignVolume(ctx, request) resp, err := client.AssignVolume(ctx, request)