diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 2aa994f6f..da7fb43bb 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -126,7 +126,7 @@ func doEachCopy(fileOrDir string, host string, path string) bool { } results, err := operation.SubmitFiles(*copy.master, parts, - *copy.replication, *copy.collection, + *copy.replication, *copy.collection, "", *copy.ttl, *copy.maxMB, copy.secret) if err != nil { fmt.Printf("Failed to submit file %s: %v", fileOrDir, err) diff --git a/weed/command/upload.go b/weed/command/upload.go index d7a468610..72ef0af73 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -20,6 +20,7 @@ type UploadOptions struct { include *string replication *string collection *string + dataCenter *string ttl *string maxMB *int secretKey *string @@ -33,6 +34,7 @@ func init() { upload.include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") upload.replication = cmdUpload.Flag.String("replication", "", "replication type") upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name") + upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name") upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") upload.maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit") upload.secretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") @@ -80,7 +82,7 @@ func runUpload(cmd *Command, args []string) bool { return e } results, e := operation.SubmitFiles(*upload.master, parts, - *upload.replication, *upload.collection, + *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, secret) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) @@ -99,7 +101,7 @@ func runUpload(cmd *Command, args []string) bool { fmt.Println(e.Error()) } results, _ := operation.SubmitFiles(*upload.master, parts, - *upload.replication, *upload.collection, + *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, secret) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 75d5afbde..349cddfce 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -23,6 +23,7 @@ type FilePart struct { ModTime int64 //in seconds Replication string Collection string + DataCenter string Ttl string Server string //this comes from assign result Fid string //this comes from assign result, but customizable @@ -37,7 +38,7 @@ type SubmitResult struct { } func SubmitFiles(master string, files []FilePart, - replication string, collection string, ttl string, maxMB int, + replication string, collection string, dataCenter string, ttl string, maxMB int, secret security.Secret, ) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) @@ -48,6 +49,7 @@ func SubmitFiles(master string, files []FilePart, Count: uint64(len(files)), Replication: replication, Collection: collection, + DataCenter: dataCenter, Ttl: ttl, } ret, err := Assign(master, ar) @@ -65,6 +67,7 @@ func SubmitFiles(master string, files []FilePart, file.Server = ret.Url file.Replication = replication file.Collection = collection + file.DataCenter = dataCenter results[index].Size, err = file.Upload(maxMB, master, secret) if err != nil { results[index].Error = err.Error() @@ -129,11 +132,46 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret Chunks: make([]*ChunkInfo, 0, chunks), } + var ret *AssignResult + var id string + if fi.DataCenter != "" { + ar := &VolumeAssignRequest{ + Count: uint64(chunks), + Replication: fi.Replication, + Collection: fi.Collection, + Ttl: fi.Ttl, + } + ret, err = Assign(master, ar) + if err != nil { + return + } + } for i := int64(0); i < chunks; i++ { - id, count, e := upload_one_chunk( + if fi.DataCenter == "" { + ar := &VolumeAssignRequest{ + Count: 1, + Replication: fi.Replication, + Collection: fi.Collection, + Ttl: fi.Ttl, + } + ret, err = Assign(master, ar) + if err != nil { + // delete all uploaded chunks + cm.DeleteChunks(master) + return + } + id = ret.Fid + } else { + id = ret.Fid + if i > 0 { + id += "_" + strconv.FormatInt(i, 10) + } + } + fileUrl := "http://" + ret.Url + "/" + id + count, e := upload_one_chunk( baseName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), - master, fi.Replication, fi.Collection, fi.Ttl, + master, fileUrl, jwt) if e != nil { // delete all uploaded chunks @@ -165,26 +203,15 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret } func upload_one_chunk(filename string, reader io.Reader, master, - replication string, collection string, ttl string, jwt security.EncodedJwt, -) (fid string, size uint32, e error) { - ar := &VolumeAssignRequest{ - Count: 1, - Replication: replication, - Collection: collection, - Ttl: ttl, - } - ret, err := Assign(master, ar) - if err != nil { - return "", 0, err - } - fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid + fileUrl string, jwt security.EncodedJwt, +) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream", nil, jwt) if uploadError != nil { - return fid, 0, uploadError + return 0, uploadError } - return fid, uploadResult.Size, nil + return uploadResult.Size, nil } func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {