mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
parent
d1494ea786
commit
bf9c4ed033
|
@ -1,96 +0,0 @@
|
||||||
// Copyright Tamás Gulácsi 2013 All rights reserved
|
|
||||||
// Use of this source is governed by the same rules as the weed-fs library.
|
|
||||||
// If this would be ambigous, than Apache License 2.0 has to be used.
|
|
||||||
//
|
|
||||||
// dump dumps the files of a volume to tar or unique files.
|
|
||||||
// Each file will have id#mimetype#original_name file format
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"archive/tar"
|
|
||||||
"bytes"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
// "io"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"pkg/storage"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
volumePath = flag.String("dir", "/tmp", "volume directory")
|
|
||||||
volumeId = flag.Int("id", 0, "volume Id")
|
|
||||||
dest = flag.String("out", "-", "output path. Produces tar if path ends with .tar; creates files otherwise.")
|
|
||||||
tarFh *tar.Writer
|
|
||||||
tarHeader tar.Header
|
|
||||||
counter int
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
if *dest == "-" {
|
|
||||||
*dest = ""
|
|
||||||
}
|
|
||||||
if *dest == "" || strings.HasSuffix(*dest, ".tar") {
|
|
||||||
var fh *os.File
|
|
||||||
if *dest == "" {
|
|
||||||
fh = os.Stdout
|
|
||||||
} else {
|
|
||||||
if fh, err = os.Create(*dest); err != nil {
|
|
||||||
log.Printf("cannot open output tar %s: %s", *dest, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
defer fh.Close()
|
|
||||||
tarFh = tar.NewWriter(fh)
|
|
||||||
defer tarFh.Close()
|
|
||||||
t := time.Now()
|
|
||||||
tarHeader = tar.Header{Mode: 0644,
|
|
||||||
ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(),
|
|
||||||
Typeflag: tar.TypeReg,
|
|
||||||
AccessTime: t, ChangeTime: t}
|
|
||||||
}
|
|
||||||
|
|
||||||
v, err := storage.NewVolume(*volumePath, storage.VolumeId(*volumeId), storage.CopyNil)
|
|
||||||
if v == nil || v.Version() == 0 || err != nil {
|
|
||||||
log.Printf("cannot load volume %d from %s (%s): %s", *volumeId, *volumePath, v, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Printf("volume: %s (ver. %d)", v, v.Version())
|
|
||||||
if err := v.WalkValues(walker); err != nil {
|
|
||||||
log.Printf("error while walking: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("%d files written.", counter)
|
|
||||||
}
|
|
||||||
|
|
||||||
func walker(n *storage.Needle) (err error) {
|
|
||||||
// log.Printf("Id=%d Size=%d Name=%s mime=%s", n.Id, n.Size, n.Name, n.Mime)
|
|
||||||
nm := fmt.Sprintf("%d#%s#%s", n.Id, bytes.Replace(n.Mime, []byte{'/'}, []byte{'_'}, -1), n.Name)
|
|
||||||
// log.Print(nm)
|
|
||||||
if tarFh != nil {
|
|
||||||
tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data))
|
|
||||||
if err = tarFh.WriteHeader(&tarHeader); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, err = tarFh.Write(n.Data)
|
|
||||||
} else {
|
|
||||||
if fh, e := os.Create(*dest + "/" + nm); e != nil {
|
|
||||||
return e
|
|
||||||
} else {
|
|
||||||
defer fh.Close()
|
|
||||||
_, err = fh.Write(n.Data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
counter++
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
|
@ -1,52 +1,53 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Command struct {
|
type Command struct {
|
||||||
// Run runs the command.
|
// Run runs the command.
|
||||||
// The args are the arguments after the command name.
|
// The args are the arguments after the command name.
|
||||||
Run func(cmd *Command, args []string) bool
|
Run func(cmd *Command, args []string) bool
|
||||||
|
|
||||||
// UsageLine is the one-line usage message.
|
// UsageLine is the one-line usage message.
|
||||||
// The first word in the line is taken to be the command name.
|
// The first word in the line is taken to be the command name.
|
||||||
UsageLine string
|
UsageLine string
|
||||||
|
|
||||||
// Short is the short description shown in the 'go help' output.
|
// Short is the short description shown in the 'go help' output.
|
||||||
Short string
|
Short string
|
||||||
|
|
||||||
// Long is the long message shown in the 'go help <this-command>' output.
|
// Long is the long message shown in the 'go help <this-command>' output.
|
||||||
Long string
|
Long string
|
||||||
|
|
||||||
|
// Flag is a set of flags specific to this command.
|
||||||
|
Flag flag.FlagSet
|
||||||
|
|
||||||
// Flag is a set of flags specific to this command.
|
|
||||||
Flag flag.FlagSet
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Name returns the command's name: the first word in the usage line.
|
// Name returns the command's name: the first word in the usage line.
|
||||||
func (c *Command) Name() string {
|
func (c *Command) Name() string {
|
||||||
name := c.UsageLine
|
name := c.UsageLine
|
||||||
i := strings.Index(name, " ")
|
i := strings.Index(name, " ")
|
||||||
if i >= 0 {
|
if i >= 0 {
|
||||||
name = name[:i]
|
name = name[:i]
|
||||||
}
|
}
|
||||||
return name
|
return name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Command) Usage() {
|
func (c *Command) Usage() {
|
||||||
fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine)
|
fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine)
|
||||||
fmt.Fprintf(os.Stderr, "Default Usage:\n")
|
fmt.Fprintf(os.Stderr, "Default Usage:\n")
|
||||||
c.Flag.PrintDefaults()
|
c.Flag.PrintDefaults()
|
||||||
fmt.Fprintf(os.Stderr, "Description:\n")
|
fmt.Fprintf(os.Stderr, "Description:\n")
|
||||||
fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long))
|
fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long))
|
||||||
os.Exit(2)
|
os.Exit(2)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runnable reports whether the command can be run; otherwise
|
// Runnable reports whether the command can be run; otherwise
|
||||||
// it is a documentation pseudo-command such as importpath.
|
// it is a documentation pseudo-command such as importpath.
|
||||||
func (c *Command) Runnable() bool {
|
func (c *Command) Runnable() bool {
|
||||||
return c.Run != nil
|
return c.Run != nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,53 +1,54 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"fmt"
|
"os"
|
||||||
"os"
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
cmdShell.Run = runShell // break init cycle
|
cmdShell.Run = runShell // break init cycle
|
||||||
}
|
}
|
||||||
|
|
||||||
var cmdShell = &Command{
|
var cmdShell = &Command{
|
||||||
UsageLine: "shell",
|
UsageLine: "shell",
|
||||||
Short: "run interactive commands, now just echo",
|
Short: "run interactive commands, now just echo",
|
||||||
Long: `run interactive commands.
|
Long: `run interactive commands.
|
||||||
|
|
||||||
`,
|
`,
|
||||||
}
|
}
|
||||||
|
|
||||||
var ()
|
var (
|
||||||
|
)
|
||||||
|
|
||||||
func runShell(command *Command, args []string) bool {
|
func runShell(command *Command, args []string) bool {
|
||||||
r := bufio.NewReader(os.Stdin)
|
r := bufio.NewReader(os.Stdin)
|
||||||
o := bufio.NewWriter(os.Stdout)
|
o := bufio.NewWriter(os.Stdout)
|
||||||
e := bufio.NewWriter(os.Stderr)
|
e := bufio.NewWriter(os.Stderr)
|
||||||
prompt := func() {
|
prompt := func () {
|
||||||
o.WriteString("> ")
|
o.WriteString("> ")
|
||||||
o.Flush()
|
o.Flush()
|
||||||
}
|
};
|
||||||
readLine := func() string {
|
readLine := func () string {
|
||||||
ret, err := r.ReadString('\n')
|
ret, err := r.ReadString('\n')
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprint(e, err)
|
fmt.Fprint(e,err);
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
execCmd := func(cmd string) int {
|
execCmd := func (cmd string) int {
|
||||||
if cmd != "" {
|
if cmd != "" {
|
||||||
o.WriteString(cmd)
|
o.WriteString(cmd)
|
||||||
}
|
}
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd := ""
|
cmd := ""
|
||||||
for {
|
for {
|
||||||
prompt()
|
prompt()
|
||||||
cmd = readLine()
|
cmd = readLine()
|
||||||
execCmd(cmd)
|
execCmd(cmd)
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ func upload(filename string, server string, fid string) (int, error) {
|
||||||
}
|
}
|
||||||
ret, e := operation.Upload("http://"+server+"/"+fid, filename, fh)
|
ret, e := operation.Upload("http://"+server+"/"+fid, filename, fh)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return 0, e
|
return 0, e
|
||||||
}
|
}
|
||||||
return ret.Size, e
|
return ret.Size, e
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,9 +175,9 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
|
||||||
w.Header().Set("Content-Type", "application/javascript")
|
w.Header().Set("Content-Type", "application/javascript")
|
||||||
var bytes []byte
|
var bytes []byte
|
||||||
if r.FormValue("pretty") != "" {
|
if r.FormValue("pretty") != "" {
|
||||||
bytes, _ = json.MarshalIndent(obj, "", " ")
|
bytes, _ = json.MarshalIndent(obj, "", " ")
|
||||||
} else {
|
} else {
|
||||||
bytes, _ = json.Marshal(obj)
|
bytes, _ = json.Marshal(obj)
|
||||||
}
|
}
|
||||||
callback := r.FormValue("callback")
|
callback := r.FormValue("callback")
|
||||||
if callback == "" {
|
if callback == "" {
|
||||||
|
|
|
@ -3,8 +3,8 @@ package directory
|
||||||
import (
|
import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"pkg/storage"
|
"pkg/storage"
|
||||||
"pkg/util"
|
|
||||||
"strings"
|
"strings"
|
||||||
|
"pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FileId struct {
|
type FileId struct {
|
||||||
|
@ -16,14 +16,14 @@ type FileId struct {
|
||||||
func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId {
|
func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId {
|
||||||
return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode}
|
return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode}
|
||||||
}
|
}
|
||||||
func ParseFileId(fid string) *FileId {
|
func ParseFileId(fid string) *FileId{
|
||||||
a := strings.Split(fid, ",")
|
a := strings.Split(fid, ",")
|
||||||
if len(a) != 2 {
|
if len(a) != 2 {
|
||||||
println("Invalid fid", fid, ", split length", len(a))
|
println("Invalid fid", fid, ", split length", len(a))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
vid_string, key_hash_string := a[0], a[1]
|
vid_string, key_hash_string := a[0], a[1]
|
||||||
volumeId, _ := storage.NewVolumeId(vid_string)
|
volumeId, _ := storage.NewVolumeId(vid_string)
|
||||||
key, hash := storage.ParseKeyHash(key_hash_string)
|
key, hash := storage.ParseKeyHash(key_hash_string)
|
||||||
return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}
|
return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,32 +1,32 @@
|
||||||
package operation
|
package operation
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"net/url"
|
"net/url"
|
||||||
"pkg/storage"
|
"pkg/storage"
|
||||||
"pkg/topology"
|
"pkg/topology"
|
||||||
"pkg/util"
|
"pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AllocateVolumeResult struct {
|
type AllocateVolumeResult struct {
|
||||||
Error string
|
Error string
|
||||||
}
|
}
|
||||||
|
|
||||||
func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error {
|
func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error {
|
||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
values.Add("volume", vid.String())
|
values.Add("volume", vid.String())
|
||||||
values.Add("replicationType", repType.String())
|
values.Add("replicationType", repType.String())
|
||||||
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
|
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
var ret AllocateVolumeResult
|
var ret AllocateVolumeResult
|
||||||
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
|
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if ret.Error != "" {
|
if ret.Error != "" {
|
||||||
return errors.New(ret.Error)
|
return errors.New(ret.Error)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package operation
|
package operation
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"log"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Delete(url string) error {
|
func Delete(url string) error {
|
||||||
|
|
|
@ -1,38 +1,38 @@
|
||||||
package operation
|
package operation
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"net/url"
|
||||||
_ "fmt"
|
"pkg/storage"
|
||||||
"net/url"
|
"pkg/util"
|
||||||
"pkg/storage"
|
_ "fmt"
|
||||||
"pkg/util"
|
"errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Location struct {
|
type Location struct {
|
||||||
Url string "url"
|
Url string "url"
|
||||||
PublicUrl string "publicUrl"
|
PublicUrl string "publicUrl"
|
||||||
}
|
}
|
||||||
type LookupResult struct {
|
type LookupResult struct {
|
||||||
Locations []Location "locations"
|
Locations []Location "locations"
|
||||||
Error string "error"
|
Error string "error"
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: Add a caching for vid here
|
//TODO: Add a caching for vid here
|
||||||
func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) {
|
func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) {
|
||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
values.Add("volumeId", vid.String())
|
values.Add("volumeId", vid.String())
|
||||||
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
|
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var ret LookupResult
|
var ret LookupResult
|
||||||
err = json.Unmarshal(jsonBlob, &ret)
|
err = json.Unmarshal(jsonBlob, &ret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if ret.Error != "" {
|
if ret.Error != ""{
|
||||||
return nil, errors.New(ret.Error)
|
return nil, errors.New(ret.Error)
|
||||||
}
|
}
|
||||||
return &ret, nil
|
return &ret, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,18 +3,18 @@ package operation
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
_ "fmt"
|
_ "fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"mime/multipart"
|
"mime/multipart"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type UploadResult struct {
|
type UploadResult struct {
|
||||||
Size int
|
Size int
|
||||||
Error string
|
Error string
|
||||||
}
|
}
|
||||||
|
|
||||||
func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) {
|
func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) {
|
||||||
|
@ -26,7 +26,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult,
|
||||||
body_writer.Close()
|
body_writer.Close()
|
||||||
resp, err := http.Post(uploadUrl, content_type, body_buf)
|
resp, err := http.Post(uploadUrl, content_type, body_buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("failing to upload to", uploadUrl)
|
log.Println("failing to upload to", uploadUrl)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
@ -37,11 +37,11 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult,
|
||||||
var ret UploadResult
|
var ret UploadResult
|
||||||
err = json.Unmarshal(resp_body, &ret)
|
err = json.Unmarshal(resp_body, &ret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("failing to read upload resonse", uploadUrl, resp_body)
|
log.Println("failing to read upload resonse", uploadUrl, resp_body)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if ret.Error != "" {
|
if ret.Error != ""{
|
||||||
return nil, errors.New(ret.Error)
|
return nil, errors.New(ret.Error)
|
||||||
}
|
}
|
||||||
return &ret, nil
|
return &ret, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"pkg/operation"
|
"pkg/operation"
|
||||||
"pkg/storage"
|
"pkg/storage"
|
||||||
"pkg/topology"
|
"pkg/topology"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -24,7 +24,7 @@ type VolumeGrowth struct {
|
||||||
copy3factor int
|
copy3factor int
|
||||||
copyAll int
|
copyAll int
|
||||||
|
|
||||||
accessLock sync.Mutex
|
accessLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDefaultVolumeGrowth() *VolumeGrowth {
|
func NewDefaultVolumeGrowth() *VolumeGrowth {
|
||||||
|
@ -49,8 +49,8 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo
|
||||||
return 0, errors.New("Unknown Replication Type!")
|
return 0, errors.New("Unknown Replication Type!")
|
||||||
}
|
}
|
||||||
func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
|
func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
|
||||||
vg.accessLock.Lock()
|
vg.accessLock.Lock()
|
||||||
defer vg.accessLock.Unlock()
|
defer vg.accessLock.Unlock()
|
||||||
|
|
||||||
counter = 0
|
counter = 0
|
||||||
switch repType {
|
switch repType {
|
||||||
|
@ -182,7 +182,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
|
||||||
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
|
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
if err := operation.AllocateVolume(server, vid, repType); err == nil {
|
if err := operation.AllocateVolume(server, vid, repType); err == nil {
|
||||||
vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion}
|
vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version:storage.CurrentVersion}
|
||||||
server.AddOrUpdateVolume(vi)
|
server.AddOrUpdateVolume(vi)
|
||||||
topo.RegisterVolumeLayout(&vi, server)
|
topo.RegisterVolumeLayout(&vi, server)
|
||||||
fmt.Println("Created Volume", vid, "on", server)
|
fmt.Println("Created Volume", vid, "on", server)
|
||||||
|
|
|
@ -5,7 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"pkg/storage"
|
"pkg/storage"
|
||||||
"pkg/topology"
|
"pkg/topology"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology {
|
||||||
fmt.Println("data:", data)
|
fmt.Println("data:", data)
|
||||||
|
|
||||||
//need to connect all nodes first before server adding volumes
|
//need to connect all nodes first before server adding volumes
|
||||||
topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5)
|
topo := topology.NewTopology("mynetwork","/etc/weedfs/weedfs.conf","/tmp","testing",32*1024, 5)
|
||||||
mTopology := data.(map[string]interface{})
|
mTopology := data.(map[string]interface{})
|
||||||
for dcKey, dcValue := range mTopology {
|
for dcKey, dcValue := range mTopology {
|
||||||
dc := topology.NewDataCenter(dcKey)
|
dc := topology.NewDataCenter(dcKey)
|
||||||
|
@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology {
|
||||||
rack.LinkChildNode(server)
|
rack.LinkChildNode(server)
|
||||||
for _, v := range serverMap["volumes"].([]interface{}) {
|
for _, v := range serverMap["volumes"].([]interface{}) {
|
||||||
m := v.(map[string]interface{})
|
m := v.(map[string]interface{})
|
||||||
vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
|
vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion}
|
||||||
server.AddOrUpdateVolume(vi)
|
server.AddOrUpdateVolume(vi)
|
||||||
}
|
}
|
||||||
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
|
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
|
||||||
|
@ -121,9 +121,10 @@ func TestRemoveDataCenter(t *testing.T) {
|
||||||
|
|
||||||
func TestReserveOneVolume(t *testing.T) {
|
func TestReserveOneVolume(t *testing.T) {
|
||||||
topo := setup(topologyLayout)
|
topo := setup(topologyLayout)
|
||||||
rand.Seed(time.Now().UnixNano())
|
rand.Seed(time.Now().UnixNano())
|
||||||
vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4}
|
vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4}
|
||||||
if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil {
|
if c, e := vg.GrowByCountAndType(1,storage.Copy000,topo);e==nil{
|
||||||
t.Log("reserved", c)
|
t.Log("reserved", c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
package sequence
|
package sequence
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/gob"
|
"encoding/gob"
|
||||||
"log"
|
"os"
|
||||||
"os"
|
"path"
|
||||||
"path"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
"log"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -27,21 +27,21 @@ type SequencerImpl struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSequencer(dirname string, filename string) (m *SequencerImpl) {
|
func NewSequencer(dirname string, filename string) (m *SequencerImpl) {
|
||||||
m = &SequencerImpl{dir: dirname, fileName: filename}
|
m = &SequencerImpl{dir: dirname, fileName: filename}
|
||||||
|
|
||||||
seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644)
|
seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644)
|
||||||
if se != nil {
|
if se != nil {
|
||||||
m.FileIdSequence = FileIdSaveInterval
|
m.FileIdSequence = FileIdSaveInterval
|
||||||
log.Println("Setting file id sequence", m.FileIdSequence)
|
log.Println("Setting file id sequence", m.FileIdSequence)
|
||||||
} else {
|
} else {
|
||||||
decoder := gob.NewDecoder(seqFile)
|
decoder := gob.NewDecoder(seqFile)
|
||||||
defer seqFile.Close()
|
defer seqFile.Close()
|
||||||
decoder.Decode(&m.FileIdSequence)
|
decoder.Decode(&m.FileIdSequence)
|
||||||
log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval)
|
log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval)
|
||||||
//in case the server stops between intervals
|
//in case the server stops between intervals
|
||||||
m.FileIdSequence += FileIdSaveInterval
|
m.FileIdSequence += FileIdSaveInterval
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
//count should be 1 or more
|
//count should be 1 or more
|
||||||
|
@ -60,12 +60,12 @@ func (m *SequencerImpl) NextFileId(count int) (uint64, int) {
|
||||||
return m.FileIdSequence - m.fileIdCounter, count
|
return m.FileIdSequence - m.fileIdCounter, count
|
||||||
}
|
}
|
||||||
func (m *SequencerImpl) saveSequence() {
|
func (m *SequencerImpl) saveSequence() {
|
||||||
log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq"))
|
log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq"))
|
||||||
seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644)
|
seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
log.Fatalf("Sequence File Save [ERROR] %s\n", e)
|
log.Fatalf("Sequence File Save [ERROR] %s\n", e)
|
||||||
}
|
}
|
||||||
defer seqFile.Close()
|
defer seqFile.Close()
|
||||||
encoder := gob.NewEncoder(seqFile)
|
encoder := gob.NewEncoder(seqFile)
|
||||||
encoder.Encode(m.FileIdSequence)
|
encoder.Encode(m.FileIdSequence)
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,23 +175,3 @@ func (cm *CompactMap) Peek() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterate over the keys by calling iterate on each key till error is returned
|
|
||||||
func (cm *CompactMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
|
|
||||||
var i int
|
|
||||||
for _, cs := range cm.list {
|
|
||||||
for key := cs.start; key < cs.end; key++ {
|
|
||||||
if i = cs.binarySearchValues(key); i >= 0 {
|
|
||||||
if err = pedestrian(&cs.values[i]); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, val := range cs.overflow {
|
|
||||||
if err = pedestrian(val); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,43 +1,43 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"pkg/util"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMemoryUsage(t *testing.T) {
|
func TestMemoryUsage(t *testing.T) {
|
||||||
|
|
||||||
indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
|
indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
|
||||||
if ie != nil {
|
if ie != nil {
|
||||||
log.Fatalln(ie)
|
log.Fatalln(ie)
|
||||||
}
|
}
|
||||||
LoadNewNeedleMap(indexFile)
|
LoadNewNeedleMap(indexFile)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadNewNeedleMap(file *os.File) CompactMap {
|
func LoadNewNeedleMap(file *os.File) CompactMap {
|
||||||
m := NewCompactMap()
|
m := NewCompactMap()
|
||||||
bytes := make([]byte, 16*1024)
|
bytes := make([]byte, 16*1024)
|
||||||
count, e := file.Read(bytes)
|
count, e := file.Read(bytes)
|
||||||
if count > 0 {
|
if count > 0 {
|
||||||
fstat, _ := file.Stat()
|
fstat, _ := file.Stat()
|
||||||
log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
|
log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
|
||||||
}
|
}
|
||||||
for count > 0 && e == nil {
|
for count > 0 && e == nil {
|
||||||
for i := 0; i < count; i += 16 {
|
for i := 0; i < count; i += 16 {
|
||||||
key := util.BytesToUint64(bytes[i : i+8])
|
key := util.BytesToUint64(bytes[i : i+8])
|
||||||
offset := util.BytesToUint32(bytes[i+8 : i+12])
|
offset := util.BytesToUint32(bytes[i+8 : i+12])
|
||||||
size := util.BytesToUint32(bytes[i+12 : i+16])
|
size := util.BytesToUint32(bytes[i+12 : i+16])
|
||||||
if offset > 0 {
|
if offset > 0 {
|
||||||
m.Set(Key(key), offset, size)
|
m.Set(Key(key), offset, size)
|
||||||
} else {
|
} else {
|
||||||
//delete(m, key)
|
//delete(m, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
count, e = file.Read(bytes)
|
count, e = file.Read(bytes)
|
||||||
}
|
}
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,42 +18,42 @@ func TestXYZ(t *testing.T) {
|
||||||
m.Set(Key(i), i+11, i+5)
|
m.Set(Key(i), i+11, i+5)
|
||||||
}
|
}
|
||||||
|
|
||||||
// for i := uint32(0); i < 100; i++ {
|
// for i := uint32(0); i < 100; i++ {
|
||||||
// if v := m.Get(Key(i)); v != nil {
|
// if v := m.Get(Key(i)); v != nil {
|
||||||
// println(i, "=", v.Key, v.Offset, v.Size)
|
// println(i, "=", v.Key, v.Offset, v.Size)
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
|
|
||||||
for i := uint32(0); i < 10*batch; i++ {
|
for i := uint32(0); i < 10*batch; i++ {
|
||||||
v, ok := m.Get(Key(i))
|
v, ok := m.Get(Key(i))
|
||||||
if i%3 == 0 {
|
if i%3 == 0 {
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatal("key", i, "missing!")
|
t.Fatal("key", i, "missing!")
|
||||||
}
|
}
|
||||||
if v.Size != i+5 {
|
if v.Size != i+5 {
|
||||||
t.Fatal("key", i, "size", v.Size)
|
t.Fatal("key", i, "size", v.Size)
|
||||||
}
|
}
|
||||||
} else if i%37 == 0 {
|
} else if i%37 == 0 {
|
||||||
if ok && v.Size > 0 {
|
if ok && v.Size > 0 {
|
||||||
t.Fatal("key", i, "should have been deleted needle value", v)
|
t.Fatal("key", i, "should have been deleted needle value", v)
|
||||||
}
|
}
|
||||||
} else if i%2 == 0 {
|
} else if i%2 == 0 {
|
||||||
if v.Size != i {
|
if v.Size != i {
|
||||||
t.Fatal("key", i, "size", v.Size)
|
t.Fatal("key", i, "size", v.Size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := uint32(10 * batch); i < 100*batch; i++ {
|
for i := uint32(10 * batch); i < 100*batch; i++ {
|
||||||
v, ok := m.Get(Key(i))
|
v, ok := m.Get(Key(i))
|
||||||
if i%37 == 0 {
|
if i%37 == 0 {
|
||||||
if ok && v.Size > 0 {
|
if ok && v.Size > 0 {
|
||||||
t.Fatal("key", i, "should have been deleted needle value", v)
|
t.Fatal("key", i, "should have been deleted needle value", v)
|
||||||
}
|
}
|
||||||
} else if i%2 == 0 {
|
} else if i%2 == 0 {
|
||||||
if v == nil {
|
if v==nil{
|
||||||
t.Fatal("key", i, "missing")
|
t.Fatal("key", i, "missing")
|
||||||
}
|
}
|
||||||
if v.Size != i {
|
if v.Size != i {
|
||||||
t.Fatal("key", i, "size", v.Size)
|
t.Fatal("key", i, "size", v.Size)
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,8 +98,3 @@ func (nm *NeedleMap) Close() {
|
||||||
func (nm *NeedleMap) ContentSize() uint64 {
|
func (nm *NeedleMap) ContentSize() uint64 {
|
||||||
return nm.fileByteCounter
|
return nm.fileByteCounter
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterate through all needles using the iterator function
|
|
||||||
func (nm *NeedleMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
|
|
||||||
return nm.m.Walk(pedestrian)
|
|
||||||
}
|
|
||||||
|
|
|
@ -2,10 +2,10 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"pkg/util"
|
"pkg/util"
|
||||||
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (n *Needle) Append(w io.Writer, version Version) uint32 {
|
func (n *Needle) Append(w io.Writer, version Version) uint32 {
|
||||||
|
@ -62,8 +62,7 @@ func (n *Needle) Append(w io.Writer, version Version) uint32 {
|
||||||
return n.Size
|
return n.Size
|
||||||
}
|
}
|
||||||
func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
|
func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
|
||||||
switch version {
|
if version == Version1 {
|
||||||
case Version1:
|
|
||||||
bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
|
bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
|
||||||
ret, e := r.Read(bytes)
|
ret, e := r.Read(bytes)
|
||||||
n.readNeedleHeader(bytes)
|
n.readNeedleHeader(bytes)
|
||||||
|
@ -73,7 +72,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
|
||||||
return 0, errors.New("CRC error! Data On Disk Corrupted!")
|
return 0, errors.New("CRC error! Data On Disk Corrupted!")
|
||||||
}
|
}
|
||||||
return ret, e
|
return ret, e
|
||||||
case Version2:
|
} else if version == Version2 {
|
||||||
if size == 0 {
|
if size == 0 {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
@ -96,7 +95,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
|
||||||
}
|
}
|
||||||
return ret, e
|
return ret, e
|
||||||
}
|
}
|
||||||
return 0, fmt.Errorf("Unsupported Version! (%d)", version)
|
return 0, errors.New("Unsupported Version!")
|
||||||
}
|
}
|
||||||
func (n *Needle) readNeedleHeader(bytes []byte) {
|
func (n *Needle) readNeedleHeader(bytes []byte) {
|
||||||
n.Cookie = util.BytesToUint32(bytes[0:4])
|
n.Cookie = util.BytesToUint32(bytes[0:4])
|
||||||
|
|
|
@ -1,123 +1,123 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ReplicationType string
|
type ReplicationType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Copy000 = ReplicationType("000") // single copy
|
Copy000 = ReplicationType("000") // single copy
|
||||||
Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
|
Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
|
||||||
Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
|
Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
|
||||||
Copy100 = ReplicationType("100") // 2 copies, each on different data center
|
Copy100 = ReplicationType("100") // 2 copies, each on different data center
|
||||||
Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
|
Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
|
||||||
Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
|
Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
|
||||||
LengthRelicationType = 6
|
LengthRelicationType = 6
|
||||||
CopyNil = ReplicationType(255) // nil value
|
CopyNil = ReplicationType(255) // nil value
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewReplicationTypeFromString(t string) (ReplicationType, error) {
|
func NewReplicationTypeFromString(t string) (ReplicationType, error) {
|
||||||
switch t {
|
switch t {
|
||||||
case "000":
|
case "000":
|
||||||
return Copy000, nil
|
return Copy000, nil
|
||||||
case "001":
|
case "001":
|
||||||
return Copy001, nil
|
return Copy001, nil
|
||||||
case "010":
|
case "010":
|
||||||
return Copy010, nil
|
return Copy010, nil
|
||||||
case "100":
|
case "100":
|
||||||
return Copy100, nil
|
return Copy100, nil
|
||||||
case "110":
|
case "110":
|
||||||
return Copy110, nil
|
return Copy110, nil
|
||||||
case "200":
|
case "200":
|
||||||
return Copy200, nil
|
return Copy200, nil
|
||||||
}
|
}
|
||||||
return Copy000, errors.New("Unknown Replication Type:" + t)
|
return Copy000, errors.New("Unknown Replication Type:"+t)
|
||||||
}
|
}
|
||||||
func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
|
func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
|
||||||
switch b {
|
switch b {
|
||||||
case byte(000):
|
case byte(000):
|
||||||
return Copy000, nil
|
return Copy000, nil
|
||||||
case byte(001):
|
case byte(001):
|
||||||
return Copy001, nil
|
return Copy001, nil
|
||||||
case byte(010):
|
case byte(010):
|
||||||
return Copy010, nil
|
return Copy010, nil
|
||||||
case byte(100):
|
case byte(100):
|
||||||
return Copy100, nil
|
return Copy100, nil
|
||||||
case byte(110):
|
case byte(110):
|
||||||
return Copy110, nil
|
return Copy110, nil
|
||||||
case byte(200):
|
case byte(200):
|
||||||
return Copy200, nil
|
return Copy200, nil
|
||||||
}
|
}
|
||||||
return Copy000, errors.New("Unknown Replication Type:" + string(b))
|
return Copy000, errors.New("Unknown Replication Type:"+string(b))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ReplicationType) String() string {
|
func (r *ReplicationType) String() string {
|
||||||
switch *r {
|
switch *r {
|
||||||
case Copy000:
|
case Copy000:
|
||||||
return "000"
|
return "000"
|
||||||
case Copy001:
|
case Copy001:
|
||||||
return "001"
|
return "001"
|
||||||
case Copy010:
|
case Copy010:
|
||||||
return "010"
|
return "010"
|
||||||
case Copy100:
|
case Copy100:
|
||||||
return "100"
|
return "100"
|
||||||
case Copy110:
|
case Copy110:
|
||||||
return "110"
|
return "110"
|
||||||
case Copy200:
|
case Copy200:
|
||||||
return "200"
|
return "200"
|
||||||
}
|
}
|
||||||
return "000"
|
return "000"
|
||||||
}
|
}
|
||||||
func (r *ReplicationType) Byte() byte {
|
func (r *ReplicationType) Byte() byte {
|
||||||
switch *r {
|
switch *r {
|
||||||
case Copy000:
|
case Copy000:
|
||||||
return byte(000)
|
return byte(000)
|
||||||
case Copy001:
|
case Copy001:
|
||||||
return byte(001)
|
return byte(001)
|
||||||
case Copy010:
|
case Copy010:
|
||||||
return byte(010)
|
return byte(010)
|
||||||
case Copy100:
|
case Copy100:
|
||||||
return byte(100)
|
return byte(100)
|
||||||
case Copy110:
|
case Copy110:
|
||||||
return byte(110)
|
return byte(110)
|
||||||
case Copy200:
|
case Copy200:
|
||||||
return byte(200)
|
return byte(200)
|
||||||
}
|
}
|
||||||
return byte(000)
|
return byte(000)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repType ReplicationType) GetReplicationLevelIndex() int {
|
func (repType ReplicationType)GetReplicationLevelIndex() int {
|
||||||
switch repType {
|
switch repType {
|
||||||
case Copy000:
|
case Copy000:
|
||||||
return 0
|
return 0
|
||||||
case Copy001:
|
case Copy001:
|
||||||
return 1
|
return 1
|
||||||
case Copy010:
|
case Copy010:
|
||||||
return 2
|
return 2
|
||||||
case Copy100:
|
case Copy100:
|
||||||
return 3
|
return 3
|
||||||
case Copy110:
|
case Copy110:
|
||||||
return 4
|
return 4
|
||||||
case Copy200:
|
case Copy200:
|
||||||
return 5
|
return 5
|
||||||
}
|
}
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
func (repType ReplicationType) GetCopyCount() int {
|
func (repType ReplicationType)GetCopyCount() int {
|
||||||
switch repType {
|
switch repType {
|
||||||
case Copy000:
|
case Copy000:
|
||||||
return 1
|
return 1
|
||||||
case Copy001:
|
case Copy001:
|
||||||
return 2
|
return 2
|
||||||
case Copy010:
|
case Copy010:
|
||||||
return 2
|
return 2
|
||||||
case Copy100:
|
case Copy100:
|
||||||
return 2
|
return 2
|
||||||
case Copy110:
|
case Copy110:
|
||||||
return 3
|
return 3
|
||||||
case Copy200:
|
case Copy200:
|
||||||
return 3
|
return 3
|
||||||
}
|
}
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,13 +65,13 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
|
||||||
}
|
}
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) {
|
func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
|
||||||
if s.volumes[vid] != nil {
|
if s.volumes[vid] != nil {
|
||||||
return errors.New("Volume Id " + vid.String() + " already exists!")
|
return errors.New("Volume Id " + vid.String() + " already exists!")
|
||||||
}
|
}
|
||||||
log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
|
log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
|
||||||
s.volumes[vid], err = NewVolume(s.dir, vid, replicationType)
|
s.volumes[vid] = NewVolume(s.dir, vid, replicationType)
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
|
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
|
||||||
|
@ -107,10 +107,9 @@ func (s *Store) loadExistingVolumes() {
|
||||||
base := name[:len(name)-len(".dat")]
|
base := name[:len(name)-len(".dat")]
|
||||||
if vid, err := NewVolumeId(base); err == nil {
|
if vid, err := NewVolumeId(base); err == nil {
|
||||||
if s.volumes[vid] == nil {
|
if s.volumes[vid] == nil {
|
||||||
if v, e := NewVolume(s.dir, vid, CopyNil); e == nil {
|
v := NewVolume(s.dir, vid, CopyNil)
|
||||||
s.volumes[vid] = v
|
s.volumes[vid] = v
|
||||||
log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size())
|
log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size())
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,9 +24,9 @@ type Volume struct {
|
||||||
accessLock sync.Mutex
|
accessLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
|
func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) {
|
||||||
v = &Volume{dir: dirname, Id: id, replicaType: replicationType}
|
v = &Volume{dir: dirname, Id: id, replicaType: replicationType}
|
||||||
e = v.load()
|
v.load()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (v *Volume) load() error {
|
func (v *Volume) load() error {
|
||||||
|
@ -43,7 +43,6 @@ func (v *Volume) load() error {
|
||||||
} else {
|
} else {
|
||||||
v.maybeWriteSuperBlock()
|
v.maybeWriteSuperBlock()
|
||||||
}
|
}
|
||||||
// TODO: if .idx not exists, but .cdb exists, then use (but don't load!) that
|
|
||||||
indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
|
indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
|
||||||
if ie != nil {
|
if ie != nil {
|
||||||
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
|
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
|
||||||
|
@ -80,23 +79,21 @@ func (v *Volume) maybeWriteSuperBlock() {
|
||||||
v.dataFile.Write(header)
|
v.dataFile.Write(header)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (v *Volume) readSuperBlock() (err error) {
|
func (v *Volume) readSuperBlock() error {
|
||||||
v.dataFile.Seek(0, 0)
|
v.dataFile.Seek(0, 0)
|
||||||
header := make([]byte, SuperBlockSize)
|
header := make([]byte, SuperBlockSize)
|
||||||
if _, e := v.dataFile.Read(header); e != nil {
|
if _, e := v.dataFile.Read(header); e != nil {
|
||||||
return fmt.Errorf("cannot read superblock: %s", e)
|
return fmt.Errorf("cannot read superblock: %s", e)
|
||||||
}
|
}
|
||||||
|
var err error
|
||||||
v.version, v.replicaType, err = ParseSuperBlock(header)
|
v.version, v.replicaType, err = ParseSuperBlock(header)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, err error) {
|
func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, e error) {
|
||||||
version = Version(header[0])
|
version = Version(header[0])
|
||||||
if version == 0 {
|
var err error
|
||||||
err = errors.New("Zero version impossible - bad superblock!")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
|
if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
|
||||||
err = fmt.Errorf("cannot read replica type: %s", err)
|
e = fmt.Errorf("cannot read replica type: %s", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -224,39 +221,3 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string)
|
||||||
func (v *Volume) ContentSize() uint64 {
|
func (v *Volume) ContentSize() uint64 {
|
||||||
return v.nm.fileByteCounter
|
return v.nm.fileByteCounter
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk over the contained needles (call the function with each NeedleValue till error is returned)
|
|
||||||
func (v *Volume) WalkValues(pedestrian func(*Needle) error) error {
|
|
||||||
pedplus := func(nv *NeedleValue) (err error) {
|
|
||||||
n := new(Needle)
|
|
||||||
if nv.Offset > 0 {
|
|
||||||
v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0)
|
|
||||||
if _, err = n.Read(v.dataFile, nv.Size, v.version); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = pedestrian(n); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return v.nm.Walk(pedplus)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Walk over the keys
|
|
||||||
func (v *Volume) WalkKeys(pedestrian func(Key) error) error {
|
|
||||||
pedplus := func(nv *NeedleValue) (err error) {
|
|
||||||
if nv.Offset > 0 && nv.Key > 0 {
|
|
||||||
if err = pedestrian(nv.Key); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return v.nm.Walk(pedplus)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *Volume) String() string {
|
|
||||||
return fmt.Sprintf("%d@%s:v%d:r%s", v.Id, v.dataFile.Name(),
|
|
||||||
v.Version(), v.replicaType)
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,18 +1,17 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
type VolumeId uint32
|
type VolumeId uint32
|
||||||
|
func NewVolumeId(vid string) (VolumeId,error) {
|
||||||
func NewVolumeId(vid string) (VolumeId, error) {
|
volumeId, err := strconv.ParseUint(vid, 10, 64)
|
||||||
volumeId, err := strconv.ParseUint(vid, 10, 64)
|
return VolumeId(volumeId), err
|
||||||
return VolumeId(volumeId), err
|
|
||||||
}
|
}
|
||||||
func (vid *VolumeId) String() string {
|
func (vid *VolumeId) String() string{
|
||||||
return strconv.FormatUint(uint64(*vid), 10)
|
return strconv.FormatUint(uint64(*vid), 10)
|
||||||
}
|
}
|
||||||
func (vid *VolumeId) Next() VolumeId {
|
func (vid *VolumeId) Next() VolumeId{
|
||||||
return VolumeId(uint32(*vid) + 1)
|
return VolumeId(uint32(*vid)+1)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import ()
|
import (
|
||||||
|
)
|
||||||
|
|
||||||
type Version uint8
|
type Version uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Version1 = Version(1)
|
Version1 = Version(1)
|
||||||
Version2 = Version(2)
|
Version2 = Version(2)
|
||||||
CurrentVersion = Version2
|
CurrentVersion = Version2
|
||||||
)
|
)
|
||||||
|
|
|
@ -30,13 +30,13 @@ func TestLoadConfiguration(t *testing.T) {
|
||||||
</Configuration>
|
</Configuration>
|
||||||
`
|
`
|
||||||
c, err := NewConfiguration([]byte(confContent))
|
c, err := NewConfiguration([]byte(confContent))
|
||||||
|
|
||||||
fmt.Printf("%s\n", c)
|
fmt.Printf("%s\n", c)
|
||||||
if err != nil {
|
if err!=nil{
|
||||||
t.Fatalf("unmarshal error:%s", err.Error())
|
t.Fatalf("unmarshal error:%s",err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
|
if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
|
||||||
t.Fatalf("unmarshal error:%s", c)
|
t.Fatalf("unmarshal error:%s",c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package topology
|
package topology
|
||||||
|
|
||||||
import ()
|
import (
|
||||||
|
)
|
||||||
|
|
||||||
type DataCenter struct {
|
type DataCenter struct {
|
||||||
NodeImpl
|
NodeImpl
|
||||||
|
@ -11,31 +12,31 @@ func NewDataCenter(id string) *DataCenter {
|
||||||
dc.id = NodeId(id)
|
dc.id = NodeId(id)
|
||||||
dc.nodeType = "DataCenter"
|
dc.nodeType = "DataCenter"
|
||||||
dc.children = make(map[NodeId]Node)
|
dc.children = make(map[NodeId]Node)
|
||||||
dc.NodeImpl.value = dc
|
dc.NodeImpl.value = dc
|
||||||
return dc
|
return dc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
|
func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
|
||||||
for _, c := range dc.Children() {
|
for _, c := range dc.Children() {
|
||||||
rack := c.(*Rack)
|
rack := c.(*Rack)
|
||||||
if string(rack.Id()) == rackName {
|
if string(rack.Id()) == rackName {
|
||||||
return rack
|
return rack
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rack := NewRack(rackName)
|
rack := NewRack(rackName)
|
||||||
dc.LinkChildNode(rack)
|
dc.LinkChildNode(rack)
|
||||||
return rack
|
return rack
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DataCenter) ToMap() interface{} {
|
func (dc *DataCenter) ToMap() interface{}{
|
||||||
m := make(map[string]interface{})
|
m := make(map[string]interface{})
|
||||||
m["Max"] = dc.GetMaxVolumeCount()
|
m["Max"] = dc.GetMaxVolumeCount()
|
||||||
m["Free"] = dc.FreeSpace()
|
m["Free"] = dc.FreeSpace()
|
||||||
var racks []interface{}
|
var racks []interface{}
|
||||||
for _, c := range dc.Children() {
|
for _, c := range dc.Children() {
|
||||||
rack := c.(*Rack)
|
rack := c.(*Rack)
|
||||||
racks = append(racks, rack.ToMap())
|
racks = append(racks, rack.ToMap())
|
||||||
}
|
}
|
||||||
m["Racks"] = racks
|
m["Racks"] = racks
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,14 +37,14 @@ func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) {
|
||||||
list = append(list, n)
|
list = append(list, n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if n > len(list) {
|
if n > len(list){
|
||||||
return nil, false
|
return nil,false
|
||||||
}
|
}
|
||||||
for i := n; i > 0; i-- {
|
for i := n; i > 0; i-- {
|
||||||
r := rand.Intn(i)
|
r := rand.Intn(i)
|
||||||
t := list[r]
|
t := list[r]
|
||||||
list[r] = list[i-1]
|
list[r] = list[i-1]
|
||||||
list[i-1] = t
|
list[i-1] = t
|
||||||
}
|
}
|
||||||
return list[len(list)-n:], true
|
return list[len(list)-n:], true
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,39 +1,39 @@
|
||||||
package topology
|
package topology
|
||||||
|
|
||||||
import (
|
import (
|
||||||
_ "fmt"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
_ "fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestXYZ(t *testing.T) {
|
func TestXYZ(t *testing.T) {
|
||||||
topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5)
|
topo := NewTopology("topo","/etc/weed.conf", "/tmp","test",234,5)
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
dc := NewDataCenter("dc" + strconv.Itoa(i))
|
dc := NewDataCenter("dc" + strconv.Itoa(i))
|
||||||
dc.activeVolumeCount = i
|
dc.activeVolumeCount = i
|
||||||
dc.maxVolumeCount = 5
|
dc.maxVolumeCount = 5
|
||||||
topo.LinkChildNode(dc)
|
topo.LinkChildNode(dc)
|
||||||
}
|
}
|
||||||
nl := NewNodeList(topo.Children(), nil)
|
nl := NewNodeList(topo.Children(),nil)
|
||||||
|
|
||||||
picked, ret := nl.RandomlyPickN(1)
|
picked, ret := nl.RandomlyPickN(1)
|
||||||
if !ret || len(picked) != 1 {
|
if !ret || len(picked)!=1 {
|
||||||
t.Errorf("need to randomly pick 1 node")
|
t.Errorf("need to randomly pick 1 node")
|
||||||
}
|
}
|
||||||
|
|
||||||
picked, ret = nl.RandomlyPickN(4)
|
picked, ret = nl.RandomlyPickN(4)
|
||||||
if !ret || len(picked) != 4 {
|
if !ret || len(picked)!=4 {
|
||||||
t.Errorf("need to randomly pick 4 nodes")
|
t.Errorf("need to randomly pick 4 nodes")
|
||||||
}
|
}
|
||||||
|
|
||||||
picked, ret = nl.RandomlyPickN(5)
|
picked, ret = nl.RandomlyPickN(5)
|
||||||
if !ret || len(picked) != 5 {
|
if !ret || len(picked)!=5 {
|
||||||
t.Errorf("need to randomly pick 5 nodes")
|
t.Errorf("need to randomly pick 5 nodes")
|
||||||
}
|
}
|
||||||
|
|
||||||
picked, ret = nl.RandomlyPickN(6)
|
picked, ret = nl.RandomlyPickN(6)
|
||||||
if ret || len(picked) != 0 {
|
if ret || len(picked)!=0 {
|
||||||
t.Errorf("can not randomly pick 6 nodes:", ret, picked)
|
t.Errorf("can not randomly pick 6 nodes:", ret, picked)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,13 +19,13 @@ func NewRack(id string) *Rack {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Rack) FindDataNode(ip string, port int) *DataNode {
|
func (r *Rack) FindDataNode(ip string, port int) *DataNode {
|
||||||
for _, c := range r.Children() {
|
for _, c := range r.Children() {
|
||||||
dn := c.(*DataNode)
|
dn := c.(*DataNode)
|
||||||
if dn.MatchLocation(ip, port) {
|
if dn.MatchLocation(ip, port) {
|
||||||
return dn
|
return dn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
|
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
|
||||||
for _, c := range r.Children() {
|
for _, c := range r.Children() {
|
||||||
|
|
|
@ -78,7 +78,7 @@ func setup(topologyLayout string) *Topology {
|
||||||
}
|
}
|
||||||
|
|
||||||
//need to connect all nodes first before server adding volumes
|
//need to connect all nodes first before server adding volumes
|
||||||
topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5)
|
topo := NewTopology("mynetwork","/etc/weed.conf","/tmp","test",234,5)
|
||||||
mTopology := data.(map[string]interface{})
|
mTopology := data.(map[string]interface{})
|
||||||
for dcKey, dcValue := range mTopology {
|
for dcKey, dcValue := range mTopology {
|
||||||
dc := NewDataCenter(dcKey)
|
dc := NewDataCenter(dcKey)
|
||||||
|
@ -94,7 +94,7 @@ func setup(topologyLayout string) *Topology {
|
||||||
rack.LinkChildNode(server)
|
rack.LinkChildNode(server)
|
||||||
for _, v := range serverMap["volumes"].([]interface{}) {
|
for _, v := range serverMap["volumes"].([]interface{}) {
|
||||||
m := v.(map[string]interface{})
|
m := v.(map[string]interface{})
|
||||||
vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
|
vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion}
|
||||||
server.AddOrUpdateVolume(vi)
|
server.AddOrUpdateVolume(vi)
|
||||||
}
|
}
|
||||||
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
|
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
|
||||||
|
@ -119,9 +119,9 @@ func TestRemoveDataCenter(t *testing.T) {
|
||||||
|
|
||||||
func TestReserveOneVolume(t *testing.T) {
|
func TestReserveOneVolume(t *testing.T) {
|
||||||
topo := setup(topologyLayout)
|
topo := setup(topologyLayout)
|
||||||
rand.Seed(time.Now().UnixNano())
|
rand.Seed(time.Now().UnixNano())
|
||||||
rand.Seed(1)
|
rand.Seed(1)
|
||||||
ret, node, vid := topo.RandomlyReserveOneVolume()
|
ret, node, vid := topo.RandomlyReserveOneVolume()
|
||||||
fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
|
fmt.Println("assigned :", ret, ", node :", node,", volume id:", vid)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,10 +101,10 @@ type VacuumVolumeResult struct {
|
||||||
func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
|
func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
|
||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
values.Add("volume", vid.String())
|
values.Add("volume", vid.String())
|
||||||
values.Add("garbageThreshold", garbageThreshold)
|
values.Add("garbageThreshold", garbageThreshold)
|
||||||
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values)
|
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("parameters:", values)
|
fmt.Println("parameters:",values)
|
||||||
return err, false
|
return err, false
|
||||||
}
|
}
|
||||||
var ret VacuumVolumeResult
|
var ret VacuumVolumeResult
|
||||||
|
|
|
@ -52,14 +52,14 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
|
||||||
vl := t.GetVolumeLayout(v.RepType)
|
vl := t.GetVolumeLayout(v.RepType)
|
||||||
vl.SetVolumeUnavailable(dn, v.Id)
|
vl.SetVolumeUnavailable(dn, v.Id)
|
||||||
}
|
}
|
||||||
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
|
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
|
||||||
dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
|
dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
|
||||||
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
|
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
|
||||||
dn.Parent().UnlinkChildNode(dn.Id())
|
dn.Parent().UnlinkChildNode(dn.Id())
|
||||||
}
|
}
|
||||||
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
|
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
|
||||||
for _, v := range dn.volumes {
|
for _, v := range dn.volumes {
|
||||||
vl := t.GetVolumeLayout(v.RepType)
|
vl := t.GetVolumeLayout(v.RepType)
|
||||||
if vl.isWritable(&v) {
|
if vl.isWritable(&v) {
|
||||||
vl.SetVolumeAvailable(dn, v.Id)
|
vl.SetVolumeAvailable(dn, v.Id)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package topology
|
package topology
|
||||||
|
|
||||||
import ()
|
import (
|
||||||
|
)
|
||||||
|
|
||||||
func (t *Topology) ToMap() interface{} {
|
func (t *Topology) ToMap() interface{} {
|
||||||
m := make(map[string]interface{})
|
m := make(map[string]interface{})
|
||||||
|
|
|
@ -15,7 +15,7 @@ func (dnll *VolumeLocationList) Head() *DataNode {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dnll *VolumeLocationList) Length() int {
|
func (dnll *VolumeLocationList) Length() int {
|
||||||
return len(dnll.list)
|
return len(dnll.list)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
|
func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
|
||||||
|
@ -29,13 +29,13 @@ func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
|
func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
|
||||||
for i, dnl := range dnll.list {
|
for i, dnl := range dnll.list {
|
||||||
if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
|
if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
|
||||||
dnll.list = append(dnll.list[:i], dnll.list[i+1:]...)
|
dnll.list = append(dnll.list[:i],dnll.list[i+1:]...)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
|
func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
|
||||||
|
|
|
@ -1,33 +1,34 @@
|
||||||
package util
|
package util
|
||||||
|
|
||||||
func BytesToUint64(b []byte) (v uint64) {
|
func BytesToUint64(b []byte)(v uint64){
|
||||||
length := uint(len(b))
|
length := uint(len(b))
|
||||||
for i := uint(0); i < length-1; i++ {
|
for i :=uint(0);i<length-1;i++ {
|
||||||
v += uint64(b[i])
|
v += uint64(b[i])
|
||||||
v <<= 8
|
v <<= 8
|
||||||
}
|
}
|
||||||
v += uint64(b[length-1])
|
v+=uint64(b[length-1])
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func BytesToUint32(b []byte) (v uint32) {
|
func BytesToUint32(b []byte)(v uint32){
|
||||||
length := uint(len(b))
|
length := uint(len(b))
|
||||||
for i := uint(0); i < length-1; i++ {
|
for i :=uint(0);i<length-1;i++ {
|
||||||
v += uint32(b[i])
|
v += uint32(b[i])
|
||||||
v <<= 8
|
v <<= 8
|
||||||
}
|
}
|
||||||
v += uint32(b[length-1])
|
v+=uint32(b[length-1])
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func Uint64toBytes(b []byte, v uint64) {
|
func Uint64toBytes(b []byte, v uint64){
|
||||||
for i := uint(0); i < 8; i++ {
|
for i :=uint(0);i<8;i++ {
|
||||||
b[7-i] = byte(v >> (i * 8))
|
b[7-i] = byte(v>>(i*8))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func Uint32toBytes(b []byte, v uint32) {
|
func Uint32toBytes(b []byte, v uint32){
|
||||||
for i := uint(0); i < 4; i++ {
|
for i :=uint(0);i<4;i++ {
|
||||||
b[3-i] = byte(v >> (i * 8))
|
b[3-i] = byte(v>>(i*8))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func Uint8toBytes(b []byte, v uint8) {
|
func Uint8toBytes(b []byte, v uint8){
|
||||||
b[0] = byte(v)
|
b[0] = byte(v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,16 +1,16 @@
|
||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ParseInt(text string, defaultValue int) int {
|
func ParseInt(text string, defaultValue int) int{
|
||||||
count, parseError := strconv.ParseUint(text, 10, 64)
|
count, parseError := strconv.ParseUint(text,10,64)
|
||||||
if parseError != nil {
|
if parseError!=nil {
|
||||||
if len(text) > 0 {
|
if len(text)>0{
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
return defaultValue
|
return defaultValue
|
||||||
}
|
}
|
||||||
return int(count)
|
return int(count)
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ func Post(url string, values url.Values) ([]byte, error) {
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
b, err := ioutil.ReadAll(r.Body)
|
b, err := ioutil.ReadAll(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("read post result from", url, err)
|
log.Println("read post result from", url, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return b, nil
|
return b, nil
|
||||||
|
|
Loading…
Reference in a new issue