diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index fd08a4232..024e3d32d 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,5 +1,5 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -appVersion: "2.32" -version: 2.32 +appVersion: "2.34" +version: 2.34 diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index e06d47ec4..2f4448026 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - # imageTag: "2.32" - started using {.Chart.appVersion} + # imageTag: "2.34" - started using {.Chart.appVersion} imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index af0793c70..4fedb55f1 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -305,7 +305,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { } var bytes []byte for _, url := range urls { - bytes, _, err = util.FastGet(url) + bytes, _, err = util.Get(url) if err == nil { break } diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index c4f989394..da530257a 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -102,7 +102,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { for _, urlString := range urlStrings { - shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, isCheck, offset, size, func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { buffer.Write(data) }) if !shouldRetry { diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index 542754e3b..68f308a51 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -23,10 +23,6 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) { return } -func FileSize2(entry *Entry) (size uint64) { - return maxUint64(TotalSize(entry.Chunks), entry.Attr.FileSize) -} - func FileSize(entry *filer_pb.Entry) (size uint64) { return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize) } diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go index 1a04c6503..546af8094 100644 --- a/weed/filer/read_write.go +++ b/weed/filer/read_write.go @@ -35,7 +35,7 @@ func ReadContent(filerAddress string, dir, name string) ([]byte, error) { target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name) - data, _, err := util.FastGet(target) + data, _, err := util.Get(target) return data, err } diff --git a/weed/filer/stream.go b/weed/filer/stream.go index f53563aa2..2584743c8 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -61,6 +61,7 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c glog.Errorf("read chunk: %v", err) return fmt.Errorf("read chunk: %v", err) } + _, err = w.Write(data) if err != nil { glog.Errorf("write chunk: %v", err) @@ -221,7 +222,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { var buffer bytes.Buffer var shouldRetry bool for _, urlString := range urlStrings { - shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), false, chunkView.Offset, int(chunkView.Size), func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) if !shouldRetry { diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 6168425af..33e1a0a3a 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -1,6 +1,7 @@ package filesys import ( + "bytes" "context" "math" "os" @@ -21,7 +22,7 @@ import ( type Dir struct { name string wfs *WFS - entry *filer.Entry + entry *filer_pb.Entry parent *Dir } @@ -58,11 +59,11 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { } // attr.Inode = util.FullPath(dir.FullPath()).AsInode() - attr.Mode = dir.entry.Attr.Mode | os.ModeDir - attr.Mtime = dir.entry.Attr.Mtime - attr.Crtime = dir.entry.Attr.Crtime - attr.Gid = dir.entry.Attr.Gid - attr.Uid = dir.entry.Attr.Uid + attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir + attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0) + attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0) + attr.Gid = dir.entry.Attributes.Gid + attr.Uid = dir.entry.Attributes.Uid glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) @@ -102,13 +103,12 @@ func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { } func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { - dirPath := dir.FullPath() - f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dirPath, name), func() fs.Node { + f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node { return &File{ Name: name, dir: dir, wfs: dir.wfs, - entry: filer.FromPbEntry(dirPath, entry), + entry: entry, entryViewCache: nil, } }) @@ -119,7 +119,7 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node { d := dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node { - return &Dir{name: entry.Name, wfs: dir.wfs, entry: filer.FromPbEntry(dir.FullPath(), entry), parent: dir} + return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} }) d.(*Dir).parent = dir // in case dir node was created later return d @@ -436,19 +436,19 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus } if req.Valid.Mode() { - dir.entry.Attr.Mode = req.Mode + dir.entry.Attributes.FileMode = uint32(req.Mode) } if req.Valid.Uid() { - dir.entry.Attr.Uid = req.Uid + dir.entry.Attributes.Uid = req.Uid } if req.Valid.Gid() { - dir.entry.Attr.Gid = req.Gid + dir.entry.Attributes.Gid = req.Gid } if req.Valid.Mtime() { - dir.entry.Attr.Mtime = req.Mtime + dir.entry.Attributes.Mtime = req.Mtime.Unix() } return dir.saveEntry() @@ -527,14 +527,12 @@ func (dir *Dir) saveEntry() error { return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - pbEntry := dir.entry.ToProtoEntry() - - dir.wfs.mapPbIdFromLocalToFiler(pbEntry) - defer dir.wfs.mapPbIdFromFilerToLocal(pbEntry) + dir.wfs.mapPbIdFromLocalToFiler(dir.entry) + defer dir.wfs.mapPbIdFromFilerToLocal(dir.entry) request := &filer_pb.UpdateEntryRequest{ Directory: parentDir, - Entry: pbEntry, + Entry: dir.entry, Signatures: []int32{dir.wfs.signature}, } @@ -552,5 +550,25 @@ func (dir *Dir) saveEntry() error { } func (dir *Dir) FullPath() string { - return string(dir.entry.FullPath) + var parts []string + for p := dir; p != nil; p = p.parent { + if strings.HasPrefix(p.name, "/") { + if len(p.name) > 1 { + parts = append(parts, p.name[1:]) + } + } else { + parts = append(parts, p.name) + } + } + + if len(parts) == 0 { + return "/" + } + + var buf bytes.Buffer + for i := len(parts) - 1; i >= 0; i-- { + buf.WriteString("/") + buf.WriteString(parts[i]) + } + return buf.String() } diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index 010d0141a..606e52fcb 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -48,7 +48,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f oldEntry.HardLinkCounter++ updateOldEntryRequest := &filer_pb.UpdateEntryRequest{ Directory: oldFile.dir.FullPath(), - Entry: oldEntry.ToProtoEntry(), + Entry: oldEntry, Signatures: []int32{dir.wfs.signature}, } @@ -58,7 +58,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f Entry: &filer_pb.Entry{ Name: req.NewName, IsDirectory: false, - Attributes: filer.EntryAttributeToPb(oldEntry), + Attributes: oldEntry.Attributes, Chunks: oldEntry.Chunks, Extended: oldEntry.Extended, HardLinkId: oldEntry.HardLinkId, @@ -152,12 +152,12 @@ func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (stri return "", err } - if entry.Attr.Mode&os.ModeSymlink == 0 { + if os.FileMode(entry.Attributes.FileMode)&os.ModeSymlink == 0 { return "", fuse.Errno(syscall.EINVAL) } - glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attr.SymlinkTarget) + glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attributes.SymlinkTarget) - return entry.Attr.SymlinkTarget, nil + return entry.Attributes.SymlinkTarget, nil } diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index b190b8473..8888cff96 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -74,7 +74,7 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD return false } - fileSize := int64(entry.Attr.FileSize) + fileSize := int64(entry.Attributes.FileSize) chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) if chunkSize == 0 { diff --git a/weed/filesys/file.go b/weed/filesys/file.go index db8062ceb..b73cd895b 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -33,7 +33,7 @@ type File struct { Name string dir *Dir wfs *WFS - entry *filer.Entry + entry *filer_pb.Entry entryLock sync.RWMutex entryViewCache []filer.VisibleInterval isOpen int @@ -56,18 +56,22 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) { } } + if entry == nil { + return fuse.ENOENT + } + // attr.Inode = file.fullpath().AsInode() attr.Valid = time.Second - attr.Mode = os.FileMode(entry.Attr.Mode) - attr.Size = filer.FileSize2(entry) + attr.Mode = os.FileMode(entry.Attributes.FileMode) + attr.Size = filer.FileSize(entry) if file.isOpen > 0 { - attr.Size = entry.Attr.FileSize + attr.Size = entry.Attributes.FileSize glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size) } - attr.Crtime = entry.Attr.Crtime - attr.Mtime = entry.Attr.Mtime - attr.Gid = entry.Attr.Gid - attr.Uid = entry.Attr.Uid + attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) + attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) + attr.Gid = entry.Attributes.Gid + attr.Uid = entry.Attributes.Uid attr.Blocks = attr.Size/blockSize + 1 attr.BlockSize = uint32(file.wfs.option.ChunkSizeLimit) if entry.HardLinkCounter > 0 { @@ -126,7 +130,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f if req.Valid.Size() { glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks)) - if req.Size < filer.FileSize2(entry) { + if req.Size < filer.FileSize(entry) { // fmt.Printf("truncate %v \n", fullPath) var chunks []*filer_pb.FileChunk var truncatedChunks []*filer_pb.FileChunk @@ -149,32 +153,32 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks) file.reader = nil } - entry.Attr.FileSize = req.Size + entry.Attributes.FileSize = req.Size file.dirtyMetadata = true } if req.Valid.Mode() { - entry.Attr.Mode = req.Mode + entry.Attributes.FileMode = uint32(req.Mode) file.dirtyMetadata = true } if req.Valid.Uid() { - entry.Attr.Uid = req.Uid + entry.Attributes.Uid = req.Uid file.dirtyMetadata = true } if req.Valid.Gid() { - entry.Attr.Gid = req.Gid + entry.Attributes.Gid = req.Gid file.dirtyMetadata = true } if req.Valid.Crtime() { - entry.Attr.Crtime = req.Crtime + entry.Attributes.Crtime = req.Crtime.Unix() file.dirtyMetadata = true } if req.Valid.Mtime() { - entry.Attr.Mtime = req.Mtime + entry.Attributes.Mtime = req.Mtime.Unix() file.dirtyMetadata = true } @@ -259,7 +263,7 @@ func (file *File) Forget() { file.wfs.fsNodeCache.DeleteFsNode(t) } -func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer.Entry, err error) { +func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) { entry = file.getEntry() if file.isOpen > 0 { return entry, nil @@ -330,7 +334,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { entry.Chunks = append(entry.Chunks, newChunks...) } -func (file *File) setEntry(entry *filer.Entry) { +func (file *File) setEntry(entry *filer_pb.Entry) { file.entryLock.Lock() defer file.entryLock.Unlock() file.entry = entry @@ -346,17 +350,15 @@ func (file *File) clearEntry() { file.reader = nil } -func (file *File) saveEntry(entry *filer.Entry) error { +func (file *File) saveEntry(entry *filer_pb.Entry) error { return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - pbEntry := entry.ToProtoEntry() - - file.wfs.mapPbIdFromLocalToFiler(pbEntry) - defer file.wfs.mapPbIdFromFilerToLocal(pbEntry) + file.wfs.mapPbIdFromLocalToFiler(entry) + defer file.wfs.mapPbIdFromFilerToLocal(entry) request := &filer_pb.UpdateEntryRequest{ Directory: file.dir.FullPath(), - Entry: pbEntry, + Entry: entry, Signatures: []int32{file.wfs.signature}, } @@ -373,7 +375,7 @@ func (file *File) saveEntry(entry *filer.Entry) error { }) } -func (file *File) getEntry() *filer.Entry { +func (file *File) getEntry() *filer_pb.Entry { file.entryLock.RLock() defer file.entryLock.RUnlock() return file.entry diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index be214c02d..25eaf7033 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -6,6 +6,7 @@ import ( "io" "math" "net/http" + "os" "sync" "time" @@ -41,7 +42,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle { } entry := fh.f.getEntry() if entry != nil { - entry.Attr.FileSize = filer.FileSize2(entry) + entry.Attributes.FileSize = filer.FileSize(entry) } return fh @@ -109,7 +110,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { return 0, io.EOF } - fileSize := int64(filer.FileSize2(entry)) + fileSize := int64(filer.FileSize(entry)) fileFullPath := fh.f.fullpath() if fileSize == 0 { @@ -170,7 +171,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f } entry.Content = nil - entry.Attr.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attr.FileSize))) + entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize))) glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) fh.dirtyPages.AddPage(req.Offset, data) @@ -258,24 +259,26 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { return nil } - entry.Attr.Mime = fh.contentType - if entry.Attr.Uid == 0 { - entry.Attr.Uid = header.Uid + if entry.Attributes != nil { + entry.Attributes.Mime = fh.contentType + if entry.Attributes.Uid == 0 { + entry.Attributes.Uid = header.Uid + } + if entry.Attributes.Gid == 0 { + entry.Attributes.Gid = header.Gid + } + if entry.Attributes.Crtime == 0 { + entry.Attributes.Crtime = time.Now().Unix() + } + entry.Attributes.Mtime = time.Now().Unix() + entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask) + entry.Attributes.Collection = fh.dirtyPages.collection + entry.Attributes.Replication = fh.dirtyPages.replication } - if entry.Attr.Gid == 0 { - entry.Attr.Gid = header.Gid - } - if entry.Attr.Crtime.IsZero() { - entry.Attr.Crtime = time.Now() - } - entry.Attr.Mtime = time.Now() - entry.Attr.Mode = entry.Attr.Mode &^ fh.f.wfs.option.Umask - entry.Attr.Collection = fh.dirtyPages.collection - entry.Attr.Replication = fh.dirtyPages.replication request := &filer_pb.CreateEntryRequest{ Directory: fh.f.dir.FullPath(), - Entry: entry.ToProtoEntry(), + Entry: entry, Signatures: []int32{fh.f.wfs.signature}, } diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go index 0740d607f..6b1012090 100644 --- a/weed/filesys/fscache.go +++ b/weed/filesys/fscache.go @@ -126,7 +126,7 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { f.Name = target.name entry := f.getEntry() if entry != nil { - entry.FullPath = newPath + entry.Name = f.Name } } parent.disconnectChild(target) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 7c3907326..c6d9080a1 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -131,7 +131,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { }) entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath)) - wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: filer.FromPbEntry(wfs.option.FilerMountRootPath, entry)} + wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry} wfs.fsNodeCache = newFsCache(wfs.root) if wfs.option.ConcurrentWriters > 0 { diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index a50d87ba7..92e43b675 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -2,7 +2,6 @@ package filesys import ( "context" - "github.com/chrislusf/seaweedfs/weed/filer" "github.com/seaweedfs/fuse" @@ -11,7 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func getxattr(entry *filer.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { +func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { if entry == nil { return fuse.ErrNoXattr @@ -39,7 +38,7 @@ func getxattr(entry *filer.Entry, req *fuse.GetxattrRequest, resp *fuse.Getxattr } -func setxattr(entry *filer.Entry, req *fuse.SetxattrRequest) error { +func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error { if entry == nil { return fuse.EIO @@ -62,7 +61,7 @@ func setxattr(entry *filer.Entry, req *fuse.SetxattrRequest) error { } -func removexattr(entry *filer.Entry, req *fuse.RemovexattrRequest) error { +func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error { if entry == nil { return fuse.ErrNoXattr @@ -84,7 +83,7 @@ func removexattr(entry *filer.Entry, req *fuse.RemovexattrRequest) error { } -func listxattr(entry *filer.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { +func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { if entry == nil { return fuse.EIO @@ -109,7 +108,7 @@ func listxattr(entry *filer.Entry, req *fuse.ListxattrRequest, resp *fuse.Listxa } -func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer.Entry, err error) { +func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) { fullpath := util.NewFullPath(dir, name) // glog.V(3).Infof("read entry cache miss %s", fullpath) @@ -120,5 +119,5 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer.Entry, err error) if cacheErr == filer_pb.ErrNotFound { return nil, fuse.ENOENT } - return cachedEntry, cacheErr + return cachedEntry.ToProtoEntry(), cacheErr } diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go index 23fbe3292..519a9a201 100644 --- a/weed/replication/repl_util/replication_util.go +++ b/weed/replication/repl_util/replication_util.go @@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer var shouldRetry bool for _, fileUrl := range fileUrls { - shouldRetry, err = util.FastReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), false, chunk.Offset, int(chunk.Size), func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { writeErr = writeFunc(data) }) if err != nil { diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 2d36c6ec9..739cdd8f9 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -206,7 +206,6 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d isTruncated = isTruncated || subIsTruncated maxKeys -= subCounter nextMarker = subDir + "/" + subNextMarker - counter += subCounter // finished processing this sub directory marker = subDir } diff --git a/weed/util/constants.go b/weed/util/constants.go index fdb37d7f6..a7d8cc0bf 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 32) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 34) COMMIT = "" ) diff --git a/weed/util/fasthttp_util.go b/weed/util/fasthttp_util.go deleted file mode 100644 index 02c78e79d..000000000 --- a/weed/util/fasthttp_util.go +++ /dev/null @@ -1,117 +0,0 @@ -package util - -import ( - "bytes" - "fmt" - "github.com/valyala/fasthttp" - "sync" - "time" -) - -var ( - fastClient = &fasthttp.Client{ - NoDefaultUserAgentHeader: true, // Don't send: User-Agent: fasthttp - MaxConnsPerHost: 1024, - ReadBufferSize: 4096, // Make sure to set this big enough that your whole request can be read at once. - WriteBufferSize: 64 * 1024, // Same but for your response. - ReadTimeout: time.Second, - WriteTimeout: time.Second, - MaxIdleConnDuration: time.Minute, - DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this. - DialDualStack: true, - } - - // Put everything in pools to prevent garbage. - bytesPool = sync.Pool{ - New: func() interface{} { - b := make([]byte, 0) - return &b - }, - } - - responsePool = sync.Pool{ - New: func() interface{} { - return make(chan *fasthttp.Response) - }, - } -) - -func FastGet(url string) ([]byte, bool, error) { - - req := fasthttp.AcquireRequest() - res := fasthttp.AcquireResponse() - defer fasthttp.ReleaseRequest(req) - defer fasthttp.ReleaseResponse(res) - - req.SetRequestURIBytes([]byte(url)) - req.Header.Add("Accept-Encoding", "gzip") - - err := fastClient.Do(req, res) - if err != nil { - return nil, true, err - } - - var data []byte - contentEncoding := res.Header.Peek("Content-Encoding") - if bytes.Compare(contentEncoding, []byte("gzip")) == 0 { - data, err = res.BodyGunzip() - } else { - data = res.Body() - } - - out := make([]byte, len(data)) - copy(out, data) - - if res.StatusCode() >= 400 { - retryable := res.StatusCode() >= 500 - return nil, retryable, fmt.Errorf("%s: %d", url, res.StatusCode()) - } - if err != nil { - return nil, false, err - } - return out, false, nil -} - -func FastReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, isCheck bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { - - if cipherKey != nil { - return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) - } - req := fasthttp.AcquireRequest() - res := fasthttp.AcquireResponse() - defer fasthttp.ReleaseRequest(req) - defer fasthttp.ReleaseResponse(res) - - req.SetRequestURIBytes([]byte(fileUrl)) - - if isCheck { - req.Header.Add("Range", "bytes=0-1") - } else if isFullChunk { - req.Header.Add("Accept-Encoding", "gzip") - } else { - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) - } - - if err = fastClient.Do(req, res); err != nil { - return true, err - } - - if res.StatusCode() >= 400 { - retryable = res.StatusCode() >= 500 - return retryable, fmt.Errorf("%s: %d", fileUrl, res.StatusCode()) - } - - contentEncoding := res.Header.Peek("Content-Encoding") - if bytes.Compare(contentEncoding, []byte("gzip")) == 0 { - bodyData, err := res.BodyGunzip() - if err != nil { - return false, err - } - fn(bodyData) - } else { - fn(res.Body()) - } - - return false, nil - -} diff --git a/weed/util/http_util.go b/weed/util/http_util.go index eff282bab..135d10c45 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -313,7 +313,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is } func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { - encryptedData, retryable, err := FastGet(fileUrl) + encryptedData, retryable, err := Get(fileUrl) if err != nil { return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err) }