diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 2ef89a040..c0d666b6d 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -90,25 +90,62 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
 
 // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume
 func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
+	foundExistingVolume, err := vs.doDeleteUnmountedShards(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+
+	if !foundExistingVolume {
+		err = vs.doDeleteMountedShards(ctx, req)
+	}
+
+	return &volume_server_pb.VolumeEcShardsDeleteResponse{}, err
+}
+
+// doDeleteUnmountedShards removes the ec shard files, and optionally the .ecx index, directly from disk, assuming the current server still has the source volume
+func (vs *VolumeServer) doDeleteUnmountedShards(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (foundVolume bool, err error) {
+
 	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
 	if v == nil {
-		return nil, fmt.Errorf("volume %d not found", req.VolumeId)
+		return false, nil
 	}
+
 	baseFileName := v.FileName()
 
 	for _, shardId := range req.ShardIds {
 		if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardId))); err != nil {
-			return nil, err
+			return true, err
 		}
 	}
 
 	if req.ShouldDeleteEcx {
 		if err := os.Remove(baseFileName + ".ecx"); err != nil {
-			return nil, err
+			return true, err
 		}
 	}
 
-	return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
+	return true, nil
+}
+
+// doDeleteMountedShards deletes the requested shards from a mounted ec volume, destroying the ec volume once no shards remain
+func (vs *VolumeServer) doDeleteMountedShards(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) error {
+
+	ecv, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
+	if !found {
+		return fmt.Errorf("volume %d not found", req.VolumeId)
+	}
+
+	for _, shardId := range req.ShardIds {
+		if shard, found := ecv.DeleteEcVolumeShard(erasure_coding.ShardId(shardId)); found {
+			shard.Destroy()
+		}
+	}
+
+	if len(ecv.Shards) == 0 {
+		ecv.Destroy()
+	}
+
+	return nil
 }
 
 func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 779d9ecd7..a3d097036 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -159,7 +159,7 @@ func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing
 
 		if isOverLimit {
 
-			if err := spreadShardsIntoMoreDataNodes(ctx, commandEnv, averageShardsPerEcNode, vid, locations, possibleDestinationEcNodes, applyBalancing); err != nil {
+			if err := spreadShardsIntoMoreDataNodes(ctx, commandEnv, averageShardsPerEcNode, collection, vid, locations, possibleDestinationEcNodes, applyBalancing); err != nil {
 				return err
 			}
 
@@ -170,7 +170,7 @@ func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing
 	return nil
 }
 
-func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
 
 	for _, ecNode := range existingLocations {
 
@@ -185,7 +185,7 @@ func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv,
 
 			fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
 
-			err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcNode, ecNode, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+			err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
 			if err != nil {
 				return err
 			}
@@ -197,7 +197,7 @@ func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv,
 	return nil
 }
 
-func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, existingLocation *EcNode, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
 
 	sortEcNodes(possibleDestinationEcNodes)
 
@@ -215,7 +215,7 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, a
 
 		fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
 
-		err := moveOneShardToEcNode(ctx, commandEnv, existingLocation, vid, shardId, destEcNode, applyBalancing)
+		err := moveOneShardToEcNode(ctx, commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
 		if err != nil {
 			return err
 		}
@@ -227,10 +227,23 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, a
 	return nil
 }
 
-func moveOneShardToEcNode(ctx context.Context, commandEnv *commandEnv, existingLocation *EcNode, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {
+func moveOneShardToEcNode(ctx context.Context, commandEnv *commandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {
 
 	fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
-	return nil
+
+	if !applyBalancing {
+		return nil
+	}
+
+	// ask destination node to copy shard and the ecx file from source node, and mount it
+	copiedShardIds, err := oneServerCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
+	if err != nil {
+		return err
+	}
+
+	// ask source node to delete the shard, and maybe the ecx file
+	return sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+
 }
 
 func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 7ecd7bb8c..14d1ae96b 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -69,7 +69,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr
 	}
 
 	// apply to all volumes in the collection
-	volumeIds, err := collectVolumeForEcEncode(ctx, commandEnv, *collection, *quietPeriod)
+	volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *quietPeriod)
 	if err != nil {
 		return err
 	}
@@ -143,7 +143,7 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl
 	}
 
 	// ask the source volume server to clean up copied ec shards
-	err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0], copiedShardIds)
+	err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
 	if err != nil {
 		return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err)
 	}
@@ -177,7 +177,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
 		go func(server *EcNode, startFromShardId uint32, shardCount int) {
 			defer wg.Done()
 			copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server,
-				startFromShardId, shardCount, volumeId, collection, existingLocation)
+				startFromShardId, shardCount, volumeId, collection, existingLocation.Url)
 			if copyErr != nil {
 				err = copyErr
 			} else {
@@ -203,23 +203,23 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
 
 func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
 	targetServer *EcNode, startFromShardId uint32, shardCount int,
-	volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) {
+	volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
 
 	var shardIdsToCopy []uint32
 	for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
-		fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.info.Id)
+		fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation, targetServer.info.Id)
 		shardIdsToCopy = append(shardIdsToCopy, shardId)
 	}
 
 	err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 
-		if targetServer.info.Id != existingLocation.Url {
+		if targetServer.info.Id != existingLocation {
 
 			_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
 				VolumeId:       uint32(volumeId),
 				Collection:     collection,
 				ShardIds:       shardIdsToCopy,
-				SourceDataNode: existingLocation.Url,
+				SourceDataNode: existingLocation,
 			})
 			if copyErr != nil {
 				return copyErr
@@ -235,9 +235,9 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di
 			return mountErr
 		}
 
-		if targetServer.info.Id != existingLocation.Url {
+		if targetServer.info.Id != existingLocation {
 			copiedShardIds = shardIdsToCopy
-			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation.Url, volumeId, copiedShardIds)
+			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
 		}
 
 		return nil
@@ -251,11 +251,11 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di
 }
 
 func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
-	volumeId needle.VolumeId, sourceLocation wdclient.Location, toBeDeletedShardIds []uint32) error {
+	volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
 
 	shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount
 
-	return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
 			VolumeId: uint32(volumeId),
 			ShardIds: toBeDeletedShardIds,
@@ -349,7 +349,7 @@ func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcN
 	return
 }
 
-func collectVolumeForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
 
 	var resp *master_pb.VolumeListResponse
 	err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index ff72baf33..21f26e093 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -73,7 +73,7 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding
 	if !found {
 		return false
 	}
-	if deleted := ecVolume.DeleteEcVolumeShard(shardId); deleted {
+	if _, deleted := ecVolume.DeleteEcVolumeShard(shardId); deleted {
 		if len(ecVolume.Shards) == 0 {
 			delete(l.ecVolumes, vid)
 		}
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 8bd7237f5..44d8ca80f 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -61,7 +61,7 @@ func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
 	return true
 }
 
-func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) bool {
+func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
 	foundPosition := -1
 	for i, s := range ev.Shards {
 		if s.ShardId == shardId {
@@ -69,11 +69,13 @@ func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) bool {
 		}
 	}
 	if foundPosition < 0 {
-		return false
+		return nil, false
 	}
 
+	ecVolumeShard = ev.Shards[foundPosition]
+
 	ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
-	return true
+	return ecVolumeShard, true
 }
 
 func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
@@ -99,11 +101,16 @@ func (ev *EcVolume) Destroy() {
 
 	ev.Close()
 
-	baseFileName := EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
 	for _, s := range ev.Shards {
 		s.Destroy()
 	}
-	os.Remove(baseFileName + ".ecx")
+	os.Remove(ev.FileName() + ".ecx")
+}
+
+func (ev *EcVolume) FileName() string {
+
+	return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
+
 }
 
 func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
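
Not part of the patch: a minimal sketch of how a shell-side caller is expected to drive the refactored delete path over gRPC, mirroring sourceServerDeleteEcShards above. The server address "localhost:8080", the volume id, and the shard ids are placeholder values, and the import paths assume the repository's module path of that era (github.com/chrislusf/seaweedfs).

package main

import (
	"context"
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func main() {
	ctx := context.Background()
	// Ask one volume server to drop three of its ec shards for a volume (placeholder id 7).
	err := operation.WithVolumeServerClient("localhost:8080", grpc.WithInsecure(), func(client volume_server_pb.VolumeServerClient) error {
		// The server first tries doDeleteUnmountedShards (raw .ec00, .ec01, ... files on disk);
		// if no unmounted copy is found, it falls back to doDeleteMountedShards.
		_, err := client.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
			VolumeId:        7,                 // placeholder volume id
			ShardIds:        []uint32{0, 1, 2}, // shards to remove from this server
			ShouldDeleteEcx: false,             // keep the .ecx index unless every shard is gone
		})
		return err
	})
	if err != nil {
		log.Fatal(err)
	}
}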