From 9ac4935f22a2e871d8933cf36ddc2cb2f85625a7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 27 Nov 2020 16:18:48 -0800 Subject: [PATCH] read from volume index file directly instead of open a separate file fix https://github.com/chrislusf/seaweedfs/issues/1640 read from volume index file directly instead of open a separate file, to ensure reading latest index entries. --- weed/storage/needle_map.go | 20 ++++++++++++++++++++ weed/storage/volume_backup.go | 32 +++++++++++--------------------- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index e91856dfe..9f331267d 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -2,9 +2,11 @@ package storage import ( "fmt" + "io" "os" "sync" + "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle_map" . "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -31,6 +33,7 @@ type NeedleMapper interface { MaxFileKey() NeedleId IndexFileSize() uint64 Sync() error + ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error) } type baseNeedleMapper struct { @@ -64,3 +67,20 @@ func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size func (nm *baseNeedleMapper) Sync() error { return nm.indexFile.Sync() } + +func (nm *baseNeedleMapper) ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error) { + bytes := make([]byte, NeedleMapEntrySize) + var readCount int + if readCount, err = nm.indexFile.ReadAt(bytes, n*NeedleMapEntrySize); err != nil { + if err == io.EOF { + if readCount == NeedleMapEntrySize { + err = nil + } + } + if err != nil { + return + } + } + key, offset, size = idx.IdxFileEntry(bytes) + return +} diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index 62004d4da..9aeb10f69 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -168,25 +168,13 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { // on server side func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) { - indexFile, openErr := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644) - if openErr != nil { - err = fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), openErr) - return - } - defer indexFile.Close() - fi, statErr := indexFile.Stat() - if statErr != nil { - err = fmt.Errorf("file %s stat error: %v", indexFile.Name(), statErr) - return - } - fileSize := fi.Size() + fileSize := int64(v.IndexFileSize()) if fileSize%NeedleMapEntrySize != 0 { - err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize) + err = fmt.Errorf("unexpected file %s.idx size: %d", v.IndexFileName(), fileSize) return } - bytes := make([]byte, NeedleMapEntrySize) entryCount := fileSize / NeedleMapEntrySize l := int64(0) h := entryCount @@ -200,7 +188,7 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast } // read the appendAtNs for entry m - offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, m) + offset, err = v.readOffsetFromIndex(m) if err != nil { return } @@ -224,19 +212,21 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast return Offset{}, true, nil } - offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l) + offset, err = v.readOffsetFromIndex(l) return offset, false, err } // bytes is of size NeedleMapEntrySize -func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) { - if _, readErr := indexFile.ReadAt(bytes, m*NeedleMapEntrySize); readErr != nil && readErr != io.EOF { - return Offset{}, readErr +func (v *Volume) readOffsetFromIndex(m int64) (Offset, error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return Offset{}, io.EOF } - _, offset, _ := idx.IdxFileEntry(bytes) - return offset, nil + _, offset, _, err := v.nm.ReadIndexEntry(m) + return offset, err } // generate the volume idx