refactoring content uploading

This commit is contained in:
Chris Lu 2012-09-20 17:58:29 -07:00
parent 7d8e9f829c
commit 5e97179d06
6 changed files with 74 additions and 47 deletions

View file

@ -82,4 +82,5 @@ For the above operations, here are the todo list:
5. accept lookup for volume locations ALREADY EXISTS /dir/lookup 5. accept lookup for volume locations ALREADY EXISTS /dir/lookup
6. read topology/datacenter/rack layout 6. read topology/datacenter/rack layout
TODO:
1. replicate content to the other server if the replication type needs replicas

View file

@ -1,16 +1,12 @@
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"net/url" "net/url"
"os" "os"
"pkg/operation"
"pkg/util" "pkg/util"
"strconv" "strconv"
) )
@ -64,23 +60,10 @@ func assign(count int) (*AssignResult, error) {
return &ret, nil return &ret, nil
} }
type UploadResult struct { func upload(filename string, server string, fid string) (int) {
Size int
}
func upload(filename string, uploadUrl string) (int, string) {
if *IsDebug { if *IsDebug {
fmt.Println("Start uploading file:", filename) fmt.Println("Start uploading file:", filename)
} }
body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf)
file_writer, err := body_writer.CreateFormFile("file", filename)
if err != nil {
if *IsDebug {
fmt.Println("Failed to create form file:", filename)
}
panic(err.Error())
}
fh, err := os.Open(filename) fh, err := os.Open(filename)
if err != nil { if err != nil {
if *IsDebug { if *IsDebug {
@ -88,31 +71,8 @@ func upload(filename string, uploadUrl string) (int, string) {
} }
panic(err.Error()) panic(err.Error())
} }
io.Copy(file_writer, fh) ret, _ := operation.Upload(server, fid, filename, fh)
content_type := body_writer.FormDataContentType() return ret.Size
body_writer.Close()
resp, err := http.Post(uploadUrl, content_type, body_buf)
if err != nil {
if *IsDebug {
fmt.Println("Failed to upload file to", uploadUrl)
}
panic(err.Error())
}
defer resp.Body.Close()
resp_body, err := ioutil.ReadAll(resp.Body)
if *IsDebug {
fmt.Println("Upload response:", string(resp_body))
}
if err != nil {
panic(err.Error())
}
var ret UploadResult
err = json.Unmarshal(resp_body, &ret)
if err != nil {
panic(err.Error())
}
//fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl)
return ret.Size, uploadUrl
} }
type SubmitResult struct { type SubmitResult struct {
@ -131,8 +91,7 @@ func submit(files []string) []SubmitResult {
if index > 0 { if index > 0 {
fid = fid + "_" + strconv.Itoa(index) fid = fid + "_" + strconv.Itoa(index)
} }
uploadUrl := "http://" + ret.PublicUrl + "/" + fid results[index].Size = upload(file, ret.PublicUrl, fid)
results[index].Size, _ = upload(file, uploadUrl)
results[index].Fid = fid results[index].Fid = fid
} }
return results return results

View file

@ -137,6 +137,30 @@ func PostHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, ne) writeJson(w, r, ne)
} else { } else {
ret := store.Write(volumeId, needle) ret := store.Write(volumeId, needle)
if ret > 0 { //send to other replica locations
if r.FormValue("type") != "standard" {
waitTime, err := strconv.Atoi(r.FormValue("wait"))
lookupResult, lookupErr := operation.Lookup(*server, volumeId)
if lookupErr == nil {
sendFunc := func(background bool) {
postContentFunc := func(location operation.Location) bool{
return true
}
for _, location := range lookupResult.Locations {
if background {
go postContentFunc(location)
}else{
postContentFunc(location)
}
}
}
sendFunc(err == nil && waitTime > 0)
} else {
log.Println("Failed to lookup for", volumeId, lookupErr.Error())
}
}
}
m := make(map[string]uint32) m := make(map[string]uint32)
m["size"] = ret m["size"] = ret
writeJson(w, r, m) writeJson(w, r, m)

View file

@ -0,0 +1,40 @@
package operation
import (
"bytes"
"encoding/json"
"mime/multipart"
"net/http"
_ "fmt"
"io"
"io/ioutil"
)
type UploadResult struct {
Size int
}
func Upload(server string, fid string, filename string, reader io.Reader) (*UploadResult, error) {
body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf)
file_writer, err := body_writer.CreateFormFile("file", filename)
io.Copy(file_writer, reader)
content_type := body_writer.FormDataContentType()
body_writer.Close()
resp, err := http.Post("http://"+server+"/"+fid, content_type, body_buf)
if err != nil {
return nil, err
}
defer resp.Body.Close()
resp_body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var ret UploadResult
err = json.Unmarshal(resp_body, &ret)
if err != nil {
panic(err.Error())
}
//fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl)
return &ret, nil
}

View file

@ -135,6 +135,9 @@ func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
} }
return 0, errors.New("Not Found") return 0, errors.New("Not Found")
} }
func (s *Store) GetVolume(i VolumeId) *Volume {
return s.volumes[i]
}
func (s *Store) HasVolume(i VolumeId) bool { func (s *Store) HasVolume(i VolumeId) bool {
_, ok := s.volumes[i] _, ok := s.volumes[i]