seaweedfs/weed/storage/erasure_coding/ec_volume.go

261 lines
7.3 KiB
Go
Raw Permalink Normal View History

package erasure_coding
import (
2019-06-20 05:57:14 +00:00
"errors"
2019-05-28 05:00:36 +00:00
"fmt"
"math"
2019-05-28 05:00:36 +00:00
"os"
2019-05-28 05:54:58 +00:00
"sync"
"time"
"golang.org/x/exp/slices"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
)
2019-06-20 05:57:14 +00:00
// NotFoundError is the sentinel error that SearchNeedleFromSortedIndex assigns
// when the requested needle id is not present in the sorted index.
var NotFoundError = errors.New("needle not found")
2019-05-28 04:40:51 +00:00
type EcVolume struct {
2019-05-28 05:54:58 +00:00
VolumeId needle.VolumeId
Collection string
dir string
dirIdx string
2019-05-28 05:54:58 +00:00
ecxFile *os.File
ecxFileSize int64
2019-06-05 04:52:37 +00:00
ecxCreatedAt time.Time
2019-05-28 05:54:58 +00:00
Shards []*EcVolumeShard
ShardLocations map[ShardId][]pb.ServerAddress
2019-05-28 05:54:58 +00:00
ShardLocationsRefreshTime time.Time
ShardLocationsLock sync.RWMutex
2019-05-31 07:58:51 +00:00
Version needle.Version
2019-06-20 05:57:14 +00:00
ecjFile *os.File
ecjFileAccessLock sync.Mutex
2021-02-16 10:47:02 +00:00
diskType types.DiskType
2019-05-28 05:00:36 +00:00
}
2021-02-16 10:47:02 +00:00
func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}
2019-05-28 05:00:36 +00:00
dataBaseFileName := EcShardFileName(collection, dir, int(vid))
indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
2019-05-28 05:00:36 +00:00
// open ecx file
if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
2019-05-28 05:00:36 +00:00
}
ecxFi, statErr := ev.ecxFile.Stat()
if statErr != nil {
_ = ev.ecxFile.Close()
return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
2019-05-28 05:00:36 +00:00
}
ev.ecxFileSize = ecxFi.Size()
2019-06-05 04:52:37 +00:00
ev.ecxCreatedAt = ecxFi.ModTime()
2019-05-28 05:00:36 +00:00
2019-06-20 05:57:14 +00:00
// open ecj file
if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
2019-06-20 05:57:14 +00:00
}
2019-12-28 20:44:59 +00:00
// read volume info
ev.Version = needle.Version3
2021-08-26 22:18:34 +00:00
if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
2019-12-28 20:44:59 +00:00
ev.Version = needle.Version(volumeInfo.Version)
2019-12-29 05:52:06 +00:00
} else {
2021-08-26 22:18:34 +00:00
volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
2019-12-28 20:44:59 +00:00
}
ev.ShardLocations = make(map[ShardId][]pb.ServerAddress)
2019-05-28 05:00:36 +00:00
return
2019-05-28 04:40:51 +00:00
}
2019-05-28 04:40:51 +00:00
func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
for _, s := range ev.Shards {
if s.ShardId == ecVolumeShard.ShardId {
return false
}
}
2019-05-28 04:40:51 +00:00
ev.Shards = append(ev.Shards, ecVolumeShard)
slices.SortFunc(ev.Shards, func(a, b *EcVolumeShard) int {
if a.VolumeId != b.VolumeId {
return int(a.VolumeId - b.VolumeId)
}
return int(a.ShardId - b.ShardId)
})
return true
}
func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
foundPosition := -1
2019-05-28 04:40:51 +00:00
for i, s := range ev.Shards {
if s.ShardId == shardId {
foundPosition = i
}
}
if foundPosition < 0 {
return nil, false
}
ecVolumeShard = ev.Shards[foundPosition]
2019-05-28 04:40:51 +00:00
ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
return ecVolumeShard, true
}
2019-05-28 04:40:51 +00:00
// FindEcVolumeShard returns the first attached shard whose ShardId equals
// shardId, together with true. It returns nil and false when none matches.
func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
	shards := ev.Shards
	for i := range shards {
		if candidate := shards[i]; candidate.ShardId == shardId {
			return candidate, true
		}
	}
	return nil, false
}
2019-05-28 04:40:51 +00:00
func (ev *EcVolume) Close() {
for _, s := range ev.Shards {
s.Close()
}
2019-06-20 05:57:14 +00:00
if ev.ecjFile != nil {
ev.ecjFileAccessLock.Lock()
_ = ev.ecjFile.Close()
ev.ecjFile = nil
ev.ecjFileAccessLock.Unlock()
}
2019-05-28 05:00:36 +00:00
if ev.ecxFile != nil {
_ = ev.ecxFile.Sync()
2019-05-28 05:00:36 +00:00
_ = ev.ecxFile.Close()
ev.ecxFile = nil
}
}
2019-05-30 16:47:54 +00:00
// Destroy closes the volume, destroys every attached shard and then removes
// the volume's .ecx, .ecj and .vif files, in that order. Removal is best
// effort: errors from os.Remove are not reported.
func (ev *EcVolume) Destroy() {
	ev.Close()

	shards := ev.Shards
	for i := range shards {
		shards[i].Destroy()
	}

	for _, ext := range [...]string{".ecx", ".ecj", ".vif"} {
		os.Remove(ev.FileName(ext))
	}
}
// FileName returns the path of the volume file with the given extension.
// ".ecx" and ".ecj" are resolved against the index base name; every other
// extension (e.g. ".vif") is resolved against the data base name.
func (ev *EcVolume) FileName(ext string) string {
	if ext == ".ecx" || ext == ".ecj" {
		return ev.IndexBaseFileName() + ext
	}
	// .vif
	return ev.DataBaseFileName() + ext
}
// DataBaseFileName returns the extension-less base file name built by
// EcShardFileName from the collection, the data directory (dir) and the
// volume id.
func (ev *EcVolume) DataBaseFileName() string {
	volumeNumber := int(ev.VolumeId)
	return EcShardFileName(ev.Collection, ev.dir, volumeNumber)
}
func (ev *EcVolume) IndexBaseFileName() string {
return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
2019-05-30 16:47:54 +00:00
}
func (ev *EcVolume) ShardSize() uint64 {
2019-06-05 04:52:37 +00:00
if len(ev.Shards) > 0 {
return uint64(ev.Shards[0].Size())
2019-06-05 04:52:37 +00:00
}
return 0
}
// Size returns the sum of the sizes reported by all attached shards; it is 0
// when no shard is attached.
func (ev *EcVolume) Size() (size int64) {
	shards := ev.Shards
	for i := range shards {
		size += shards[i].Size()
	}
	return size
}
2019-06-05 04:52:37 +00:00
// CreatedAt returns the timestamp stored in ecxCreatedAt, which NewEcVolume
// fills with the modification time of the .ecx file at open.
func (ev *EcVolume) CreatedAt() time.Time {
	createdAt := ev.ecxCreatedAt
	return createdAt
}
// ShardIdList returns the ids of the attached shards in ev.Shards order. The
// result is nil when no shard is attached.
func (ev *EcVolume) ShardIdList() (shardIds []ShardId) {
	shards := ev.Shards
	for i := range shards {
		shardIds = append(shardIds, shards[i].ShardId)
	}
	return shardIds
}
2019-05-28 04:40:51 +00:00
func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
prevVolumeId := needle.VolumeId(math.MaxUint32)
var m *master_pb.VolumeEcShardInformationMessage
2019-05-28 04:40:51 +00:00
for _, s := range ev.Shards {
if s.VolumeId != prevVolumeId {
m = &master_pb.VolumeEcShardInformationMessage{
Id: uint32(s.VolumeId),
Collection: s.Collection,
2021-02-16 10:47:02 +00:00
DiskType: string(ev.diskType),
}
messages = append(messages, m)
}
prevVolumeId = s.VolumeId
m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
}
return
}
func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {
// find the needle from ecx file
2019-06-21 08:14:10 +00:00
offset, size, err = ev.FindNeedleFromEcx(needleId)
if err != nil {
2019-06-21 08:14:10 +00:00
return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err)
}
intervals = ev.LocateEcShardNeedleInterval(version, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
return
}
2019-05-28 05:00:36 +00:00
func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset int64, size types.Size) (intervals []Interval) {
shard := ev.Shards[0]
// calculate the locations in the ec shards
intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset, types.Size(needle.GetActualSize(size, version)))
2019-05-28 05:00:36 +00:00
return
}
func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
2019-06-20 05:57:14 +00:00
}
func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
2019-05-28 05:00:36 +00:00
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
2019-06-20 05:57:14 +00:00
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
2019-05-28 05:00:36 +00:00
for l < h {
m := (l + h) / 2
2019-06-20 05:57:14 +00:00
if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
2019-05-28 05:00:36 +00:00
}
key, offset, size = idx.IdxFileEntry(buf)
if key == needleId {
2019-06-20 05:57:14 +00:00
if processNeedleFn != nil {
err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize)
}
2019-05-28 05:00:36 +00:00
return
}
if key < needleId {
l = m + 1
} else {
h = m
}
}
2019-06-20 05:57:14 +00:00
err = NotFoundError
2019-05-27 18:59:03 +00:00
return
}