From ce7394f3bfe50b6dcb7ff1682bd7e5e1b5574da2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tam=C3=A1s=20Gul=C3=A1csi?= Date: Fri, 4 Jan 2013 22:49:40 +0100 Subject: [PATCH 1/7] sort writables to be able to skew write probability to lesser ids (older volumes) --- weed-fs/src/pkg/topology/volume_layout.go | 40 ++++++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go index 314aca69f..23802ca81 100644 --- a/weed-fs/src/pkg/topology/volume_layout.go +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -5,12 +5,15 @@ import ( "fmt" "math/rand" "pkg/storage" + "sort" ) +type volumeIdList []storage.VolumeId + type VolumeLayout struct { repType storage.ReplicationType vid2location map[storage.VolumeId]*VolumeLocationList - writables []storage.VolumeId // transient array of writable volume id + writables volumeIdList // transient (sorted!) array of writable volume Ids pulse int64 volumeSizeLimit uint64 } @@ -19,7 +22,7 @@ func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pu return &VolumeLayout{ repType: repType, vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), + writables: make(volumeIdList, 0, 4), pulse: pulse, volumeSizeLimit: volumeSizeLimit, } @@ -33,13 +36,16 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() { if vl.isWritable(v) { vl.writables = append(vl.writables, v.Id) + if len(vl.writables) > 1 { + vl.writables.Sort() + } } } } } -func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool{ - return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion +func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { + return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion } func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { @@ -52,7 +58,13 @@ func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *Volume fmt.Println("No more writable volumes!") return nil, 0, nil, errors.New("No more writable volumes!") } - vid := vl.writables[rand.Intn(len_writers)] + var vid storage.VolumeId + if len_writers == 1 { + vid = vl.writables[0] + } else { + // skew for lesser indices + vid = vl.writables[rand.Intn(len_writers+1)%len_writers] + } locationList := vl.vid2location[vid] if locationList != nil { return &vid, count, locationList, nil @@ -82,6 +94,9 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { } fmt.Println("Volume", vid, "becomes writable") vl.writables = append(vl.writables, vid) + if len(vl.writables) > 1 { + vl.writables.Sort() + } return true } @@ -114,3 +129,18 @@ func (vl *VolumeLayout) ToMap() interface{} { //m["locations"] = vl.vid2location return m } + +func (vls volumeIdList) Len() int { return len(vls) } + +func (vls volumeIdList) Less(i, j int) bool { + return vls[i] < vls[j] +} + +func (vls volumeIdList) Swap(i, j int) { + vls[i], vls[j] = vls[j], vls[i] +} + +// convienence sorting +func (vls volumeIdList) Sort() { + sort.Sort(vls) +} From 824371035109225128f8942b64a817838a7c0c25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tam=C3=A1s=20Gul=C3=A1csi?= Date: Fri, 4 Jan 2013 23:33:07 +0100 Subject: [PATCH 2/7] add error return for (Un)GzipData --- weed-fs/src/cmd/weed/volume.go | 6 ++-- weed-fs/src/pkg/storage/compress.go | 44 ++++++++++------------------- weed-fs/src/pkg/storage/needle.go | 8 ++++-- 3 files changed, 24 insertions(+), 34 deletions(-) diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index 59a5623ea..576096dbb 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -42,7 +42,7 @@ var ( store *storage.Store ) -var fileNameEscaper = strings.NewReplacer("\\","\\\\","\"","\\\"") +var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") func statusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) @@ -156,7 +156,9 @@ func GetHandler(w http.ResponseWriter, r *http.Request) { if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { w.Header().Set("Content-Encoding", "gzip") } else { - n.Data = storage.UnGzipData(n.Data) + if n.Data, err = storage.UnGzipData(n.Data); err != nil { + debug("lookup error:", err, r.URL.Path) + } } } } diff --git a/weed-fs/src/pkg/storage/compress.go b/weed-fs/src/pkg/storage/compress.go index 9df85b4da..35de70600 100644 --- a/weed-fs/src/pkg/storage/compress.go +++ b/weed-fs/src/pkg/storage/compress.go @@ -10,54 +10,40 @@ import ( /* * Default more not to gzip since gzip can be done on client side. -*/ + */ func IsGzippable(ext, mtype string) bool { - if strings.HasPrefix(mtype, "text/"){ - return true - } - if ext == ".zip" { - return false - } - if ext == ".rar" { - return false - } - if ext == ".gz" { - return false - } - if ext == ".pdf" { + if strings.HasPrefix(mtype, "text/") { return true } - if ext == ".css" { + switch ext { + case ".zip", ".rar", ".gz", ".bz2", ".xz": + return false + case ".pdf", ".txt", ".html", ".css", ".js", ".json": return true } - if ext == ".js" { - return true - } - if ext == ".json" { - return true - } if strings.HasPrefix(mtype, "application/") { - if strings.HasSuffix(mtype, "xml") { - return true - } - if strings.HasSuffix(mtype, "script") { + if strings.HasSuffix(mtype, "xml") || + strings.HasSuffix(mtype, "script") { return true } } return false } -func GzipData(input []byte) []byte { + +func GzipData(input []byte) ([]byte, error) { buf := new(bytes.Buffer) w, _ := gzip.NewWriterLevel(buf, flate.BestCompression) if _, err := w.Write(input); err != nil { println("error compressing data:", err) + return nil, err } if err := w.Close(); err != nil { println("error closing compressed data:", err) + return nil, err } - return buf.Bytes() + return buf.Bytes(), nil } -func UnGzipData(input []byte) []byte { +func UnGzipData(input []byte) ([]byte, error) { buf := bytes.NewBuffer(input) r, _ := gzip.NewReader(buf) defer r.Close() @@ -65,5 +51,5 @@ func UnGzipData(input []byte) []byte { if err != nil { println("error uncompressing data:", err) } - return output + return output, err } diff --git a/weed-fs/src/pkg/storage/needle.go b/weed-fs/src/pkg/storage/needle.go index 867852362..1f778c7ff 100644 --- a/weed-fs/src/pkg/storage/needle.go +++ b/weed-fs/src/pkg/storage/needle.go @@ -12,8 +12,8 @@ import ( ) const ( - NeedleHeaderSize = 16 //should never change this - NeedlePaddingSize = 8 + NeedleHeaderSize = 16 //should never change this + NeedlePaddingSize = 8 NeedleChecksumSize = 4 ) @@ -64,7 +64,9 @@ func NewNeedle(r *http.Request) (n *Needle, fname string, e error) { mtype = contentType } if IsGzippable(ext, mtype) { - data = GzipData(data) + if data, e = GzipData(data); e != nil { + return + } n.SetGzipped() } if ext == ".gz" { From 5d2a1e8d4845e7a7f1dccd962bb0ee6a5f9d6081 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tam=C3=A1s=20Gul=C3=A1csi?= Date: Sat, 5 Jan 2013 23:06:44 +0100 Subject: [PATCH 3/7] add cmd/dump - a dumper Walk needed to be added to NeedleMap and CompactMap, to be able to add WalkKeys and WalkValues to volume. This is needed for iterating through all the stored needles in a volume - this was dump's purpose. --- weed-fs/src/cmd/dump/main.go | 96 +++++++++ weed-fs/src/cmd/weed/command.go | 59 +++-- weed-fs/src/cmd/weed/shell.go | 73 ++++--- weed-fs/src/cmd/weed/upload.go | 2 +- weed-fs/src/cmd/weed/weed.go | 4 +- weed-fs/src/pkg/directory/file_id.go | 6 +- weed-fs/src/pkg/operation/allocate_volume.go | 44 ++-- weed-fs/src/pkg/operation/delete_content.go | 2 +- weed-fs/src/pkg/operation/lookup_volume_id.go | 50 ++--- weed-fs/src/pkg/operation/upload_content.go | 18 +- weed-fs/src/pkg/replication/volume_growth.go | 10 +- .../src/pkg/replication/volume_growth_test.go | 17 +- weed-fs/src/pkg/sequence/sequence.go | 52 ++--- weed-fs/src/pkg/storage/compact_map.go | 20 ++ .../src/pkg/storage/compact_map_perf_test.go | 60 +++--- weed-fs/src/pkg/storage/compact_map_test.go | 38 ++-- weed-fs/src/pkg/storage/needle_map.go | 5 + weed-fs/src/pkg/storage/needle_read_write.go | 9 +- weed-fs/src/pkg/storage/replication_type.go | 202 +++++++++--------- weed-fs/src/pkg/storage/store.go | 13 +- weed-fs/src/pkg/storage/volume.go | 53 ++++- weed-fs/src/pkg/storage/volume_id.go | 17 +- weed-fs/src/pkg/storage/volume_version.go | 9 +- .../src/pkg/topology/configuration_test.go | 12 +- weed-fs/src/pkg/topology/data_center.go | 45 ++-- weed-fs/src/pkg/topology/node_list.go | 12 +- weed-fs/src/pkg/topology/node_list_test.go | 38 ++-- weed-fs/src/pkg/topology/rack.go | 14 +- weed-fs/src/pkg/topology/topo_test.go | 10 +- weed-fs/src/pkg/topology/topology_compact.go | 4 +- .../pkg/topology/topology_event_handling.go | 4 +- weed-fs/src/pkg/topology/topology_map.go | 3 +- weed-fs/src/pkg/topology/volume_location.go | 16 +- weed-fs/src/pkg/util/bytes.go | 53 +++-- weed-fs/src/pkg/util/parse.go | 20 +- weed-fs/src/pkg/util/post.go | 2 +- 36 files changed, 624 insertions(+), 468 deletions(-) create mode 100644 weed-fs/src/cmd/dump/main.go diff --git a/weed-fs/src/cmd/dump/main.go b/weed-fs/src/cmd/dump/main.go new file mode 100644 index 000000000..e3e151eb7 --- /dev/null +++ b/weed-fs/src/cmd/dump/main.go @@ -0,0 +1,96 @@ +// Copyright Tamás Gulácsi 2013 All rights reserved +// Use of this source is governed by the same rules as the weed-fs library. +// If this would be ambigous, than Apache License 2.0 has to be used. +// +// dump dumps the files of a volume to tar or unique files. +// Each file will have id#mimetype#original_name file format + +package main + +import ( + "archive/tar" + "bytes" + "flag" + "fmt" + // "io" + "log" + "os" + "pkg/storage" + "strings" + "time" +) + +var ( + volumePath = flag.String("dir", "/tmp", "volume directory") + volumeId = flag.Int("id", 0, "volume Id") + dest = flag.String("out", "-", "output path. Produces tar if path ends with .tar; creates files otherwise.") + tarFh *tar.Writer + tarHeader tar.Header + counter int +) + +func main() { + var err error + + flag.Parse() + + if *dest == "-" { + *dest = "" + } + if *dest == "" || strings.HasSuffix(*dest, ".tar") { + var fh *os.File + if *dest == "" { + fh = os.Stdout + } else { + if fh, err = os.Create(*dest); err != nil { + log.Printf("cannot open output tar %s: %s", *dest, err) + return + } + } + defer fh.Close() + tarFh = tar.NewWriter(fh) + defer tarFh.Close() + t := time.Now() + tarHeader = tar.Header{Mode: 0644, + ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(), + Typeflag: tar.TypeReg, + AccessTime: t, ChangeTime: t} + } + + v, err := storage.NewVolume(*volumePath, storage.VolumeId(*volumeId), storage.CopyNil) + if v == nil || v.Version() == 0 || err != nil { + log.Printf("cannot load volume %d from %s (%s): %s", *volumeId, *volumePath, v, err) + return + } + log.Printf("volume: %s (ver. %d)", v, v.Version()) + if err := v.WalkValues(walker); err != nil { + log.Printf("error while walking: %s", err) + return + } + + log.Printf("%d files written.", counter) +} + +func walker(n *storage.Needle) (err error) { + // log.Printf("Id=%d Size=%d Name=%s mime=%s", n.Id, n.Size, n.Name, n.Mime) + nm := fmt.Sprintf("%d#%s#%s", n.Id, bytes.Replace(n.Mime, []byte{'/'}, []byte{'_'}, -1), n.Name) + // log.Print(nm) + if tarFh != nil { + tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data)) + if err = tarFh.WriteHeader(&tarHeader); err != nil { + return err + } + _, err = tarFh.Write(n.Data) + } else { + if fh, e := os.Create(*dest + "/" + nm); e != nil { + return e + } else { + defer fh.Close() + _, err = fh.Write(n.Data) + } + } + if err == nil { + counter++ + } + return +} diff --git a/weed-fs/src/cmd/weed/command.go b/weed-fs/src/cmd/weed/command.go index 8c725cafb..4d68ff151 100644 --- a/weed-fs/src/cmd/weed/command.go +++ b/weed-fs/src/cmd/weed/command.go @@ -1,53 +1,52 @@ package main import ( - "flag" - "fmt" - "os" - "strings" + "flag" + "fmt" + "os" + "strings" ) type Command struct { - // Run runs the command. - // The args are the arguments after the command name. - Run func(cmd *Command, args []string) bool + // Run runs the command. + // The args are the arguments after the command name. + Run func(cmd *Command, args []string) bool - // UsageLine is the one-line usage message. - // The first word in the line is taken to be the command name. - UsageLine string + // UsageLine is the one-line usage message. + // The first word in the line is taken to be the command name. + UsageLine string - // Short is the short description shown in the 'go help' output. - Short string + // Short is the short description shown in the 'go help' output. + Short string - // Long is the long message shown in the 'go help ' output. - Long string - - // Flag is a set of flags specific to this command. - Flag flag.FlagSet + // Long is the long message shown in the 'go help ' output. + Long string + // Flag is a set of flags specific to this command. + Flag flag.FlagSet } // Name returns the command's name: the first word in the usage line. func (c *Command) Name() string { - name := c.UsageLine - i := strings.Index(name, " ") - if i >= 0 { - name = name[:i] - } - return name + name := c.UsageLine + i := strings.Index(name, " ") + if i >= 0 { + name = name[:i] + } + return name } func (c *Command) Usage() { - fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) - fmt.Fprintf(os.Stderr, "Default Usage:\n") - c.Flag.PrintDefaults() - fmt.Fprintf(os.Stderr, "Description:\n") - fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) - os.Exit(2) + fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) + fmt.Fprintf(os.Stderr, "Default Usage:\n") + c.Flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "Description:\n") + fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) + os.Exit(2) } // Runnable reports whether the command can be run; otherwise // it is a documentation pseudo-command such as importpath. func (c *Command) Runnable() bool { - return c.Run != nil + return c.Run != nil } diff --git a/weed-fs/src/cmd/weed/shell.go b/weed-fs/src/cmd/weed/shell.go index 78a4b9eb1..daf0b7e1f 100644 --- a/weed-fs/src/cmd/weed/shell.go +++ b/weed-fs/src/cmd/weed/shell.go @@ -1,54 +1,53 @@ package main import ( - "bufio" - "os" - "fmt" + "bufio" + "fmt" + "os" ) func init() { - cmdShell.Run = runShell // break init cycle + cmdShell.Run = runShell // break init cycle } var cmdShell = &Command{ - UsageLine: "shell", - Short: "run interactive commands, now just echo", - Long: `run interactive commands. + UsageLine: "shell", + Short: "run interactive commands, now just echo", + Long: `run interactive commands. `, } -var ( -) +var () func runShell(command *Command, args []string) bool { - r := bufio.NewReader(os.Stdin) - o := bufio.NewWriter(os.Stdout) - e := bufio.NewWriter(os.Stderr) - prompt := func () { - o.WriteString("> ") - o.Flush() - }; - readLine := func () string { - ret, err := r.ReadString('\n') - if err != nil { - fmt.Fprint(e,err); - os.Exit(1) - } - return ret - } - execCmd := func (cmd string) int { - if cmd != "" { - o.WriteString(cmd) - } - return 0 - } + r := bufio.NewReader(os.Stdin) + o := bufio.NewWriter(os.Stdout) + e := bufio.NewWriter(os.Stderr) + prompt := func() { + o.WriteString("> ") + o.Flush() + } + readLine := func() string { + ret, err := r.ReadString('\n') + if err != nil { + fmt.Fprint(e, err) + os.Exit(1) + } + return ret + } + execCmd := func(cmd string) int { + if cmd != "" { + o.WriteString(cmd) + } + return 0 + } - cmd := "" - for { - prompt() - cmd = readLine() - execCmd(cmd) - } - return true + cmd := "" + for { + prompt() + cmd = readLine() + execCmd(cmd) + } + return true } diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go index e25930b5d..5707fda56 100644 --- a/weed-fs/src/cmd/weed/upload.go +++ b/weed-fs/src/cmd/weed/upload.go @@ -67,7 +67,7 @@ func upload(filename string, server string, fid string) (int, error) { } ret, e := operation.Upload("http://"+server+"/"+fid, filename, fh) if e != nil { - return 0, e + return 0, e } return ret.Size, e } diff --git a/weed-fs/src/cmd/weed/weed.go b/weed-fs/src/cmd/weed/weed.go index 232520e75..e2eb41ced 100644 --- a/weed-fs/src/cmd/weed/weed.go +++ b/weed-fs/src/cmd/weed/weed.go @@ -175,9 +175,9 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { w.Header().Set("Content-Type", "application/javascript") var bytes []byte if r.FormValue("pretty") != "" { - bytes, _ = json.MarshalIndent(obj, "", " ") + bytes, _ = json.MarshalIndent(obj, "", " ") } else { - bytes, _ = json.Marshal(obj) + bytes, _ = json.Marshal(obj) } callback := r.FormValue("callback") if callback == "" { diff --git a/weed-fs/src/pkg/directory/file_id.go b/weed-fs/src/pkg/directory/file_id.go index 9ce556580..cd4204f32 100644 --- a/weed-fs/src/pkg/directory/file_id.go +++ b/weed-fs/src/pkg/directory/file_id.go @@ -3,8 +3,8 @@ package directory import ( "encoding/hex" "pkg/storage" - "strings" "pkg/util" + "strings" ) type FileId struct { @@ -16,14 +16,14 @@ type FileId struct { func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId { return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} } -func ParseFileId(fid string) *FileId{ +func ParseFileId(fid string) *FileId { a := strings.Split(fid, ",") if len(a) != 2 { println("Invalid fid", fid, ", split length", len(a)) return nil } vid_string, key_hash_string := a[0], a[1] - volumeId, _ := storage.NewVolumeId(vid_string) + volumeId, _ := storage.NewVolumeId(vid_string) key, hash := storage.ParseKeyHash(key_hash_string) return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash} } diff --git a/weed-fs/src/pkg/operation/allocate_volume.go b/weed-fs/src/pkg/operation/allocate_volume.go index 6a3512896..c93ccfb62 100644 --- a/weed-fs/src/pkg/operation/allocate_volume.go +++ b/weed-fs/src/pkg/operation/allocate_volume.go @@ -1,32 +1,32 @@ package operation import ( - "encoding/json" - "errors" - "net/url" - "pkg/storage" - "pkg/topology" - "pkg/util" + "encoding/json" + "errors" + "net/url" + "pkg/storage" + "pkg/topology" + "pkg/util" ) type AllocateVolumeResult struct { - Error string + Error string } func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error { - values := make(url.Values) - values.Add("volume", vid.String()) - values.Add("replicationType", repType.String()) - jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) - if err != nil { - return err - } - var ret AllocateVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err - } - if ret.Error != "" { - return errors.New(ret.Error) - } - return nil + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("replicationType", repType.String()) + jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) + if err != nil { + return err + } + var ret AllocateVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil } diff --git a/weed-fs/src/pkg/operation/delete_content.go b/weed-fs/src/pkg/operation/delete_content.go index aeab9c3ac..2bdb49651 100644 --- a/weed-fs/src/pkg/operation/delete_content.go +++ b/weed-fs/src/pkg/operation/delete_content.go @@ -1,8 +1,8 @@ package operation import ( - "net/http" "log" + "net/http" ) func Delete(url string) error { diff --git a/weed-fs/src/pkg/operation/lookup_volume_id.go b/weed-fs/src/pkg/operation/lookup_volume_id.go index c46c6670e..50a6d91e6 100644 --- a/weed-fs/src/pkg/operation/lookup_volume_id.go +++ b/weed-fs/src/pkg/operation/lookup_volume_id.go @@ -1,38 +1,38 @@ package operation import ( - "encoding/json" - "net/url" - "pkg/storage" - "pkg/util" - _ "fmt" - "errors" + "encoding/json" + "errors" + _ "fmt" + "net/url" + "pkg/storage" + "pkg/util" ) type Location struct { - Url string "url" - PublicUrl string "publicUrl" + Url string "url" + PublicUrl string "publicUrl" } type LookupResult struct { - Locations []Location "locations" - Error string "error" + Locations []Location "locations" + Error string "error" } //TODO: Add a caching for vid here func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) { - values := make(url.Values) - values.Add("volumeId", vid.String()) - jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) - if err != nil { - return nil, err - } - var ret LookupResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Error != ""{ - return nil, errors.New(ret.Error) - } - return &ret, nil + values := make(url.Values) + values.Add("volumeId", vid.String()) + jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) + if err != nil { + return nil, err + } + var ret LookupResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Error != "" { + return nil, errors.New(ret.Error) + } + return &ret, nil } diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go index 7ed74e02f..0bdb697da 100644 --- a/weed-fs/src/pkg/operation/upload_content.go +++ b/weed-fs/src/pkg/operation/upload_content.go @@ -3,18 +3,18 @@ package operation import ( "bytes" "encoding/json" + "errors" _ "fmt" "io" "io/ioutil" - "log" + "log" "mime/multipart" "net/http" - "errors" ) type UploadResult struct { - Size int - Error string + Size int + Error string } func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) { @@ -26,7 +26,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, body_writer.Close() resp, err := http.Post(uploadUrl, content_type, body_buf) if err != nil { - log.Println("failing to upload to", uploadUrl) + log.Println("failing to upload to", uploadUrl) return nil, err } defer resp.Body.Close() @@ -37,11 +37,11 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, var ret UploadResult err = json.Unmarshal(resp_body, &ret) if err != nil { - log.Println("failing to read upload resonse", uploadUrl, resp_body) - return nil, err + log.Println("failing to read upload resonse", uploadUrl, resp_body) + return nil, err } - if ret.Error != ""{ - return nil, errors.New(ret.Error) + if ret.Error != "" { + return nil, errors.New(ret.Error) } return &ret, nil } diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go index 7cabf626e..ce0094a7c 100644 --- a/weed-fs/src/pkg/replication/volume_growth.go +++ b/weed-fs/src/pkg/replication/volume_growth.go @@ -7,7 +7,7 @@ import ( "pkg/operation" "pkg/storage" "pkg/topology" - "sync" + "sync" ) /* @@ -24,7 +24,7 @@ type VolumeGrowth struct { copy3factor int copyAll int - accessLock sync.Mutex + accessLock sync.Mutex } func NewDefaultVolumeGrowth() *VolumeGrowth { @@ -49,8 +49,8 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo return 0, errors.New("Unknown Replication Type!") } func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { - vg.accessLock.Lock() - defer vg.accessLock.Unlock() + vg.accessLock.Lock() + defer vg.accessLock.Unlock() counter = 0 switch repType { @@ -182,7 +182,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { for _, server := range servers { if err := operation.AllocateVolume(server, vid, repType); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(&vi, server) fmt.Println("Created Volume", vid, "on", server) diff --git a/weed-fs/src/pkg/replication/volume_growth_test.go b/weed-fs/src/pkg/replication/volume_growth_test.go index 51e47c193..659564c64 100644 --- a/weed-fs/src/pkg/replication/volume_growth_test.go +++ b/weed-fs/src/pkg/replication/volume_growth_test.go @@ -5,7 +5,7 @@ import ( "fmt" "math/rand" "pkg/storage" - "pkg/topology" + "pkg/topology" "testing" "time" ) @@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo := topology.NewTopology("mynetwork","/etc/weedfs/weedfs.conf","/tmp","testing",32*1024, 5) + topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := topology.NewDataCenter(dcKey) @@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) @@ -121,10 +121,9 @@ func TestRemoveDataCenter(t *testing.T) { func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4} - if c, e := vg.GrowByCountAndType(1,storage.Copy000,topo);e==nil{ - t.Log("reserved", c) - } + rand.Seed(time.Now().UnixNano()) + vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4} + if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil { + t.Log("reserved", c) + } } - diff --git a/weed-fs/src/pkg/sequence/sequence.go b/weed-fs/src/pkg/sequence/sequence.go index bfdf1b368..c85289468 100644 --- a/weed-fs/src/pkg/sequence/sequence.go +++ b/weed-fs/src/pkg/sequence/sequence.go @@ -1,11 +1,11 @@ package sequence import ( - "encoding/gob" - "os" - "path" - "sync" + "encoding/gob" "log" + "os" + "path" + "sync" ) const ( @@ -27,21 +27,21 @@ type SequencerImpl struct { } func NewSequencer(dirname string, filename string) (m *SequencerImpl) { - m = &SequencerImpl{dir: dirname, fileName: filename} + m = &SequencerImpl{dir: dirname, fileName: filename} - seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) - if se != nil { - m.FileIdSequence = FileIdSaveInterval - log.Println("Setting file id sequence", m.FileIdSequence) - } else { - decoder := gob.NewDecoder(seqFile) - defer seqFile.Close() - decoder.Decode(&m.FileIdSequence) - log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) - //in case the server stops between intervals - m.FileIdSequence += FileIdSaveInterval - } - return + seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) + if se != nil { + m.FileIdSequence = FileIdSaveInterval + log.Println("Setting file id sequence", m.FileIdSequence) + } else { + decoder := gob.NewDecoder(seqFile) + defer seqFile.Close() + decoder.Decode(&m.FileIdSequence) + log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) + //in case the server stops between intervals + m.FileIdSequence += FileIdSaveInterval + } + return } //count should be 1 or more @@ -60,12 +60,12 @@ func (m *SequencerImpl) NextFileId(count int) (uint64, int) { return m.FileIdSequence - m.fileIdCounter, count } func (m *SequencerImpl) saveSequence() { - log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) - seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) - if e != nil { - log.Fatalf("Sequence File Save [ERROR] %s\n", e) - } - defer seqFile.Close() - encoder := gob.NewEncoder(seqFile) - encoder.Encode(m.FileIdSequence) + log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) + seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) + if e != nil { + log.Fatalf("Sequence File Save [ERROR] %s\n", e) + } + defer seqFile.Close() + encoder := gob.NewEncoder(seqFile) + encoder.Encode(m.FileIdSequence) } diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go index 7365022ea..90ed42198 100644 --- a/weed-fs/src/pkg/storage/compact_map.go +++ b/weed-fs/src/pkg/storage/compact_map.go @@ -175,3 +175,23 @@ func (cm *CompactMap) Peek() { } } } + +// iterate over the keys by calling iterate on each key till error is returned +func (cm *CompactMap) Walk(pedestrian func(*NeedleValue) error) (err error) { + var i int + for _, cs := range cm.list { + for key := cs.start; key < cs.end; key++ { + if i = cs.binarySearchValues(key); i >= 0 { + if err = pedestrian(&cs.values[i]); err != nil { + return + } + } + } + for _, val := range cs.overflow { + if err = pedestrian(val); err != nil { + return err + } + } + } + return nil +} diff --git a/weed-fs/src/pkg/storage/compact_map_perf_test.go b/weed-fs/src/pkg/storage/compact_map_perf_test.go index 2e2227279..cfa521fc8 100644 --- a/weed-fs/src/pkg/storage/compact_map_perf_test.go +++ b/weed-fs/src/pkg/storage/compact_map_perf_test.go @@ -1,43 +1,43 @@ package storage import ( + "log" + "os" + "pkg/util" "testing" - "log" - "os" - "pkg/util" ) func TestMemoryUsage(t *testing.T) { - indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) - if ie != nil { - log.Fatalln(ie) - } - LoadNewNeedleMap(indexFile) - + indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + if ie != nil { + log.Fatalln(ie) + } + LoadNewNeedleMap(indexFile) + } func LoadNewNeedleMap(file *os.File) CompactMap { - m := NewCompactMap() - bytes := make([]byte, 16*1024) - count, e := file.Read(bytes) - if count > 0 { - fstat, _ := file.Stat() - log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) - } - for count > 0 && e == nil { - for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - if offset > 0 { - m.Set(Key(key), offset, size) - } else { - //delete(m, key) - } - } + m := NewCompactMap() + bytes := make([]byte, 16*1024) + count, e := file.Read(bytes) + if count > 0 { + fstat, _ := file.Stat() + log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + } + for count > 0 && e == nil { + for i := 0; i < count; i += 16 { + key := util.BytesToUint64(bytes[i : i+8]) + offset := util.BytesToUint32(bytes[i+8 : i+12]) + size := util.BytesToUint32(bytes[i+12 : i+16]) + if offset > 0 { + m.Set(Key(key), offset, size) + } else { + //delete(m, key) + } + } - count, e = file.Read(bytes) - } - return m + count, e = file.Read(bytes) + } + return m } diff --git a/weed-fs/src/pkg/storage/compact_map_test.go b/weed-fs/src/pkg/storage/compact_map_test.go index c05515b29..e76e9578d 100644 --- a/weed-fs/src/pkg/storage/compact_map_test.go +++ b/weed-fs/src/pkg/storage/compact_map_test.go @@ -18,42 +18,42 @@ func TestXYZ(t *testing.T) { m.Set(Key(i), i+11, i+5) } -// for i := uint32(0); i < 100; i++ { -// if v := m.Get(Key(i)); v != nil { -// println(i, "=", v.Key, v.Offset, v.Size) -// } -// } - + // for i := uint32(0); i < 100; i++ { + // if v := m.Get(Key(i)); v != nil { + // println(i, "=", v.Key, v.Offset, v.Size) + // } + // } + for i := uint32(0); i < 10*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(Key(i)) if i%3 == 0 { - if !ok { - t.Fatal("key", i, "missing!") - } + if !ok { + t.Fatal("key", i, "missing!") + } if v.Size != i+5 { t.Fatal("key", i, "size", v.Size) } } else if i%37 == 0 { - if ok && v.Size > 0 { + if ok && v.Size > 0 { t.Fatal("key", i, "should have been deleted needle value", v) } } else if i%2 == 0 { - if v.Size != i { + if v.Size != i { t.Fatal("key", i, "size", v.Size) } } } for i := uint32(10 * batch); i < 100*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(Key(i)) if i%37 == 0 { - if ok && v.Size > 0 { - t.Fatal("key", i, "should have been deleted needle value", v) - } + if ok && v.Size > 0 { + t.Fatal("key", i, "should have been deleted needle value", v) + } } else if i%2 == 0 { - if v==nil{ - t.Fatal("key", i, "missing") - } + if v == nil { + t.Fatal("key", i, "missing") + } if v.Size != i { t.Fatal("key", i, "size", v.Size) } diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index e01c27630..9ce38c1f2 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -98,3 +98,8 @@ func (nm *NeedleMap) Close() { func (nm *NeedleMap) ContentSize() uint64 { return nm.fileByteCounter } + +// iterate through all needles using the iterator function +func (nm *NeedleMap) Walk(pedestrian func(*NeedleValue) error) (err error) { + return nm.m.Walk(pedestrian) +} diff --git a/weed-fs/src/pkg/storage/needle_read_write.go b/weed-fs/src/pkg/storage/needle_read_write.go index 00844bad3..fdb09a3c6 100644 --- a/weed-fs/src/pkg/storage/needle_read_write.go +++ b/weed-fs/src/pkg/storage/needle_read_write.go @@ -2,10 +2,10 @@ package storage import ( "errors" + "fmt" "io" "os" "pkg/util" - "fmt" ) func (n *Needle) Append(w io.Writer, version Version) uint32 { @@ -62,7 +62,8 @@ func (n *Needle) Append(w io.Writer, version Version) uint32 { return n.Size } func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { - if version == Version1 { + switch version { + case Version1: bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) ret, e := r.Read(bytes) n.readNeedleHeader(bytes) @@ -72,7 +73,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { return 0, errors.New("CRC error! Data On Disk Corrupted!") } return ret, e - } else if version == Version2 { + case Version2: if size == 0 { return 0, nil } @@ -95,7 +96,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { } return ret, e } - return 0, errors.New("Unsupported Version!") + return 0, fmt.Errorf("Unsupported Version! (%d)", version) } func (n *Needle) readNeedleHeader(bytes []byte) { n.Cookie = util.BytesToUint32(bytes[0:4]) diff --git a/weed-fs/src/pkg/storage/replication_type.go b/weed-fs/src/pkg/storage/replication_type.go index 86a9d219d..0902d1016 100644 --- a/weed-fs/src/pkg/storage/replication_type.go +++ b/weed-fs/src/pkg/storage/replication_type.go @@ -1,123 +1,123 @@ package storage import ( - "errors" + "errors" ) type ReplicationType string const ( - Copy000 = ReplicationType("000") // single copy - Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center - Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center - Copy100 = ReplicationType("100") // 2 copies, each on different data center - Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center - Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center - LengthRelicationType = 6 - CopyNil = ReplicationType(255) // nil value + Copy000 = ReplicationType("000") // single copy + Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center + Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center + Copy100 = ReplicationType("100") // 2 copies, each on different data center + Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center + Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center + LengthRelicationType = 6 + CopyNil = ReplicationType(255) // nil value ) func NewReplicationTypeFromString(t string) (ReplicationType, error) { - switch t { - case "000": - return Copy000, nil - case "001": - return Copy001, nil - case "010": - return Copy010, nil - case "100": - return Copy100, nil - case "110": - return Copy110, nil - case "200": - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:"+t) + switch t { + case "000": + return Copy000, nil + case "001": + return Copy001, nil + case "010": + return Copy010, nil + case "100": + return Copy100, nil + case "110": + return Copy110, nil + case "200": + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + t) } func NewReplicationTypeFromByte(b byte) (ReplicationType, error) { - switch b { - case byte(000): - return Copy000, nil - case byte(001): - return Copy001, nil - case byte(010): - return Copy010, nil - case byte(100): - return Copy100, nil - case byte(110): - return Copy110, nil - case byte(200): - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:"+string(b)) + switch b { + case byte(000): + return Copy000, nil + case byte(001): + return Copy001, nil + case byte(010): + return Copy010, nil + case byte(100): + return Copy100, nil + case byte(110): + return Copy110, nil + case byte(200): + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + string(b)) } func (r *ReplicationType) String() string { - switch *r { - case Copy000: - return "000" - case Copy001: - return "001" - case Copy010: - return "010" - case Copy100: - return "100" - case Copy110: - return "110" - case Copy200: - return "200" - } - return "000" + switch *r { + case Copy000: + return "000" + case Copy001: + return "001" + case Copy010: + return "010" + case Copy100: + return "100" + case Copy110: + return "110" + case Copy200: + return "200" + } + return "000" } func (r *ReplicationType) Byte() byte { - switch *r { - case Copy000: - return byte(000) - case Copy001: - return byte(001) - case Copy010: - return byte(010) - case Copy100: - return byte(100) - case Copy110: - return byte(110) - case Copy200: - return byte(200) - } - return byte(000) + switch *r { + case Copy000: + return byte(000) + case Copy001: + return byte(001) + case Copy010: + return byte(010) + case Copy100: + return byte(100) + case Copy110: + return byte(110) + case Copy200: + return byte(200) + } + return byte(000) } -func (repType ReplicationType)GetReplicationLevelIndex() int { - switch repType { - case Copy000: - return 0 - case Copy001: - return 1 - case Copy010: - return 2 - case Copy100: - return 3 - case Copy110: - return 4 - case Copy200: - return 5 - } - return -1 +func (repType ReplicationType) GetReplicationLevelIndex() int { + switch repType { + case Copy000: + return 0 + case Copy001: + return 1 + case Copy010: + return 2 + case Copy100: + return 3 + case Copy110: + return 4 + case Copy200: + return 5 + } + return -1 } -func (repType ReplicationType)GetCopyCount() int { - switch repType { - case Copy000: - return 1 - case Copy001: - return 2 - case Copy010: - return 2 - case Copy100: - return 2 - case Copy110: - return 3 - case Copy200: - return 3 - } - return 0 +func (repType ReplicationType) GetCopyCount() int { + switch repType { + case Copy000: + return 1 + case Copy001: + return 2 + case Copy010: + return 2 + case Copy100: + return 2 + case Copy110: + return 3 + case Copy200: + return 3 + } + return 0 } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index a2c5f040b..79cf65e28 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -65,13 +65,13 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error } return e } -func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { +func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) { if s.volumes[vid] != nil { return errors.New("Volume Id " + vid.String() + " already exists!") } log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType) - s.volumes[vid] = NewVolume(s.dir, vid, replicationType) - return nil + s.volumes[vid], err = NewVolume(s.dir, vid, replicationType) + return err } func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { @@ -107,9 +107,10 @@ func (s *Store) loadExistingVolumes() { base := name[:len(name)-len(".dat")] if vid, err := NewVolumeId(base); err == nil { if s.volumes[vid] == nil { - v := NewVolume(s.dir, vid, CopyNil) - s.volumes[vid] = v - log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size()) + if v, e := NewVolume(s.dir, vid, CopyNil); e == nil { + s.volumes[vid] = v + log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size()) + } } } } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 0220bf895..5e64d0763 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -24,9 +24,9 @@ type Volume struct { accessLock sync.Mutex } -func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) { +func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { v = &Volume{dir: dirname, Id: id, replicaType: replicationType} - v.load() + e = v.load() return } func (v *Volume) load() error { @@ -43,6 +43,7 @@ func (v *Volume) load() error { } else { v.maybeWriteSuperBlock() } + // TODO: if .idx not exists, but .cdb exists, then use (but don't load!) that indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) if ie != nil { return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) @@ -79,21 +80,23 @@ func (v *Volume) maybeWriteSuperBlock() { v.dataFile.Write(header) } } -func (v *Volume) readSuperBlock() error { +func (v *Volume) readSuperBlock() (err error) { v.dataFile.Seek(0, 0) header := make([]byte, SuperBlockSize) if _, e := v.dataFile.Read(header); e != nil { return fmt.Errorf("cannot read superblock: %s", e) } - var err error v.version, v.replicaType, err = ParseSuperBlock(header) return err } -func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, e error) { +func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, err error) { version = Version(header[0]) - var err error + if version == 0 { + err = errors.New("Zero version impossible - bad superblock!") + return + } if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { - e = fmt.Errorf("cannot read replica type: %s", err) + err = fmt.Errorf("cannot read replica type: %s", err) } return } @@ -221,3 +224,39 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) func (v *Volume) ContentSize() uint64 { return v.nm.fileByteCounter } + +// Walk over the contained needles (call the function with each NeedleValue till error is returned) +func (v *Volume) WalkValues(pedestrian func(*Needle) error) error { + pedplus := func(nv *NeedleValue) (err error) { + n := new(Needle) + if nv.Offset > 0 { + v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0) + if _, err = n.Read(v.dataFile, nv.Size, v.version); err != nil { + return + } + if err = pedestrian(n); err != nil { + return + } + } + return nil + } + return v.nm.Walk(pedplus) +} + +// Walk over the keys +func (v *Volume) WalkKeys(pedestrian func(Key) error) error { + pedplus := func(nv *NeedleValue) (err error) { + if nv.Offset > 0 && nv.Key > 0 { + if err = pedestrian(nv.Key); err != nil { + return + } + } + return nil + } + return v.nm.Walk(pedplus) +} + +func (v *Volume) String() string { + return fmt.Sprintf("%d@%s:v%d:r%s", v.Id, v.dataFile.Name(), + v.Version(), v.replicaType) +} diff --git a/weed-fs/src/pkg/storage/volume_id.go b/weed-fs/src/pkg/storage/volume_id.go index bf7396673..0333c6cf0 100644 --- a/weed-fs/src/pkg/storage/volume_id.go +++ b/weed-fs/src/pkg/storage/volume_id.go @@ -1,17 +1,18 @@ package storage import ( - "strconv" + "strconv" ) type VolumeId uint32 -func NewVolumeId(vid string) (VolumeId,error) { - volumeId, err := strconv.ParseUint(vid, 10, 64) - return VolumeId(volumeId), err + +func NewVolumeId(vid string) (VolumeId, error) { + volumeId, err := strconv.ParseUint(vid, 10, 64) + return VolumeId(volumeId), err } -func (vid *VolumeId) String() string{ - return strconv.FormatUint(uint64(*vid), 10) +func (vid *VolumeId) String() string { + return strconv.FormatUint(uint64(*vid), 10) } -func (vid *VolumeId) Next() VolumeId{ - return VolumeId(uint32(*vid)+1) +func (vid *VolumeId) Next() VolumeId { + return VolumeId(uint32(*vid) + 1) } diff --git a/weed-fs/src/pkg/storage/volume_version.go b/weed-fs/src/pkg/storage/volume_version.go index da91ad038..9702ae904 100644 --- a/weed-fs/src/pkg/storage/volume_version.go +++ b/weed-fs/src/pkg/storage/volume_version.go @@ -1,12 +1,11 @@ package storage -import ( -) +import () type Version uint8 const ( - Version1 = Version(1) - Version2 = Version(2) - CurrentVersion = Version2 + Version1 = Version(1) + Version2 = Version(2) + CurrentVersion = Version2 ) diff --git a/weed-fs/src/pkg/topology/configuration_test.go b/weed-fs/src/pkg/topology/configuration_test.go index 5542d1503..35d82c058 100644 --- a/weed-fs/src/pkg/topology/configuration_test.go +++ b/weed-fs/src/pkg/topology/configuration_test.go @@ -30,13 +30,13 @@ func TestLoadConfiguration(t *testing.T) { ` c, err := NewConfiguration([]byte(confContent)) - - fmt.Printf("%s\n", c) - if err!=nil{ - t.Fatalf("unmarshal error:%s",err.Error()) + + fmt.Printf("%s\n", c) + if err != nil { + t.Fatalf("unmarshal error:%s", err.Error()) } - + if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { - t.Fatalf("unmarshal error:%s",c) + t.Fatalf("unmarshal error:%s", c) } } diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go index 2ec68fd4b..a3b2b7d13 100644 --- a/weed-fs/src/pkg/topology/data_center.go +++ b/weed-fs/src/pkg/topology/data_center.go @@ -1,7 +1,6 @@ package topology -import ( -) +import () type DataCenter struct { NodeImpl @@ -12,31 +11,31 @@ func NewDataCenter(id string) *DataCenter { dc.id = NodeId(id) dc.nodeType = "DataCenter" dc.children = make(map[NodeId]Node) - dc.NodeImpl.value = dc + dc.NodeImpl.value = dc return dc } func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { - for _, c := range dc.Children() { - rack := c.(*Rack) - if string(rack.Id()) == rackName { - return rack - } - } - rack := NewRack(rackName) - dc.LinkChildNode(rack) - return rack + for _, c := range dc.Children() { + rack := c.(*Rack) + if string(rack.Id()) == rackName { + return rack + } + } + rack := NewRack(rackName) + dc.LinkChildNode(rack) + return rack } -func (dc *DataCenter) ToMap() interface{}{ - m := make(map[string]interface{}) - m["Max"] = dc.GetMaxVolumeCount() - m["Free"] = dc.FreeSpace() - var racks []interface{} - for _, c := range dc.Children() { - rack := c.(*Rack) - racks = append(racks, rack.ToMap()) - } - m["Racks"] = racks - return m +func (dc *DataCenter) ToMap() interface{} { + m := make(map[string]interface{}) + m["Max"] = dc.GetMaxVolumeCount() + m["Free"] = dc.FreeSpace() + var racks []interface{} + for _, c := range dc.Children() { + rack := c.(*Rack) + racks = append(racks, rack.ToMap()) + } + m["Racks"] = racks + return m } diff --git a/weed-fs/src/pkg/topology/node_list.go b/weed-fs/src/pkg/topology/node_list.go index 1d9e1891a..3115e0213 100644 --- a/weed-fs/src/pkg/topology/node_list.go +++ b/weed-fs/src/pkg/topology/node_list.go @@ -37,14 +37,14 @@ func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) { list = append(list, n) } } - if n > len(list){ - return nil,false + if n > len(list) { + return nil, false } for i := n; i > 0; i-- { - r := rand.Intn(i) - t := list[r] - list[r] = list[i-1] - list[i-1] = t + r := rand.Intn(i) + t := list[r] + list[r] = list[i-1] + list[i-1] = t } return list[len(list)-n:], true } diff --git a/weed-fs/src/pkg/topology/node_list_test.go b/weed-fs/src/pkg/topology/node_list_test.go index 0d16a0526..2fb4fa970 100644 --- a/weed-fs/src/pkg/topology/node_list_test.go +++ b/weed-fs/src/pkg/topology/node_list_test.go @@ -1,39 +1,39 @@ package topology import ( + _ "fmt" "strconv" "testing" - _ "fmt" ) func TestXYZ(t *testing.T) { - topo := NewTopology("topo","/etc/weed.conf", "/tmp","test",234,5) + topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) for i := 0; i < 5; i++ { dc := NewDataCenter("dc" + strconv.Itoa(i)) dc.activeVolumeCount = i dc.maxVolumeCount = 5 topo.LinkChildNode(dc) } - nl := NewNodeList(topo.Children(),nil) + nl := NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(1) - if !ret || len(picked)!=1 { - t.Errorf("need to randomly pick 1 node") - } - - picked, ret = nl.RandomlyPickN(4) - if !ret || len(picked)!=4 { - t.Errorf("need to randomly pick 4 nodes") + picked, ret := nl.RandomlyPickN(1) + if !ret || len(picked) != 1 { + t.Errorf("need to randomly pick 1 node") } - picked, ret = nl.RandomlyPickN(5) - if !ret || len(picked)!=5 { - t.Errorf("need to randomly pick 5 nodes") - } + picked, ret = nl.RandomlyPickN(4) + if !ret || len(picked) != 4 { + t.Errorf("need to randomly pick 4 nodes") + } - picked, ret = nl.RandomlyPickN(6) - if ret || len(picked)!=0 { - t.Errorf("can not randomly pick 6 nodes:", ret, picked) - } + picked, ret = nl.RandomlyPickN(5) + if !ret || len(picked) != 5 { + t.Errorf("need to randomly pick 5 nodes") + } + + picked, ret = nl.RandomlyPickN(6) + if ret || len(picked) != 0 { + t.Errorf("can not randomly pick 6 nodes:", ret, picked) + } } diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index 1555ef682..acc34417a 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -19,13 +19,13 @@ func NewRack(id string) *Rack { } func (r *Rack) FindDataNode(ip string, port int) *DataNode { - for _, c := range r.Children() { - dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - return dn - } - } - return nil + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + return dn + } + } + return nil } func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { for _, c := range r.Children() { diff --git a/weed-fs/src/pkg/topology/topo_test.go b/weed-fs/src/pkg/topology/topo_test.go index 83356b38c..71a901c8e 100644 --- a/weed-fs/src/pkg/topology/topo_test.go +++ b/weed-fs/src/pkg/topology/topo_test.go @@ -78,7 +78,7 @@ func setup(topologyLayout string) *Topology { } //need to connect all nodes first before server adding volumes - topo := NewTopology("mynetwork","/etc/weed.conf","/tmp","test",234,5) + topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := NewDataCenter(dcKey) @@ -94,7 +94,7 @@ func setup(topologyLayout string) *Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) @@ -119,9 +119,9 @@ func TestRemoveDataCenter(t *testing.T) { func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - rand.Seed(1) + rand.Seed(time.Now().UnixNano()) + rand.Seed(1) ret, node, vid := topo.RandomlyReserveOneVolume() - fmt.Println("assigned :", ret, ", node :", node,", volume id:", vid) + fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid) } diff --git a/weed-fs/src/pkg/topology/topology_compact.go b/weed-fs/src/pkg/topology/topology_compact.go index 050fe4cd8..dee6514f4 100644 --- a/weed-fs/src/pkg/topology/topology_compact.go +++ b/weed-fs/src/pkg/topology/topology_compact.go @@ -101,10 +101,10 @@ type VacuumVolumeResult struct { func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) { values := make(url.Values) values.Add("volume", vid.String()) - values.Add("garbageThreshold", garbageThreshold) + values.Add("garbageThreshold", garbageThreshold) jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values) if err != nil { - fmt.Println("parameters:",values) + fmt.Println("parameters:", values) return err, false } var ret VacuumVolumeResult diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go index b33b4d768..debedc3d3 100644 --- a/weed-fs/src/pkg/topology/topology_event_handling.go +++ b/weed-fs/src/pkg/topology/topology_event_handling.go @@ -52,14 +52,14 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { vl := t.GetVolumeLayout(v.RepType) vl.SetVolumeUnavailable(dn, v.Id) } - dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) + dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) dn.Parent().UnlinkChildNode(dn.Id()) } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.RepType) + vl := t.GetVolumeLayout(v.RepType) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/weed-fs/src/pkg/topology/topology_map.go b/weed-fs/src/pkg/topology/topology_map.go index 9ccf08ae3..b416ee943 100644 --- a/weed-fs/src/pkg/topology/topology_map.go +++ b/weed-fs/src/pkg/topology/topology_map.go @@ -1,7 +1,6 @@ package topology -import ( -) +import () func (t *Topology) ToMap() interface{} { m := make(map[string]interface{}) diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go index 64d8cdf43..507a240b5 100644 --- a/weed-fs/src/pkg/topology/volume_location.go +++ b/weed-fs/src/pkg/topology/volume_location.go @@ -15,7 +15,7 @@ func (dnll *VolumeLocationList) Head() *DataNode { } func (dnll *VolumeLocationList) Length() int { - return len(dnll.list) + return len(dnll.list) } func (dnll *VolumeLocationList) Add(loc *DataNode) bool { @@ -29,13 +29,13 @@ func (dnll *VolumeLocationList) Add(loc *DataNode) bool { } func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { - for i, dnl := range dnll.list { - if loc.Ip == dnl.Ip && loc.Port == dnl.Port { - dnll.list = append(dnll.list[:i],dnll.list[i+1:]...) - return true - } - } - return false + for i, dnl := range dnll.list { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { + dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) + return true + } + } + return false } func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { diff --git a/weed-fs/src/pkg/util/bytes.go b/weed-fs/src/pkg/util/bytes.go index 177da06db..6cc3d7018 100644 --- a/weed-fs/src/pkg/util/bytes.go +++ b/weed-fs/src/pkg/util/bytes.go @@ -1,34 +1,33 @@ package util -func BytesToUint64(b []byte)(v uint64){ - length := uint(len(b)) - for i :=uint(0);i>(i*8)) - } +func Uint64toBytes(b []byte, v uint64) { + for i := uint(0); i < 8; i++ { + b[7-i] = byte(v >> (i * 8)) + } } -func Uint32toBytes(b []byte, v uint32){ - for i :=uint(0);i<4;i++ { - b[3-i] = byte(v>>(i*8)) - } +func Uint32toBytes(b []byte, v uint32) { + for i := uint(0); i < 4; i++ { + b[3-i] = byte(v >> (i * 8)) + } } -func Uint8toBytes(b []byte, v uint8){ - b[0] = byte(v) +func Uint8toBytes(b []byte, v uint8) { + b[0] = byte(v) } - diff --git a/weed-fs/src/pkg/util/parse.go b/weed-fs/src/pkg/util/parse.go index 6a4350e72..930da9522 100644 --- a/weed-fs/src/pkg/util/parse.go +++ b/weed-fs/src/pkg/util/parse.go @@ -1,16 +1,16 @@ package util import ( - "strconv" + "strconv" ) -func ParseInt(text string, defaultValue int) int{ - count, parseError := strconv.ParseUint(text,10,64) - if parseError!=nil { - if len(text)>0{ - return 0 - } - return defaultValue - } - return int(count) +func ParseInt(text string, defaultValue int) int { + count, parseError := strconv.ParseUint(text, 10, 64) + if parseError != nil { + if len(text) > 0 { + return 0 + } + return defaultValue + } + return int(count) } diff --git a/weed-fs/src/pkg/util/post.go b/weed-fs/src/pkg/util/post.go index f643faa6b..6e6ab0003 100644 --- a/weed-fs/src/pkg/util/post.go +++ b/weed-fs/src/pkg/util/post.go @@ -16,7 +16,7 @@ func Post(url string, values url.Values) ([]byte, error) { defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) if err != nil { - log.Println("read post result from", url, err) + log.Println("read post result from", url, err) return nil, err } return b, nil From bf0ccf346198a65e0321b3cedfb25ef5dad73e2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tam=C3=A1s=20Gul=C3=A1csi?= Date: Sat, 12 Jan 2013 19:56:47 +0100 Subject: [PATCH 4/7] add CdbMap --- weed-fs/src/pkg/storage/cdb_map.go | 107 +++++++++++++++++++++++ weed-fs/src/pkg/storage/compact_map.go | 4 +- weed-fs/src/pkg/storage/needle_map.go | 112 ++++++++++++++++++------- weed-fs/src/pkg/storage/volume.go | 4 +- 4 files changed, 195 insertions(+), 32 deletions(-) create mode 100644 weed-fs/src/pkg/storage/cdb_map.go diff --git a/weed-fs/src/pkg/storage/cdb_map.go b/weed-fs/src/pkg/storage/cdb_map.go new file mode 100644 index 000000000..b80b3cde9 --- /dev/null +++ b/weed-fs/src/pkg/storage/cdb_map.go @@ -0,0 +1,107 @@ +package storage + +import ( + "github.com/tgulacsi/go-cdb" + "io" + "log" + "os" + "pkg/util" + "strings" +) + +type CdbMap struct { + db *cdb.Cdb + transient []byte + Filename string +} + +// Opens the CDB file and servers as a needle map +func NewCdbMap(filename string) (*CdbMap, error) { + m, err := cdb.Open(filename) + if err != nil { + return nil, err + } + return &CdbMap{db: m, transient: make([]byte, 8), + Filename: filename}, nil +} + +// writes the content of the index file to a CDB and returns that +func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) { + nm := indexFile.Name() + nm = nm[strings.LastIndex(nm, ".")+1:] + "cdb" + + var ( + key uint64 + offset uint32 + ok bool + ) + deleted := make(map[uint64]bool, 16) + gatherDeletes := func(buf []byte) error { + key = util.BytesToUint64(buf[:8]) + offset = util.BytesToUint32(buf[8:12]) + if offset > 0 { + if _, ok = deleted[key]; ok { //undelete + delete(deleted, key) + } + } else { + deleted[key] = true + } + return nil + } + if err := readIndexFile(indexFile, gatherDeletes); err != nil { + return nil, err + } + + w, err := cdb.NewWriter(nm) + if err != nil { + return nil, err + } + iterFun := func(buf []byte) error { + key = util.BytesToUint64(buf[:8]) + if _, ok = deleted[key]; !ok { + w.PutPair(buf[:8], buf[8:16]) + } + return nil + } + indexFile.Seek(0, 0) + err = readIndexFile(indexFile, iterFun) + w.Close() + if err != nil { + return nil, err + } + + return NewCdbMap(nm) +} + +func (m *CdbMap) Get(key Key) (element *NeedleValue, ok bool) { + util.Uint64toBytes(m.transient, uint64(key)) + data, err := m.db.Data(m.transient) + if err != nil { + if err == io.EOF { + return nil, false + } + log.Printf("error getting %s: %s", key, err) + return nil, false + } + return &NeedleValue{Key: key, + Offset: util.BytesToUint32(data[:4]), + Size: util.BytesToUint32(data[4:8]), + }, true +} + +func (m *CdbMap) Walk(pedestrian func(*NeedleValue) error) (err error) { + r, err := os.Open(m.Filename) + if err != nil { + return err + } + defer r.Close() + + iterFunc := func(elt cdb.Element) error { + return pedestrian(&NeedleValue{ + Key: Key(util.BytesToUint64(elt.Key[:8])), + Offset: util.BytesToUint32(elt.Data[:4]), + Size: util.BytesToUint32(elt.Data[4:8]), + }) + } + return cdb.DumpMap(r, iterFunc) +} diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go index 90ed42198..61cc2c841 100644 --- a/weed-fs/src/pkg/storage/compact_map.go +++ b/weed-fs/src/pkg/storage/compact_map.go @@ -109,8 +109,8 @@ type CompactMap struct { list []CompactSection } -func NewCompactMap() CompactMap { - return CompactMap{} +func NewCompactMap() *CompactMap { + return &CompactMap{} } func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 { diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index 9ce38c1f2..9d7369509 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -1,6 +1,7 @@ package storage import ( + "io" "log" "os" "pkg/util" @@ -8,7 +9,8 @@ import ( type NeedleMap struct { indexFile *os.File - m CompactMap + m MapGetSetter // modifiable map + fm MapGetter // frozen map //transient bytes []byte @@ -19,52 +21,106 @@ type NeedleMap struct { fileByteCounter uint64 } +// Map interface for frozen maps +type MapGetter interface { + Get(key Key) (element *NeedleValue, ok bool) + Walk(pedestrian func(*NeedleValue) error) error +} + +// Modifiable map interface +type MapSetter interface { + Set(key Key, offset, size uint32) (oldsize uint32) + Delete(key Key) uint32 +} + +// Settable and gettable map +type MapGetSetter interface { + MapGetter + MapSetter +} + +// New in-memory needle map, backed by "file" index file func NewNeedleMap(file *os.File) *NeedleMap { - nm := &NeedleMap{ + return &NeedleMap{ m: NewCompactMap(), bytes: make([]byte, 16), indexFile: file, } - return nm +} + +// Nes frozen (on-disk, not modifiable(!)) needle map +func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) { + fm, err := NewCdbMapFromIndex(file) + if err != nil { + return nil, err + } + return &NeedleMap{ + fm: fm, + bytes: make([]byte, 16), + }, nil } const ( RowsToRead = 1024 ) -func LoadNeedleMap(file *os.File) *NeedleMap { +func LoadNeedleMap(file *os.File) (*NeedleMap, error) { nm := NewNeedleMap(file) - bytes := make([]byte, 16*RowsToRead) - count, e := nm.indexFile.Read(bytes) - if count > 0 { - fstat, _ := file.Stat() - log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + + var ( + key uint64 + offset, size, oldSize uint32 + ) + iterFun := func(buf []byte) error { + key = util.BytesToUint64(buf[:8]) + offset = util.BytesToUint32(buf[8:12]) + size = util.BytesToUint32(buf[12:16]) + nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(size) + if offset > 0 { + oldSize = nm.m.Set(Key(key), offset, size) + //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) + if oldSize > 0 { + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + } + } else { + nm.m.Delete(Key(key)) + //log.Println("removing key", key) + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(size) + } + + return nil + } + if err := readIndexFile(file, iterFun); err != nil { + return nil, err + } + return nm, nil +} + +// calls iterFun with each row (raw 16 bytes) +func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error { + buf := make([]byte, 16*RowsToRead) + count, e := io.ReadAtLeast(indexFile, buf, 16) + if e != nil && count > 0 { + fstat, err := indexFile.Stat() + if err != nil { + log.Println("ERROR stating %s: %s", indexFile, err) + } else { + log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + } } for count > 0 && e == nil { for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - nm.fileCounter++ - nm.fileByteCounter = nm.fileByteCounter + uint64(size) - if offset > 0 { - oldSize := nm.m.Set(Key(key), offset, size) - //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) - if oldSize > 0 { - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) - } - } else { - nm.m.Delete(Key(key)) - //log.Println("removing key", key) - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(size) + if e = iterFun(buf[i : i+16]); e != nil { + return e } } - count, e = nm.indexFile.Read(bytes) + count, e = io.ReadAtLeast(indexFile, buf, 16) } - return nm + return nil } func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 5e64d0763..71dfb5aee 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -48,8 +48,8 @@ func (v *Volume) load() error { if ie != nil { return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) } - v.nm = LoadNeedleMap(indexFile) - return nil + v.nm, e = LoadNeedleMap(indexFile) + return e } func (v *Volume) Version() Version { return v.version From dd685fdd8d8ac6d28dce0d25b72115e3315a30a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tam=C3=A1s=20Gul=C3=A1csi?= Date: Sun, 13 Jan 2013 17:07:38 +0100 Subject: [PATCH 5/7] add Frozen attribute to VolumeInfo --- weed-fs/src/cmd/weed/master.go | 10 ++++++++-- weed-fs/src/pkg/storage/needle_map.go | 16 +++++++++++++++- weed-fs/src/pkg/storage/store.go | 17 ++++++++++++++--- weed-fs/src/pkg/storage/volume.go | 13 +++++++++++++ weed-fs/src/pkg/storage/volume_info.go | 1 + weed-fs/src/pkg/topology/volume_layout.go | 5 ++++- 6 files changed, 55 insertions(+), 7 deletions(-) diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index c60974a67..151ae31fc 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -107,8 +107,14 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { if ip == "" { ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] } - port, _ := strconv.Atoi(r.FormValue("port")) - maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) + port, err := strconv.Atoi(r.FormValue("port")) + if err != nil { + log.Printf("ERROR bad port number %s: %s", r.FormValue("port"), err) + } + maxVolumeCount, err := strconv.Atoi(r.FormValue("maxVolumeCount")) + if err != nil { + log.Printf("ERROR bad maxVolumeCount %s: %s", r.FormValue("maxVolumeCount"), err) + } s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") publicUrl := r.FormValue("publicUrl") volumes := new([]storage.VolumeInfo) diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index 9d7369509..f103d10d8 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -1,6 +1,7 @@ package storage import ( + "errors" "io" "log" "os" @@ -60,10 +61,16 @@ func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) { }, nil } +func (nm NeedleMap) IsFrozen() bool { + return nm.m == nil && nm.fm != nil +} + const ( RowsToRead = 1024 ) +var MapIsFrozen = errors.New("Map is frozen!") + func LoadNeedleMap(file *os.File) (*NeedleMap, error) { nm := NewNeedleMap(file) @@ -124,6 +131,9 @@ func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error { } func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { + if nm.IsFrozen() { + return 0, MapIsFrozen + } oldSize := nm.m.Set(Key(key), offset, size) util.Uint64toBytes(nm.bytes[0:8], key) util.Uint32toBytes(nm.bytes[8:12], offset) @@ -140,13 +150,17 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { element, ok = nm.m.Get(Key(key)) return } -func (nm *NeedleMap) Delete(key uint64) { +func (nm *NeedleMap) Delete(key uint64) error { + if nm.IsFrozen() { + return MapIsFrozen + } nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key))) util.Uint64toBytes(nm.bytes[0:8], key) util.Uint32toBytes(nm.bytes[8:12], 0) util.Uint32toBytes(nm.bytes[12:16], 0) nm.indexFile.Write(nm.bytes) nm.deletionCounter++ + return nil } func (nm *NeedleMap) Close() { nm.indexFile.Close() diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 79cf65e28..b06c29902 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -120,8 +120,16 @@ func (s *Store) loadExistingVolumes() { func (s *Store) Status() []*VolumeInfo { var stats []*VolumeInfo for k, v := range s.volumes { - s := new(VolumeInfo) - s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter + s := &VolumeInfo{ + Id: VolumeId(k), + Size: v.ContentSize(), + RepType: v.replicaType, + Version: v.Version(), + FileCount: v.nm.fileCounter, + DeleteCount: v.nm.deletionCounter, + DeletedByteCount: v.nm.deletionByteCounter, + Frozen: !v.IsWritable(), + } stats = append(stats, s) } return stats @@ -134,6 +142,8 @@ type JoinResult struct { func (s *Store) SetMaster(mserver string) { s.masterNode = mserver } + +// call master's /dir/join func (s *Store) Join() error { stats := new([]*VolumeInfo) for k, v := range s.volumes { @@ -171,7 +181,8 @@ func (s *Store) Close() { func (s *Store) Write(i VolumeId, n *Needle) uint32 { if v := s.volumes[i]; v != nil { size := v.write(n) - if s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() { + if s.volumeSizeLimit < v.ContentSize()+uint64(size) && + s.volumeSizeLimit >= v.ContentSize() { log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit) s.Join() } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 71dfb5aee..6a79d6c40 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -3,6 +3,7 @@ package storage import ( "errors" "fmt" + "log" "os" "path" "sync" @@ -64,6 +65,18 @@ func (v *Volume) Size() int64 { fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error()) return -1 } + +// a volume is writable, if its data file is writable and the index is not frozen +func (v *Volume) IsWritable() bool { + stat, e := v.dataFile.Stat() + if e != nil { + log.Printf("Failed to read file permission %s %s\n", v.dataFile.Name(), e.Error()) + return false + } + // 4 for r, 2 for w, 1 for x + return stat.Mode().Perm()&0222 > 0 && !v.nm.IsFrozen() +} + func (v *Volume) Close() { v.accessLock.Lock() defer v.accessLock.Unlock() diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go index e4c5f6ec4..845301670 100644 --- a/weed-fs/src/pkg/storage/volume_info.go +++ b/weed-fs/src/pkg/storage/volume_info.go @@ -10,4 +10,5 @@ type VolumeInfo struct { FileCount int DeleteCount int DeletedByteCount uint64 + Frozen bool } diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go index 23802ca81..141a40072 100644 --- a/weed-fs/src/pkg/topology/volume_layout.go +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -45,7 +45,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { - return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion + return !v.Frozen && + uint64(v.Size) < vl.volumeSizeLimit && + v.Version == storage.CurrentVersion } func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { @@ -92,6 +94,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { return false } } + // FIXME: how to refuse if volume is unwritable/frozen? fmt.Println("Volume", vid, "becomes writable") vl.writables = append(vl.writables, vid) if len(vl.writables) > 1 { From f262fed19784ad85d7cfef985f3dfcc09bd7180c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tam=C3=A1s=20Gul=C3=A1csi?= Date: Mon, 14 Jan 2013 21:42:35 +0100 Subject: [PATCH 6/7] add "freeze" subcommand to volume --- weed-fs/src/cmd/weed/fix.go | 27 +++++++++--- weed-fs/src/cmd/weed/weed.go | 1 + weed-fs/src/pkg/storage/cdb_map.go | 7 ++- weed-fs/src/pkg/storage/needle_map.go | 39 +++++++++++++++-- weed-fs/src/pkg/storage/store.go | 9 ++++ weed-fs/src/pkg/storage/volume.go | 14 ++++-- weed-fs/src/pkg/util/file.go | 63 +++++++++++++++++++++++++++ 7 files changed, 144 insertions(+), 16 deletions(-) create mode 100644 weed-fs/src/pkg/util/file.go diff --git a/weed-fs/src/cmd/weed/fix.go b/weed-fs/src/cmd/weed/fix.go index 7bed70edd..53b6cfc75 100644 --- a/weed-fs/src/cmd/weed/fix.go +++ b/weed-fs/src/cmd/weed/fix.go @@ -1,6 +1,7 @@ package main import ( + "errors" "log" "os" "path" @@ -33,24 +34,36 @@ func runFix(cmd *Command, args []string) bool { } fileName := strconv.Itoa(*volumeId) - dataFile, e := os.OpenFile(path.Join(*dir, fileName+".dat"), os.O_RDONLY, 0644) + + if err := createIndexFile(path.Join(*dir, fileName+".dat")); err != nil { + log.Fatalf("[ERROR] " + err.Error()) + } + return true +} + +func createIndexFile(datafn string) error { + dataFile, e := os.OpenFile(datafn, os.O_RDONLY, 0644) if e != nil { - log.Fatalf("Read Volume [ERROR] %s\n", e) + return errors.New("Read Volume " + e.Error()) } defer dataFile.Close() - indexFile, ie := os.OpenFile(path.Join(*dir, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) + // log.Printf("dataFile=%s", dataFile) + indexFile, ie := os.OpenFile(datafn[:len(datafn)-4]+".idx", os.O_WRONLY|os.O_CREATE, 0644) if ie != nil { - log.Fatalf("Create Volume Index [ERROR] %s\n", ie) + return errors.New("Create Volume Index " + ie.Error()) } defer indexFile.Close() dataFile.Seek(0, 0) header := make([]byte, storage.SuperBlockSize) if _, e := dataFile.Read(header); e != nil { - log.Fatalf("cannot read superblock: %s", e) + return errors.New("cannot read superblock: " + e.Error()) } - ver, _, _ := storage.ParseSuperBlock(header) + ver, _, e := storage.ParseSuperBlock(header) + if e != nil { + return errors.New("cannot parse superblock: " + e.Error()) + } n, rest := storage.ReadNeedleHeader(dataFile, ver) dataFile.Seek(int64(rest), 1) @@ -66,5 +79,5 @@ func runFix(cmd *Command, args []string) bool { n, rest = storage.ReadNeedleHeader(dataFile, ver) dataFile.Seek(int64(rest), 1) } - return true + return nil } diff --git a/weed-fs/src/cmd/weed/weed.go b/weed-fs/src/cmd/weed/weed.go index e2eb41ced..685027fb6 100644 --- a/weed-fs/src/cmd/weed/weed.go +++ b/weed-fs/src/cmd/weed/weed.go @@ -21,6 +21,7 @@ var server *string var commands = []*Command{ cmdFix, + cmdFreeze, cmdMaster, cmdUpload, cmdShell, diff --git a/weed-fs/src/pkg/storage/cdb_map.go b/weed-fs/src/pkg/storage/cdb_map.go index b80b3cde9..bffb2b9ea 100644 --- a/weed-fs/src/pkg/storage/cdb_map.go +++ b/weed-fs/src/pkg/storage/cdb_map.go @@ -28,7 +28,7 @@ func NewCdbMap(filename string) (*CdbMap, error) { // writes the content of the index file to a CDB and returns that func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) { nm := indexFile.Name() - nm = nm[strings.LastIndex(nm, ".")+1:] + "cdb" + nm = nm[:strings.LastIndex(nm, ".")+1] + "cdb" var ( key uint64 @@ -52,12 +52,14 @@ func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) { return nil, err } + log.Printf("deleted: %s\nnm=%s", deleted, nm) w, err := cdb.NewWriter(nm) if err != nil { return nil, err } iterFun := func(buf []byte) error { key = util.BytesToUint64(buf[:8]) + log.Printf("iter key=%d", key) if _, ok = deleted[key]; !ok { w.PutPair(buf[:8], buf[8:16]) } @@ -69,6 +71,9 @@ func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) { if err != nil { return nil, err } + if err = util.SetFilePerm(nil, nm, 0444, -1); err != nil { + return nil, err + } return NewCdbMap(nm) } diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index f103d10d8..c1b84358c 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -6,6 +6,7 @@ import ( "log" "os" "pkg/util" + "strings" ) type NeedleMap struct { @@ -50,10 +51,40 @@ func NewNeedleMap(file *os.File) *NeedleMap { } // Nes frozen (on-disk, not modifiable(!)) needle map -func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) { - fm, err := NewCdbMapFromIndex(file) - if err != nil { - return nil, err +func NewFrozenNeedleMap(fileName string) (*NeedleMap, error) { + if strings.HasSuffix(fileName, ".dat") { + fileName = fileName[:4] + } + var ( + fm *CdbMap + indexExists bool + ) + file, err := os.Open(fileName + ".idx") + if err != nil && os.IsNotExist(err) { + if fm, err = NewCdbMap(fileName + ".cdb"); err != nil { + log.Printf("error opening %s.cdb: %s", fileName, err) + fm = nil + } else { + if dstat, e := os.Stat(fileName + ".dat"); e == nil { + if cstat, e := os.Stat(fileName + ".cdb"); e == nil { + if cstat.ModTime().Before(dstat.ModTime()) { + return nil, errors.New("CDB file " + fileName + + ".cdb is older than data file " + fileName + ".dat!") + } + } + } + } + } else { + indexExists = true + } + if fm == nil { + fm, err = NewCdbMapFromIndex(file) + if err != nil { + return nil, err + } + if indexExists { + os.Remove(fileName + ".idx") + } } return &NeedleMap{ fm: fm, diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index b06c29902..161b855e3 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -33,6 +33,8 @@ func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *S log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes") return } + +// adds a volume to the store func (s *Store) AddVolume(volumeListString string, replicationType string) error { rt, e := NewReplicationTypeFromString(replicationType) if e != nil { @@ -74,6 +76,7 @@ func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err er return err } +// checks whether compaction is needed func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { vid, err := NewVolumeId(volumeIdString) if err != nil { @@ -85,6 +88,8 @@ func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString } return nil, garbageThreshold < s.volumes[vid].garbageLevel() } + +// compacts the volume func (s *Store) CompactVolume(volumeIdString string) error { vid, err := NewVolumeId(volumeIdString) if err != nil { @@ -92,6 +97,8 @@ func (s *Store) CompactVolume(volumeIdString string) error { } return s.volumes[vid].compact() } + +// commits the compaction func (s *Store) CommitCompactVolume(volumeIdString string) error { vid, err := NewVolumeId(volumeIdString) if err != nil { @@ -99,6 +106,8 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error { } return s.volumes[vid].commitCompact() } + +// reads directory and loads volumes func (s *Store) loadExistingVolumes() { if dirs, err := ioutil.ReadDir(s.dir); err == nil { for _, dir := range dirs { diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 6a79d6c40..d694f27ba 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -6,6 +6,7 @@ import ( "log" "os" "path" + "pkg/util" "sync" ) @@ -45,13 +46,18 @@ func (v *Volume) load() error { v.maybeWriteSuperBlock() } // TODO: if .idx not exists, but .cdb exists, then use (but don't load!) that - indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) - if ie != nil { - return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + if !util.FileIsWritable(v.dataFile.Name()) { //Read-Only + v.nm, e = NewFrozenNeedleMap(fileName) + } else { + indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) + if ie != nil { + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + } + v.nm, e = LoadNeedleMap(indexFile) } - v.nm, e = LoadNeedleMap(indexFile) return e } + func (v *Volume) Version() Version { return v.version } diff --git a/weed-fs/src/pkg/util/file.go b/weed-fs/src/pkg/util/file.go new file mode 100644 index 000000000..bf3ea66de --- /dev/null +++ b/weed-fs/src/pkg/util/file.go @@ -0,0 +1,63 @@ +package util + +import ( + "errors" + "log" + "os" +) + +// sets file (fh if not nil, otherwise fileName) permission to mask +// it will +// AND with the permission iff direction < 0 +// OR with the permission iff direction > 0 +// otherwise it will SET the permission to the mask +func SetFilePerm(fh *os.File, fileName string, mask os.FileMode, direction int8) (err error) { + var stat os.FileInfo + if fh == nil { + stat, err = os.Stat(fileName) + } else { + stat, err = fh.Stat() + } + if err != nil { + return err + } + + mode := stat.Mode() & ^os.ModePerm + // log.Printf("mode1=%d mask=%d", mode, mask) + if direction == 0 { + mode |= mask + } else if direction > 0 { + mode |= stat.Mode().Perm() | mask + } else { + mode |= stat.Mode().Perm() & mask + } + log.Printf("pmode=%d operm=%d => nmode=%d nperm=%d", + stat.Mode(), stat.Mode()&os.ModePerm, + mode, mode&os.ModePerm) + if mode == 0 { + return errors.New("Zero FileMode") + } + if fh == nil { + err = os.Chmod(fileName, mode) + } else { + err = fh.Chmod(mode) + } + return err +} + +// returns whether the filename exists - errors doesn't mean not exists! +func FileExists(fileName string) bool { + if _, e := os.Stat(fileName); e != nil && os.IsNotExist(e) { + return false + } + return true +} + +// returns whether the filename is POSSIBLY writable +//- whether it has some kind of writable bit set +func FileIsWritable(fileName string) bool { + if stat, e := os.Stat(fileName); e == nil { + return stat.Mode().Perm()&0222 > 0 + } + return false +} From 92ffba2ab9561c066ec12379f2288f2a3ea1d9c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tam=C3=A1s=20Gul=C3=A1csi?= Date: Mon, 14 Jan 2013 22:18:00 +0100 Subject: [PATCH 7/7] fix errors with frozen volume loading --- weed-fs/src/pkg/storage/needle_map.go | 11 +++++++++-- weed-fs/src/pkg/storage/store.go | 4 +++- weed-fs/src/pkg/storage/volume.go | 9 ++++++++- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index c1b84358c..b173eb47f 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -178,7 +178,11 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { return nm.indexFile.Write(nm.bytes) } func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { - element, ok = nm.m.Get(Key(key)) + if nm.m != nil { + element, ok = nm.m.Get(Key(key)) + } else { + element, ok = nm.fm.Get(Key(key)) + } return } func (nm *NeedleMap) Delete(key uint64) error { @@ -202,5 +206,8 @@ func (nm *NeedleMap) ContentSize() uint64 { // iterate through all needles using the iterator function func (nm *NeedleMap) Walk(pedestrian func(*NeedleValue) error) (err error) { - return nm.m.Walk(pedestrian) + if nm.m != nil { + return nm.m.Walk(pedestrian) + } + return nm.fm.Walk(pedestrian) } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 161b855e3..d9e94ee56 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -118,7 +118,9 @@ func (s *Store) loadExistingVolumes() { if s.volumes[vid] == nil { if v, e := NewVolume(s.dir, vid, CopyNil); e == nil { s.volumes[vid] = v - log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size()) + log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size(), "frozen?", !v.IsWritable()) + } else { + log.Println("ERROR loading volume", vid, "in dir", s.dir, ":", e.Error()) } } } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index d694f27ba..9a7c33a42 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -36,7 +36,14 @@ func (v *Volume) load() error { fileName := path.Join(v.dir, v.Id.String()) v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) if e != nil { - return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + if os.IsPermission(e) { + if util.FileExists(fileName + ".cdb") { + v.dataFile, e = os.Open(fileName + ".dat") + } + } + if e != nil { + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + } } if v.replicaType == CopyNil { if e = v.readSuperBlock(); e != nil {