simpler logic

This commit is contained in:
Chris Lu 2018-11-18 10:07:30 -08:00
parent f8eb988347
commit 9655dc9ca9

View file

@ -3,7 +3,6 @@ package filer2
import ( import (
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"math"
"sort" "sort"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@ -103,137 +102,54 @@ func logPrintf(name string, visibles []*visibleInterval) {
*/ */
} }
func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*visibleInterval) { func mergeIntoVisibles(visibles []*visibleInterval, chunk *filer_pb.FileChunk) (newVisibles []*visibleInterval) {
for _, v := range visibles {
sort.Slice(chunks, func(i, j int) bool { if v.start < chunk.Offset && chunk.Offset < v.stop {
if chunks[i].Offset < chunks[j].Offset { newVisibles = append(newVisibles, newVisibleInterval(
return true v.start,
} chunk.Offset,
if chunks[i].Offset == chunks[j].Offset { v.fileId,
return chunks[i].Mtime < chunks[j].Mtime v.modifiedTime,
}
return false
})
if len(chunks) == 0 {
return
}
var parallelIntervals, intervals []*visibleInterval
var minStopInterval, upToDateInterval *visibleInterval
watermarkStart := chunks[0].Offset
for _, chunk := range chunks {
// log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
logPrintf("parallelIntervals", parallelIntervals)
for len(parallelIntervals) > 0 && watermarkStart < chunk.Offset {
logPrintf("parallelIntervals loop 1", parallelIntervals)
logPrintf("parallelIntervals loop 1 intervals", intervals)
minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
nextStop := min(minStopInterval.stop, chunk.Offset)
intervals = append(intervals, newVisibleInterval(
max(watermarkStart, minStopInterval.start),
nextStop,
upToDateInterval.fileId,
upToDateInterval.modifiedTime,
)) ))
watermarkStart = nextStop }
logPrintf("parallelIntervals loop intervals =>", intervals) chunkStop := chunk.Offset + int64(chunk.Size)
if v.start < chunkStop && chunkStop < v.stop {
// remove processed intervals, possibly multiple newVisibles = append(newVisibles, newVisibleInterval(
var remaining []*visibleInterval chunkStop,
for _, interval := range parallelIntervals { v.stop,
if interval.stop != watermarkStart { v.fileId,
remaining = append(remaining, interval) v.modifiedTime,
))
}
if chunkStop < v.start || v.stop <= chunk.Offset {
newVisibles = append(newVisibles, v)
} }
} }
parallelIntervals = remaining newVisibles = append(newVisibles, newVisibleInterval(
logPrintf("parallelIntervals loop 2", parallelIntervals)
logPrintf("parallelIntervals loop 2 intervals", intervals)
}
parallelIntervals = append(parallelIntervals, newVisibleInterval(
chunk.Offset, chunk.Offset,
chunk.Offset+int64(chunk.Size), chunk.Offset+int64(chunk.Size),
chunk.FileId, chunk.FileId,
chunk.Mtime, chunk.Mtime,
)) ))
}
logPrintf("parallelIntervals loop 3", parallelIntervals)
logPrintf("parallelIntervals loop 3 intervals", intervals)
for len(parallelIntervals) > 0 {
minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
intervals = append(intervals, newVisibleInterval(
max(watermarkStart, minStopInterval.start),
minStopInterval.stop,
upToDateInterval.fileId,
upToDateInterval.modifiedTime,
))
watermarkStart = minStopInterval.stop
// remove processed intervals, possibly multiple
var remaining []*visibleInterval
for _, interval := range parallelIntervals {
if interval.stop != watermarkStart {
remaining = append(remaining, interval)
}
}
parallelIntervals = remaining
}
logPrintf("parallelIntervals loop 4", parallelIntervals)
logPrintf("intervals", intervals)
// merge connected intervals, now the intervals are non-intersecting
var lastIntervalIndex int
var prevIntervalIndex int
for i, interval := range intervals {
if i == 0 {
prevIntervalIndex = i
lastIntervalIndex = i
continue
}
if intervals[i-1].fileId != interval.fileId ||
intervals[i-1].stop < intervals[i].start {
visibles = append(visibles, newVisibleInterval(
intervals[prevIntervalIndex].start,
intervals[i-1].stop,
intervals[prevIntervalIndex].fileId,
intervals[prevIntervalIndex].modifiedTime,
))
prevIntervalIndex = i
}
lastIntervalIndex = i
logPrintf("intervals loop 1 visibles", visibles)
}
visibles = append(visibles, newVisibleInterval(
intervals[prevIntervalIndex].start,
intervals[lastIntervalIndex].stop,
intervals[prevIntervalIndex].fileId,
intervals[prevIntervalIndex].modifiedTime,
))
logPrintf("visibles", visibles)
return return
} }
func findMinStopInterval(intervals []*visibleInterval) (minStopInterval, upToDateInterval *visibleInterval) { func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*visibleInterval) {
var latestMtime int64
latestIntervalIndex := 0 sort.Slice(chunks, func(i, j int) bool {
minStop := int64(math.MaxInt64) return chunks[i].Mtime < chunks[j].Mtime
minIntervalIndex := 0 })
for i, interval := range intervals {
if minStop > interval.stop { for _, chunk := range chunks {
minIntervalIndex = i visibles = mergeIntoVisibles(visibles, chunk)
minStop = interval.stop
} }
if latestMtime < interval.modifiedTime {
latestMtime = interval.modifiedTime sort.Slice(visibles, func(i, j int) bool {
latestIntervalIndex = i return visibles[i].start < visibles[j].start
} })
}
minStopInterval = intervals[minIntervalIndex] logPrintf("visibles", visibles)
upToDateInterval = intervals[latestIntervalIndex]
return return
} }
@ -257,10 +173,3 @@ func min(x, y int64) int64 {
} }
return y return y
} }
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}