2020-03-30 04:07:55 +00:00
package filer2
2020-03-27 11:50:51 +00:00
import (
"context"
"fmt"
"io"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2020-04-11 19:45:24 +00:00
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
2020-03-27 11:50:51 +00:00
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
type ChunkReadAt struct {
masterClient * wdclient . MasterClient
2020-03-30 04:07:55 +00:00
chunkViews [ ] * ChunkView
2020-03-27 11:50:51 +00:00
buffer [ ] byte
2020-08-16 22:16:46 +00:00
bufferFileId string
2020-03-27 11:50:51 +00:00
lookupFileId func ( fileId string ) ( targetUrl string , err error )
readerLock sync . Mutex
2020-08-16 07:49:08 +00:00
fileSize int64
2020-03-28 20:43:31 +00:00
2020-04-11 19:45:24 +00:00
chunkCache * chunk_cache . ChunkCache
2020-03-27 11:50:51 +00:00
}
// var _ = io.ReaderAt(&ChunkReadAt{})
2020-04-30 00:40:08 +00:00
// LookupFileIdFunctionType is the signature of a resolver that takes a file id
// and returns the URL to fetch it from, or an error. LookupFn returns values
// of this type.
type LookupFileIdFunctionType func ( fileId string ) ( targetUrl string , err error )
// LookupFn builds a file id resolver backed by the given filer client.
//
// The returned function asks the filer for the locations of the volume that
// holds fileId and, when at least one location is reported, returns an
// "http://<address>/<fileId>" URL built from the first location after passing
// its Url through filerClient.AdjustedUrl. When the lookup call fails, or no
// location is reported, it returns an empty URL and a non-nil error.
func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
	return func(fileId string) (string, error) {
		var resolvedUrl string

		lookupErr := filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
			volumeId := VolumeId(fileId)
			request := &filer_pb.LookupVolumeRequest{
				VolumeIds: []string{volumeId},
			}

			response, rpcErr := client.LookupVolume(context.Background(), request)
			if rpcErr != nil {
				return rpcErr
			}

			found := response.LocationsMap[volumeId]
			if found == nil || len(found.Locations) == 0 {
				glog.V(0).Infof("failed to locate %s", fileId)
				return fmt.Errorf("failed to locate %s", fileId)
			}

			address := filerClient.AdjustedUrl(found.Locations[0].Url)
			resolvedUrl = fmt.Sprintf("http://%s/%s", address, fileId)
			return nil
		})

		return resolvedUrl, lookupErr
	}
}
2020-08-16 07:49:08 +00:00
func NewChunkReaderAtFromClient ( filerClient filer_pb . FilerClient , chunkViews [ ] * ChunkView , chunkCache * chunk_cache . ChunkCache , fileSize int64 ) * ChunkReadAt {
2020-03-27 11:50:51 +00:00
return & ChunkReadAt {
2020-05-10 10:50:30 +00:00
chunkViews : chunkViews ,
2020-04-30 00:40:08 +00:00
lookupFileId : LookupFn ( filerClient ) ,
2020-03-28 21:07:16 +00:00
chunkCache : chunkCache ,
2020-08-16 07:49:08 +00:00
fileSize : fileSize ,
2020-03-27 11:50:51 +00:00
}
}
// ReadAt fills p with file content starting at offset, calling doReadAt
// repeatedly until p is full or doReadAt reports an error (io.EOF included).
// It returns the total number of bytes accounted for and the last error seen.
// The reader lock is held for the whole call.
func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
	c.readerLock.Lock()
	defer c.readerLock.Unlock()

	for err == nil && n < len(p) {
		var filled int
		// Each pass continues where the previous one stopped.
		filled, err = c.doReadAt(p[n:], offset+int64(n))
		n += filled
	}

	return n, err
}
func ( c * ChunkReadAt ) doReadAt ( p [ ] byte , offset int64 ) ( n int , err error ) {
2020-08-17 04:07:46 +00:00
var startOffset = offset
2020-03-27 11:50:51 +00:00
for _ , chunk := range c . chunkViews {
2020-08-17 04:07:46 +00:00
if startOffset < min ( chunk . LogicOffset , int64 ( len ( p ) ) + offset ) {
gap := int ( min ( chunk . LogicOffset , int64 ( len ( p ) ) + offset ) - startOffset )
glog . V ( 4 ) . Infof ( "zero [%d,%d)" , n , n + gap )
n += gap
startOffset = chunk . LogicOffset
}
2020-08-16 22:16:46 +00:00
// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
2020-08-17 04:07:46 +00:00
chunkStart , chunkStop := max ( chunk . LogicOffset , offset ) , min ( chunk . LogicOffset + int64 ( chunk . Size ) , offset + int64 ( len ( p ) ) )
if chunkStart >= chunkStop {
continue
}
glog . V ( 4 ) . Infof ( "read [%d,%d), chunk %s [%d,%d)\n" , chunkStart , chunkStop , chunk . FileId , chunk . LogicOffset - chunk . Offset , chunk . LogicOffset - chunk . Offset + int64 ( chunk . Size ) )
c . buffer , err = c . fetchWholeChunkData ( chunk )
if err != nil {
glog . Errorf ( "fetching chunk %+v: %v\n" , chunk , err )
return
2020-03-27 11:50:51 +00:00
}
2020-08-17 04:07:46 +00:00
c . bufferFileId = chunk . FileId
bufferOffset := chunkStart - chunk . LogicOffset + chunk . Offset
copied := copy ( p [ chunkStart - offset : chunkStop - offset ] , c . buffer [ bufferOffset : bufferOffset + chunkStop - chunkStart ] )
n += copied
startOffset += int64 ( copied )
2020-03-27 11:50:51 +00:00
}
2020-08-16 07:49:08 +00:00
2020-08-16 22:16:46 +00:00
// fmt.Printf("> doReadAt [%d,%d), buffer %s:%d, found:%v, err:%v\n", offset, offset+int64(len(p)), c.bufferFileId, int64(len(c.buffer)), found, err)
2020-08-16 07:49:08 +00:00
2020-08-17 04:07:46 +00:00
if startOffset < min ( c . fileSize , int64 ( len ( p ) ) + offset ) {
gap := int ( min ( c . fileSize , int64 ( len ( p ) ) + offset ) - startOffset )
glog . V ( 4 ) . Infof ( "zero2 [%d,%d)" , n , n + gap )
n += gap
2020-03-27 11:50:51 +00:00
}
2020-08-16 07:49:08 +00:00
if offset + int64 ( n ) >= c . fileSize {
err = io . EOF
}
2020-08-16 22:16:46 +00:00
// fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
2020-03-27 11:50:51 +00:00
return
}
2020-08-16 22:16:46 +00:00
func ( c * ChunkReadAt ) fetchWholeChunkData ( chunkView * ChunkView ) ( chunkData [ ] byte , err error ) {
2020-03-27 11:50:51 +00:00
2020-08-16 22:16:46 +00:00
glog . V ( 4 ) . Infof ( "fetchWholeChunkData %s offset %d [%d,%d)\n" , chunkView . FileId , chunkView . Offset , chunkView . LogicOffset , chunkView . LogicOffset + int64 ( chunkView . Size ) )
2020-03-28 20:43:31 +00:00
2020-08-16 22:16:46 +00:00
chunkData = c . chunkCache . GetChunk ( chunkView . FileId , chunkView . ChunkSize )
2020-03-28 20:43:31 +00:00
if chunkData != nil {
2020-08-17 04:07:46 +00:00
glog . V ( 5 ) . Infof ( "cache hit %s [%d,%d)" , chunkView . FileId , chunkView . LogicOffset - chunkView . Offset , chunkView . LogicOffset - chunkView . Offset + int64 ( len ( chunkData ) ) )
2020-03-29 07:54:39 +00:00
} else {
chunkData , err = c . doFetchFullChunkData ( chunkView . FileId , chunkView . CipherKey , chunkView . IsGzipped )
if err != nil {
2020-08-16 22:16:46 +00:00
return
2020-03-29 07:54:39 +00:00
}
2020-04-12 08:13:57 +00:00
c . chunkCache . SetChunk ( chunkView . FileId , chunkData )
}
2020-03-29 07:54:39 +00:00
2020-08-16 22:16:46 +00:00
return
2020-03-29 07:54:39 +00:00
}
func ( c * ChunkReadAt ) doFetchFullChunkData ( fileId string , cipherKey [ ] byte , isGzipped bool ) ( [ ] byte , error ) {
2020-07-20 00:59:43 +00:00
return fetchChunk ( c . lookupFileId , fileId , cipherKey , isGzipped )
2020-03-27 11:50:51 +00:00
}