diff --git a/go/storage/store.go b/go/storage/store.go index c8d1ca856..0238bbbbd 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -11,28 +11,32 @@ import ( "strings" ) -type Store struct { +type DiskLocation struct { + directory string + maxVolumeCount int volumes map[VolumeId]*Volume - dir string - Port int - Ip string - PublicUrl string - MaxVolumeCount int - +} +type Store struct { + Port int + Ip string + PublicUrl string + locations []*DiskLocation masterNode string dataCenter string //optional informaton, overwriting master setting if exists rack string //optional information, overwriting master setting if exists connected bool volumeSizeLimit uint64 //read from the master - } -func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) { - s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount} - s.volumes = make(map[VolumeId]*Volume) - s.loadExistingVolumes() - - log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes") +func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) { + s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} + s.locations = make([]*DiskLocation, 0) + for i := 0; i < len(dirnames); i++ { + location := &DiskLocation{directory: dirnames[i], maxVolumeCount: maxVolumeCounts[i]} + location.volumes = make(map[VolumeId]*Volume) + location.loadExistingVolumes() + s.locations = append(s.locations, location) + } return } func (s *Store) AddVolume(volumeListString string, replicationType string) error { @@ -56,7 +60,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error } end, end_err := strconv.ParseUint(pair[1], 10, 64) if end_err != nil { - return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1] ) + return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { if err := s.addVolume(VolumeId(id), rt); err != nil { @@ -67,13 +71,35 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error } return e } +func (s *Store) findVolume(vid VolumeId) *Volume { + for _, location := range s.locations { + if v, found := location.volumes[vid]; found { + return v + } + } + return nil +} +func (s *Store) findFreeLocation() (ret *DiskLocation) { + max := 0 + for _, location := range s.locations { + currentFreeCount := location.maxVolumeCount - len(location.volumes) + if currentFreeCount > max { + max = currentFreeCount + ret = location + } + } + return ret +} func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) { - if s.volumes[vid] != nil { + if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %s already exists!", vid) } - log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType) - s.volumes[vid], err = NewVolume(s.dir, vid, replicationType) - return err + if location := s.findFreeLocation(); location != nil { + log.Println("In dir", location.directory, "adds volume =", vid, ", replicationType =", replicationType) + location.volumes[vid], err = NewVolume(location.directory, vid, replicationType) + return err + } + return fmt.Errorf("No more free space left") } func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { @@ -85,60 +111,76 @@ func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString if e != nil { return fmt.Errorf("garbageThreshold %s is not a valid float number!", garbageThresholdString), false } - return nil, garbageThreshold < s.volumes[vid].garbageLevel() + if v := s.findVolume(vid); v != nil { + return nil, garbageThreshold < v.garbageLevel() + } + return fmt.Errorf("volume id %s is not found during check compact!", vid), false } func (s *Store) CompactVolume(volumeIdString string) error { vid, err := NewVolumeId(volumeIdString) if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString) } - return s.volumes[vid].compact() + if v := s.findVolume(vid); v != nil { + return v.compact() + } + return fmt.Errorf("volume id %s is not found during compact!", vid) } func (s *Store) CommitCompactVolume(volumeIdString string) error { vid, err := NewVolumeId(volumeIdString) if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString) } - return s.volumes[vid].commitCompact() + if v := s.findVolume(vid); v != nil { + return v.commitCompact() + } + return fmt.Errorf("volume id %s is not found during commit compact!", vid) } func (s *Store) FreezeVolume(volumeIdString string) error { vid, err := NewVolumeId(volumeIdString) if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString) } - if s.volumes[vid].readOnly { - return fmt.Errorf("Volume %s is already read-only", volumeIdString) + if v := s.findVolume(vid); v != nil { + if v.readOnly { + return fmt.Errorf("Volume %s is already read-only", volumeIdString) + } + return v.freeze() + } else { + return fmt.Errorf("volume id %s is not found during freeze!", vid) } - return s.volumes[vid].freeze() } -func (s *Store) loadExistingVolumes() { - if dirs, err := ioutil.ReadDir(s.dir); err == nil { +func (l *DiskLocation) loadExistingVolumes() { + if dirs, err := ioutil.ReadDir(l.directory); err == nil { for _, dir := range dirs { name := dir.Name() if !dir.IsDir() && strings.HasSuffix(name, ".dat") { base := name[:len(name)-len(".dat")] if vid, err := NewVolumeId(base); err == nil { - if s.volumes[vid] == nil { - if v, e := NewVolume(s.dir, vid, CopyNil); e == nil { - s.volumes[vid] = v - log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size()) + if l.volumes[vid] == nil { + if v, e := NewVolume(l.directory, vid, CopyNil); e == nil { + l.volumes[vid] = v + log.Println("In dir", l.directory, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size()) } } } } } } + log.Println("Store started on dir:", l.directory, "with", len(l.volumes), "volumes", "max", l.maxVolumeCount) } func (s *Store) Status() []*VolumeInfo { var stats []*VolumeInfo - for k, v := range s.volumes { - s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(), - RepType: v.ReplicaType, Version: v.Version(), - FileCount: v.nm.FileCount(), - DeleteCount: v.nm.DeletedCount(), - DeletedByteCount: v.nm.DeletedSize(), - ReadOnly: v.readOnly} - stats = append(stats, s) + for _, location := range s.locations { + for k, v := range location.volumes { + s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(), + RepType: v.ReplicaType, Version: v.Version(), + FileCount: v.nm.FileCount(), + DeleteCount: v.nm.DeletedCount(), + DeletedByteCount: v.nm.DeletedSize(), + ReadOnly: v.readOnly} + stats = append(stats, s) + } } return stats } @@ -158,14 +200,18 @@ func (s *Store) SetRack(rack string) { } func (s *Store) Join() error { stats := new([]*VolumeInfo) - for k, v := range s.volumes { - s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()), - RepType: v.ReplicaType, Version: v.Version(), - FileCount: v.nm.FileCount(), - DeleteCount: v.nm.DeletedCount(), - DeletedByteCount: v.nm.DeletedSize(), - ReadOnly: v.readOnly} - *stats = append(*stats, s) + maxVolumeCount := 0 + for _, location := range s.locations { + maxVolumeCount = maxVolumeCount + location.maxVolumeCount + for k, v := range location.volumes { + s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()), + RepType: v.ReplicaType, Version: v.Version(), + FileCount: v.nm.FileCount(), + DeleteCount: v.nm.DeletedCount(), + DeletedByteCount: v.nm.DeletedSize(), + ReadOnly: v.readOnly} + *stats = append(*stats, s) + } } bytes, _ := json.Marshal(stats) values := make(url.Values) @@ -176,7 +222,7 @@ func (s *Store) Join() error { values.Add("ip", s.Ip) values.Add("publicUrl", s.PublicUrl) values.Add("volumes", string(bytes)) - values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount)) + values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount)) values.Add("dataCenter", s.dataCenter) values.Add("rack", s.rack) jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values) @@ -192,12 +238,14 @@ func (s *Store) Join() error { return nil } func (s *Store) Close() { - for _, v := range s.volumes { - v.Close() + for _, location := range s.locations { + for _, v := range location.volumes { + v.Close() + } } } func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { - if v := s.volumes[i]; v != nil { + if v := s.findVolume(i); v != nil { if v.readOnly { err = fmt.Errorf("Volume %s is read only!", i) return @@ -221,22 +269,22 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { return } func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { - if v := s.volumes[i]; v != nil && !v.readOnly { + if v := s.findVolume(i); v != nil && !v.readOnly { return v.delete(n) } return 0, nil } func (s *Store) Read(i VolumeId, n *Needle) (int, error) { - if v := s.volumes[i]; v != nil { + if v := s.findVolume(i); v != nil { return v.read(n) } return 0, fmt.Errorf("Volume %s not found!", i) } func (s *Store) GetVolume(i VolumeId) *Volume { - return s.volumes[i] + return s.findVolume(i) } func (s *Store) HasVolume(i VolumeId) bool { - _, ok := s.volumes[i] - return ok + v := s.findVolume(i) + return v != nil } diff --git a/go/weed/volume.go b/go/weed/volume.go index 759200927..ecd547860 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -29,17 +29,17 @@ var cmdVolume = &Command{ } var ( - vport = cmdVolume.Flag.Int("port", 8080, "http listen port") - volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files") - ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name") - publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible :") - masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") - vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting") - maxVolumeCount = cmdVolume.Flag.Int("max", 7, "maximum number of volumes") - vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds") - vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") - rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") + vport = cmdVolume.Flag.Int("port", 8080, "http listen port") + volumeFolders = cmdVolume.Flag.String("dir", "/tmp", "directories to store data files. dir[,dir]...") + maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") + ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name") + publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible :") + masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") + vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting") + vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds") + vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") + rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") store *storage.Store ) @@ -295,21 +295,37 @@ func runVolume(cmd *Command, args []string) bool { *vMaxCpu = runtime.NumCPU() } runtime.GOMAXPROCS(*vMaxCpu) - fileInfo, err := os.Stat(*volumeFolder) - if err != nil { - log.Fatalf("No Existing Folder:%s", *volumeFolder) + folders := strings.Split(*volumeFolders, ",") + maxCountStrings := strings.Split(*maxVolumeCounts, ",") + maxCounts := make([]int, 0) + for _, maxString := range maxCountStrings { + if max, e := strconv.Atoi(maxString); e == nil { + maxCounts = append(maxCounts, max) + } else { + log.Fatalf("The max specified in -max not a valid number %s", max) + } } - if !fileInfo.IsDir() { - log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder) + if len(folders) != len(maxCounts) { + log.Fatalf("%d directories by -dir, but only %d max is set by -max", len(folders), len(maxCounts)) + } + for _, folder := range folders { + fileInfo, err := os.Stat(folder) + if err != nil { + log.Fatalf("No Existing Folder:%s", folder) + } + if !fileInfo.IsDir() { + log.Fatalf("Volume Folder should not be a file:%s", folder) + } + perm := fileInfo.Mode().Perm() + log.Println("Volume Folder", folder) + log.Println("Permission:", perm) } - perm := fileInfo.Mode().Perm() - log.Println("Volume Folder permission:", perm) if *publicUrl == "" { *publicUrl = *ip + ":" + strconv.Itoa(*vport) } - store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount) + store = storage.NewStore(*vport, *ip, *publicUrl, folders, maxCounts) defer store.Close() http.HandleFunc("/", storeHandler) http.HandleFunc("/status", statusHandler)