From 856da7aae2adefe7c25f68c792f9ed03977a4a0e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 19 Jun 2019 22:57:14 -0700 Subject: [PATCH] ec volume support deletes --- weed/server/volume_grpc_copy.go | 16 +-- weed/server/volume_grpc_erasure_coding.go | 17 +++- weed/storage/erasure_coding/ec_encoder.go | 6 +- weed/storage/erasure_coding/ec_volume.go | 38 +++++-- .../erasure_coding/ec_volume_delete.go | 98 +++++++++++++++++++ weed/storage/store_ec.go | 4 + 6 files changed, 162 insertions(+), 17 deletions(-) create mode 100644 weed/storage/erasure_coding/ec_volume_delete.go diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 89788d3c3..1ec69b43f 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -55,11 +55,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // println("source:", volFileInfoResp.String()) // copy ecx file - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx"); err != nil { + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil { return err } - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat"); err != nil { + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil { return err } @@ -95,7 +95,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo } func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32, - compactRevision uint32, stopOffset uint64, baseFileName, ext string) error { + compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error { copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ VolumeId: vid, @@ -109,7 +109,7 @@ func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb. return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) } - err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond)) + err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend) if err != nil { return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) } @@ -143,9 +143,13 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse return nil } -func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler) error { +func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error { glog.V(4).Infof("writing to %s", fileName) - dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + flags := os.O_WRONLY|os.O_CREATE|os.O_TRUNC + if isAppend { + flags = os.O_WRONLY|os.O_CREATE + } + dst, err := os.OpenFile(fileName, flags, 0644) if err != nil { return nil } diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index ab1310c4a..e676337e6 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -74,6 +74,11 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s } else { rebuiltShardIds = generatedShardIds } + + if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil { + return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err) + } + break } } @@ -97,7 +102,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv // copy ec data slices for _, shardId := range req.ShardIds { - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil { + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil { return err } } @@ -107,7 +112,12 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv } // copy ecx file - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil { + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil { + return err + } + + // copy ecj file + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil { return err } @@ -166,6 +176,9 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se if err := os.Remove(baseFilename + ".ecx"); err != nil { return nil, err } + if err := os.Remove(baseFilename + ".ecj"); err != nil { + return nil, err + } } return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go index 26130b4ba..97010a1ed 100644 --- a/weed/storage/erasure_coding/ec_encoder.go +++ b/weed/storage/erasure_coding/ec_encoder.go @@ -32,7 +32,7 @@ func WriteSortedEcxFile(baseFileName string) (e error) { ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - return fmt.Errorf("failed to open dat file: %v", err) + return fmt.Errorf("failed to open ecx file: %v", err) } defer ecxFile.Close() @@ -43,7 +43,7 @@ func WriteSortedEcxFile(baseFileName string) (e error) { }) if err != nil { - return fmt.Errorf("failed to open dat file: %v", err) + return fmt.Errorf("failed to visit ecx file: %v", err) } return nil @@ -202,7 +202,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi outputs, err := openEcFiles(baseFileName, false) defer closeEcFiles(outputs) if err != nil { - return fmt.Errorf("failed to open dat file: %v", err) + return fmt.Errorf("failed to open ec files %s: %v", baseFileName, err) } for remainingSize > largeBlockSize*DataShardsCount { diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index aea53f36e..acc9b1c37 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -1,6 +1,7 @@ package erasure_coding import ( + "errors" "fmt" "math" "os" @@ -14,6 +15,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/types" ) +var ( + NotFoundError = errors.New("needle not found") +) + type EcVolume struct { VolumeId needle.VolumeId Collection string @@ -26,6 +31,8 @@ type EcVolume struct { ShardLocationsRefreshTime time.Time ShardLocationsLock sync.RWMutex Version needle.Version + ecjFile *os.File + ecjFileAccessLock sync.Mutex } func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { @@ -34,8 +41,8 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu baseFileName := EcShardFileName(collection, dir, int(vid)) // open ecx file - if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); err != nil { - return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, err) + if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil { + return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err) } ecxFi, statErr := ev.ecxFile.Stat() if statErr != nil { @@ -44,6 +51,11 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu ev.ecxFileSize = ecxFi.Size() ev.ecxCreatedAt = ecxFi.ModTime() + // open ecj file + if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err) + } + ev.ShardLocations = make(map[ShardId][]string) return @@ -93,6 +105,12 @@ func (ev *EcVolume) Close() { for _, s := range ev.Shards { s.Close() } + if ev.ecjFile != nil { + ev.ecjFileAccessLock.Lock() + _ = ev.ecjFile.Close() + ev.ecjFile = nil + ev.ecjFileAccessLock.Unlock() + } if ev.ecxFile != nil { _ = ev.ecxFile.Close() ev.ecxFile = nil @@ -107,6 +125,7 @@ func (ev *EcVolume) Destroy() { s.Destroy() } os.Remove(ev.FileName() + ".ecx") + os.Remove(ev.FileName() + ".ecj") } func (ev *EcVolume) FileName() string { @@ -167,16 +186,23 @@ func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle, version needle.Version } func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) { + return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil) +} + +func searchNeedleFromEcx(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) { var key types.NeedleId buf := make([]byte, types.NeedleMapEntrySize) - l, h := int64(0), ev.ecxFileSize/types.NeedleMapEntrySize + l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize for l < h { m := (l + h) / 2 - if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { - return types.Offset{}, 0, fmt.Errorf("ecx file %d read at %d: %v", ev.ecxFileSize, m*types.NeedleMapEntrySize, err) + if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { + return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err) } key, offset, size = idx.IdxFileEntry(buf) if key == needleId { + if processNeedleFn != nil { + err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize) + } return } if key < needleId { @@ -186,6 +212,6 @@ func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Off } } - err = fmt.Errorf("needle id %d not found", needleId) + err = NotFoundError return } diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go new file mode 100644 index 000000000..784dc2854 --- /dev/null +++ b/weed/storage/erasure_coding/ec_volume_delete.go @@ -0,0 +1,98 @@ +package erasure_coding + +import ( + "fmt" + "io" + "os" + + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + markNeedleDeleted = func(file *os.File, offset int64) error { + b := make([]byte, types.SizeSize) + util.Uint32toBytes(b, types.TombstoneFileSize) + n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize) + if err != nil { + return fmt.Errorf("ecx write error: %v", err) + } + if n != types.SizeSize { + return fmt.Errorf("ecx written %d bytes, expecting %d", n, types.SizeSize) + } + return nil + } +) + +func (ev *EcVolume) deleteNeedleFromEcx(needleId types.NeedleId) (err error) { + + _, _, err = searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, markNeedleDeleted) + + if err != nil { + if err == NotFoundError { + return nil + } + return err + } + + b := make([]byte, types.NeedleIdSize) + types.NeedleIdToBytes(b, needleId) + + ev.ecjFileAccessLock.Lock() + + ev.ecjFile.Seek(0, io.SeekEnd) + ev.ecjFile.Write(b) + + ev.ecjFileAccessLock.Unlock() + + return +} + +func RebuildEcxFile(baseFileName string) error { + + if !util.FileExists(baseFileName + ".ecj") { + return nil + } + + ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("rebuild: failed to open ecx file: %v", err) + } + defer ecxFile.Close() + + fstat, err := ecxFile.Stat() + if err != nil { + return err + } + + ecxFileSize := fstat.Size() + + ecjFile, err := os.OpenFile(baseFileName+".ecj", os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("rebuild: failed to open ecj file: %v", err) + } + + buf := make([]byte, types.NeedleIdSize) + for { + n, _ := ecjFile.Read(buf) + if n != types.NeedleIdSize { + break + } + + needleId := types.BytesToNeedleId(buf) + + _, _, err = searchNeedleFromEcx(ecxFile, ecxFileSize, needleId, markNeedleDeleted) + + if err != nil && err != NotFoundError { + ecxFile.Close() + return err + } + + } + + ecxFile.Close() + + os.Remove(baseFileName + ".ecj") + + return nil +} diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index b39776dcf..96cef7169 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -15,6 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/klauspost/reedsolomon" ) @@ -130,6 +131,9 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n if err != nil { return 0, fmt.Errorf("locate in local ec volume: %v", err) } + if size == types.TombstoneFileSize { + return 0, fmt.Errorf("entry %s is deleted", n.Id) + } glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)