fix enforcing volume size limit

git-svn-id: https://weed-fs.googlecode.com/svn/trunk@34 282b0af5-e82d-9cf1-ede4-77906d7719d0
This commit is contained in:
chris.lu@gmail.com 2011-12-29 08:07:19 +00:00
parent 298fdb4603
commit 4c2ca916ec

View file

@ -20,8 +20,8 @@ type MachineInfo struct {
PublicUrl string PublicUrl string
} }
type Machine struct { type Machine struct {
Server MachineInfo Server MachineInfo
Volumes []storage.VolumeInfo Volumes []storage.VolumeInfo
} }
type Mapper struct { type Mapper struct {
@ -30,12 +30,12 @@ type Mapper struct {
lock sync.Mutex lock sync.Mutex
Machines []*Machine Machines []*Machine
vid2machineId map[uint32]int vid2machineId map[uint32]int //machineId is +1 of the index of []*Machine, to detect not found entries
Writers []int // transient array of Writers volume id Writers []uint32 // transient array of Writers volume id
FileIdSequence uint64 FileIdSequence uint64
fileIdCounter uint64 fileIdCounter uint64
volumeSizeLimit uint32 volumeSizeLimit uint32
} }
@ -47,7 +47,7 @@ func NewMapper(dirname string, filename string, volumeSizeLimit uint32) (m *Mapp
m = &Mapper{dir: dirname, fileName: filename} m = &Mapper{dir: dirname, fileName: filename}
m.vid2machineId = make(map[uint32]int) m.vid2machineId = make(map[uint32]int)
m.volumeSizeLimit = volumeSizeLimit m.volumeSizeLimit = volumeSizeLimit
m.Writers = *new([]int) m.Writers = *new([]uint32)
m.Machines = *new([]*Machine) m.Machines = *new([]*Machine)
seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644)
@ -65,14 +65,18 @@ func NewMapper(dirname string, filename string, volumeSizeLimit uint32) (m *Mapp
return return
} }
func (m *Mapper) PickForWrite() (string, MachineInfo, os.Error) { func (m *Mapper) PickForWrite() (string, MachineInfo, os.Error) {
len_writers := len(m.Writers) len_writers := len(m.Writers)
if len_writers<=0 { if len_writers <= 0 {
log.Println("No more writable volumes!") log.Println("No more writable volumes!")
return "",m.Machines[rand.Intn(len(m.Machines))].Server, os.NewError("No more writable volumes!") return "", m.Machines[rand.Intn(len(m.Machines))].Server, os.NewError("No more writable volumes!")
} }
machine := m.Machines[m.Writers[rand.Intn(len_writers)]] vid := m.Writers[rand.Intn(len_writers)]
vid := machine.Volumes[rand.Intn(len(machine.Volumes))].Id machine_id := m.vid2machineId[vid]
return NewFileId(vid, m.NextFileId(), rand.Uint32()).String(), machine.Server,nil if machine_id > 0 {
machine := m.Machines[machine_id-1]
return NewFileId(vid, m.NextFileId(), rand.Uint32()).String(), machine.Server, nil
}
return "", m.Machines[rand.Intn(len(m.Machines))].Server, os.NewError("Strangely vid " + strconv.Uitoa64(uint64(vid)) + " is on no machine!")
} }
func (m *Mapper) NextFileId() uint64 { func (m *Mapper) NextFileId() uint64 {
if m.fileIdCounter <= 0 { if m.fileIdCounter <= 0 {
@ -84,13 +88,13 @@ func (m *Mapper) NextFileId() uint64 {
return m.FileIdSequence - m.fileIdCounter return m.FileIdSequence - m.fileIdCounter
} }
func (m *Mapper) Get(vid uint32) (*Machine, os.Error) { func (m *Mapper) Get(vid uint32) (*Machine, os.Error) {
machineId := m.vid2machineId[vid] machineId := m.vid2machineId[vid]
if machineId <=0{ if machineId <= 0 {
return nil, os.NewError("invalid volume id " + strconv.Uitob64(uint64(vid),10)) return nil, os.NewError("invalid volume id " + strconv.Uitob64(uint64(vid), 10))
} }
return m.Machines[machineId-1],nil return m.Machines[machineId-1], nil
} }
func (m *Mapper) Add(machine Machine){ func (m *Mapper) Add(machine Machine) {
//check existing machine, linearly //check existing machine, linearly
m.lock.Lock() m.lock.Lock()
foundExistingMachineId := -1 foundExistingMachineId := -1
@ -104,22 +108,22 @@ func (m *Mapper) Add(machine Machine){
if machineId < 0 { if machineId < 0 {
machineId = len(m.Machines) machineId = len(m.Machines)
m.Machines = append(m.Machines, &machine) m.Machines = append(m.Machines, &machine)
}else{ } else {
m.Machines[machineId] = &machine m.Machines[machineId] = &machine
} }
m.lock.Unlock() m.lock.Unlock()
//add to vid2machineId map, and Writers array //add to vid2machineId map, and Writers array
for _, v := range machine.Volumes { for _, v := range machine.Volumes {
//log.Println("Setting volume", v.Id, "to", machine.Server.Url) //log.Println("Setting volume", v.Id, "to", machine.Server.Url)
m.vid2machineId[v.Id] = machineId+1 //use base 1 indexed, to detect not found cases m.vid2machineId[v.Id] = machineId + 1 //use base 1 indexed, to detect not found cases
} }
//setting Writers, copy-on-write because of possible updating //setting Writers, copy-on-write because of possible updating
var writers []int var writers []uint32
for machine_index, machine_entry := range m.Machines { for _, machine_entry := range m.Machines {
for _, v := range machine_entry.Volumes { for _, v := range machine_entry.Volumes {
if v.Size < int64(m.volumeSizeLimit) { if v.Size < int64(m.volumeSizeLimit) {
writers = append(writers, machine_index) writers = append(writers, v.Id)
} }
} }
} }