add listener for volumd id location for benchmark tool

This commit is contained in:
Chris Lu 2018-07-31 19:12:36 -07:00
parent d09606c65b
commit 44d8e35988
2 changed files with 31 additions and 17 deletions

View file

@ -18,10 +18,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"context"
) )
type BenchmarkOptions struct { type BenchmarkOptions struct {
server *string masters *string
concurrency *int concurrency *int
numberOfFiles *int numberOfFiles *int
fileSize *int fileSize *int
@ -39,12 +41,13 @@ type BenchmarkOptions struct {
var ( var (
b BenchmarkOptions b BenchmarkOptions
sharedBytes []byte sharedBytes []byte
masterClient *wdclient.MasterClient
) )
func init() { func init() {
cmdBenchmark.Run = runbenchmark // break init cycle cmdBenchmark.Run = runbenchmark // break init cycle
cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information") cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
b.server = cmdBenchmark.Flag.String("server", "localhost:9333", "SeaweedFS master location") b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes") b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding") b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread") b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
@ -113,6 +116,10 @@ func runbenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
masterClient = wdclient.NewMasterClient(context.Background(), "benchmark", strings.Split(*b.masters, ","))
go masterClient.KeepConnectedToMaster()
masterClient.WaitUntilConnected()
if *b.write { if *b.write {
bench_write() bench_write()
} }
@ -211,9 +218,9 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
Count: 1, Count: 1,
Collection: *b.collection, Collection: *b.collection,
} }
if assignResult, err := operation.Assign(*b.server, ar); err == nil { if assignResult, err := operation.Assign(masterClient.GetMaster(), ar); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
if _, err := fp.Upload(0, *b.server, secret); err == nil { if _, err := fp.Upload(0, masterClient.GetMaster(), secret); err == nil {
if random.Intn(100) < *b.deletePercentage { if random.Intn(100) < *b.deletePercentage {
s.total++ s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
@ -241,7 +248,6 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
func readFiles(fileIdLineChan chan string, s *stat) { func readFiles(fileIdLineChan chan string, s *stat) {
defer wait.Done() defer wait.Done()
random := rand.New(rand.NewSource(time.Now().UnixNano()))
for fid := range fileIdLineChan { for fid := range fileIdLineChan {
if len(fid) == 0 { if len(fid) == 0 {
@ -253,17 +259,13 @@ func readFiles(fileIdLineChan chan string, s *stat) {
if *cmdBenchmark.IsDebug { if *cmdBenchmark.IsDebug {
fmt.Printf("reading file %s\n", fid) fmt.Printf("reading file %s\n", fid)
} }
parts := strings.SplitN(fid, ",", 2)
vid := parts[0]
start := time.Now() start := time.Now()
ret, err := operation.Lookup(*b.server, vid) url, err := masterClient.LookupFileId(fid)
if err != nil || len(ret.Locations) == 0 { if err != nil {
s.failed++ s.failed++
println("!!!! volume id ", vid, " location not found!!!!!") println("!!!! ", fid, " location not found!!!!!")
continue continue
} }
server := ret.Locations[random.Intn(len(ret.Locations))].Url
url := "http://" + server + "/" + fid
if bytesRead, err := util.Get(url); err == nil { if bytesRead, err := util.Get(url); err == nil {
s.completed++ s.completed++
s.transferred += int64(len(bytesRead)) s.transferred += int64(len(bytesRead))

View file

@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"math/rand"
) )
type MasterClient struct { type MasterClient struct {
@ -32,6 +33,12 @@ func (mc *MasterClient) GetMaster() string {
return mc.currentMaster return mc.currentMaster
} }
func (mc *MasterClient) WaitUntilConnected() {
for mc.currentMaster == "" {
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
}
}
func (mc *MasterClient) KeepConnectedToMaster() { func (mc *MasterClient) KeepConnectedToMaster() {
glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters) glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters)
for { for {
@ -50,9 +57,6 @@ func (mc *MasterClient) tryAllMasters() {
return err return err
} }
glog.V(0).Infof("Connected to %v", master)
mc.currentMaster = master
if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil { if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil {
glog.V(0).Infof("failed to send to %s: %v", master, err) glog.V(0).Infof("failed to send to %s: %v", master, err)
return err return err
@ -74,9 +78,17 @@ func (mc *MasterClient) tryAllMasters() {
for _, deletedVid := range volumeLocation.DeletedVids { for _, deletedVid := range volumeLocation.DeletedVids {
mc.deleteLocation(deletedVid, loc) mc.deleteLocation(deletedVid, loc)
} }
if mc.currentMaster == "" {
glog.V(0).Infof("Connected to %v", master)
mc.currentMaster = master
}
} }
} }
}) })
mc.currentMaster = "" mc.currentMaster = ""
} }
} }