seaweedfs/unmaintained/load_test/load_test_leveldb/load_test_leveldb.go

156 lines
2.9 KiB
Go
Raw Permalink Normal View History

2019-07-21 17:45:25 +00:00
package load_test_leveldb
2019-07-03 04:25:18 +00:00
import (
"crypto/md5"
"flag"
"fmt"
"io"
"log"
"math/rand"
"os"
"sync"
"time"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
var (
dir = flag.String("dir", "./t", "directory to store level db files")
2019-07-03 04:25:18 +00:00
useHash = flag.Bool("isHash", false, "hash the path as the key")
dbCount = flag.Int("dbCount", 1, "the number of leveldb")
)
func main() {
flag.Parse()
totalTenants := 300
totalYears := 3
opts := &opt.Options{
BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 4,
}
var dbs []*leveldb.DB
var chans []chan string
for d := 0; d < *dbCount; d++ {
2019-07-03 04:25:18 +00:00
dbFolder := fmt.Sprintf("%s/%02d", *dir, d)
os.MkdirAll(dbFolder, 0755)
db, err := leveldb.OpenFile(dbFolder, opts)
if err != nil {
log.Printf("filer store open dir %s: %v", *dir, err)
return
}
dbs = append(dbs, db)
chans = append(chans, make(chan string, 1024))
}
var wg sync.WaitGroup
for d := 0; d < *dbCount; d++ {
2019-07-03 04:25:18 +00:00
wg.Add(1)
go func(d int) {
2019-07-03 04:25:18 +00:00
defer wg.Done()
ch := chans[d]
db := dbs[d]
for p := range ch {
if *useHash {
insertAsHash(db, p)
} else {
2019-07-03 04:25:18 +00:00
insertAsFullPath(db, p)
}
}
}(d)
}
counter := int64(0)
lastResetTime := time.Now()
r := rand.New(rand.NewSource(35))
for y := 0; y < totalYears; y++ {
for m := 0; m < 12; m++ {
for d := 0; d < 31; d++ {
for h := 0; h < 24; h++ {
for min := 0; min < 60; min++ {
for i := 0; i < totalTenants; i++ {
p := fmt.Sprintf("tenent%03d/%4d/%02d/%02d/%02d/%02d", i, 2015+y, 1+m, 1+d, h, min)
x := r.Intn(*dbCount)
chans[x] <- p
counter++
}
t := time.Now()
if lastResetTime.Add(time.Second).Before(t) {
p := fmt.Sprintf("%4d/%02d/%02d/%02d/%02d", 2015+y, 1+m, 1+d, h, min)
fmt.Printf("%s = %4d put/sec\n", p, counter)
counter = 0
lastResetTime = t
}
}
}
}
}
}
for d := 0; d < *dbCount; d++ {
2019-07-03 04:25:18 +00:00
close(chans[d])
}
wg.Wait()
}
func insertAsFullPath(db *leveldb.DB, p string) {
_, getErr := db.Get([]byte(p), nil)
if getErr == leveldb.ErrNotFound {
putErr := db.Put([]byte(p), []byte(p), nil)
if putErr != nil {
log.Printf("failed to put %s", p)
}
}
}
func insertAsHash(db *leveldb.DB, p string) {
key := fmt.Sprintf("%d:%s", hashToLong(p), p)
_, getErr := db.Get([]byte(key), nil)
if getErr == leveldb.ErrNotFound {
putErr := db.Put([]byte(key), []byte(p), nil)
if putErr != nil {
log.Printf("failed to put %s", p)
}
}
}
func hashToLong(dir string) (v int64) {
h := md5.New()
io.WriteString(h, dir)
b := h.Sum(nil)
v += int64(b[0])
v <<= 8
v += int64(b[1])
v <<= 8
v += int64(b[2])
v <<= 8
v += int64(b[3])
v <<= 8
v += int64(b[4])
v <<= 8
v += int64(b[5])
v <<= 8
v += int64(b[6])
v <<= 8
v += int64(b[7])
return
}