EC: after ec encoding, the source ec shards may fail to purge if the volume server has multiple disk locations

related to https://github.com/seaweedfs/seaweedfs/issues/3459
This commit is contained in:
chrislu 2022-08-21 14:52:37 -07:00
parent 8b3429858d
commit 5790d01c6f

View file

@ -173,24 +173,37 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds) glog.V(0).Infof("ec volume %s shard delete %v", bName, req.ShardIds)
for _, location := range vs.store.Locations {
if err := deleteEcShardIdsForEachLocation(bName, location, req.ShardIds); err != nil {
glog.Errorf("deleteEcShards from %s %s.%v: %v", location, bName, req.ShardIds, err)
return nil, err
}
}
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
}
func deleteEcShardIdsForEachLocation(bName string, location *storage.DiskLocation, shardIds []uint32) error {
found := false found := false
var indexBaseFilename, dataBaseFilename string
for _, location := range vs.store.Locations { indexBaseFilename := path.Join(location.IdxDirectory, bName)
dataBaseFilename := path.Join(location.Directory, bName)
if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) { if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
for _, shardId := range shardIds {
shardFileName := dataBaseFilename + erasure_coding.ToExt(int(shardId))
if util.FileExists(shardFileName) {
found = true found = true
indexBaseFilename = path.Join(location.IdxDirectory, bName) os.Remove(shardFileName)
dataBaseFilename = path.Join(location.Directory, bName)
for _, shardId := range req.ShardIds {
os.Remove(dataBaseFilename + erasure_coding.ToExt(int(shardId)))
} }
break
} }
} }
if !found { if !found {
return nil, nil return nil
} }
// check whether to delete the .ecx and .ecj file also // check whether to delete the .ecx and .ecj file also
@ -198,15 +211,14 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
hasIdxFile := false hasIdxFile := false
existingShardCount := 0 existingShardCount := 0
for _, location := range vs.store.Locations {
fileInfos, err := os.ReadDir(location.Directory) fileInfos, err := os.ReadDir(location.Directory)
if err != nil { if err != nil {
continue return err
} }
if location.IdxDirectory != location.Directory { if location.IdxDirectory != location.Directory {
idxFileInfos, err := os.ReadDir(location.IdxDirectory) idxFileInfos, err := os.ReadDir(location.IdxDirectory)
if err != nil { if err != nil {
continue return err
} }
fileInfos = append(fileInfos, idxFileInfos...) fileInfos = append(fileInfos, idxFileInfos...)
} }
@ -223,20 +235,20 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
existingShardCount++ existingShardCount++
} }
} }
}
if hasEcxFile && existingShardCount == 0 { if hasEcxFile && existingShardCount == 0 {
if err := os.Remove(indexBaseFilename + ".ecx"); err != nil { if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
return nil, err return err
} }
os.Remove(indexBaseFilename + ".ecj") os.Remove(indexBaseFilename + ".ecj")
}
if !hasIdxFile { if !hasIdxFile {
// .vif is used for ec volumes and normal volumes // .vif is used for ec volumes and normal volumes
os.Remove(dataBaseFilename + ".vif") os.Remove(dataBaseFilename + ".vif")
} }
}
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil return nil
} }
func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) { func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {