seaweedfs/weed/storage/volume_super_block.go

137 lines
3.8 KiB
Go

package storage
import (
"fmt"
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
)
const (
_SuperBlockSize = 8
)
/*
* Super block currently has 8 bytes allocated for each volume.
* Byte 0: version, 1 or 2
* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
* Byte 2 and byte 3: Time to live. See TTL for definition
* Byte 4 and byte 5: The number of times the volume has been compacted.
* Rest bytes: Reserved
*/
type SuperBlock struct {
version needle.Version
ReplicaPlacement *ReplicaPlacement
Ttl *needle.TTL
CompactionRevision uint16
Extra *master_pb.SuperBlockExtra
extraSize uint16
}
func (s *SuperBlock) BlockSize() int {
switch s.version {
case needle.Version2, needle.Version3:
return _SuperBlockSize + int(s.extraSize)
}
return _SuperBlockSize
}
func (s *SuperBlock) Version() needle.Version {
return s.version
}
func (s *SuperBlock) Bytes() []byte {
header := make([]byte, _SuperBlockSize)
header[0] = byte(s.version)
header[1] = s.ReplicaPlacement.Byte()
s.Ttl.ToBytes(header[2:4])
util.Uint16toBytes(header[4:6], s.CompactionRevision)
if s.Extra != nil {
extraData, err := proto.Marshal(s.Extra)
if err != nil {
glog.Fatalf("cannot marshal super block extra %+v: %v", s.Extra, err)
}
extraSize := len(extraData)
if extraSize > 256*256-2 {
// reserve a couple of bits for future extension
glog.Fatalf("super block extra size is %d bigger than %d", extraSize, 256*256-2)
}
s.extraSize = uint16(extraSize)
util.Uint16toBytes(header[6:8], s.extraSize)
header = append(header, extraData...)
}
return header
}
func (s *SuperBlock) Initialized() bool {
return s.ReplicaPlacement != nil && s.Ttl != nil
}
func (v *Volume) maybeWriteSuperBlock() error {
datSize, _, e := v.DataBackend.GetStat()
if e != nil {
glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e)
return e
}
if datSize == 0 {
v.SuperBlock.version = needle.CurrentVersion
_, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it!
var dataFile *os.File
if dataFile, e = os.Create(v.DataBackend.String()); e == nil {
v.DataBackend = backend.NewDiskFile(dataFile)
if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
v.readOnly = false
}
}
}
}
return e
}
func (v *Volume) readSuperBlock() (err error) {
v.SuperBlock, err = ReadSuperBlock(v.DataBackend)
return err
}
// ReadSuperBlock reads from data file and load it into volume's super block
func ReadSuperBlock(datBackend backend.DataStorageBackend) (superBlock SuperBlock, err error) {
header := make([]byte, _SuperBlockSize)
if _, e := datBackend.ReadAt(header, 0); e != nil {
err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), e)
return
}
superBlock.version = needle.Version(header[0])
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
return
}
superBlock.Ttl = needle.LoadTTLFromBytes(header[2:4])
superBlock.CompactionRevision = util.BytesToUint16(header[4:6])
superBlock.extraSize = util.BytesToUint16(header[6:8])
if superBlock.extraSize > 0 {
// read more
extraData := make([]byte, int(superBlock.extraSize))
superBlock.Extra = &master_pb.SuperBlockExtra{}
err = proto.Unmarshal(extraData, superBlock.Extra)
if err != nil {
err = fmt.Errorf("cannot read volume %s super block extra: %v", datBackend.String(), err)
return
}
}
return
}