mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
ec volume support deletes
This commit is contained in:
parent
115558e5f5
commit
856da7aae2
|
@ -55,11 +55,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||||
|
|
||||||
// println("source:", volFileInfoResp.String())
|
// println("source:", volFileInfoResp.String())
|
||||||
// copy ecx file
|
// copy ecx file
|
||||||
if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx"); err != nil {
|
if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat"); err != nil {
|
if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,7 +95,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
|
func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
|
||||||
compactRevision uint32, stopOffset uint64, baseFileName, ext string) error {
|
compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error {
|
||||||
|
|
||||||
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
||||||
VolumeId: vid,
|
VolumeId: vid,
|
||||||
|
@ -109,7 +109,7 @@ func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.
|
||||||
return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
|
return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond))
|
err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
|
return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
|
||||||
}
|
}
|
||||||
|
@ -143,9 +143,13 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler) error {
|
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error {
|
||||||
glog.V(4).Infof("writing to %s", fileName)
|
glog.V(4).Infof("writing to %s", fileName)
|
||||||
dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
flags := os.O_WRONLY|os.O_CREATE|os.O_TRUNC
|
||||||
|
if isAppend {
|
||||||
|
flags = os.O_WRONLY|os.O_CREATE
|
||||||
|
}
|
||||||
|
dst, err := os.OpenFile(fileName, flags, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,6 +74,11 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
|
||||||
} else {
|
} else {
|
||||||
rebuiltShardIds = generatedShardIds
|
rebuiltShardIds = generatedShardIds
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil {
|
||||||
|
return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err)
|
||||||
|
}
|
||||||
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -97,7 +102,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
|
||||||
|
|
||||||
// copy ec data slices
|
// copy ec data slices
|
||||||
for _, shardId := range req.ShardIds {
|
for _, shardId := range req.ShardIds {
|
||||||
if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil {
|
if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -107,7 +112,12 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
|
||||||
}
|
}
|
||||||
|
|
||||||
// copy ecx file
|
// copy ecx file
|
||||||
if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil {
|
if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// copy ecj file
|
||||||
|
if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,6 +176,9 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
|
||||||
if err := os.Remove(baseFilename + ".ecx"); err != nil {
|
if err := os.Remove(baseFilename + ".ecx"); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if err := os.Remove(baseFilename + ".ecj"); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
|
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
|
||||||
|
|
|
@ -32,7 +32,7 @@ func WriteSortedEcxFile(baseFileName string) (e error) {
|
||||||
|
|
||||||
ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
|
ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open dat file: %v", err)
|
return fmt.Errorf("failed to open ecx file: %v", err)
|
||||||
}
|
}
|
||||||
defer ecxFile.Close()
|
defer ecxFile.Close()
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ func WriteSortedEcxFile(baseFileName string) (e error) {
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open dat file: %v", err)
|
return fmt.Errorf("failed to visit ecx file: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -202,7 +202,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi
|
||||||
outputs, err := openEcFiles(baseFileName, false)
|
outputs, err := openEcFiles(baseFileName, false)
|
||||||
defer closeEcFiles(outputs)
|
defer closeEcFiles(outputs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open dat file: %v", err)
|
return fmt.Errorf("failed to open ec files %s: %v", baseFileName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for remainingSize > largeBlockSize*DataShardsCount {
|
for remainingSize > largeBlockSize*DataShardsCount {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package erasure_coding
|
package erasure_coding
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
|
@ -14,6 +15,10 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage/types"
|
"github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
NotFoundError = errors.New("needle not found")
|
||||||
|
)
|
||||||
|
|
||||||
type EcVolume struct {
|
type EcVolume struct {
|
||||||
VolumeId needle.VolumeId
|
VolumeId needle.VolumeId
|
||||||
Collection string
|
Collection string
|
||||||
|
@ -26,6 +31,8 @@ type EcVolume struct {
|
||||||
ShardLocationsRefreshTime time.Time
|
ShardLocationsRefreshTime time.Time
|
||||||
ShardLocationsLock sync.RWMutex
|
ShardLocationsLock sync.RWMutex
|
||||||
Version needle.Version
|
Version needle.Version
|
||||||
|
ecjFile *os.File
|
||||||
|
ecjFileAccessLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
|
func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
|
||||||
|
@ -34,8 +41,8 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu
|
||||||
baseFileName := EcShardFileName(collection, dir, int(vid))
|
baseFileName := EcShardFileName(collection, dir, int(vid))
|
||||||
|
|
||||||
// open ecx file
|
// open ecx file
|
||||||
if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); err != nil {
|
if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil {
|
||||||
return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, err)
|
return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err)
|
||||||
}
|
}
|
||||||
ecxFi, statErr := ev.ecxFile.Stat()
|
ecxFi, statErr := ev.ecxFile.Stat()
|
||||||
if statErr != nil {
|
if statErr != nil {
|
||||||
|
@ -44,6 +51,11 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu
|
||||||
ev.ecxFileSize = ecxFi.Size()
|
ev.ecxFileSize = ecxFi.Size()
|
||||||
ev.ecxCreatedAt = ecxFi.ModTime()
|
ev.ecxCreatedAt = ecxFi.ModTime()
|
||||||
|
|
||||||
|
// open ecj file
|
||||||
|
if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
|
||||||
|
}
|
||||||
|
|
||||||
ev.ShardLocations = make(map[ShardId][]string)
|
ev.ShardLocations = make(map[ShardId][]string)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -93,6 +105,12 @@ func (ev *EcVolume) Close() {
|
||||||
for _, s := range ev.Shards {
|
for _, s := range ev.Shards {
|
||||||
s.Close()
|
s.Close()
|
||||||
}
|
}
|
||||||
|
if ev.ecjFile != nil {
|
||||||
|
ev.ecjFileAccessLock.Lock()
|
||||||
|
_ = ev.ecjFile.Close()
|
||||||
|
ev.ecjFile = nil
|
||||||
|
ev.ecjFileAccessLock.Unlock()
|
||||||
|
}
|
||||||
if ev.ecxFile != nil {
|
if ev.ecxFile != nil {
|
||||||
_ = ev.ecxFile.Close()
|
_ = ev.ecxFile.Close()
|
||||||
ev.ecxFile = nil
|
ev.ecxFile = nil
|
||||||
|
@ -107,6 +125,7 @@ func (ev *EcVolume) Destroy() {
|
||||||
s.Destroy()
|
s.Destroy()
|
||||||
}
|
}
|
||||||
os.Remove(ev.FileName() + ".ecx")
|
os.Remove(ev.FileName() + ".ecx")
|
||||||
|
os.Remove(ev.FileName() + ".ecj")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ev *EcVolume) FileName() string {
|
func (ev *EcVolume) FileName() string {
|
||||||
|
@ -167,16 +186,23 @@ func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle, version needle.Version
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
|
func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
|
||||||
|
return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func searchNeedleFromEcx(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
|
||||||
var key types.NeedleId
|
var key types.NeedleId
|
||||||
buf := make([]byte, types.NeedleMapEntrySize)
|
buf := make([]byte, types.NeedleMapEntrySize)
|
||||||
l, h := int64(0), ev.ecxFileSize/types.NeedleMapEntrySize
|
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
|
||||||
for l < h {
|
for l < h {
|
||||||
m := (l + h) / 2
|
m := (l + h) / 2
|
||||||
if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
|
if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
|
||||||
return types.Offset{}, 0, fmt.Errorf("ecx file %d read at %d: %v", ev.ecxFileSize, m*types.NeedleMapEntrySize, err)
|
return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
|
||||||
}
|
}
|
||||||
key, offset, size = idx.IdxFileEntry(buf)
|
key, offset, size = idx.IdxFileEntry(buf)
|
||||||
if key == needleId {
|
if key == needleId {
|
||||||
|
if processNeedleFn != nil {
|
||||||
|
err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if key < needleId {
|
if key < needleId {
|
||||||
|
@ -186,6 +212,6 @@ func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Off
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = fmt.Errorf("needle id %d not found", needleId)
|
err = NotFoundError
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
98
weed/storage/erasure_coding/ec_volume_delete.go
Normal file
98
weed/storage/erasure_coding/ec_volume_delete.go
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
package erasure_coding
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
markNeedleDeleted = func(file *os.File, offset int64) error {
|
||||||
|
b := make([]byte, types.SizeSize)
|
||||||
|
util.Uint32toBytes(b, types.TombstoneFileSize)
|
||||||
|
n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("ecx write error: %v", err)
|
||||||
|
}
|
||||||
|
if n != types.SizeSize {
|
||||||
|
return fmt.Errorf("ecx written %d bytes, expecting %d", n, types.SizeSize)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func (ev *EcVolume) deleteNeedleFromEcx(needleId types.NeedleId) (err error) {
|
||||||
|
|
||||||
|
_, _, err = searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, markNeedleDeleted)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if err == NotFoundError {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
b := make([]byte, types.NeedleIdSize)
|
||||||
|
types.NeedleIdToBytes(b, needleId)
|
||||||
|
|
||||||
|
ev.ecjFileAccessLock.Lock()
|
||||||
|
|
||||||
|
ev.ecjFile.Seek(0, io.SeekEnd)
|
||||||
|
ev.ecjFile.Write(b)
|
||||||
|
|
||||||
|
ev.ecjFileAccessLock.Unlock()
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func RebuildEcxFile(baseFileName string) error {
|
||||||
|
|
||||||
|
if !util.FileExists(baseFileName + ".ecj") {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("rebuild: failed to open ecx file: %v", err)
|
||||||
|
}
|
||||||
|
defer ecxFile.Close()
|
||||||
|
|
||||||
|
fstat, err := ecxFile.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ecxFileSize := fstat.Size()
|
||||||
|
|
||||||
|
ecjFile, err := os.OpenFile(baseFileName+".ecj", os.O_RDWR, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("rebuild: failed to open ecj file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, types.NeedleIdSize)
|
||||||
|
for {
|
||||||
|
n, _ := ecjFile.Read(buf)
|
||||||
|
if n != types.NeedleIdSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
needleId := types.BytesToNeedleId(buf)
|
||||||
|
|
||||||
|
_, _, err = searchNeedleFromEcx(ecxFile, ecxFileSize, needleId, markNeedleDeleted)
|
||||||
|
|
||||||
|
if err != nil && err != NotFoundError {
|
||||||
|
ecxFile.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
ecxFile.Close()
|
||||||
|
|
||||||
|
os.Remove(baseFileName + ".ecj")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/stats"
|
"github.com/chrislusf/seaweedfs/weed/stats"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
|
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||||
"github.com/klauspost/reedsolomon"
|
"github.com/klauspost/reedsolomon"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -130,6 +131,9 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("locate in local ec volume: %v", err)
|
return 0, fmt.Errorf("locate in local ec volume: %v", err)
|
||||||
}
|
}
|
||||||
|
if size == types.TombstoneFileSize {
|
||||||
|
return 0, fmt.Errorf("entry %s is deleted", n.Id)
|
||||||
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
|
glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue