a bit refactoring to prepare for volume format change and backward

compatibility.
This commit is contained in:
Chris Lu 2014-08-25 11:37:00 -07:00
parent aac5fa49de
commit 4c58cef24a
3 changed files with 68 additions and 57 deletions

View file

@ -12,22 +12,6 @@ import (
"time" "time"
) )
const (
SuperBlockSize = 8
)
type SuperBlock struct {
Version Version
ReplicaPlacement *ReplicaPlacement
}
func (s *SuperBlock) Bytes() []byte {
header := make([]byte, SuperBlockSize)
header[0] = byte(s.Version)
header[1] = s.ReplicaPlacement.Byte()
return header
}
type Volume struct { type Volume struct {
Id VolumeId Id VolumeId
dir string dir string
@ -122,7 +106,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
return e return e
} }
func (v *Volume) Version() Version { func (v *Volume) Version() Version {
return v.SuperBlock.Version return v.SuperBlock.Version()
} }
func (v *Volume) Size() int64 { func (v *Volume) Size() int64 {
stat, e := v.dataFile.Stat() stat, e := v.dataFile.Stat()
@ -138,44 +122,6 @@ func (v *Volume) Close() {
v.nm.Close() v.nm.Close()
_ = v.dataFile.Close() _ = v.dataFile.Close()
} }
func (v *Volume) maybeWriteSuperBlock() error {
stat, e := v.dataFile.Stat()
if e != nil {
glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
return e
}
if stat.Size() == 0 {
v.SuperBlock.Version = CurrentVersion
_, e = v.dataFile.Write(v.SuperBlock.Bytes())
if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it!
if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
v.readOnly = false
}
}
}
}
return e
}
func (v *Volume) readSuperBlock() (err error) {
if _, err = v.dataFile.Seek(0, 0); err != nil {
return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
}
header := make([]byte, SuperBlockSize)
if _, e := v.dataFile.Read(header); e != nil {
return fmt.Errorf("cannot read superblock: %s", e.Error())
}
v.SuperBlock, err = ParseSuperBlock(header)
return err
}
func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
superBlock.Version = Version(header[0])
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
}
return
}
func (v *Volume) NeedToReplicate() bool { func (v *Volume) NeedToReplicate() bool {
return v.ReplicaPlacement.GetCopyCount() > 1 return v.ReplicaPlacement.GetCopyCount() > 1
} }

View file

@ -0,0 +1,65 @@
package storage
import (
"code.google.com/p/weed-fs/go/glog"
"fmt"
"os"
)
const (
SuperBlockSize = 8
)
type SuperBlock struct {
version Version
ReplicaPlacement *ReplicaPlacement
}
func (s *SuperBlock) Version() Version {
return s.version
}
func (s *SuperBlock) Bytes() []byte {
header := make([]byte, SuperBlockSize)
header[0] = byte(s.version)
header[1] = s.ReplicaPlacement.Byte()
return header
}
func (v *Volume) maybeWriteSuperBlock() error {
stat, e := v.dataFile.Stat()
if e != nil {
glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
return e
}
if stat.Size() == 0 {
v.SuperBlock.version = CurrentVersion
_, e = v.dataFile.Write(v.SuperBlock.Bytes())
if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it!
if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
v.readOnly = false
}
}
}
}
return e
}
func (v *Volume) readSuperBlock() (err error) {
if _, err = v.dataFile.Seek(0, 0); err != nil {
return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
}
header := make([]byte, SuperBlockSize)
if _, e := v.dataFile.Read(header); e != nil {
return fmt.Errorf("cannot read superblock: %s", e.Error())
}
v.SuperBlock, err = ParseSuperBlock(header)
return err
}
func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
superBlock.version = Version(header[0])
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
}
return
}

View file

@ -100,7 +100,7 @@ func runExport(cmd *Command, args []string) bool {
var version storage.Version var version storage.Version
err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid, func(superBlock storage.SuperBlock) error { err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid, func(superBlock storage.SuperBlock) error {
version = superBlock.Version version = superBlock.Version()
return nil return nil
}, true, func(n *storage.Needle, offset int64) error { }, true, func(n *storage.Needle, offset int64) error {
nv, ok := nm.Get(n.Id) nv, ok := nm.Get(n.Id)