From a1bc529db65aa67ff7d52b76d700c093eb0c6c17 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 20 Sep 2012 02:11:08 -0700 Subject: [PATCH] lots of fix 1. sending 404 if not found 2. handle node-up/node-down/changing-max/volume-become-full --- weed-fs/src/cmd/weed/upload.go | 211 ++++++++++-------- weed-fs/src/cmd/weed/volume.go | 31 +-- weed-fs/src/pkg/replication/volume_growth.go | 2 +- weed-fs/src/pkg/topology/data_center.go | 1 + weed-fs/src/pkg/topology/data_node.go | 4 +- weed-fs/src/pkg/topology/rack.go | 7 +- weed-fs/src/pkg/topology/topology.go | 5 +- .../pkg/topology/topology_event_handling.go | 10 +- weed-fs/src/pkg/topology/volume_layout.go | 14 +- 9 files changed, 156 insertions(+), 129 deletions(-) diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go index 8502ca417..515816921 100644 --- a/weed-fs/src/cmd/weed/upload.go +++ b/weed-fs/src/cmd/weed/upload.go @@ -1,127 +1,150 @@ package main import ( - "bytes" - "encoding/json" - "errors" - "flag" - "fmt" - "io" - "io/ioutil" - "mime/multipart" - "net/http" - "net/url" - "os" - "pkg/util" - "strconv" + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "mime/multipart" + "net/http" + "net/url" + "os" + "pkg/util" + "strconv" ) +var uploadReplication *string + func init() { - cmdUpload.Run = runUpload // break init cycle - IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") - server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location") + cmdUpload.Run = runUpload // break init cycle + IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") + server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location") + uploadReplication = cmdUpload.Flag.String("replication", "00", "replication type(00,01,10,11)") } var cmdUpload = &Command{ - UsageLine: "upload -server=localhost:9333 file1 file2 file2", - Short: "upload a set of files, using consecutive file keys", - Long: `upload a set of files, using consecutive file keys. + UsageLine: "upload -server=localhost:9333 file1 [file2 file3]", + Short: "upload one or a list of files", + Long: `upload one or a list of files. + It uses consecutive file keys for the list of files. e.g. If the file1 uses key k, file2 can be read via k_1 `, } type AssignResult struct { - Fid string "fid" - Url string "url" - PublicUrl string "publicUrl" - Count int `json:",string"` - Error string "error" + Fid string "fid" + Url string "url" + PublicUrl string "publicUrl" + Count int + Error string "error" } func assign(count int) (*AssignResult, error) { - values := make(url.Values) - values.Add("count", strconv.Itoa(count)) - jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values) - if err != nil { - return nil, err - } - var ret AssignResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Count <= 0 { - return nil, errors.New(ret.Error) - } - return &ret, nil + values := make(url.Values) + values.Add("count", strconv.Itoa(count)) + values.Add("replication", *uploadReplication) + jsonBlob, err := util.Post("http://"+*server+"/dir/assign2", values) + if *IsDebug { + fmt.Println("debug", *IsDebug, "assign result :", string(jsonBlob)) + } + if err != nil { + return nil, err + } + var ret AssignResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Count <= 0 { + return nil, errors.New(ret.Error) + } + return &ret, nil } type UploadResult struct { - Size int + Size int } func upload(filename string, uploadUrl string) (int, string) { - body_buf := bytes.NewBufferString("") - body_writer := multipart.NewWriter(body_buf) - file_writer, err := body_writer.CreateFormFile("file", filename) - if err != nil { - panic(err.Error()) - } - fh, err := os.Open(filename) - if err != nil { - panic(err.Error()) - } - io.Copy(file_writer, fh) - content_type := body_writer.FormDataContentType() - body_writer.Close() - resp, err := http.Post(uploadUrl, content_type, body_buf) - if err != nil { - panic(err.Error()) - } - defer resp.Body.Close() - resp_body, err := ioutil.ReadAll(resp.Body) - if err != nil { - panic(err.Error()) - } - var ret UploadResult - err = json.Unmarshal(resp_body, &ret) - if err != nil { - panic(err.Error()) - } - //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl) - return ret.Size, uploadUrl + if *IsDebug { + fmt.Println("Start uploading file:", filename) + } + body_buf := bytes.NewBufferString("") + body_writer := multipart.NewWriter(body_buf) + file_writer, err := body_writer.CreateFormFile("file", filename) + if err != nil { + if *IsDebug { + fmt.Println("Failed to create form file:", filename) + } + panic(err.Error()) + } + fh, err := os.Open(filename) + if err != nil { + if *IsDebug { + fmt.Println("Failed to open file:", filename) + } + panic(err.Error()) + } + io.Copy(file_writer, fh) + content_type := body_writer.FormDataContentType() + body_writer.Close() + resp, err := http.Post(uploadUrl, content_type, body_buf) + if err != nil { + if *IsDebug { + fmt.Println("Failed to upload file to", uploadUrl) + } + panic(err.Error()) + } + defer resp.Body.Close() + resp_body, err := ioutil.ReadAll(resp.Body) + if *IsDebug { + fmt.Println("Upload response:", string(resp_body)) + } + if err != nil { + panic(err.Error()) + } + var ret UploadResult + err = json.Unmarshal(resp_body, &ret) + if err != nil { + panic(err.Error()) + } + //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl) + return ret.Size, uploadUrl } type SubmitResult struct { - Fid string "fid" - Size int "size" + Fid string "fid" + Size int "size" } -func submit(files []string)([]SubmitResult) { - ret, err := assign(len(files)) - if err != nil { - panic(err) - } - results := make([]SubmitResult, len(files)) - for index, file := range files { - fid := ret.Fid - if index > 0 { - fid = fid + "_" + strconv.Itoa(index) - } - uploadUrl := "http://" + ret.PublicUrl + "/" + fid - results[index].Size, _ = upload(file, uploadUrl) - results[index].Fid = fid - } - return results +func submit(files []string) []SubmitResult { + ret, err := assign(len(files)) + if err != nil { + panic(err) + } + results := make([]SubmitResult, len(files)) + for index, file := range files { + fid := ret.Fid + if index > 0 { + fid = fid + "_" + strconv.Itoa(index) + } + uploadUrl := "http://" + ret.PublicUrl + "/" + fid + results[index].Size, _ = upload(file, uploadUrl) + results[index].Fid = fid + } + return results } func runUpload(cmd *Command, args []string) bool { - if len(cmdUpload.Flag.Args()) == 0 { - return false - } - results := submit(flag.Args()) - bytes, _ := json.Marshal(results) - fmt.Print(string(bytes)) - return true + *IsDebug = true + if len(cmdUpload.Flag.Args()) == 0 { + return false + } + results := submit(args) + bytes, _ := json.Marshal(results) + fmt.Print(string(bytes)) + return true } diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index bf941056c..66296c5ee 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -28,7 +28,7 @@ var cmdVolume = &Command{ var ( vport = cmdVolume.Flag.Int("port", 8080, "http listen port") - volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files") + volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files") volumes = cmdVolume.Flag.String("volumes", "", "comma-separated list, or ranges of volume ids") publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read") masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings") @@ -88,15 +88,16 @@ func GetHandler(w http.ResponseWriter, r *http.Request) { } cookie := n.Cookie count, e := store.Read(volumeId, n) - if e != nil { - w.WriteHeader(404) - } - if *IsDebug { - log.Println("read bytes", count, "error", e) + if *IsDebug { + log.Println("read bytes", count, "error", e) + } + if e != nil || count <= 0 { + w.WriteHeader(404) + return } if n.Cookie != cookie { log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - w.WriteHeader(404) + w.WriteHeader(404) return } if ext != "" { @@ -161,6 +162,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, m) } func parseURLPath(path string) (vid, fid, ext string) { + sepIndex := strings.LastIndex(path, "/") commaIndex := strings.LastIndex(path[sepIndex:], ",") if commaIndex <= 0 { @@ -181,17 +183,17 @@ func parseURLPath(path string) (vid, fid, ext string) { } func runVolume(cmd *Command, args []string) bool { - fileInfo, err := os.Stat(*volumeFolder) + fileInfo, err := os.Stat(*volumeFolder) //TODO: now default to 1G, this value should come from server? - if err!=nil{ - log.Fatalf("No Existing Folder:%s", *volumeFolder) + if err != nil { + log.Fatalf("No Existing Folder:%s", *volumeFolder) } if !fileInfo.IsDir() { - log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder) + log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder) } - perm:=fileInfo.Mode().Perm() - log.Println("Volume Folder permission:", perm) - + perm := fileInfo.Mode().Perm() + log.Println("Volume Folder permission:", perm) + store = storage.NewStore(*vport, *publicUrl, *volumeFolder, *maxVolumeCount, *volumes) defer store.Close() http.HandleFunc("/", storeHandler) @@ -199,7 +201,6 @@ func runVolume(cmd *Command, args []string) bool { http.HandleFunc("/admin/assign_volume", assignVolumeHandler) http.HandleFunc("/admin/set_volume_locations_list", setVolumeLocationsHandler) - go func() { for { store.Join(*masterNode) diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go index 41d7bd18c..0176033d8 100644 --- a/weed-fs/src/pkg/replication/volume_growth.go +++ b/weed-fs/src/pkg/replication/volume_growth.go @@ -126,7 +126,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { for _, server := range servers { if err := AllocateVolume(server, vid, repType); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0} + vi := storage.VolumeInfo{Id: vid, Size: 0, RepType:repType} server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(&vi, server) fmt.Println("Created Volume", vid, "on", server) diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go index c661090e8..a5d8cc749 100644 --- a/weed-fs/src/pkg/topology/data_center.go +++ b/weed-fs/src/pkg/topology/data_center.go @@ -38,6 +38,7 @@ func (dc *DataCenter) GetOrCreateRack(ip string) *Rack { func (dc *DataCenter) ToMap() interface{}{ m := make(map[string]interface{}) + m["Max"] = dc.GetMaxVolumeCount() m["Free"] = dc.FreeSpace() var racks []interface{} for _, c := range dc.Children() { diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go index cb625a41b..cfdae45c3 100644 --- a/weed-fs/src/pkg/topology/data_node.go +++ b/weed-fs/src/pkg/topology/data_node.go @@ -49,8 +49,8 @@ func (dn *DataNode) ToMap() interface{} { ret["Ip"] = dn.Ip ret["Port"] = dn.Port ret["Volumes"] = dn.GetActiveVolumeCount() - ret["MaxVolumeCount"] = dn.GetMaxVolumeCount() - ret["FreeVolumeCount"] = dn.FreeSpace() + ret["Max"] = dn.GetMaxVolumeCount() + ret["Free"] = dn.FreeSpace() ret["PublicUrl"] = dn.PublicUrl return ret } diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index 16520d14a..bea7d61d7 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -32,10 +32,10 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol if dn.MatchLocation(ip, port) { dn.LastSeen = time.Now().Unix() if dn.Dead { - dn.Dead = false - r.GetTopology().chanRecoveredDataNodes <- dn + dn.Dead = false + r.GetTopology().chanRecoveredDataNodes <- dn + dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) } - dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) return dn } } @@ -51,6 +51,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol func (rack *Rack) ToMap() interface{} { m := make(map[string]interface{}) + m["Max"] = rack.GetMaxVolumeCount() m["Free"] = rack.FreeSpace() var dns []interface{} for _, c := range rack.Children() { diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index 8dcd64dca..90f402e28 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -23,8 +23,6 @@ type Topology struct { chanDeadDataNodes chan *DataNode chanRecoveredDataNodes chan *DataNode chanFullVolumes chan *storage.VolumeInfo - chanIncomplemteVolumes chan *storage.VolumeInfo - chanRecoveredVolumes chan *storage.VolumeInfo } func NewTopology(id string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology { @@ -42,8 +40,6 @@ func NewTopology(id string, dirname string, filename string, volumeSizeLimit uin t.chanDeadDataNodes = make(chan *DataNode) t.chanRecoveredDataNodes = make(chan *DataNode) t.chanFullVolumes = make(chan *storage.VolumeInfo) - t.chanIncomplemteVolumes = make(chan *storage.VolumeInfo) - t.chanRecoveredVolumes = make(chan *storage.VolumeInfo) return t } @@ -124,6 +120,7 @@ func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter { func (t *Topology) ToMap() interface{} { m := make(map[string]interface{}) + m["Max"] = t.GetMaxVolumeCount() m["Free"] = t.FreeSpace() var dcs []interface{} for _, c := range t.Children() { diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go index 813826a61..0ede2ba1e 100644 --- a/weed-fs/src/pkg/topology/topology_event_handling.go +++ b/weed-fs/src/pkg/topology/topology_event_handling.go @@ -18,10 +18,6 @@ func (t *Topology) StartRefreshWritableVolumes() { go func() { for { select { - case v := <-t.chanIncomplemteVolumes: - fmt.Println("Volume", v, "is incomplete!") - case v := <-t.chanRecoveredVolumes: - fmt.Println("Volume", v, "is recovered!") case v := <-t.chanFullVolumes: t.SetVolumeCapacityFull(v) fmt.Println("Volume", v, "is full!") @@ -38,6 +34,9 @@ func (t *Topology) StartRefreshWritableVolumes() { func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) { vl := t.GetVolumeLayout(volumeInfo.RepType) vl.SetVolumeCapacityFull(volumeInfo.Id) + for _, dn := range vl.vid2location[volumeInfo.Id].list { + dn.UpAdjustActiveVolumeCountDelta(-1) + } } func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.volumes { @@ -45,6 +44,9 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { vl := t.GetVolumeLayout(v.RepType) vl.SetVolumeUnavailable(dn, v.Id) } + dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) + dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) + dn.Parent().UnlinkChildNode(dn.Id()) } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go index ba091b9bb..5159e8e75 100644 --- a/weed-fs/src/pkg/topology/volume_layout.go +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -76,16 +76,18 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { } func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { - if vl.vid2location[vid].Remove(dn) { - if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() { - return vl.removeFromWritable(vid) - } - } - return false + if vl.vid2location[vid].Remove(dn) { + if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() { + fmt.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount()) + return vl.removeFromWritable(vid) + } + } + return false } func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { if vl.vid2location[vid].Add(dn) { if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() { + fmt.Println("Volume", vid, "becomes writable") return vl.setVolumeWritable(vid) } }