diff --git a/weed-fs/src/cmd/weed/command.go b/weed-fs/src/cmd/weed/command.go index 77adcac66..8c725cafb 100644 --- a/weed-fs/src/cmd/weed/command.go +++ b/weed-fs/src/cmd/weed/command.go @@ -38,7 +38,10 @@ func (c *Command) Name() string { } func (c *Command) Usage() { - fmt.Fprintf(os.Stderr, "Usage: %s\n", c.UsageLine) + fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) + fmt.Fprintf(os.Stderr, "Default Usage:\n") + c.Flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "Description:\n") fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) os.Exit(2) } diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index 014cc0d8b..17b72618b 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -8,6 +8,7 @@ import ( "pkg/storage" "strconv" "strings" + "time" ) func init() { @@ -30,6 +31,7 @@ var ( capacity = cmdMaster.Flag.Int("capacity", 100, "maximum number of volumes to hold") mapper *directory.Mapper volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") + mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") ) func dirLookupHandler(w http.ResponseWriter, r *http.Request) { @@ -64,7 +66,7 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { if *IsDebug { log.Println(s, "volumes", r.FormValue("volumes")) } - mapper.Add(directory.NewMachine(s, publicUrl, *volumes)) + mapper.Add(directory.NewMachine(s, publicUrl, *volumes, time.Now().Unix())) } func dirStatusHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, mapper) @@ -72,7 +74,7 @@ func dirStatusHandler(w http.ResponseWriter, r *http.Request) { func runMaster(cmd *Command, args []string) bool { log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") - mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024) + mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) http.HandleFunc("/dir/assign", dirAssignHandler) http.HandleFunc("/dir/lookup", dirLookupHandler) http.HandleFunc("/dir/join", dirJoinHandler) diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index 2514d8b01..00b4c2fbb 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -25,12 +25,12 @@ var cmdVolume = &Command{ } var ( - vport = cmdVolume.Flag.Int("port", 8080, "http listen port") + vport = cmdVolume.Flag.Int("port", 8080, "http listen port") chunkFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files") volumes = cmdVolume.Flag.String("volumes", "0,1-3,4", "comma-separated list of volume ids or range of ids") publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read") metaServer = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings") - pulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") store *storage.Store ) @@ -85,7 +85,7 @@ func GetHandler(w http.ResponseWriter, r *http.Request) { } func PostHandler(w http.ResponseWriter, r *http.Request) { vid, _, _ := parseURLPath(r.URL.Path) - volumeId, e := storage.NewVolumeId(vid) + volumeId, e := storage.NewVolumeId(vid) if e != nil { writeJson(w, r, e) } else { @@ -103,7 +103,7 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { func DeleteHandler(w http.ResponseWriter, r *http.Request) { n := new(storage.Needle) vid, fid, _ := parseURLPath(r.URL.Path) - volumeId, _ := storage.NewVolumeId(vid) + volumeId, _ := storage.NewVolumeId(vid) n.ParsePath(fid) if *IsDebug { @@ -162,7 +162,7 @@ func runVolume(cmd *Command, args []string) bool { go func() { for { store.Join(*metaServer) - time.Sleep(time.Duration(float32(*pulse*1e3)*(1+rand.Float32())) * time.Millisecond) + time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond) } }() log.Println("store joined at", *metaServer) diff --git a/weed-fs/src/cmd/weed/weed.go b/weed-fs/src/cmd/weed/weed.go index 15ace50ef..0cff9bfeb 100644 --- a/weed-fs/src/cmd/weed/weed.go +++ b/weed-fs/src/cmd/weed/weed.go @@ -52,7 +52,7 @@ func main() { if args[0] == "help" { help(args[1:]) for _, cmd := range commands { - if len(args)>2 && cmd.Name() == args[1] && cmd.Run != nil { + if len(args)>=2 && cmd.Name() == args[1] && cmd.Run != nil { fmt.Fprintf(os.Stderr, "Default Parameters:\n") cmd.Flag.PrintDefaults() } @@ -93,12 +93,6 @@ The commands are: Use "weed help [command]" for more information about a command. -Additional help topics: -{{range .}}{{if not .Runnable}} - {{.Name | printf "%-11s"}} {{.Short}}{{end}}{{end}} - -Use "weed help [topic]" for more information about that topic. - ` var helpTemplate = `{{if .Runnable}}Usage: weed {{.UsageLine}} diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 67cf0eaa8..4a1a12663 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -7,36 +7,38 @@ import ( "pkg/sequence" "pkg/storage" "pkg/util" - "sync" + "time" ) type Machine struct { Volumes []storage.VolumeInfo Url string //[:port] PublicUrl string + LastSeen int64 // unix time in seconds } type Mapper struct { - volumeLock sync.Mutex - Machines map[string]*Machine - vid2machineId map[storage.VolumeId]*Machine //machineId is +1 of the index of []*Machine, to detect not found entries - Writers []storage.VolumeId // transient array of Writers volume id + Machines map[string]*Machine + vid2machine map[storage.VolumeId]*Machine + Writers []storage.VolumeId // transient array of Writers volume id + pulse int64 volumeSizeLimit uint64 sequence sequence.Sequencer } -func NewMachine(server, publicUrl string, volumes []storage.VolumeInfo) *Machine { - return &Machine{Url: server, PublicUrl: publicUrl, Volumes: volumes} +func NewMachine(server, publicUrl string, volumes []storage.VolumeInfo, lastSeen int64) *Machine { + return &Machine{Url: server, PublicUrl: publicUrl, Volumes: volumes, LastSeen: lastSeen} } -func NewMapper(dirname string, filename string, volumeSizeLimit uint64) (m *Mapper) { +func NewMapper(dirname string, filename string, volumeSizeLimit uint64, pulse int) (m *Mapper) { m = &Mapper{} - m.vid2machineId = make(map[storage.VolumeId]*Machine) + m.vid2machine = make(map[storage.VolumeId]*Machine) m.volumeSizeLimit = volumeSizeLimit m.Writers = *new([]storage.VolumeId) m.Machines = make(map[string]*Machine) + m.pulse = int64(pulse) m.sequence = sequence.NewSequencer(dirname, filename) @@ -49,7 +51,7 @@ func (m *Mapper) PickForWrite(c string) (string, int, *Machine, error) { return "", 0, nil, errors.New("No more writable volumes!") } vid := m.Writers[rand.Intn(len_writers)] - machine := m.vid2machineId[vid] + machine := m.vid2machine[vid] if machine != nil { fileId, count := m.sequence.NextFileId(util.ParseInt(c, 1)) if count == 0 { @@ -60,29 +62,39 @@ func (m *Mapper) PickForWrite(c string) (string, int, *Machine, error) { return "", 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") } func (m *Mapper) Get(vid storage.VolumeId) (*Machine, error) { - machine := m.vid2machineId[vid] + machine := m.vid2machine[vid] if machine == nil { return nil, errors.New("invalid volume id " + vid.String()) } return machine, nil } func (m *Mapper) Add(machine *Machine) { - //check existing machine, linearly - //log.Println("Adding machine", machine.Server.Url) - m.volumeLock.Lock() m.Machines[machine.Url] = machine - m.volumeLock.Unlock() - - //add to vid2machineId map, and Writers array + //add to vid2machine map, and Writers array for _, v := range machine.Volumes { - m.vid2machineId[v.Id] = machine + m.vid2machine[v.Id] = machine } + m.refreshWritableVolumes() +} +func (m *Mapper) StartRefreshWritableVolumes() { + go func() { + for { + m.refreshWritableVolumes() + time.Sleep(time.Duration(float32(m.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() +} + +func (m *Mapper) refreshWritableVolumes() { + freshThreshHold := time.Now().Unix() - 5*m.pulse //5 times of sleep interval //setting Writers, copy-on-write because of possible updating, this needs some future work! var writers []storage.VolumeId for _, machine_entry := range m.Machines { - for _, v := range machine_entry.Volumes { - if uint64(v.Size) < m.volumeSizeLimit { - writers = append(writers, v.Id) + if machine_entry.LastSeen > freshThreshHold { + for _, v := range machine_entry.Volumes { + if uint64(v.Size) < m.volumeSizeLimit { + writers = append(writers, v.Id) + } } } }