diff --git a/weed-fs/src/cmd/dump/main.go b/weed-fs/src/cmd/dump/main.go new file mode 100644 index 000000000..e3e151eb7 --- /dev/null +++ b/weed-fs/src/cmd/dump/main.go @@ -0,0 +1,96 @@ +// Copyright Tamás Gulácsi 2013 All rights reserved +// Use of this source is governed by the same rules as the weed-fs library. +// If this would be ambigous, than Apache License 2.0 has to be used. +// +// dump dumps the files of a volume to tar or unique files. +// Each file will have id#mimetype#original_name file format + +package main + +import ( + "archive/tar" + "bytes" + "flag" + "fmt" + // "io" + "log" + "os" + "pkg/storage" + "strings" + "time" +) + +var ( + volumePath = flag.String("dir", "/tmp", "volume directory") + volumeId = flag.Int("id", 0, "volume Id") + dest = flag.String("out", "-", "output path. Produces tar if path ends with .tar; creates files otherwise.") + tarFh *tar.Writer + tarHeader tar.Header + counter int +) + +func main() { + var err error + + flag.Parse() + + if *dest == "-" { + *dest = "" + } + if *dest == "" || strings.HasSuffix(*dest, ".tar") { + var fh *os.File + if *dest == "" { + fh = os.Stdout + } else { + if fh, err = os.Create(*dest); err != nil { + log.Printf("cannot open output tar %s: %s", *dest, err) + return + } + } + defer fh.Close() + tarFh = tar.NewWriter(fh) + defer tarFh.Close() + t := time.Now() + tarHeader = tar.Header{Mode: 0644, + ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(), + Typeflag: tar.TypeReg, + AccessTime: t, ChangeTime: t} + } + + v, err := storage.NewVolume(*volumePath, storage.VolumeId(*volumeId), storage.CopyNil) + if v == nil || v.Version() == 0 || err != nil { + log.Printf("cannot load volume %d from %s (%s): %s", *volumeId, *volumePath, v, err) + return + } + log.Printf("volume: %s (ver. %d)", v, v.Version()) + if err := v.WalkValues(walker); err != nil { + log.Printf("error while walking: %s", err) + return + } + + log.Printf("%d files written.", counter) +} + +func walker(n *storage.Needle) (err error) { + // log.Printf("Id=%d Size=%d Name=%s mime=%s", n.Id, n.Size, n.Name, n.Mime) + nm := fmt.Sprintf("%d#%s#%s", n.Id, bytes.Replace(n.Mime, []byte{'/'}, []byte{'_'}, -1), n.Name) + // log.Print(nm) + if tarFh != nil { + tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data)) + if err = tarFh.WriteHeader(&tarHeader); err != nil { + return err + } + _, err = tarFh.Write(n.Data) + } else { + if fh, e := os.Create(*dest + "/" + nm); e != nil { + return e + } else { + defer fh.Close() + _, err = fh.Write(n.Data) + } + } + if err == nil { + counter++ + } + return +} diff --git a/weed-fs/src/cmd/weed/command.go b/weed-fs/src/cmd/weed/command.go index 8c725cafb..4d68ff151 100644 --- a/weed-fs/src/cmd/weed/command.go +++ b/weed-fs/src/cmd/weed/command.go @@ -1,53 +1,52 @@ package main import ( - "flag" - "fmt" - "os" - "strings" + "flag" + "fmt" + "os" + "strings" ) type Command struct { - // Run runs the command. - // The args are the arguments after the command name. - Run func(cmd *Command, args []string) bool + // Run runs the command. + // The args are the arguments after the command name. + Run func(cmd *Command, args []string) bool - // UsageLine is the one-line usage message. - // The first word in the line is taken to be the command name. - UsageLine string + // UsageLine is the one-line usage message. + // The first word in the line is taken to be the command name. + UsageLine string - // Short is the short description shown in the 'go help' output. - Short string + // Short is the short description shown in the 'go help' output. + Short string - // Long is the long message shown in the 'go help ' output. - Long string - - // Flag is a set of flags specific to this command. - Flag flag.FlagSet + // Long is the long message shown in the 'go help ' output. + Long string + // Flag is a set of flags specific to this command. + Flag flag.FlagSet } // Name returns the command's name: the first word in the usage line. func (c *Command) Name() string { - name := c.UsageLine - i := strings.Index(name, " ") - if i >= 0 { - name = name[:i] - } - return name + name := c.UsageLine + i := strings.Index(name, " ") + if i >= 0 { + name = name[:i] + } + return name } func (c *Command) Usage() { - fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) - fmt.Fprintf(os.Stderr, "Default Usage:\n") - c.Flag.PrintDefaults() - fmt.Fprintf(os.Stderr, "Description:\n") - fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) - os.Exit(2) + fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) + fmt.Fprintf(os.Stderr, "Default Usage:\n") + c.Flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "Description:\n") + fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) + os.Exit(2) } // Runnable reports whether the command can be run; otherwise // it is a documentation pseudo-command such as importpath. func (c *Command) Runnable() bool { - return c.Run != nil + return c.Run != nil } diff --git a/weed-fs/src/cmd/weed/shell.go b/weed-fs/src/cmd/weed/shell.go index 78a4b9eb1..daf0b7e1f 100644 --- a/weed-fs/src/cmd/weed/shell.go +++ b/weed-fs/src/cmd/weed/shell.go @@ -1,54 +1,53 @@ package main import ( - "bufio" - "os" - "fmt" + "bufio" + "fmt" + "os" ) func init() { - cmdShell.Run = runShell // break init cycle + cmdShell.Run = runShell // break init cycle } var cmdShell = &Command{ - UsageLine: "shell", - Short: "run interactive commands, now just echo", - Long: `run interactive commands. + UsageLine: "shell", + Short: "run interactive commands, now just echo", + Long: `run interactive commands. `, } -var ( -) +var () func runShell(command *Command, args []string) bool { - r := bufio.NewReader(os.Stdin) - o := bufio.NewWriter(os.Stdout) - e := bufio.NewWriter(os.Stderr) - prompt := func () { - o.WriteString("> ") - o.Flush() - }; - readLine := func () string { - ret, err := r.ReadString('\n') - if err != nil { - fmt.Fprint(e,err); - os.Exit(1) - } - return ret - } - execCmd := func (cmd string) int { - if cmd != "" { - o.WriteString(cmd) - } - return 0 - } + r := bufio.NewReader(os.Stdin) + o := bufio.NewWriter(os.Stdout) + e := bufio.NewWriter(os.Stderr) + prompt := func() { + o.WriteString("> ") + o.Flush() + } + readLine := func() string { + ret, err := r.ReadString('\n') + if err != nil { + fmt.Fprint(e, err) + os.Exit(1) + } + return ret + } + execCmd := func(cmd string) int { + if cmd != "" { + o.WriteString(cmd) + } + return 0 + } - cmd := "" - for { - prompt() - cmd = readLine() - execCmd(cmd) - } - return true + cmd := "" + for { + prompt() + cmd = readLine() + execCmd(cmd) + } + return true } diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go index e25930b5d..5707fda56 100644 --- a/weed-fs/src/cmd/weed/upload.go +++ b/weed-fs/src/cmd/weed/upload.go @@ -67,7 +67,7 @@ func upload(filename string, server string, fid string) (int, error) { } ret, e := operation.Upload("http://"+server+"/"+fid, filename, fh) if e != nil { - return 0, e + return 0, e } return ret.Size, e } diff --git a/weed-fs/src/cmd/weed/weed.go b/weed-fs/src/cmd/weed/weed.go index 232520e75..e2eb41ced 100644 --- a/weed-fs/src/cmd/weed/weed.go +++ b/weed-fs/src/cmd/weed/weed.go @@ -175,9 +175,9 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { w.Header().Set("Content-Type", "application/javascript") var bytes []byte if r.FormValue("pretty") != "" { - bytes, _ = json.MarshalIndent(obj, "", " ") + bytes, _ = json.MarshalIndent(obj, "", " ") } else { - bytes, _ = json.Marshal(obj) + bytes, _ = json.Marshal(obj) } callback := r.FormValue("callback") if callback == "" { diff --git a/weed-fs/src/pkg/directory/file_id.go b/weed-fs/src/pkg/directory/file_id.go index 9ce556580..cd4204f32 100644 --- a/weed-fs/src/pkg/directory/file_id.go +++ b/weed-fs/src/pkg/directory/file_id.go @@ -3,8 +3,8 @@ package directory import ( "encoding/hex" "pkg/storage" - "strings" "pkg/util" + "strings" ) type FileId struct { @@ -16,14 +16,14 @@ type FileId struct { func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId { return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} } -func ParseFileId(fid string) *FileId{ +func ParseFileId(fid string) *FileId { a := strings.Split(fid, ",") if len(a) != 2 { println("Invalid fid", fid, ", split length", len(a)) return nil } vid_string, key_hash_string := a[0], a[1] - volumeId, _ := storage.NewVolumeId(vid_string) + volumeId, _ := storage.NewVolumeId(vid_string) key, hash := storage.ParseKeyHash(key_hash_string) return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash} } diff --git a/weed-fs/src/pkg/operation/allocate_volume.go b/weed-fs/src/pkg/operation/allocate_volume.go index 6a3512896..c93ccfb62 100644 --- a/weed-fs/src/pkg/operation/allocate_volume.go +++ b/weed-fs/src/pkg/operation/allocate_volume.go @@ -1,32 +1,32 @@ package operation import ( - "encoding/json" - "errors" - "net/url" - "pkg/storage" - "pkg/topology" - "pkg/util" + "encoding/json" + "errors" + "net/url" + "pkg/storage" + "pkg/topology" + "pkg/util" ) type AllocateVolumeResult struct { - Error string + Error string } func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error { - values := make(url.Values) - values.Add("volume", vid.String()) - values.Add("replicationType", repType.String()) - jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) - if err != nil { - return err - } - var ret AllocateVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err - } - if ret.Error != "" { - return errors.New(ret.Error) - } - return nil + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("replicationType", repType.String()) + jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) + if err != nil { + return err + } + var ret AllocateVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil } diff --git a/weed-fs/src/pkg/operation/delete_content.go b/weed-fs/src/pkg/operation/delete_content.go index aeab9c3ac..2bdb49651 100644 --- a/weed-fs/src/pkg/operation/delete_content.go +++ b/weed-fs/src/pkg/operation/delete_content.go @@ -1,8 +1,8 @@ package operation import ( - "net/http" "log" + "net/http" ) func Delete(url string) error { diff --git a/weed-fs/src/pkg/operation/lookup_volume_id.go b/weed-fs/src/pkg/operation/lookup_volume_id.go index c46c6670e..50a6d91e6 100644 --- a/weed-fs/src/pkg/operation/lookup_volume_id.go +++ b/weed-fs/src/pkg/operation/lookup_volume_id.go @@ -1,38 +1,38 @@ package operation import ( - "encoding/json" - "net/url" - "pkg/storage" - "pkg/util" - _ "fmt" - "errors" + "encoding/json" + "errors" + _ "fmt" + "net/url" + "pkg/storage" + "pkg/util" ) type Location struct { - Url string "url" - PublicUrl string "publicUrl" + Url string "url" + PublicUrl string "publicUrl" } type LookupResult struct { - Locations []Location "locations" - Error string "error" + Locations []Location "locations" + Error string "error" } //TODO: Add a caching for vid here func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) { - values := make(url.Values) - values.Add("volumeId", vid.String()) - jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) - if err != nil { - return nil, err - } - var ret LookupResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Error != ""{ - return nil, errors.New(ret.Error) - } - return &ret, nil + values := make(url.Values) + values.Add("volumeId", vid.String()) + jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) + if err != nil { + return nil, err + } + var ret LookupResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Error != "" { + return nil, errors.New(ret.Error) + } + return &ret, nil } diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go index 7ed74e02f..0bdb697da 100644 --- a/weed-fs/src/pkg/operation/upload_content.go +++ b/weed-fs/src/pkg/operation/upload_content.go @@ -3,18 +3,18 @@ package operation import ( "bytes" "encoding/json" + "errors" _ "fmt" "io" "io/ioutil" - "log" + "log" "mime/multipart" "net/http" - "errors" ) type UploadResult struct { - Size int - Error string + Size int + Error string } func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) { @@ -26,7 +26,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, body_writer.Close() resp, err := http.Post(uploadUrl, content_type, body_buf) if err != nil { - log.Println("failing to upload to", uploadUrl) + log.Println("failing to upload to", uploadUrl) return nil, err } defer resp.Body.Close() @@ -37,11 +37,11 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, var ret UploadResult err = json.Unmarshal(resp_body, &ret) if err != nil { - log.Println("failing to read upload resonse", uploadUrl, resp_body) - return nil, err + log.Println("failing to read upload resonse", uploadUrl, resp_body) + return nil, err } - if ret.Error != ""{ - return nil, errors.New(ret.Error) + if ret.Error != "" { + return nil, errors.New(ret.Error) } return &ret, nil } diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go index 7cabf626e..ce0094a7c 100644 --- a/weed-fs/src/pkg/replication/volume_growth.go +++ b/weed-fs/src/pkg/replication/volume_growth.go @@ -7,7 +7,7 @@ import ( "pkg/operation" "pkg/storage" "pkg/topology" - "sync" + "sync" ) /* @@ -24,7 +24,7 @@ type VolumeGrowth struct { copy3factor int copyAll int - accessLock sync.Mutex + accessLock sync.Mutex } func NewDefaultVolumeGrowth() *VolumeGrowth { @@ -49,8 +49,8 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo return 0, errors.New("Unknown Replication Type!") } func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { - vg.accessLock.Lock() - defer vg.accessLock.Unlock() + vg.accessLock.Lock() + defer vg.accessLock.Unlock() counter = 0 switch repType { @@ -182,7 +182,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { for _, server := range servers { if err := operation.AllocateVolume(server, vid, repType); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(&vi, server) fmt.Println("Created Volume", vid, "on", server) diff --git a/weed-fs/src/pkg/replication/volume_growth_test.go b/weed-fs/src/pkg/replication/volume_growth_test.go index 51e47c193..659564c64 100644 --- a/weed-fs/src/pkg/replication/volume_growth_test.go +++ b/weed-fs/src/pkg/replication/volume_growth_test.go @@ -5,7 +5,7 @@ import ( "fmt" "math/rand" "pkg/storage" - "pkg/topology" + "pkg/topology" "testing" "time" ) @@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo := topology.NewTopology("mynetwork","/etc/weedfs/weedfs.conf","/tmp","testing",32*1024, 5) + topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := topology.NewDataCenter(dcKey) @@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) @@ -121,10 +121,9 @@ func TestRemoveDataCenter(t *testing.T) { func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4} - if c, e := vg.GrowByCountAndType(1,storage.Copy000,topo);e==nil{ - t.Log("reserved", c) - } + rand.Seed(time.Now().UnixNano()) + vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4} + if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil { + t.Log("reserved", c) + } } - diff --git a/weed-fs/src/pkg/sequence/sequence.go b/weed-fs/src/pkg/sequence/sequence.go index bfdf1b368..c85289468 100644 --- a/weed-fs/src/pkg/sequence/sequence.go +++ b/weed-fs/src/pkg/sequence/sequence.go @@ -1,11 +1,11 @@ package sequence import ( - "encoding/gob" - "os" - "path" - "sync" + "encoding/gob" "log" + "os" + "path" + "sync" ) const ( @@ -27,21 +27,21 @@ type SequencerImpl struct { } func NewSequencer(dirname string, filename string) (m *SequencerImpl) { - m = &SequencerImpl{dir: dirname, fileName: filename} + m = &SequencerImpl{dir: dirname, fileName: filename} - seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) - if se != nil { - m.FileIdSequence = FileIdSaveInterval - log.Println("Setting file id sequence", m.FileIdSequence) - } else { - decoder := gob.NewDecoder(seqFile) - defer seqFile.Close() - decoder.Decode(&m.FileIdSequence) - log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) - //in case the server stops between intervals - m.FileIdSequence += FileIdSaveInterval - } - return + seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) + if se != nil { + m.FileIdSequence = FileIdSaveInterval + log.Println("Setting file id sequence", m.FileIdSequence) + } else { + decoder := gob.NewDecoder(seqFile) + defer seqFile.Close() + decoder.Decode(&m.FileIdSequence) + log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) + //in case the server stops between intervals + m.FileIdSequence += FileIdSaveInterval + } + return } //count should be 1 or more @@ -60,12 +60,12 @@ func (m *SequencerImpl) NextFileId(count int) (uint64, int) { return m.FileIdSequence - m.fileIdCounter, count } func (m *SequencerImpl) saveSequence() { - log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) - seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) - if e != nil { - log.Fatalf("Sequence File Save [ERROR] %s\n", e) - } - defer seqFile.Close() - encoder := gob.NewEncoder(seqFile) - encoder.Encode(m.FileIdSequence) + log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) + seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) + if e != nil { + log.Fatalf("Sequence File Save [ERROR] %s\n", e) + } + defer seqFile.Close() + encoder := gob.NewEncoder(seqFile) + encoder.Encode(m.FileIdSequence) } diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go index 7365022ea..90ed42198 100644 --- a/weed-fs/src/pkg/storage/compact_map.go +++ b/weed-fs/src/pkg/storage/compact_map.go @@ -175,3 +175,23 @@ func (cm *CompactMap) Peek() { } } } + +// iterate over the keys by calling iterate on each key till error is returned +func (cm *CompactMap) Walk(pedestrian func(*NeedleValue) error) (err error) { + var i int + for _, cs := range cm.list { + for key := cs.start; key < cs.end; key++ { + if i = cs.binarySearchValues(key); i >= 0 { + if err = pedestrian(&cs.values[i]); err != nil { + return + } + } + } + for _, val := range cs.overflow { + if err = pedestrian(val); err != nil { + return err + } + } + } + return nil +} diff --git a/weed-fs/src/pkg/storage/compact_map_perf_test.go b/weed-fs/src/pkg/storage/compact_map_perf_test.go index 2e2227279..cfa521fc8 100644 --- a/weed-fs/src/pkg/storage/compact_map_perf_test.go +++ b/weed-fs/src/pkg/storage/compact_map_perf_test.go @@ -1,43 +1,43 @@ package storage import ( + "log" + "os" + "pkg/util" "testing" - "log" - "os" - "pkg/util" ) func TestMemoryUsage(t *testing.T) { - indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) - if ie != nil { - log.Fatalln(ie) - } - LoadNewNeedleMap(indexFile) - + indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + if ie != nil { + log.Fatalln(ie) + } + LoadNewNeedleMap(indexFile) + } func LoadNewNeedleMap(file *os.File) CompactMap { - m := NewCompactMap() - bytes := make([]byte, 16*1024) - count, e := file.Read(bytes) - if count > 0 { - fstat, _ := file.Stat() - log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) - } - for count > 0 && e == nil { - for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - if offset > 0 { - m.Set(Key(key), offset, size) - } else { - //delete(m, key) - } - } + m := NewCompactMap() + bytes := make([]byte, 16*1024) + count, e := file.Read(bytes) + if count > 0 { + fstat, _ := file.Stat() + log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + } + for count > 0 && e == nil { + for i := 0; i < count; i += 16 { + key := util.BytesToUint64(bytes[i : i+8]) + offset := util.BytesToUint32(bytes[i+8 : i+12]) + size := util.BytesToUint32(bytes[i+12 : i+16]) + if offset > 0 { + m.Set(Key(key), offset, size) + } else { + //delete(m, key) + } + } - count, e = file.Read(bytes) - } - return m + count, e = file.Read(bytes) + } + return m } diff --git a/weed-fs/src/pkg/storage/compact_map_test.go b/weed-fs/src/pkg/storage/compact_map_test.go index c05515b29..e76e9578d 100644 --- a/weed-fs/src/pkg/storage/compact_map_test.go +++ b/weed-fs/src/pkg/storage/compact_map_test.go @@ -18,42 +18,42 @@ func TestXYZ(t *testing.T) { m.Set(Key(i), i+11, i+5) } -// for i := uint32(0); i < 100; i++ { -// if v := m.Get(Key(i)); v != nil { -// println(i, "=", v.Key, v.Offset, v.Size) -// } -// } - + // for i := uint32(0); i < 100; i++ { + // if v := m.Get(Key(i)); v != nil { + // println(i, "=", v.Key, v.Offset, v.Size) + // } + // } + for i := uint32(0); i < 10*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(Key(i)) if i%3 == 0 { - if !ok { - t.Fatal("key", i, "missing!") - } + if !ok { + t.Fatal("key", i, "missing!") + } if v.Size != i+5 { t.Fatal("key", i, "size", v.Size) } } else if i%37 == 0 { - if ok && v.Size > 0 { + if ok && v.Size > 0 { t.Fatal("key", i, "should have been deleted needle value", v) } } else if i%2 == 0 { - if v.Size != i { + if v.Size != i { t.Fatal("key", i, "size", v.Size) } } } for i := uint32(10 * batch); i < 100*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(Key(i)) if i%37 == 0 { - if ok && v.Size > 0 { - t.Fatal("key", i, "should have been deleted needle value", v) - } + if ok && v.Size > 0 { + t.Fatal("key", i, "should have been deleted needle value", v) + } } else if i%2 == 0 { - if v==nil{ - t.Fatal("key", i, "missing") - } + if v == nil { + t.Fatal("key", i, "missing") + } if v.Size != i { t.Fatal("key", i, "size", v.Size) } diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index e01c27630..9ce38c1f2 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -98,3 +98,8 @@ func (nm *NeedleMap) Close() { func (nm *NeedleMap) ContentSize() uint64 { return nm.fileByteCounter } + +// iterate through all needles using the iterator function +func (nm *NeedleMap) Walk(pedestrian func(*NeedleValue) error) (err error) { + return nm.m.Walk(pedestrian) +} diff --git a/weed-fs/src/pkg/storage/needle_read_write.go b/weed-fs/src/pkg/storage/needle_read_write.go index 00844bad3..fdb09a3c6 100644 --- a/weed-fs/src/pkg/storage/needle_read_write.go +++ b/weed-fs/src/pkg/storage/needle_read_write.go @@ -2,10 +2,10 @@ package storage import ( "errors" + "fmt" "io" "os" "pkg/util" - "fmt" ) func (n *Needle) Append(w io.Writer, version Version) uint32 { @@ -62,7 +62,8 @@ func (n *Needle) Append(w io.Writer, version Version) uint32 { return n.Size } func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { - if version == Version1 { + switch version { + case Version1: bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) ret, e := r.Read(bytes) n.readNeedleHeader(bytes) @@ -72,7 +73,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { return 0, errors.New("CRC error! Data On Disk Corrupted!") } return ret, e - } else if version == Version2 { + case Version2: if size == 0 { return 0, nil } @@ -95,7 +96,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { } return ret, e } - return 0, errors.New("Unsupported Version!") + return 0, fmt.Errorf("Unsupported Version! (%d)", version) } func (n *Needle) readNeedleHeader(bytes []byte) { n.Cookie = util.BytesToUint32(bytes[0:4]) diff --git a/weed-fs/src/pkg/storage/replication_type.go b/weed-fs/src/pkg/storage/replication_type.go index 86a9d219d..0902d1016 100644 --- a/weed-fs/src/pkg/storage/replication_type.go +++ b/weed-fs/src/pkg/storage/replication_type.go @@ -1,123 +1,123 @@ package storage import ( - "errors" + "errors" ) type ReplicationType string const ( - Copy000 = ReplicationType("000") // single copy - Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center - Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center - Copy100 = ReplicationType("100") // 2 copies, each on different data center - Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center - Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center - LengthRelicationType = 6 - CopyNil = ReplicationType(255) // nil value + Copy000 = ReplicationType("000") // single copy + Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center + Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center + Copy100 = ReplicationType("100") // 2 copies, each on different data center + Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center + Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center + LengthRelicationType = 6 + CopyNil = ReplicationType(255) // nil value ) func NewReplicationTypeFromString(t string) (ReplicationType, error) { - switch t { - case "000": - return Copy000, nil - case "001": - return Copy001, nil - case "010": - return Copy010, nil - case "100": - return Copy100, nil - case "110": - return Copy110, nil - case "200": - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:"+t) + switch t { + case "000": + return Copy000, nil + case "001": + return Copy001, nil + case "010": + return Copy010, nil + case "100": + return Copy100, nil + case "110": + return Copy110, nil + case "200": + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + t) } func NewReplicationTypeFromByte(b byte) (ReplicationType, error) { - switch b { - case byte(000): - return Copy000, nil - case byte(001): - return Copy001, nil - case byte(010): - return Copy010, nil - case byte(100): - return Copy100, nil - case byte(110): - return Copy110, nil - case byte(200): - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:"+string(b)) + switch b { + case byte(000): + return Copy000, nil + case byte(001): + return Copy001, nil + case byte(010): + return Copy010, nil + case byte(100): + return Copy100, nil + case byte(110): + return Copy110, nil + case byte(200): + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + string(b)) } func (r *ReplicationType) String() string { - switch *r { - case Copy000: - return "000" - case Copy001: - return "001" - case Copy010: - return "010" - case Copy100: - return "100" - case Copy110: - return "110" - case Copy200: - return "200" - } - return "000" + switch *r { + case Copy000: + return "000" + case Copy001: + return "001" + case Copy010: + return "010" + case Copy100: + return "100" + case Copy110: + return "110" + case Copy200: + return "200" + } + return "000" } func (r *ReplicationType) Byte() byte { - switch *r { - case Copy000: - return byte(000) - case Copy001: - return byte(001) - case Copy010: - return byte(010) - case Copy100: - return byte(100) - case Copy110: - return byte(110) - case Copy200: - return byte(200) - } - return byte(000) + switch *r { + case Copy000: + return byte(000) + case Copy001: + return byte(001) + case Copy010: + return byte(010) + case Copy100: + return byte(100) + case Copy110: + return byte(110) + case Copy200: + return byte(200) + } + return byte(000) } -func (repType ReplicationType)GetReplicationLevelIndex() int { - switch repType { - case Copy000: - return 0 - case Copy001: - return 1 - case Copy010: - return 2 - case Copy100: - return 3 - case Copy110: - return 4 - case Copy200: - return 5 - } - return -1 +func (repType ReplicationType) GetReplicationLevelIndex() int { + switch repType { + case Copy000: + return 0 + case Copy001: + return 1 + case Copy010: + return 2 + case Copy100: + return 3 + case Copy110: + return 4 + case Copy200: + return 5 + } + return -1 } -func (repType ReplicationType)GetCopyCount() int { - switch repType { - case Copy000: - return 1 - case Copy001: - return 2 - case Copy010: - return 2 - case Copy100: - return 2 - case Copy110: - return 3 - case Copy200: - return 3 - } - return 0 +func (repType ReplicationType) GetCopyCount() int { + switch repType { + case Copy000: + return 1 + case Copy001: + return 2 + case Copy010: + return 2 + case Copy100: + return 2 + case Copy110: + return 3 + case Copy200: + return 3 + } + return 0 } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index a2c5f040b..79cf65e28 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -65,13 +65,13 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error } return e } -func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { +func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) { if s.volumes[vid] != nil { return errors.New("Volume Id " + vid.String() + " already exists!") } log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType) - s.volumes[vid] = NewVolume(s.dir, vid, replicationType) - return nil + s.volumes[vid], err = NewVolume(s.dir, vid, replicationType) + return err } func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { @@ -107,9 +107,10 @@ func (s *Store) loadExistingVolumes() { base := name[:len(name)-len(".dat")] if vid, err := NewVolumeId(base); err == nil { if s.volumes[vid] == nil { - v := NewVolume(s.dir, vid, CopyNil) - s.volumes[vid] = v - log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size()) + if v, e := NewVolume(s.dir, vid, CopyNil); e == nil { + s.volumes[vid] = v + log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size()) + } } } } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 0220bf895..5e64d0763 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -24,9 +24,9 @@ type Volume struct { accessLock sync.Mutex } -func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) { +func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { v = &Volume{dir: dirname, Id: id, replicaType: replicationType} - v.load() + e = v.load() return } func (v *Volume) load() error { @@ -43,6 +43,7 @@ func (v *Volume) load() error { } else { v.maybeWriteSuperBlock() } + // TODO: if .idx not exists, but .cdb exists, then use (but don't load!) that indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) if ie != nil { return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) @@ -79,21 +80,23 @@ func (v *Volume) maybeWriteSuperBlock() { v.dataFile.Write(header) } } -func (v *Volume) readSuperBlock() error { +func (v *Volume) readSuperBlock() (err error) { v.dataFile.Seek(0, 0) header := make([]byte, SuperBlockSize) if _, e := v.dataFile.Read(header); e != nil { return fmt.Errorf("cannot read superblock: %s", e) } - var err error v.version, v.replicaType, err = ParseSuperBlock(header) return err } -func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, e error) { +func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, err error) { version = Version(header[0]) - var err error + if version == 0 { + err = errors.New("Zero version impossible - bad superblock!") + return + } if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { - e = fmt.Errorf("cannot read replica type: %s", err) + err = fmt.Errorf("cannot read replica type: %s", err) } return } @@ -221,3 +224,39 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) func (v *Volume) ContentSize() uint64 { return v.nm.fileByteCounter } + +// Walk over the contained needles (call the function with each NeedleValue till error is returned) +func (v *Volume) WalkValues(pedestrian func(*Needle) error) error { + pedplus := func(nv *NeedleValue) (err error) { + n := new(Needle) + if nv.Offset > 0 { + v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0) + if _, err = n.Read(v.dataFile, nv.Size, v.version); err != nil { + return + } + if err = pedestrian(n); err != nil { + return + } + } + return nil + } + return v.nm.Walk(pedplus) +} + +// Walk over the keys +func (v *Volume) WalkKeys(pedestrian func(Key) error) error { + pedplus := func(nv *NeedleValue) (err error) { + if nv.Offset > 0 && nv.Key > 0 { + if err = pedestrian(nv.Key); err != nil { + return + } + } + return nil + } + return v.nm.Walk(pedplus) +} + +func (v *Volume) String() string { + return fmt.Sprintf("%d@%s:v%d:r%s", v.Id, v.dataFile.Name(), + v.Version(), v.replicaType) +} diff --git a/weed-fs/src/pkg/storage/volume_id.go b/weed-fs/src/pkg/storage/volume_id.go index bf7396673..0333c6cf0 100644 --- a/weed-fs/src/pkg/storage/volume_id.go +++ b/weed-fs/src/pkg/storage/volume_id.go @@ -1,17 +1,18 @@ package storage import ( - "strconv" + "strconv" ) type VolumeId uint32 -func NewVolumeId(vid string) (VolumeId,error) { - volumeId, err := strconv.ParseUint(vid, 10, 64) - return VolumeId(volumeId), err + +func NewVolumeId(vid string) (VolumeId, error) { + volumeId, err := strconv.ParseUint(vid, 10, 64) + return VolumeId(volumeId), err } -func (vid *VolumeId) String() string{ - return strconv.FormatUint(uint64(*vid), 10) +func (vid *VolumeId) String() string { + return strconv.FormatUint(uint64(*vid), 10) } -func (vid *VolumeId) Next() VolumeId{ - return VolumeId(uint32(*vid)+1) +func (vid *VolumeId) Next() VolumeId { + return VolumeId(uint32(*vid) + 1) } diff --git a/weed-fs/src/pkg/storage/volume_version.go b/weed-fs/src/pkg/storage/volume_version.go index da91ad038..9702ae904 100644 --- a/weed-fs/src/pkg/storage/volume_version.go +++ b/weed-fs/src/pkg/storage/volume_version.go @@ -1,12 +1,11 @@ package storage -import ( -) +import () type Version uint8 const ( - Version1 = Version(1) - Version2 = Version(2) - CurrentVersion = Version2 + Version1 = Version(1) + Version2 = Version(2) + CurrentVersion = Version2 ) diff --git a/weed-fs/src/pkg/topology/configuration_test.go b/weed-fs/src/pkg/topology/configuration_test.go index 5542d1503..35d82c058 100644 --- a/weed-fs/src/pkg/topology/configuration_test.go +++ b/weed-fs/src/pkg/topology/configuration_test.go @@ -30,13 +30,13 @@ func TestLoadConfiguration(t *testing.T) { ` c, err := NewConfiguration([]byte(confContent)) - - fmt.Printf("%s\n", c) - if err!=nil{ - t.Fatalf("unmarshal error:%s",err.Error()) + + fmt.Printf("%s\n", c) + if err != nil { + t.Fatalf("unmarshal error:%s", err.Error()) } - + if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { - t.Fatalf("unmarshal error:%s",c) + t.Fatalf("unmarshal error:%s", c) } } diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go index 2ec68fd4b..a3b2b7d13 100644 --- a/weed-fs/src/pkg/topology/data_center.go +++ b/weed-fs/src/pkg/topology/data_center.go @@ -1,7 +1,6 @@ package topology -import ( -) +import () type DataCenter struct { NodeImpl @@ -12,31 +11,31 @@ func NewDataCenter(id string) *DataCenter { dc.id = NodeId(id) dc.nodeType = "DataCenter" dc.children = make(map[NodeId]Node) - dc.NodeImpl.value = dc + dc.NodeImpl.value = dc return dc } func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { - for _, c := range dc.Children() { - rack := c.(*Rack) - if string(rack.Id()) == rackName { - return rack - } - } - rack := NewRack(rackName) - dc.LinkChildNode(rack) - return rack + for _, c := range dc.Children() { + rack := c.(*Rack) + if string(rack.Id()) == rackName { + return rack + } + } + rack := NewRack(rackName) + dc.LinkChildNode(rack) + return rack } -func (dc *DataCenter) ToMap() interface{}{ - m := make(map[string]interface{}) - m["Max"] = dc.GetMaxVolumeCount() - m["Free"] = dc.FreeSpace() - var racks []interface{} - for _, c := range dc.Children() { - rack := c.(*Rack) - racks = append(racks, rack.ToMap()) - } - m["Racks"] = racks - return m +func (dc *DataCenter) ToMap() interface{} { + m := make(map[string]interface{}) + m["Max"] = dc.GetMaxVolumeCount() + m["Free"] = dc.FreeSpace() + var racks []interface{} + for _, c := range dc.Children() { + rack := c.(*Rack) + racks = append(racks, rack.ToMap()) + } + m["Racks"] = racks + return m } diff --git a/weed-fs/src/pkg/topology/node_list.go b/weed-fs/src/pkg/topology/node_list.go index 1d9e1891a..3115e0213 100644 --- a/weed-fs/src/pkg/topology/node_list.go +++ b/weed-fs/src/pkg/topology/node_list.go @@ -37,14 +37,14 @@ func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) { list = append(list, n) } } - if n > len(list){ - return nil,false + if n > len(list) { + return nil, false } for i := n; i > 0; i-- { - r := rand.Intn(i) - t := list[r] - list[r] = list[i-1] - list[i-1] = t + r := rand.Intn(i) + t := list[r] + list[r] = list[i-1] + list[i-1] = t } return list[len(list)-n:], true } diff --git a/weed-fs/src/pkg/topology/node_list_test.go b/weed-fs/src/pkg/topology/node_list_test.go index 0d16a0526..2fb4fa970 100644 --- a/weed-fs/src/pkg/topology/node_list_test.go +++ b/weed-fs/src/pkg/topology/node_list_test.go @@ -1,39 +1,39 @@ package topology import ( + _ "fmt" "strconv" "testing" - _ "fmt" ) func TestXYZ(t *testing.T) { - topo := NewTopology("topo","/etc/weed.conf", "/tmp","test",234,5) + topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) for i := 0; i < 5; i++ { dc := NewDataCenter("dc" + strconv.Itoa(i)) dc.activeVolumeCount = i dc.maxVolumeCount = 5 topo.LinkChildNode(dc) } - nl := NewNodeList(topo.Children(),nil) + nl := NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(1) - if !ret || len(picked)!=1 { - t.Errorf("need to randomly pick 1 node") - } - - picked, ret = nl.RandomlyPickN(4) - if !ret || len(picked)!=4 { - t.Errorf("need to randomly pick 4 nodes") + picked, ret := nl.RandomlyPickN(1) + if !ret || len(picked) != 1 { + t.Errorf("need to randomly pick 1 node") } - picked, ret = nl.RandomlyPickN(5) - if !ret || len(picked)!=5 { - t.Errorf("need to randomly pick 5 nodes") - } + picked, ret = nl.RandomlyPickN(4) + if !ret || len(picked) != 4 { + t.Errorf("need to randomly pick 4 nodes") + } - picked, ret = nl.RandomlyPickN(6) - if ret || len(picked)!=0 { - t.Errorf("can not randomly pick 6 nodes:", ret, picked) - } + picked, ret = nl.RandomlyPickN(5) + if !ret || len(picked) != 5 { + t.Errorf("need to randomly pick 5 nodes") + } + + picked, ret = nl.RandomlyPickN(6) + if ret || len(picked) != 0 { + t.Errorf("can not randomly pick 6 nodes:", ret, picked) + } } diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index 1555ef682..acc34417a 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -19,13 +19,13 @@ func NewRack(id string) *Rack { } func (r *Rack) FindDataNode(ip string, port int) *DataNode { - for _, c := range r.Children() { - dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - return dn - } - } - return nil + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + return dn + } + } + return nil } func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { for _, c := range r.Children() { diff --git a/weed-fs/src/pkg/topology/topo_test.go b/weed-fs/src/pkg/topology/topo_test.go index 83356b38c..71a901c8e 100644 --- a/weed-fs/src/pkg/topology/topo_test.go +++ b/weed-fs/src/pkg/topology/topo_test.go @@ -78,7 +78,7 @@ func setup(topologyLayout string) *Topology { } //need to connect all nodes first before server adding volumes - topo := NewTopology("mynetwork","/etc/weed.conf","/tmp","test",234,5) + topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := NewDataCenter(dcKey) @@ -94,7 +94,7 @@ func setup(topologyLayout string) *Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) @@ -119,9 +119,9 @@ func TestRemoveDataCenter(t *testing.T) { func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - rand.Seed(1) + rand.Seed(time.Now().UnixNano()) + rand.Seed(1) ret, node, vid := topo.RandomlyReserveOneVolume() - fmt.Println("assigned :", ret, ", node :", node,", volume id:", vid) + fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid) } diff --git a/weed-fs/src/pkg/topology/topology_compact.go b/weed-fs/src/pkg/topology/topology_compact.go index 050fe4cd8..dee6514f4 100644 --- a/weed-fs/src/pkg/topology/topology_compact.go +++ b/weed-fs/src/pkg/topology/topology_compact.go @@ -101,10 +101,10 @@ type VacuumVolumeResult struct { func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) { values := make(url.Values) values.Add("volume", vid.String()) - values.Add("garbageThreshold", garbageThreshold) + values.Add("garbageThreshold", garbageThreshold) jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values) if err != nil { - fmt.Println("parameters:",values) + fmt.Println("parameters:", values) return err, false } var ret VacuumVolumeResult diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go index b33b4d768..debedc3d3 100644 --- a/weed-fs/src/pkg/topology/topology_event_handling.go +++ b/weed-fs/src/pkg/topology/topology_event_handling.go @@ -52,14 +52,14 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { vl := t.GetVolumeLayout(v.RepType) vl.SetVolumeUnavailable(dn, v.Id) } - dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) + dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) dn.Parent().UnlinkChildNode(dn.Id()) } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.RepType) + vl := t.GetVolumeLayout(v.RepType) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/weed-fs/src/pkg/topology/topology_map.go b/weed-fs/src/pkg/topology/topology_map.go index 9ccf08ae3..b416ee943 100644 --- a/weed-fs/src/pkg/topology/topology_map.go +++ b/weed-fs/src/pkg/topology/topology_map.go @@ -1,7 +1,6 @@ package topology -import ( -) +import () func (t *Topology) ToMap() interface{} { m := make(map[string]interface{}) diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go index 64d8cdf43..507a240b5 100644 --- a/weed-fs/src/pkg/topology/volume_location.go +++ b/weed-fs/src/pkg/topology/volume_location.go @@ -15,7 +15,7 @@ func (dnll *VolumeLocationList) Head() *DataNode { } func (dnll *VolumeLocationList) Length() int { - return len(dnll.list) + return len(dnll.list) } func (dnll *VolumeLocationList) Add(loc *DataNode) bool { @@ -29,13 +29,13 @@ func (dnll *VolumeLocationList) Add(loc *DataNode) bool { } func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { - for i, dnl := range dnll.list { - if loc.Ip == dnl.Ip && loc.Port == dnl.Port { - dnll.list = append(dnll.list[:i],dnll.list[i+1:]...) - return true - } - } - return false + for i, dnl := range dnll.list { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { + dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) + return true + } + } + return false } func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { diff --git a/weed-fs/src/pkg/util/bytes.go b/weed-fs/src/pkg/util/bytes.go index 177da06db..6cc3d7018 100644 --- a/weed-fs/src/pkg/util/bytes.go +++ b/weed-fs/src/pkg/util/bytes.go @@ -1,34 +1,33 @@ package util -func BytesToUint64(b []byte)(v uint64){ - length := uint(len(b)) - for i :=uint(0);i>(i*8)) - } +func Uint64toBytes(b []byte, v uint64) { + for i := uint(0); i < 8; i++ { + b[7-i] = byte(v >> (i * 8)) + } } -func Uint32toBytes(b []byte, v uint32){ - for i :=uint(0);i<4;i++ { - b[3-i] = byte(v>>(i*8)) - } +func Uint32toBytes(b []byte, v uint32) { + for i := uint(0); i < 4; i++ { + b[3-i] = byte(v >> (i * 8)) + } } -func Uint8toBytes(b []byte, v uint8){ - b[0] = byte(v) +func Uint8toBytes(b []byte, v uint8) { + b[0] = byte(v) } - diff --git a/weed-fs/src/pkg/util/parse.go b/weed-fs/src/pkg/util/parse.go index 6a4350e72..930da9522 100644 --- a/weed-fs/src/pkg/util/parse.go +++ b/weed-fs/src/pkg/util/parse.go @@ -1,16 +1,16 @@ package util import ( - "strconv" + "strconv" ) -func ParseInt(text string, defaultValue int) int{ - count, parseError := strconv.ParseUint(text,10,64) - if parseError!=nil { - if len(text)>0{ - return 0 - } - return defaultValue - } - return int(count) +func ParseInt(text string, defaultValue int) int { + count, parseError := strconv.ParseUint(text, 10, 64) + if parseError != nil { + if len(text) > 0 { + return 0 + } + return defaultValue + } + return int(count) } diff --git a/weed-fs/src/pkg/util/post.go b/weed-fs/src/pkg/util/post.go index f643faa6b..6e6ab0003 100644 --- a/weed-fs/src/pkg/util/post.go +++ b/weed-fs/src/pkg/util/post.go @@ -16,7 +16,7 @@ func Post(url string, values url.Values) ([]byte, error) { defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) if err != nil { - log.Println("read post result from", url, err) + log.Println("read post result from", url, err) return nil, err } return b, nil