From 44d8e359888e605b89d7ba01cb72263e1e10bf27 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 31 Jul 2018 19:12:36 -0700 Subject: [PATCH] add listener for volumd id location for benchmark tool --- weed/command/benchmark.go | 30 ++++++++++++++++-------------- weed/wdclient/masterclient.go | 18 +++++++++++++++--- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 095794479..4ba00dc98 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -18,10 +18,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "context" ) type BenchmarkOptions struct { - server *string + masters *string concurrency *int numberOfFiles *int fileSize *int @@ -37,14 +39,15 @@ type BenchmarkOptions struct { } var ( - b BenchmarkOptions - sharedBytes []byte + b BenchmarkOptions + sharedBytes []byte + masterClient *wdclient.MasterClient ) func init() { cmdBenchmark.Run = runbenchmark // break init cycle cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information") - b.server = cmdBenchmark.Flag.String("server", "localhost:9333", "SeaweedFS master location") + b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location") b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes") b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding") b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread") @@ -113,6 +116,10 @@ func runbenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } + masterClient = wdclient.NewMasterClient(context.Background(), "benchmark", strings.Split(*b.masters, ",")) + go masterClient.KeepConnectedToMaster() + masterClient.WaitUntilConnected() + if *b.write { bench_write() } @@ -211,9 +218,9 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { Count: 1, Collection: *b.collection, } - if assignResult, err := operation.Assign(*b.server, ar); err == nil { + if assignResult, err := operation.Assign(masterClient.GetMaster(), ar); err == nil { fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection - if _, err := fp.Upload(0, *b.server, secret); err == nil { + if _, err := fp.Upload(0, masterClient.GetMaster(), secret); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} @@ -241,7 +248,6 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { func readFiles(fileIdLineChan chan string, s *stat) { defer wait.Done() - random := rand.New(rand.NewSource(time.Now().UnixNano())) for fid := range fileIdLineChan { if len(fid) == 0 { @@ -253,17 +259,13 @@ func readFiles(fileIdLineChan chan string, s *stat) { if *cmdBenchmark.IsDebug { fmt.Printf("reading file %s\n", fid) } - parts := strings.SplitN(fid, ",", 2) - vid := parts[0] start := time.Now() - ret, err := operation.Lookup(*b.server, vid) - if err != nil || len(ret.Locations) == 0 { + url, err := masterClient.LookupFileId(fid) + if err != nil { s.failed++ - println("!!!! volume id ", vid, " location not found!!!!!") + println("!!!! ", fid, " location not found!!!!!") continue } - server := ret.Locations[random.Intn(len(ret.Locations))].Url - url := "http://" + server + "/" + fid if bytesRead, err := util.Get(url); err == nil { s.completed++ s.transferred += int64(len(bytesRead)) diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 347901f1a..6b09e4a90 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" + "math/rand" ) type MasterClient struct { @@ -32,6 +33,12 @@ func (mc *MasterClient) GetMaster() string { return mc.currentMaster } +func (mc *MasterClient) WaitUntilConnected() { + for mc.currentMaster == "" { + time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) + } +} + func (mc *MasterClient) KeepConnectedToMaster() { glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters) for { @@ -50,9 +57,6 @@ func (mc *MasterClient) tryAllMasters() { return err } - glog.V(0).Infof("Connected to %v", master) - mc.currentMaster = master - if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil { glog.V(0).Infof("failed to send to %s: %v", master, err) return err @@ -74,9 +78,17 @@ func (mc *MasterClient) tryAllMasters() { for _, deletedVid := range volumeLocation.DeletedVids { mc.deleteLocation(deletedVid, loc) } + + if mc.currentMaster == "" { + glog.V(0).Infof("Connected to %v", master) + mc.currentMaster = master + } + } } + }) + mc.currentMaster = "" } }