mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
refactoring
some previous chunk etag was using md5, which should be wrong.
This commit is contained in:
parent
17d5ac4cd3
commit
871efa4fc1
|
@ -14,9 +14,10 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/util/grace"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util/grace"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
|
@ -324,15 +325,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
|
||||||
}
|
}
|
||||||
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
|
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
|
||||||
|
|
||||||
chunks = append(chunks, &filer_pb.FileChunk{
|
chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
|
||||||
FileId: assignResult.FileId,
|
|
||||||
Offset: 0,
|
|
||||||
Size: uint64(uploadResult.Size),
|
|
||||||
Mtime: time.Now().UnixNano(),
|
|
||||||
ETag: uploadResult.Md5,
|
|
||||||
CipherKey: uploadResult.CipherKey,
|
|
||||||
IsGzipped: uploadResult.Gzip > 0,
|
|
||||||
})
|
|
||||||
|
|
||||||
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
|
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
|
||||||
}
|
}
|
||||||
|
@ -435,15 +428,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
|
||||||
uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
|
uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
chunksChan <- &filer_pb.FileChunk{
|
chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i * chunkSize)
|
||||||
FileId: assignResult.FileId,
|
|
||||||
Offset: i * chunkSize,
|
|
||||||
Size: uint64(uploadResult.Size),
|
|
||||||
Mtime: time.Now().UnixNano(),
|
|
||||||
ETag: uploadResult.ETag,
|
|
||||||
CipherKey: uploadResult.CipherKey,
|
|
||||||
IsGzipped: uploadResult.Gzip > 0,
|
|
||||||
}
|
|
||||||
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
|
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,16 +38,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// append to existing chunks
|
// append to existing chunks
|
||||||
chunk := &filer_pb.FileChunk{
|
entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
|
||||||
FileId: assignResult.Fid,
|
|
||||||
Offset: offset,
|
|
||||||
Size: uint64(uploadResult.Size),
|
|
||||||
Mtime: time.Now().UnixNano(),
|
|
||||||
ETag: uploadResult.ETag,
|
|
||||||
CipherKey: uploadResult.CipherKey,
|
|
||||||
IsGzipped: uploadResult.Gzip > 0,
|
|
||||||
}
|
|
||||||
entry.Chunks = append(entry.Chunks, chunk)
|
|
||||||
|
|
||||||
// update the entry
|
// update the entry
|
||||||
err = f.CreateEntry(context.Background(), entry, false)
|
err = f.CreateEntry(context.Background(), entry, false)
|
||||||
|
|
|
@ -187,15 +187,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
|
||||||
}
|
}
|
||||||
pages.f.wfs.chunkCache.SetChunk(fileId, data)
|
pages.f.wfs.chunkCache.SetChunk(fileId, data)
|
||||||
|
|
||||||
return &filer_pb.FileChunk{
|
return uploadResult.ToPbFileChunk(fileId, offset), nil
|
||||||
FileId: fileId,
|
|
||||||
Offset: offset,
|
|
||||||
Size: uint64(size),
|
|
||||||
Mtime: time.Now().UnixNano(),
|
|
||||||
ETag: uploadResult.ETag,
|
|
||||||
CipherKey: uploadResult.CipherKey,
|
|
||||||
IsGzipped: uploadResult.Gzip > 0,
|
|
||||||
}, nil
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,6 @@ package broker
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||||
|
@ -23,23 +22,13 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag
|
||||||
|
|
||||||
dir, name := util.FullPath(targetFile).DirAndName()
|
dir, name := util.FullPath(targetFile).DirAndName()
|
||||||
|
|
||||||
chunk := &filer_pb.FileChunk{
|
|
||||||
FileId: assignResult.Fid,
|
|
||||||
Offset: 0, // needs to be fixed during appending
|
|
||||||
Size: uint64(uploadResult.Size),
|
|
||||||
Mtime: time.Now().UnixNano(),
|
|
||||||
ETag: uploadResult.ETag,
|
|
||||||
CipherKey: uploadResult.CipherKey,
|
|
||||||
IsGzipped: uploadResult.Gzip > 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
// append the chunk
|
// append the chunk
|
||||||
if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.AppendToEntryRequest{
|
request := &filer_pb.AppendToEntryRequest{
|
||||||
Directory: dir,
|
Directory: dir,
|
||||||
EntryName: name,
|
EntryName: name,
|
||||||
Chunks: []*filer_pb.FileChunk{chunk},
|
Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := client.AppendToEntry(context.Background(), request)
|
_, err := client.AppendToEntry(context.Background(), request)
|
||||||
|
|
|
@ -14,8 +14,10 @@ import (
|
||||||
"net/textproto"
|
"net/textproto"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/security"
|
"github.com/chrislusf/seaweedfs/weed/security"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
@ -31,6 +33,18 @@ type UploadResult struct {
|
||||||
Md5 string `json:"md5,omitempty"`
|
Md5 string `json:"md5,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
|
||||||
|
return &filer_pb.FileChunk{
|
||||||
|
FileId: fileId,
|
||||||
|
Offset: offset,
|
||||||
|
Size: uint64(uploadResult.Size),
|
||||||
|
Mtime: time.Now().UnixNano(),
|
||||||
|
ETag: uploadResult.ETag,
|
||||||
|
CipherKey: uploadResult.CipherKey,
|
||||||
|
IsGzipped: uploadResult.Gzip > 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
client *http.Client
|
client *http.Client
|
||||||
)
|
)
|
||||||
|
|
|
@ -119,17 +119,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save to chunk manifest structure
|
// Save to chunk manifest structure
|
||||||
fileChunks = append(fileChunks,
|
fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
|
||||||
&filer_pb.FileChunk{
|
|
||||||
FileId: fileId,
|
|
||||||
Offset: chunkOffset,
|
|
||||||
Size: uint64(uploadResult.Size),
|
|
||||||
Mtime: time.Now().UnixNano(),
|
|
||||||
ETag: uploadResult.ETag,
|
|
||||||
CipherKey: uploadResult.CipherKey,
|
|
||||||
IsGzipped: uploadResult.Gzip > 0,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
|
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
|
||||||
|
|
||||||
|
|
|
@ -46,17 +46,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save to chunk manifest structure
|
// Save to chunk manifest structure
|
||||||
fileChunks := []*filer_pb.FileChunk{
|
fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0)}
|
||||||
{
|
|
||||||
FileId: fileId,
|
|
||||||
Offset: 0,
|
|
||||||
Size: uint64(uploadResult.Size),
|
|
||||||
Mtime: time.Now().UnixNano(),
|
|
||||||
ETag: uploadResult.Md5,
|
|
||||||
CipherKey: uploadResult.CipherKey,
|
|
||||||
IsGzipped: uploadResult.Gzip > 0,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// fmt.Printf("uploaded: %+v\n", uploadResult)
|
// fmt.Printf("uploaded: %+v\n", uploadResult)
|
||||||
|
|
||||||
|
|
|
@ -418,17 +418,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
|
||||||
return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
|
return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
|
||||||
}
|
}
|
||||||
|
|
||||||
chunk := &filer_pb.FileChunk{
|
f.entry.Chunks = append(f.entry.Chunks, uploadResult.ToPbFileChunk(fileId, f.off))
|
||||||
FileId: fileId,
|
|
||||||
Offset: f.off,
|
|
||||||
Size: uint64(len(buf)),
|
|
||||||
Mtime: time.Now().UnixNano(),
|
|
||||||
ETag: uploadResult.ETag,
|
|
||||||
CipherKey: uploadResult.CipherKey,
|
|
||||||
IsGzipped: uploadResult.Gzip > 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
f.entry.Chunks = append(f.entry.Chunks, chunk)
|
|
||||||
|
|
||||||
err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
f.entry.Attributes.Mtime = time.Now().Unix()
|
f.entry.Attributes.Mtime = time.Now().Unix()
|
||||||
|
|
Loading…
Reference in a new issue