Remove a volume server concurrent connection limit.

This commit is contained in:
chrislusf 2014-12-04 21:22:09 -08:00
parent 89fd1e4b6e
commit 7a6394378c

View file

@ -96,13 +96,8 @@ var (
wait sync.WaitGroup wait sync.WaitGroup
writeStats *stats writeStats *stats
readStats *stats readStats *stats
serverLimitChan map[string]chan bool
) )
func init() {
serverLimitChan = make(map[string]chan bool)
}
func runbenchmark(cmd *Command, args []string) bool { func runbenchmark(cmd *Command, args []string) bool {
fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 { if *b.maxCpu < 1 {
@ -148,10 +143,11 @@ func bench_write() {
close(idChan) close(idChan)
wait.Wait() wait.Wait()
writeStats.end = time.Now() writeStats.end = time.Now()
wait.Add(1) wait.Add(2)
finishChan <- true
finishChan <- true finishChan <- true
close(finishChan)
wait.Wait() wait.Wait()
close(finishChan)
writeStats.printStats() writeStats.printStats()
} }
@ -168,7 +164,9 @@ func bench_read() {
go readFiles(fileIdLineChan, &readStats.localStats[i]) go readFiles(fileIdLineChan, &readStats.localStats[i])
} }
wait.Wait() wait.Wait()
wait.Add(1)
finishChan <- true finishChan <- true
wait.Wait()
close(finishChan) close(finishChan)
readStats.end = time.Now() readStats.end = time.Now()
readStats.printStats() readStats.printStats()
@ -186,37 +184,26 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
waitForDeletions.Add(1) waitForDeletions.Add(1)
go func() { go func() {
defer waitForDeletions.Done()
for df := range delayedDeleteChan { for df := range delayedDeleteChan {
if df == nil {
break
}
if df.enterTime.After(time.Now()) { if df.enterTime.After(time.Now()) {
time.Sleep(df.enterTime.Sub(time.Now())) time.Sleep(df.enterTime.Sub(time.Now()))
} }
fp := df.fp if e := util.Delete("http://" + df.fp.Server + "/" + df.fp.Fid); e == nil {
serverLimitChan[fp.Server] <- true
if e := util.Delete("http://" + fp.Server + "/" + fp.Fid); e == nil {
s.completed++ s.completed++
} else { } else {
s.failed++ s.failed++
} }
<-serverLimitChan[fp.Server]
} }
waitForDeletions.Done()
}() }()
} }
for { for id := range idChan {
if id, ok := <-idChan; ok {
start := time.Now() start := time.Now()
fileSize := int64(*b.fileSize + rand.Intn(64)) fileSize := int64(*b.fileSize + rand.Intn(64))
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
if _, ok := serverLimitChan[fp.Server]; !ok {
serverLimitChan[fp.Server] = make(chan bool, 7)
}
serverLimitChan[fp.Server] <- true
if _, err := fp.Upload(0, *b.server); err == nil { if _, err := fp.Upload(0, *b.server); err == nil {
if rand.Intn(100) < *b.deletePercentage { if rand.Intn(100) < *b.deletePercentage {
s.total++ s.total++
@ -231,7 +218,6 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
fmt.Printf("Failed to write with error:%v\n", err) fmt.Printf("Failed to write with error:%v\n", err)
} }
writeStats.addSample(time.Now().Sub(start)) writeStats.addSample(time.Now().Sub(start))
<-serverLimitChan[fp.Server]
if *cmdBenchmark.IsDebug { if *cmdBenchmark.IsDebug {
fmt.Printf("writing %d file %s\n", id, fp.Fid) fmt.Printf("writing %d file %s\n", id, fp.Fid)
} }
@ -239,9 +225,6 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
s.failed++ s.failed++
println("writing file error:", err.Error()) println("writing file error:", err.Error())
} }
} else {
break
}
} }
close(delayedDeleteChan) close(delayedDeleteChan)
waitForDeletions.Wait() waitForDeletions.Wait()
@ -249,10 +232,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
func readFiles(fileIdLineChan chan string, s *stat) { func readFiles(fileIdLineChan chan string, s *stat) {
defer wait.Done() defer wait.Done()
serverLimitChan := make(map[string]chan bool)
masterLimitChan := make(chan bool, 1) masterLimitChan := make(chan bool, 1)
for { for fid := range fileIdLineChan {
if fid, ok := <-fileIdLineChan; ok {
if len(fid) == 0 { if len(fid) == 0 {
continue continue
} }
@ -278,10 +259,6 @@ func readFiles(fileIdLineChan chan string, s *stat) {
<-masterLimitChan <-masterLimitChan
} }
if server, ok := b.vid2server[vid]; ok { if server, ok := b.vid2server[vid]; ok {
if _, ok := serverLimitChan[server]; !ok {
serverLimitChan[server] = make(chan bool, 7)
}
serverLimitChan[server] <- true
url := "http://" + server + "/" + fid url := "http://" + server + "/" + fid
if bytesRead, err := util.Get(url); err == nil { if bytesRead, err := util.Get(url); err == nil {
s.completed++ s.completed++
@ -291,14 +268,10 @@ func readFiles(fileIdLineChan chan string, s *stat) {
s.failed++ s.failed++
fmt.Printf("Failed to read %s error:%v\n", url, err) fmt.Printf("Failed to read %s error:%v\n", url, err)
} }
<-serverLimitChan[server]
} else { } else {
s.failed++ s.failed++
println("!!!! volume id ", vid, " location not found!!!!!") println("!!!! volume id ", vid, " location not found!!!!!")
} }
} else {
break
}
} }
} }
@ -405,6 +378,7 @@ func (s *stats) checkProgress(testName string, finishChan chan bool) {
for { for {
select { select {
case <-finishChan: case <-finishChan:
wait.Done()
return return
case t := <-ticker: case t := <-ticker:
completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total