reporting volume size as early as possible

This commit is contained in:
Chris Lu 2012-12-03 22:54:08 -08:00
parent b3df7673ed
commit 6201ed537e
6 changed files with 74 additions and 38 deletions

View file

@ -114,6 +114,9 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
json.Unmarshal([]byte(r.FormValue("volumes")), volumes) json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
debug(s, "volumes", r.FormValue("volumes")) debug(s, "volumes", r.FormValue("volumes"))
topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount) topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount)
m := make(map[string]interface{})
m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB)*1024*1024
writeJson(w, r, m)
} }
func dirStatusHandler(w http.ResponseWriter, r *http.Request) { func dirStatusHandler(w http.ResponseWriter, r *http.Request) {

View file

@ -317,8 +317,9 @@ func runVolume(cmd *Command, args []string) bool {
go func() { go func() {
connected := true connected := true
store.SetMaster(*masterNode)
for { for {
err := store.Join(*masterNode) err := store.Join()
if err == nil { if err == nil {
if !connected { if !connected {
connected = true connected = true

View file

@ -15,7 +15,8 @@ type NeedleMap struct {
deletionCounter int deletionCounter int
fileCounter int fileCounter int
deletionByteCounter uint32 deletionByteCounter uint64
fileByteCounter uint64
} }
func NewNeedleMap(file *os.File) *NeedleMap { func NewNeedleMap(file *os.File) *NeedleMap {
@ -44,19 +45,20 @@ func LoadNeedleMap(file *os.File) *NeedleMap {
key := util.BytesToUint64(bytes[i : i+8]) key := util.BytesToUint64(bytes[i : i+8])
offset := util.BytesToUint32(bytes[i+8 : i+12]) offset := util.BytesToUint32(bytes[i+8 : i+12])
size := util.BytesToUint32(bytes[i+12 : i+16]) size := util.BytesToUint32(bytes[i+12 : i+16])
nm.fileCounter++
nm.fileByteCounter = nm.fileByteCounter + uint64(size)
if offset > 0 { if offset > 0 {
oldSize := nm.m.Set(Key(key), offset, size) oldSize := nm.m.Set(Key(key), offset, size)
//log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
nm.fileCounter++
if oldSize > 0 { if oldSize > 0 {
nm.deletionCounter++ nm.deletionCounter++
nm.deletionByteCounter = nm.deletionByteCounter + oldSize nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
} }
} else { } else {
nm.m.Delete(Key(key)) nm.m.Delete(Key(key))
//log.Println("removing key", key) //log.Println("removing key", key)
nm.deletionCounter++ nm.deletionCounter++
nm.deletionByteCounter = nm.deletionByteCounter + size nm.deletionByteCounter = nm.deletionByteCounter + uint64(size)
} }
} }
@ -71,9 +73,10 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
util.Uint32toBytes(nm.bytes[8:12], offset) util.Uint32toBytes(nm.bytes[8:12], offset)
util.Uint32toBytes(nm.bytes[12:16], size) util.Uint32toBytes(nm.bytes[12:16], size)
nm.fileCounter++ nm.fileCounter++
nm.fileByteCounter = nm.fileByteCounter + uint64(size)
if oldSize > 0 { if oldSize > 0 {
nm.deletionCounter++ nm.deletionCounter++
nm.deletionByteCounter = nm.deletionByteCounter + oldSize nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
} }
return nm.indexFile.Write(nm.bytes) return nm.indexFile.Write(nm.bytes)
} }
@ -82,7 +85,7 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
return return
} }
func (nm *NeedleMap) Delete(key uint64) { func (nm *NeedleMap) Delete(key uint64) {
nm.deletionByteCounter = nm.deletionByteCounter + nm.m.Delete(Key(key)) nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key)))
util.Uint64toBytes(nm.bytes[0:8], key) util.Uint64toBytes(nm.bytes[0:8], key)
util.Uint32toBytes(nm.bytes[8:12], 0) util.Uint32toBytes(nm.bytes[8:12], 0)
util.Uint32toBytes(nm.bytes[12:16], 0) util.Uint32toBytes(nm.bytes[12:16], 0)
@ -92,3 +95,6 @@ func (nm *NeedleMap) Delete(key uint64) {
func (nm *NeedleMap) Close() { func (nm *NeedleMap) Close() {
nm.indexFile.Close() nm.indexFile.Close()
} }
func (nm *NeedleMap) ContentSize() uint64 {
return nm.fileByteCounter
}

View file

@ -18,6 +18,10 @@ type Store struct {
Ip string Ip string
PublicUrl string PublicUrl string
MaxVolumeCount int MaxVolumeCount int
//read from the master
masterNode string
volumeSizeLimit uint64
} }
func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) { func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) {
@ -70,15 +74,15 @@ func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
} }
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
vid, err := NewVolumeId(volumeIdString) vid, err := NewVolumeId(volumeIdString)
if err != nil { if err != nil {
return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false
} }
garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32) garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
if e != nil { if e != nil {
return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false
} }
return nil, garbageThreshold < s.volumes[vid].garbageLevel() return nil, garbageThreshold < s.volumes[vid].garbageLevel()
} }
func (s *Store) CompactVolume(volumeIdString string) error { func (s *Store) CompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString) vid, err := NewVolumeId(volumeIdString)
@ -87,12 +91,12 @@ func (s *Store) CompactVolume(volumeIdString string) error {
} }
return s.volumes[vid].compact() return s.volumes[vid].compact()
} }
func (s *Store) CommitCompactVolume(volumeIdString string) (error) { func (s *Store) CommitCompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString) vid, err := NewVolumeId(volumeIdString)
if err != nil { if err != nil {
return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
} }
return s.volumes[vid].commitCompact() return s.volumes[vid].commitCompact()
} }
func (s *Store) loadExistingVolumes() { func (s *Store) loadExistingVolumes() {
if dirs, err := ioutil.ReadDir(s.dir); err == nil { if dirs, err := ioutil.ReadDir(s.dir); err == nil {
@ -115,16 +119,24 @@ func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo var stats []*VolumeInfo
for k, v := range s.volumes { for k, v := range s.volumes {
s := new(VolumeInfo) s := new(VolumeInfo)
s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.Size(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
stats = append(stats, s) stats = append(stats, s)
} }
return stats return stats
} }
func (s *Store) Join(mserver string) error {
type JoinResult struct {
VolumeSizeLimit uint64
}
func (s *Store) SetMaster(mserver string) {
s.masterNode = mserver
}
func (s *Store) Join() error {
stats := new([]*VolumeInfo) stats := new([]*VolumeInfo)
for k, v := range s.volumes { for k, v := range s.volumes {
s := new(VolumeInfo) s := new(VolumeInfo)
s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.Size(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), uint64(v.Size()), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
*stats = append(*stats, s) *stats = append(*stats, s)
} }
bytes, _ := json.Marshal(stats) bytes, _ := json.Marshal(stats)
@ -134,8 +146,16 @@ func (s *Store) Join(mserver string) error {
values.Add("publicUrl", s.PublicUrl) values.Add("publicUrl", s.PublicUrl)
values.Add("volumes", string(bytes)) values.Add("volumes", string(bytes))
values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount)) values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
_, err := util.Post("http://"+mserver+"/dir/join", values) jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
return err if err != nil {
return err
}
var ret JoinResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err
}
s.volumeSizeLimit = ret.VolumeSizeLimit
return nil
} }
func (s *Store) Close() { func (s *Store) Close() {
for _, v := range s.volumes { for _, v := range s.volumes {
@ -144,9 +164,13 @@ func (s *Store) Close() {
} }
func (s *Store) Write(i VolumeId, n *Needle) uint32 { func (s *Store) Write(i VolumeId, n *Needle) uint32 {
if v := s.volumes[i]; v != nil { if v := s.volumes[i]; v != nil {
return v.write(n) size := v.write(n)
if s.volumeSizeLimit < v.ContentSize()+uint64(size) {
s.Join()
}
return size
} }
log.Println("volume",i, "not found!") log.Println("volume", i, "not found!")
return 0 return 0
} }
func (s *Store) Delete(i VolumeId, n *Needle) uint32 { func (s *Store) Delete(i VolumeId, n *Needle) uint32 {

View file

@ -132,7 +132,7 @@ func (v *Volume) read(n *Needle) (int, error) {
} }
func (v *Volume) garbageLevel() float64 { func (v *Volume) garbageLevel() float64 {
return float64(v.nm.deletionByteCounter)/float64(v.Size()) return float64(v.nm.deletionByteCounter)/float64(v.ContentSize())
} }
func (v *Volume) compact() error { func (v *Volume) compact() error {
@ -212,3 +212,6 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string)
return nil return nil
} }
func (v *Volume) ContentSize() uint64{
return v.nm.fileByteCounter
}

View file

@ -1,13 +1,12 @@
package storage package storage
import ( import ()
)
type VolumeInfo struct { type VolumeInfo struct {
Id VolumeId Id VolumeId
Size int64 Size uint64
RepType ReplicationType RepType ReplicationType
FileCount int FileCount int
DeleteCount int DeleteCount int
DeletedByteCount uint32 DeletedByteCount uint64
} }