diff --git a/go.mod b/go.mod index fbb9764ee..21320ab0e 100644 --- a/go.mod +++ b/go.mod @@ -78,6 +78,7 @@ require ( gocloud.dev/pubsub/rabbitpubsub v0.16.0 golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect golang.org/x/net v0.0.0-20200202094626-16171245cfb2 + golang.org/x/sync v0.0.0-20200930132711-30421366ff76 // indirect golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 google.golang.org/api v0.9.0 diff --git a/go.sum b/go.sum index b9ceb80fc..3ac47ec22 100644 --- a/go.sum +++ b/go.sum @@ -615,6 +615,8 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200930132711-30421366ff76 h1:JnxiSYT3Nm0BT2a8CyvYyM6cnrWpidecD1UuSYbhKm0= +golang.org/x/sync v0.0.0-20200930132711-30421366ff76/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 5ffc3a024..1c11f718a 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/golang/groupcache/singleflight" "io" "math/rand" "sync" @@ -18,12 +19,12 @@ type ChunkReadAt struct { chunkViews []*ChunkView lookupFileId func(fileId string) (targetUrl string, err error) readerLock sync.Mutex - fetcherLock sync.Mutex fileSize int64 - lastChunkFileId string - lastChunkData []byte - chunkCache chunk_cache.ChunkCache + fetchGroup singleflight.Group + lastChunkFileId string + lastChunkData []byte + chunkCache chunk_cache.ChunkCache } // var _ = io.ReaderAt(&ChunkReadAt{}) @@ -88,10 +89,16 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { var buffer []byte startOffset, remaining := offset, int64(len(p)) + var nextChunk *ChunkView for i, chunk := range c.chunkViews { if remaining <= 0 { break } + if i+1 < len(c.chunkViews) { + nextChunk = c.chunkViews[i+1] + } else { + nextChunk = nil + } if startOffset < chunk.LogicOffset { gap := int(chunk.LogicOffset - startOffset) glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap)) @@ -107,7 +114,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { continue } glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size)) - buffer, err = c.readFromWholeChunkData(chunk) + buffer, err = c.readFromWholeChunkData(chunk, nextChunk) if err != nil { glog.Errorf("fetching chunk %+v: %v\n", chunk, err) return @@ -135,36 +142,63 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { } -func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) { - - c.fetcherLock.Lock() - defer c.fetcherLock.Unlock() +func (c *ChunkReadAt) readFromWholeChunkData(chunkView, nextChunkView *ChunkView) (chunkData []byte, err error) { if c.lastChunkFileId == chunkView.FileId { return c.lastChunkData, nil } - glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize) + v, doErr := c.readOneWholeChunk(chunkView) - chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) - if chunkData != nil { - glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData))) - } else { - chunkData, err = c.doFetchFullChunkData(chunkView) - if err != nil { - return - } - c.chunkCache.SetChunk(chunkView.FileId, chunkData) - c.lastChunkData = chunkData - c.lastChunkFileId = chunkView.FileId + if doErr != nil { + return } + chunkData = v.([]byte) + + c.lastChunkData = chunkData + c.lastChunkFileId = chunkView.FileId + + go func() { + if c.chunkCache != nil && nextChunkView != nil { + c.readOneWholeChunk(nextChunkView) + } + }() + return } +func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) { + + var err error + + return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) { + + glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize) + + data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) + if data != nil { + glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data))) + } else { + var err error + data, err = c.doFetchFullChunkData(chunkView) + if err != nil { + return data, err + } + c.chunkCache.SetChunk(chunkView.FileId, data) + } + return data, err + }) +} + func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) { + + glog.V(2).Infof("+ doFetchFullChunkData %s", chunkView.FileId) + data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) + glog.V(2).Infof("- doFetchFullChunkData %s", chunkView.FileId) + return data, err }