Revert "add Frozen attribute to VolumeInfo"

This reverts commit dd685fdd8d.
This commit is contained in:
Chris Lu 2013-01-17 00:14:58 -08:00
parent 3b21317863
commit 5b49065a57
6 changed files with 7 additions and 55 deletions

View file

@ -107,14 +107,8 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
if ip == "" { if ip == "" {
ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
} }
port, err := strconv.Atoi(r.FormValue("port")) port, _ := strconv.Atoi(r.FormValue("port"))
if err != nil { maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
log.Printf("ERROR bad port number %s: %s", r.FormValue("port"), err)
}
maxVolumeCount, err := strconv.Atoi(r.FormValue("maxVolumeCount"))
if err != nil {
log.Printf("ERROR bad maxVolumeCount %s: %s", r.FormValue("maxVolumeCount"), err)
}
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
publicUrl := r.FormValue("publicUrl") publicUrl := r.FormValue("publicUrl")
volumes := new([]storage.VolumeInfo) volumes := new([]storage.VolumeInfo)

View file

@ -1,7 +1,6 @@
package storage package storage
import ( import (
"errors"
"io" "io"
"log" "log"
"os" "os"
@ -61,16 +60,10 @@ func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) {
}, nil }, nil
} }
func (nm NeedleMap) IsFrozen() bool {
return nm.m == nil && nm.fm != nil
}
const ( const (
RowsToRead = 1024 RowsToRead = 1024
) )
var MapIsFrozen = errors.New("Map is frozen!")
func LoadNeedleMap(file *os.File) (*NeedleMap, error) { func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewNeedleMap(file) nm := NewNeedleMap(file)
@ -131,9 +124,6 @@ func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error {
} }
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
if nm.IsFrozen() {
return 0, MapIsFrozen
}
oldSize := nm.m.Set(Key(key), offset, size) oldSize := nm.m.Set(Key(key), offset, size)
util.Uint64toBytes(nm.bytes[0:8], key) util.Uint64toBytes(nm.bytes[0:8], key)
util.Uint32toBytes(nm.bytes[8:12], offset) util.Uint32toBytes(nm.bytes[8:12], offset)
@ -150,17 +140,13 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
element, ok = nm.m.Get(Key(key)) element, ok = nm.m.Get(Key(key))
return return
} }
func (nm *NeedleMap) Delete(key uint64) error { func (nm *NeedleMap) Delete(key uint64) {
if nm.IsFrozen() {
return MapIsFrozen
}
nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key))) nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key)))
util.Uint64toBytes(nm.bytes[0:8], key) util.Uint64toBytes(nm.bytes[0:8], key)
util.Uint32toBytes(nm.bytes[8:12], 0) util.Uint32toBytes(nm.bytes[8:12], 0)
util.Uint32toBytes(nm.bytes[12:16], 0) util.Uint32toBytes(nm.bytes[12:16], 0)
nm.indexFile.Write(nm.bytes) nm.indexFile.Write(nm.bytes)
nm.deletionCounter++ nm.deletionCounter++
return nil
} }
func (nm *NeedleMap) Close() { func (nm *NeedleMap) Close() {
nm.indexFile.Close() nm.indexFile.Close()

View file

@ -120,16 +120,8 @@ func (s *Store) loadExistingVolumes() {
func (s *Store) Status() []*VolumeInfo { func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo var stats []*VolumeInfo
for k, v := range s.volumes { for k, v := range s.volumes {
s := &VolumeInfo{ s := new(VolumeInfo)
Id: VolumeId(k), s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
Size: v.ContentSize(),
RepType: v.replicaType,
Version: v.Version(),
FileCount: v.nm.fileCounter,
DeleteCount: v.nm.deletionCounter,
DeletedByteCount: v.nm.deletionByteCounter,
Frozen: !v.IsWritable(),
}
stats = append(stats, s) stats = append(stats, s)
} }
return stats return stats
@ -142,8 +134,6 @@ type JoinResult struct {
func (s *Store) SetMaster(mserver string) { func (s *Store) SetMaster(mserver string) {
s.masterNode = mserver s.masterNode = mserver
} }
// call master's /dir/join
func (s *Store) Join() error { func (s *Store) Join() error {
stats := new([]*VolumeInfo) stats := new([]*VolumeInfo)
for k, v := range s.volumes { for k, v := range s.volumes {
@ -181,8 +171,7 @@ func (s *Store) Close() {
func (s *Store) Write(i VolumeId, n *Needle) uint32 { func (s *Store) Write(i VolumeId, n *Needle) uint32 {
if v := s.volumes[i]; v != nil { if v := s.volumes[i]; v != nil {
size := v.write(n) size := v.write(n)
if s.volumeSizeLimit < v.ContentSize()+uint64(size) && if s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
s.volumeSizeLimit >= v.ContentSize() {
log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit) log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
s.Join() s.Join()
} }

View file

@ -3,7 +3,6 @@ package storage
import ( import (
"errors" "errors"
"fmt" "fmt"
"log"
"os" "os"
"path" "path"
"sync" "sync"
@ -65,18 +64,6 @@ func (v *Volume) Size() int64 {
fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error()) fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
return -1 return -1
} }
// a volume is writable, if its data file is writable and the index is not frozen
func (v *Volume) IsWritable() bool {
stat, e := v.dataFile.Stat()
if e != nil {
log.Printf("Failed to read file permission %s %s\n", v.dataFile.Name(), e.Error())
return false
}
// 4 for r, 2 for w, 1 for x
return stat.Mode().Perm()&0222 > 0 && !v.nm.IsFrozen()
}
func (v *Volume) Close() { func (v *Volume) Close() {
v.accessLock.Lock() v.accessLock.Lock()
defer v.accessLock.Unlock() defer v.accessLock.Unlock()

View file

@ -10,5 +10,4 @@ type VolumeInfo struct {
FileCount int FileCount int
DeleteCount int DeleteCount int
DeletedByteCount uint64 DeletedByteCount uint64
Frozen bool
} }

View file

@ -45,9 +45,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
} }
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
return !v.Frozen && return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion
uint64(v.Size) < vl.volumeSizeLimit &&
v.Version == storage.CurrentVersion
} }
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
@ -94,7 +92,6 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
return false return false
} }
} }
// FIXME: how to refuse if volume is unwritable/frozen?
fmt.Println("Volume", vid, "becomes writable") fmt.Println("Volume", vid, "becomes writable")
vl.writables = append(vl.writables, vid) vl.writables = append(vl.writables, vid)
if len(vl.writables) > 1 { if len(vl.writables) > 1 {