mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
a990ef2106
fix https://github.com/chrislusf/seaweedfs/issues/1182 always use the non-duplicated fs.Node Forget() the fs.Node Rename will also use the right fs.Node Avoid using the same file handle for the same file
225 lines
6.9 KiB
Go
225 lines
6.9 KiB
Go
package filesys
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/chrislusf/seaweedfs/weed/security"
|
|
)
|
|
|
|
type ContinuousDirtyPages struct {
|
|
hasData bool
|
|
Offset int64
|
|
Size int64
|
|
Data []byte
|
|
f *File
|
|
lock sync.Mutex
|
|
}
|
|
|
|
func newDirtyPages(file *File) *ContinuousDirtyPages {
|
|
return &ContinuousDirtyPages{
|
|
Data: nil,
|
|
f: file,
|
|
}
|
|
}
|
|
|
|
func (pages *ContinuousDirtyPages) releaseResource() {
|
|
if pages.Data != nil {
|
|
pages.f.wfs.bufPool.Put(pages.Data)
|
|
pages.Data = nil
|
|
atomic.AddInt32(&counter, -1)
|
|
glog.V(3).Infof("%s/%s releasing resource %d", pages.f.dir.Path, pages.f.Name, counter)
|
|
}
|
|
}
|
|
|
|
var counter = int32(0)
|
|
|
|
func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
|
|
|
|
pages.lock.Lock()
|
|
defer pages.lock.Unlock()
|
|
|
|
var chunk *filer_pb.FileChunk
|
|
|
|
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
|
|
// this is more than what buffer can hold.
|
|
return pages.flushAndSave(ctx, offset, data)
|
|
}
|
|
|
|
if pages.Data == nil {
|
|
pages.Data = pages.f.wfs.bufPool.Get().([]byte)
|
|
atomic.AddInt32(&counter, 1)
|
|
glog.V(3).Infof("%s/%s acquire resource %d", pages.f.dir.Path, pages.f.Name, counter)
|
|
}
|
|
|
|
if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) ||
|
|
pages.Offset+int64(len(pages.Data)) < offset+int64(len(data)) {
|
|
// if the data is out of range,
|
|
// or buffer is full if adding new data,
|
|
// flush current buffer and add new data
|
|
|
|
glog.V(4).Infof("offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data))
|
|
|
|
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
|
|
if chunk != nil {
|
|
glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
|
|
chunks = append(chunks, chunk)
|
|
}
|
|
} else {
|
|
glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
|
return
|
|
}
|
|
pages.Offset = offset
|
|
glog.V(4).Infof("copy data0: offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data))
|
|
copy(pages.Data, data)
|
|
pages.Size = int64(len(data))
|
|
return
|
|
}
|
|
|
|
if offset != pages.Offset+pages.Size {
|
|
// when this happens, debug shows the data overlapping with existing data is empty
|
|
// the data is not just append
|
|
if offset == pages.Offset && int(pages.Size) < len(data) {
|
|
glog.V(4).Infof("copy data1: offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data))
|
|
copy(pages.Data[pages.Size:], data[pages.Size:])
|
|
} else {
|
|
if pages.Size != 0 {
|
|
glog.V(1).Infof("%s/%s add page: pages [%d, %d) write [%d, %d)", pages.f.dir.Path, pages.f.Name, pages.Offset, pages.Offset+pages.Size, offset, offset+int64(len(data)))
|
|
}
|
|
return pages.flushAndSave(ctx, offset, data)
|
|
}
|
|
} else {
|
|
glog.V(4).Infof("copy data2: offset=%d, size=%d, existing pages offset=%d, pages size=%d, data=%d", offset, len(data), pages.Offset, pages.Size, len(pages.Data))
|
|
copy(pages.Data[offset-pages.Offset:], data)
|
|
}
|
|
|
|
pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
|
|
|
|
return
|
|
}
|
|
|
|
func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
|
|
|
|
var chunk *filer_pb.FileChunk
|
|
|
|
// flush existing
|
|
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
|
|
if chunk != nil {
|
|
glog.V(4).Infof("%s/%s flush existing [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
|
|
chunks = append(chunks, chunk)
|
|
}
|
|
} else {
|
|
glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
|
return
|
|
}
|
|
pages.Size = 0
|
|
pages.Offset = 0
|
|
|
|
// flush the new page
|
|
if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
|
|
if chunk != nil {
|
|
glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
|
|
chunks = append(chunks, chunk)
|
|
}
|
|
} else {
|
|
glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
|
|
|
|
pages.lock.Lock()
|
|
defer pages.lock.Unlock()
|
|
|
|
if pages.Size == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
|
|
pages.Size = 0
|
|
pages.Offset = 0
|
|
if chunk != nil {
|
|
glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
|
|
|
|
if pages.Size == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
glog.V(0).Infof("%s/%s saveExistingPagesToStorage [%d,%d): Data len=%d", pages.f.dir.Path, pages.f.Name, pages.Offset, pages.Size, len(pages.Data))
|
|
|
|
return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
|
|
}
|
|
|
|
func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
|
|
|
|
var fileId, host string
|
|
var auth security.EncodedJwt
|
|
|
|
if err := pages.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
|
|
|
|
request := &filer_pb.AssignVolumeRequest{
|
|
Count: 1,
|
|
Replication: pages.f.wfs.option.Replication,
|
|
Collection: pages.f.wfs.option.Collection,
|
|
TtlSec: pages.f.wfs.option.TtlSec,
|
|
DataCenter: pages.f.wfs.option.DataCenter,
|
|
}
|
|
|
|
resp, err := client.AssignVolume(ctx, request)
|
|
if err != nil {
|
|
glog.V(0).Infof("assign volume failure %v: %v", request, err)
|
|
return err
|
|
}
|
|
|
|
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
|
|
}
|
|
|
|
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
|
|
bufReader := bytes.NewReader(buf)
|
|
uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "", nil, auth)
|
|
if err != nil {
|
|
glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
|
|
return nil, fmt.Errorf("upload data: %v", err)
|
|
}
|
|
if uploadResult.Error != "" {
|
|
glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
|
|
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
|
|
}
|
|
|
|
return &filer_pb.FileChunk{
|
|
FileId: fileId,
|
|
Offset: offset,
|
|
Size: uint64(len(buf)),
|
|
Mtime: time.Now().UnixNano(),
|
|
ETag: uploadResult.ETag,
|
|
}, nil
|
|
|
|
}
|
|
|
|
func max(x, y int64) int64 {
|
|
if x > y {
|
|
return x
|
|
}
|
|
return y
|
|
}
|