mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
volume: fail the volume deletion if compaction is in progress
fix https://github.com/chrislusf/seaweedfs/issues/1035
This commit is contained in:
parent
6f75df8660
commit
e40634e6b4
|
@ -33,6 +33,8 @@ type Volume struct {
|
||||||
|
|
||||||
lastCompactIndexOffset uint64
|
lastCompactIndexOffset uint64
|
||||||
lastCompactRevision uint16
|
lastCompactRevision uint16
|
||||||
|
|
||||||
|
isCompacting bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) (v *Volume, e error) {
|
func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) (v *Volume, e error) {
|
||||||
|
|
|
@ -43,6 +43,10 @@ func (v *Volume) Destroy() (err error) {
|
||||||
err = fmt.Errorf("%s is read-only", v.dataFile.Name())
|
err = fmt.Errorf("%s is read-only", v.dataFile.Name())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if v.isCompacting {
|
||||||
|
err = fmt.Errorf("volume %d is compacting", v.Id)
|
||||||
|
return
|
||||||
|
}
|
||||||
v.Close()
|
v.Close()
|
||||||
os.Remove(v.FileName() + ".dat")
|
os.Remove(v.FileName() + ".dat")
|
||||||
os.Remove(v.FileName() + ".idx")
|
os.Remove(v.FileName() + ".idx")
|
||||||
|
|
|
@ -27,6 +27,10 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
|
||||||
//v.accessLock.Lock()
|
//v.accessLock.Lock()
|
||||||
//defer v.accessLock.Unlock()
|
//defer v.accessLock.Unlock()
|
||||||
//glog.V(3).Infof("Got Compaction lock...")
|
//glog.V(3).Infof("Got Compaction lock...")
|
||||||
|
v.isCompacting = true
|
||||||
|
defer func() {
|
||||||
|
v.isCompacting = false
|
||||||
|
}()
|
||||||
|
|
||||||
filePath := v.FileName()
|
filePath := v.FileName()
|
||||||
v.lastCompactIndexOffset = v.nm.IndexFileSize()
|
v.lastCompactIndexOffset = v.nm.IndexFileSize()
|
||||||
|
@ -37,6 +41,12 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
|
||||||
|
|
||||||
func (v *Volume) Compact2() error {
|
func (v *Volume) Compact2() error {
|
||||||
glog.V(3).Infof("Compact2 volume %d ...", v.Id)
|
glog.V(3).Infof("Compact2 volume %d ...", v.Id)
|
||||||
|
|
||||||
|
v.isCompacting = true
|
||||||
|
defer func() {
|
||||||
|
v.isCompacting = false
|
||||||
|
}()
|
||||||
|
|
||||||
filePath := v.FileName()
|
filePath := v.FileName()
|
||||||
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
|
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
|
||||||
return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx")
|
return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx")
|
||||||
|
@ -44,8 +54,15 @@ func (v *Volume) Compact2() error {
|
||||||
|
|
||||||
func (v *Volume) CommitCompact() error {
|
func (v *Volume) CommitCompact() error {
|
||||||
glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
|
glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
|
||||||
|
|
||||||
|
v.isCompacting = true
|
||||||
|
defer func() {
|
||||||
|
v.isCompacting = false
|
||||||
|
}()
|
||||||
|
|
||||||
v.dataFileAccessLock.Lock()
|
v.dataFileAccessLock.Lock()
|
||||||
defer v.dataFileAccessLock.Unlock()
|
defer v.dataFileAccessLock.Unlock()
|
||||||
|
|
||||||
glog.V(3).Infof("Got volume %d committing lock...", v.Id)
|
glog.V(3).Infof("Got volume %d committing lock...", v.Id)
|
||||||
v.nm.Close()
|
v.nm.Close()
|
||||||
if err := v.dataFile.Close(); err != nil {
|
if err := v.dataFile.Close(); err != nil {
|
||||||
|
|
Loading…
Reference in a new issue