diff --git a/go/storage/needle.go b/go/storage/needle.go index 6db99b7df..612a89fed 100644 --- a/go/storage/needle.go +++ b/go/storage/needle.go @@ -13,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/images" "github.com/chrislusf/seaweedfs/go/operation" + "github.com/chrislusf/seaweedfs/go/util" ) const ( @@ -22,6 +23,14 @@ const ( MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 ) +var ( + BYTESPOOL *util.BytesPool +) + +func init() { + BYTESPOOL = util.NewBytesPool() +} + /* * A Needle means a uploaded and stored file. * Needle file size is limited to 4GB for now. @@ -43,6 +52,8 @@ type Needle struct { Checksum CRC `comment:"CRC32 to check integrity"` Padding []byte `comment:"Aligned to 8 bytes"` + + rawBytes []byte // underlying supporing []byte, fetched and released into a pool } func (n *Needle) String() (str string) { diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index ccfe1d498..8d051dea3 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -136,15 +136,33 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { return 0, fmt.Errorf("Unsupported Version! (%d)", version) } -func ReadNeedleBlob(r *os.File, offset int64, size uint32) (bytes []byte, err error) { +func ReleaseBytes(b []byte) { + // println("Releasing", len(b)) + BYTESPOOL.Put(b) +} + +func BorrwoBytes(size int) []byte { + ret := BYTESPOOL.Get(size) + // println("Reading", len(ret)) + return ret +} + +func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice, rawBytes []byte, err error) { padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize) - bytes = make([]byte, NeedleHeaderSize+size+NeedleChecksumSize+padding) - _, err = r.ReadAt(bytes, offset) + readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding + rawBytes = BorrwoBytes(int(readSize)) + dataSlice = rawBytes[0:int(readSize)] + _, err = r.ReadAt(dataSlice, offset) return } +func (n *Needle) ReleaseMemory() { + ReleaseBytes(n.rawBytes) +} + func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { - bytes, err := ReadNeedleBlob(r, offset, size) + bytes, rawBytes, err := ReadNeedleBlob(r, offset, size) + n.rawBytes = rawBytes if err != nil { return err } diff --git a/go/storage/volume.go b/go/storage/volume.go index 5c6b12e9b..af552a10f 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -159,6 +159,7 @@ func (v *Volume) isFileUnchanged(n *Needle) bool { if ok && nv.Offset > 0 { oldNeedle := new(Needle) err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + defer oldNeedle.ReleaseMemory() if err != nil { glog.V(0).Infof("Failed to check updated file %v", err) return false @@ -288,6 +289,7 @@ func (v *Volume) readNeedle(n *Needle) (int, error) { } err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) if err != nil { + n.ReleaseMemory() return 0, err } bytesRead := len(n.Data) @@ -304,6 +306,7 @@ func (v *Volume) readNeedle(n *Needle) (int, error) { if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) { return bytesRead, nil } + n.ReleaseMemory() return -1, errors.New("Not Found") } diff --git a/go/util/bytes_pool.go b/go/util/bytes_pool.go new file mode 100644 index 000000000..58ed6feca --- /dev/null +++ b/go/util/bytes_pool.go @@ -0,0 +1,127 @@ +package util + +import ( + "bytes" + "fmt" + "sync" + "sync/atomic" + "time" +) + +var ( + ChunkSizes = []int{ + 1 << 4, // index 0, 16 bytes, inclusive + 1 << 6, // index 1, 64 bytes + 1 << 8, // index 2, 256 bytes + 1 << 10, // index 3, 1K bytes + 1 << 12, // index 4, 4K bytes + 1 << 14, // index 5, 16K bytes + 1 << 16, // index 6, 64K bytes + 1 << 18, // index 7, 256K bytes + 1 << 20, // index 8, 1M bytes + 1 << 22, // index 9, 4M bytes + 1 << 24, // index 10, 16M bytes + 1 << 26, // index 11, 64M bytes + 1 << 28, // index 12, 128M bytes + } + + _DEBUG = false +) + +type BytesPool struct { + chunkPools []*byteChunkPool +} + +func NewBytesPool() *BytesPool { + var bp BytesPool + for _, size := range ChunkSizes { + bp.chunkPools = append(bp.chunkPools, newByteChunkPool(size)) + } + ret := &bp + if _DEBUG { + t := time.NewTicker(10 * time.Second) + go func() { + for { + println("buffer:", ret.String()) + <-t.C + } + }() + } + return ret +} + +func (m *BytesPool) String() string { + var buf bytes.Buffer + for index, size := range ChunkSizes { + if m.chunkPools[index].count > 0 { + buf.WriteString(fmt.Sprintf("size:%d count:%d\n", size, m.chunkPools[index].count)) + } + } + return buf.String() +} + +func findChunkPoolIndex(size int) int { + if size <= 0 { + return -1 + } + size = (size - 1) >> 4 + ret := 0 + for size > 0 { + size = size >> 2 + ret = ret + 1 + } + if ret >= len(ChunkSizes) { + return -1 + } + return ret +} + +func (m *BytesPool) Get(size int) []byte { + index := findChunkPoolIndex(size) + // println("get index:", index) + if index < 0 { + return make([]byte, size) + } + return m.chunkPools[index].Get() +} + +func (m *BytesPool) Put(b []byte) { + index := findChunkPoolIndex(len(b)) + // println("put index:", index) + if index < 0 { + return + } + m.chunkPools[index].Put(b) +} + +// a pool of fix-sized []byte chunks. The pool size is managed by Go GC +type byteChunkPool struct { + sync.Pool + chunkSizeLimit int + count int64 +} + +var count int + +func newByteChunkPool(chunkSizeLimit int) *byteChunkPool { + var m byteChunkPool + m.chunkSizeLimit = chunkSizeLimit + m.Pool.New = func() interface{} { + count++ + // println("creating []byte size", m.chunkSizeLimit, "new", count, "count", m.count) + return make([]byte, m.chunkSizeLimit) + } + return &m +} + +func (m *byteChunkPool) Get() []byte { + // println("before get size:", m.chunkSizeLimit, "count:", m.count) + atomic.AddInt64(&m.count, 1) + return m.Pool.Get().([]byte) +} + +func (m *byteChunkPool) Put(b []byte) { + atomic.AddInt64(&m.count, -1) + // println("after put get size:", m.chunkSizeLimit, "count:", m.count) + m.Pool.Put(b) +} diff --git a/go/util/bytes_pool_test.go b/go/util/bytes_pool_test.go new file mode 100644 index 000000000..3f37c16cf --- /dev/null +++ b/go/util/bytes_pool_test.go @@ -0,0 +1,41 @@ +package util + +import ( + "testing" +) + +func TestTTLReadWrite(t *testing.T) { + var tests = []struct { + n int // input + expected int // expected result + }{ + {0, -1}, + {1, 0}, + {1 << 4, 0}, + {1 << 6, 1}, + {1 << 8, 2}, + {1 << 10, 3}, + {1 << 12, 4}, + {1 << 14, 5}, + {1 << 16, 6}, + {1 << 18, 7}, + {1<<4 + 1, 1}, + {1<<6 + 1, 2}, + {1<<8 + 1, 3}, + {1<<10 + 1, 4}, + {1<<12 + 1, 5}, + {1<<14 + 1, 6}, + {1<<16 + 1, 7}, + {1<<18 + 1, 8}, + {1<<28 - 1, 12}, + {1 << 28, 12}, + {1<<28 + 2134, -1}, + {1080, 4}, + } + for _, tt := range tests { + actual := findChunkPoolIndex(tt.n) + if actual != tt.expected { + t.Errorf("findChunkPoolIndex(%d): expected %d, actual %d", tt.n, tt.expected, actual) + } + } +} diff --git a/go/weed/signal_handling.go b/go/weed/signal_handling.go index 2004bb088..a8f166382 100644 --- a/go/weed/signal_handling.go +++ b/go/weed/signal_handling.go @@ -20,7 +20,8 @@ func OnInterrupt(fn func()) { // syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, - syscall.SIGQUIT) + // syscall.SIGQUIT, + ) go func() { for _ = range signalChan { fn() diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go index 52b22426f..6ce648062 100644 --- a/go/weed/weed_server/volume_server_handlers_read.go +++ b/go/weed/weed_server/volume_server_handlers_read.go @@ -66,6 +66,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) cookie := n.Cookie count, e := vs.store.ReadVolumeNeedle(volumeId, n) glog.V(4).Infoln("read bytes", count, "error", e) + defer n.ReleaseMemory() if e != nil || count <= 0 { glog.V(0).Infoln("read error:", e, r.URL.Path) w.WriteHeader(http.StatusNotFound) diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go index c650e5f53..cb24f7cd6 100644 --- a/go/weed/weed_server/volume_server_handlers_sync.go +++ b/go/weed/weed_server/volume_server_handlers_sync.go @@ -50,7 +50,8 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht } offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) size := uint32(util.ParseUint64(r.FormValue("size"), 0)) - content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) + content, rawBytes, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) + defer storage.ReleaseBytes(rawBytes) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return diff --git a/go/weed/weed_server/volume_server_handlers_write.go b/go/weed/weed_server/volume_server_handlers_write.go index 3d2afaf77..58733ea11 100644 --- a/go/weed/weed_server/volume_server_handlers_write.go +++ b/go/weed/weed_server/volume_server_handlers_write.go @@ -55,7 +55,9 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { cookie := n.Cookie - if _, ok := vs.store.ReadVolumeNeedle(volumeId, n); ok != nil { + _, ok := vs.store.ReadVolumeNeedle(volumeId, n) + defer n.ReleaseMemory() + if ok != nil { m := make(map[string]uint32) m["size"] = 0 writeJsonQuiet(w, r, http.StatusNotFound, m) @@ -120,6 +122,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques Status: http.StatusNotFound, Error: err.Error(), }) + n.ReleaseMemory() continue } @@ -129,6 +132,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques Status: http.StatusNotAcceptable, Error: "ChunkManifest: not allowed in batch delete mode.", }) + n.ReleaseMemory() continue } @@ -139,6 +143,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques Error: "File Random Cookie does not match.", }) glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + n.ReleaseMemory() return } if size, err := vs.store.Delete(volumeId, n); err != nil { @@ -154,6 +159,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques Size: int(size)}, ) } + n.ReleaseMemory() } writeJsonQuiet(w, r, http.StatusAccepted, ret)