From 5047bdb4a20f756f5d025be0788403dbb2db9523 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 3 Jun 2017 11:44:24 -0700 Subject: [PATCH] skip bytes cache --- weed/command/server.go | 1 - weed/command/volume.go | 3 - weed/server/volume_server.go | 4 +- weed/server/volume_server_handlers_read.go | 1 - weed/server/volume_server_handlers_sync.go | 5 +- weed/server/volume_server_handlers_write.go | 4 - weed/storage/needle.go | 2 - weed/storage/needle_byte_cache.go | 75 +----------- weed/storage/needle_read_write.go | 5 +- weed/storage/volume_read_write.go | 2 - weed/storage/volume_vacuum.go | 3 +- weed/util/bytes_pool.go | 127 -------------------- weed/util/bytes_pool_test.go | 41 ------- 13 files changed, 9 insertions(+), 264 deletions(-) delete mode 100644 weed/util/bytes_pool.go delete mode 100644 weed/util/bytes_pool_test.go diff --git a/weed/command/server.go b/weed/command/server.go index 0c5070981..c8878e9eb 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -266,7 +266,6 @@ func runServer(cmd *Command, args []string) bool { volumeNeedleMapKind, *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect, - *volumeEnableBytesCache, ) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort)) diff --git a/weed/command/volume.go b/weed/command/volume.go index a4e316ecb..ad9803974 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -36,7 +36,6 @@ type VolumeServerOptions struct { indexType *string fixJpgOrientation *bool readRedirect *bool - enableBytesCache *bool } func init() { @@ -55,7 +54,6 @@ func init() { v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") - v.enableBytesCache = cmdVolume.Flag.Bool("cache.enable", false, "direct cache instead of OS cache, cost more memory.") } var cmdVolume = &Command{ @@ -136,7 +134,6 @@ func runVolume(cmd *Command, args []string) bool { *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, *v.readRedirect, - *v.enableBytesCache, ) listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index cc06f0092..cace8d181 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -32,8 +32,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, dataCenter string, rack string, whiteList []string, fixJpgOrientation bool, - readRedirect bool, - enableBytesCache bool) *VolumeServer { + readRedirect bool) *VolumeServer { vs := &VolumeServer{ pulseSeconds: pulseSeconds, dataCenter: dataCenter, @@ -44,7 +43,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, } vs.SetMasterNode(masterNode) vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) - storage.EnableBytesCache = enableBytesCache vs.guard = security.NewGuard(whiteList, "") diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 8f50b265e..9b0fee4eb 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -72,7 +72,6 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusNotFound) return } - defer n.ReleaseMemory() if n.Cookie != cookie { glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent()) w.WriteHeader(http.StatusNotFound) diff --git a/weed/server/volume_server_handlers_sync.go b/weed/server/volume_server_handlers_sync.go index 68c381e28..df1fde590 100644 --- a/weed/server/volume_server_handlers_sync.go +++ b/weed/server/volume_server_handlers_sync.go @@ -50,8 +50,7 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht } offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) size := uint32(util.ParseUint64(r.FormValue("size"), 0)) - content, block, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) - defer storage.ReleaseBytes(block.Bytes) + content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return @@ -83,4 +82,4 @@ func (vs *VolumeServer) getVolumeId(volumeParameterName string, r *http.Request) } return vid, err -} \ No newline at end of file +} diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index b02a58fc8..e45c2245c 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -64,7 +64,6 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, http.StatusNotFound, m) return } - defer n.ReleaseMemory() if n.Cookie != cookie { glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) @@ -133,7 +132,6 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques Status: http.StatusNotAcceptable, Error: "ChunkManifest: not allowed in batch delete mode.", }) - n.ReleaseMemory() continue } @@ -144,7 +142,6 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques Error: "File Random Cookie does not match.", }) glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - n.ReleaseMemory() return } if size, err := vs.store.Delete(volumeId, n); err != nil { @@ -160,7 +157,6 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques Size: int(size)}, ) } - n.ReleaseMemory() } writeJsonQuiet(w, r, http.StatusAccepted, ret) diff --git a/weed/storage/needle.go b/weed/storage/needle.go index 29e70ff10..2ffaff4de 100644 --- a/weed/storage/needle.go +++ b/weed/storage/needle.go @@ -49,8 +49,6 @@ type Needle struct { Checksum CRC `comment:"CRC32 to check integrity"` Padding []byte `comment:"Aligned to 8 bytes"` - - rawBlock *Block // underlying supporing []byte, fetched and released into a pool } func (n *Needle) String() (str string) { diff --git a/weed/storage/needle_byte_cache.go b/weed/storage/needle_byte_cache.go index dfc32bcbf..78c1ea862 100644 --- a/weed/storage/needle_byte_cache.go +++ b/weed/storage/needle_byte_cache.go @@ -1,80 +1,11 @@ package storage import ( - "fmt" "os" - "sync/atomic" - - "github.com/hashicorp/golang-lru" - - "github.com/chrislusf/seaweedfs/weed/util" ) -var ( - EnableBytesCache = true - bytesCache *lru.Cache - bytesPool *util.BytesPool -) - -/* -There are one level of caching, and one level of pooling. - -In pooling, all []byte are fetched and returned to the pool bytesPool. - -In caching, the string~[]byte mapping is cached -*/ -func init() { - bytesPool = util.NewBytesPool() - bytesCache, _ = lru.NewWithEvict(512, func(key interface{}, value interface{}) { - value.(*Block).decreaseReference() - }) -} - -type Block struct { - Bytes []byte - refCount int32 -} - -func (block *Block) decreaseReference() { - if atomic.AddInt32(&block.refCount, -1) == 0 { - bytesPool.Put(block.Bytes) - } -} -func (block *Block) increaseReference() { - atomic.AddInt32(&block.refCount, 1) -} - -// get bytes from the LRU cache of []byte first, then from the bytes pool -// when []byte in LRU cache is evicted, it will be put back to the bytes pool -func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) { - // check cache, return if found - cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize) - if EnableBytesCache { - if obj, found := bytesCache.Get(cacheKey); found { - block = obj.(*Block) - block.increaseReference() - dataSlice = block.Bytes[0:readSize] - return dataSlice, block, nil - } - } - - // get the []byte from pool - b := bytesPool.Get(readSize) - // refCount = 2, one by the bytesCache, one by the actual needle object - block = &Block{Bytes: b, refCount: 2} - dataSlice = block.Bytes[0:readSize] +func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, err error) { + dataSlice = make([]byte, readSize) _, err = r.ReadAt(dataSlice, offset) - if EnableBytesCache { - bytesCache.Add(cacheKey, block) - } - return dataSlice, block, err -} - -func (n *Needle) ReleaseMemory() { - if n.rawBlock != nil { - n.rawBlock.decreaseReference() - } -} -func ReleaseBytes(b []byte) { - bytesPool.Put(b) + return dataSlice, err } diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go index 4f03ce396..ee7cc6046 100644 --- a/weed/storage/needle_read_write.go +++ b/weed/storage/needle_read_write.go @@ -151,16 +151,15 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) } -func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) { +func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, err error) { return getBytesForFileBlock(r, offset, int(getActualSize(size))) } func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { - bytes, block, err := ReadNeedleBlob(r, offset, size) + bytes, err := ReadNeedleBlob(r, offset, size) if err != nil { return err } - n.rawBlock = block n.ParseNeedleHeader(bytes) if n.Size != size { return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size) diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 2314bc815..16d8b6d04 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -25,7 +25,6 @@ func (v *Volume) isFileUnchanged(n *Needle) bool { glog.V(0).Infof("Failed to check updated file %v", err) return false } - defer oldNeedle.ReleaseMemory() if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) { n.DataSize = oldNeedle.DataSize return true @@ -172,7 +171,6 @@ func (v *Volume) readNeedle(n *Needle) (int, error) { if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) { return bytesRead, nil } - n.ReleaseMemory() return -1, errors.New("Not Found") } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index f6f68d59b..22c117c41 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -178,7 +178,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 { //even the needle cache in memory is hit, the need_bytes is correct var needle_bytes []byte - needle_bytes, _, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size) + needle_bytes, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size) if err != nil { return } @@ -291,7 +291,6 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { n := new(Needle) n.ReadData(v.dataFile, int64(offset)*NeedlePaddingSize, size, v.Version()) - defer n.ReleaseMemory() if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { return nil diff --git a/weed/util/bytes_pool.go b/weed/util/bytes_pool.go deleted file mode 100644 index 58ed6feca..000000000 --- a/weed/util/bytes_pool.go +++ /dev/null @@ -1,127 +0,0 @@ -package util - -import ( - "bytes" - "fmt" - "sync" - "sync/atomic" - "time" -) - -var ( - ChunkSizes = []int{ - 1 << 4, // index 0, 16 bytes, inclusive - 1 << 6, // index 1, 64 bytes - 1 << 8, // index 2, 256 bytes - 1 << 10, // index 3, 1K bytes - 1 << 12, // index 4, 4K bytes - 1 << 14, // index 5, 16K bytes - 1 << 16, // index 6, 64K bytes - 1 << 18, // index 7, 256K bytes - 1 << 20, // index 8, 1M bytes - 1 << 22, // index 9, 4M bytes - 1 << 24, // index 10, 16M bytes - 1 << 26, // index 11, 64M bytes - 1 << 28, // index 12, 128M bytes - } - - _DEBUG = false -) - -type BytesPool struct { - chunkPools []*byteChunkPool -} - -func NewBytesPool() *BytesPool { - var bp BytesPool - for _, size := range ChunkSizes { - bp.chunkPools = append(bp.chunkPools, newByteChunkPool(size)) - } - ret := &bp - if _DEBUG { - t := time.NewTicker(10 * time.Second) - go func() { - for { - println("buffer:", ret.String()) - <-t.C - } - }() - } - return ret -} - -func (m *BytesPool) String() string { - var buf bytes.Buffer - for index, size := range ChunkSizes { - if m.chunkPools[index].count > 0 { - buf.WriteString(fmt.Sprintf("size:%d count:%d\n", size, m.chunkPools[index].count)) - } - } - return buf.String() -} - -func findChunkPoolIndex(size int) int { - if size <= 0 { - return -1 - } - size = (size - 1) >> 4 - ret := 0 - for size > 0 { - size = size >> 2 - ret = ret + 1 - } - if ret >= len(ChunkSizes) { - return -1 - } - return ret -} - -func (m *BytesPool) Get(size int) []byte { - index := findChunkPoolIndex(size) - // println("get index:", index) - if index < 0 { - return make([]byte, size) - } - return m.chunkPools[index].Get() -} - -func (m *BytesPool) Put(b []byte) { - index := findChunkPoolIndex(len(b)) - // println("put index:", index) - if index < 0 { - return - } - m.chunkPools[index].Put(b) -} - -// a pool of fix-sized []byte chunks. The pool size is managed by Go GC -type byteChunkPool struct { - sync.Pool - chunkSizeLimit int - count int64 -} - -var count int - -func newByteChunkPool(chunkSizeLimit int) *byteChunkPool { - var m byteChunkPool - m.chunkSizeLimit = chunkSizeLimit - m.Pool.New = func() interface{} { - count++ - // println("creating []byte size", m.chunkSizeLimit, "new", count, "count", m.count) - return make([]byte, m.chunkSizeLimit) - } - return &m -} - -func (m *byteChunkPool) Get() []byte { - // println("before get size:", m.chunkSizeLimit, "count:", m.count) - atomic.AddInt64(&m.count, 1) - return m.Pool.Get().([]byte) -} - -func (m *byteChunkPool) Put(b []byte) { - atomic.AddInt64(&m.count, -1) - // println("after put get size:", m.chunkSizeLimit, "count:", m.count) - m.Pool.Put(b) -} diff --git a/weed/util/bytes_pool_test.go b/weed/util/bytes_pool_test.go deleted file mode 100644 index 3f37c16cf..000000000 --- a/weed/util/bytes_pool_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package util - -import ( - "testing" -) - -func TestTTLReadWrite(t *testing.T) { - var tests = []struct { - n int // input - expected int // expected result - }{ - {0, -1}, - {1, 0}, - {1 << 4, 0}, - {1 << 6, 1}, - {1 << 8, 2}, - {1 << 10, 3}, - {1 << 12, 4}, - {1 << 14, 5}, - {1 << 16, 6}, - {1 << 18, 7}, - {1<<4 + 1, 1}, - {1<<6 + 1, 2}, - {1<<8 + 1, 3}, - {1<<10 + 1, 4}, - {1<<12 + 1, 5}, - {1<<14 + 1, 6}, - {1<<16 + 1, 7}, - {1<<18 + 1, 8}, - {1<<28 - 1, 12}, - {1 << 28, 12}, - {1<<28 + 2134, -1}, - {1080, 4}, - } - for _, tt := range tests { - actual := findChunkPoolIndex(tt.n) - if actual != tt.expected { - t.Errorf("findChunkPoolIndex(%d): expected %d, actual %d", tt.n, tt.expected, actual) - } - } -}