From 5d2a1e8d4845e7a7f1dccd962bb0ee6a5f9d6081 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tam=C3=A1s=20Gul=C3=A1csi?= Date: Sat, 5 Jan 2013 23:06:44 +0100 Subject: [PATCH] add cmd/dump - a dumper Walk needed to be added to NeedleMap and CompactMap, to be able to add WalkKeys and WalkValues to volume. This is needed for iterating through all the stored needles in a volume - this was dump's purpose. --- weed-fs/src/cmd/dump/main.go | 96 +++++++++ weed-fs/src/cmd/weed/command.go | 59 +++-- weed-fs/src/cmd/weed/shell.go | 73 ++++--- weed-fs/src/cmd/weed/upload.go | 2 +- weed-fs/src/cmd/weed/weed.go | 4 +- weed-fs/src/pkg/directory/file_id.go | 6 +- weed-fs/src/pkg/operation/allocate_volume.go | 44 ++-- weed-fs/src/pkg/operation/delete_content.go | 2 +- weed-fs/src/pkg/operation/lookup_volume_id.go | 50 ++--- weed-fs/src/pkg/operation/upload_content.go | 18 +- weed-fs/src/pkg/replication/volume_growth.go | 10 +- .../src/pkg/replication/volume_growth_test.go | 17 +- weed-fs/src/pkg/sequence/sequence.go | 52 ++--- weed-fs/src/pkg/storage/compact_map.go | 20 ++ .../src/pkg/storage/compact_map_perf_test.go | 60 +++--- weed-fs/src/pkg/storage/compact_map_test.go | 38 ++-- weed-fs/src/pkg/storage/needle_map.go | 5 + weed-fs/src/pkg/storage/needle_read_write.go | 9 +- weed-fs/src/pkg/storage/replication_type.go | 202 +++++++++--------- weed-fs/src/pkg/storage/store.go | 13 +- weed-fs/src/pkg/storage/volume.go | 53 ++++- weed-fs/src/pkg/storage/volume_id.go | 17 +- weed-fs/src/pkg/storage/volume_version.go | 9 +- .../src/pkg/topology/configuration_test.go | 12 +- weed-fs/src/pkg/topology/data_center.go | 45 ++-- weed-fs/src/pkg/topology/node_list.go | 12 +- weed-fs/src/pkg/topology/node_list_test.go | 38 ++-- weed-fs/src/pkg/topology/rack.go | 14 +- weed-fs/src/pkg/topology/topo_test.go | 10 +- weed-fs/src/pkg/topology/topology_compact.go | 4 +- .../pkg/topology/topology_event_handling.go | 4 +- weed-fs/src/pkg/topology/topology_map.go | 3 +- weed-fs/src/pkg/topology/volume_location.go | 16 +- weed-fs/src/pkg/util/bytes.go | 53 +++-- weed-fs/src/pkg/util/parse.go | 20 +- weed-fs/src/pkg/util/post.go | 2 +- 36 files changed, 624 insertions(+), 468 deletions(-) create mode 100644 weed-fs/src/cmd/dump/main.go diff --git a/weed-fs/src/cmd/dump/main.go b/weed-fs/src/cmd/dump/main.go new file mode 100644 index 000000000..e3e151eb7 --- /dev/null +++ b/weed-fs/src/cmd/dump/main.go @@ -0,0 +1,96 @@ +// Copyright Tamás Gulácsi 2013 All rights reserved +// Use of this source is governed by the same rules as the weed-fs library. +// If this would be ambigous, than Apache License 2.0 has to be used. +// +// dump dumps the files of a volume to tar or unique files. +// Each file will have id#mimetype#original_name file format + +package main + +import ( + "archive/tar" + "bytes" + "flag" + "fmt" + // "io" + "log" + "os" + "pkg/storage" + "strings" + "time" +) + +var ( + volumePath = flag.String("dir", "/tmp", "volume directory") + volumeId = flag.Int("id", 0, "volume Id") + dest = flag.String("out", "-", "output path. Produces tar if path ends with .tar; creates files otherwise.") + tarFh *tar.Writer + tarHeader tar.Header + counter int +) + +func main() { + var err error + + flag.Parse() + + if *dest == "-" { + *dest = "" + } + if *dest == "" || strings.HasSuffix(*dest, ".tar") { + var fh *os.File + if *dest == "" { + fh = os.Stdout + } else { + if fh, err = os.Create(*dest); err != nil { + log.Printf("cannot open output tar %s: %s", *dest, err) + return + } + } + defer fh.Close() + tarFh = tar.NewWriter(fh) + defer tarFh.Close() + t := time.Now() + tarHeader = tar.Header{Mode: 0644, + ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(), + Typeflag: tar.TypeReg, + AccessTime: t, ChangeTime: t} + } + + v, err := storage.NewVolume(*volumePath, storage.VolumeId(*volumeId), storage.CopyNil) + if v == nil || v.Version() == 0 || err != nil { + log.Printf("cannot load volume %d from %s (%s): %s", *volumeId, *volumePath, v, err) + return + } + log.Printf("volume: %s (ver. %d)", v, v.Version()) + if err := v.WalkValues(walker); err != nil { + log.Printf("error while walking: %s", err) + return + } + + log.Printf("%d files written.", counter) +} + +func walker(n *storage.Needle) (err error) { + // log.Printf("Id=%d Size=%d Name=%s mime=%s", n.Id, n.Size, n.Name, n.Mime) + nm := fmt.Sprintf("%d#%s#%s", n.Id, bytes.Replace(n.Mime, []byte{'/'}, []byte{'_'}, -1), n.Name) + // log.Print(nm) + if tarFh != nil { + tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data)) + if err = tarFh.WriteHeader(&tarHeader); err != nil { + return err + } + _, err = tarFh.Write(n.Data) + } else { + if fh, e := os.Create(*dest + "/" + nm); e != nil { + return e + } else { + defer fh.Close() + _, err = fh.Write(n.Data) + } + } + if err == nil { + counter++ + } + return +} diff --git a/weed-fs/src/cmd/weed/command.go b/weed-fs/src/cmd/weed/command.go index 8c725cafb..4d68ff151 100644 --- a/weed-fs/src/cmd/weed/command.go +++ b/weed-fs/src/cmd/weed/command.go @@ -1,53 +1,52 @@ package main import ( - "flag" - "fmt" - "os" - "strings" + "flag" + "fmt" + "os" + "strings" ) type Command struct { - // Run runs the command. - // The args are the arguments after the command name. - Run func(cmd *Command, args []string) bool + // Run runs the command. + // The args are the arguments after the command name. + Run func(cmd *Command, args []string) bool - // UsageLine is the one-line usage message. - // The first word in the line is taken to be the command name. - UsageLine string + // UsageLine is the one-line usage message. + // The first word in the line is taken to be the command name. + UsageLine string - // Short is the short description shown in the 'go help' output. - Short string + // Short is the short description shown in the 'go help' output. + Short string - // Long is the long message shown in the 'go help ' output. - Long string - - // Flag is a set of flags specific to this command. - Flag flag.FlagSet + // Long is the long message shown in the 'go help ' output. + Long string + // Flag is a set of flags specific to this command. + Flag flag.FlagSet } // Name returns the command's name: the first word in the usage line. func (c *Command) Name() string { - name := c.UsageLine - i := strings.Index(name, " ") - if i >= 0 { - name = name[:i] - } - return name + name := c.UsageLine + i := strings.Index(name, " ") + if i >= 0 { + name = name[:i] + } + return name } func (c *Command) Usage() { - fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) - fmt.Fprintf(os.Stderr, "Default Usage:\n") - c.Flag.PrintDefaults() - fmt.Fprintf(os.Stderr, "Description:\n") - fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) - os.Exit(2) + fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) + fmt.Fprintf(os.Stderr, "Default Usage:\n") + c.Flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "Description:\n") + fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) + os.Exit(2) } // Runnable reports whether the command can be run; otherwise // it is a documentation pseudo-command such as importpath. func (c *Command) Runnable() bool { - return c.Run != nil + return c.Run != nil } diff --git a/weed-fs/src/cmd/weed/shell.go b/weed-fs/src/cmd/weed/shell.go index 78a4b9eb1..daf0b7e1f 100644 --- a/weed-fs/src/cmd/weed/shell.go +++ b/weed-fs/src/cmd/weed/shell.go @@ -1,54 +1,53 @@ package main import ( - "bufio" - "os" - "fmt" + "bufio" + "fmt" + "os" ) func init() { - cmdShell.Run = runShell // break init cycle + cmdShell.Run = runShell // break init cycle } var cmdShell = &Command{ - UsageLine: "shell", - Short: "run interactive commands, now just echo", - Long: `run interactive commands. + UsageLine: "shell", + Short: "run interactive commands, now just echo", + Long: `run interactive commands. `, } -var ( -) +var () func runShell(command *Command, args []string) bool { - r := bufio.NewReader(os.Stdin) - o := bufio.NewWriter(os.Stdout) - e := bufio.NewWriter(os.Stderr) - prompt := func () { - o.WriteString("> ") - o.Flush() - }; - readLine := func () string { - ret, err := r.ReadString('\n') - if err != nil { - fmt.Fprint(e,err); - os.Exit(1) - } - return ret - } - execCmd := func (cmd string) int { - if cmd != "" { - o.WriteString(cmd) - } - return 0 - } + r := bufio.NewReader(os.Stdin) + o := bufio.NewWriter(os.Stdout) + e := bufio.NewWriter(os.Stderr) + prompt := func() { + o.WriteString("> ") + o.Flush() + } + readLine := func() string { + ret, err := r.ReadString('\n') + if err != nil { + fmt.Fprint(e, err) + os.Exit(1) + } + return ret + } + execCmd := func(cmd string) int { + if cmd != "" { + o.WriteString(cmd) + } + return 0 + } - cmd := "" - for { - prompt() - cmd = readLine() - execCmd(cmd) - } - return true + cmd := "" + for { + prompt() + cmd = readLine() + execCmd(cmd) + } + return true } diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go index e25930b5d..5707fda56 100644 --- a/weed-fs/src/cmd/weed/upload.go +++ b/weed-fs/src/cmd/weed/upload.go @@ -67,7 +67,7 @@ func upload(filename string, server string, fid string) (int, error) { } ret, e := operation.Upload("http://"+server+"/"+fid, filename, fh) if e != nil { - return 0, e + return 0, e } return ret.Size, e } diff --git a/weed-fs/src/cmd/weed/weed.go b/weed-fs/src/cmd/weed/weed.go index 232520e75..e2eb41ced 100644 --- a/weed-fs/src/cmd/weed/weed.go +++ b/weed-fs/src/cmd/weed/weed.go @@ -175,9 +175,9 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { w.Header().Set("Content-Type", "application/javascript") var bytes []byte if r.FormValue("pretty") != "" { - bytes, _ = json.MarshalIndent(obj, "", " ") + bytes, _ = json.MarshalIndent(obj, "", " ") } else { - bytes, _ = json.Marshal(obj) + bytes, _ = json.Marshal(obj) } callback := r.FormValue("callback") if callback == "" { diff --git a/weed-fs/src/pkg/directory/file_id.go b/weed-fs/src/pkg/directory/file_id.go index 9ce556580..cd4204f32 100644 --- a/weed-fs/src/pkg/directory/file_id.go +++ b/weed-fs/src/pkg/directory/file_id.go @@ -3,8 +3,8 @@ package directory import ( "encoding/hex" "pkg/storage" - "strings" "pkg/util" + "strings" ) type FileId struct { @@ -16,14 +16,14 @@ type FileId struct { func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId { return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} } -func ParseFileId(fid string) *FileId{ +func ParseFileId(fid string) *FileId { a := strings.Split(fid, ",") if len(a) != 2 { println("Invalid fid", fid, ", split length", len(a)) return nil } vid_string, key_hash_string := a[0], a[1] - volumeId, _ := storage.NewVolumeId(vid_string) + volumeId, _ := storage.NewVolumeId(vid_string) key, hash := storage.ParseKeyHash(key_hash_string) return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash} } diff --git a/weed-fs/src/pkg/operation/allocate_volume.go b/weed-fs/src/pkg/operation/allocate_volume.go index 6a3512896..c93ccfb62 100644 --- a/weed-fs/src/pkg/operation/allocate_volume.go +++ b/weed-fs/src/pkg/operation/allocate_volume.go @@ -1,32 +1,32 @@ package operation import ( - "encoding/json" - "errors" - "net/url" - "pkg/storage" - "pkg/topology" - "pkg/util" + "encoding/json" + "errors" + "net/url" + "pkg/storage" + "pkg/topology" + "pkg/util" ) type AllocateVolumeResult struct { - Error string + Error string } func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error { - values := make(url.Values) - values.Add("volume", vid.String()) - values.Add("replicationType", repType.String()) - jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) - if err != nil { - return err - } - var ret AllocateVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err - } - if ret.Error != "" { - return errors.New(ret.Error) - } - return nil + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("replicationType", repType.String()) + jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) + if err != nil { + return err + } + var ret AllocateVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil } diff --git a/weed-fs/src/pkg/operation/delete_content.go b/weed-fs/src/pkg/operation/delete_content.go index aeab9c3ac..2bdb49651 100644 --- a/weed-fs/src/pkg/operation/delete_content.go +++ b/weed-fs/src/pkg/operation/delete_content.go @@ -1,8 +1,8 @@ package operation import ( - "net/http" "log" + "net/http" ) func Delete(url string) error { diff --git a/weed-fs/src/pkg/operation/lookup_volume_id.go b/weed-fs/src/pkg/operation/lookup_volume_id.go index c46c6670e..50a6d91e6 100644 --- a/weed-fs/src/pkg/operation/lookup_volume_id.go +++ b/weed-fs/src/pkg/operation/lookup_volume_id.go @@ -1,38 +1,38 @@ package operation import ( - "encoding/json" - "net/url" - "pkg/storage" - "pkg/util" - _ "fmt" - "errors" + "encoding/json" + "errors" + _ "fmt" + "net/url" + "pkg/storage" + "pkg/util" ) type Location struct { - Url string "url" - PublicUrl string "publicUrl" + Url string "url" + PublicUrl string "publicUrl" } type LookupResult struct { - Locations []Location "locations" - Error string "error" + Locations []Location "locations" + Error string "error" } //TODO: Add a caching for vid here func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) { - values := make(url.Values) - values.Add("volumeId", vid.String()) - jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) - if err != nil { - return nil, err - } - var ret LookupResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Error != ""{ - return nil, errors.New(ret.Error) - } - return &ret, nil + values := make(url.Values) + values.Add("volumeId", vid.String()) + jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) + if err != nil { + return nil, err + } + var ret LookupResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Error != "" { + return nil, errors.New(ret.Error) + } + return &ret, nil } diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go index 7ed74e02f..0bdb697da 100644 --- a/weed-fs/src/pkg/operation/upload_content.go +++ b/weed-fs/src/pkg/operation/upload_content.go @@ -3,18 +3,18 @@ package operation import ( "bytes" "encoding/json" + "errors" _ "fmt" "io" "io/ioutil" - "log" + "log" "mime/multipart" "net/http" - "errors" ) type UploadResult struct { - Size int - Error string + Size int + Error string } func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) { @@ -26,7 +26,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, body_writer.Close() resp, err := http.Post(uploadUrl, content_type, body_buf) if err != nil { - log.Println("failing to upload to", uploadUrl) + log.Println("failing to upload to", uploadUrl) return nil, err } defer resp.Body.Close() @@ -37,11 +37,11 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, var ret UploadResult err = json.Unmarshal(resp_body, &ret) if err != nil { - log.Println("failing to read upload resonse", uploadUrl, resp_body) - return nil, err + log.Println("failing to read upload resonse", uploadUrl, resp_body) + return nil, err } - if ret.Error != ""{ - return nil, errors.New(ret.Error) + if ret.Error != "" { + return nil, errors.New(ret.Error) } return &ret, nil } diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go index 7cabf626e..ce0094a7c 100644 --- a/weed-fs/src/pkg/replication/volume_growth.go +++ b/weed-fs/src/pkg/replication/volume_growth.go @@ -7,7 +7,7 @@ import ( "pkg/operation" "pkg/storage" "pkg/topology" - "sync" + "sync" ) /* @@ -24,7 +24,7 @@ type VolumeGrowth struct { copy3factor int copyAll int - accessLock sync.Mutex + accessLock sync.Mutex } func NewDefaultVolumeGrowth() *VolumeGrowth { @@ -49,8 +49,8 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo return 0, errors.New("Unknown Replication Type!") } func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { - vg.accessLock.Lock() - defer vg.accessLock.Unlock() + vg.accessLock.Lock() + defer vg.accessLock.Unlock() counter = 0 switch repType { @@ -182,7 +182,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { for _, server := range servers { if err := operation.AllocateVolume(server, vid, repType); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(&vi, server) fmt.Println("Created Volume", vid, "on", server) diff --git a/weed-fs/src/pkg/replication/volume_growth_test.go b/weed-fs/src/pkg/replication/volume_growth_test.go index 51e47c193..659564c64 100644 --- a/weed-fs/src/pkg/replication/volume_growth_test.go +++ b/weed-fs/src/pkg/replication/volume_growth_test.go @@ -5,7 +5,7 @@ import ( "fmt" "math/rand" "pkg/storage" - "pkg/topology" + "pkg/topology" "testing" "time" ) @@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo := topology.NewTopology("mynetwork","/etc/weedfs/weedfs.conf","/tmp","testing",32*1024, 5) + topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := topology.NewDataCenter(dcKey) @@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) @@ -121,10 +121,9 @@ func TestRemoveDataCenter(t *testing.T) { func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4} - if c, e := vg.GrowByCountAndType(1,storage.Copy000,topo);e==nil{ - t.Log("reserved", c) - } + rand.Seed(time.Now().UnixNano()) + vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4} + if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil { + t.Log("reserved", c) + } } - diff --git a/weed-fs/src/pkg/sequence/sequence.go b/weed-fs/src/pkg/sequence/sequence.go index bfdf1b368..c85289468 100644 --- a/weed-fs/src/pkg/sequence/sequence.go +++ b/weed-fs/src/pkg/sequence/sequence.go @@ -1,11 +1,11 @@ package sequence import ( - "encoding/gob" - "os" - "path" - "sync" + "encoding/gob" "log" + "os" + "path" + "sync" ) const ( @@ -27,21 +27,21 @@ type SequencerImpl struct { } func NewSequencer(dirname string, filename string) (m *SequencerImpl) { - m = &SequencerImpl{dir: dirname, fileName: filename} + m = &SequencerImpl{dir: dirname, fileName: filename} - seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) - if se != nil { - m.FileIdSequence = FileIdSaveInterval - log.Println("Setting file id sequence", m.FileIdSequence) - } else { - decoder := gob.NewDecoder(seqFile) - defer seqFile.Close() - decoder.Decode(&m.FileIdSequence) - log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) - //in case the server stops between intervals - m.FileIdSequence += FileIdSaveInterval - } - return + seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) + if se != nil { + m.FileIdSequence = FileIdSaveInterval + log.Println("Setting file id sequence", m.FileIdSequence) + } else { + decoder := gob.NewDecoder(seqFile) + defer seqFile.Close() + decoder.Decode(&m.FileIdSequence) + log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) + //in case the server stops between intervals + m.FileIdSequence += FileIdSaveInterval + } + return } //count should be 1 or more @@ -60,12 +60,12 @@ func (m *SequencerImpl) NextFileId(count int) (uint64, int) { return m.FileIdSequence - m.fileIdCounter, count } func (m *SequencerImpl) saveSequence() { - log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) - seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) - if e != nil { - log.Fatalf("Sequence File Save [ERROR] %s\n", e) - } - defer seqFile.Close() - encoder := gob.NewEncoder(seqFile) - encoder.Encode(m.FileIdSequence) + log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) + seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) + if e != nil { + log.Fatalf("Sequence File Save [ERROR] %s\n", e) + } + defer seqFile.Close() + encoder := gob.NewEncoder(seqFile) + encoder.Encode(m.FileIdSequence) } diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go index 7365022ea..90ed42198 100644 --- a/weed-fs/src/pkg/storage/compact_map.go +++ b/weed-fs/src/pkg/storage/compact_map.go @@ -175,3 +175,23 @@ func (cm *CompactMap) Peek() { } } } + +// iterate over the keys by calling iterate on each key till error is returned +func (cm *CompactMap) Walk(pedestrian func(*NeedleValue) error) (err error) { + var i int + for _, cs := range cm.list { + for key := cs.start; key < cs.end; key++ { + if i = cs.binarySearchValues(key); i >= 0 { + if err = pedestrian(&cs.values[i]); err != nil { + return + } + } + } + for _, val := range cs.overflow { + if err = pedestrian(val); err != nil { + return err + } + } + } + return nil +} diff --git a/weed-fs/src/pkg/storage/compact_map_perf_test.go b/weed-fs/src/pkg/storage/compact_map_perf_test.go index 2e2227279..cfa521fc8 100644 --- a/weed-fs/src/pkg/storage/compact_map_perf_test.go +++ b/weed-fs/src/pkg/storage/compact_map_perf_test.go @@ -1,43 +1,43 @@ package storage import ( + "log" + "os" + "pkg/util" "testing" - "log" - "os" - "pkg/util" ) func TestMemoryUsage(t *testing.T) { - indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) - if ie != nil { - log.Fatalln(ie) - } - LoadNewNeedleMap(indexFile) - + indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + if ie != nil { + log.Fatalln(ie) + } + LoadNewNeedleMap(indexFile) + } func LoadNewNeedleMap(file *os.File) CompactMap { - m := NewCompactMap() - bytes := make([]byte, 16*1024) - count, e := file.Read(bytes) - if count > 0 { - fstat, _ := file.Stat() - log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) - } - for count > 0 && e == nil { - for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - if offset > 0 { - m.Set(Key(key), offset, size) - } else { - //delete(m, key) - } - } + m := NewCompactMap() + bytes := make([]byte, 16*1024) + count, e := file.Read(bytes) + if count > 0 { + fstat, _ := file.Stat() + log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + } + for count > 0 && e == nil { + for i := 0; i < count; i += 16 { + key := util.BytesToUint64(bytes[i : i+8]) + offset := util.BytesToUint32(bytes[i+8 : i+12]) + size := util.BytesToUint32(bytes[i+12 : i+16]) + if offset > 0 { + m.Set(Key(key), offset, size) + } else { + //delete(m, key) + } + } - count, e = file.Read(bytes) - } - return m + count, e = file.Read(bytes) + } + return m } diff --git a/weed-fs/src/pkg/storage/compact_map_test.go b/weed-fs/src/pkg/storage/compact_map_test.go index c05515b29..e76e9578d 100644 --- a/weed-fs/src/pkg/storage/compact_map_test.go +++ b/weed-fs/src/pkg/storage/compact_map_test.go @@ -18,42 +18,42 @@ func TestXYZ(t *testing.T) { m.Set(Key(i), i+11, i+5) } -// for i := uint32(0); i < 100; i++ { -// if v := m.Get(Key(i)); v != nil { -// println(i, "=", v.Key, v.Offset, v.Size) -// } -// } - + // for i := uint32(0); i < 100; i++ { + // if v := m.Get(Key(i)); v != nil { + // println(i, "=", v.Key, v.Offset, v.Size) + // } + // } + for i := uint32(0); i < 10*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(Key(i)) if i%3 == 0 { - if !ok { - t.Fatal("key", i, "missing!") - } + if !ok { + t.Fatal("key", i, "missing!") + } if v.Size != i+5 { t.Fatal("key", i, "size", v.Size) } } else if i%37 == 0 { - if ok && v.Size > 0 { + if ok && v.Size > 0 { t.Fatal("key", i, "should have been deleted needle value", v) } } else if i%2 == 0 { - if v.Size != i { + if v.Size != i { t.Fatal("key", i, "size", v.Size) } } } for i := uint32(10 * batch); i < 100*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(Key(i)) if i%37 == 0 { - if ok && v.Size > 0 { - t.Fatal("key", i, "should have been deleted needle value", v) - } + if ok && v.Size > 0 { + t.Fatal("key", i, "should have been deleted needle value", v) + } } else if i%2 == 0 { - if v==nil{ - t.Fatal("key", i, "missing") - } + if v == nil { + t.Fatal("key", i, "missing") + } if v.Size != i { t.Fatal("key", i, "size", v.Size) } diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index e01c27630..9ce38c1f2 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -98,3 +98,8 @@ func (nm *NeedleMap) Close() { func (nm *NeedleMap) ContentSize() uint64 { return nm.fileByteCounter } + +// iterate through all needles using the iterator function +func (nm *NeedleMap) Walk(pedestrian func(*NeedleValue) error) (err error) { + return nm.m.Walk(pedestrian) +} diff --git a/weed-fs/src/pkg/storage/needle_read_write.go b/weed-fs/src/pkg/storage/needle_read_write.go index 00844bad3..fdb09a3c6 100644 --- a/weed-fs/src/pkg/storage/needle_read_write.go +++ b/weed-fs/src/pkg/storage/needle_read_write.go @@ -2,10 +2,10 @@ package storage import ( "errors" + "fmt" "io" "os" "pkg/util" - "fmt" ) func (n *Needle) Append(w io.Writer, version Version) uint32 { @@ -62,7 +62,8 @@ func (n *Needle) Append(w io.Writer, version Version) uint32 { return n.Size } func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { - if version == Version1 { + switch version { + case Version1: bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) ret, e := r.Read(bytes) n.readNeedleHeader(bytes) @@ -72,7 +73,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { return 0, errors.New("CRC error! Data On Disk Corrupted!") } return ret, e - } else if version == Version2 { + case Version2: if size == 0 { return 0, nil } @@ -95,7 +96,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { } return ret, e } - return 0, errors.New("Unsupported Version!") + return 0, fmt.Errorf("Unsupported Version! (%d)", version) } func (n *Needle) readNeedleHeader(bytes []byte) { n.Cookie = util.BytesToUint32(bytes[0:4]) diff --git a/weed-fs/src/pkg/storage/replication_type.go b/weed-fs/src/pkg/storage/replication_type.go index 86a9d219d..0902d1016 100644 --- a/weed-fs/src/pkg/storage/replication_type.go +++ b/weed-fs/src/pkg/storage/replication_type.go @@ -1,123 +1,123 @@ package storage import ( - "errors" + "errors" ) type ReplicationType string const ( - Copy000 = ReplicationType("000") // single copy - Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center - Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center - Copy100 = ReplicationType("100") // 2 copies, each on different data center - Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center - Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center - LengthRelicationType = 6 - CopyNil = ReplicationType(255) // nil value + Copy000 = ReplicationType("000") // single copy + Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center + Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center + Copy100 = ReplicationType("100") // 2 copies, each on different data center + Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center + Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center + LengthRelicationType = 6 + CopyNil = ReplicationType(255) // nil value ) func NewReplicationTypeFromString(t string) (ReplicationType, error) { - switch t { - case "000": - return Copy000, nil - case "001": - return Copy001, nil - case "010": - return Copy010, nil - case "100": - return Copy100, nil - case "110": - return Copy110, nil - case "200": - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:"+t) + switch t { + case "000": + return Copy000, nil + case "001": + return Copy001, nil + case "010": + return Copy010, nil + case "100": + return Copy100, nil + case "110": + return Copy110, nil + case "200": + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + t) } func NewReplicationTypeFromByte(b byte) (ReplicationType, error) { - switch b { - case byte(000): - return Copy000, nil - case byte(001): - return Copy001, nil - case byte(010): - return Copy010, nil - case byte(100): - return Copy100, nil - case byte(110): - return Copy110, nil - case byte(200): - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:"+string(b)) + switch b { + case byte(000): + return Copy000, nil + case byte(001): + return Copy001, nil + case byte(010): + return Copy010, nil + case byte(100): + return Copy100, nil + case byte(110): + return Copy110, nil + case byte(200): + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + string(b)) } func (r *ReplicationType) String() string { - switch *r { - case Copy000: - return "000" - case Copy001: - return "001" - case Copy010: - return "010" - case Copy100: - return "100" - case Copy110: - return "110" - case Copy200: - return "200" - } - return "000" + switch *r { + case Copy000: + return "000" + case Copy001: + return "001" + case Copy010: + return "010" + case Copy100: + return "100" + case Copy110: + return "110" + case Copy200: + return "200" + } + return "000" } func (r *ReplicationType) Byte() byte { - switch *r { - case Copy000: - return byte(000) - case Copy001: - return byte(001) - case Copy010: - return byte(010) - case Copy100: - return byte(100) - case Copy110: - return byte(110) - case Copy200: - return byte(200) - } - return byte(000) + switch *r { + case Copy000: + return byte(000) + case Copy001: + return byte(001) + case Copy010: + return byte(010) + case Copy100: + return byte(100) + case Copy110: + return byte(110) + case Copy200: + return byte(200) + } + return byte(000) } -func (repType ReplicationType)GetReplicationLevelIndex() int { - switch repType { - case Copy000: - return 0 - case Copy001: - return 1 - case Copy010: - return 2 - case Copy100: - return 3 - case Copy110: - return 4 - case Copy200: - return 5 - } - return -1 +func (repType ReplicationType) GetReplicationLevelIndex() int { + switch repType { + case Copy000: + return 0 + case Copy001: + return 1 + case Copy010: + return 2 + case Copy100: + return 3 + case Copy110: + return 4 + case Copy200: + return 5 + } + return -1 } -func (repType ReplicationType)GetCopyCount() int { - switch repType { - case Copy000: - return 1 - case Copy001: - return 2 - case Copy010: - return 2 - case Copy100: - return 2 - case Copy110: - return 3 - case Copy200: - return 3 - } - return 0 +func (repType ReplicationType) GetCopyCount() int { + switch repType { + case Copy000: + return 1 + case Copy001: + return 2 + case Copy010: + return 2 + case Copy100: + return 2 + case Copy110: + return 3 + case Copy200: + return 3 + } + return 0 } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index a2c5f040b..79cf65e28 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -65,13 +65,13 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error } return e } -func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { +func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) { if s.volumes[vid] != nil { return errors.New("Volume Id " + vid.String() + " already exists!") } log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType) - s.volumes[vid] = NewVolume(s.dir, vid, replicationType) - return nil + s.volumes[vid], err = NewVolume(s.dir, vid, replicationType) + return err } func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { @@ -107,9 +107,10 @@ func (s *Store) loadExistingVolumes() { base := name[:len(name)-len(".dat")] if vid, err := NewVolumeId(base); err == nil { if s.volumes[vid] == nil { - v := NewVolume(s.dir, vid, CopyNil) - s.volumes[vid] = v - log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size()) + if v, e := NewVolume(s.dir, vid, CopyNil); e == nil { + s.volumes[vid] = v + log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size()) + } } } } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 0220bf895..5e64d0763 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -24,9 +24,9 @@ type Volume struct { accessLock sync.Mutex } -func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) { +func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { v = &Volume{dir: dirname, Id: id, replicaType: replicationType} - v.load() + e = v.load() return } func (v *Volume) load() error { @@ -43,6 +43,7 @@ func (v *Volume) load() error { } else { v.maybeWriteSuperBlock() } + // TODO: if .idx not exists, but .cdb exists, then use (but don't load!) that indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) if ie != nil { return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) @@ -79,21 +80,23 @@ func (v *Volume) maybeWriteSuperBlock() { v.dataFile.Write(header) } } -func (v *Volume) readSuperBlock() error { +func (v *Volume) readSuperBlock() (err error) { v.dataFile.Seek(0, 0) header := make([]byte, SuperBlockSize) if _, e := v.dataFile.Read(header); e != nil { return fmt.Errorf("cannot read superblock: %s", e) } - var err error v.version, v.replicaType, err = ParseSuperBlock(header) return err } -func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, e error) { +func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, err error) { version = Version(header[0]) - var err error + if version == 0 { + err = errors.New("Zero version impossible - bad superblock!") + return + } if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { - e = fmt.Errorf("cannot read replica type: %s", err) + err = fmt.Errorf("cannot read replica type: %s", err) } return } @@ -221,3 +224,39 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) func (v *Volume) ContentSize() uint64 { return v.nm.fileByteCounter } + +// Walk over the contained needles (call the function with each NeedleValue till error is returned) +func (v *Volume) WalkValues(pedestrian func(*Needle) error) error { + pedplus := func(nv *NeedleValue) (err error) { + n := new(Needle) + if nv.Offset > 0 { + v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0) + if _, err = n.Read(v.dataFile, nv.Size, v.version); err != nil { + return + } + if err = pedestrian(n); err != nil { + return + } + } + return nil + } + return v.nm.Walk(pedplus) +} + +// Walk over the keys +func (v *Volume) WalkKeys(pedestrian func(Key) error) error { + pedplus := func(nv *NeedleValue) (err error) { + if nv.Offset > 0 && nv.Key > 0 { + if err = pedestrian(nv.Key); err != nil { + return + } + } + return nil + } + return v.nm.Walk(pedplus) +} + +func (v *Volume) String() string { + return fmt.Sprintf("%d@%s:v%d:r%s", v.Id, v.dataFile.Name(), + v.Version(), v.replicaType) +} diff --git a/weed-fs/src/pkg/storage/volume_id.go b/weed-fs/src/pkg/storage/volume_id.go index bf7396673..0333c6cf0 100644 --- a/weed-fs/src/pkg/storage/volume_id.go +++ b/weed-fs/src/pkg/storage/volume_id.go @@ -1,17 +1,18 @@ package storage import ( - "strconv" + "strconv" ) type VolumeId uint32 -func NewVolumeId(vid string) (VolumeId,error) { - volumeId, err := strconv.ParseUint(vid, 10, 64) - return VolumeId(volumeId), err + +func NewVolumeId(vid string) (VolumeId, error) { + volumeId, err := strconv.ParseUint(vid, 10, 64) + return VolumeId(volumeId), err } -func (vid *VolumeId) String() string{ - return strconv.FormatUint(uint64(*vid), 10) +func (vid *VolumeId) String() string { + return strconv.FormatUint(uint64(*vid), 10) } -func (vid *VolumeId) Next() VolumeId{ - return VolumeId(uint32(*vid)+1) +func (vid *VolumeId) Next() VolumeId { + return VolumeId(uint32(*vid) + 1) } diff --git a/weed-fs/src/pkg/storage/volume_version.go b/weed-fs/src/pkg/storage/volume_version.go index da91ad038..9702ae904 100644 --- a/weed-fs/src/pkg/storage/volume_version.go +++ b/weed-fs/src/pkg/storage/volume_version.go @@ -1,12 +1,11 @@ package storage -import ( -) +import () type Version uint8 const ( - Version1 = Version(1) - Version2 = Version(2) - CurrentVersion = Version2 + Version1 = Version(1) + Version2 = Version(2) + CurrentVersion = Version2 ) diff --git a/weed-fs/src/pkg/topology/configuration_test.go b/weed-fs/src/pkg/topology/configuration_test.go index 5542d1503..35d82c058 100644 --- a/weed-fs/src/pkg/topology/configuration_test.go +++ b/weed-fs/src/pkg/topology/configuration_test.go @@ -30,13 +30,13 @@ func TestLoadConfiguration(t *testing.T) { ` c, err := NewConfiguration([]byte(confContent)) - - fmt.Printf("%s\n", c) - if err!=nil{ - t.Fatalf("unmarshal error:%s",err.Error()) + + fmt.Printf("%s\n", c) + if err != nil { + t.Fatalf("unmarshal error:%s", err.Error()) } - + if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { - t.Fatalf("unmarshal error:%s",c) + t.Fatalf("unmarshal error:%s", c) } } diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go index 2ec68fd4b..a3b2b7d13 100644 --- a/weed-fs/src/pkg/topology/data_center.go +++ b/weed-fs/src/pkg/topology/data_center.go @@ -1,7 +1,6 @@ package topology -import ( -) +import () type DataCenter struct { NodeImpl @@ -12,31 +11,31 @@ func NewDataCenter(id string) *DataCenter { dc.id = NodeId(id) dc.nodeType = "DataCenter" dc.children = make(map[NodeId]Node) - dc.NodeImpl.value = dc + dc.NodeImpl.value = dc return dc } func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { - for _, c := range dc.Children() { - rack := c.(*Rack) - if string(rack.Id()) == rackName { - return rack - } - } - rack := NewRack(rackName) - dc.LinkChildNode(rack) - return rack + for _, c := range dc.Children() { + rack := c.(*Rack) + if string(rack.Id()) == rackName { + return rack + } + } + rack := NewRack(rackName) + dc.LinkChildNode(rack) + return rack } -func (dc *DataCenter) ToMap() interface{}{ - m := make(map[string]interface{}) - m["Max"] = dc.GetMaxVolumeCount() - m["Free"] = dc.FreeSpace() - var racks []interface{} - for _, c := range dc.Children() { - rack := c.(*Rack) - racks = append(racks, rack.ToMap()) - } - m["Racks"] = racks - return m +func (dc *DataCenter) ToMap() interface{} { + m := make(map[string]interface{}) + m["Max"] = dc.GetMaxVolumeCount() + m["Free"] = dc.FreeSpace() + var racks []interface{} + for _, c := range dc.Children() { + rack := c.(*Rack) + racks = append(racks, rack.ToMap()) + } + m["Racks"] = racks + return m } diff --git a/weed-fs/src/pkg/topology/node_list.go b/weed-fs/src/pkg/topology/node_list.go index 1d9e1891a..3115e0213 100644 --- a/weed-fs/src/pkg/topology/node_list.go +++ b/weed-fs/src/pkg/topology/node_list.go @@ -37,14 +37,14 @@ func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) { list = append(list, n) } } - if n > len(list){ - return nil,false + if n > len(list) { + return nil, false } for i := n; i > 0; i-- { - r := rand.Intn(i) - t := list[r] - list[r] = list[i-1] - list[i-1] = t + r := rand.Intn(i) + t := list[r] + list[r] = list[i-1] + list[i-1] = t } return list[len(list)-n:], true } diff --git a/weed-fs/src/pkg/topology/node_list_test.go b/weed-fs/src/pkg/topology/node_list_test.go index 0d16a0526..2fb4fa970 100644 --- a/weed-fs/src/pkg/topology/node_list_test.go +++ b/weed-fs/src/pkg/topology/node_list_test.go @@ -1,39 +1,39 @@ package topology import ( + _ "fmt" "strconv" "testing" - _ "fmt" ) func TestXYZ(t *testing.T) { - topo := NewTopology("topo","/etc/weed.conf", "/tmp","test",234,5) + topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) for i := 0; i < 5; i++ { dc := NewDataCenter("dc" + strconv.Itoa(i)) dc.activeVolumeCount = i dc.maxVolumeCount = 5 topo.LinkChildNode(dc) } - nl := NewNodeList(topo.Children(),nil) + nl := NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(1) - if !ret || len(picked)!=1 { - t.Errorf("need to randomly pick 1 node") - } - - picked, ret = nl.RandomlyPickN(4) - if !ret || len(picked)!=4 { - t.Errorf("need to randomly pick 4 nodes") + picked, ret := nl.RandomlyPickN(1) + if !ret || len(picked) != 1 { + t.Errorf("need to randomly pick 1 node") } - picked, ret = nl.RandomlyPickN(5) - if !ret || len(picked)!=5 { - t.Errorf("need to randomly pick 5 nodes") - } + picked, ret = nl.RandomlyPickN(4) + if !ret || len(picked) != 4 { + t.Errorf("need to randomly pick 4 nodes") + } - picked, ret = nl.RandomlyPickN(6) - if ret || len(picked)!=0 { - t.Errorf("can not randomly pick 6 nodes:", ret, picked) - } + picked, ret = nl.RandomlyPickN(5) + if !ret || len(picked) != 5 { + t.Errorf("need to randomly pick 5 nodes") + } + + picked, ret = nl.RandomlyPickN(6) + if ret || len(picked) != 0 { + t.Errorf("can not randomly pick 6 nodes:", ret, picked) + } } diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index 1555ef682..acc34417a 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -19,13 +19,13 @@ func NewRack(id string) *Rack { } func (r *Rack) FindDataNode(ip string, port int) *DataNode { - for _, c := range r.Children() { - dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - return dn - } - } - return nil + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + return dn + } + } + return nil } func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { for _, c := range r.Children() { diff --git a/weed-fs/src/pkg/topology/topo_test.go b/weed-fs/src/pkg/topology/topo_test.go index 83356b38c..71a901c8e 100644 --- a/weed-fs/src/pkg/topology/topo_test.go +++ b/weed-fs/src/pkg/topology/topo_test.go @@ -78,7 +78,7 @@ func setup(topologyLayout string) *Topology { } //need to connect all nodes first before server adding volumes - topo := NewTopology("mynetwork","/etc/weed.conf","/tmp","test",234,5) + topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := NewDataCenter(dcKey) @@ -94,7 +94,7 @@ func setup(topologyLayout string) *Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) @@ -119,9 +119,9 @@ func TestRemoveDataCenter(t *testing.T) { func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - rand.Seed(1) + rand.Seed(time.Now().UnixNano()) + rand.Seed(1) ret, node, vid := topo.RandomlyReserveOneVolume() - fmt.Println("assigned :", ret, ", node :", node,", volume id:", vid) + fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid) } diff --git a/weed-fs/src/pkg/topology/topology_compact.go b/weed-fs/src/pkg/topology/topology_compact.go index 050fe4cd8..dee6514f4 100644 --- a/weed-fs/src/pkg/topology/topology_compact.go +++ b/weed-fs/src/pkg/topology/topology_compact.go @@ -101,10 +101,10 @@ type VacuumVolumeResult struct { func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) { values := make(url.Values) values.Add("volume", vid.String()) - values.Add("garbageThreshold", garbageThreshold) + values.Add("garbageThreshold", garbageThreshold) jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values) if err != nil { - fmt.Println("parameters:",values) + fmt.Println("parameters:", values) return err, false } var ret VacuumVolumeResult diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go index b33b4d768..debedc3d3 100644 --- a/weed-fs/src/pkg/topology/topology_event_handling.go +++ b/weed-fs/src/pkg/topology/topology_event_handling.go @@ -52,14 +52,14 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { vl := t.GetVolumeLayout(v.RepType) vl.SetVolumeUnavailable(dn, v.Id) } - dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) + dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) dn.Parent().UnlinkChildNode(dn.Id()) } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.RepType) + vl := t.GetVolumeLayout(v.RepType) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/weed-fs/src/pkg/topology/topology_map.go b/weed-fs/src/pkg/topology/topology_map.go index 9ccf08ae3..b416ee943 100644 --- a/weed-fs/src/pkg/topology/topology_map.go +++ b/weed-fs/src/pkg/topology/topology_map.go @@ -1,7 +1,6 @@ package topology -import ( -) +import () func (t *Topology) ToMap() interface{} { m := make(map[string]interface{}) diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go index 64d8cdf43..507a240b5 100644 --- a/weed-fs/src/pkg/topology/volume_location.go +++ b/weed-fs/src/pkg/topology/volume_location.go @@ -15,7 +15,7 @@ func (dnll *VolumeLocationList) Head() *DataNode { } func (dnll *VolumeLocationList) Length() int { - return len(dnll.list) + return len(dnll.list) } func (dnll *VolumeLocationList) Add(loc *DataNode) bool { @@ -29,13 +29,13 @@ func (dnll *VolumeLocationList) Add(loc *DataNode) bool { } func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { - for i, dnl := range dnll.list { - if loc.Ip == dnl.Ip && loc.Port == dnl.Port { - dnll.list = append(dnll.list[:i],dnll.list[i+1:]...) - return true - } - } - return false + for i, dnl := range dnll.list { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { + dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) + return true + } + } + return false } func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { diff --git a/weed-fs/src/pkg/util/bytes.go b/weed-fs/src/pkg/util/bytes.go index 177da06db..6cc3d7018 100644 --- a/weed-fs/src/pkg/util/bytes.go +++ b/weed-fs/src/pkg/util/bytes.go @@ -1,34 +1,33 @@ package util -func BytesToUint64(b []byte)(v uint64){ - length := uint(len(b)) - for i :=uint(0);i>(i*8)) - } +func Uint64toBytes(b []byte, v uint64) { + for i := uint(0); i < 8; i++ { + b[7-i] = byte(v >> (i * 8)) + } } -func Uint32toBytes(b []byte, v uint32){ - for i :=uint(0);i<4;i++ { - b[3-i] = byte(v>>(i*8)) - } +func Uint32toBytes(b []byte, v uint32) { + for i := uint(0); i < 4; i++ { + b[3-i] = byte(v >> (i * 8)) + } } -func Uint8toBytes(b []byte, v uint8){ - b[0] = byte(v) +func Uint8toBytes(b []byte, v uint8) { + b[0] = byte(v) } - diff --git a/weed-fs/src/pkg/util/parse.go b/weed-fs/src/pkg/util/parse.go index 6a4350e72..930da9522 100644 --- a/weed-fs/src/pkg/util/parse.go +++ b/weed-fs/src/pkg/util/parse.go @@ -1,16 +1,16 @@ package util import ( - "strconv" + "strconv" ) -func ParseInt(text string, defaultValue int) int{ - count, parseError := strconv.ParseUint(text,10,64) - if parseError!=nil { - if len(text)>0{ - return 0 - } - return defaultValue - } - return int(count) +func ParseInt(text string, defaultValue int) int { + count, parseError := strconv.ParseUint(text, 10, 64) + if parseError != nil { + if len(text) > 0 { + return 0 + } + return defaultValue + } + return int(count) } diff --git a/weed-fs/src/pkg/util/post.go b/weed-fs/src/pkg/util/post.go index f643faa6b..6e6ab0003 100644 --- a/weed-fs/src/pkg/util/post.go +++ b/weed-fs/src/pkg/util/post.go @@ -16,7 +16,7 @@ func Post(url string, values url.Values) ([]byte, error) { defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) if err != nil { - log.Println("read post result from", url, err) + log.Println("read post result from", url, err) return nil, err } return b, nil