Mirror of https://github.com/seaweedfs/seaweedfs.git, synced 2024-01-19 02:48:24 +00:00

commit eb54993a4e

    Merge branch 'upstreamMaster' into check_chunkviews_mr

    # Conflicts:
    #	weed/filer/filechunk_manifest.go
    #	weed/filer/stream.go
    #	weed/replication/repl_util/replication_util.go
    #	weed/util/fasthttp_util.go
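Two changes run through this merge. First, the fasthttp-based helpers (util.FastGet, util.FastReadUrlAsStream) are dropped in favor of the plain net/http-based util.Get and util.ReadUrlAsStream, and weed/util/fasthttp_util.go is deleted outright. Second, the weed/filesys FUSE layer goes back to holding protobuf entries (*filer_pb.Entry) directly instead of the wrapper type *filer.Entry, so every entry.Attr.X access becomes entry.Attributes.X with explicit time and mode conversions. The call sites below show that util.Get keeps the FastGet shape: body, retryable flag, error. A minimal net/http sketch with those semantics — illustrative only, not the repository's implementation:

package main

import (
	"fmt"
	"io"
	"net/http"
)

// get is an illustrative stand-in for util.Get as used at the call sites
// in this diff: it returns the body, a "retryable" flag, and an error.
// (Sketch only; the real helper also handles gzip and buffering.)
func get(url string) ([]byte, bool, error) {
	resp, err := http.Get(url)
	if err != nil {
		// Transport-level failures are worth retrying on another replica.
		return nil, true, err
	}
	defer resp.Body.Close()

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, false, err
	}
	if resp.StatusCode >= 400 {
		// 5xx is retryable, 4xx is not - mirroring the deleted FastGet logic.
		return nil, resp.StatusCode >= 500, fmt.Errorf("%s: %d", url, resp.StatusCode)
	}
	return data, false, nil
}

func main() {
	data, retryable, err := get("http://localhost:8080/some/file")
	fmt.Println(len(data), retryable, err)
}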
@@ -1,5 +1,5 @@
 apiVersion: v1
 description: SeaweedFS
 name: seaweedfs
-appVersion: "2.32"
-version: 2.32
+appVersion: "2.34"
+version: 2.34
@@ -4,7 +4,7 @@ global:
   registry: ""
   repository: ""
   imageName: chrislusf/seaweedfs
-  # imageTag: "2.32" - started using {.Chart.appVersion}
+  # imageTag: "2.34" - started using {.Chart.appVersion}
   imagePullPolicy: IfNotPresent
   imagePullSecrets: imagepullsecret
   restartPolicy: Always
@@ -305,7 +305,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
 		}
 		var bytes []byte
 		for _, url := range urls {
-			bytes, _, err = util.FastGet(url)
+			bytes, _, err = util.Get(url)
 			if err == nil {
 				break
 			}
@@ -102,7 +102,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
 
 	for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
 		for _, urlString := range urlStrings {
-			shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, isCheck, offset, size, func(data []byte) {
+			shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
 				buffer.Write(data)
 			})
 			if !shouldRetry {
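The retry loop above grows the wait by half each round until it reaches util.RetryWaitTime (defined elsewhere in weed/util). A tiny sketch showing the resulting wait sequence; the 30s cap here is just a stand-in for the real constant:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Stand-in for util.RetryWaitTime.
	retryWaitTime := 30 * time.Second

	// Same growth rule as the loop in retriedFetchChunkData:
	// each round waits 1.5x longer than the previous one.
	for waitTime := time.Second; waitTime < retryWaitTime; waitTime += waitTime / 2 {
		fmt.Println(waitTime) // 1s, 1.5s, 2.25s, 3.375s, ...
	}
}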
@@ -23,10 +23,6 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
 	return
 }
 
-func FileSize2(entry *Entry) (size uint64) {
-	return maxUint64(TotalSize(entry.Chunks), entry.Attr.FileSize)
-}
-
 func FileSize(entry *filer_pb.Entry) (size uint64) {
 	return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize)
 }
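With FileSize2 (which operated on the wrapper filer.Entry) removed, FileSize on the protobuf entry is the single size rule: the larger of the chunk extent and the recorded Attributes.FileSize, which can exceed the chunk total after a truncate/extend. A self-contained sketch of the arithmetic with stand-in types; the extent-based totalSize is an assumption about TotalSize's body, which the hunk above does not show:

package main

import "fmt"

// Stand-ins for filer_pb.FileChunk and filer_pb.Entry - just the fields
// the size computation touches.
type fileChunk struct {
	Offset int64
	Size   uint64
}

type attributes struct{ FileSize uint64 }

type entry struct {
	Chunks     []*fileChunk
	Attributes *attributes
}

func maxUint64(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

func totalSize(chunks []*fileChunk) (size uint64) {
	// Chunks may overlap after overwrites, so take the furthest extent
	// rather than the sum of chunk sizes (assumed, per the real TotalSize).
	for _, c := range chunks {
		if end := uint64(c.Offset) + c.Size; size < end {
			size = end
		}
	}
	return
}

// Same rule as filer.FileSize in the hunk above.
func fileSize(e *entry) uint64 {
	return maxUint64(totalSize(e.Chunks), e.Attributes.FileSize)
}

func main() {
	e := &entry{
		Chunks:     []*fileChunk{{Offset: 0, Size: 1024}, {Offset: 1024, Size: 512}},
		Attributes: &attributes{FileSize: 2048}, // e.g. set by an extend
	}
	fmt.Println(fileSize(e)) // 2048: the attribute wins over the 1536-byte extent
}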
@@ -35,7 +35,7 @@ func ReadContent(filerAddress string, dir, name string) ([]byte, error) {
 
 	target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name)
 
-	data, _, err := util.FastGet(target)
+	data, _, err := util.Get(target)
 
 	return data, err
 }
@@ -61,6 +61,7 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c
 			glog.Errorf("read chunk: %v", err)
 			return fmt.Errorf("read chunk: %v", err)
 		}
 
 		_, err = w.Write(data)
 		if err != nil {
 			glog.Errorf("write chunk: %v", err)
@@ -221,7 +222,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
 	var buffer bytes.Buffer
 	var shouldRetry bool
 	for _, urlString := range urlStrings {
-		shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), false, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+		shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
 			buffer.Write(data)
 		})
 		if !shouldRetry {
@@ -1,6 +1,7 @@
 package filesys
 
 import (
+	"bytes"
 	"context"
 	"math"
 	"os"
@@ -21,7 +22,7 @@ import (
 type Dir struct {
 	name   string
 	wfs    *WFS
-	entry  *filer.Entry
+	entry  *filer_pb.Entry
 	parent *Dir
 }
 
@@ -58,11 +59,11 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
 	}
 
 	// attr.Inode = util.FullPath(dir.FullPath()).AsInode()
-	attr.Mode = dir.entry.Attr.Mode | os.ModeDir
-	attr.Mtime = dir.entry.Attr.Mtime
-	attr.Crtime = dir.entry.Attr.Crtime
-	attr.Gid = dir.entry.Attr.Gid
-	attr.Uid = dir.entry.Attr.Uid
+	attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir
+	attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0)
+	attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0)
+	attr.Gid = dir.entry.Attributes.Gid
+	attr.Uid = dir.entry.Attributes.Uid
 
 	glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
 
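Reverting to filer_pb.Entry means timestamps are raw Unix seconds (int64) rather than time.Time, hence the time.Unix(..., 0) conversions above and the .Unix() conversions in the Setattr hunks below. A two-line refresher on the round trip:

package main

import (
	"fmt"
	"time"
)

func main() {
	// filer_pb attributes store Unix seconds; fuse.Attr wants time.Time.
	var mtimeSec int64 = 1612137600
	t := time.Unix(mtimeSec, 0) // pb -> time.Time (as in Dir.Attr above)
	fmt.Println(t.UTC())

	back := t.Unix() // time.Time -> pb (as in Setattr below)
	fmt.Println(back == mtimeSec)
}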
@@ -102,13 +103,12 @@ func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
 }
 
 func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
-	dirPath := dir.FullPath()
-	f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dirPath, name), func() fs.Node {
+	f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
 		return &File{
 			Name:           name,
 			dir:            dir,
 			wfs:            dir.wfs,
-			entry:          filer.FromPbEntry(dirPath, entry),
+			entry:          entry,
 			entryViewCache: nil,
 		}
 	})
@@ -119,7 +119,7 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
 func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
 
 	d := dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
-		return &Dir{name: entry.Name, wfs: dir.wfs, entry: filer.FromPbEntry(dir.FullPath(), entry), parent: dir}
+		return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
 	})
 	d.(*Dir).parent = dir // in case dir node was created later
 	return d
@@ -436,19 +436,19 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
 	}
 
 	if req.Valid.Mode() {
-		dir.entry.Attr.Mode = req.Mode
+		dir.entry.Attributes.FileMode = uint32(req.Mode)
 	}
 
 	if req.Valid.Uid() {
-		dir.entry.Attr.Uid = req.Uid
+		dir.entry.Attributes.Uid = req.Uid
 	}
 
 	if req.Valid.Gid() {
-		dir.entry.Attr.Gid = req.Gid
+		dir.entry.Attributes.Gid = req.Gid
 	}
 
 	if req.Valid.Mtime() {
-		dir.entry.Attr.Mtime = req.Mtime
+		dir.entry.Attributes.Mtime = req.Mtime.Unix()
 	}
 
 	return dir.saveEntry()
@@ -527,14 +527,12 @@ func (dir *Dir) saveEntry() error {
 
 	return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
-		pbEntry := dir.entry.ToProtoEntry()
-		dir.wfs.mapPbIdFromLocalToFiler(pbEntry)
-		defer dir.wfs.mapPbIdFromFilerToLocal(pbEntry)
+		dir.wfs.mapPbIdFromLocalToFiler(dir.entry)
+		defer dir.wfs.mapPbIdFromFilerToLocal(dir.entry)
 
 		request := &filer_pb.UpdateEntryRequest{
 			Directory:  parentDir,
-			Entry:      pbEntry,
+			Entry:      dir.entry,
 			Signatures: []int32{dir.wfs.signature},
 		}
 
@@ -552,5 +550,25 @@ func (dir *Dir) saveEntry() error {
 }
 
 func (dir *Dir) FullPath() string {
-	return string(dir.entry.FullPath)
+	var parts []string
+	for p := dir; p != nil; p = p.parent {
+		if strings.HasPrefix(p.name, "/") {
+			if len(p.name) > 1 {
+				parts = append(parts, p.name[1:])
+			}
+		} else {
+			parts = append(parts, p.name)
+		}
+	}
+
+	if len(parts) == 0 {
+		return "/"
+	}
+
+	var buf bytes.Buffer
+	for i := len(parts) - 1; i >= 0; i-- {
+		buf.WriteString("/")
+		buf.WriteString(parts[i])
+	}
+	return buf.String()
 }
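Since a filer_pb.Entry carries no stored full path (the wrapper filer.Entry did), FullPath now rebuilds the path by walking parent pointers up to the mount root — whose node name is itself an absolute path like "/" or "/some/root", hence the HasPrefix special case — and then joins the collected names in reverse. A self-contained sketch of the same walk with a stand-in struct:

package main

import (
	"bytes"
	"fmt"
	"strings"
)

type dir struct {
	name   string
	parent *dir
}

// Same algorithm as the new Dir.FullPath above.
func (d *dir) fullPath() string {
	var parts []string
	for p := d; p != nil; p = p.parent {
		if strings.HasPrefix(p.name, "/") {
			// The root node's name is the mount root itself ("/", "/a/b", ...).
			if len(p.name) > 1 {
				parts = append(parts, p.name[1:])
			}
		} else {
			parts = append(parts, p.name)
		}
	}
	if len(parts) == 0 {
		return "/"
	}
	var buf bytes.Buffer
	for i := len(parts) - 1; i >= 0; i-- {
		buf.WriteString("/")
		buf.WriteString(parts[i])
	}
	return buf.String()
}

func main() {
	root := &dir{name: "/"}
	a := &dir{name: "a", parent: root}
	b := &dir{name: "b", parent: a}
	fmt.Println(root.fullPath()) // "/"
	fmt.Println(b.fullPath())    // "/a/b"
}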
@@ -48,7 +48,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
 	oldEntry.HardLinkCounter++
 	updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
 		Directory:  oldFile.dir.FullPath(),
-		Entry:      oldEntry.ToProtoEntry(),
+		Entry:      oldEntry,
 		Signatures: []int32{dir.wfs.signature},
 	}
 
@@ -58,7 +58,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
 		Entry: &filer_pb.Entry{
 			Name:        req.NewName,
 			IsDirectory: false,
-			Attributes:  filer.EntryAttributeToPb(oldEntry),
+			Attributes:  oldEntry.Attributes,
 			Chunks:      oldEntry.Chunks,
 			Extended:    oldEntry.Extended,
 			HardLinkId:  oldEntry.HardLinkId,
@@ -152,12 +152,12 @@ func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (stri
 		return "", err
 	}
 
-	if entry.Attr.Mode&os.ModeSymlink == 0 {
+	if os.FileMode(entry.Attributes.FileMode)&os.ModeSymlink == 0 {
 		return "", fuse.Errno(syscall.EINVAL)
 	}
 
-	glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attr.SymlinkTarget)
+	glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attributes.SymlinkTarget)
 
-	return entry.Attr.SymlinkTarget, nil
+	return entry.Attributes.SymlinkTarget, nil
 
 }
@@ -74,7 +74,7 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
 		return false
 	}
 
-	fileSize := int64(entry.Attr.FileSize)
+	fileSize := int64(entry.Attributes.FileSize)
 
 	chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
 	if chunkSize == 0 {
@@ -33,7 +33,7 @@ type File struct {
 	Name           string
 	dir            *Dir
 	wfs            *WFS
-	entry          *filer.Entry
+	entry          *filer_pb.Entry
 	entryLock      sync.RWMutex
 	entryViewCache []filer.VisibleInterval
 	isOpen         int
@@ -56,18 +56,22 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
 		}
 	}
 
+	if entry == nil {
+		return fuse.ENOENT
+	}
+
 	// attr.Inode = file.fullpath().AsInode()
 	attr.Valid = time.Second
-	attr.Mode = os.FileMode(entry.Attr.Mode)
-	attr.Size = filer.FileSize2(entry)
+	attr.Mode = os.FileMode(entry.Attributes.FileMode)
+	attr.Size = filer.FileSize(entry)
 	if file.isOpen > 0 {
-		attr.Size = entry.Attr.FileSize
+		attr.Size = entry.Attributes.FileSize
 		glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
 	}
-	attr.Crtime = entry.Attr.Crtime
-	attr.Mtime = entry.Attr.Mtime
-	attr.Gid = entry.Attr.Gid
-	attr.Uid = entry.Attr.Uid
+	attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
+	attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
+	attr.Gid = entry.Attributes.Gid
+	attr.Uid = entry.Attributes.Uid
 	attr.Blocks = attr.Size/blockSize + 1
 	attr.BlockSize = uint32(file.wfs.option.ChunkSizeLimit)
 	if entry.HardLinkCounter > 0 {
@@ -126,7 +130,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
 	if req.Valid.Size() {
 
 		glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks))
-		if req.Size < filer.FileSize2(entry) {
+		if req.Size < filer.FileSize(entry) {
 			// fmt.Printf("truncate %v \n", fullPath)
 			var chunks []*filer_pb.FileChunk
 			var truncatedChunks []*filer_pb.FileChunk
@@ -149,32 +153,32 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
 			file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
 			file.reader = nil
 		}
-		entry.Attr.FileSize = req.Size
+		entry.Attributes.FileSize = req.Size
 		file.dirtyMetadata = true
 	}
 
 	if req.Valid.Mode() {
-		entry.Attr.Mode = req.Mode
+		entry.Attributes.FileMode = uint32(req.Mode)
 		file.dirtyMetadata = true
 	}
 
 	if req.Valid.Uid() {
-		entry.Attr.Uid = req.Uid
+		entry.Attributes.Uid = req.Uid
 		file.dirtyMetadata = true
 	}
 
 	if req.Valid.Gid() {
-		entry.Attr.Gid = req.Gid
+		entry.Attributes.Gid = req.Gid
 		file.dirtyMetadata = true
 	}
 
 	if req.Valid.Crtime() {
-		entry.Attr.Crtime = req.Crtime
+		entry.Attributes.Crtime = req.Crtime.Unix()
 		file.dirtyMetadata = true
 	}
 
 	if req.Valid.Mtime() {
-		entry.Attr.Mtime = req.Mtime
+		entry.Attributes.Mtime = req.Mtime.Unix()
 		file.dirtyMetadata = true
 	}
 
@@ -259,7 +263,7 @@ func (file *File) Forget() {
 	file.wfs.fsNodeCache.DeleteFsNode(t)
 }
 
-func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer.Entry, err error) {
+func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
 	entry = file.getEntry()
 	if file.isOpen > 0 {
 		return entry, nil
@@ -330,7 +334,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
 	entry.Chunks = append(entry.Chunks, newChunks...)
 }
 
-func (file *File) setEntry(entry *filer.Entry) {
+func (file *File) setEntry(entry *filer_pb.Entry) {
 	file.entryLock.Lock()
 	defer file.entryLock.Unlock()
 	file.entry = entry
@@ -346,17 +350,15 @@ func (file *File) clearEntry() {
 	file.reader = nil
 }
 
-func (file *File) saveEntry(entry *filer.Entry) error {
+func (file *File) saveEntry(entry *filer_pb.Entry) error {
 	return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
-		pbEntry := entry.ToProtoEntry()
-		file.wfs.mapPbIdFromLocalToFiler(pbEntry)
-		defer file.wfs.mapPbIdFromFilerToLocal(pbEntry)
+		file.wfs.mapPbIdFromLocalToFiler(entry)
+		defer file.wfs.mapPbIdFromFilerToLocal(entry)
 
 		request := &filer_pb.UpdateEntryRequest{
 			Directory:  file.dir.FullPath(),
-			Entry:      pbEntry,
+			Entry:      entry,
 			Signatures: []int32{file.wfs.signature},
 		}
 
|
@ -373,7 +375,7 @@ func (file *File) saveEntry(entry *filer.Entry) error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (file *File) getEntry() *filer.Entry {
|
func (file *File) getEntry() *filer_pb.Entry {
|
||||||
file.entryLock.RLock()
|
file.entryLock.RLock()
|
||||||
defer file.entryLock.RUnlock()
|
defer file.entryLock.RUnlock()
|
||||||
return file.entry
|
return file.entry
|
||||||
|
|
|
@@ -6,6 +6,7 @@ import (
 	"io"
 	"math"
 	"net/http"
+	"os"
 	"sync"
 	"time"
 
@@ -41,7 +42,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
 	}
 	entry := fh.f.getEntry()
 	if entry != nil {
-		entry.Attr.FileSize = filer.FileSize2(entry)
+		entry.Attributes.FileSize = filer.FileSize(entry)
 	}
 
 	return fh
@@ -109,7 +110,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
 		return 0, io.EOF
 	}
 
-	fileSize := int64(filer.FileSize2(entry))
+	fileSize := int64(filer.FileSize(entry))
 	fileFullPath := fh.f.fullpath()
 
 	if fileSize == 0 {
@@ -170,7 +171,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
 	}
 
 	entry.Content = nil
-	entry.Attr.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attr.FileSize)))
+	entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
 	glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
 
 	fh.dirtyPages.AddPage(req.Offset, data)
@@ -258,24 +259,26 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
 			return nil
 		}
 
-		entry.Attr.Mime = fh.contentType
-		if entry.Attr.Uid == 0 {
-			entry.Attr.Uid = header.Uid
-		}
-		if entry.Attr.Gid == 0 {
-			entry.Attr.Gid = header.Gid
-		}
-		if entry.Attr.Crtime.IsZero() {
-			entry.Attr.Crtime = time.Now()
-		}
-		entry.Attr.Mtime = time.Now()
-		entry.Attr.Mode = entry.Attr.Mode &^ fh.f.wfs.option.Umask
-		entry.Attr.Collection = fh.dirtyPages.collection
-		entry.Attr.Replication = fh.dirtyPages.replication
+		if entry.Attributes != nil {
+			entry.Attributes.Mime = fh.contentType
+			if entry.Attributes.Uid == 0 {
+				entry.Attributes.Uid = header.Uid
+			}
+			if entry.Attributes.Gid == 0 {
+				entry.Attributes.Gid = header.Gid
+			}
+			if entry.Attributes.Crtime == 0 {
+				entry.Attributes.Crtime = time.Now().Unix()
+			}
+			entry.Attributes.Mtime = time.Now().Unix()
+			entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
+			entry.Attributes.Collection = fh.dirtyPages.collection
+			entry.Attributes.Replication = fh.dirtyPages.replication
+		}
 
 		request := &filer_pb.CreateEntryRequest{
 			Directory:  fh.f.dir.FullPath(),
-			Entry:      entry.ToProtoEntry(),
+			Entry:      entry,
 			Signatures: []int32{fh.f.wfs.signature},
 		}
 
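doFlush now guards against a nil Attributes block on the raw protobuf entry and applies the mount umask with Go's bit-clear operator (&^), which zeroes exactly the permission bits set in the umask. A quick demonstration of the arithmetic:

package main

import (
	"fmt"
	"os"
)

func main() {
	mode := os.FileMode(0666)  // rw-rw-rw-
	umask := os.FileMode(0022) // clear group/other write

	// AND NOT: keep every bit of mode that is not set in umask,
	// mirroring entry.Attributes.FileMode = uint32(mode &^ umask) above.
	fmt.Printf("%o\n", mode&^umask) // 644
}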
@@ -126,7 +126,7 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
 			f.Name = target.name
 			entry := f.getEntry()
 			if entry != nil {
-				entry.FullPath = newPath
+				entry.Name = f.Name
 			}
 		}
 		parent.disconnectChild(target)
@@ -131,7 +131,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
 	})
 
 	entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
-	wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: filer.FromPbEntry(wfs.option.FilerMountRootPath, entry)}
+	wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
 	wfs.fsNodeCache = newFsCache(wfs.root)
 
 	if wfs.option.ConcurrentWriters > 0 {
@@ -2,7 +2,6 @@ package filesys
 
 import (
 	"context"
-	"github.com/chrislusf/seaweedfs/weed/filer"
 
 	"github.com/seaweedfs/fuse"
 
@@ -11,7 +10,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
-func getxattr(entry *filer.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
+func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
 
 	if entry == nil {
 		return fuse.ErrNoXattr
@@ -39,7 +38,7 @@ func getxattr(entry *filer.Entry, req *fuse.GetxattrRequest, resp *fuse.Getxattr
 
 }
 
-func setxattr(entry *filer.Entry, req *fuse.SetxattrRequest) error {
+func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error {
 
 	if entry == nil {
 		return fuse.EIO
@@ -62,7 +61,7 @@ func setxattr(entry *filer.Entry, req *fuse.SetxattrRequest) error {
 
 }
 
-func removexattr(entry *filer.Entry, req *fuse.RemovexattrRequest) error {
+func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error {
 
 	if entry == nil {
 		return fuse.ErrNoXattr
@@ -84,7 +83,7 @@ func removexattr(entry *filer.Entry, req *fuse.RemovexattrRequest) error {
 
 }
 
-func listxattr(entry *filer.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
+func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
 
 	if entry == nil {
 		return fuse.EIO
@@ -109,7 +108,7 @@ func listxattr(entry *filer.Entry, req *fuse.ListxattrRequest, resp *fuse.Listxa
 
 }
 
-func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer.Entry, err error) {
+func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
 
 	fullpath := util.NewFullPath(dir, name)
 	// glog.V(3).Infof("read entry cache miss %s", fullpath)
@@ -120,5 +119,5 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer.Entry, err error)
 	if cacheErr == filer_pb.ErrNotFound {
 		return nil, fuse.ENOENT
 	}
-	return cachedEntry, cacheErr
+	return cachedEntry.ToProtoEntry(), cacheErr
 }
@@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
 	var shouldRetry bool
 
 	for _, fileUrl := range fileUrls {
-		shouldRetry, err = util.FastReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), false, chunk.Offset, int(chunk.Size), func(data []byte) {
+		shouldRetry, err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
 			writeErr = writeFunc(data)
 		})
 		if err != nil {
@@ -206,7 +206,6 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
 				isTruncated = isTruncated || subIsTruncated
 				maxKeys -= subCounter
 				nextMarker = subDir + "/" + subNextMarker
-				counter += subCounter
 				// finished processing this sub directory
 				marker = subDir
 			}
@@ -5,7 +5,7 @@ import (
 )
 
 var (
-	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 32)
+	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 34)
 	COMMIT  = ""
 )
 
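The %d.%02d format zero-pads the minor version, so 2, 34 renders as "2.34" while 2, 4 would render as "2.04". A quick check (sizeLimit is a build-dependent constant defined elsewhere in the package; "30GB" is only a stand-in here):

package main

import "fmt"

func main() {
	sizeLimit := "30GB" // stand-in for the package's build-tagged constant
	fmt.Printf("%s %d.%02d\n", sizeLimit, 2, 34) // "30GB 2.34"
	fmt.Printf("%s %d.%02d\n", sizeLimit, 2, 4)  // "30GB 2.04"
}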
@@ -1,117 +0,0 @@
-package util
-
-import (
-	"bytes"
-	"fmt"
-	"github.com/valyala/fasthttp"
-	"sync"
-	"time"
-)
-
-var (
-	fastClient = &fasthttp.Client{
-		NoDefaultUserAgentHeader:      true, // Don't send: User-Agent: fasthttp
-		MaxConnsPerHost:               1024,
-		ReadBufferSize:                4096,      // Make sure to set this big enough that your whole request can be read at once.
-		WriteBufferSize:               64 * 1024, // Same but for your response.
-		ReadTimeout:                   time.Second,
-		WriteTimeout:                  time.Second,
-		MaxIdleConnDuration:           time.Minute,
-		DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this.
-		DialDualStack:                 true,
-	}
-
-	// Put everything in pools to prevent garbage.
-	bytesPool = sync.Pool{
-		New: func() interface{} {
-			b := make([]byte, 0)
-			return &b
-		},
-	}
-
-	responsePool = sync.Pool{
-		New: func() interface{} {
-			return make(chan *fasthttp.Response)
-		},
-	}
-)
-
-func FastGet(url string) ([]byte, bool, error) {
-
-	req := fasthttp.AcquireRequest()
-	res := fasthttp.AcquireResponse()
-	defer fasthttp.ReleaseRequest(req)
-	defer fasthttp.ReleaseResponse(res)
-
-	req.SetRequestURIBytes([]byte(url))
-	req.Header.Add("Accept-Encoding", "gzip")
-
-	err := fastClient.Do(req, res)
-	if err != nil {
-		return nil, true, err
-	}
-
-	var data []byte
-	contentEncoding := res.Header.Peek("Content-Encoding")
-	if bytes.Compare(contentEncoding, []byte("gzip")) == 0 {
-		data, err = res.BodyGunzip()
-	} else {
-		data = res.Body()
-	}
-
-	out := make([]byte, len(data))
-	copy(out, data)
-
-	if res.StatusCode() >= 400 {
-		retryable := res.StatusCode() >= 500
-		return nil, retryable, fmt.Errorf("%s: %d", url, res.StatusCode())
-	}
-	if err != nil {
-		return nil, false, err
-	}
-	return out, false, nil
-}
-
-func FastReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, isCheck bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
-
-	if cipherKey != nil {
-		return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
-	}
-
-	req := fasthttp.AcquireRequest()
-	res := fasthttp.AcquireResponse()
-	defer fasthttp.ReleaseRequest(req)
-	defer fasthttp.ReleaseResponse(res)
-
-	req.SetRequestURIBytes([]byte(fileUrl))
-
-	if isCheck {
-		req.Header.Add("Range", "bytes=0-1")
-	} else if isFullChunk {
-		req.Header.Add("Accept-Encoding", "gzip")
-	} else {
-		req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
-	}
-
-	if err = fastClient.Do(req, res); err != nil {
-		return true, err
-	}
-
-	if res.StatusCode() >= 400 {
-		retryable = res.StatusCode() >= 500
-		return retryable, fmt.Errorf("%s: %d", fileUrl, res.StatusCode())
-	}
-
-	contentEncoding := res.Header.Peek("Content-Encoding")
-	if bytes.Compare(contentEncoding, []byte("gzip")) == 0 {
-		bodyData, err := res.BodyGunzip()
-		if err != nil {
-			return false, err
-		}
-		fn(bodyData)
-	} else {
-		fn(res.Body())
-	}
-
-	return false, nil
-}
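With fasthttp_util.go gone, all callers go through the net/http-based helpers in weed/util/http_util.go. The deleted code's request shaping — gzip negotiation for a full chunk, otherwise a byte Range for the requested slice (the isCheck two-byte probe is dropped along with the parameter, as the updated call sites above show) — is straightforward with net/http. A hedged sketch of just that header logic, not the repository's ReadUrlAsStream, which also handles decryption and the streaming callback:

package main

import (
	"fmt"
	"net/http"
)

// buildChunkRequest mirrors the header selection of the deleted
// FastReadUrlAsStream on top of net/http. Illustrative only.
func buildChunkRequest(fileUrl string, isFullChunk bool, offset int64, size int) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
	if err != nil {
		return nil, err
	}
	if isFullChunk {
		// Whole chunk: let the server gzip it.
		req.Header.Add("Accept-Encoding", "gzip")
	} else {
		// Partial read: ask for exactly [offset, offset+size).
		req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
	}
	return req, nil
}

func main() {
	// Hypothetical volume-server URL, for illustration.
	req, _ := buildChunkRequest("http://volume-server:8080/3,0123456789", false, 4096, 1024)
	fmt.Println(req.Header.Get("Range")) // bytes=4096-5119
}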
@@ -313,7 +313,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
 }
 
 func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
-	encryptedData, retryable, err := FastGet(fileUrl)
+	encryptedData, retryable, err := Get(fileUrl)
 	if err != nil {
 		return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
 	}