mirror of https://github.com/seaweedfs/seaweedfs.git (synced 2024-01-19 02:48:24 +00:00)
findExtraChunksInVolumeServers in consideration of replication
parent 3817e05dd0
commit 7f1383a41e
@@ -245,13 +245,32 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf
 func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging bool) error {
 
     var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
+    volumeIdOrphanFileIds := make(map[uint32]map[string]bool)
+    isSeveralReplicas := make(map[uint32]bool)
+    isEcVolumeReplicas := make(map[uint32]bool)
+    isReadOnlyReplicas := make(map[uint32]bool)
+    serverReplicas := make(map[uint32][]pb.ServerAddress)
     for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
         for volumeId, vinfo := range volumeIdToVInfo {
             inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, dataNodeId, volumeId, writer, verbose)
             if checkErr != nil {
                 return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
             }
+            isSeveralReplicas[volumeId] = false
+            if _, found := volumeIdOrphanFileIds[volumeId]; !found {
+                volumeIdOrphanFileIds[volumeId] = make(map[string]bool)
+            } else {
+                isSeveralReplicas[volumeId] = true
+            }
+            for _, fid := range orphanFileIds {
+                if isSeveralReplicas[volumeId] {
+                    if _, found := volumeIdOrphanFileIds[volumeId][fid]; !found {
+                        continue
+                    }
+                }
+                volumeIdOrphanFileIds[volumeId][fid] = isSeveralReplicas[volumeId]
+            }
 
             totalInUseCount += inUseCount
             totalOrphanChunkCount += uint64(len(orphanFileIds))
             totalOrphanDataSize += orphanDataSize
@@ -261,31 +280,48 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
                     fmt.Fprintf(writer, "%s\n", fid)
                 }
             }
+            isEcVolumeReplicas[volumeId] = vinfo.isEcVolume
+            if isReadOnly, found := isReadOnlyReplicas[volumeId]; !(found && isReadOnly) {
+                isReadOnlyReplicas[volumeId] = vinfo.isReadOnly
+            }
+            serverReplicas[volumeId] = append(serverReplicas[volumeId], vinfo.server)
+        }
 
-            if applyPurging && len(orphanFileIds) > 0 {
+        for volumeId, orphanReplicaFileIds := range volumeIdOrphanFileIds {
+            if !(applyPurging && len(orphanReplicaFileIds) > 0) {
+                continue
+            }
+            orphanFileIds := []string{}
+            for fid, foundInAllReplicas := range orphanReplicaFileIds {
+                if !isSeveralReplicas[volumeId] || (isSeveralReplicas[volumeId] && foundInAllReplicas) {
+                    orphanFileIds = append(orphanFileIds, fid)
+                }
+            }
+            if !(len(orphanFileIds) > 0) {
+                continue
+            }
             if verbose {
                 fmt.Fprintf(writer, "purging process for volume %d", volumeId)
             }
 
-            if vinfo.isEcVolume {
+            if isEcVolumeReplicas[volumeId] {
                 fmt.Fprintf(writer, "skip purging for Erasure Coded volume %d.\n", volumeId)
                 continue
             }
+            for _, server := range serverReplicas[volumeId] {
                 needleVID := needle.VolumeId(volumeId)
 
-                if vinfo.isReadOnly {
-                    err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, true)
+                if isReadOnlyReplicas[volumeId] {
+                    err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true)
                     if err != nil {
                         return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
                     }
-                    fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, vinfo.server)
-                    defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, false)
+                    fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server)
+                    defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false)
+
+                    fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, server)
                 }
-
-                fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, vinfo.server)
 
                 if verbose {
                     fmt.Fprintf(writer, "purging files from volume %d\n", volumeId)
                 }
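The core of this change is the replica-intersection filter: for a replicated volume, a file id only becomes a purge candidate if every replica reports it as orphaned, since a chunk absent from one replica but present on another may simply not have replicated yet. Below is a minimal, self-contained Go sketch of that filtering step, assuming a plain [][]string of orphan file ids per replica; the function name and the sample file ids are invented for illustration, while the actual code above keys the candidate map by volume id across data nodes.

package main

import "fmt"

// filterOrphansAcrossReplicas keeps only the file ids that every replica
// reports as orphaned. The first replica seeds the candidate set; a later
// replica can only re-mark fids that are already candidates, so a fid seen
// by just one of several replicas keeps the value false and is dropped.
func filterOrphansAcrossReplicas(orphansPerReplica [][]string) []string {
    candidates := make(map[string]bool) // fid -> also seen by a later replica
    severalReplicas := false
    for i, replicaOrphans := range orphansPerReplica {
        if i > 0 {
            severalReplicas = true
        }
        for _, fid := range replicaOrphans {
            if severalReplicas {
                if _, found := candidates[fid]; !found {
                    continue // not orphaned on an earlier replica: leave it alone
                }
            }
            candidates[fid] = severalReplicas
        }
    }
    confirmed := []string{}
    for fid, foundInAllReplicas := range candidates {
        if !severalReplicas || foundInAllReplicas {
            confirmed = append(confirmed, fid)
        }
    }
    return confirmed
}

func main() {
    // Two replicas of one volume: "3,01637037d6" is orphaned on both, so it
    // is purgeable; "3,02d80eb9ea" is orphaned on only one replica and kept.
    replicaA := []string{"3,01637037d6", "3,02d80eb9ea"}
    replicaB := []string{"3,01637037d6"}
    fmt.Println(filterOrphansAcrossReplicas([][]string{replicaA, replicaB}))
    // Prints: [3,01637037d6]
}

With a single replica, severalReplicas never flips to true, so the final loop keeps every orphan, matching the !isSeveralReplicas[volumeId] branch in the diff.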