seaweedfs/weed/storage/disk_location.go
James Hartig 8e54e34576 volume: Don't unmount before deleting volume in copy
If we unmount first and then delete, the delete fails because the volume
was unmounted. Delete ends up doing the same thing as the unmount anyways.
2020-09-01 22:00:07 -04:00

324 lines
7.5 KiB
Go

package storage
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type DiskLocation struct {
Directory string
MaxVolumeCount int
MinFreeSpacePercent float32
volumes map[needle.VolumeId]*Volume
volumesLock sync.RWMutex
// erasure coding
ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
ecVolumesLock sync.RWMutex
isDiskSpaceLow bool
}
func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32) *DiskLocation {
location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent}
location.volumes = make(map[needle.VolumeId]*Volume)
location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
go location.CheckDiskSpace()
return location
}
func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (needle.VolumeId, string, error) {
name := dir.Name()
if !dir.IsDir() && strings.HasSuffix(name, ".idx") {
base := name[:len(name)-len(".idx")]
collection, volumeId, err := parseCollectionVolumeId(base)
return volumeId, collection, err
}
return 0, "", fmt.Errorf("Path is not a volume: %s", name)
}
func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) {
i := strings.LastIndex(base, "_")
if i > 0 {
collection, base = base[0:i], base[i+1:]
}
vol, err := needle.NewVolumeId(base)
return collection, vol, err
}
func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool {
name := fileInfo.Name()
if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") {
vid, collection, err := l.volumeIdFromPath(fileInfo)
if err != nil {
glog.Warningf("get volume id failed, %s, err : %s", name, err)
return false
}
// void loading one volume more than once
l.volumesLock.RLock()
_, found := l.volumes[vid]
l.volumesLock.RUnlock()
if found {
glog.V(1).Infof("loaded volume, %v", vid)
return true
}
v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0)
if e != nil {
glog.V(0).Infof("new volume %s error %s", name, e)
return false
}
l.SetVolume(vid, v)
size, _, _ := v.FileStat()
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
return true
}
return false
}
func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) {
task_queue := make(chan os.FileInfo, 10*concurrency)
go func() {
if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs {
task_queue <- dir
}
}
close(task_queue)
}()
var wg sync.WaitGroup
for workerNum := 0; workerNum < concurrency; workerNum++ {
wg.Add(1)
go func() {
defer wg.Done()
for dir := range task_queue {
_ = l.loadExistingVolume(dir, needleMapKind)
}
}()
}
wg.Wait()
}
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
l.concurrentLoadingVolumes(needleMapKind, 10)
glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
l.loadAllEcShards()
glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecVolumes))
}
func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
l.volumesLock.Lock()
delVolsMap := l.unmountVolumeByCollection(collection)
l.volumesLock.Unlock()
l.ecVolumesLock.Lock()
delEcVolsMap := l.unmountEcVolumeByCollection(collection)
l.ecVolumesLock.Unlock()
errChain := make(chan error, 2)
var wg sync.WaitGroup
wg.Add(2)
go func() {
for _, v := range delVolsMap {
if err := v.Destroy(); err != nil {
errChain <- err
}
}
wg.Done()
}()
go func() {
for _, v := range delEcVolsMap {
v.Destroy()
}
wg.Done()
}()
go func() {
wg.Wait()
close(errChain)
}()
errBuilder := strings.Builder{}
for err := range errChain {
errBuilder.WriteString(err.Error())
errBuilder.WriteString("; ")
}
if errBuilder.Len() > 0 {
e = fmt.Errorf(errBuilder.String())
}
return
}
func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e error) {
l.volumesLock.Lock()
defer l.volumesLock.Unlock()
v, ok := l.volumes[vid]
if !ok {
return
}
e = v.Destroy()
if e != nil {
return
}
found = true
delete(l.volumes, vid)
return
}
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool {
if fileInfo, found := l.LocateVolume(vid); found {
return l.loadExistingVolume(fileInfo, needleMapKind)
}
return false
}
func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
l.volumesLock.Lock()
defer l.volumesLock.Unlock()
_, ok := l.volumes[vid]
if !ok {
return fmt.Errorf("Volume not found, VolumeId: %d", vid)
}
_, err := l.deleteVolumeById(vid)
return err
}
func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
l.volumesLock.Lock()
defer l.volumesLock.Unlock()
v, ok := l.volumes[vid]
if !ok {
return fmt.Errorf("Volume not loaded, VolumeId: %d", vid)
}
v.Close()
delete(l.volumes, vid)
return nil
}
func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
deltaVols := make(map[needle.VolumeId]*Volume, 0)
for k, v := range l.volumes {
if v.Collection == collectionName && !v.isCompacting {
deltaVols[k] = v
}
}
for k := range deltaVols {
delete(l.volumes, k)
}
return deltaVols
}
func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
l.volumesLock.Lock()
defer l.volumesLock.Unlock()
l.volumes[vid] = volume
volume.location = l
}
func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
l.volumesLock.RLock()
defer l.volumesLock.RUnlock()
v, ok := l.volumes[vid]
return v, ok
}
func (l *DiskLocation) VolumesLen() int {
l.volumesLock.RLock()
defer l.volumesLock.RUnlock()
return len(l.volumes)
}
func (l *DiskLocation) Close() {
l.volumesLock.Lock()
for _, v := range l.volumes {
v.Close()
}
l.volumesLock.Unlock()
l.ecVolumesLock.Lock()
for _, ecVolume := range l.ecVolumes {
ecVolume.Close()
}
l.ecVolumesLock.Unlock()
return
}
func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool) {
if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
for _, fileInfo := range fileInfos {
volId, _, err := l.volumeIdFromPath(fileInfo)
if vid == volId && err == nil {
return fileInfo, true
}
}
}
return nil, false
}
func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) {
l.volumesLock.RLock()
defer l.volumesLock.RUnlock()
for _, vol := range l.volumes {
if vol.IsReadOnly() {
continue
}
datSize, idxSize, _ := vol.FileStat()
unUsedSpace += volumeSizeLimit - (datSize + idxSize)
}
return
}
func (l *DiskLocation) CheckDiskSpace() {
for {
if dir, e := filepath.Abs(l.Directory); e == nil {
s := stats.NewDiskStatus(dir)
if (s.PercentFree < l.MinFreeSpacePercent) != l.isDiskSpaceLow {
l.isDiskSpaceLow = !l.isDiskSpaceLow
}
if l.isDiskSpaceLow {
glog.V(0).Infof("dir %s freePercent %.2f%% < min %.2f%%, isLowDiskSpace: %v", dir, s.PercentFree, l.MinFreeSpacePercent, l.isDiskSpaceLow)
} else {
glog.V(4).Infof("dir %s freePercent %.2f%% < min %.2f%%, isLowDiskSpace: %v", dir, s.PercentFree, l.MinFreeSpacePercent, l.isDiskSpaceLow)
}
}
time.Sleep(time.Minute)
}
}