From 054374c765bc0dea51c524bf8cb137d1ea356254 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 12 Mar 2014 10:30:57 -0700 Subject: [PATCH] in progress, trying to make benchmark working better to reuse http connections. --- go/util/http_util.go | 20 ++++- go/weed/benchmark.go | 116 ++++++++++++++++++--------- go/weed/weed_server/master_server.go | 8 +- 3 files changed, 104 insertions(+), 40 deletions(-) diff --git a/go/util/http_util.go b/go/util/http_util.go index 80589dcfa..dd6b9a67d 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -2,13 +2,26 @@ package util import ( "code.google.com/p/weed-fs/go/glog" + "fmt" "io/ioutil" "net/http" "net/url" ) +var ( + client *http.Client + Transport *http.Transport +) + +func init() { + Transport = &http.Transport{ + MaxIdleConnsPerHost: 1024, + } + client = &http.Client{Transport: Transport} +} + func Post(url string, values url.Values) ([]byte, error) { - r, err := http.PostForm(url, values) + r, err := client.PostForm(url, values) if err != nil { glog.V(0).Infoln("post to", url, err) return nil, err @@ -23,13 +36,16 @@ func Post(url string, values url.Values) ([]byte, error) { } func Get(url string) ([]byte, error) { - r, err := http.Get(url) + r, err := client.Get(url) if err != nil { glog.V(0).Infoln("getting ", url, err) return nil, err } defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) + if r.StatusCode != 200 { + return nil, fmt.Errorf("%s: %s", url, r.Status) + } if err != nil { glog.V(0).Infoln("read get result from", url, err) return nil, err diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index 99faf15b0..d7d80729d 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -11,6 +11,7 @@ import ( "math/rand" "os" "runtime" + "runtime/pprof" "strings" "sync" "time" @@ -26,6 +27,7 @@ type BenchmarkOptions struct { read *bool sequentialRead *bool collection *string + cpuprofile *string vid2server map[string]string //cache for vid locations } @@ -45,6 +47,8 @@ func init() { b.read = cmdBenchmark.Flag.Bool("read", true, "enable read") b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") + b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "write cpu profile to file") + b.vid2server = make(map[string]string) } var cmdBenchmark = &Command{ @@ -80,62 +84,86 @@ var ( ) func runbenchmark(cmd *Command, args []string) bool { - finishChan := make(chan bool) - fileIdLineChan := make(chan string) - b.vid2server = make(map[string]string) - fmt.Printf("This is Weed File System version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH) + if *b.cpuprofile != "" { + f, err := os.Create(*b.cpuprofile) + if err != nil { + glog.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } if *b.write { - writeStats = newStats() - idChan := make(chan int) - wait.Add(*b.concurrency) - go writeFileIds(*b.idListFile, fileIdLineChan, finishChan) - for i := 0; i < *b.concurrency; i++ { - go writeFiles(idChan, fileIdLineChan, writeStats) - } - writeStats.start = time.Now() - go writeStats.checkProgress("Writing Benchmark", finishChan) - for i := 0; i < *b.numberOfFiles; i++ { - idChan <- i - } - close(idChan) - wait.Wait() - writeStats.end = time.Now() - wait.Add(1) - finishChan <- true - finishChan <- true - wait.Wait() - writeStats.printStats() + bench_write() } if *b.read { - readStats = newStats() - wait.Add(*b.concurrency) - go readFileIds(*b.idListFile, fileIdLineChan) - readStats.start = time.Now() - go readStats.checkProgress("Randomly Reading Benchmark", finishChan) - for i := 0; i < *b.concurrency; i++ { - go readFiles(fileIdLineChan, readStats) - } - wait.Wait() - finishChan <- true - readStats.end = time.Now() - readStats.printStats() + bench_read() } return true } +func bench_write() { + fileIdLineChan := make(chan string) + finishChan := make(chan bool) + writeStats = newStats() + idChan := make(chan int) + wait.Add(*b.concurrency) + go writeFileIds(*b.idListFile, fileIdLineChan, finishChan) + for i := 0; i < *b.concurrency; i++ { + go writeFiles(idChan, fileIdLineChan, writeStats) + } + writeStats.start = time.Now() + go writeStats.checkProgress("Writing Benchmark", finishChan) + for i := 0; i < *b.numberOfFiles; i++ { + idChan <- i + } + close(idChan) + wait.Wait() + writeStats.end = time.Now() + wait.Add(1) + finishChan <- true + finishChan <- true + close(finishChan) + wait.Wait() + writeStats.printStats() +} + +func bench_read() { + fileIdLineChan := make(chan string) + finishChan := make(chan bool) + readStats = newStats() + wait.Add(*b.concurrency) + go readFileIds(*b.idListFile, fileIdLineChan) + readStats.start = time.Now() + go readStats.checkProgress("Randomly Reading Benchmark", finishChan) + for i := 0; i < *b.concurrency; i++ { + go readFiles(fileIdLineChan, readStats) + } + wait.Wait() + finishChan <- true + close(finishChan) + readStats.end = time.Now() + readStats.printStats() +} + func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { + serverLimitChan := make(map[string]chan bool) for { if id, ok := <-idChan; ok { start := time.Now() fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: int64(*b.fileSize)}, FileSize: int64(*b.fileSize)} if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil { fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection + if _, ok := serverLimitChan[fp.Server]; !ok { + serverLimitChan[fp.Server] = make(chan bool, 7) + } + serverLimitChan[fp.Server] <- true fp.Upload(0, *b.server) writeStats.addSample(time.Now().Sub(start)) + <-serverLimitChan[fp.Server] fileIdLineChan <- fp.Fid s.transferred += int64(*b.fileSize) s.completed++ @@ -154,6 +182,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { } func readFiles(fileIdLineChan chan string, s *stats) { + serverLimitChan := make(map[string]chan bool) + masterLimitChan := make(chan bool, 7) for { if fid, ok := <-fileIdLineChan; ok { if len(fid) == 0 { @@ -169,14 +199,20 @@ func readFiles(fileIdLineChan chan string, s *stats) { vid := parts[0] start := time.Now() if server, ok := b.vid2server[vid]; !ok { + masterLimitChan <- true if ret, err := operation.Lookup(*b.server, vid); err == nil { if len(ret.Locations) > 0 { server = ret.Locations[0].PublicUrl b.vid2server[vid] = server } } + <-masterLimitChan } if server, ok := b.vid2server[vid]; ok { + if _, ok := serverLimitChan[server]; !ok { + serverLimitChan[server] = make(chan bool, 7) + } + serverLimitChan[server] <- true url := "http://" + server + "/" + fid if bytesRead, err := util.Get(url); err == nil { s.completed++ @@ -186,6 +222,7 @@ func readFiles(fileIdLineChan chan string, s *stats) { s.failed++ println("!!!! Failed to read from ", url, " !!!!!") } + <-serverLimitChan[server] } else { s.failed++ println("!!!! volume id ", vid, " location not found!!!!!") @@ -270,7 +307,12 @@ func newStats() *stats { } func (s *stats) addSample(d time.Duration) { - s.data[int(d/benchBucket)]++ + index := int(d / benchBucket) + if 0 <= index && index < len(s.data) { + s.data[int(d/benchBucket)]++ + } else { + fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000) + } } func (s *stats) checkProgress(testName string, finishChan chan bool) { diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index d17e57531..606884283 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -5,6 +5,7 @@ import ( "code.google.com/p/weed-fs/go/replication" "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/topology" + "code.google.com/p/weed-fs/go/util" "github.com/goraft/raft" "github.com/gorilla/mux" "net/http" @@ -28,7 +29,8 @@ type MasterServer struct { vg *replication.VolumeGrowth vgLock sync.Mutex - raftServer *RaftServer + raftServer *RaftServer + bounedLeaderChan chan int } func NewMasterServer(r *mux.Router, version string, port int, metaFolder string, @@ -47,6 +49,7 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string, garbageThreshold: garbageThreshold, whiteList: whiteList, } + ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq")) var e error if ms.topo, e = topology.NewTopology("topo", confFile, seq, @@ -95,6 +98,8 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ if ms.IsLeader() { f(w, r) } else { + ms.bounedLeaderChan <- 1 + defer func() { <-ms.bounedLeaderChan }() targetUrl, err := url.Parse("http://" + ms.raftServer.Leader()) if err != nil { writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL Parse Error " + err.Error()}) @@ -102,6 +107,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ } glog.V(4).Infoln("proxying to leader", ms.raftServer.Leader()) proxy := httputil.NewSingleHostReverseProxy(targetUrl) + proxy.Transport = util.Transport proxy.ServeHTTP(w, r) } }