mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
prepare for flexible super block
This commit is contained in:
parent
b46cf2bb0e
commit
9ba335a7c4
|
@ -48,11 +48,7 @@ func main() {
|
||||||
}
|
}
|
||||||
defer datFile.Close()
|
defer datFile.Close()
|
||||||
|
|
||||||
header := make([]byte, storage.SuperBlockSize)
|
superBlock, err := storage.ReadSuperBlock(datFile)
|
||||||
if _, e := datFile.Read(header); e != nil {
|
|
||||||
glog.Fatalf("cannot read volume %s super block: %v", fileName+".dat", e)
|
|
||||||
}
|
|
||||||
superBlock, err := storage.ParseSuperBlock(header)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("cannot parse existing super block: %v", err)
|
glog.Fatalf("cannot parse existing super block: %v", err)
|
||||||
|
|
|
@ -39,24 +39,26 @@ func main() {
|
||||||
}
|
}
|
||||||
indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_RDONLY, 0644)
|
indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_RDONLY, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Read Volume Index [ERROR] %s\n", err)
|
glog.Fatalf("Read Volume Index %v", err)
|
||||||
}
|
}
|
||||||
defer indexFile.Close()
|
defer indexFile.Close()
|
||||||
datFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".dat"), os.O_RDONLY, 0644)
|
datFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".dat"), os.O_RDONLY, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Read Volume Data [ERROR] %s\n", err)
|
glog.Fatalf("Read Volume Data %v", err)
|
||||||
}
|
}
|
||||||
defer datFile.Close()
|
defer datFile.Close()
|
||||||
|
|
||||||
newDatFile, err := os.Create(path.Join(*fixVolumePath, fileName+".dat_fixed"))
|
newDatFile, err := os.Create(path.Join(*fixVolumePath, fileName+".dat_fixed"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Write New Volume Data [ERROR] %s\n", err)
|
glog.Fatalf("Write New Volume Data %v", err)
|
||||||
}
|
}
|
||||||
defer newDatFile.Close()
|
defer newDatFile.Close()
|
||||||
|
|
||||||
header := make([]byte, storage.SuperBlockSize)
|
superBlock, err := storage.ReadSuperBlock(datFile)
|
||||||
datFile.Read(header)
|
if err != nil {
|
||||||
newDatFile.Write(header)
|
glog.Fatalf("Read Volume Data superblock %v", err)
|
||||||
|
}
|
||||||
|
newDatFile.Write(superBlock.Bytes())
|
||||||
|
|
||||||
iterateEntries(datFile, indexFile, func(n *storage.Needle, offset int64) {
|
iterateEntries(datFile, indexFile, func(n *storage.Needle, offset int64) {
|
||||||
fmt.Printf("file id=%d name=%s size=%d dataSize=%d\n", n.Id, string(n.Name), n.Size, n.DataSize)
|
fmt.Printf("file id=%d name=%s size=%d dataSize=%d\n", n.Id, string(n.Name), n.Size, n.DataSize)
|
||||||
|
@ -74,8 +76,13 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl
|
||||||
readerOffset += int64(count)
|
readerOffset += int64(count)
|
||||||
|
|
||||||
// start to read dat file
|
// start to read dat file
|
||||||
offset := int64(storage.SuperBlockSize)
|
superblock, err := storage.ReadSuperBlock(datFile)
|
||||||
version := storage.Version2
|
if err != nil {
|
||||||
|
fmt.Printf("cannot read dat file super block: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
offset := int64(superblock.BlockSize())
|
||||||
|
version := superblock.Version()
|
||||||
n, rest, err := storage.ReadNeedleHeader(datFile, version, offset)
|
n, rest, err := storage.ReadNeedleHeader(datFile, version, offset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("cannot read needle header: %v", err)
|
fmt.Printf("cannot read needle header: %v", err)
|
||||||
|
|
|
@ -189,7 +189,7 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId,
|
||||||
|
|
||||||
version := v.Version()
|
version := v.Version()
|
||||||
|
|
||||||
offset := int64(SuperBlockSize)
|
offset := int64(v.SuperBlock.BlockSize())
|
||||||
n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
|
n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
err = fmt.Errorf("cannot read needle header: %v", e)
|
err = fmt.Errorf("cannot read needle header: %v", e)
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
SuperBlockSize = 8
|
_SuperBlockSize = 8
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -27,11 +27,15 @@ type SuperBlock struct {
|
||||||
CompactRevision uint16
|
CompactRevision uint16
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SuperBlock) BlockSize() int {
|
||||||
|
return _SuperBlockSize
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SuperBlock) Version() Version {
|
func (s *SuperBlock) Version() Version {
|
||||||
return s.version
|
return s.version
|
||||||
}
|
}
|
||||||
func (s *SuperBlock) Bytes() []byte {
|
func (s *SuperBlock) Bytes() []byte {
|
||||||
header := make([]byte, SuperBlockSize)
|
header := make([]byte, _SuperBlockSize)
|
||||||
header[0] = byte(s.version)
|
header[0] = byte(s.version)
|
||||||
header[1] = s.ReplicaPlacement.Byte()
|
header[1] = s.ReplicaPlacement.Byte()
|
||||||
s.Ttl.ToBytes(header[2:4])
|
s.Ttl.ToBytes(header[2:4])
|
||||||
|
@ -59,18 +63,13 @@ func (v *Volume) maybeWriteSuperBlock() error {
|
||||||
}
|
}
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) readSuperBlock() (err error) {
|
func (v *Volume) readSuperBlock() (err error) {
|
||||||
if _, err = v.dataFile.Seek(0, 0); err != nil {
|
v.SuperBlock, err = ReadSuperBlock(v.dataFile)
|
||||||
return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err)
|
|
||||||
}
|
|
||||||
header := make([]byte, SuperBlockSize)
|
|
||||||
if _, e := v.dataFile.Read(header); e != nil {
|
|
||||||
return fmt.Errorf("cannot read volume %d super block: %v", v.Id, e)
|
|
||||||
}
|
|
||||||
v.SuperBlock, err = ParseSuperBlock(header)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
|
|
||||||
|
func parseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
|
||||||
superBlock.version = Version(header[0])
|
superBlock.version = Version(header[0])
|
||||||
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
|
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
|
||||||
err = fmt.Errorf("cannot read replica type: %s", err.Error())
|
err = fmt.Errorf("cannot read replica type: %s", err.Error())
|
||||||
|
@ -79,3 +78,17 @@ func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
|
||||||
superBlock.CompactRevision = util.BytesToUint16(header[4:6])
|
superBlock.CompactRevision = util.BytesToUint16(header[4:6])
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadSuperBlock reads from data file and load it into volume's super block
|
||||||
|
func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) {
|
||||||
|
if _, err = dataFile.Seek(0, 0); err != nil {
|
||||||
|
err = fmt.Errorf("cannot seek to the beginning of %s: %v", dataFile.Name(), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
header := make([]byte, _SuperBlockSize)
|
||||||
|
if _, e := dataFile.Read(header); e != nil {
|
||||||
|
err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return parseSuperBlock(header)
|
||||||
|
}
|
||||||
|
|
|
@ -87,14 +87,7 @@ func (v *Volume) cleanupCompact() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) {
|
func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) {
|
||||||
if _, err = file.Seek(0, 0); err != nil {
|
superBlock, err := ReadSuperBlock(file)
|
||||||
return 0, fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err)
|
|
||||||
}
|
|
||||||
header := make([]byte, SuperBlockSize)
|
|
||||||
if _, e := file.Read(header); e != nil {
|
|
||||||
return 0, fmt.Errorf("cannot read file %s 's super block: %v", file.Name(), e)
|
|
||||||
}
|
|
||||||
superBlock, err := ParseSuperBlock(header)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -242,7 +235,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
|
||||||
defer idx.Close()
|
defer idx.Close()
|
||||||
|
|
||||||
nm := NewBtreeNeedleMap(idx)
|
nm := NewBtreeNeedleMap(idx)
|
||||||
new_offset := int64(SuperBlockSize)
|
new_offset := int64(0)
|
||||||
|
|
||||||
now := uint64(time.Now().Unix())
|
now := uint64(time.Now().Unix())
|
||||||
|
|
||||||
|
@ -250,6 +243,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
|
||||||
func(superBlock SuperBlock) error {
|
func(superBlock SuperBlock) error {
|
||||||
superBlock.CompactRevision++
|
superBlock.CompactRevision++
|
||||||
_, err = dst.Write(superBlock.Bytes())
|
_, err = dst.Write(superBlock.Bytes())
|
||||||
|
new_offset = int64(superBlock.BlockSize())
|
||||||
return err
|
return err
|
||||||
}, true, func(n *Needle, offset int64) error {
|
}, true, func(n *Needle, offset int64) error {
|
||||||
if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
|
if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
|
||||||
|
@ -297,7 +291,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
|
||||||
|
|
||||||
v.SuperBlock.CompactRevision++
|
v.SuperBlock.CompactRevision++
|
||||||
dst.Write(v.SuperBlock.Bytes())
|
dst.Write(v.SuperBlock.Bytes())
|
||||||
new_offset := int64(SuperBlockSize)
|
new_offset := int64(v.SuperBlock.BlockSize())
|
||||||
|
|
||||||
WalkIndexFile(oldIndexFile, func(key uint64, offset, size uint32) error {
|
WalkIndexFile(oldIndexFile, func(key uint64, offset, size uint32) error {
|
||||||
if offset == 0 || size == TombstoneFileSize {
|
if offset == 0 || size == TombstoneFileSize {
|
||||||
|
|
Loading…
Reference in a new issue