From 9ba335a7c4db23853fc875245477eeeead06c162 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 24 Jun 2018 11:37:08 -0700 Subject: [PATCH] prepare for flexible super block --- .../change_superblock/change_superblock.go | 6 +--- unmaintained/fix_dat/fix_dat.go | 23 +++++++----- weed/storage/volume_read_write.go | 2 +- weed/storage/volume_super_block.go | 35 +++++++++++++------ weed/storage/volume_vacuum.go | 14 +++----- 5 files changed, 45 insertions(+), 35 deletions(-) diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go index 6ecead697..d7a80472d 100644 --- a/unmaintained/change_superblock/change_superblock.go +++ b/unmaintained/change_superblock/change_superblock.go @@ -48,11 +48,7 @@ func main() { } defer datFile.Close() - header := make([]byte, storage.SuperBlockSize) - if _, e := datFile.Read(header); e != nil { - glog.Fatalf("cannot read volume %s super block: %v", fileName+".dat", e) - } - superBlock, err := storage.ParseSuperBlock(header) + superBlock, err := storage.ReadSuperBlock(datFile) if err != nil { glog.Fatalf("cannot parse existing super block: %v", err) diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go index 1f95e6cd6..3316395dc 100644 --- a/unmaintained/fix_dat/fix_dat.go +++ b/unmaintained/fix_dat/fix_dat.go @@ -39,24 +39,26 @@ func main() { } indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_RDONLY, 0644) if err != nil { - glog.Fatalf("Read Volume Index [ERROR] %s\n", err) + glog.Fatalf("Read Volume Index %v", err) } defer indexFile.Close() datFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".dat"), os.O_RDONLY, 0644) if err != nil { - glog.Fatalf("Read Volume Data [ERROR] %s\n", err) + glog.Fatalf("Read Volume Data %v", err) } defer datFile.Close() newDatFile, err := os.Create(path.Join(*fixVolumePath, fileName+".dat_fixed")) if err != nil { - glog.Fatalf("Write New Volume Data [ERROR] %s\n", err) + glog.Fatalf("Write New Volume Data %v", err) } defer newDatFile.Close() - header := make([]byte, storage.SuperBlockSize) - datFile.Read(header) - newDatFile.Write(header) + superBlock, err := storage.ReadSuperBlock(datFile) + if err != nil { + glog.Fatalf("Read Volume Data superblock %v", err) + } + newDatFile.Write(superBlock.Bytes()) iterateEntries(datFile, indexFile, func(n *storage.Needle, offset int64) { fmt.Printf("file id=%d name=%s size=%d dataSize=%d\n", n.Id, string(n.Name), n.Size, n.DataSize) @@ -74,8 +76,13 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl readerOffset += int64(count) // start to read dat file - offset := int64(storage.SuperBlockSize) - version := storage.Version2 + superblock, err := storage.ReadSuperBlock(datFile) + if err != nil { + fmt.Printf("cannot read dat file super block: %v", err) + return + } + offset := int64(superblock.BlockSize()) + version := superblock.Version() n, rest, err := storage.ReadNeedleHeader(datFile, version, offset) if err != nil { fmt.Printf("cannot read needle header: %v", err) diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 16d8b6d04..6572ea6c7 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -189,7 +189,7 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId, version := v.Version() - offset := int64(SuperBlockSize) + offset := int64(v.SuperBlock.BlockSize()) n, rest, e := ReadNeedleHeader(v.dataFile, version, offset) if e != nil { err = fmt.Errorf("cannot read needle header: %v", e) diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index d6350ba0b..b15b848bd 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -9,7 +9,7 @@ import ( ) const ( - SuperBlockSize = 8 + _SuperBlockSize = 8 ) /* @@ -27,11 +27,15 @@ type SuperBlock struct { CompactRevision uint16 } +func (s *SuperBlock) BlockSize() int { + return _SuperBlockSize +} + func (s *SuperBlock) Version() Version { return s.version } func (s *SuperBlock) Bytes() []byte { - header := make([]byte, SuperBlockSize) + header := make([]byte, _SuperBlockSize) header[0] = byte(s.version) header[1] = s.ReplicaPlacement.Byte() s.Ttl.ToBytes(header[2:4]) @@ -59,18 +63,13 @@ func (v *Volume) maybeWriteSuperBlock() error { } return e } + func (v *Volume) readSuperBlock() (err error) { - if _, err = v.dataFile.Seek(0, 0); err != nil { - return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err) - } - header := make([]byte, SuperBlockSize) - if _, e := v.dataFile.Read(header); e != nil { - return fmt.Errorf("cannot read volume %d super block: %v", v.Id, e) - } - v.SuperBlock, err = ParseSuperBlock(header) + v.SuperBlock, err = ReadSuperBlock(v.dataFile) return err } -func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { + +func parseSuperBlock(header []byte) (superBlock SuperBlock, err error) { superBlock.version = Version(header[0]) if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { err = fmt.Errorf("cannot read replica type: %s", err.Error()) @@ -79,3 +78,17 @@ func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { superBlock.CompactRevision = util.BytesToUint16(header[4:6]) return } + +// ReadSuperBlock reads from data file and load it into volume's super block +func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) { + if _, err = dataFile.Seek(0, 0); err != nil { + err = fmt.Errorf("cannot seek to the beginning of %s: %v", dataFile.Name(), err) + return + } + header := make([]byte, _SuperBlockSize) + if _, e := dataFile.Read(header); e != nil { + err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e) + return + } + return parseSuperBlock(header) +} diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 9171cadfb..230fca104 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -87,14 +87,7 @@ func (v *Volume) cleanupCompact() error { } func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) { - if _, err = file.Seek(0, 0); err != nil { - return 0, fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err) - } - header := make([]byte, SuperBlockSize) - if _, e := file.Read(header); e != nil { - return 0, fmt.Errorf("cannot read file %s 's super block: %v", file.Name(), e) - } - superBlock, err := ParseSuperBlock(header) + superBlock, err := ReadSuperBlock(file) if err != nil { return 0, err } @@ -242,7 +235,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca defer idx.Close() nm := NewBtreeNeedleMap(idx) - new_offset := int64(SuperBlockSize) + new_offset := int64(0) now := uint64(time.Now().Unix()) @@ -250,6 +243,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca func(superBlock SuperBlock) error { superBlock.CompactRevision++ _, err = dst.Write(superBlock.Bytes()) + new_offset = int64(superBlock.BlockSize()) return err }, true, func(n *Needle, offset int64) error { if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { @@ -297,7 +291,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { v.SuperBlock.CompactRevision++ dst.Write(v.SuperBlock.Bytes()) - new_offset := int64(SuperBlockSize) + new_offset := int64(v.SuperBlock.BlockSize()) WalkIndexFile(oldIndexFile, func(key uint64, offset, size uint32) error { if offset == 0 || size == TombstoneFileSize {