refactor for reading super block

This commit is contained in:
Chris Lu 2012-12-20 22:32:21 -08:00
parent ebe7af1833
commit 79a49ada39
3 changed files with 92 additions and 76 deletions

View file

@ -1,65 +1,70 @@
package main package main
import ( import (
"pkg/storage" "log"
"log" "os"
"os" "path"
"path" "pkg/storage"
"strconv" "strconv"
) )
func init() { func init() {
cmdFix.Run = runFix // break init cycle cmdFix.Run = runFix // break init cycle
IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode") IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode")
} }
var cmdFix = &Command{ var cmdFix = &Command{
UsageLine: "fix -dir=/tmp -volumeId=234 -debug=1", UsageLine: "fix -dir=/tmp -volumeId=234 -debug=1",
Short: "run weed tool fix on index file if corrupted", Short: "run weed tool fix on index file if corrupted",
Long: `Fix runs the WeedFS fix command to re-create the index .idx file. Long: `Fix runs the WeedFS fix command to re-create the index .idx file.
`, `,
} }
var ( var (
dir = cmdFix.Flag.String("dir", "/tmp", "data directory to store files") dir = cmdFix.Flag.String("dir", "/tmp", "data directory to store files")
volumeId = cmdFix.Flag.Int("volumeId", -1, "a non-negative volume id. The volume should already exist in the dir. The volume index file should not exist.") volumeId = cmdFix.Flag.Int("volumeId", -1, "a non-negative volume id. The volume should already exist in the dir. The volume index file should not exist.")
) )
func runFix(cmd *Command, args []string) bool { func runFix(cmd *Command, args []string) bool {
if *volumeId == -1 { if *volumeId == -1 {
return false return false
} }
fileName := strconv.Itoa(*volumeId) fileName := strconv.Itoa(*volumeId)
dataFile, e := os.OpenFile(path.Join(*dir, fileName+".dat"), os.O_RDONLY, 0644) dataFile, e := os.OpenFile(path.Join(*dir, fileName+".dat"), os.O_RDONLY, 0644)
if e != nil { if e != nil {
log.Fatalf("Read Volume [ERROR] %s\n", e) log.Fatalf("Read Volume [ERROR] %s\n", e)
} }
defer dataFile.Close() defer dataFile.Close()
indexFile, ie := os.OpenFile(path.Join(*dir, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) indexFile, ie := os.OpenFile(path.Join(*dir, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644)
if ie != nil { if ie != nil {
log.Fatalf("Create Volume Index [ERROR] %s\n", ie) log.Fatalf("Create Volume Index [ERROR] %s\n", ie)
} }
defer indexFile.Close() defer indexFile.Close()
//skip the volume super block dataFile.Seek(0, 0)
dataFile.Seek(storage.SuperBlockSize, 0) header := make([]byte, storage.SuperBlockSize)
if _, e := dataFile.Read(header); e != nil {
log.Fatalf("cannot read superblock: %s", e)
}
n, rest := storage.ReadNeedle(dataFile) ver, _, _ := storage.ParseSuperBlock(header)
dataFile.Seek(int64(rest), 1)
nm := storage.NewNeedleMap(indexFile) n, rest := storage.ReadNeedle(dataFile, ver)
offset := uint32(storage.SuperBlockSize) dataFile.Seek(int64(rest), 1)
for n != nil { nm := storage.NewNeedleMap(indexFile)
debug("key", n.Id, "volume offset", offset, "data_size", n.Size, "rest", rest) offset := uint32(storage.SuperBlockSize)
if n.Size > 0 { for n != nil {
count, pe := nm.Put(n.Id, offset/8, n.Size) debug("key", n.Id, "volume offset", offset, "data_size", n.Size, "rest", rest)
debug("saved", count, "with error", pe) if n.Size > 0 {
} count, pe := nm.Put(n.Id, offset/8, n.Size)
offset += rest+16 debug("saved", count, "with error", pe)
n, rest = storage.ReadNeedle(dataFile) }
dataFile.Seek(int64(rest), 1) offset += rest + 16
} n, rest = storage.ReadNeedle(dataFile, ver)
return true dataFile.Seek(int64(rest), 1)
}
return true
} }

View file

@ -1,7 +1,7 @@
package storage package storage
import ( import (
"errors" "errors"
"io" "io"
"os" "os"
"pkg/util" "pkg/util"
@ -15,14 +15,14 @@ func (n *Needle) Append(w io.Writer) uint32 {
util.Uint32toBytes(header[12:16], n.Size) util.Uint32toBytes(header[12:16], n.Size)
w.Write(header) w.Write(header)
w.Write(n.Data) w.Write(n.Data)
rest := 8 - ((n.Size + 16 + 4) % 8) rest := 8 - ((16 + n.Size + 4) % 8)
util.Uint32toBytes(header[0:4], n.Checksum.Value()) util.Uint32toBytes(header[0:4], n.Checksum.Value())
w.Write(header[0 : rest+4]) w.Write(header[0 : 4+rest])
return n.Size return n.Size
} }
func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
if version == Version1 { if version == Version1 {
bytes := make([]byte, size+16+4) bytes := make([]byte, 16+size+4)
ret, e := r.Read(bytes) ret, e := r.Read(bytes)
n.Cookie = util.BytesToUint32(bytes[0:4]) n.Cookie = util.BytesToUint32(bytes[0:4])
n.Id = util.BytesToUint64(bytes[4:12]) n.Id = util.BytesToUint64(bytes[4:12])
@ -33,20 +33,24 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
return 0, errors.New("CRC error! Data On Disk Corrupted!") return 0, errors.New("CRC error! Data On Disk Corrupted!")
} }
return ret, e return ret, e
}else if version == Version2 { } else if version == Version2 {
} }
return 0, errors.New("Unsupported Version!") return 0, errors.New("Unsupported Version!")
} }
func ReadNeedle(r *os.File) (*Needle, uint32) { func ReadNeedle(r *os.File, version Version) (n *Needle, bytesTillNextFile uint32) {
n := new(Needle) n = new(Needle)
bytes := make([]byte, 16) if version == Version1 {
count, e := r.Read(bytes) bytes := make([]byte, 16)
if count <= 0 || e != nil { count, e := r.Read(bytes)
return nil, 0 if count <= 0 || e != nil {
return nil, 0
}
n.Cookie = util.BytesToUint32(bytes[0:4])
n.Id = util.BytesToUint64(bytes[4:12])
n.Size = util.BytesToUint32(bytes[12:16])
rest := 8 - ((n.Size + 16 + 4) % 8)
bytesTillNextFile = n.Size + 4 + rest
} else if version == Version2 {
} }
n.Cookie = util.BytesToUint32(bytes[0:4]) return
n.Id = util.BytesToUint64(bytes[4:12])
n.Size = util.BytesToUint32(bytes[12:16])
rest := 8 - ((n.Size + 16 + 4) % 8)
return n, n.Size + 4 + rest
} }

View file

@ -18,8 +18,8 @@ type Volume struct {
dataFile *os.File dataFile *os.File
nm *NeedleMap nm *NeedleMap
replicaType ReplicationType
version Version version Version
replicaType ReplicationType
accessLock sync.Mutex accessLock sync.Mutex
} }
@ -51,11 +51,11 @@ func (v *Volume) load() error {
return nil return nil
} }
func (v *Volume) Version() Version { func (v *Volume) Version() Version {
return CurrentVersion return CurrentVersion
} }
func (v *Volume) Size() int64 { func (v *Volume) Size() int64 {
v.accessLock.Lock() v.accessLock.Lock()
defer v.accessLock.Unlock() defer v.accessLock.Unlock()
stat, e := v.dataFile.Stat() stat, e := v.dataFile.Stat()
if e == nil { if e == nil {
return stat.Size() return stat.Size()
@ -64,15 +64,15 @@ func (v *Volume) Size() int64 {
return -1 return -1
} }
func (v *Volume) Close() { func (v *Volume) Close() {
v.accessLock.Lock() v.accessLock.Lock()
defer v.accessLock.Unlock() defer v.accessLock.Unlock()
v.nm.Close() v.nm.Close()
v.dataFile.Close() v.dataFile.Close()
} }
func (v *Volume) maybeWriteSuperBlock() { func (v *Volume) maybeWriteSuperBlock() {
stat, _ := v.dataFile.Stat() stat, _ := v.dataFile.Stat()
if stat.Size() == 0 { if stat.Size() == 0 {
v.version = CurrentVersion v.version = CurrentVersion
header := make([]byte, SuperBlockSize) header := make([]byte, SuperBlockSize)
header[0] = byte(v.version) header[0] = byte(v.version)
header[1] = v.replicaType.Byte() header[1] = v.replicaType.Byte()
@ -85,12 +85,17 @@ func (v *Volume) readSuperBlock() error {
if _, e := v.dataFile.Read(header); e != nil { if _, e := v.dataFile.Read(header); e != nil {
return fmt.Errorf("cannot read superblock: %s", e) return fmt.Errorf("cannot read superblock: %s", e)
} }
v.version = Version(header[0])
var err error var err error
if v.replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { v.version, v.replicaType, err = ParseSuperBlock(header)
return fmt.Errorf("cannot read replica type: %s", err) return err
}
func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, e error) {
version = Version(header[0])
var err error
if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
e = fmt.Errorf("cannot read replica type: %s", err)
} }
return nil return
} }
func (v *Volume) NeedToReplicate() bool { func (v *Volume) NeedToReplicate() bool {
return v.replicaType.GetCopyCount() > 1 return v.replicaType.GetCopyCount() > 1
@ -133,7 +138,7 @@ func (v *Volume) read(n *Needle) (int, error) {
} }
func (v *Volume) garbageLevel() float64 { func (v *Volume) garbageLevel() float64 {
return float64(v.nm.deletionByteCounter)/float64(v.ContentSize()) return float64(v.nm.deletionByteCounter) / float64(v.ContentSize())
} }
func (v *Volume) compact() error { func (v *Volume) compact() error {
@ -143,7 +148,7 @@ func (v *Volume) compact() error {
filePath := path.Join(v.dir, v.Id.String()) filePath := path.Join(v.dir, v.Id.String())
return v.copyDataAndGenerateIndexFile(filePath+".dat", filePath+".cpd", filePath+".cpx") return v.copyDataAndGenerateIndexFile(filePath+".dat", filePath+".cpd", filePath+".cpx")
} }
func (v *Volume) commitCompact() (error) { func (v *Volume) commitCompact() error {
v.accessLock.Lock() v.accessLock.Lock()
defer v.accessLock.Unlock() defer v.accessLock.Unlock()
v.dataFile.Close() v.dataFile.Close()
@ -185,7 +190,9 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string)
dst.Write(header) dst.Write(header)
} }
n, rest := ReadNeedle(src) version, _, _ := ParseSuperBlock(header)
n, rest := ReadNeedle(src, version)
nm := NewNeedleMap(idx) nm := NewNeedleMap(idx)
old_offset := uint32(SuperBlockSize) old_offset := uint32(SuperBlockSize)
new_offset := uint32(SuperBlockSize) new_offset := uint32(SuperBlockSize)
@ -208,11 +215,11 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string)
src.Seek(int64(rest-n.Size-4), 1) src.Seek(int64(rest-n.Size-4), 1)
} }
old_offset += rest + 16 old_offset += rest + 16
n, rest = ReadNeedle(src) n, rest = ReadNeedle(src, version)
} }
return nil return nil
} }
func (v *Volume) ContentSize() uint64{ func (v *Volume) ContentSize() uint64 {
return v.nm.fileByteCounter return v.nm.fileByteCounter
} }