mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #435 from sparklxb/master
support uploading files to specific dataCenter
This commit is contained in:
commit
9fa648e570
|
@ -126,7 +126,7 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
results, err := operation.SubmitFiles(*copy.master, parts,
|
results, err := operation.SubmitFiles(*copy.master, parts,
|
||||||
*copy.replication, *copy.collection,
|
*copy.replication, *copy.collection, "",
|
||||||
*copy.ttl, *copy.maxMB, copy.secret)
|
*copy.ttl, *copy.maxMB, copy.secret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Failed to submit file %s: %v", fileOrDir, err)
|
fmt.Printf("Failed to submit file %s: %v", fileOrDir, err)
|
||||||
|
|
|
@ -20,6 +20,7 @@ type UploadOptions struct {
|
||||||
include *string
|
include *string
|
||||||
replication *string
|
replication *string
|
||||||
collection *string
|
collection *string
|
||||||
|
dataCenter *string
|
||||||
ttl *string
|
ttl *string
|
||||||
maxMB *int
|
maxMB *int
|
||||||
secretKey *string
|
secretKey *string
|
||||||
|
@ -33,6 +34,7 @@ func init() {
|
||||||
upload.include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
|
upload.include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
|
||||||
upload.replication = cmdUpload.Flag.String("replication", "", "replication type")
|
upload.replication = cmdUpload.Flag.String("replication", "", "replication type")
|
||||||
upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name")
|
upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name")
|
||||||
|
upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
|
||||||
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
|
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
|
||||||
upload.maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
|
upload.maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
|
||||||
upload.secretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
|
upload.secretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
|
||||||
|
@ -80,7 +82,7 @@ func runUpload(cmd *Command, args []string) bool {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
results, e := operation.SubmitFiles(*upload.master, parts,
|
results, e := operation.SubmitFiles(*upload.master, parts,
|
||||||
*upload.replication, *upload.collection,
|
*upload.replication, *upload.collection, *upload.dataCenter,
|
||||||
*upload.ttl, *upload.maxMB, secret)
|
*upload.ttl, *upload.maxMB, secret)
|
||||||
bytes, _ := json.Marshal(results)
|
bytes, _ := json.Marshal(results)
|
||||||
fmt.Println(string(bytes))
|
fmt.Println(string(bytes))
|
||||||
|
@ -99,7 +101,7 @@ func runUpload(cmd *Command, args []string) bool {
|
||||||
fmt.Println(e.Error())
|
fmt.Println(e.Error())
|
||||||
}
|
}
|
||||||
results, _ := operation.SubmitFiles(*upload.master, parts,
|
results, _ := operation.SubmitFiles(*upload.master, parts,
|
||||||
*upload.replication, *upload.collection,
|
*upload.replication, *upload.collection, *upload.dataCenter,
|
||||||
*upload.ttl, *upload.maxMB, secret)
|
*upload.ttl, *upload.maxMB, secret)
|
||||||
bytes, _ := json.Marshal(results)
|
bytes, _ := json.Marshal(results)
|
||||||
fmt.Println(string(bytes))
|
fmt.Println(string(bytes))
|
||||||
|
|
|
@ -23,6 +23,7 @@ type FilePart struct {
|
||||||
ModTime int64 //in seconds
|
ModTime int64 //in seconds
|
||||||
Replication string
|
Replication string
|
||||||
Collection string
|
Collection string
|
||||||
|
DataCenter string
|
||||||
Ttl string
|
Ttl string
|
||||||
Server string //this comes from assign result
|
Server string //this comes from assign result
|
||||||
Fid string //this comes from assign result, but customizable
|
Fid string //this comes from assign result, but customizable
|
||||||
|
@ -37,7 +38,7 @@ type SubmitResult struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func SubmitFiles(master string, files []FilePart,
|
func SubmitFiles(master string, files []FilePart,
|
||||||
replication string, collection string, ttl string, maxMB int,
|
replication string, collection string, dataCenter string, ttl string, maxMB int,
|
||||||
secret security.Secret,
|
secret security.Secret,
|
||||||
) ([]SubmitResult, error) {
|
) ([]SubmitResult, error) {
|
||||||
results := make([]SubmitResult, len(files))
|
results := make([]SubmitResult, len(files))
|
||||||
|
@ -48,6 +49,7 @@ func SubmitFiles(master string, files []FilePart,
|
||||||
Count: uint64(len(files)),
|
Count: uint64(len(files)),
|
||||||
Replication: replication,
|
Replication: replication,
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
|
DataCenter: dataCenter,
|
||||||
Ttl: ttl,
|
Ttl: ttl,
|
||||||
}
|
}
|
||||||
ret, err := Assign(master, ar)
|
ret, err := Assign(master, ar)
|
||||||
|
@ -65,6 +67,7 @@ func SubmitFiles(master string, files []FilePart,
|
||||||
file.Server = ret.Url
|
file.Server = ret.Url
|
||||||
file.Replication = replication
|
file.Replication = replication
|
||||||
file.Collection = collection
|
file.Collection = collection
|
||||||
|
file.DataCenter = dataCenter
|
||||||
results[index].Size, err = file.Upload(maxMB, master, secret)
|
results[index].Size, err = file.Upload(maxMB, master, secret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
results[index].Error = err.Error()
|
results[index].Error = err.Error()
|
||||||
|
@ -129,11 +132,46 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
|
||||||
Chunks: make([]*ChunkInfo, 0, chunks),
|
Chunks: make([]*ChunkInfo, 0, chunks),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ret *AssignResult
|
||||||
|
var id string
|
||||||
|
if fi.DataCenter != "" {
|
||||||
|
ar := &VolumeAssignRequest{
|
||||||
|
Count: uint64(chunks),
|
||||||
|
Replication: fi.Replication,
|
||||||
|
Collection: fi.Collection,
|
||||||
|
Ttl: fi.Ttl,
|
||||||
|
}
|
||||||
|
ret, err = Assign(master, ar)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
for i := int64(0); i < chunks; i++ {
|
for i := int64(0); i < chunks; i++ {
|
||||||
id, count, e := upload_one_chunk(
|
if fi.DataCenter == "" {
|
||||||
|
ar := &VolumeAssignRequest{
|
||||||
|
Count: 1,
|
||||||
|
Replication: fi.Replication,
|
||||||
|
Collection: fi.Collection,
|
||||||
|
Ttl: fi.Ttl,
|
||||||
|
}
|
||||||
|
ret, err = Assign(master, ar)
|
||||||
|
if err != nil {
|
||||||
|
// delete all uploaded chunks
|
||||||
|
cm.DeleteChunks(master)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
id = ret.Fid
|
||||||
|
} else {
|
||||||
|
id = ret.Fid
|
||||||
|
if i > 0 {
|
||||||
|
id += "_" + strconv.FormatInt(i, 10)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fileUrl := "http://" + ret.Url + "/" + id
|
||||||
|
count, e := upload_one_chunk(
|
||||||
baseName+"-"+strconv.FormatInt(i+1, 10),
|
baseName+"-"+strconv.FormatInt(i+1, 10),
|
||||||
io.LimitReader(fi.Reader, chunkSize),
|
io.LimitReader(fi.Reader, chunkSize),
|
||||||
master, fi.Replication, fi.Collection, fi.Ttl,
|
master, fileUrl,
|
||||||
jwt)
|
jwt)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
// delete all uploaded chunks
|
// delete all uploaded chunks
|
||||||
|
@ -165,26 +203,15 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func upload_one_chunk(filename string, reader io.Reader, master,
|
func upload_one_chunk(filename string, reader io.Reader, master,
|
||||||
replication string, collection string, ttl string, jwt security.EncodedJwt,
|
fileUrl string, jwt security.EncodedJwt,
|
||||||
) (fid string, size uint32, e error) {
|
) (size uint32, e error) {
|
||||||
ar := &VolumeAssignRequest{
|
|
||||||
Count: 1,
|
|
||||||
Replication: replication,
|
|
||||||
Collection: collection,
|
|
||||||
Ttl: ttl,
|
|
||||||
}
|
|
||||||
ret, err := Assign(master, ar)
|
|
||||||
if err != nil {
|
|
||||||
return "", 0, err
|
|
||||||
}
|
|
||||||
fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
|
|
||||||
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
|
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
|
||||||
uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
|
uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
|
||||||
"application/octet-stream", nil, jwt)
|
"application/octet-stream", nil, jwt)
|
||||||
if uploadError != nil {
|
if uploadError != nil {
|
||||||
return fid, 0, uploadError
|
return 0, uploadError
|
||||||
}
|
}
|
||||||
return fid, uploadResult.Size, nil
|
return uploadResult.Size, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
|
func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
|
||||||
|
|
Loading…
Reference in a new issue