2021-03-21 20:05:53 +00:00
package storage
import (
"fmt"
2022-07-29 07:17:28 +00:00
"github.com/seaweedfs/seaweedfs/weed/util/mem"
2021-03-21 20:05:53 +00:00
"io"
"time"
2022-07-29 07:17:28 +00:00
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
2021-03-21 20:05:53 +00:00
)
2022-06-05 18:54:04 +00:00
const PagedReadLimit = 1024 * 1024
2021-03-21 20:05:53 +00:00
// read fills in Needle content by looking up n.Id from NeedleMapper
2022-06-05 18:54:04 +00:00
func ( v * Volume ) readNeedle ( n * needle . Needle , readOption * ReadOption , onReadSizeFn func ( size Size ) ) ( count int , err error ) {
2021-03-21 20:05:53 +00:00
v . dataFileAccessLock . RLock ( )
defer v . dataFileAccessLock . RUnlock ( )
nv , ok := v . nm . Get ( n . Id )
if ! ok || nv . Offset . IsZero ( ) {
return - 1 , ErrorNotFound
}
readSize := nv . Size
if readSize . IsDeleted ( ) {
if readOption != nil && readOption . ReadDeleted && readSize != TombstoneFileSize {
glog . V ( 3 ) . Infof ( "reading deleted %s" , n . String ( ) )
readSize = - readSize
} else {
return - 1 , ErrorDeleted
}
}
if readSize == 0 {
return 0 , nil
}
2021-08-09 06:25:16 +00:00
if onReadSizeFn != nil {
onReadSizeFn ( readSize )
}
2022-06-05 18:54:04 +00:00
if readOption != nil && readOption . AttemptMetaOnly && readSize > PagedReadLimit {
readOption . VolumeRevision = v . SuperBlock . CompactionRevision
2022-06-05 22:24:02 +00:00
err = n . ReadNeedleMeta ( v . DataBackend , nv . Offset . ToActualOffset ( ) , readSize , v . Version ( ) )
2022-06-05 18:54:04 +00:00
if err == needle . ErrorSizeMismatch && OffsetSize == 4 {
readOption . IsOutOfRange = true
2022-06-05 22:24:02 +00:00
err = n . ReadNeedleMeta ( v . DataBackend , nv . Offset . ToActualOffset ( ) + int64 ( MaxPossibleVolumeSize ) , readSize , v . Version ( ) )
2022-06-05 18:54:04 +00:00
}
if err != nil {
return 0 , err
}
if ! n . IsCompressed ( ) && ! n . IsChunkedManifest ( ) {
readOption . IsMetaOnly = true
}
2021-03-21 20:05:53 +00:00
}
2022-06-05 18:54:04 +00:00
if readOption == nil || ! readOption . IsMetaOnly {
err = n . ReadData ( v . DataBackend , nv . Offset . ToActualOffset ( ) , readSize , v . Version ( ) )
v . checkReadWriteError ( err )
if err != nil {
return 0 , err
}
2021-03-21 20:05:53 +00:00
}
2022-06-05 18:54:04 +00:00
count = int ( n . DataSize )
2021-03-21 20:05:53 +00:00
if ! n . HasTtl ( ) {
2022-06-05 18:54:04 +00:00
return
2021-03-21 20:05:53 +00:00
}
ttlMinutes := n . Ttl . Minutes ( )
if ttlMinutes == 0 {
2022-06-05 18:54:04 +00:00
return
2021-03-21 20:05:53 +00:00
}
if ! n . HasLastModifiedDate ( ) {
2022-06-05 18:54:04 +00:00
return
2021-03-21 20:05:53 +00:00
}
if time . Now ( ) . Before ( time . Unix ( 0 , int64 ( n . AppendAtNs ) ) . Add ( time . Duration ( ttlMinutes ) * time . Minute ) ) {
2022-06-05 18:54:04 +00:00
return
2021-03-21 20:05:53 +00:00
}
return - 1 , ErrorNotFound
}
2022-09-07 06:51:27 +00:00
// read needle at a specific offset
func ( v * Volume ) readNeedleMetaAt ( n * needle . Needle , offset int64 , size int32 ) ( err error ) {
v . dataFileAccessLock . RLock ( )
defer v . dataFileAccessLock . RUnlock ( )
2022-10-25 05:09:38 +00:00
// read deleted needle meta data
if size < 0 {
size = 0
}
2022-09-07 06:51:27 +00:00
err = n . ReadNeedleMeta ( v . DataBackend , offset , Size ( size ) , v . Version ( ) )
if err == needle . ErrorSizeMismatch && OffsetSize == 4 {
err = n . ReadNeedleMeta ( v . DataBackend , offset + int64 ( MaxPossibleVolumeSize ) , Size ( size ) , v . Version ( ) )
}
if err != nil {
return err
}
return nil
}
2022-06-05 18:54:04 +00:00
// read fills in Needle content by looking up n.Id from NeedleMapper
func ( v * Volume ) readNeedleDataInto ( n * needle . Needle , readOption * ReadOption , writer io . Writer , offset int64 , size int64 ) ( err error ) {
2022-09-15 10:11:32 +00:00
if ! readOption . HasSlowRead {
v . dataFileAccessLock . RLock ( )
defer v . dataFileAccessLock . RUnlock ( )
}
if readOption . HasSlowRead {
v . dataFileAccessLock . RLock ( )
}
2022-06-05 18:54:04 +00:00
nv , ok := v . nm . Get ( n . Id )
2022-09-15 10:11:32 +00:00
if readOption . HasSlowRead {
v . dataFileAccessLock . RUnlock ( )
}
2022-09-09 02:05:31 +00:00
2022-06-05 18:54:04 +00:00
if ! ok || nv . Offset . IsZero ( ) {
return ErrorNotFound
}
readSize := nv . Size
if readSize . IsDeleted ( ) {
if readOption != nil && readOption . ReadDeleted && readSize != TombstoneFileSize {
glog . V ( 3 ) . Infof ( "reading deleted %s" , n . String ( ) )
readSize = - readSize
} else {
return ErrorDeleted
}
}
if readSize == 0 {
return nil
}
actualOffset := nv . Offset . ToActualOffset ( )
if readOption . IsOutOfRange {
actualOffset += int64 ( MaxPossibleVolumeSize )
}
2022-09-16 07:30:40 +00:00
buf := mem . Allocate ( min ( readOption . ReadBufferSize , int ( size ) ) )
2022-09-09 02:05:31 +00:00
defer mem . Free ( buf )
2022-09-09 01:54:02 +00:00
// read needle data
crc := needle . CRC ( 0 )
2022-09-09 01:54:16 +00:00
for x := offset ; x < offset + size ; x += int64 ( len ( buf ) ) {
2022-09-09 02:05:31 +00:00
2022-09-15 10:11:32 +00:00
if readOption . HasSlowRead {
v . dataFileAccessLock . RLock ( )
}
2022-09-09 02:05:31 +00:00
// possibly re-read needle offset if volume is compacted
if readOption . VolumeRevision != v . SuperBlock . CompactionRevision {
// the volume is compacted
nv , ok = v . nm . Get ( n . Id )
if ! ok || nv . Offset . IsZero ( ) {
2022-09-15 10:11:32 +00:00
if readOption . HasSlowRead {
v . dataFileAccessLock . RUnlock ( )
}
2022-09-09 02:05:31 +00:00
return ErrorNotFound
}
2022-09-09 02:07:57 +00:00
actualOffset = nv . Offset . ToActualOffset ( )
2022-09-09 04:01:21 +00:00
readOption . VolumeRevision = v . SuperBlock . CompactionRevision
2022-09-09 02:05:31 +00:00
}
2022-09-09 01:54:16 +00:00
count , err := n . ReadNeedleData ( v . DataBackend , actualOffset , buf , x )
2022-09-15 10:11:32 +00:00
if readOption . HasSlowRead {
v . dataFileAccessLock . RUnlock ( )
}
2022-09-09 02:05:31 +00:00
2022-09-09 01:54:16 +00:00
toWrite := min ( count , int ( offset + size - x ) )
2022-09-09 01:54:02 +00:00
if toWrite > 0 {
crc = crc . Update ( buf [ 0 : toWrite ] )
if _ , err = writer . Write ( buf [ 0 : toWrite ] ) ; err != nil {
return fmt . Errorf ( "ReadNeedleData write: %v" , err )
}
}
if err != nil {
if err == io . EOF {
err = nil
break
}
return fmt . Errorf ( "ReadNeedleData: %v" , err )
}
if count <= 0 {
break
}
}
2022-09-09 01:54:16 +00:00
if offset == 0 && size == int64 ( n . DataSize ) && ( n . Checksum != crc && uint32 ( n . Checksum ) != crc . Value ( ) ) {
2022-09-09 01:54:02 +00:00
// the crc.Value() function is to be deprecated. this double checking is for backward compatible.
return fmt . Errorf ( "ReadNeedleData checksum %v expected %v" , crc , n . Checksum )
}
return nil
2022-06-05 18:54:04 +00:00
}
2022-09-05 01:50:45 +00:00
func min ( x , y int ) int {
if x < y {
return x
}
return y
}
2021-03-22 07:03:16 +00:00
// read fills in Needle content by looking up n.Id from NeedleMapper
func ( v * Volume ) ReadNeedleBlob ( offset int64 , size Size ) ( [ ] byte , error ) {
v . dataFileAccessLock . RLock ( )
defer v . dataFileAccessLock . RUnlock ( )
return needle . ReadNeedleBlob ( v . DataBackend , offset , size , v . Version ( ) )
}
2021-03-21 20:05:53 +00:00
type VolumeFileScanner interface {
VisitSuperBlock ( super_block . SuperBlock ) error
ReadNeedleBody ( ) bool
VisitNeedle ( n * needle . Needle , offset int64 , needleHeader , needleBody [ ] byte ) error
}
func ScanVolumeFile ( dirname string , collection string , id needle . VolumeId ,
needleMapKind NeedleMapKind ,
volumeFileScanner VolumeFileScanner ) ( err error ) {
var v * Volume
if v , err = loadVolumeWithoutIndex ( dirname , collection , id , needleMapKind ) ; err != nil {
return fmt . Errorf ( "failed to load volume %d: %v" , id , err )
}
if err = volumeFileScanner . VisitSuperBlock ( v . SuperBlock ) ; err != nil {
return fmt . Errorf ( "failed to process volume %d super block: %v" , id , err )
}
defer v . Close ( )
version := v . Version ( )
offset := int64 ( v . SuperBlock . BlockSize ( ) )
return ScanVolumeFileFrom ( version , v . DataBackend , offset , volumeFileScanner )
}
func ScanVolumeFileFrom ( version needle . Version , datBackend backend . BackendStorageFile , offset int64 , volumeFileScanner VolumeFileScanner ) ( err error ) {
n , nh , rest , e := needle . ReadNeedleHeader ( datBackend , version , offset )
if e != nil {
if e == io . EOF {
return nil
}
return fmt . Errorf ( "cannot read %s at offset %d: %v" , datBackend . Name ( ) , offset , e )
}
for n != nil {
var needleBody [ ] byte
if volumeFileScanner . ReadNeedleBody ( ) {
// println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
if needleBody , err = n . ReadNeedleBody ( datBackend , version , offset + NeedleHeaderSize , rest ) ; err != nil {
glog . V ( 0 ) . Infof ( "cannot read needle head [%d, %d) body [%d, %d) body length %d: %v" , offset , offset + NeedleHeaderSize , offset + NeedleHeaderSize , offset + NeedleHeaderSize + rest , rest , err )
// err = fmt.Errorf("cannot read needle body: %v", err)
// return
}
}
err := volumeFileScanner . VisitNeedle ( n , offset , nh , needleBody )
if err == io . EOF {
return nil
}
if err != nil {
glog . V ( 0 ) . Infof ( "visit needle error: %v" , err )
return fmt . Errorf ( "visit needle error: %v" , err )
}
offset += NeedleHeaderSize + rest
glog . V ( 4 ) . Infof ( "==> new entry offset %d" , offset )
if n , nh , rest , err = needle . ReadNeedleHeader ( datBackend , version , offset ) ; err != nil {
if err == io . EOF {
return nil
}
return fmt . Errorf ( "cannot read needle header at offset %d: %v" , offset , err )
}
glog . V ( 4 ) . Infof ( "new entry needle size:%d rest:%d" , n . Size , rest )
}
return nil
}