Mirror of https://github.com/seaweedfs/seaweedfs.git
remove unused collection and replication from upload result
commit 689b4ecdcc
parent e3f40d538d
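The change is mechanical: every implementation of the chunk-saving callback returned collection and replication strings that, per the commit message, nothing consumed (mergeIntoManifest below discards them with blank identifiers), so the commit narrows the callback signature and deletes the plumbing that threaded the two values around. The type before and after, taken verbatim from the hunks below:

	// before
	type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)

	// after
	type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error)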
@@ -583,7 +583,7 @@ func detectMimeType(f *os.File) string {
 	return mimeType
 }
 
-func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
 
 	var fileId, host string
 	var auth security.EncodedJwt
@@ -611,7 +611,6 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
 		}
 
 		fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
-		collection, replication = resp.Collection, resp.Replication
 
 		return nil
 	})
@@ -621,7 +620,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
 
 		return nil
 	}); flushErr != nil {
-		return nil, collection, replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
+		return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
 	}
 
 	uploadOption := &operation.UploadOption{
@@ -635,10 +634,10 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
 	}
 	uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
 	if flushErr != nil {
-		return nil, collection, replication, fmt.Errorf("upload data: %v", flushErr)
+		return nil, fmt.Errorf("upload data: %v", flushErr)
 	}
 	if uploadResult.Error != "" {
-		return nil, collection, replication, fmt.Errorf("upload result: %v", uploadResult.Error)
+		return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
 	}
-	return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
+	return uploadResult.ToPbFileChunk(fileId, offset), nil
 }
@@ -258,7 +258,7 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
 		}
 	}
 
-	manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
+	manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0)
 	if err != nil {
 		return nil, err
 	}
@@ -269,4 +269,4 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
 	return
 }
 
-type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)
+type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error)
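For illustration only (not part of this commit): a minimal function satisfying the narrowed type could look like the sketch below. The helper name saveToBytes, the in-memory buffering, and the subset of FileChunk fields populated are assumptions made for the example.

	// Illustrative sketch, assuming the filer package context where the type
	// is declared. saveToBytes is a hypothetical helper, not code from this commit.
	func saveToBytes(buf *bytes.Buffer) SaveDataAsChunkFunctionType {
		return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, error) {
			n, err := io.Copy(buf, reader) // drain the chunk into memory
			if err != nil {
				return nil, fmt.Errorf("save %s: %v", name, err)
			}
			// Only Offset and Size are filled in here; the real implementations
			// above also set FileId, Mtime, etc. via uploadResult.ToPbFileChunk.
			return &filer_pb.FileChunk{Offset: offset, Size: uint64(n)}, nil
		}
	}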
@@ -65,10 +65,6 @@ func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64)
 	return pages.uploadPipeline.MaybeReadDataAt(data, startOffset)
 }
 
-func (pages *ChunkedDirtyPages) GetStorageOptions() (collection, replication string) {
-	return pages.collection, pages.replication
-}
-
 func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
 
 	mtime := time.Now().UnixNano()
@@ -76,14 +72,13 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader
 
 	fileFullPath := pages.fh.FullPath()
 	fileName := fileFullPath.Name()
-	chunk, collection, replication, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset)
+	chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset)
 	if err != nil {
 		glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err)
 		pages.lastErr = err
 		return
 	}
 	chunk.Mtime = mtime
-	pages.collection, pages.replication = collection, replication
 	pages.fh.AddChunks([]*filer_pb.FileChunk{chunk})
 	pages.fh.entryViewCache = nil
 	glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)
@@ -66,10 +66,6 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64)
 	return
 }
 
-func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
-	return pw.randomWriter.GetStorageOptions()
-}
-
 func (pw *PageWriter) LockForRead(startOffset, stopOffset int64) {
 	pw.randomWriter.LockForRead(startOffset, stopOffset)
 }
@@ -4,7 +4,6 @@ type DirtyPages interface {
 	AddPage(offset int64, data []byte, isSequential bool)
 	FlushData() error
 	ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
-	GetStorageOptions() (collection, replication string)
 	Destroy()
 	LockForRead(startOffset, stopOffset int64)
 	UnlockForRead(startOffset, stopOffset int64)
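With GetStorageOptions gone from the interface, the two writers above no longer need their implementations of it. A compile-time assertion is the usual way to confirm both still satisfy the slimmed interface; these lines are a suggested addition for illustration, not part of the commit, and assume both types live in the same package as DirtyPages:

	// Hypothetical compile-time checks: fail to build if either writer
	// stops satisfying the DirtyPages interface.
	var _ DirtyPages = (*ChunkedDirtyPages)(nil)
	var _ DirtyPages = (*PageWriter)(nil)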
@@ -15,7 +15,7 @@ import (
 
 func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
 
-	return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+	return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, err error) {
 		var fileId, host string
 		var auth security.EncodedJwt
 
@@ -43,12 +43,11 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
 			fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
 			loc := resp.Location
 			host = wfs.AdjustedUrl(loc)
-			collection, replication = resp.Collection, resp.Replication
 
 			return nil
 		})
 	}); err != nil {
-		return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+		return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
 	}
 
 	fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
@@ -67,11 +66,11 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
 		uploadResult, err, data := operation.Upload(reader, uploadOption)
 		if err != nil {
 			glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
-			return nil, "", "", fmt.Errorf("upload data: %v", err)
+			return nil, fmt.Errorf("upload data: %v", err)
 		}
 		if uploadResult.Error != "" {
 			glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
-			return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
+			return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
 		}
 
 		if offset == 0 {
@@ -79,6 +78,6 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
 		}
 
 		chunk = uploadResult.ToPbFileChunk(fileId, offset)
-		return chunk, collection, replication, nil
+		return chunk, nil
 	}
 }
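On the mount side the callback stays curried: wfs.saveDataAsChunk(fullPath) builds the closure once, and the write pipeline invokes it per chunk, now with two results instead of four. The call shape, using names from the hunks above:

	save := wfs.saveDataAsChunk(fileFullPath) // a filer.SaveDataAsChunkFunctionType
	chunk, err := save(reader, fileName, offset)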
@@ -255,11 +255,11 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 
 func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
 
-	return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
+	return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, error) {
 		// assign one file id for one chunk
 		fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so)
 		if assignErr != nil {
-			return nil, "", "", assignErr
+			return nil, assignErr
 		}
 
 		// upload the chunk to the volume server
@@ -274,10 +274,10 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
 		}
 		uploadResult, uploadErr, _ := operation.Upload(reader, uploadOption)
 		if uploadErr != nil {
-			return nil, "", "", uploadErr
+			return nil, uploadErr
 		}
 
-		return uploadResult.ToPbFileChunk(fileId, offset), so.Collection, so.Replication, nil
+		return uploadResult.ToPbFileChunk(fileId, offset), nil
 	}
 }
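Nothing is lost on the filer server side: saveAsChunk already receives the *operation.StorageOption, so collection and replication remain available at the call site without being echoed back through the callback. A hedged sketch of such a call site (the surrounding function and variable names are assumed for illustration):

	// Hypothetical caller inside FilerServer; so is the *operation.StorageOption
	// that was passed to saveAsChunk, so its fields stay in hand.
	saveFn := fs.saveAsChunk(so)
	chunk, err := saveFn(bytes.NewReader(data), name, offset)
	if err != nil {
		return err
	}
	glog.V(4).Infof("chunk %s stored with collection=%q replication=%q", chunk.FileId, so.Collection, so.Replication)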
@@ -105,8 +105,6 @@ type WebDavFile struct {
 	entryViewCache []filer.VisibleInterval
 	reader         io.ReaderAt
 	bufWriter      *buffered_writer.BufferedWriteCloser
-	collection     string
-	replication    string
 }
 
 func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
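With the two cached fields gone from WebDavFile, the WebDAV write path no longer stores per-file storage options at all; the flush callback in Write (final hunk below) reduces to a plain two-value assignment:

	chunk, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)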
@@ -376,7 +374,7 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
 	return fs.stat(ctx, name)
 }
 
-func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
 
 	var fileId, host string
 	var auth security.EncodedJwt
@@ -404,7 +402,6 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
 		}
 
 		fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
-		f.collection, f.replication = resp.Collection, resp.Replication
 
 		return nil
 	})
@@ -414,7 +411,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
 
 		return nil
 	}); flushErr != nil {
-		return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
+		return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
 	}
 
 	fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
@@ -430,13 +427,13 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
 	uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
 	if flushErr != nil {
 		glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
-		return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
+		return nil, fmt.Errorf("upload data: %v", flushErr)
 	}
 	if uploadResult.Error != "" {
 		glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
-		return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
+		return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
 	}
-	return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
+	return uploadResult.ToPbFileChunk(fileId, offset), nil
 }
 
 func (f *WebDavFile) Write(buf []byte) (int, error) {
@@ -462,7 +459,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
 	f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
 
 		var chunk *filer_pb.FileChunk
-		chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
+		chunk, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
 
 		if flushErr != nil {
 			return fmt.Errorf("%s upload result: %v", f.name, flushErr)