avoid data race when reading volume.IsEmpty (#4574)

* avoid data race when reading volume.IsEmpty

-   avoid a phantom read of isEmpty when onlyEmpty is set
-   call `v.DataBackend.GetStat()` within the v.dataFileAccessLock scope

* add Destroy(onlyEmpty: true) test

* add Destroy(onlyEmpty: false) test

* remove unused `IsEmpty()`

* change literal `8` to `SuperBlockSize`
This commit is contained in:
柏杰 2023-06-15 05:39:58 +08:00 committed by GitHub
parent 1e22d5caf2
commit 0b0fb9b9e4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 164 additions and 18 deletions

View file

@ -137,7 +137,7 @@ func runBackup(cmd *Command, args []string) bool {
if datSize > stats.TailOffset { if datSize > stats.TailOffset {
// remove the old data // remove the old data
v.Destroy() v.Destroy(false)
// recreate an empty volume // recreate an empty volume
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0) v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0)
if err != nil { if err != nil {

View file

@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"io" "io"
"log" "log"
"time" "time"
@ -57,7 +58,7 @@ func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, wri
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, diskInfo := range dn.DiskInfos { for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos { for _, v := range diskInfo.VolumeInfos {
if v.Size <= 8 && v.ModifiedAtSecond > 0 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
if *applyBalancing { if *applyBalancing {
log.Printf("deleting empty volume %d from %s", v.Id, dn.Id) log.Printf("deleting empty volume %d from %s", v.Id, dn.Id)
if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id),

View file

@ -245,7 +245,7 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er
wg.Add(2) wg.Add(2)
go func() { go func() {
for _, v := range delVolsMap { for _, v := range delVolsMap {
if err := v.Destroy(); err != nil { if err := v.Destroy(false); err != nil {
errChain <- err errChain <- err
} }
} }
@ -276,12 +276,12 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er
return return
} }
func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e error) { func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId, onlyEmpty bool) (found bool, e error) {
v, ok := l.volumes[vid] v, ok := l.volumes[vid]
if !ok { if !ok {
return return
} }
e = v.Destroy() e = v.Destroy(onlyEmpty)
if e != nil { if e != nil {
return return
} }
@ -299,7 +299,7 @@ func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKi
var ErrVolumeNotFound = fmt.Errorf("volume not found") var ErrVolumeNotFound = fmt.Errorf("volume not found")
func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error { func (l *DiskLocation) DeleteVolume(vid needle.VolumeId, onlyEmpty bool) error {
l.volumesLock.Lock() l.volumesLock.Lock()
defer l.volumesLock.Unlock() defer l.volumesLock.Unlock()
@ -307,7 +307,7 @@ func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
if !ok { if !ok {
return ErrVolumeNotFound return ErrVolumeNotFound
} }
_, err := l.deleteVolumeById(vid) _, err := l.deleteVolumeById(vid, onlyEmpty)
return err return err
} }

View file

@ -320,7 +320,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
// delete expired volumes. // delete expired volumes.
location.volumesLock.Lock() location.volumesLock.Lock()
for _, vid := range deleteVids { for _, vid := range deleteVids {
found, err := location.deleteVolumeById(vid) found, err := location.deleteVolumeById(vid, false)
if err == nil { if err == nil {
if found { if found {
glog.V(0).Infof("volume %d is deleted", vid) glog.V(0).Infof("volume %d is deleted", vid)
@ -512,9 +512,6 @@ func (s *Store) DeleteVolume(i needle.VolumeId, onlyEmpty bool) error {
if v == nil { if v == nil {
return fmt.Errorf("delete volume %d not found on disk", i) return fmt.Errorf("delete volume %d not found on disk", i)
} }
if onlyEmpty && !v.IsEmpty() {
return fmt.Errorf("delete volume %d not empty", i)
}
message := master_pb.VolumeShortInformationMessage{ message := master_pb.VolumeShortInformationMessage{
Id: uint32(v.Id), Id: uint32(v.Id),
Collection: v.Collection, Collection: v.Collection,
@ -524,13 +521,15 @@ func (s *Store) DeleteVolume(i needle.VolumeId, onlyEmpty bool) error {
DiskType: string(v.location.DiskType), DiskType: string(v.location.DiskType),
} }
for _, location := range s.Locations { for _, location := range s.Locations {
err := location.DeleteVolume(i) err := location.DeleteVolume(i, onlyEmpty)
if err == nil { if err == nil {
glog.V(0).Infof("DeleteVolume %d", i) glog.V(0).Infof("DeleteVolume %d", i)
s.DeletedVolumesChan <- message s.DeletedVolumesChan <- message
return nil return nil
} else if err == ErrVolumeNotFound { } else if err == ErrVolumeNotFound {
continue continue
} else if err == ErrVolumeNotEmpty {
return fmt.Errorf("DeleteVolume %d: %v", i, err)
} else { } else {
glog.Errorf("DeleteVolume %d: %v", i, err) glog.Errorf("DeleteVolume %d: %v", i, err)
} }

View file

@ -133,6 +133,25 @@ func (v *Volume) ContentSize() uint64 {
return v.nm.ContentSize() return v.nm.ContentSize()
} }
// doIsEmpty reports whether the volume currently holds no data.
//
// The volume counts as empty when its data backend (if present) is no larger
// than super_block.SuperBlockSize and its needle map (if present) reports a
// content size of zero. If the data backend cannot be stat'ed, the failure is
// logged and returned together with a false result.
//
// This method takes no lock itself; Destroy calls it while holding
// v.dataFileAccessLock, and other callers are presumably expected to do the
// same.
func (v *Volume) doIsEmpty() (bool, error) {
	if dat := v.DataBackend; dat != nil {
		size, _, statErr := dat.GetStat()
		if statErr != nil {
			glog.V(0).Infof("Failed to read file size %s %v", dat.Name(), statErr)
			return false, statErr
		}
		// Anything beyond the super block means needle data was written.
		if size > super_block.SuperBlockSize {
			return false, nil
		}
	}
	hasContent := v.nm != nil && v.nm.ContentSize() > 0
	return !hasContent, nil
}
func (v *Volume) DeletedSize() uint64 { func (v *Volume) DeletedSize() uint64 {
v.dataFileAccessLock.RLock() v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock() defer v.dataFileAccessLock.RUnlock()
@ -202,6 +221,10 @@ func (v *Volume) Close() {
v.dataFileAccessLock.Lock() v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock() defer v.dataFileAccessLock.Unlock()
v.doClose()
}
func (v *Volume) doClose() {
for v.isCommitCompacting { for v.isCommitCompacting {
time.Sleep(521 * time.Millisecond) time.Sleep(521 * time.Millisecond)
glog.Warningf("Volume Close wait for compaction %d", v.Id) glog.Warningf("Volume Close wait for compaction %d", v.Id)
@ -332,7 +355,3 @@ func (v *Volume) IsReadOnly() bool {
defer v.noWriteLock.RUnlock() defer v.noWriteLock.RUnlock()
return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow
} }
// IsEmpty reports whether the volume holds no data: the size returned by
// v.FileStat() is at most super_block.SuperBlockSize and v.ContentSize() is
// zero.
// NOTE(review): the error returned by FileStat is discarded here, and the
// size and content-size reads are two separate calls — presumably not atomic
// with respect to concurrent writers; confirm before relying on the result.
func (v *Volume) IsEmpty() bool {
datSize, _, _ := v.FileStat()
return datSize <= super_block.SuperBlockSize && v.ContentSize() == 0
}

View file

@ -50,8 +50,24 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
return false return false
} }
// ErrVolumeNotEmpty is the sentinel error Destroy returns when it is called
// with onlyEmpty set on a volume that is not empty.
var ErrVolumeNotEmpty = fmt.Errorf("volume not empty")
// Destroy removes everything related to this volume // Destroy removes everything related to this volume
func (v *Volume) Destroy() (err error) { func (v *Volume) Destroy(onlyEmpty bool) (err error) {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
if onlyEmpty {
isEmpty, e := v.doIsEmpty()
if e != nil {
err = fmt.Errorf("failed to read isEmpty %v", e)
return
}
if !isEmpty {
err = ErrVolumeNotEmpty
return
}
}
if v.isCompacting || v.isCommitCompacting { if v.isCompacting || v.isCommitCompacting {
err = fmt.Errorf("volume %d is compacting", v.Id) err = fmt.Errorf("volume %d is compacting", v.Id)
return return
@ -63,7 +79,7 @@ func (v *Volume) Destroy() (err error) {
backendStorage.DeleteFile(storageKey) backendStorage.DeleteFile(storageKey)
} }
} }
v.Close() v.doClose()
removeVolumeFiles(v.DataFileName()) removeVolumeFiles(v.DataFileName())
removeVolumeFiles(v.IndexFileName()) removeVolumeFiles(v.IndexFileName())
return return

View file

@ -1,7 +1,10 @@
package storage package storage
import ( import (
"errors"
"fmt" "fmt"
"github.com/stretchr/testify/assert"
"os"
"testing" "testing"
"time" "time"
@ -53,3 +56,111 @@ func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
fmt.Printf("offset: %v, isLast: %v\n", offset.ToActualOffset(), isLast) fmt.Printf("offset: %v, isLast: %v\n", offset.ToActualOffset(), isLast)
} }
// isFileExist reports whether something exists at path.
//
// It returns (true, nil) when os.Stat succeeds, (false, nil) when the error
// matches os.ErrNotExist, and (false, err) for any other stat failure.
func isFileExist(path string) (bool, error) {
	_, statErr := os.Stat(path)
	switch {
	case statErr == nil:
		return true, nil
	case errors.Is(statErr, os.ErrNotExist):
		return false, nil
	default:
		return false, statErr
	}
}
// assertFileExist checks that the presence of path on disk matches expected.
//
// The test is aborted if the existence check itself fails. A mismatch between
// expected and the observed state is reported through assert.Equal, which
// marks the test as failed without stopping it.
func assertFileExist(t *testing.T, expected bool, path string) {
	// Mark this function as a helper so failures are attributed to the
	// calling test line rather than to this function.
	t.Helper()

	exist, err := isFileExist(path)
	if err != nil {
		t.Fatalf("isFileExist: %v", err)
	}
	assert.Equal(t, expected, exist)
}
// TestDestroyEmptyVolumeWithOnlyEmpty verifies that Destroy(true) succeeds on
// a freshly created volume and that the file named by v.DataBackend.Name() is
// gone afterwards.
func TestDestroyEmptyVolumeWithOnlyEmpty(t *testing.T) {
	volDir := t.TempDir()
	v, createErr := NewVolume(volDir, volDir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
	if createErr != nil {
		t.Fatalf("volume creation: %v", createErr)
	}
	datPath := v.DataBackend.Name()

	// An empty volume can be destroyed even when onlyEmpty is requested.
	assertFileExist(t, true, datPath)
	if destroyErr := v.Destroy(true); destroyErr != nil {
		t.Fatalf("destroy volume: %v", destroyErr)
	}
	assertFileExist(t, false, datPath)
}
// TestDestroyEmptyVolumeWithoutOnlyEmpty verifies that Destroy(false) succeeds
// on a freshly created volume and that the file named by v.DataBackend.Name()
// is gone afterwards.
func TestDestroyEmptyVolumeWithoutOnlyEmpty(t *testing.T) {
	volDir := t.TempDir()
	v, createErr := NewVolume(volDir, volDir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
	if createErr != nil {
		t.Fatalf("volume creation: %v", createErr)
	}
	datPath := v.DataBackend.Name()

	// An empty volume can be destroyed when onlyEmpty is not requested.
	assertFileExist(t, true, datPath)
	if destroyErr := v.Destroy(false); destroyErr != nil {
		t.Fatalf("destroy volume: %v", destroyErr)
	}
	assertFileExist(t, false, datPath)
}
// TestDestroyNonemptyVolumeWithOnlyEmpty verifies that Destroy(true) refuses to
// remove a volume that contains a needle: it must return ErrVolumeNotEmpty,
// leave the file named by v.DataBackend.Name() in place, and leave the volume
// usable for further writes.
func TestDestroyNonemptyVolumeWithOnlyEmpty(t *testing.T) {
	dir := t.TempDir()
	v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
	if err != nil {
		t.Fatalf("volume creation: %v", err)
	}
	// Destroy(true) is expected to refuse here, so the volume stays open.
	// Close it explicitly so its files are released when the test returns.
	defer v.Close()
	path := v.DataBackend.Name()

	// Destroying a non-empty volume with onlyEmpty must return the
	// "volume not empty" error and must not delete the file.
	_, _, _, err = v.writeNeedle2(newRandomNeedle(1), true, false)
	if err != nil {
		t.Fatalf("write needle: %v", err)
	}
	assert.Equal(t, uint64(1), v.FileCount())
	assertFileExist(t, true, path)

	err = v.Destroy(true)
	// Callers compare against the sentinel value (Store.DeleteVolume checks
	// err == ErrVolumeNotEmpty), so pin its identity as well as its message.
	if !errors.Is(err, ErrVolumeNotEmpty) {
		t.Errorf("destroy volume: got error %v, want %v", err, ErrVolumeNotEmpty)
	}
	assert.EqualError(t, err, "volume not empty")
	assertFileExist(t, true, path)

	// The volume must keep working after the refused Destroy.
	_, _, _, err = v.writeNeedle2(newRandomNeedle(2), true, false)
	if err != nil {
		t.Fatalf("write needle: %v", err)
	}
	assert.Equal(t, uint64(2), v.FileCount())
}
// TestDestroyNonemptyVolumeWithoutOnlyEmpty verifies that Destroy(false)
// removes a volume even after a needle has been written to it, and that the
// file named by v.DataBackend.Name() is gone afterwards.
func TestDestroyNonemptyVolumeWithoutOnlyEmpty(t *testing.T) {
	volDir := t.TempDir()
	v, createErr := NewVolume(volDir, volDir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
	if createErr != nil {
		t.Fatalf("volume creation: %v", createErr)
	}
	datPath := v.DataBackend.Name()

	// A non-empty volume can be destroyed when onlyEmpty is not requested.
	if _, _, _, writeErr := v.writeNeedle2(newRandomNeedle(1), true, false); writeErr != nil {
		t.Fatalf("write needle: %v", writeErr)
	}
	assert.Equal(t, uint64(1), v.FileCount())

	assertFileExist(t, true, datPath)
	if destroyErr := v.Destroy(false); destroyErr != nil {
		t.Fatalf("destroy volume: %v", destroyErr)
	}
	assertFileExist(t, false, datPath)
}