refactor Volume Assign function

This commit is contained in:
霍晓栋 2016-06-26 10:50:18 +08:00
parent c601ef03b1
commit 1f63094542
5 changed files with 57 additions and 40 deletions

View file

@ -205,7 +205,11 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
start := time.Now()
fileSize := int64(*b.fileSize + rand.Intn(64))
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
ar := &operation.VolumeAssignRequest{
Count: 1,
Collection: *b.collection,
}
if assignResult, err := operation.Assign(*b.server, ar); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
if _, err := fp.Upload(0, *b.server, secret); err == nil {
if rand.Intn(100) < *b.deletePercentage {

View file

@ -11,6 +11,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
type VolumeAssignRequest struct {
Count uint64
Replication string
Collection string
Ttl string
DataCenter string
Rack string
DataNode string
}
type AssignResult struct {
Fid string `json:"fid,omitempty"`
Url string `json:"url,omitempty"`
@ -19,47 +29,26 @@ type AssignResult struct {
Error string `json:"error,omitempty"`
}
/*
options params meaning:
options[0] main data Center
options[1] main rack
options[2] main data node
*/
func Assign(server string, count uint64, replication string, collection string, ttl string, options ...string) (*AssignResult, error) {
func Assign(server string, r *VolumeAssignRequest) (*AssignResult, error) {
values := make(url.Values)
values.Add("count", strconv.FormatUint(count, 10))
if replication != "" {
values.Add("replication", replication)
values.Add("count", strconv.FormatUint(r.Count, 10))
if r.Replication != "" {
values.Add("replication", r.Replication)
}
if collection != "" {
values.Add("collection", collection)
if r.Collection != "" {
values.Add("collection", r.Collection)
}
if ttl != "" {
values.Add("ttl", ttl)
if r.Ttl != "" {
values.Add("ttl", r.Ttl)
}
var dataCenter, rack, dataNode string
switch len(options) {
case 1:
dataCenter = options[0]
case 2:
dataCenter = options[0]
rack = options[1]
case 3:
dataCenter = options[0]
rack = options[1]
dataNode = options[2]
default:
if r.DataCenter != "" {
values.Add("dataCenter", r.DataCenter)
}
if dataCenter != "" {
values.Add("dataCenter", dataCenter)
if r.Rack != "" {
values.Add("rack", r.Rack)
}
if rack != "" {
values.Add("rack", rack)
}
if dataNode != "" {
values.Add("dataNode", dataNode)
if r.DataNode != "" {
values.Add("dataNode", r.DataNode)
}
jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)

View file

@ -44,7 +44,13 @@ func SubmitFiles(master string, files []FilePart,
for index, file := range files {
results[index].FileName = file.FileName
}
ret, err := Assign(master, uint64(len(files)), replication, collection, ttl)
ar := &VolumeAssignRequest{
Count: uint64(len(files)),
Replication: replication,
Collection: collection,
Ttl: ttl,
}
ret, err := Assign(master, ar)
if err != nil {
for index, _ := range files {
results[index].Error = err.Error()
@ -164,7 +170,13 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
func upload_one_chunk(filename string, reader io.Reader, master,
replication string, collection string, ttl string, jwt security.EncodedJwt,
) (fid string, size uint32, e error) {
ret, err := Assign(master, 1, replication, collection, ttl)
ar := &VolumeAssignRequest{
Count: 1,
Replication: replication,
Collection: collection,
Ttl: ttl,
}
ret, err := Assign(master, ar)
if err != nil {
return "", 0, err
}

View file

@ -94,7 +94,13 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
debug("assigning file id for", fname)
r.ParseForm()
assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl"))
ar := &operation.VolumeAssignRequest{
Count: 1,
Replication: r.FormValue("replication"),
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
}
assignResult, ae := operation.Assign(masterUrl, ar)
if ae != nil {
writeJsonError(w, r, http.StatusInternalServerError, ae)
return

View file

@ -87,7 +87,13 @@ func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Reques
}
func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string) (fileId, urlLocation string, err error) {
assignResult, ae := operation.Assign(fs.getMasterNode(), 1, replication, collection, r.URL.Query().Get("ttl"))
ar := &operation.VolumeAssignRequest{
Count: 1,
Replication: replication,
Collection: collection,
Ttl: r.URL.Query().Get("ttl"),
}
assignResult, ae := operation.Assign(fs.getMasterNode(), ar)
if ae != nil {
glog.V(0).Infoln("failing to assign a file id", ae.Error())
writeJsonError(w, r, http.StatusInternalServerError, ae)