diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index a0af942e0..4db48e386 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -13,6 +13,7 @@ import ( "github.com/karlseguin/ccache" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -24,19 +25,19 @@ var ( ) type Filer struct { - store *FilerStoreWrapper - directoryCache *ccache.Cache - MasterClient *wdclient.MasterClient - fileIdDeletionChan chan string - GrpcDialOption grpc.DialOption + store *FilerStoreWrapper + directoryCache *ccache.Cache + MasterClient *wdclient.MasterClient + fileIdDeletionQueue *util.UnboundedQueue + GrpcDialOption grpc.DialOption } func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer { f := &Filer{ - directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), - MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters), - fileIdDeletionChan: make(chan string, PaginationSize), - GrpcDialOption: grpcDialOption, + directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), + MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters), + fileIdDeletionQueue: util.NewUnboundedQueue(), + GrpcDialOption: grpcDialOption, } go f.loopProcessingDeletion() diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go index 9937685f7..3a64f636e 100644 --- a/weed/filer2/filer_deletion.go +++ b/weed/filer2/filer_deletion.go @@ -10,8 +10,6 @@ import ( func (f *Filer) loopProcessingDeletion() { - ticker := time.NewTicker(5 * time.Second) - lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { m := make(map[string]operation.LookupResult) for _, vid := range vids { @@ -31,36 +29,35 @@ func (f *Filer) loopProcessingDeletion() { return m, nil } - var fileIds []string + var deletionCount int for { - select { - case fid := <-f.fileIdDeletionChan: - fileIds = append(fileIds, fid) - if len(fileIds) >= 4096 { - glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) - fileIds = fileIds[:0] - } - case <-ticker.C: - if len(fileIds) > 0 { - glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) - fileIds = fileIds[:0] + deletionCount = 0 + f.fileIdDeletionQueue.Consume(func(fileIds []string) { + deletionCount = len(fileIds) + _, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) + if err != nil { + glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err) + } else { + glog.V(1).Infof("deleting fileIds len=%d", deletionCount) } + }) + + if deletionCount == 0 { + time.Sleep(1123 * time.Millisecond) } } } func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { - f.fileIdDeletionChan <- chunk.GetFileIdString() + f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) } } // DeleteFileByFileId direct delete by file id. // Only used when the fileId is not being managed by snapshots. func (f *Filer) DeleteFileByFileId(fileId string) { - f.fileIdDeletionChan <- fileId + f.fileIdDeletionQueue.EnQueue(fileId) } func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { diff --git a/weed/util/queue_unbounded.go b/weed/util/queue_unbounded.go new file mode 100644 index 000000000..664cd965e --- /dev/null +++ b/weed/util/queue_unbounded.go @@ -0,0 +1,45 @@ +package util + +import "sync" + +type UnboundedQueue struct { + outbound []string + outboundLock sync.RWMutex + inbound []string + inboundLock sync.RWMutex +} + +func NewUnboundedQueue() *UnboundedQueue { + q := &UnboundedQueue{} + return q +} + +func (q *UnboundedQueue) EnQueue(items ...string) { + q.inboundLock.Lock() + defer q.inboundLock.Unlock() + + q.outbound = append(q.outbound, items...) + +} + +func (q *UnboundedQueue) Consume(fn func([]string)) { + q.outboundLock.Lock() + defer q.outboundLock.Unlock() + + if len(q.outbound) == 0 { + q.inboundLock.Lock() + inbountLen := len(q.inbound) + if inbountLen > 0 { + t := q.outbound + q.outbound = q.inbound + q.inbound = t + } + q.inboundLock.Unlock() + } + + if len(q.outbound) > 0 { + fn(q.outbound) + q.outbound = q.outbound[:0] + } + +} diff --git a/weed/util/queue_unbounded_test.go b/weed/util/queue_unbounded_test.go new file mode 100644 index 000000000..2d02032cb --- /dev/null +++ b/weed/util/queue_unbounded_test.go @@ -0,0 +1,25 @@ +package util + +import "testing" + +func TestEnqueueAndConsume(t *testing.T) { + + q := NewUnboundedQueue() + + q.EnQueue("1", "2", "3") + + f := func(items []string) { + for _, t := range items { + println(t) + } + println("-----------------------") + } + q.Consume(f) + + q.Consume(f) + + q.EnQueue("4", "5") + q.EnQueue("6", "7") + q.Consume(f) + +}