From bf9c4ed033db62e45ae756a5dd3fd100a3ae8903 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 17 Jan 2013 00:15:05 -0800 Subject: [PATCH] Revert "add cmd/dump - a dumper" This reverts commit 5d2a1e8d4845e7a7f1dccd962bb0ee6a5f9d6081. --- weed-fs/src/cmd/dump/main.go | 96 --------- weed-fs/src/cmd/weed/command.go | 59 ++--- weed-fs/src/cmd/weed/shell.go | 73 +++---- weed-fs/src/cmd/weed/upload.go | 2 +- weed-fs/src/cmd/weed/weed.go | 4 +- weed-fs/src/pkg/directory/file_id.go | 6 +- weed-fs/src/pkg/operation/allocate_volume.go | 44 ++-- weed-fs/src/pkg/operation/delete_content.go | 2 +- weed-fs/src/pkg/operation/lookup_volume_id.go | 50 ++--- weed-fs/src/pkg/operation/upload_content.go | 18 +- weed-fs/src/pkg/replication/volume_growth.go | 10 +- .../src/pkg/replication/volume_growth_test.go | 17 +- weed-fs/src/pkg/sequence/sequence.go | 52 ++--- weed-fs/src/pkg/storage/compact_map.go | 20 -- .../src/pkg/storage/compact_map_perf_test.go | 60 +++--- weed-fs/src/pkg/storage/compact_map_test.go | 38 ++-- weed-fs/src/pkg/storage/needle_map.go | 5 - weed-fs/src/pkg/storage/needle_read_write.go | 9 +- weed-fs/src/pkg/storage/replication_type.go | 202 +++++++++--------- weed-fs/src/pkg/storage/store.go | 13 +- weed-fs/src/pkg/storage/volume.go | 53 +---- weed-fs/src/pkg/storage/volume_id.go | 17 +- weed-fs/src/pkg/storage/volume_version.go | 9 +- .../src/pkg/topology/configuration_test.go | 12 +- weed-fs/src/pkg/topology/data_center.go | 45 ++-- weed-fs/src/pkg/topology/node_list.go | 12 +- weed-fs/src/pkg/topology/node_list_test.go | 34 +-- weed-fs/src/pkg/topology/rack.go | 14 +- weed-fs/src/pkg/topology/topo_test.go | 10 +- weed-fs/src/pkg/topology/topology_compact.go | 4 +- .../pkg/topology/topology_event_handling.go | 4 +- weed-fs/src/pkg/topology/topology_map.go | 3 +- weed-fs/src/pkg/topology/volume_location.go | 16 +- weed-fs/src/pkg/util/bytes.go | 53 ++--- weed-fs/src/pkg/util/parse.go | 20 +- weed-fs/src/pkg/util/post.go | 2 +- 36 files changed, 466 insertions(+), 622 deletions(-) delete mode 100644 weed-fs/src/cmd/dump/main.go diff --git a/weed-fs/src/cmd/dump/main.go b/weed-fs/src/cmd/dump/main.go deleted file mode 100644 index e3e151eb7..000000000 --- a/weed-fs/src/cmd/dump/main.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright Tamás Gulácsi 2013 All rights reserved -// Use of this source is governed by the same rules as the weed-fs library. -// If this would be ambigous, than Apache License 2.0 has to be used. -// -// dump dumps the files of a volume to tar or unique files. -// Each file will have id#mimetype#original_name file format - -package main - -import ( - "archive/tar" - "bytes" - "flag" - "fmt" - // "io" - "log" - "os" - "pkg/storage" - "strings" - "time" -) - -var ( - volumePath = flag.String("dir", "/tmp", "volume directory") - volumeId = flag.Int("id", 0, "volume Id") - dest = flag.String("out", "-", "output path. Produces tar if path ends with .tar; creates files otherwise.") - tarFh *tar.Writer - tarHeader tar.Header - counter int -) - -func main() { - var err error - - flag.Parse() - - if *dest == "-" { - *dest = "" - } - if *dest == "" || strings.HasSuffix(*dest, ".tar") { - var fh *os.File - if *dest == "" { - fh = os.Stdout - } else { - if fh, err = os.Create(*dest); err != nil { - log.Printf("cannot open output tar %s: %s", *dest, err) - return - } - } - defer fh.Close() - tarFh = tar.NewWriter(fh) - defer tarFh.Close() - t := time.Now() - tarHeader = tar.Header{Mode: 0644, - ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(), - Typeflag: tar.TypeReg, - AccessTime: t, ChangeTime: t} - } - - v, err := storage.NewVolume(*volumePath, storage.VolumeId(*volumeId), storage.CopyNil) - if v == nil || v.Version() == 0 || err != nil { - log.Printf("cannot load volume %d from %s (%s): %s", *volumeId, *volumePath, v, err) - return - } - log.Printf("volume: %s (ver. %d)", v, v.Version()) - if err := v.WalkValues(walker); err != nil { - log.Printf("error while walking: %s", err) - return - } - - log.Printf("%d files written.", counter) -} - -func walker(n *storage.Needle) (err error) { - // log.Printf("Id=%d Size=%d Name=%s mime=%s", n.Id, n.Size, n.Name, n.Mime) - nm := fmt.Sprintf("%d#%s#%s", n.Id, bytes.Replace(n.Mime, []byte{'/'}, []byte{'_'}, -1), n.Name) - // log.Print(nm) - if tarFh != nil { - tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data)) - if err = tarFh.WriteHeader(&tarHeader); err != nil { - return err - } - _, err = tarFh.Write(n.Data) - } else { - if fh, e := os.Create(*dest + "/" + nm); e != nil { - return e - } else { - defer fh.Close() - _, err = fh.Write(n.Data) - } - } - if err == nil { - counter++ - } - return -} diff --git a/weed-fs/src/cmd/weed/command.go b/weed-fs/src/cmd/weed/command.go index 4d68ff151..8c725cafb 100644 --- a/weed-fs/src/cmd/weed/command.go +++ b/weed-fs/src/cmd/weed/command.go @@ -1,52 +1,53 @@ package main import ( - "flag" - "fmt" - "os" - "strings" + "flag" + "fmt" + "os" + "strings" ) type Command struct { - // Run runs the command. - // The args are the arguments after the command name. - Run func(cmd *Command, args []string) bool + // Run runs the command. + // The args are the arguments after the command name. + Run func(cmd *Command, args []string) bool - // UsageLine is the one-line usage message. - // The first word in the line is taken to be the command name. - UsageLine string + // UsageLine is the one-line usage message. + // The first word in the line is taken to be the command name. + UsageLine string - // Short is the short description shown in the 'go help' output. - Short string + // Short is the short description shown in the 'go help' output. + Short string - // Long is the long message shown in the 'go help ' output. - Long string + // Long is the long message shown in the 'go help ' output. + Long string + + // Flag is a set of flags specific to this command. + Flag flag.FlagSet - // Flag is a set of flags specific to this command. - Flag flag.FlagSet } // Name returns the command's name: the first word in the usage line. func (c *Command) Name() string { - name := c.UsageLine - i := strings.Index(name, " ") - if i >= 0 { - name = name[:i] - } - return name + name := c.UsageLine + i := strings.Index(name, " ") + if i >= 0 { + name = name[:i] + } + return name } func (c *Command) Usage() { - fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) - fmt.Fprintf(os.Stderr, "Default Usage:\n") - c.Flag.PrintDefaults() - fmt.Fprintf(os.Stderr, "Description:\n") - fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) - os.Exit(2) + fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) + fmt.Fprintf(os.Stderr, "Default Usage:\n") + c.Flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "Description:\n") + fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) + os.Exit(2) } // Runnable reports whether the command can be run; otherwise // it is a documentation pseudo-command such as importpath. func (c *Command) Runnable() bool { - return c.Run != nil + return c.Run != nil } diff --git a/weed-fs/src/cmd/weed/shell.go b/weed-fs/src/cmd/weed/shell.go index daf0b7e1f..78a4b9eb1 100644 --- a/weed-fs/src/cmd/weed/shell.go +++ b/weed-fs/src/cmd/weed/shell.go @@ -1,53 +1,54 @@ package main import ( - "bufio" - "fmt" - "os" + "bufio" + "os" + "fmt" ) func init() { - cmdShell.Run = runShell // break init cycle + cmdShell.Run = runShell // break init cycle } var cmdShell = &Command{ - UsageLine: "shell", - Short: "run interactive commands, now just echo", - Long: `run interactive commands. + UsageLine: "shell", + Short: "run interactive commands, now just echo", + Long: `run interactive commands. `, } -var () +var ( +) func runShell(command *Command, args []string) bool { - r := bufio.NewReader(os.Stdin) - o := bufio.NewWriter(os.Stdout) - e := bufio.NewWriter(os.Stderr) - prompt := func() { - o.WriteString("> ") - o.Flush() - } - readLine := func() string { - ret, err := r.ReadString('\n') - if err != nil { - fmt.Fprint(e, err) - os.Exit(1) - } - return ret - } - execCmd := func(cmd string) int { - if cmd != "" { - o.WriteString(cmd) - } - return 0 - } + r := bufio.NewReader(os.Stdin) + o := bufio.NewWriter(os.Stdout) + e := bufio.NewWriter(os.Stderr) + prompt := func () { + o.WriteString("> ") + o.Flush() + }; + readLine := func () string { + ret, err := r.ReadString('\n') + if err != nil { + fmt.Fprint(e,err); + os.Exit(1) + } + return ret + } + execCmd := func (cmd string) int { + if cmd != "" { + o.WriteString(cmd) + } + return 0 + } - cmd := "" - for { - prompt() - cmd = readLine() - execCmd(cmd) - } - return true + cmd := "" + for { + prompt() + cmd = readLine() + execCmd(cmd) + } + return true } diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go index 5707fda56..e25930b5d 100644 --- a/weed-fs/src/cmd/weed/upload.go +++ b/weed-fs/src/cmd/weed/upload.go @@ -67,7 +67,7 @@ func upload(filename string, server string, fid string) (int, error) { } ret, e := operation.Upload("http://"+server+"/"+fid, filename, fh) if e != nil { - return 0, e + return 0, e } return ret.Size, e } diff --git a/weed-fs/src/cmd/weed/weed.go b/weed-fs/src/cmd/weed/weed.go index e2eb41ced..232520e75 100644 --- a/weed-fs/src/cmd/weed/weed.go +++ b/weed-fs/src/cmd/weed/weed.go @@ -175,9 +175,9 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { w.Header().Set("Content-Type", "application/javascript") var bytes []byte if r.FormValue("pretty") != "" { - bytes, _ = json.MarshalIndent(obj, "", " ") + bytes, _ = json.MarshalIndent(obj, "", " ") } else { - bytes, _ = json.Marshal(obj) + bytes, _ = json.Marshal(obj) } callback := r.FormValue("callback") if callback == "" { diff --git a/weed-fs/src/pkg/directory/file_id.go b/weed-fs/src/pkg/directory/file_id.go index cd4204f32..9ce556580 100644 --- a/weed-fs/src/pkg/directory/file_id.go +++ b/weed-fs/src/pkg/directory/file_id.go @@ -3,8 +3,8 @@ package directory import ( "encoding/hex" "pkg/storage" - "pkg/util" "strings" + "pkg/util" ) type FileId struct { @@ -16,14 +16,14 @@ type FileId struct { func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId { return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} } -func ParseFileId(fid string) *FileId { +func ParseFileId(fid string) *FileId{ a := strings.Split(fid, ",") if len(a) != 2 { println("Invalid fid", fid, ", split length", len(a)) return nil } vid_string, key_hash_string := a[0], a[1] - volumeId, _ := storage.NewVolumeId(vid_string) + volumeId, _ := storage.NewVolumeId(vid_string) key, hash := storage.ParseKeyHash(key_hash_string) return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash} } diff --git a/weed-fs/src/pkg/operation/allocate_volume.go b/weed-fs/src/pkg/operation/allocate_volume.go index c93ccfb62..6a3512896 100644 --- a/weed-fs/src/pkg/operation/allocate_volume.go +++ b/weed-fs/src/pkg/operation/allocate_volume.go @@ -1,32 +1,32 @@ package operation import ( - "encoding/json" - "errors" - "net/url" - "pkg/storage" - "pkg/topology" - "pkg/util" + "encoding/json" + "errors" + "net/url" + "pkg/storage" + "pkg/topology" + "pkg/util" ) type AllocateVolumeResult struct { - Error string + Error string } func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error { - values := make(url.Values) - values.Add("volume", vid.String()) - values.Add("replicationType", repType.String()) - jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) - if err != nil { - return err - } - var ret AllocateVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err - } - if ret.Error != "" { - return errors.New(ret.Error) - } - return nil + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("replicationType", repType.String()) + jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) + if err != nil { + return err + } + var ret AllocateVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil } diff --git a/weed-fs/src/pkg/operation/delete_content.go b/weed-fs/src/pkg/operation/delete_content.go index 2bdb49651..aeab9c3ac 100644 --- a/weed-fs/src/pkg/operation/delete_content.go +++ b/weed-fs/src/pkg/operation/delete_content.go @@ -1,8 +1,8 @@ package operation import ( - "log" "net/http" + "log" ) func Delete(url string) error { diff --git a/weed-fs/src/pkg/operation/lookup_volume_id.go b/weed-fs/src/pkg/operation/lookup_volume_id.go index 50a6d91e6..c46c6670e 100644 --- a/weed-fs/src/pkg/operation/lookup_volume_id.go +++ b/weed-fs/src/pkg/operation/lookup_volume_id.go @@ -1,38 +1,38 @@ package operation import ( - "encoding/json" - "errors" - _ "fmt" - "net/url" - "pkg/storage" - "pkg/util" + "encoding/json" + "net/url" + "pkg/storage" + "pkg/util" + _ "fmt" + "errors" ) type Location struct { - Url string "url" - PublicUrl string "publicUrl" + Url string "url" + PublicUrl string "publicUrl" } type LookupResult struct { - Locations []Location "locations" - Error string "error" + Locations []Location "locations" + Error string "error" } //TODO: Add a caching for vid here func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) { - values := make(url.Values) - values.Add("volumeId", vid.String()) - jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) - if err != nil { - return nil, err - } - var ret LookupResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Error != "" { - return nil, errors.New(ret.Error) - } - return &ret, nil + values := make(url.Values) + values.Add("volumeId", vid.String()) + jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) + if err != nil { + return nil, err + } + var ret LookupResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Error != ""{ + return nil, errors.New(ret.Error) + } + return &ret, nil } diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go index 0bdb697da..7ed74e02f 100644 --- a/weed-fs/src/pkg/operation/upload_content.go +++ b/weed-fs/src/pkg/operation/upload_content.go @@ -3,18 +3,18 @@ package operation import ( "bytes" "encoding/json" - "errors" _ "fmt" "io" "io/ioutil" - "log" + "log" "mime/multipart" "net/http" + "errors" ) type UploadResult struct { - Size int - Error string + Size int + Error string } func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) { @@ -26,7 +26,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, body_writer.Close() resp, err := http.Post(uploadUrl, content_type, body_buf) if err != nil { - log.Println("failing to upload to", uploadUrl) + log.Println("failing to upload to", uploadUrl) return nil, err } defer resp.Body.Close() @@ -37,11 +37,11 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, var ret UploadResult err = json.Unmarshal(resp_body, &ret) if err != nil { - log.Println("failing to read upload resonse", uploadUrl, resp_body) - return nil, err + log.Println("failing to read upload resonse", uploadUrl, resp_body) + return nil, err } - if ret.Error != "" { - return nil, errors.New(ret.Error) + if ret.Error != ""{ + return nil, errors.New(ret.Error) } return &ret, nil } diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go index ce0094a7c..7cabf626e 100644 --- a/weed-fs/src/pkg/replication/volume_growth.go +++ b/weed-fs/src/pkg/replication/volume_growth.go @@ -7,7 +7,7 @@ import ( "pkg/operation" "pkg/storage" "pkg/topology" - "sync" + "sync" ) /* @@ -24,7 +24,7 @@ type VolumeGrowth struct { copy3factor int copyAll int - accessLock sync.Mutex + accessLock sync.Mutex } func NewDefaultVolumeGrowth() *VolumeGrowth { @@ -49,8 +49,8 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo return 0, errors.New("Unknown Replication Type!") } func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { - vg.accessLock.Lock() - defer vg.accessLock.Unlock() + vg.accessLock.Lock() + defer vg.accessLock.Unlock() counter = 0 switch repType { @@ -182,7 +182,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { for _, server := range servers { if err := operation.AllocateVolume(server, vid, repType); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion} + vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version:storage.CurrentVersion} server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(&vi, server) fmt.Println("Created Volume", vid, "on", server) diff --git a/weed-fs/src/pkg/replication/volume_growth_test.go b/weed-fs/src/pkg/replication/volume_growth_test.go index 659564c64..51e47c193 100644 --- a/weed-fs/src/pkg/replication/volume_growth_test.go +++ b/weed-fs/src/pkg/replication/volume_growth_test.go @@ -5,7 +5,7 @@ import ( "fmt" "math/rand" "pkg/storage" - "pkg/topology" + "pkg/topology" "testing" "time" ) @@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5) + topo := topology.NewTopology("mynetwork","/etc/weedfs/weedfs.conf","/tmp","testing",32*1024, 5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := topology.NewDataCenter(dcKey) @@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) @@ -121,9 +121,10 @@ func TestRemoveDataCenter(t *testing.T) { func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4} - if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil { - t.Log("reserved", c) - } + rand.Seed(time.Now().UnixNano()) + vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4} + if c, e := vg.GrowByCountAndType(1,storage.Copy000,topo);e==nil{ + t.Log("reserved", c) + } } + diff --git a/weed-fs/src/pkg/sequence/sequence.go b/weed-fs/src/pkg/sequence/sequence.go index c85289468..bfdf1b368 100644 --- a/weed-fs/src/pkg/sequence/sequence.go +++ b/weed-fs/src/pkg/sequence/sequence.go @@ -1,11 +1,11 @@ package sequence import ( - "encoding/gob" - "log" - "os" - "path" + "encoding/gob" + "os" + "path" "sync" + "log" ) const ( @@ -27,21 +27,21 @@ type SequencerImpl struct { } func NewSequencer(dirname string, filename string) (m *SequencerImpl) { - m = &SequencerImpl{dir: dirname, fileName: filename} + m = &SequencerImpl{dir: dirname, fileName: filename} - seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) - if se != nil { - m.FileIdSequence = FileIdSaveInterval - log.Println("Setting file id sequence", m.FileIdSequence) - } else { - decoder := gob.NewDecoder(seqFile) - defer seqFile.Close() - decoder.Decode(&m.FileIdSequence) - log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) - //in case the server stops between intervals - m.FileIdSequence += FileIdSaveInterval - } - return + seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) + if se != nil { + m.FileIdSequence = FileIdSaveInterval + log.Println("Setting file id sequence", m.FileIdSequence) + } else { + decoder := gob.NewDecoder(seqFile) + defer seqFile.Close() + decoder.Decode(&m.FileIdSequence) + log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) + //in case the server stops between intervals + m.FileIdSequence += FileIdSaveInterval + } + return } //count should be 1 or more @@ -60,12 +60,12 @@ func (m *SequencerImpl) NextFileId(count int) (uint64, int) { return m.FileIdSequence - m.fileIdCounter, count } func (m *SequencerImpl) saveSequence() { - log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) - seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) - if e != nil { - log.Fatalf("Sequence File Save [ERROR] %s\n", e) - } - defer seqFile.Close() - encoder := gob.NewEncoder(seqFile) - encoder.Encode(m.FileIdSequence) + log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) + seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) + if e != nil { + log.Fatalf("Sequence File Save [ERROR] %s\n", e) + } + defer seqFile.Close() + encoder := gob.NewEncoder(seqFile) + encoder.Encode(m.FileIdSequence) } diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go index 90ed42198..7365022ea 100644 --- a/weed-fs/src/pkg/storage/compact_map.go +++ b/weed-fs/src/pkg/storage/compact_map.go @@ -175,23 +175,3 @@ func (cm *CompactMap) Peek() { } } } - -// iterate over the keys by calling iterate on each key till error is returned -func (cm *CompactMap) Walk(pedestrian func(*NeedleValue) error) (err error) { - var i int - for _, cs := range cm.list { - for key := cs.start; key < cs.end; key++ { - if i = cs.binarySearchValues(key); i >= 0 { - if err = pedestrian(&cs.values[i]); err != nil { - return - } - } - } - for _, val := range cs.overflow { - if err = pedestrian(val); err != nil { - return err - } - } - } - return nil -} diff --git a/weed-fs/src/pkg/storage/compact_map_perf_test.go b/weed-fs/src/pkg/storage/compact_map_perf_test.go index cfa521fc8..2e2227279 100644 --- a/weed-fs/src/pkg/storage/compact_map_perf_test.go +++ b/weed-fs/src/pkg/storage/compact_map_perf_test.go @@ -1,43 +1,43 @@ package storage import ( - "log" - "os" - "pkg/util" "testing" + "log" + "os" + "pkg/util" ) func TestMemoryUsage(t *testing.T) { - indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) - if ie != nil { - log.Fatalln(ie) - } - LoadNewNeedleMap(indexFile) - + indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + if ie != nil { + log.Fatalln(ie) + } + LoadNewNeedleMap(indexFile) + } func LoadNewNeedleMap(file *os.File) CompactMap { - m := NewCompactMap() - bytes := make([]byte, 16*1024) - count, e := file.Read(bytes) - if count > 0 { - fstat, _ := file.Stat() - log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) - } - for count > 0 && e == nil { - for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - if offset > 0 { - m.Set(Key(key), offset, size) - } else { - //delete(m, key) - } - } + m := NewCompactMap() + bytes := make([]byte, 16*1024) + count, e := file.Read(bytes) + if count > 0 { + fstat, _ := file.Stat() + log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + } + for count > 0 && e == nil { + for i := 0; i < count; i += 16 { + key := util.BytesToUint64(bytes[i : i+8]) + offset := util.BytesToUint32(bytes[i+8 : i+12]) + size := util.BytesToUint32(bytes[i+12 : i+16]) + if offset > 0 { + m.Set(Key(key), offset, size) + } else { + //delete(m, key) + } + } - count, e = file.Read(bytes) - } - return m + count, e = file.Read(bytes) + } + return m } diff --git a/weed-fs/src/pkg/storage/compact_map_test.go b/weed-fs/src/pkg/storage/compact_map_test.go index e76e9578d..c05515b29 100644 --- a/weed-fs/src/pkg/storage/compact_map_test.go +++ b/weed-fs/src/pkg/storage/compact_map_test.go @@ -18,42 +18,42 @@ func TestXYZ(t *testing.T) { m.Set(Key(i), i+11, i+5) } - // for i := uint32(0); i < 100; i++ { - // if v := m.Get(Key(i)); v != nil { - // println(i, "=", v.Key, v.Offset, v.Size) - // } - // } - +// for i := uint32(0); i < 100; i++ { +// if v := m.Get(Key(i)); v != nil { +// println(i, "=", v.Key, v.Offset, v.Size) +// } +// } + for i := uint32(0); i < 10*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(Key(i)) if i%3 == 0 { - if !ok { - t.Fatal("key", i, "missing!") - } + if !ok { + t.Fatal("key", i, "missing!") + } if v.Size != i+5 { t.Fatal("key", i, "size", v.Size) } } else if i%37 == 0 { - if ok && v.Size > 0 { + if ok && v.Size > 0 { t.Fatal("key", i, "should have been deleted needle value", v) } } else if i%2 == 0 { - if v.Size != i { + if v.Size != i { t.Fatal("key", i, "size", v.Size) } } } for i := uint32(10 * batch); i < 100*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(Key(i)) if i%37 == 0 { - if ok && v.Size > 0 { - t.Fatal("key", i, "should have been deleted needle value", v) - } + if ok && v.Size > 0 { + t.Fatal("key", i, "should have been deleted needle value", v) + } } else if i%2 == 0 { - if v == nil { - t.Fatal("key", i, "missing") - } + if v==nil{ + t.Fatal("key", i, "missing") + } if v.Size != i { t.Fatal("key", i, "size", v.Size) } diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index 9ce38c1f2..e01c27630 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -98,8 +98,3 @@ func (nm *NeedleMap) Close() { func (nm *NeedleMap) ContentSize() uint64 { return nm.fileByteCounter } - -// iterate through all needles using the iterator function -func (nm *NeedleMap) Walk(pedestrian func(*NeedleValue) error) (err error) { - return nm.m.Walk(pedestrian) -} diff --git a/weed-fs/src/pkg/storage/needle_read_write.go b/weed-fs/src/pkg/storage/needle_read_write.go index fdb09a3c6..00844bad3 100644 --- a/weed-fs/src/pkg/storage/needle_read_write.go +++ b/weed-fs/src/pkg/storage/needle_read_write.go @@ -2,10 +2,10 @@ package storage import ( "errors" - "fmt" "io" "os" "pkg/util" + "fmt" ) func (n *Needle) Append(w io.Writer, version Version) uint32 { @@ -62,8 +62,7 @@ func (n *Needle) Append(w io.Writer, version Version) uint32 { return n.Size } func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { - switch version { - case Version1: + if version == Version1 { bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) ret, e := r.Read(bytes) n.readNeedleHeader(bytes) @@ -73,7 +72,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { return 0, errors.New("CRC error! Data On Disk Corrupted!") } return ret, e - case Version2: + } else if version == Version2 { if size == 0 { return 0, nil } @@ -96,7 +95,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { } return ret, e } - return 0, fmt.Errorf("Unsupported Version! (%d)", version) + return 0, errors.New("Unsupported Version!") } func (n *Needle) readNeedleHeader(bytes []byte) { n.Cookie = util.BytesToUint32(bytes[0:4]) diff --git a/weed-fs/src/pkg/storage/replication_type.go b/weed-fs/src/pkg/storage/replication_type.go index 0902d1016..86a9d219d 100644 --- a/weed-fs/src/pkg/storage/replication_type.go +++ b/weed-fs/src/pkg/storage/replication_type.go @@ -1,123 +1,123 @@ package storage import ( - "errors" + "errors" ) type ReplicationType string const ( - Copy000 = ReplicationType("000") // single copy - Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center - Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center - Copy100 = ReplicationType("100") // 2 copies, each on different data center - Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center - Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center - LengthRelicationType = 6 - CopyNil = ReplicationType(255) // nil value + Copy000 = ReplicationType("000") // single copy + Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center + Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center + Copy100 = ReplicationType("100") // 2 copies, each on different data center + Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center + Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center + LengthRelicationType = 6 + CopyNil = ReplicationType(255) // nil value ) func NewReplicationTypeFromString(t string) (ReplicationType, error) { - switch t { - case "000": - return Copy000, nil - case "001": - return Copy001, nil - case "010": - return Copy010, nil - case "100": - return Copy100, nil - case "110": - return Copy110, nil - case "200": - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:" + t) + switch t { + case "000": + return Copy000, nil + case "001": + return Copy001, nil + case "010": + return Copy010, nil + case "100": + return Copy100, nil + case "110": + return Copy110, nil + case "200": + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:"+t) } func NewReplicationTypeFromByte(b byte) (ReplicationType, error) { - switch b { - case byte(000): - return Copy000, nil - case byte(001): - return Copy001, nil - case byte(010): - return Copy010, nil - case byte(100): - return Copy100, nil - case byte(110): - return Copy110, nil - case byte(200): - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:" + string(b)) + switch b { + case byte(000): + return Copy000, nil + case byte(001): + return Copy001, nil + case byte(010): + return Copy010, nil + case byte(100): + return Copy100, nil + case byte(110): + return Copy110, nil + case byte(200): + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:"+string(b)) } func (r *ReplicationType) String() string { - switch *r { - case Copy000: - return "000" - case Copy001: - return "001" - case Copy010: - return "010" - case Copy100: - return "100" - case Copy110: - return "110" - case Copy200: - return "200" - } - return "000" + switch *r { + case Copy000: + return "000" + case Copy001: + return "001" + case Copy010: + return "010" + case Copy100: + return "100" + case Copy110: + return "110" + case Copy200: + return "200" + } + return "000" } func (r *ReplicationType) Byte() byte { - switch *r { - case Copy000: - return byte(000) - case Copy001: - return byte(001) - case Copy010: - return byte(010) - case Copy100: - return byte(100) - case Copy110: - return byte(110) - case Copy200: - return byte(200) - } - return byte(000) + switch *r { + case Copy000: + return byte(000) + case Copy001: + return byte(001) + case Copy010: + return byte(010) + case Copy100: + return byte(100) + case Copy110: + return byte(110) + case Copy200: + return byte(200) + } + return byte(000) } -func (repType ReplicationType) GetReplicationLevelIndex() int { - switch repType { - case Copy000: - return 0 - case Copy001: - return 1 - case Copy010: - return 2 - case Copy100: - return 3 - case Copy110: - return 4 - case Copy200: - return 5 - } - return -1 +func (repType ReplicationType)GetReplicationLevelIndex() int { + switch repType { + case Copy000: + return 0 + case Copy001: + return 1 + case Copy010: + return 2 + case Copy100: + return 3 + case Copy110: + return 4 + case Copy200: + return 5 + } + return -1 } -func (repType ReplicationType) GetCopyCount() int { - switch repType { - case Copy000: - return 1 - case Copy001: - return 2 - case Copy010: - return 2 - case Copy100: - return 2 - case Copy110: - return 3 - case Copy200: - return 3 - } - return 0 +func (repType ReplicationType)GetCopyCount() int { + switch repType { + case Copy000: + return 1 + case Copy001: + return 2 + case Copy010: + return 2 + case Copy100: + return 2 + case Copy110: + return 3 + case Copy200: + return 3 + } + return 0 } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 79cf65e28..a2c5f040b 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -65,13 +65,13 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error } return e } -func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) { +func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { if s.volumes[vid] != nil { return errors.New("Volume Id " + vid.String() + " already exists!") } log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType) - s.volumes[vid], err = NewVolume(s.dir, vid, replicationType) - return err + s.volumes[vid] = NewVolume(s.dir, vid, replicationType) + return nil } func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { @@ -107,10 +107,9 @@ func (s *Store) loadExistingVolumes() { base := name[:len(name)-len(".dat")] if vid, err := NewVolumeId(base); err == nil { if s.volumes[vid] == nil { - if v, e := NewVolume(s.dir, vid, CopyNil); e == nil { - s.volumes[vid] = v - log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size()) - } + v := NewVolume(s.dir, vid, CopyNil) + s.volumes[vid] = v + log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size()) } } } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 5e64d0763..0220bf895 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -24,9 +24,9 @@ type Volume struct { accessLock sync.Mutex } -func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { +func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) { v = &Volume{dir: dirname, Id: id, replicaType: replicationType} - e = v.load() + v.load() return } func (v *Volume) load() error { @@ -43,7 +43,6 @@ func (v *Volume) load() error { } else { v.maybeWriteSuperBlock() } - // TODO: if .idx not exists, but .cdb exists, then use (but don't load!) that indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) if ie != nil { return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) @@ -80,23 +79,21 @@ func (v *Volume) maybeWriteSuperBlock() { v.dataFile.Write(header) } } -func (v *Volume) readSuperBlock() (err error) { +func (v *Volume) readSuperBlock() error { v.dataFile.Seek(0, 0) header := make([]byte, SuperBlockSize) if _, e := v.dataFile.Read(header); e != nil { return fmt.Errorf("cannot read superblock: %s", e) } + var err error v.version, v.replicaType, err = ParseSuperBlock(header) return err } -func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, err error) { +func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, e error) { version = Version(header[0]) - if version == 0 { - err = errors.New("Zero version impossible - bad superblock!") - return - } + var err error if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { - err = fmt.Errorf("cannot read replica type: %s", err) + e = fmt.Errorf("cannot read replica type: %s", err) } return } @@ -224,39 +221,3 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) func (v *Volume) ContentSize() uint64 { return v.nm.fileByteCounter } - -// Walk over the contained needles (call the function with each NeedleValue till error is returned) -func (v *Volume) WalkValues(pedestrian func(*Needle) error) error { - pedplus := func(nv *NeedleValue) (err error) { - n := new(Needle) - if nv.Offset > 0 { - v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0) - if _, err = n.Read(v.dataFile, nv.Size, v.version); err != nil { - return - } - if err = pedestrian(n); err != nil { - return - } - } - return nil - } - return v.nm.Walk(pedplus) -} - -// Walk over the keys -func (v *Volume) WalkKeys(pedestrian func(Key) error) error { - pedplus := func(nv *NeedleValue) (err error) { - if nv.Offset > 0 && nv.Key > 0 { - if err = pedestrian(nv.Key); err != nil { - return - } - } - return nil - } - return v.nm.Walk(pedplus) -} - -func (v *Volume) String() string { - return fmt.Sprintf("%d@%s:v%d:r%s", v.Id, v.dataFile.Name(), - v.Version(), v.replicaType) -} diff --git a/weed-fs/src/pkg/storage/volume_id.go b/weed-fs/src/pkg/storage/volume_id.go index 0333c6cf0..bf7396673 100644 --- a/weed-fs/src/pkg/storage/volume_id.go +++ b/weed-fs/src/pkg/storage/volume_id.go @@ -1,18 +1,17 @@ package storage import ( - "strconv" + "strconv" ) type VolumeId uint32 - -func NewVolumeId(vid string) (VolumeId, error) { - volumeId, err := strconv.ParseUint(vid, 10, 64) - return VolumeId(volumeId), err +func NewVolumeId(vid string) (VolumeId,error) { + volumeId, err := strconv.ParseUint(vid, 10, 64) + return VolumeId(volumeId), err } -func (vid *VolumeId) String() string { - return strconv.FormatUint(uint64(*vid), 10) +func (vid *VolumeId) String() string{ + return strconv.FormatUint(uint64(*vid), 10) } -func (vid *VolumeId) Next() VolumeId { - return VolumeId(uint32(*vid) + 1) +func (vid *VolumeId) Next() VolumeId{ + return VolumeId(uint32(*vid)+1) } diff --git a/weed-fs/src/pkg/storage/volume_version.go b/weed-fs/src/pkg/storage/volume_version.go index 9702ae904..da91ad038 100644 --- a/weed-fs/src/pkg/storage/volume_version.go +++ b/weed-fs/src/pkg/storage/volume_version.go @@ -1,11 +1,12 @@ package storage -import () +import ( +) type Version uint8 const ( - Version1 = Version(1) - Version2 = Version(2) - CurrentVersion = Version2 + Version1 = Version(1) + Version2 = Version(2) + CurrentVersion = Version2 ) diff --git a/weed-fs/src/pkg/topology/configuration_test.go b/weed-fs/src/pkg/topology/configuration_test.go index 35d82c058..5542d1503 100644 --- a/weed-fs/src/pkg/topology/configuration_test.go +++ b/weed-fs/src/pkg/topology/configuration_test.go @@ -30,13 +30,13 @@ func TestLoadConfiguration(t *testing.T) { ` c, err := NewConfiguration([]byte(confContent)) - - fmt.Printf("%s\n", c) - if err != nil { - t.Fatalf("unmarshal error:%s", err.Error()) + + fmt.Printf("%s\n", c) + if err!=nil{ + t.Fatalf("unmarshal error:%s",err.Error()) } - + if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { - t.Fatalf("unmarshal error:%s", c) + t.Fatalf("unmarshal error:%s",c) } } diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go index a3b2b7d13..2ec68fd4b 100644 --- a/weed-fs/src/pkg/topology/data_center.go +++ b/weed-fs/src/pkg/topology/data_center.go @@ -1,6 +1,7 @@ package topology -import () +import ( +) type DataCenter struct { NodeImpl @@ -11,31 +12,31 @@ func NewDataCenter(id string) *DataCenter { dc.id = NodeId(id) dc.nodeType = "DataCenter" dc.children = make(map[NodeId]Node) - dc.NodeImpl.value = dc + dc.NodeImpl.value = dc return dc } func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { - for _, c := range dc.Children() { - rack := c.(*Rack) - if string(rack.Id()) == rackName { - return rack - } - } - rack := NewRack(rackName) - dc.LinkChildNode(rack) - return rack + for _, c := range dc.Children() { + rack := c.(*Rack) + if string(rack.Id()) == rackName { + return rack + } + } + rack := NewRack(rackName) + dc.LinkChildNode(rack) + return rack } -func (dc *DataCenter) ToMap() interface{} { - m := make(map[string]interface{}) - m["Max"] = dc.GetMaxVolumeCount() - m["Free"] = dc.FreeSpace() - var racks []interface{} - for _, c := range dc.Children() { - rack := c.(*Rack) - racks = append(racks, rack.ToMap()) - } - m["Racks"] = racks - return m +func (dc *DataCenter) ToMap() interface{}{ + m := make(map[string]interface{}) + m["Max"] = dc.GetMaxVolumeCount() + m["Free"] = dc.FreeSpace() + var racks []interface{} + for _, c := range dc.Children() { + rack := c.(*Rack) + racks = append(racks, rack.ToMap()) + } + m["Racks"] = racks + return m } diff --git a/weed-fs/src/pkg/topology/node_list.go b/weed-fs/src/pkg/topology/node_list.go index 3115e0213..1d9e1891a 100644 --- a/weed-fs/src/pkg/topology/node_list.go +++ b/weed-fs/src/pkg/topology/node_list.go @@ -37,14 +37,14 @@ func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) { list = append(list, n) } } - if n > len(list) { - return nil, false + if n > len(list){ + return nil,false } for i := n; i > 0; i-- { - r := rand.Intn(i) - t := list[r] - list[r] = list[i-1] - list[i-1] = t + r := rand.Intn(i) + t := list[r] + list[r] = list[i-1] + list[i-1] = t } return list[len(list)-n:], true } diff --git a/weed-fs/src/pkg/topology/node_list_test.go b/weed-fs/src/pkg/topology/node_list_test.go index 2fb4fa970..0d16a0526 100644 --- a/weed-fs/src/pkg/topology/node_list_test.go +++ b/weed-fs/src/pkg/topology/node_list_test.go @@ -1,39 +1,39 @@ package topology import ( - _ "fmt" "strconv" "testing" + _ "fmt" ) func TestXYZ(t *testing.T) { - topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) + topo := NewTopology("topo","/etc/weed.conf", "/tmp","test",234,5) for i := 0; i < 5; i++ { dc := NewDataCenter("dc" + strconv.Itoa(i)) dc.activeVolumeCount = i dc.maxVolumeCount = 5 topo.LinkChildNode(dc) } - nl := NewNodeList(topo.Children(), nil) + nl := NewNodeList(topo.Children(),nil) - picked, ret := nl.RandomlyPickN(1) - if !ret || len(picked) != 1 { - t.Errorf("need to randomly pick 1 node") - } + picked, ret := nl.RandomlyPickN(1) + if !ret || len(picked)!=1 { + t.Errorf("need to randomly pick 1 node") + } picked, ret = nl.RandomlyPickN(4) - if !ret || len(picked) != 4 { - t.Errorf("need to randomly pick 4 nodes") + if !ret || len(picked)!=4 { + t.Errorf("need to randomly pick 4 nodes") } - picked, ret = nl.RandomlyPickN(5) - if !ret || len(picked) != 5 { - t.Errorf("need to randomly pick 5 nodes") - } + picked, ret = nl.RandomlyPickN(5) + if !ret || len(picked)!=5 { + t.Errorf("need to randomly pick 5 nodes") + } - picked, ret = nl.RandomlyPickN(6) - if ret || len(picked) != 0 { - t.Errorf("can not randomly pick 6 nodes:", ret, picked) - } + picked, ret = nl.RandomlyPickN(6) + if ret || len(picked)!=0 { + t.Errorf("can not randomly pick 6 nodes:", ret, picked) + } } diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index acc34417a..1555ef682 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -19,13 +19,13 @@ func NewRack(id string) *Rack { } func (r *Rack) FindDataNode(ip string, port int) *DataNode { - for _, c := range r.Children() { - dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - return dn - } - } - return nil + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + return dn + } + } + return nil } func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { for _, c := range r.Children() { diff --git a/weed-fs/src/pkg/topology/topo_test.go b/weed-fs/src/pkg/topology/topo_test.go index 71a901c8e..83356b38c 100644 --- a/weed-fs/src/pkg/topology/topo_test.go +++ b/weed-fs/src/pkg/topology/topo_test.go @@ -78,7 +78,7 @@ func setup(topologyLayout string) *Topology { } //need to connect all nodes first before server adding volumes - topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5) + topo := NewTopology("mynetwork","/etc/weed.conf","/tmp","test",234,5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := NewDataCenter(dcKey) @@ -94,7 +94,7 @@ func setup(topologyLayout string) *Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) @@ -119,9 +119,9 @@ func TestRemoveDataCenter(t *testing.T) { func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - rand.Seed(1) + rand.Seed(time.Now().UnixNano()) + rand.Seed(1) ret, node, vid := topo.RandomlyReserveOneVolume() - fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid) + fmt.Println("assigned :", ret, ", node :", node,", volume id:", vid) } diff --git a/weed-fs/src/pkg/topology/topology_compact.go b/weed-fs/src/pkg/topology/topology_compact.go index dee6514f4..050fe4cd8 100644 --- a/weed-fs/src/pkg/topology/topology_compact.go +++ b/weed-fs/src/pkg/topology/topology_compact.go @@ -101,10 +101,10 @@ type VacuumVolumeResult struct { func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) { values := make(url.Values) values.Add("volume", vid.String()) - values.Add("garbageThreshold", garbageThreshold) + values.Add("garbageThreshold", garbageThreshold) jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values) if err != nil { - fmt.Println("parameters:", values) + fmt.Println("parameters:",values) return err, false } var ret VacuumVolumeResult diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go index debedc3d3..b33b4d768 100644 --- a/weed-fs/src/pkg/topology/topology_event_handling.go +++ b/weed-fs/src/pkg/topology/topology_event_handling.go @@ -52,14 +52,14 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { vl := t.GetVolumeLayout(v.RepType) vl.SetVolumeUnavailable(dn, v.Id) } - dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) + dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) dn.Parent().UnlinkChildNode(dn.Id()) } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.RepType) + vl := t.GetVolumeLayout(v.RepType) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/weed-fs/src/pkg/topology/topology_map.go b/weed-fs/src/pkg/topology/topology_map.go index b416ee943..9ccf08ae3 100644 --- a/weed-fs/src/pkg/topology/topology_map.go +++ b/weed-fs/src/pkg/topology/topology_map.go @@ -1,6 +1,7 @@ package topology -import () +import ( +) func (t *Topology) ToMap() interface{} { m := make(map[string]interface{}) diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go index 507a240b5..64d8cdf43 100644 --- a/weed-fs/src/pkg/topology/volume_location.go +++ b/weed-fs/src/pkg/topology/volume_location.go @@ -15,7 +15,7 @@ func (dnll *VolumeLocationList) Head() *DataNode { } func (dnll *VolumeLocationList) Length() int { - return len(dnll.list) + return len(dnll.list) } func (dnll *VolumeLocationList) Add(loc *DataNode) bool { @@ -29,13 +29,13 @@ func (dnll *VolumeLocationList) Add(loc *DataNode) bool { } func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { - for i, dnl := range dnll.list { - if loc.Ip == dnl.Ip && loc.Port == dnl.Port { - dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) - return true - } - } - return false + for i, dnl := range dnll.list { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { + dnll.list = append(dnll.list[:i],dnll.list[i+1:]...) + return true + } + } + return false } func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { diff --git a/weed-fs/src/pkg/util/bytes.go b/weed-fs/src/pkg/util/bytes.go index 6cc3d7018..177da06db 100644 --- a/weed-fs/src/pkg/util/bytes.go +++ b/weed-fs/src/pkg/util/bytes.go @@ -1,33 +1,34 @@ package util -func BytesToUint64(b []byte) (v uint64) { - length := uint(len(b)) - for i := uint(0); i < length-1; i++ { - v += uint64(b[i]) - v <<= 8 - } - v += uint64(b[length-1]) - return +func BytesToUint64(b []byte)(v uint64){ + length := uint(len(b)) + for i :=uint(0);i> (i * 8)) - } +func Uint64toBytes(b []byte, v uint64){ + for i :=uint(0);i<8;i++ { + b[7-i] = byte(v>>(i*8)) + } } -func Uint32toBytes(b []byte, v uint32) { - for i := uint(0); i < 4; i++ { - b[3-i] = byte(v >> (i * 8)) - } +func Uint32toBytes(b []byte, v uint32){ + for i :=uint(0);i<4;i++ { + b[3-i] = byte(v>>(i*8)) + } } -func Uint8toBytes(b []byte, v uint8) { - b[0] = byte(v) +func Uint8toBytes(b []byte, v uint8){ + b[0] = byte(v) } + diff --git a/weed-fs/src/pkg/util/parse.go b/weed-fs/src/pkg/util/parse.go index 930da9522..6a4350e72 100644 --- a/weed-fs/src/pkg/util/parse.go +++ b/weed-fs/src/pkg/util/parse.go @@ -1,16 +1,16 @@ package util import ( - "strconv" + "strconv" ) -func ParseInt(text string, defaultValue int) int { - count, parseError := strconv.ParseUint(text, 10, 64) - if parseError != nil { - if len(text) > 0 { - return 0 - } - return defaultValue - } - return int(count) +func ParseInt(text string, defaultValue int) int{ + count, parseError := strconv.ParseUint(text,10,64) + if parseError!=nil { + if len(text)>0{ + return 0 + } + return defaultValue + } + return int(count) } diff --git a/weed-fs/src/pkg/util/post.go b/weed-fs/src/pkg/util/post.go index 6e6ab0003..f643faa6b 100644 --- a/weed-fs/src/pkg/util/post.go +++ b/weed-fs/src/pkg/util/post.go @@ -16,7 +16,7 @@ func Post(url string, values url.Values) ([]byte, error) { defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) if err != nil { - log.Println("read post result from", url, err) + log.Println("read post result from", url, err) return nil, err } return b, nil