From ae1994cbc1692133cc5f4ac30e8060a2b0d9dda8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 18 Jun 2020 09:52:35 -0700 Subject: [PATCH] erasure coding: fix cases where there are no .ecj files --- weed/server/volume_grpc_erasure_coding.go | 4 +--- weed/shell/command_ec_encode.go | 6 ++++++ weed/shell/command_volume_fix_replication.go | 5 ++++- weed/storage/erasure_coding/ec_decoder.go | 6 +++++- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 66dd5bf8d..3d11cff29 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -201,9 +201,7 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se if err := os.Remove(baseFilename + ".ecx"); err != nil { return nil, err } - if err := os.Remove(baseFilename + ".ecj"); err != nil { - return nil, err - } + os.Remove(baseFilename + ".ecj") } if !hasIdxFile { // .vif is used for ec volumes and normal volumes diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 165809d05..5a8146954 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -123,6 +123,8 @@ func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId for _, location := range locations { + fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url) + err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), @@ -141,6 +143,8 @@ func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { + fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer) + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), @@ -204,6 +208,8 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { + fmt.Printf("parallelCopyEcShardsFromSource %d %s\n", volumeId, existingLocation.Url) + // parallelize shardIdChan := make(chan []uint32, len(targetServers)) var wg sync.WaitGroup diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 19da89b67..6b5e4e735 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -121,7 +121,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, VolumeId: volumeInfo.Id, SourceDataNode: sourceNode.dataNode.Id, }) - return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr) + if replicateErr != nil { + return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr) + } + return nil }) if err != nil { diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go index ae77cee3f..7b42d02e7 100644 --- a/weed/storage/erasure_coding/ec_decoder.go +++ b/weed/storage/erasure_coding/ec_decoder.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" ) // write .idx file from .ecx and .ecj files @@ -118,9 +119,12 @@ func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId } func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error { + if !util.FileExists(baseFileName+".ecj") { + return nil + } ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644) if openErr != nil { - return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr) + return fmt.Errorf("cannot open ec index %s.ecj: %v", baseFileName, openErr) } defer ecjFile.Close()