add support for multiple folders and multiple max limit: eg

-dir=folder1,folder2,folder3 -max=7,8,9
This commit is contained in:
Chris Lu 2013-07-13 11:38:01 -07:00
parent 175456870a
commit d4105f9b46
2 changed files with 139 additions and 75 deletions

View file

@ -11,28 +11,32 @@ import (
"strings"
)
type Store struct {
type DiskLocation struct {
directory string
maxVolumeCount int
volumes map[VolumeId]*Volume
dir string
Port int
Ip string
PublicUrl string
MaxVolumeCount int
}
type Store struct {
Port int
Ip string
PublicUrl string
locations []*DiskLocation
masterNode string
dataCenter string //optional informaton, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists
connected bool
volumeSizeLimit uint64 //read from the master
}
func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) {
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount}
s.volumes = make(map[VolumeId]*Volume)
s.loadExistingVolumes()
log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes")
func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
s.locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := &DiskLocation{directory: dirnames[i], maxVolumeCount: maxVolumeCounts[i]}
location.volumes = make(map[VolumeId]*Volume)
location.loadExistingVolumes()
s.locations = append(s.locations, location)
}
return
}
func (s *Store) AddVolume(volumeListString string, replicationType string) error {
@ -56,7 +60,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
}
end, end_err := strconv.ParseUint(pair[1], 10, 64)
if end_err != nil {
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1] )
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
if err := s.addVolume(VolumeId(id), rt); err != nil {
@ -67,13 +71,35 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
}
return e
}
func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.locations {
if v, found := location.volumes[vid]; found {
return v
}
}
return nil
}
func (s *Store) findFreeLocation() (ret *DiskLocation) {
max := 0
for _, location := range s.locations {
currentFreeCount := location.maxVolumeCount - len(location.volumes)
if currentFreeCount > max {
max = currentFreeCount
ret = location
}
}
return ret
}
func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) {
if s.volumes[vid] != nil {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %s already exists!", vid)
}
log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
s.volumes[vid], err = NewVolume(s.dir, vid, replicationType)
return err
if location := s.findFreeLocation(); location != nil {
log.Println("In dir", location.directory, "adds volume =", vid, ", replicationType =", replicationType)
location.volumes[vid], err = NewVolume(location.directory, vid, replicationType)
return err
}
return fmt.Errorf("No more free space left")
}
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
@ -85,60 +111,76 @@ func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString
if e != nil {
return fmt.Errorf("garbageThreshold %s is not a valid float number!", garbageThresholdString), false
}
return nil, garbageThreshold < s.volumes[vid].garbageLevel()
if v := s.findVolume(vid); v != nil {
return nil, garbageThreshold < v.garbageLevel()
}
return fmt.Errorf("volume id %s is not found during check compact!", vid), false
}
func (s *Store) CompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
}
return s.volumes[vid].compact()
if v := s.findVolume(vid); v != nil {
return v.compact()
}
return fmt.Errorf("volume id %s is not found during compact!", vid)
}
func (s *Store) CommitCompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
}
return s.volumes[vid].commitCompact()
if v := s.findVolume(vid); v != nil {
return v.commitCompact()
}
return fmt.Errorf("volume id %s is not found during commit compact!", vid)
}
func (s *Store) FreezeVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
}
if s.volumes[vid].readOnly {
return fmt.Errorf("Volume %s is already read-only", volumeIdString)
if v := s.findVolume(vid); v != nil {
if v.readOnly {
return fmt.Errorf("Volume %s is already read-only", volumeIdString)
}
return v.freeze()
} else {
return fmt.Errorf("volume id %s is not found during freeze!", vid)
}
return s.volumes[vid].freeze()
}
func (s *Store) loadExistingVolumes() {
if dirs, err := ioutil.ReadDir(s.dir); err == nil {
func (l *DiskLocation) loadExistingVolumes() {
if dirs, err := ioutil.ReadDir(l.directory); err == nil {
for _, dir := range dirs {
name := dir.Name()
if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
base := name[:len(name)-len(".dat")]
if vid, err := NewVolumeId(base); err == nil {
if s.volumes[vid] == nil {
if v, e := NewVolume(s.dir, vid, CopyNil); e == nil {
s.volumes[vid] = v
log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
if l.volumes[vid] == nil {
if v, e := NewVolume(l.directory, vid, CopyNil); e == nil {
l.volumes[vid] = v
log.Println("In dir", l.directory, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
}
}
}
}
}
}
log.Println("Store started on dir:", l.directory, "with", len(l.volumes), "volumes", "max", l.maxVolumeCount)
}
func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo
for k, v := range s.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
RepType: v.ReplicaType, Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(),
ReadOnly: v.readOnly}
stats = append(stats, s)
for _, location := range s.locations {
for k, v := range location.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
RepType: v.ReplicaType, Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(),
ReadOnly: v.readOnly}
stats = append(stats, s)
}
}
return stats
}
@ -158,14 +200,18 @@ func (s *Store) SetRack(rack string) {
}
func (s *Store) Join() error {
stats := new([]*VolumeInfo)
for k, v := range s.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
RepType: v.ReplicaType, Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(),
ReadOnly: v.readOnly}
*stats = append(*stats, s)
maxVolumeCount := 0
for _, location := range s.locations {
maxVolumeCount = maxVolumeCount + location.maxVolumeCount
for k, v := range location.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
RepType: v.ReplicaType, Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(),
ReadOnly: v.readOnly}
*stats = append(*stats, s)
}
}
bytes, _ := json.Marshal(stats)
values := make(url.Values)
@ -176,7 +222,7 @@ func (s *Store) Join() error {
values.Add("ip", s.Ip)
values.Add("publicUrl", s.PublicUrl)
values.Add("volumes", string(bytes))
values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
values.Add("dataCenter", s.dataCenter)
values.Add("rack", s.rack)
jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
@ -192,12 +238,14 @@ func (s *Store) Join() error {
return nil
}
func (s *Store) Close() {
for _, v := range s.volumes {
v.Close()
for _, location := range s.locations {
for _, v := range location.volumes {
v.Close()
}
}
}
func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
if v := s.volumes[i]; v != nil {
if v := s.findVolume(i); v != nil {
if v.readOnly {
err = fmt.Errorf("Volume %s is read only!", i)
return
@ -221,22 +269,22 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
return
}
func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
if v := s.volumes[i]; v != nil && !v.readOnly {
if v := s.findVolume(i); v != nil && !v.readOnly {
return v.delete(n)
}
return 0, nil
}
func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
if v := s.volumes[i]; v != nil {
if v := s.findVolume(i); v != nil {
return v.read(n)
}
return 0, fmt.Errorf("Volume %s not found!", i)
}
func (s *Store) GetVolume(i VolumeId) *Volume {
return s.volumes[i]
return s.findVolume(i)
}
func (s *Store) HasVolume(i VolumeId) bool {
_, ok := s.volumes[i]
return ok
v := s.findVolume(i)
return v != nil
}

View file

@ -29,17 +29,17 @@ var cmdVolume = &Command{
}
var (
vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files")
ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name")
publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
maxVolumeCount = cmdVolume.Flag.Int("max", 7, "maximum number of volumes")
vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
volumeFolders = cmdVolume.Flag.String("dir", "/tmp", "directories to store data files. dir[,dir]...")
maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name")
publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
store *storage.Store
)
@ -295,21 +295,37 @@ func runVolume(cmd *Command, args []string) bool {
*vMaxCpu = runtime.NumCPU()
}
runtime.GOMAXPROCS(*vMaxCpu)
fileInfo, err := os.Stat(*volumeFolder)
if err != nil {
log.Fatalf("No Existing Folder:%s", *volumeFolder)
folders := strings.Split(*volumeFolders, ",")
maxCountStrings := strings.Split(*maxVolumeCounts, ",")
maxCounts := make([]int, 0)
for _, maxString := range maxCountStrings {
if max, e := strconv.Atoi(maxString); e == nil {
maxCounts = append(maxCounts, max)
} else {
log.Fatalf("The max specified in -max not a valid number %s", max)
}
}
if !fileInfo.IsDir() {
log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder)
if len(folders) != len(maxCounts) {
log.Fatalf("%d directories by -dir, but only %d max is set by -max", len(folders), len(maxCounts))
}
for _, folder := range folders {
fileInfo, err := os.Stat(folder)
if err != nil {
log.Fatalf("No Existing Folder:%s", folder)
}
if !fileInfo.IsDir() {
log.Fatalf("Volume Folder should not be a file:%s", folder)
}
perm := fileInfo.Mode().Perm()
log.Println("Volume Folder", folder)
log.Println("Permission:", perm)
}
perm := fileInfo.Mode().Perm()
log.Println("Volume Folder permission:", perm)
if *publicUrl == "" {
*publicUrl = *ip + ":" + strconv.Itoa(*vport)
}
store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount)
store = storage.NewStore(*vport, *ip, *publicUrl, folders, maxCounts)
defer store.Close()
http.HandleFunc("/", storeHandler)
http.HandleFunc("/status", statusHandler)