delete missing data from volumes in one replica

This commit is contained in:
Konstantin Lebedev 2022-04-25 22:59:46 +05:00
parent 42cc8d8aa3
commit 6d2fda27d2

View file

@ -34,6 +34,7 @@ func init() {
type commandVolumeFsck struct { type commandVolumeFsck struct {
env *CommandEnv env *CommandEnv
forcePurging bool
} }
func (c *commandVolumeFsck) Name() string { func (c *commandVolumeFsck) Name() string {
@ -68,6 +69,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler") findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler")
findMissingChunksInVolumeId := fsckCommand.Int("findMissingChunksInVolumeId", 0, "used together with findMissingChunksInFiler") findMissingChunksInVolumeId := fsckCommand.Int("findMissingChunksInVolumeId", 0, "used together with findMissingChunksInFiler")
applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer") applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer")
c.forcePurging = *fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica")
purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler") purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files") tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
@ -293,7 +295,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
} }
orphanFileIds := []string{} orphanFileIds := []string{}
for fid, foundInAllReplicas := range orphanReplicaFileIds { for fid, foundInAllReplicas := range orphanReplicaFileIds {
if !isSeveralReplicas[volumeId] || (isSeveralReplicas[volumeId] && foundInAllReplicas) { if !isSeveralReplicas[volumeId] || c.forcePurging || (isSeveralReplicas[volumeId] && foundInAllReplicas) {
orphanFileIds = append(orphanFileIds, fid) orphanFileIds = append(orphanFileIds, fid)
} }
} }
@ -301,7 +303,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
continue continue
} }
if verbose { if verbose {
fmt.Fprintf(writer, "purging process for volume %d", volumeId) fmt.Fprintf(writer, "purging process for volume %d.\n", volumeId)
} }
if isEcVolumeReplicas[volumeId] { if isEcVolumeReplicas[volumeId] {