mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
add Frozen attribute to VolumeInfo
This commit is contained in:
parent
bf0ccf3461
commit
dd685fdd8d
|
@ -107,8 +107,14 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
if ip == "" {
|
if ip == "" {
|
||||||
ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
|
ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
|
||||||
}
|
}
|
||||||
port, _ := strconv.Atoi(r.FormValue("port"))
|
port, err := strconv.Atoi(r.FormValue("port"))
|
||||||
maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
|
if err != nil {
|
||||||
|
log.Printf("ERROR bad port number %s: %s", r.FormValue("port"), err)
|
||||||
|
}
|
||||||
|
maxVolumeCount, err := strconv.Atoi(r.FormValue("maxVolumeCount"))
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("ERROR bad maxVolumeCount %s: %s", r.FormValue("maxVolumeCount"), err)
|
||||||
|
}
|
||||||
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
|
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
|
||||||
publicUrl := r.FormValue("publicUrl")
|
publicUrl := r.FormValue("publicUrl")
|
||||||
volumes := new([]storage.VolumeInfo)
|
volumes := new([]storage.VolumeInfo)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
@ -60,10 +61,16 @@ func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (nm NeedleMap) IsFrozen() bool {
|
||||||
|
return nm.m == nil && nm.fm != nil
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
RowsToRead = 1024
|
RowsToRead = 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var MapIsFrozen = errors.New("Map is frozen!")
|
||||||
|
|
||||||
func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
|
func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
|
||||||
nm := NewNeedleMap(file)
|
nm := NewNeedleMap(file)
|
||||||
|
|
||||||
|
@ -124,6 +131,9 @@ func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
|
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
|
||||||
|
if nm.IsFrozen() {
|
||||||
|
return 0, MapIsFrozen
|
||||||
|
}
|
||||||
oldSize := nm.m.Set(Key(key), offset, size)
|
oldSize := nm.m.Set(Key(key), offset, size)
|
||||||
util.Uint64toBytes(nm.bytes[0:8], key)
|
util.Uint64toBytes(nm.bytes[0:8], key)
|
||||||
util.Uint32toBytes(nm.bytes[8:12], offset)
|
util.Uint32toBytes(nm.bytes[8:12], offset)
|
||||||
|
@ -140,13 +150,17 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
|
||||||
element, ok = nm.m.Get(Key(key))
|
element, ok = nm.m.Get(Key(key))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (nm *NeedleMap) Delete(key uint64) {
|
func (nm *NeedleMap) Delete(key uint64) error {
|
||||||
|
if nm.IsFrozen() {
|
||||||
|
return MapIsFrozen
|
||||||
|
}
|
||||||
nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key)))
|
nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key)))
|
||||||
util.Uint64toBytes(nm.bytes[0:8], key)
|
util.Uint64toBytes(nm.bytes[0:8], key)
|
||||||
util.Uint32toBytes(nm.bytes[8:12], 0)
|
util.Uint32toBytes(nm.bytes[8:12], 0)
|
||||||
util.Uint32toBytes(nm.bytes[12:16], 0)
|
util.Uint32toBytes(nm.bytes[12:16], 0)
|
||||||
nm.indexFile.Write(nm.bytes)
|
nm.indexFile.Write(nm.bytes)
|
||||||
nm.deletionCounter++
|
nm.deletionCounter++
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
func (nm *NeedleMap) Close() {
|
func (nm *NeedleMap) Close() {
|
||||||
nm.indexFile.Close()
|
nm.indexFile.Close()
|
||||||
|
|
|
@ -120,8 +120,16 @@ func (s *Store) loadExistingVolumes() {
|
||||||
func (s *Store) Status() []*VolumeInfo {
|
func (s *Store) Status() []*VolumeInfo {
|
||||||
var stats []*VolumeInfo
|
var stats []*VolumeInfo
|
||||||
for k, v := range s.volumes {
|
for k, v := range s.volumes {
|
||||||
s := new(VolumeInfo)
|
s := &VolumeInfo{
|
||||||
s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
|
Id: VolumeId(k),
|
||||||
|
Size: v.ContentSize(),
|
||||||
|
RepType: v.replicaType,
|
||||||
|
Version: v.Version(),
|
||||||
|
FileCount: v.nm.fileCounter,
|
||||||
|
DeleteCount: v.nm.deletionCounter,
|
||||||
|
DeletedByteCount: v.nm.deletionByteCounter,
|
||||||
|
Frozen: !v.IsWritable(),
|
||||||
|
}
|
||||||
stats = append(stats, s)
|
stats = append(stats, s)
|
||||||
}
|
}
|
||||||
return stats
|
return stats
|
||||||
|
@ -134,6 +142,8 @@ type JoinResult struct {
|
||||||
func (s *Store) SetMaster(mserver string) {
|
func (s *Store) SetMaster(mserver string) {
|
||||||
s.masterNode = mserver
|
s.masterNode = mserver
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// call master's /dir/join
|
||||||
func (s *Store) Join() error {
|
func (s *Store) Join() error {
|
||||||
stats := new([]*VolumeInfo)
|
stats := new([]*VolumeInfo)
|
||||||
for k, v := range s.volumes {
|
for k, v := range s.volumes {
|
||||||
|
@ -171,7 +181,8 @@ func (s *Store) Close() {
|
||||||
func (s *Store) Write(i VolumeId, n *Needle) uint32 {
|
func (s *Store) Write(i VolumeId, n *Needle) uint32 {
|
||||||
if v := s.volumes[i]; v != nil {
|
if v := s.volumes[i]; v != nil {
|
||||||
size := v.write(n)
|
size := v.write(n)
|
||||||
if s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
|
if s.volumeSizeLimit < v.ContentSize()+uint64(size) &&
|
||||||
|
s.volumeSizeLimit >= v.ContentSize() {
|
||||||
log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
|
log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
|
||||||
s.Join()
|
s.Join()
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package storage
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -64,6 +65,18 @@ func (v *Volume) Size() int64 {
|
||||||
fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
|
fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// a volume is writable, if its data file is writable and the index is not frozen
|
||||||
|
func (v *Volume) IsWritable() bool {
|
||||||
|
stat, e := v.dataFile.Stat()
|
||||||
|
if e != nil {
|
||||||
|
log.Printf("Failed to read file permission %s %s\n", v.dataFile.Name(), e.Error())
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// 4 for r, 2 for w, 1 for x
|
||||||
|
return stat.Mode().Perm()&0222 > 0 && !v.nm.IsFrozen()
|
||||||
|
}
|
||||||
|
|
||||||
func (v *Volume) Close() {
|
func (v *Volume) Close() {
|
||||||
v.accessLock.Lock()
|
v.accessLock.Lock()
|
||||||
defer v.accessLock.Unlock()
|
defer v.accessLock.Unlock()
|
||||||
|
|
|
@ -10,4 +10,5 @@ type VolumeInfo struct {
|
||||||
FileCount int
|
FileCount int
|
||||||
DeleteCount int
|
DeleteCount int
|
||||||
DeletedByteCount uint64
|
DeletedByteCount uint64
|
||||||
|
Frozen bool
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
|
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
|
||||||
return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion
|
return !v.Frozen &&
|
||||||
|
uint64(v.Size) < vl.volumeSizeLimit &&
|
||||||
|
v.Version == storage.CurrentVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
|
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
|
||||||
|
@ -92,6 +94,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// FIXME: how to refuse if volume is unwritable/frozen?
|
||||||
fmt.Println("Volume", vid, "becomes writable")
|
fmt.Println("Volume", vid, "becomes writable")
|
||||||
vl.writables = append(vl.writables, vid)
|
vl.writables = append(vl.writables, vid)
|
||||||
if len(vl.writables) > 1 {
|
if len(vl.writables) > 1 {
|
||||||
|
|
Loading…
Reference in a new issue