mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
cleanup
This commit is contained in:
parent
9655dc9ca9
commit
4fcfc9410f
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
)
|
)
|
||||||
|
@ -46,6 +47,8 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cleanupIntervals(visibles)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,6 +92,8 @@ func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cleanupIntervals(visibles)
|
||||||
|
|
||||||
return views
|
return views
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -102,6 +107,12 @@ func logPrintf(name string, visibles []*visibleInterval) {
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var bufPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return new(visibleInterval)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
func mergeIntoVisibles(visibles []*visibleInterval, chunk *filer_pb.FileChunk) (newVisibles []*visibleInterval) {
|
func mergeIntoVisibles(visibles []*visibleInterval, chunk *filer_pb.FileChunk) (newVisibles []*visibleInterval) {
|
||||||
for _, v := range visibles {
|
for _, v := range visibles {
|
||||||
if v.start < chunk.Offset && chunk.Offset < v.stop {
|
if v.start < chunk.Offset && chunk.Offset < v.stop {
|
||||||
|
@ -153,6 +164,12 @@ func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*v
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func cleanupIntervals(visibles []*visibleInterval) {
|
||||||
|
for _, v := range visibles {
|
||||||
|
bufPool.Put(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// find non-overlapping visible intervals
|
// find non-overlapping visible intervals
|
||||||
// visible interval map to one file chunk
|
// visible interval map to one file chunk
|
||||||
|
|
||||||
|
@ -164,7 +181,12 @@ type visibleInterval struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64) *visibleInterval {
|
func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64) *visibleInterval {
|
||||||
return &visibleInterval{start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime}
|
b := bufPool.Get().(*visibleInterval)
|
||||||
|
b.start = start
|
||||||
|
b.stop = stop
|
||||||
|
b.fileId = fileId
|
||||||
|
b.modifiedTime = modifiedTime
|
||||||
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
func min(x, y int64) int64 {
|
func min(x, y int64) int64 {
|
||||||
|
|
|
@ -161,6 +161,9 @@ func TestIntervalMerging(t *testing.T) {
|
||||||
if len(intervals) != len(testcase.Expected) {
|
if len(intervals) != len(testcase.Expected) {
|
||||||
t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected))
|
t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cleanupIntervals(intervals)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package filer2
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -13,7 +14,7 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||||
"github.com/karlseguin/ccache"
|
"github.com/karlseguin/ccache"
|
||||||
"math"
|
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Filer struct {
|
type Filer struct {
|
||||||
|
@ -240,7 +241,7 @@ func (f *Filer) DeleteFileByFileId(fileId string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("can not find file %s: %v", fileId, err)
|
glog.V(0).Infof("can not find file %s: %v", fileId, err)
|
||||||
}
|
}
|
||||||
if _, err := operation.DeleteFilesAtOneVolumeServer(volumeServer, []string{fileId}); err != nil {
|
if _, err := operation.DeleteFilesAtOneVolumeServer(volumeServer, []string{fileId}); err != nil && err != storage.NotFound{
|
||||||
glog.V(0).Infof("deleting file %s: %v", fileId, err)
|
glog.V(0).Infof("deleting file %s: %v", fileId, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -188,8 +188,8 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
|
||||||
|
|
||||||
chunk, err := fh.dirtyPages.FlushToStorage(ctx)
|
chunk, err := fh.dirtyPages.FlushToStorage(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
glog.Errorf("flush %s/%s: %v", fh.f.dir.Path, fh.f.Name, err)
|
||||||
return fmt.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
|
return fmt.Errorf("flush %s/%s: %v", fh.f.dir.Path, fh.f.Name, err)
|
||||||
}
|
}
|
||||||
if chunk != nil {
|
if chunk != nil {
|
||||||
fh.f.entry.Chunks = append(fh.f.entry.Chunks, chunk)
|
fh.f.entry.Chunks = append(fh.f.entry.Chunks, chunk)
|
||||||
|
|
|
@ -105,7 +105,7 @@ func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []
|
||||||
|
|
||||||
resp, err := volumeServerClient.BatchDelete(context.Background(), req)
|
resp, err := volumeServerClient.BatchDelete(context.Background(), req)
|
||||||
|
|
||||||
fmt.Printf("deleted %v %v: %v\n", fileIds, err, resp)
|
// fmt.Printf("deleted %v %v: %v\n", fileIds, err, resp)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -121,7 +121,7 @@ func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, result := range ret {
|
for _, result := range ret {
|
||||||
if result.Error != "" {
|
if result.Error != "" && result.Error != "Not Found" {
|
||||||
return nil, fmt.Errorf("delete fileId %s: %v", result.FileId, result.Error)
|
return nil, fmt.Errorf("delete fileId %s: %v", result.FileId, result.Error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
|
||||||
before := topo.GetMaxVolumeId()
|
before := topo.GetMaxVolumeId()
|
||||||
topo.UpAdjustMaxVolumeId(c.MaxVolumeId)
|
topo.UpAdjustMaxVolumeId(c.MaxVolumeId)
|
||||||
|
|
||||||
glog.V(0).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId())
|
glog.V(1).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId())
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue