2020-09-01 07:21:19 +00:00
package filer
2020-03-27 11:50:51 +00:00
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2020-04-11 19:45:24 +00:00
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
2020-03-27 11:50:51 +00:00
"github.com/chrislusf/seaweedfs/weed/wdclient"
2020-10-08 06:58:32 +00:00
"github.com/golang/groupcache/singleflight"
"io"
"math/rand"
"sync"
2020-10-10 23:02:39 +00:00
"time"
2020-03-27 11:50:51 +00:00
)
2020-10-11 03:09:43 +00:00
// ReadWaitTime caps the retry backoff in LookupFn: a volume lookup is retried
// only while the per-attempt wait is still below this duration.
var ReadWaitTime = 6 * time.Second
2020-03-27 11:50:51 +00:00
type ChunkReadAt struct {
masterClient * wdclient . MasterClient
2020-03-30 04:07:55 +00:00
chunkViews [ ] * ChunkView
2020-10-08 05:49:04 +00:00
lookupFileId LookupFileIdFunctionType
2020-03-27 11:50:51 +00:00
readerLock sync . Mutex
2020-08-16 07:49:08 +00:00
fileSize int64
2020-03-28 20:43:31 +00:00
2020-10-08 05:49:04 +00:00
fetchGroup singleflight . Group
lastChunkFileId string
lastChunkData [ ] byte
chunkCache chunk_cache . ChunkCache
2020-03-27 11:50:51 +00:00
}
// var _ = io.ReaderAt(&ChunkReadAt{})
2020-10-08 05:49:04 +00:00
// LookupFileIdFunctionType maps a chunk file id to the list of URLs the chunk
// can be downloaded from, or reports why the lookup failed.
type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
2020-04-30 00:40:08 +00:00
func LookupFn ( filerClient filer_pb . FilerClient ) LookupFileIdFunctionType {
2020-10-04 03:16:42 +00:00
vidCache := make ( map [ string ] * filer_pb . Locations )
2020-10-08 05:49:04 +00:00
return func ( fileId string ) ( targetUrls [ ] string , err error ) {
2020-10-04 03:16:42 +00:00
vid := VolumeId ( fileId )
locations , found := vidCache [ vid ]
2020-10-10 23:02:39 +00:00
waitTime := time . Second
2020-10-11 03:09:43 +00:00
for ! found && waitTime < ReadWaitTime {
2020-10-04 03:16:42 +00:00
// println("looking up volume", vid)
err = filerClient . WithFilerClient ( func ( client filer_pb . SeaweedFilerClient ) error {
resp , err := client . LookupVolume ( context . Background ( ) , & filer_pb . LookupVolumeRequest {
VolumeIds : [ ] string { vid } ,
} )
if err != nil {
return err
}
locations = resp . LocationsMap [ vid ]
if locations == nil || len ( locations . Locations ) == 0 {
glog . V ( 0 ) . Infof ( "failed to locate %s" , fileId )
return fmt . Errorf ( "failed to locate %s" , fileId )
}
vidCache [ vid ] = locations
return nil
2020-04-30 00:40:08 +00:00
} )
2020-10-10 23:02:39 +00:00
if err == nil {
break
}
glog . V ( 1 ) . Infof ( "wait for volume %s" , vid )
time . Sleep ( waitTime )
waitTime += waitTime / 2
2020-10-04 03:16:42 +00:00
}
2020-04-30 00:40:08 +00:00
2020-10-10 22:43:22 +00:00
if err != nil {
return nil , err
}
2020-10-08 05:49:04 +00:00
for _ , loc := range locations . Locations {
2020-10-12 03:15:10 +00:00
volumeServerAddress := filerClient . AdjustedUrl ( loc )
2020-10-08 05:49:04 +00:00
targetUrl := fmt . Sprintf ( "http://%s/%s" , volumeServerAddress , fileId )
targetUrls = append ( targetUrls , targetUrl )
}
2020-04-30 00:40:08 +00:00
2020-10-08 06:58:32 +00:00
for i := len ( targetUrls ) - 1 ; i > 0 ; i -- {
j := rand . Intn ( i + 1 )
targetUrls [ i ] , targetUrls [ j ] = targetUrls [ j ] , targetUrls [ i ]
}
2020-04-30 00:40:08 +00:00
return
}
}
2020-08-18 03:20:08 +00:00
func NewChunkReaderAtFromClient ( filerClient filer_pb . FilerClient , chunkViews [ ] * ChunkView , chunkCache chunk_cache . ChunkCache , fileSize int64 ) * ChunkReadAt {
2020-03-27 11:50:51 +00:00
return & ChunkReadAt {
2020-05-10 10:50:30 +00:00
chunkViews : chunkViews ,
2020-04-30 00:40:08 +00:00
lookupFileId : LookupFn ( filerClient ) ,
2020-03-28 21:07:16 +00:00
chunkCache : chunkCache ,
2020-08-16 07:49:08 +00:00
fileSize : fileSize ,
2020-03-27 11:50:51 +00:00
}
}
func ( c * ChunkReadAt ) ReadAt ( p [ ] byte , offset int64 ) ( n int , err error ) {
c . readerLock . Lock ( )
defer c . readerLock . Unlock ( )
2020-08-17 23:05:40 +00:00
glog . V ( 4 ) . Infof ( "ReadAt [%d,%d) of total file size %d bytes %d chunk views" , offset , offset + int64 ( len ( p ) ) , c . fileSize , len ( c . chunkViews ) )
2020-08-18 03:14:40 +00:00
return c . doReadAt ( p [ n : ] , offset + int64 ( n ) )
2020-03-27 11:50:51 +00:00
}
func ( c * ChunkReadAt ) doReadAt ( p [ ] byte , offset int64 ) ( n int , err error ) {
2020-08-18 03:14:40 +00:00
var buffer [ ] byte
startOffset , remaining := offset , int64 ( len ( p ) )
2020-10-04 08:31:04 +00:00
var nextChunk * ChunkView
2020-08-17 23:05:40 +00:00
for i , chunk := range c . chunkViews {
2020-08-18 03:14:40 +00:00
if remaining <= 0 {
break
}
2020-10-04 08:31:04 +00:00
if i + 1 < len ( c . chunkViews ) {
nextChunk = c . chunkViews [ i + 1 ]
} else {
nextChunk = nil
}
2020-08-18 03:14:40 +00:00
if startOffset < chunk . LogicOffset {
gap := int ( chunk . LogicOffset - startOffset )
2020-08-18 07:32:01 +00:00
glog . V ( 4 ) . Infof ( "zero [%d,%d)" , startOffset , startOffset + int64 ( gap ) )
n += int ( min ( int64 ( gap ) , remaining ) )
2020-08-18 03:14:40 +00:00
startOffset , remaining = chunk . LogicOffset , remaining - int64 ( gap )
if remaining <= 0 {
break
}
2020-08-17 04:07:46 +00:00
}
2020-08-16 22:16:46 +00:00
// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
2020-08-18 03:14:40 +00:00
chunkStart , chunkStop := max ( chunk . LogicOffset , startOffset ) , min ( chunk . LogicOffset + int64 ( chunk . Size ) , startOffset + remaining )
2020-08-17 04:07:46 +00:00
if chunkStart >= chunkStop {
continue
}
2020-08-18 03:14:40 +00:00
glog . V ( 4 ) . Infof ( "read [%d,%d), %d/%d chunk %s [%d,%d)" , chunkStart , chunkStop , i , len ( c . chunkViews ) , chunk . FileId , chunk . LogicOffset - chunk . Offset , chunk . LogicOffset - chunk . Offset + int64 ( chunk . Size ) )
2020-10-04 08:31:04 +00:00
buffer , err = c . readFromWholeChunkData ( chunk , nextChunk )
2020-08-17 04:07:46 +00:00
if err != nil {
glog . Errorf ( "fetching chunk %+v: %v\n" , chunk , err )
return
2020-03-27 11:50:51 +00:00
}
2020-08-17 04:07:46 +00:00
bufferOffset := chunkStart - chunk . LogicOffset + chunk . Offset
2020-08-18 04:17:32 +00:00
copied := copy ( p [ startOffset - offset : chunkStop - chunkStart + startOffset - offset ] , buffer [ bufferOffset : bufferOffset + chunkStop - chunkStart ] )
2020-08-17 04:07:46 +00:00
n += copied
2020-08-18 03:20:08 +00:00
startOffset , remaining = startOffset + int64 ( copied ) , remaining - int64 ( copied )
2020-03-27 11:50:51 +00:00
}
2020-08-16 07:49:08 +00:00
2020-08-18 03:14:40 +00:00
glog . V ( 4 ) . Infof ( "doReadAt [%d,%d), n:%v, err:%v" , offset , offset + int64 ( len ( p ) ) , n , err )
2020-08-16 07:49:08 +00:00
2020-08-18 15:50:14 +00:00
if err == nil && remaining > 0 && c . fileSize > startOffset {
2020-08-30 05:28:33 +00:00
delta := int ( min ( remaining , c . fileSize - startOffset ) )
2020-08-18 07:34:15 +00:00
glog . V ( 4 ) . Infof ( "zero2 [%d,%d) of file size %d bytes" , startOffset , startOffset + int64 ( delta ) , c . fileSize )
2020-08-18 05:46:32 +00:00
n += delta
2020-03-27 11:50:51 +00:00
}
2020-08-18 03:14:40 +00:00
2020-08-18 15:18:54 +00:00
if err == nil && offset + int64 ( len ( p ) ) > c . fileSize {
2020-08-16 07:49:08 +00:00
err = io . EOF
}
2020-08-16 22:16:46 +00:00
// fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
2020-03-27 11:50:51 +00:00
return
}
2020-10-08 05:49:04 +00:00
func ( c * ChunkReadAt ) readFromWholeChunkData ( chunkView * ChunkView , nextChunkViews ... * ChunkView ) ( chunkData [ ] byte , err error ) {
2020-10-04 03:16:42 +00:00
if c . lastChunkFileId == chunkView . FileId {
return c . lastChunkData , nil
}
2020-10-04 08:31:04 +00:00
v , doErr := c . readOneWholeChunk ( chunkView )
2020-03-28 20:43:31 +00:00
2020-10-04 08:31:04 +00:00
if doErr != nil {
2020-10-09 06:19:20 +00:00
return nil , doErr
2020-04-12 08:13:57 +00:00
}
2020-03-29 07:54:39 +00:00
2020-10-04 08:31:04 +00:00
chunkData = v . ( [ ] byte )
c . lastChunkData = chunkData
c . lastChunkFileId = chunkView . FileId
2020-10-05 21:06:18 +00:00
for _ , nextChunkView := range nextChunkViews {
if c . chunkCache != nil && nextChunkView != nil {
go c . readOneWholeChunk ( nextChunkView )
2020-10-04 08:31:04 +00:00
}
2020-10-05 21:06:18 +00:00
}
2020-10-04 08:31:04 +00:00
2020-08-16 22:16:46 +00:00
return
2020-03-29 07:54:39 +00:00
}
2020-10-04 08:31:04 +00:00
// readOneWholeChunk returns the whole data of the chunk behind chunkView as a
// []byte wrapped in an interface{}.
//
// Concurrent calls for the same file id are collapsed by c.fetchGroup. The
// chunk cache is consulted first; on a miss the chunk is fetched with
// doFetchFullChunkData and stored in the cache. A nil chunk cache is
// tolerated (readFromWholeChunkData already treats it as optional): the chunk
// is then always fetched and nothing is stored.
func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) {

	return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) {

		glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)

		if c.chunkCache != nil {
			if cached := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize); cached != nil {
				glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(cached)))
				return cached, nil
			}
		}

		data, err := c.doFetchFullChunkData(chunkView)
		if err != nil {
			return nil, err
		}

		if c.chunkCache != nil {
			c.chunkCache.SetChunk(chunkView.FileId, data)
		}
		return data, nil
	})
}
2020-10-04 03:16:42 +00:00
func ( c * ChunkReadAt ) doFetchFullChunkData ( chunkView * ChunkView ) ( [ ] byte , error ) {
2020-10-04 08:31:04 +00:00
2020-10-04 23:21:43 +00:00
glog . V ( 4 ) . Infof ( "+ doFetchFullChunkData %s" , chunkView . FileId )
2020-10-04 08:31:04 +00:00
2020-10-04 03:16:42 +00:00
data , err := fetchChunk ( c . lookupFileId , chunkView . FileId , chunkView . CipherKey , chunkView . IsGzipped )
2020-03-29 07:54:39 +00:00
2020-10-04 23:21:43 +00:00
glog . V ( 4 ) . Infof ( "- doFetchFullChunkData %s" , chunkView . FileId )
2020-10-04 08:31:04 +00:00
2020-10-04 03:16:42 +00:00
return data , err
2020-03-27 11:50:51 +00:00
}