mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
benchmarking writing data to a specific collection
This commit is contained in:
parent
eac9c4d86b
commit
a121453188
|
@ -17,12 +17,15 @@ type AssignResult struct {
|
||||||
Error string `json:"error"`
|
Error string `json:"error"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func Assign(server string, count int, replication string) (*AssignResult, error) {
|
func Assign(server string, count int, replication string, collection string) (*AssignResult, error) {
|
||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
values.Add("count", strconv.Itoa(count))
|
values.Add("count", strconv.Itoa(count))
|
||||||
if replication != "" {
|
if replication != "" {
|
||||||
values.Add("replication", replication)
|
values.Add("replication", replication)
|
||||||
}
|
}
|
||||||
|
if collection != "" {
|
||||||
|
values.Add("collection", collection)
|
||||||
|
}
|
||||||
jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)
|
jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)
|
||||||
glog.V(2).Info("assign result :", string(jsonBlob))
|
glog.V(2).Info("assign result :", string(jsonBlob))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -18,6 +18,8 @@ type FilePart struct {
|
||||||
IsGzipped bool
|
IsGzipped bool
|
||||||
MimeType string
|
MimeType string
|
||||||
ModTime int64 //in seconds
|
ModTime int64 //in seconds
|
||||||
|
Replication string
|
||||||
|
Collection string
|
||||||
Server string //this comes from assign result
|
Server string //this comes from assign result
|
||||||
Fid string //this comes from assign result, but customizable
|
Fid string //this comes from assign result, but customizable
|
||||||
}
|
}
|
||||||
|
@ -30,12 +32,12 @@ type SubmitResult struct {
|
||||||
Error string `json:"error"`
|
Error string `json:"error"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func SubmitFiles(master string, files []FilePart, replication string, maxMB int) ([]SubmitResult, error) {
|
func SubmitFiles(master string, files []FilePart, replication string, collection string, maxMB int) ([]SubmitResult, error) {
|
||||||
results := make([]SubmitResult, len(files))
|
results := make([]SubmitResult, len(files))
|
||||||
for index, file := range files {
|
for index, file := range files {
|
||||||
results[index].FileName = file.FileName
|
results[index].FileName = file.FileName
|
||||||
}
|
}
|
||||||
ret, err := Assign(master, len(files), replication)
|
ret, err := Assign(master, len(files), replication, collection)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
for index, _ := range files {
|
for index, _ := range files {
|
||||||
results[index].Error = err.Error()
|
results[index].Error = err.Error()
|
||||||
|
@ -48,7 +50,9 @@ func SubmitFiles(master string, files []FilePart, replication string, maxMB int)
|
||||||
file.Fid = file.Fid + "_" + strconv.Itoa(index)
|
file.Fid = file.Fid + "_" + strconv.Itoa(index)
|
||||||
}
|
}
|
||||||
file.Server = ret.PublicUrl
|
file.Server = ret.PublicUrl
|
||||||
results[index].Size, err = file.Upload(maxMB, master, replication)
|
file.Replication = replication
|
||||||
|
file.Collection = collection
|
||||||
|
results[index].Size, err = file.Upload(maxMB, master)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
results[index].Error = err.Error()
|
results[index].Error = err.Error()
|
||||||
}
|
}
|
||||||
|
@ -95,7 +99,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
|
||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi FilePart) Upload(maxMB int, master, replication string) (retSize int, err error) {
|
func (fi FilePart) Upload(maxMB int, master string) (retSize int, err error) {
|
||||||
fileUrl := "http://" + fi.Server + "/" + fi.Fid
|
fileUrl := "http://" + fi.Server + "/" + fi.Fid
|
||||||
if fi.ModTime != 0 {
|
if fi.ModTime != 0 {
|
||||||
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
|
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
|
||||||
|
@ -108,7 +112,7 @@ func (fi FilePart) Upload(maxMB int, master, replication string) (retSize int, e
|
||||||
chunks := fi.FileSize/chunkSize + 1
|
chunks := fi.FileSize/chunkSize + 1
|
||||||
fids := make([]string, 0)
|
fids := make([]string, 0)
|
||||||
for i := int64(0); i < chunks; i++ {
|
for i := int64(0); i < chunks; i++ {
|
||||||
id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, replication)
|
id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return 0, e
|
return 0, e
|
||||||
}
|
}
|
||||||
|
@ -126,8 +130,8 @@ func (fi FilePart) Upload(maxMB int, master, replication string) (retSize int, e
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func upload_one_chunk(filename string, reader io.Reader, master, replication string) (fid string, size int, e error) {
|
func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string) (fid string, size int, e error) {
|
||||||
ret, err := Assign(master, 1, replication)
|
ret, err := Assign(master, 1, replication, collection)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", 0, err
|
return "", 0, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -22,6 +23,8 @@ type BenchmarkOptions struct {
|
||||||
idListFile *string
|
idListFile *string
|
||||||
write *bool
|
write *bool
|
||||||
read *bool
|
read *bool
|
||||||
|
sequentialRead *bool
|
||||||
|
collection *string
|
||||||
vid2server map[string]string //cache for vid locations
|
vid2server map[string]string //cache for vid locations
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,6 +42,8 @@ func init() {
|
||||||
b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
|
b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
|
||||||
b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
|
b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
|
||||||
b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
|
b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
|
||||||
|
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
|
||||||
|
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
|
||||||
}
|
}
|
||||||
|
|
||||||
var cmdBenchmark = &Command{
|
var cmdBenchmark = &Command{
|
||||||
|
@ -120,9 +125,9 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
|
||||||
if id, ok := <-idChan; ok {
|
if id, ok := <-idChan; ok {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: int64(*b.fileSize)}, FileSize: int64(*b.fileSize)}
|
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: int64(*b.fileSize)}, FileSize: int64(*b.fileSize)}
|
||||||
if assignResult, err := operation.Assign(*b.server, 1, ""); err == nil {
|
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil {
|
||||||
fp.Server, fp.Fid = assignResult.PublicUrl, assignResult.Fid
|
fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
|
||||||
fp.Upload(0, *b.server, "")
|
fp.Upload(0, *b.server)
|
||||||
writeStats.addSample(time.Now().Sub(start))
|
writeStats.addSample(time.Now().Sub(start))
|
||||||
fileIdLineChan <- fp.Fid
|
fileIdLineChan <- fp.Fid
|
||||||
s.transferred += int64(*b.fileSize)
|
s.transferred += int64(*b.fileSize)
|
||||||
|
@ -212,6 +217,7 @@ func readFileIds(fileName string, fileIdLineChan chan string) {
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
|
|
||||||
r := bufio.NewReader(file)
|
r := bufio.NewReader(file)
|
||||||
|
if *b.sequentialRead {
|
||||||
for {
|
for {
|
||||||
if line, err := Readln(r); err == nil {
|
if line, err := Readln(r); err == nil {
|
||||||
fileIdLineChan <- string(line)
|
fileIdLineChan <- string(line)
|
||||||
|
@ -219,6 +225,20 @@ func readFileIds(fileName string, fileIdLineChan chan string) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
lines := make([]string, 0, *b.numberOfFiles)
|
||||||
|
for {
|
||||||
|
if line, err := Readln(r); err == nil {
|
||||||
|
lines = append(lines, string(line))
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i := 0; i < *b.numberOfFiles; i++ {
|
||||||
|
fileIdLineChan <- lines[rand.Intn(len(lines))]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
close(fileIdLineChan)
|
close(fileIdLineChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
uploadReplication *string
|
uploadReplication *string
|
||||||
|
uploadCollection *string
|
||||||
uploadDir *string
|
uploadDir *string
|
||||||
include *string
|
include *string
|
||||||
maxMB *int
|
maxMB *int
|
||||||
|
@ -21,7 +22,8 @@ func init() {
|
||||||
server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location")
|
server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location")
|
||||||
uploadDir = cmdUpload.Flag.String("dir", "", "Upload the whole folder recursively if specified.")
|
uploadDir = cmdUpload.Flag.String("dir", "", "Upload the whole folder recursively if specified.")
|
||||||
include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
|
include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
|
||||||
uploadReplication = cmdUpload.Flag.String("replication", "", "replication type(000,001,010,100,110,200)")
|
uploadReplication = cmdUpload.Flag.String("replication", "", "replication type")
|
||||||
|
uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name")
|
||||||
maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
|
maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,7 +67,7 @@ func runUpload(cmd *Command, args []string) bool {
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *maxMB)
|
results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB)
|
||||||
bytes, _ := json.Marshal(results)
|
bytes, _ := json.Marshal(results)
|
||||||
fmt.Println(string(bytes))
|
fmt.Println(string(bytes))
|
||||||
if e != nil {
|
if e != nil {
|
||||||
|
@ -82,7 +84,7 @@ func runUpload(cmd *Command, args []string) bool {
|
||||||
if e != nil {
|
if e != nil {
|
||||||
fmt.Println(e.Error())
|
fmt.Println(e.Error())
|
||||||
}
|
}
|
||||||
results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *maxMB)
|
results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB)
|
||||||
bytes, _ := json.Marshal(results)
|
bytes, _ := json.Marshal(results)
|
||||||
fmt.Println(string(bytes))
|
fmt.Println(string(bytes))
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,7 +94,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
|
||||||
}
|
}
|
||||||
|
|
||||||
debug("assigning file id for", fname)
|
debug("assigning file id for", fname)
|
||||||
assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"))
|
assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"))
|
||||||
if ae != nil {
|
if ae != nil {
|
||||||
writeJsonError(w, r, ae)
|
writeJsonError(w, r, ae)
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in a new issue