diff --git a/weed/command/backup.go b/weed/command/backup.go index 58eda638e..bbb6c6724 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -137,7 +137,7 @@ func runBackup(cmd *Command, args []string) bool { if datSize > stats.TailOffset { // remove the old data - v.Destroy() + v.Destroy(false) // recreate an empty volume v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0) if err != nil { diff --git a/weed/shell/command_volume_delete_empty.go b/weed/shell/command_volume_delete_empty.go index f02cf87d9..bb0a194e0 100644 --- a/weed/shell/command_volume_delete_empty.go +++ b/weed/shell/command_volume_delete_empty.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "io" "log" "time" @@ -57,7 +58,7 @@ func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, wri eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { - if v.Size <= 8 && v.ModifiedAtSecond > 0 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { + if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { if *applyBalancing { log.Printf("deleting empty volume %d from %s", v.Id, dn.Id) if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 0629acff7..2ee1548a2 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -245,7 +245,7 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er wg.Add(2) go func() { for _, v := range delVolsMap { - if err := v.Destroy(); err != nil { + if err := v.Destroy(false); err != nil { errChain <- err } } @@ -276,12 +276,12 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er return } -func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e error) { +func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId, onlyEmpty bool) (found bool, e error) { v, ok := l.volumes[vid] if !ok { return } - e = v.Destroy() + e = v.Destroy(onlyEmpty) if e != nil { return } @@ -299,7 +299,7 @@ func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKi var ErrVolumeNotFound = fmt.Errorf("volume not found") -func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error { +func (l *DiskLocation) DeleteVolume(vid needle.VolumeId, onlyEmpty bool) error { l.volumesLock.Lock() defer l.volumesLock.Unlock() @@ -307,7 +307,7 @@ func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error { if !ok { return ErrVolumeNotFound } - _, err := l.deleteVolumeById(vid) + _, err := l.deleteVolumeById(vid, onlyEmpty) return err } diff --git a/weed/storage/store.go b/weed/storage/store.go index 43d4595bf..c7904c5c2 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -320,7 +320,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { // delete expired volumes. location.volumesLock.Lock() for _, vid := range deleteVids { - found, err := location.deleteVolumeById(vid) + found, err := location.deleteVolumeById(vid, false) if err == nil { if found { glog.V(0).Infof("volume %d is deleted", vid) @@ -512,9 +512,6 @@ func (s *Store) DeleteVolume(i needle.VolumeId, onlyEmpty bool) error { if v == nil { return fmt.Errorf("delete volume %d not found on disk", i) } - if onlyEmpty && !v.IsEmpty() { - return fmt.Errorf("delete volume %d not empty", i) - } message := master_pb.VolumeShortInformationMessage{ Id: uint32(v.Id), Collection: v.Collection, @@ -524,13 +521,15 @@ func (s *Store) DeleteVolume(i needle.VolumeId, onlyEmpty bool) error { DiskType: string(v.location.DiskType), } for _, location := range s.Locations { - err := location.DeleteVolume(i) + err := location.DeleteVolume(i, onlyEmpty) if err == nil { glog.V(0).Infof("DeleteVolume %d", i) s.DeletedVolumesChan <- message return nil } else if err == ErrVolumeNotFound { continue + } else if err == ErrVolumeNotEmpty { + return fmt.Errorf("DeleteVolume %d: %v", i, err) } else { glog.Errorf("DeleteVolume %d: %v", i, err) } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 79b0059d6..81c466f8f 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -133,6 +133,25 @@ func (v *Volume) ContentSize() uint64 { return v.nm.ContentSize() } +func (v *Volume) doIsEmpty() (bool, error) { + if v.DataBackend != nil { + datFileSize, _, e := v.DataBackend.GetStat() + if e != nil { + glog.V(0).Infof("Failed to read file size %s %v", v.DataBackend.Name(), e) + return false, e + } + if datFileSize > super_block.SuperBlockSize { + return false, nil + } + } + if v.nm != nil { + if v.nm.ContentSize() > 0 { + return false, nil + } + } + return true, nil +} + func (v *Volume) DeletedSize() uint64 { v.dataFileAccessLock.RLock() defer v.dataFileAccessLock.RUnlock() @@ -202,6 +221,10 @@ func (v *Volume) Close() { v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() + v.doClose() +} + +func (v *Volume) doClose() { for v.isCommitCompacting { time.Sleep(521 * time.Millisecond) glog.Warningf("Volume Close wait for compaction %d", v.Id) @@ -332,7 +355,3 @@ func (v *Volume) IsReadOnly() bool { defer v.noWriteLock.RUnlock() return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow } -func (v *Volume) IsEmpty() bool { - datSize, _, _ := v.FileStat() - return datSize <= super_block.SuperBlockSize && v.ContentSize() == 0 -} diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go index edee8465c..ae607e691 100644 --- a/weed/storage/volume_write.go +++ b/weed/storage/volume_write.go @@ -50,8 +50,24 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool { return false } +var ErrVolumeNotEmpty = fmt.Errorf("volume not empty") + // Destroy removes everything related to this volume -func (v *Volume) Destroy() (err error) { +func (v *Volume) Destroy(onlyEmpty bool) (err error) { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + + if onlyEmpty { + isEmpty, e := v.doIsEmpty() + if e != nil { + err = fmt.Errorf("failed to read isEmpty %v", e) + return + } + if !isEmpty { + err = ErrVolumeNotEmpty + return + } + } if v.isCompacting || v.isCommitCompacting { err = fmt.Errorf("volume %d is compacting", v.Id) return @@ -63,7 +79,7 @@ func (v *Volume) Destroy() (err error) { backendStorage.DeleteFile(storageKey) } } - v.Close() + v.doClose() removeVolumeFiles(v.DataFileName()) removeVolumeFiles(v.IndexFileName()) return diff --git a/weed/storage/volume_write_test.go b/weed/storage/volume_write_test.go index 9979aa8f5..63815aecd 100644 --- a/weed/storage/volume_write_test.go +++ b/weed/storage/volume_write_test.go @@ -1,7 +1,10 @@ package storage import ( + "errors" "fmt" + "github.com/stretchr/testify/assert" + "os" "testing" "time" @@ -53,3 +56,111 @@ func TestSearchVolumesWithDeletedNeedles(t *testing.T) { fmt.Printf("offset: %v, isLast: %v\n", offset.ToActualOffset(), isLast) } + +func isFileExist(path string) (bool, error) { + if _, err := os.Stat(path); err == nil { + return true, nil + } else if errors.Is(err, os.ErrNotExist) { + return false, nil + } else { + return false, err + } +} + +func assertFileExist(t *testing.T, expected bool, path string) { + exist, err := isFileExist(path) + if err != nil { + t.Fatalf("isFileExist: %v", err) + } + assert.Equal(t, expected, exist) +} + +func TestDestroyEmptyVolumeWithOnlyEmpty(t *testing.T) { + dir := t.TempDir() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + path := v.DataBackend.Name() + + // should can Destroy empty volume with onlyEmpty + assertFileExist(t, true, path) + err = v.Destroy(true) + if err != nil { + t.Fatalf("destroy volume: %v", err) + } + assertFileExist(t, false, path) +} + +func TestDestroyEmptyVolumeWithoutOnlyEmpty(t *testing.T) { + dir := t.TempDir() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + path := v.DataBackend.Name() + + // should can Destroy empty volume without onlyEmpty + assertFileExist(t, true, path) + err = v.Destroy(false) + if err != nil { + t.Fatalf("destroy volume: %v", err) + } + assertFileExist(t, false, path) +} + +func TestDestroyNonemptyVolumeWithOnlyEmpty(t *testing.T) { + dir := t.TempDir() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + path := v.DataBackend.Name() + + // should return "volume not empty" error and do not delete file when Destroy non-empty volume + _, _, _, err = v.writeNeedle2(newRandomNeedle(1), true, false) + if err != nil { + t.Fatalf("write needle: %v", err) + } + assert.Equal(t, uint64(1), v.FileCount()) + + assertFileExist(t, true, path) + err = v.Destroy(true) + assert.EqualError(t, err, "volume not empty") + assertFileExist(t, true, path) + + // should keep working after "volume not empty" + _, _, _, err = v.writeNeedle2(newRandomNeedle(2), true, false) + if err != nil { + t.Fatalf("write needle: %v", err) + } + + assert.Equal(t, uint64(2), v.FileCount()) +} + +func TestDestroyNonemptyVolumeWithoutOnlyEmpty(t *testing.T) { + dir := t.TempDir() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + path := v.DataBackend.Name() + + // should can Destroy non-empty volume without onlyEmpty + _, _, _, err = v.writeNeedle2(newRandomNeedle(1), true, false) + if err != nil { + t.Fatalf("write needle: %v", err) + } + assert.Equal(t, uint64(1), v.FileCount()) + + assertFileExist(t, true, path) + err = v.Destroy(false) + if err != nil { + t.Fatalf("destroy volume: %v", err) + } + assertFileExist(t, false, path) +}