reading/setting/reporting correct volume version

This commit is contained in:
Chris Lu 2012-12-17 17:51:39 -08:00
parent 6c8810e4d2
commit ab5e9727a9
5 changed files with 11 additions and 7 deletions

View file

@ -182,7 +182,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
for _, server := range servers { for _, server := range servers {
if err := operation.AllocateVolume(server, vid, repType); err == nil { if err := operation.AllocateVolume(server, vid, repType); err == nil {
vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType} vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version:storage.CurrentVersion}
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(&vi, server) topo.RegisterVolumeLayout(&vi, server)
fmt.Println("Created Volume", vid, "on", server) fmt.Println("Created Volume", vid, "on", server)

View file

@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology {
rack.LinkChildNode(server) rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) { for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{}) m := v.(map[string]interface{})
vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))} vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion}
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
} }
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))

View file

@ -108,7 +108,7 @@ func (s *Store) loadExistingVolumes() {
if s.volumes[vid] == nil { if s.volumes[vid] == nil {
v := NewVolume(s.dir, vid, CopyNil) v := NewVolume(s.dir, vid, CopyNil)
s.volumes[vid] = v s.volumes[vid] = v
log.Println("In dir", s.dir, "reads volume = ", vid, ", replicationType =", v.replicaType) log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version,"size =", v.Size())
} }
} }
} }
@ -119,7 +119,7 @@ func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo var stats []*VolumeInfo
for k, v := range s.volumes { for k, v := range s.volumes {
s := new(VolumeInfo) s := new(VolumeInfo)
s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
stats = append(stats, s) stats = append(stats, s)
} }
return stats return stats
@ -136,7 +136,7 @@ func (s *Store) Join() error {
stats := new([]*VolumeInfo) stats := new([]*VolumeInfo)
for k, v := range s.volumes { for k, v := range s.volumes {
s := new(VolumeInfo) s := new(VolumeInfo)
s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), uint64(v.Size()), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), uint64(v.Size()), v.replicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
*stats = append(*stats, s) *stats = append(*stats, s)
} }
bytes, _ := json.Marshal(stats) bytes, _ := json.Marshal(stats)

View file

@ -53,6 +53,9 @@ func (v *Volume) load() error {
v.nm = LoadNeedleMap(indexFile) v.nm = LoadNeedleMap(indexFile)
return nil return nil
} }
func (v *Volume) Version() Version {
return CurrentVersion
}
func (v *Volume) Size() int64 { func (v *Volume) Size() int64 {
v.accessLock.Lock() v.accessLock.Lock()
defer v.accessLock.Unlock() defer v.accessLock.Unlock()
@ -72,8 +75,9 @@ func (v *Volume) Close() {
func (v *Volume) maybeWriteSuperBlock() { func (v *Volume) maybeWriteSuperBlock() {
stat, _ := v.dataFile.Stat() stat, _ := v.dataFile.Stat()
if stat.Size() == 0 { if stat.Size() == 0 {
v.version = CurrentVersion
header := make([]byte, SuperBlockSize) header := make([]byte, SuperBlockSize)
header[0] = byte(CurrentVersion) header[0] = byte(v.version)
header[1] = v.replicaType.Byte() header[1] = v.replicaType.Byte()
v.dataFile.Write(header) v.dataFile.Write(header)
} }

View file

@ -94,7 +94,7 @@ func setup(topologyLayout string) *Topology {
rack.LinkChildNode(server) rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) { for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{}) m := v.(map[string]interface{})
vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))} vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion}
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
} }
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))