seaweedfs/weed/storage/needle_map_metric.go

166 lines
4 KiB
Go
Raw Normal View History

package storage
import (
"fmt"
2019-04-15 06:00:37 +00:00
"os"
"sync/atomic"
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
2022-08-23 15:53:24 +00:00
boom "github.com/tylertreat/BoomFilters"
)
// mapMetric holds aggregate statistics for a needle map.
// The counter fields are mutated via sync/atomic by the methods below, so
// the field order (two uint32s followed by the uint64s) should be kept:
// it keeps the 64-bit fields 8-byte aligned, which 32-bit platforms
// require for atomic access.
type mapMetric struct {
	DeletionCounter     uint32 `json:"DeletionCounter"`     // number of deleted entries
	FileCounter         uint32 `json:"FileCounter"`         // number of live entries
	DeletionByteCounter uint64 `json:"DeletionByteCounter"` // total bytes of deleted entries
	FileByteCounter     uint64 `json:"FileByteCounter"`     // total bytes of live entries
	MaximumFileKey      uint64 `json:"MaxFileKey"`          // largest needle key seen
}
func (mm *mapMetric) logDelete(deletedByteCount Size) {
if mm == nil {
return
}
2019-04-15 06:00:37 +00:00
mm.LogDeletionCounter(deletedByteCount)
}
func (mm *mapMetric) logPut(key NeedleId, oldSize Size, newSize Size) {
if mm == nil {
return
}
2019-04-15 06:00:37 +00:00
mm.MaybeSetMaxFileKey(key)
mm.LogFileCounter(newSize)
2020-08-19 01:01:37 +00:00
if oldSize > 0 && oldSize.IsValid() {
2019-04-15 06:00:37 +00:00
mm.LogDeletionCounter(oldSize)
}
2019-04-15 06:00:37 +00:00
}
func (mm *mapMetric) LogFileCounter(newSize Size) {
if mm == nil {
return
}
2019-04-15 06:00:37 +00:00
atomic.AddUint32(&mm.FileCounter, 1)
atomic.AddUint64(&mm.FileByteCounter, uint64(newSize))
}
func (mm *mapMetric) LogDeletionCounter(oldSize Size) {
if mm == nil {
return
}
if oldSize > 0 {
2019-04-15 06:00:37 +00:00
atomic.AddUint32(&mm.DeletionCounter, 1)
atomic.AddUint64(&mm.DeletionByteCounter, uint64(oldSize))
}
}
func (mm *mapMetric) ContentSize() uint64 {
if mm == nil {
return 0
}
2019-04-15 06:00:37 +00:00
return atomic.LoadUint64(&mm.FileByteCounter)
}
func (mm *mapMetric) DeletedSize() uint64 {
if mm == nil {
return 0
}
2019-04-15 06:00:37 +00:00
return atomic.LoadUint64(&mm.DeletionByteCounter)
}
func (mm *mapMetric) FileCount() int {
if mm == nil {
return 0
}
2019-04-15 06:00:37 +00:00
return int(atomic.LoadUint32(&mm.FileCounter))
}
func (mm *mapMetric) DeletedCount() int {
if mm == nil {
return 0
}
2019-04-15 06:00:37 +00:00
return int(atomic.LoadUint32(&mm.DeletionCounter))
}
func (mm *mapMetric) MaxFileKey() NeedleId {
if mm == nil {
return 0
}
2019-04-15 06:00:37 +00:00
t := uint64(mm.MaximumFileKey)
2019-06-21 08:14:10 +00:00
return Uint64ToNeedleId(t)
2019-04-15 06:00:37 +00:00
}
func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) {
if mm == nil {
return
}
2019-04-15 06:00:37 +00:00
if key > mm.MaxFileKey() {
atomic.StoreUint64(&mm.MaximumFileKey, uint64(key))
}
}
func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
mm = &mapMetric{}
2021-08-30 09:03:08 +00:00
var bf *boom.BloomFilter
buf := make([]byte, NeedleIdSize)
err = reverseWalkIndexFile(r, func(entryCount int64) {
2021-08-30 09:03:08 +00:00
bf = boom.NewBloomFilter(uint(entryCount), 0.001)
}, func(key NeedleId, offset Offset, size Size) error {
2019-04-15 06:00:37 +00:00
mm.MaybeSetMaxFileKey(key)
NeedleIdToBytes(buf, key)
2020-08-19 01:01:37 +00:00
if size.IsValid() {
mm.FileByteCounter += uint64(size)
}
mm.FileCounter++
2022-08-23 15:53:24 +00:00
if !bf.TestAndAdd(buf) {
// if !size.IsValid(), then this file is deleted already
if !size.IsValid() {
mm.DeletionCounter++
}
} else {
// deleted file
mm.DeletionCounter++
2020-08-19 01:01:37 +00:00
if size.IsValid() {
// previously already deleted file
mm.DeletionByteCounter += uint64(size)
}
}
return nil
})
return
}
func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size Size) error) error {
fi, err := r.Stat()
if err != nil {
return fmt.Errorf("file %s stat error: %v", r.Name(), err)
}
fileSize := fi.Size()
2019-04-19 07:39:34 +00:00
if fileSize%NeedleMapEntrySize != 0 {
return fmt.Errorf("unexpected file %s size: %d", r.Name(), fileSize)
}
2019-04-19 07:39:34 +00:00
entryCount := fileSize / NeedleMapEntrySize
2018-07-25 05:17:56 +00:00
initFn(entryCount)
2018-07-25 05:17:56 +00:00
batchSize := int64(1024 * 4)
2019-04-19 07:39:34 +00:00
bytes := make([]byte, NeedleMapEntrySize*batchSize)
2018-07-25 05:17:56 +00:00
nextBatchSize := entryCount % batchSize
if nextBatchSize == 0 {
nextBatchSize = batchSize
}
remainingCount := entryCount - nextBatchSize
for remainingCount >= 0 {
2019-04-19 07:39:34 +00:00
_, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount)
// glog.V(0).Infoln("file", r.Name(), "readerOffset", NeedleMapEntrySize*remainingCount, "count", count, "e", e)
2018-07-25 05:17:56 +00:00
if e != nil {
return e
}
2018-07-25 05:17:56 +00:00
for i := int(nextBatchSize) - 1; i >= 0; i-- {
key, offset, size := idx.IdxFileEntry(bytes[i*NeedleMapEntrySize : i*NeedleMapEntrySize+NeedleMapEntrySize])
2018-07-25 05:17:56 +00:00
if e = fn(key, offset, size); e != nil {
return e
}
}
nextBatchSize = batchSize
remainingCount -= nextBatchSize
}
return nil
}