2016-07-03 07:10:27 +00:00
package storage
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"time"
2019-09-12 13:18:21 +00:00
"github.com/chrislusf/seaweedfs/weed/glog"
2019-10-29 07:35:16 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/backend"
2019-09-12 13:18:21 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2019-12-23 20:48:20 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
2019-09-12 13:18:21 +00:00
. "github.com/chrislusf/seaweedfs/weed/storage/types"
2016-07-03 07:10:27 +00:00
)
2021-02-20 08:57:07 +00:00
var ErrorNotFound = errors . New ( "not found" )
var ErrorDeleted = errors . New ( "already deleted" )
var ErrorSizeMismatch = errors . New ( "size mismatch" )
2019-01-06 03:52:17 +00:00
2020-11-28 08:09:29 +00:00
func ( v * Volume ) checkReadWriteError ( err error ) {
if err == nil {
if v . lastIoError != nil {
v . lastIoError = nil
}
return
}
if err . Error ( ) == "input/output error" {
v . lastIoError = err
}
}
2016-07-03 07:10:27 +00:00
// isFileUnchanged checks whether this needle to write is same as last one.
// It requires serialized access in the same volume.
2019-04-19 04:43:36 +00:00
func ( v * Volume ) isFileUnchanged ( n * needle . Needle ) bool {
2016-07-03 07:10:27 +00:00
if v . Ttl . String ( ) != "" {
return false
}
2019-08-14 08:08:01 +00:00
2016-07-03 07:10:27 +00:00
nv , ok := v . nm . Get ( n . Id )
2020-08-19 01:01:37 +00:00
if ok && ! nv . Offset . IsZero ( ) && nv . Size . IsValid ( ) {
2019-04-19 04:43:36 +00:00
oldNeedle := new ( needle . Needle )
2021-02-07 04:11:51 +00:00
err := oldNeedle . ReadData ( v . DataBackend , nv . Offset . ToActualOffset ( ) , nv . Size , v . Version ( ) )
2016-07-03 07:10:27 +00:00
if err != nil {
2021-02-07 04:11:51 +00:00
glog . V ( 0 ) . Infof ( "Failed to check updated file at offset %d size %d: %v" , nv . Offset . ToActualOffset ( ) , nv . Size , err )
2016-07-03 07:10:27 +00:00
return false
}
2019-04-21 20:31:45 +00:00
if oldNeedle . Cookie == n . Cookie && oldNeedle . Checksum == n . Checksum && bytes . Equal ( oldNeedle . Data , n . Data ) {
2016-07-03 07:10:27 +00:00
n . DataSize = oldNeedle . DataSize
return true
}
}
return false
}
// Destroy removes everything related to this volume
func ( v * Volume ) Destroy ( ) ( err error ) {
2019-08-12 07:53:50 +00:00
if v . isCompacting {
err = fmt . Errorf ( "volume %d is compacting" , v . Id )
return
}
2020-05-03 11:22:45 +00:00
close ( v . asyncRequestsChan )
2019-12-26 00:17:58 +00:00
storageName , storageKey := v . RemoteStorageNameKey ( )
if v . HasRemoteFile ( ) && storageName != "" && storageKey != "" {
if backendStorage , found := backend . BackendStorages [ storageName ] ; found {
backendStorage . DeleteFile ( storageKey )
}
}
2018-11-05 16:53:38 +00:00
v . Close ( )
2020-11-27 11:17:10 +00:00
removeVolumeFiles ( v . DataFileName ( ) )
removeVolumeFiles ( v . IndexFileName ( ) )
2016-07-03 07:10:27 +00:00
return
}
2020-10-27 22:56:49 +00:00
func removeVolumeFiles ( filename string ) {
2020-11-26 23:22:42 +00:00
// basic
2020-10-31 23:45:38 +00:00
os . Remove ( filename + ".dat" )
2020-10-27 22:56:49 +00:00
os . Remove ( filename + ".idx" )
os . Remove ( filename + ".vif" )
2020-11-26 23:22:42 +00:00
// sorted index file
2020-10-27 22:56:49 +00:00
os . Remove ( filename + ".sdx" )
2020-11-26 23:22:42 +00:00
// compaction
2020-10-27 22:56:49 +00:00
os . Remove ( filename + ".cpd" )
os . Remove ( filename + ".cpx" )
2020-11-26 23:22:42 +00:00
// level db indx file
2020-10-27 22:56:49 +00:00
os . RemoveAll ( filename + ".ldb" )
2020-11-26 23:22:42 +00:00
// marker for damaged or incomplete volume
2020-10-27 22:56:49 +00:00
os . Remove ( filename + ".note" )
}
2020-05-03 11:22:45 +00:00
func ( v * Volume ) asyncRequestAppend ( request * needle . AsyncRequest ) {
v . asyncRequestsChan <- request
}
2020-08-19 00:04:28 +00:00
func ( v * Volume ) syncWrite ( n * needle . Needle ) ( offset uint64 , size Size , isUnchanged bool , err error ) {
2020-01-25 04:06:58 +00:00
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
2020-08-19 00:04:28 +00:00
actualSize := needle . GetActualSize ( Size ( len ( n . Data ) ) , v . Version ( ) )
2020-05-06 13:10:39 +00:00
2016-07-03 07:10:27 +00:00
v . dataFileAccessLock . Lock ( )
defer v . dataFileAccessLock . Unlock ( )
2020-05-06 13:10:39 +00:00
if MaxPossibleVolumeSize < v . nm . ContentSize ( ) + uint64 ( actualSize ) {
2020-06-08 15:12:59 +00:00
err = fmt . Errorf ( "volume size limit %d exceeded! current size is %d" , MaxPossibleVolumeSize , v . nm . ContentSize ( ) )
2020-05-06 13:10:39 +00:00
return
}
2016-07-03 07:10:27 +00:00
if v . isFileUnchanged ( n ) {
2020-08-19 00:04:28 +00:00
size = Size ( n . DataSize )
2019-04-21 20:33:23 +00:00
isUnchanged = true
2016-07-03 07:10:27 +00:00
return
}
2019-07-18 06:57:34 +00:00
// check whether existing needle cookie matches
nv , ok := v . nm . Get ( n . Id )
if ok {
2021-02-07 04:11:51 +00:00
existingNeedle , _ , _ , existingNeedleReadErr := needle . ReadNeedleHeader ( v . DataBackend , v . Version ( ) , nv . Offset . ToActualOffset ( ) )
2019-07-18 06:57:34 +00:00
if existingNeedleReadErr != nil {
err = fmt . Errorf ( "reading existing needle: %v" , existingNeedleReadErr )
return
}
if existingNeedle . Cookie != n . Cookie {
glog . V ( 0 ) . Infof ( "write cookie mismatch: existing %x, new %x" , existingNeedle . Cookie , n . Cookie )
err = fmt . Errorf ( "mismatching cookie %x" , n . Cookie )
return
}
}
// append to dat file
2018-07-24 09:44:33 +00:00
n . AppendAtNs = uint64 ( time . Now ( ) . UnixNano ( ) )
2020-11-28 08:09:29 +00:00
offset , size , _ , err = n . Append ( v . DataBackend , v . Version ( ) )
v . checkReadWriteError ( err )
if err != nil {
2016-07-03 07:10:27 +00:00
return
}
2020-05-06 13:10:39 +00:00
2019-04-19 07:39:34 +00:00
v . lastAppendAtNs = n . AppendAtNs
2017-01-06 18:22:20 +00:00
2019-07-18 06:57:34 +00:00
// add to needle map
2021-02-07 04:11:51 +00:00
if ! ok || uint64 ( nv . Offset . ToActualOffset ( ) ) < offset {
2019-04-09 02:40:56 +00:00
if err = v . nm . Put ( n . Id , ToOffset ( int64 ( offset ) ) , n . Size ) ; err != nil {
2016-07-03 07:10:27 +00:00
glog . V ( 4 ) . Infof ( "failed to save in needle map %d: %v" , n . Id , err )
}
}
2019-04-19 07:39:34 +00:00
if v . lastModifiedTsSeconds < n . LastModified {
v . lastModifiedTsSeconds = n . LastModified
2016-07-03 07:10:27 +00:00
}
return
}
2020-08-19 00:04:28 +00:00
func ( v * Volume ) writeNeedle2 ( n * needle . Needle , fsync bool ) ( offset uint64 , size Size , isUnchanged bool , err error ) {
2020-05-03 11:22:45 +00:00
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
if n . Ttl == needle . EMPTY_TTL && v . Ttl != needle . EMPTY_TTL {
n . SetHasTtl ( )
n . Ttl = v . Ttl
}
2020-05-06 13:10:39 +00:00
if ! fsync {
return v . syncWrite ( n )
} else {
asyncRequest := needle . NewAsyncRequest ( n , true )
// using len(n.Data) here instead of n.Size before n.Size is populated in n.Append()
2020-08-19 00:04:28 +00:00
asyncRequest . ActualSize = needle . GetActualSize ( Size ( len ( n . Data ) ) , v . Version ( ) )
2020-05-03 11:22:45 +00:00
2020-05-06 13:10:39 +00:00
v . asyncRequestAppend ( asyncRequest )
offset , _ , isUnchanged , err = asyncRequest . WaitComplete ( )
2020-05-03 11:22:45 +00:00
2020-05-06 13:10:39 +00:00
return
}
2020-05-03 11:22:45 +00:00
}
2020-08-19 00:04:28 +00:00
func ( v * Volume ) doWriteRequest ( n * needle . Needle ) ( offset uint64 , size Size , isUnchanged bool , err error ) {
2020-05-03 11:22:45 +00:00
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
if v . isFileUnchanged ( n ) {
2020-08-19 00:04:28 +00:00
size = Size ( n . DataSize )
2020-05-03 11:22:45 +00:00
isUnchanged = true
return
}
// check whether existing needle cookie matches
nv , ok := v . nm . Get ( n . Id )
if ok {
2021-02-07 04:11:51 +00:00
existingNeedle , _ , _ , existingNeedleReadErr := needle . ReadNeedleHeader ( v . DataBackend , v . Version ( ) , nv . Offset . ToActualOffset ( ) )
2020-05-03 11:22:45 +00:00
if existingNeedleReadErr != nil {
err = fmt . Errorf ( "reading existing needle: %v" , existingNeedleReadErr )
return
}
if existingNeedle . Cookie != n . Cookie {
glog . V ( 0 ) . Infof ( "write cookie mismatch: existing %x, new %x" , existingNeedle . Cookie , n . Cookie )
err = fmt . Errorf ( "mismatching cookie %x" , n . Cookie )
return
}
}
// append to dat file
n . AppendAtNs = uint64 ( time . Now ( ) . UnixNano ( ) )
2020-11-28 08:09:29 +00:00
offset , size , _ , err = n . Append ( v . DataBackend , v . Version ( ) )
v . checkReadWriteError ( err )
if err != nil {
2020-05-03 11:22:45 +00:00
return
}
v . lastAppendAtNs = n . AppendAtNs
// add to needle map
2021-02-07 04:11:51 +00:00
if ! ok || uint64 ( nv . Offset . ToActualOffset ( ) ) < offset {
2020-05-03 11:22:45 +00:00
if err = v . nm . Put ( n . Id , ToOffset ( int64 ( offset ) ) , n . Size ) ; err != nil {
glog . V ( 4 ) . Infof ( "failed to save in needle map %d: %v" , n . Id , err )
}
}
if v . lastModifiedTsSeconds < n . LastModified {
v . lastModifiedTsSeconds = n . LastModified
}
return
}
2020-08-19 00:04:28 +00:00
func ( v * Volume ) syncDelete ( n * needle . Needle ) ( Size , error ) {
2020-10-24 16:42:54 +00:00
// glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
2020-05-06 13:10:39 +00:00
actualSize := needle . GetActualSize ( 0 , v . Version ( ) )
2016-07-03 07:10:27 +00:00
v . dataFileAccessLock . Lock ( )
defer v . dataFileAccessLock . Unlock ( )
2020-05-06 13:10:39 +00:00
if MaxPossibleVolumeSize < v . nm . ContentSize ( ) + uint64 ( actualSize ) {
2020-06-08 15:12:59 +00:00
err := fmt . Errorf ( "volume size limit %d exceeded! current size is %d" , MaxPossibleVolumeSize , v . nm . ContentSize ( ) )
2020-05-06 13:10:39 +00:00
return 0 , err
}
2016-07-03 07:10:27 +00:00
nv , ok := v . nm . Get ( n . Id )
2020-08-19 02:22:16 +00:00
// fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
2020-08-19 01:01:37 +00:00
if ok && nv . Size . IsValid ( ) {
2016-07-03 07:10:27 +00:00
size := nv . Size
2018-12-31 23:08:32 +00:00
n . Data = nil
n . AppendAtNs = uint64 ( time . Now ( ) . UnixNano ( ) )
2020-09-12 19:42:36 +00:00
offset , _ , _ , err := n . Append ( v . DataBackend , v . Version ( ) )
2020-11-28 08:09:29 +00:00
v . checkReadWriteError ( err )
2017-01-20 15:31:11 +00:00
if err != nil {
2016-07-03 07:10:27 +00:00
return size , err
}
2019-04-19 07:39:34 +00:00
v . lastAppendAtNs = n . AppendAtNs
2020-09-12 19:42:36 +00:00
if err = v . nm . Delete ( n . Id , ToOffset ( int64 ( offset ) ) ) ; err != nil {
2016-07-03 07:10:27 +00:00
return size , err
}
return size , err
}
return 0 , nil
}
2020-08-19 00:04:28 +00:00
func ( v * Volume ) deleteNeedle2 ( n * needle . Needle ) ( Size , error ) {
2020-05-06 13:10:39 +00:00
// todo: delete info is always appended no fsync, it may need fsync in future
fsync := false
2020-05-03 11:22:45 +00:00
2020-05-06 13:10:39 +00:00
if ! fsync {
return v . syncDelete ( n )
} else {
asyncRequest := needle . NewAsyncRequest ( n , false )
asyncRequest . ActualSize = needle . GetActualSize ( 0 , v . Version ( ) )
2020-05-03 11:22:45 +00:00
2020-05-06 13:10:39 +00:00
v . asyncRequestAppend ( asyncRequest )
_ , size , _ , err := asyncRequest . WaitComplete ( )
2020-08-19 00:04:28 +00:00
return Size ( size ) , err
2020-05-06 13:10:39 +00:00
}
2020-05-03 11:22:45 +00:00
}
2020-08-19 00:04:28 +00:00
func ( v * Volume ) doDeleteRequest ( n * needle . Needle ) ( Size , error ) {
2020-05-03 11:22:45 +00:00
glog . V ( 4 ) . Infof ( "delete needle %s" , needle . NewFileIdFromNeedle ( v . Id , n ) . String ( ) )
nv , ok := v . nm . Get ( n . Id )
2020-08-19 02:22:16 +00:00
// fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
2020-08-19 01:01:37 +00:00
if ok && nv . Size . IsValid ( ) {
2020-05-03 11:22:45 +00:00
size := nv . Size
n . Data = nil
n . AppendAtNs = uint64 ( time . Now ( ) . UnixNano ( ) )
2020-09-12 19:42:36 +00:00
offset , _ , _ , err := n . Append ( v . DataBackend , v . Version ( ) )
2020-11-28 08:09:29 +00:00
v . checkReadWriteError ( err )
2020-05-03 11:22:45 +00:00
if err != nil {
return size , err
}
v . lastAppendAtNs = n . AppendAtNs
2020-09-12 19:42:36 +00:00
if err = v . nm . Delete ( n . Id , ToOffset ( int64 ( offset ) ) ) ; err != nil {
2020-05-03 11:22:45 +00:00
return size , err
}
return size , err
}
return 0 , nil
}
2016-07-03 07:10:27 +00:00
// read fills in Needle content by looking up n.Id from NeedleMapper
2020-08-19 00:37:26 +00:00
func ( v * Volume ) readNeedle ( n * needle . Needle , readOption * ReadOption ) ( int , error ) {
2019-12-06 14:59:57 +00:00
v . dataFileAccessLock . RLock ( )
defer v . dataFileAccessLock . RUnlock ( )
2019-08-14 08:08:01 +00:00
2016-07-03 07:10:27 +00:00
nv , ok := v . nm . Get ( n . Id )
2019-04-09 02:40:56 +00:00
if ! ok || nv . Offset . IsZero ( ) {
2021-02-20 08:57:07 +00:00
return - 1 , ErrorNotFound
2016-07-03 07:10:27 +00:00
}
2020-08-19 02:22:16 +00:00
readSize := nv . Size
if readSize . IsDeleted ( ) {
if readOption != nil && readOption . ReadDeleted && readSize != TombstoneFileSize {
2020-09-12 19:42:36 +00:00
glog . V ( 3 ) . Infof ( "reading deleted %s" , n . String ( ) )
2020-08-19 02:22:16 +00:00
readSize = - readSize
} else {
2021-02-20 08:57:07 +00:00
return - 1 , ErrorDeleted
2020-08-19 02:22:16 +00:00
}
2017-01-06 18:22:20 +00:00
}
2020-08-19 02:22:16 +00:00
if readSize == 0 {
2019-01-08 17:03:28 +00:00
return 0 , nil
}
2021-02-07 04:11:51 +00:00
err := n . ReadData ( v . DataBackend , nv . Offset . ToActualOffset ( ) , readSize , v . Version ( ) )
2021-02-20 08:57:07 +00:00
if err == needle . ErrorSizeMismatch && OffsetSize == 4 {
2021-02-19 07:59:55 +00:00
// add special handling for .dat larger than 32GB, from git commit comment of
// 06c15ab3 Chris Lu <chris.lu@gmail.com> on 2020/10/28 at 4:11 上
2021-02-07 04:11:51 +00:00
err = n . ReadData ( v . DataBackend , nv . Offset . ToActualOffset ( ) + int64 ( MaxPossibleVolumeSize ) , readSize , v . Version ( ) )
2020-10-27 20:11:56 +00:00
}
2020-11-28 08:09:29 +00:00
v . checkReadWriteError ( err )
2016-07-03 07:10:27 +00:00
if err != nil {
return 0 , err
}
bytesRead := len ( n . Data )
if ! n . HasTtl ( ) {
return bytesRead , nil
}
ttlMinutes := n . Ttl . Minutes ( )
if ttlMinutes == 0 {
return bytesRead , nil
}
if ! n . HasLastModifiedDate ( ) {
return bytesRead , nil
}
2020-12-12 00:55:18 +00:00
if time . Now ( ) . Before ( time . Unix ( 0 , int64 ( n . AppendAtNs ) ) . Add ( time . Duration ( ttlMinutes ) * time . Minute ) ) {
2016-07-03 07:10:27 +00:00
return bytesRead , nil
}
2021-02-20 08:57:07 +00:00
return - 1 , ErrorNotFound
2016-07-03 07:10:27 +00:00
}
2020-05-03 11:22:45 +00:00
func ( v * Volume ) startWorker ( ) {
go func ( ) {
chanClosed := false
for {
// chan closed. go thread will exit
if chanClosed {
break
}
currentRequests := make ( [ ] * needle . AsyncRequest , 0 , 128 )
currentBytesToWrite := int64 ( 0 )
for {
request , ok := <- v . asyncRequestsChan
2020-08-19 02:22:16 +00:00
// volume may be closed
2020-05-03 11:22:45 +00:00
if ! ok {
chanClosed = true
break
}
if MaxPossibleVolumeSize < v . ContentSize ( ) + uint64 ( currentBytesToWrite + request . ActualSize ) {
request . Complete ( 0 , 0 , false ,
fmt . Errorf ( "volume size limit %d exceeded! current size is %d" , MaxPossibleVolumeSize , v . ContentSize ( ) ) )
break
}
currentRequests = append ( currentRequests , request )
currentBytesToWrite += request . ActualSize
// submit at most 4M bytes or 128 requests at one time to decrease request delay.
// it also need to break if there is no data in channel to avoid io hang.
if currentBytesToWrite >= 4 * 1024 * 1024 || len ( currentRequests ) >= 128 || len ( v . asyncRequestsChan ) == 0 {
break
}
}
if len ( currentRequests ) == 0 {
continue
}
v . dataFileAccessLock . Lock ( )
end , _ , e := v . DataBackend . GetStat ( )
if e != nil {
for i := 0 ; i < len ( currentRequests ) ; i ++ {
currentRequests [ i ] . Complete ( 0 , 0 , false ,
fmt . Errorf ( "cannot read current volume position: %v" , e ) )
}
v . dataFileAccessLock . Unlock ( )
continue
}
for i := 0 ; i < len ( currentRequests ) ; i ++ {
if currentRequests [ i ] . IsWriteRequest {
offset , size , isUnchanged , err := v . doWriteRequest ( currentRequests [ i ] . N )
currentRequests [ i ] . UpdateResult ( offset , uint64 ( size ) , isUnchanged , err )
} else {
size , err := v . doDeleteRequest ( currentRequests [ i ] . N )
currentRequests [ i ] . UpdateResult ( 0 , uint64 ( size ) , false , err )
}
}
2020-05-06 13:10:39 +00:00
// if sync error, data is not reliable, we should mark the completed request as fail and rollback
if err := v . DataBackend . Sync ( ) ; err != nil {
// todo: this may generate dirty data or cause data inconsistent, may be weed need to panic?
if te := v . DataBackend . Truncate ( end ) ; te != nil {
glog . V ( 0 ) . Infof ( "Failed to truncate %s back to %d with error: %v" , v . DataBackend . Name ( ) , end , te )
}
for i := 0 ; i < len ( currentRequests ) ; i ++ {
if currentRequests [ i ] . IsSucceed ( ) {
currentRequests [ i ] . UpdateResult ( 0 , 0 , false , err )
2020-05-03 11:22:45 +00:00
}
}
}
for i := 0 ; i < len ( currentRequests ) ; i ++ {
currentRequests [ i ] . Submit ( )
}
v . dataFileAccessLock . Unlock ( )
}
} ( )
}
2019-01-16 09:48:59 +00:00
type VolumeFileScanner interface {
2019-12-23 20:48:20 +00:00
VisitSuperBlock ( super_block . SuperBlock ) error
2019-01-16 09:48:59 +00:00
ReadNeedleBody ( ) bool
2019-10-22 07:50:30 +00:00
VisitNeedle ( n * needle . Needle , offset int64 , needleHeader , needleBody [ ] byte ) error
2019-01-16 09:48:59 +00:00
}
2019-04-19 04:43:36 +00:00
func ScanVolumeFile ( dirname string , collection string , id needle . VolumeId ,
2021-02-07 01:00:03 +00:00
needleMapKind NeedleMapKind ,
2019-01-16 09:48:59 +00:00
volumeFileScanner VolumeFileScanner ) ( err error ) {
2016-07-03 07:10:27 +00:00
var v * Volume
if v , err = loadVolumeWithoutIndex ( dirname , collection , id , needleMapKind ) ; err != nil {
2019-03-26 06:01:53 +00:00
return fmt . Errorf ( "failed to load volume %d: %v" , id , err )
2016-07-03 07:10:27 +00:00
}
2020-09-15 05:57:23 +00:00
if err = volumeFileScanner . VisitSuperBlock ( v . SuperBlock ) ; err != nil {
return fmt . Errorf ( "failed to process volume %d super block: %v" , id , err )
2016-07-03 07:10:27 +00:00
}
2018-11-04 07:28:24 +00:00
defer v . Close ( )
2016-07-03 07:10:27 +00:00
version := v . Version ( )
2018-06-24 18:37:08 +00:00
offset := int64 ( v . SuperBlock . BlockSize ( ) )
2019-03-26 06:01:53 +00:00
2019-10-29 07:35:16 +00:00
return ScanVolumeFileFrom ( version , v . DataBackend , offset , volumeFileScanner )
2019-03-26 06:01:53 +00:00
}
2019-11-29 02:33:18 +00:00
func ScanVolumeFileFrom ( version needle . Version , datBackend backend . BackendStorageFile , offset int64 , volumeFileScanner VolumeFileScanner ) ( err error ) {
2019-10-29 07:35:16 +00:00
n , nh , rest , e := needle . ReadNeedleHeader ( datBackend , version , offset )
2016-07-03 07:10:27 +00:00
if e != nil {
2019-03-26 06:01:53 +00:00
if e == io . EOF {
return nil
}
2019-12-09 03:44:16 +00:00
return fmt . Errorf ( "cannot read %s at offset %d: %v" , datBackend . Name ( ) , offset , e )
2016-07-03 07:10:27 +00:00
}
for n != nil {
2019-10-22 07:50:30 +00:00
var needleBody [ ] byte
2019-01-16 09:48:59 +00:00
if volumeFileScanner . ReadNeedleBody ( ) {
2020-09-15 05:57:23 +00:00
// println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
2019-10-29 07:35:16 +00:00
if needleBody , err = n . ReadNeedleBody ( datBackend , version , offset + NeedleHeaderSize , rest ) ; err != nil {
2020-09-15 05:57:23 +00:00
glog . V ( 0 ) . Infof ( "cannot read needle head [%d, %d) body [%d, %d) body length %d: %v" , offset , offset + NeedleHeaderSize , offset + NeedleHeaderSize , offset + NeedleHeaderSize + rest , rest , err )
2020-08-19 02:22:16 +00:00
// err = fmt.Errorf("cannot read needle body: %v", err)
// return
2016-07-03 07:10:27 +00:00
}
}
2019-10-22 07:50:30 +00:00
err := volumeFileScanner . VisitNeedle ( n , offset , nh , needleBody )
2018-07-15 03:51:17 +00:00
if err == io . EOF {
return nil
}
if err != nil {
2016-07-03 07:10:27 +00:00
glog . V ( 0 ) . Infof ( "visit needle error: %v" , err )
2019-07-18 06:22:01 +00:00
return fmt . Errorf ( "visit needle error: %v" , err )
2016-07-03 07:10:27 +00:00
}
2019-04-19 07:39:34 +00:00
offset += NeedleHeaderSize + rest
2016-07-03 07:10:27 +00:00
glog . V ( 4 ) . Infof ( "==> new entry offset %d" , offset )
2019-10-29 07:35:16 +00:00
if n , nh , rest , err = needle . ReadNeedleHeader ( datBackend , version , offset ) ; err != nil {
2019-04-18 07:18:29 +00:00
if err == io . EOF {
return nil
}
return fmt . Errorf ( "cannot read needle header at offset %d: %v" , offset , err )
}
glog . V ( 4 ) . Infof ( "new entry needle size:%d rest:%d" , n . Size , rest )
}
return nil
}