From 26aaccca088e099ea87d2ba60cfbdbc2e7f3d77f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 12 Apr 2019 11:13:47 -0700 Subject: [PATCH] add stress filer test code --- .../bench_filer_upload/bench_filer_upload.go | 122 ++++++++++++++ .../stress_filer_upload.go | 149 ++++++++++++++++++ 2 files changed, 271 insertions(+) create mode 100644 unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go create mode 100644 unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go diff --git a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go new file mode 100644 index 000000000..79ecaff12 --- /dev/null +++ b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go @@ -0,0 +1,122 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "log" + "mime/multipart" + "net/http" + "os" + "strings" + "sync" + "time" +) + +var ( + size = flag.Int("size", 1024, "file size") + concurrency = flag.Int("c", 4, "concurrent number of uploads") + times = flag.Int("n", 1024, "repeated number of times") + destination = flag.String("to", "http://localhost:8888/", "destination directory on filer") + + statsChan = make(chan stat, 8) +) + +type stat struct { + size int64 +} + +func main() { + + flag.Parse() + + data := make([]byte, *size) + println("data len", len(data)) + + var wg sync.WaitGroup + for x := 0; x < *concurrency; x++ { + wg.Add(1) + + client := &http.Client{} + + go func() { + defer wg.Done() + for t := 0; t < *times; t++ { + if size, err := uploadFileToFiler(client, data, fmt.Sprintf("file%d", t), *destination); err == nil { + statsChan <- stat{ + size: size, + } + }else { + log.Fatalf("upload: %v", err) + } + } + }() + } + + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + + var lastTime time.Time + var counter, size int64 + for { + select { + case stat := <-statsChan: + size += stat.size + counter++ + case x := <-ticker.C: + if !lastTime.IsZero() { + elapsed := x.Sub(lastTime).Seconds() + fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n", + float64(counter)/elapsed, + float64(size/1024/1024)/elapsed) + } + lastTime = x + size = 0 + counter = 0 + } + } + }() + + wg.Wait() + +} + +func uploadFileToFiler(client *http.Client, data []byte, filename, destination string) (size int64, err error) { + + if !strings.HasSuffix(destination, "/") { + destination = destination + "/" + } + + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + part, err := writer.CreateFormFile("file", filename) + if err != nil { + return 0, fmt.Errorf("fail to create form %v: %v", filename, err) + } + + part.Write(data) + + err = writer.Close() + if err != nil { + return 0, fmt.Errorf("fail to write part %v: %v", filename, err) + } + + uri := destination + filename + + request, err := http.NewRequest("POST", uri, body) + request.Header.Set("Content-Type", writer.FormDataContentType()) + + resp, err := client.Do(request) + if err != nil { + return 0, fmt.Errorf("http POST %s: %v", uri, err) + } else { + body := &bytes.Buffer{} + _, err := body.ReadFrom(resp.Body) + if err != nil { + return 0, fmt.Errorf("read http POST %s response: %v", uri, err) + } + resp.Body.Close() + } + + return int64(len(data)), nil +} diff --git a/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go new file mode 100644 index 000000000..41fb85ae0 --- /dev/null +++ b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go @@ -0,0 +1,149 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "math/rand" + "mime/multipart" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +var ( + dir = flag.String("dir", ".", "upload files under this directory") + concurrency = flag.Int("c", 1, "concurrent number of uploads") + times = flag.Int("n", 1, "repeated number of times") + destination = flag.String("to", "http://localhost:8888/", "destination directory on filer") + + statsChan = make(chan stat, 8) +) + +type stat struct { + size int64 +} + +func main() { + + flag.Parse() + + var fileNames []string + + files, err := ioutil.ReadDir(*dir) + if err != nil { + log.Fatalf("fail to read dir %v: %v", *dir, err) + } + + for _, file := range files { + if file.IsDir() { + continue + } + fileNames = append(fileNames, filepath.Join(*dir, file.Name())) + } + + var wg sync.WaitGroup + for x := 0; x < *concurrency; x++ { + wg.Add(1) + + client := &http.Client{} + + go func() { + defer wg.Done() + rand.Shuffle(len(fileNames), func(i, j int) { + fileNames[i], fileNames[j] = fileNames[j], fileNames[i] + }) + for t := 0; t < *times; t++ { + for _, filename := range fileNames { + if size, err := uploadFileToFiler(client, filename, *destination); err == nil { + statsChan <- stat{ + size: size, + } + } + } + } + }() + } + + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + + var lastTime time.Time + var counter, size int64 + for { + select { + case stat := <-statsChan: + size += stat.size + counter++ + case x := <-ticker.C: + if !lastTime.IsZero() { + elapsed := x.Sub(lastTime).Seconds() + fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n", + float64(counter)/elapsed, + float64(size/1024/1024)/elapsed) + } + lastTime = x + size = 0 + counter = 0 + } + } + }() + + wg.Wait() + +} + +func uploadFileToFiler(client *http.Client, filename, destination string) (size int64, err error) { + file, err := os.Open(filename) + if err != nil { + panic(err) + } + defer file.Close() + + fi, err := file.Stat() + + if !strings.HasSuffix(destination, "/") { + destination = destination + "/" + } + + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + part, err := writer.CreateFormFile("file", file.Name()) + if err != nil { + return 0, fmt.Errorf("fail to create form %v: %v", file.Name(), err) + } + _, err = io.Copy(part, file) + if err != nil { + return 0, fmt.Errorf("fail to write part %v: %v", file.Name(), err) + } + + err = writer.Close() + if err != nil { + return 0, fmt.Errorf("fail to write part %v: %v", file.Name(), err) + } + + uri := destination + file.Name() + + request, err := http.NewRequest("POST", uri, body) + request.Header.Set("Content-Type", writer.FormDataContentType()) + + resp, err := client.Do(request) + if err != nil { + return 0, fmt.Errorf("http POST %s: %v", uri, err) + } else { + body := &bytes.Buffer{} + _, err := body.ReadFrom(resp.Body) + if err != nil { + return 0, fmt.Errorf("read http POST %s response: %v", uri, err) + } + resp.Body.Close() + } + + return fi.Size(), nil +}