mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Rename mem_map to mMap, remove some in_memory variables being passed around, added MemoryMapped member to volume struct
This commit is contained in:
parent
d637d86d22
commit
1f01eb78e8
|
@ -119,7 +119,7 @@ func runBackup(cmd *Command, args []string) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
|
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
|
||||||
if err = v.Compact(0, 0, false); err != nil {
|
if err = v.Compact(0, 0); err != nil {
|
||||||
fmt.Printf("Compact Volume before synchronizing %v\n", err)
|
fmt.Printf("Compact Volume before synchronizing %v\n", err)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@ var (
|
||||||
compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.")
|
compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.")
|
||||||
compactMethod = cmdCompact.Flag.Int("method", 0, "option to choose which compact method. use 0 or 1.")
|
compactMethod = cmdCompact.Flag.Int("method", 0, "option to choose which compact method. use 0 or 1.")
|
||||||
compactVolumePreallocate = cmdCompact.Flag.Int64("preallocateMB", 0, "preallocate volume disk space")
|
compactVolumePreallocate = cmdCompact.Flag.Int64("preallocateMB", 0, "preallocate volume disk space")
|
||||||
compactVolumeInMemory = cmdCompact.Flag.Bool("volumeInMemory", false, "create the volume in memory")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func runCompact(cmd *Command, args []string) bool {
|
func runCompact(cmd *Command, args []string) bool {
|
||||||
|
@ -36,16 +35,15 @@ func runCompact(cmd *Command, args []string) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
preallocate := *compactVolumePreallocate * (1 << 20)
|
preallocate := *compactVolumePreallocate * (1 << 20)
|
||||||
inMemory := *compactVolumeInMemory
|
|
||||||
|
|
||||||
vid := needle.VolumeId(*compactVolumeId)
|
vid := needle.VolumeId(*compactVolumeId)
|
||||||
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
|
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
|
||||||
storage.NeedleMapInMemory, nil, nil, preallocate, inMemory)
|
storage.NeedleMapInMemory, nil, nil, preallocate, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Load Volume [ERROR] %s\n", err)
|
glog.Fatalf("Load Volume [ERROR] %s\n", err)
|
||||||
}
|
}
|
||||||
if *compactMethod == 0 {
|
if *compactMethod == 0 {
|
||||||
if err = v.Compact(preallocate, 0, inMemory); err != nil {
|
if err = v.Compact(preallocate, 0); err != nil {
|
||||||
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
|
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -28,7 +28,7 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser
|
||||||
|
|
||||||
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
|
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
|
||||||
|
|
||||||
err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, req.InMemory)
|
err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("compact volume %d: %v", req.VolumeId, err)
|
glog.Errorf("compact volume %d: %v", req.VolumeId, err)
|
||||||
|
|
23
weed/storage/memory_map/memory_map.go
Normal file
23
weed/storage/memory_map/memory_map.go
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
// +build !windows
|
||||||
|
|
||||||
|
package memory_map
|
||||||
|
|
||||||
|
import "os"
|
||||||
|
|
||||||
|
type MemoryBuffer struct {
|
||||||
|
aligned_length uint64
|
||||||
|
length uint64
|
||||||
|
aligned_ptr uintptr
|
||||||
|
ptr uintptr
|
||||||
|
Buffer []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type MemoryMap struct {
|
||||||
|
File *os.File
|
||||||
|
file_memory_map_handle uintptr
|
||||||
|
write_map_views []MemoryBuffer
|
||||||
|
max_length uint64
|
||||||
|
End_Of_File int64
|
||||||
|
}
|
||||||
|
|
||||||
|
var FileMemoryMap = make(map[string]MemoryMap)
|
|
@ -40,31 +40,31 @@ var system_info, err = getSystemInfo()
|
||||||
|
|
||||||
var chunk_size = uint64(system_info.dwAllocationGranularity) * 512
|
var chunk_size = uint64(system_info.dwAllocationGranularity) * 512
|
||||||
|
|
||||||
func (mem_map *MemoryMap) CreateMemoryMap(file *os.File, maxlength uint64) {
|
func (mMap *MemoryMap) CreateMemoryMap(file *os.File, maxlength uint64) {
|
||||||
|
|
||||||
maxlength_high := uint32(maxlength >> 32)
|
maxlength_high := uint32(maxlength >> 32)
|
||||||
maxlength_low := uint32(maxlength & 0xFFFFFFFF)
|
maxlength_low := uint32(maxlength & 0xFFFFFFFF)
|
||||||
file_memory_map_handle, err := windows.CreateFileMapping(windows.Handle(file.Fd()), nil, windows.PAGE_READWRITE, maxlength_high, maxlength_low, nil)
|
file_memory_map_handle, err := windows.CreateFileMapping(windows.Handle(file.Fd()), nil, windows.PAGE_READWRITE, maxlength_high, maxlength_low, nil)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
mem_map.File = file
|
mMap.File = file
|
||||||
mem_map.file_memory_map_handle = uintptr(file_memory_map_handle)
|
mMap.file_memory_map_handle = uintptr(file_memory_map_handle)
|
||||||
mem_map.write_map_views = make([]MemoryBuffer, 0, maxlength/chunk_size)
|
mMap.write_map_views = make([]MemoryBuffer, 0, maxlength/chunk_size)
|
||||||
mem_map.max_length = maxlength
|
mMap.max_length = maxlength
|
||||||
mem_map.End_Of_File = -1
|
mMap.End_Of_File = -1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mem_map *MemoryMap) DeleteFileAndMemoryMap() {
|
func (mMap *MemoryMap) DeleteFileAndMemoryMap() {
|
||||||
windows.CloseHandle(windows.Handle(mem_map.file_memory_map_handle))
|
windows.CloseHandle(windows.Handle(mMap.file_memory_map_handle))
|
||||||
windows.CloseHandle(windows.Handle(mem_map.File.Fd()))
|
windows.CloseHandle(windows.Handle(mMap.File.Fd()))
|
||||||
|
|
||||||
for _, view := range mem_map.write_map_views {
|
for _, view := range mMap.write_map_views {
|
||||||
view.ReleaseMemory()
|
view.ReleaseMemory()
|
||||||
}
|
}
|
||||||
|
|
||||||
mem_map.write_map_views = nil
|
mMap.write_map_views = nil
|
||||||
mem_map.max_length = 0
|
mMap.max_length = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func min(x, y uint64) uint64 {
|
func min(x, y uint64) uint64 {
|
||||||
|
@ -74,11 +74,11 @@ func min(x, y uint64) uint64 {
|
||||||
return y
|
return y
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mem_map *MemoryMap) WriteMemory(offset uint64, length uint64, data []byte) {
|
func (mMap *MemoryMap) WriteMemory(offset uint64, length uint64, data []byte) {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if ((offset+length)/chunk_size)+1 > uint64(len(mem_map.write_map_views)) {
|
if ((offset+length)/chunk_size)+1 > uint64(len(mMap.write_map_views)) {
|
||||||
allocateChunk(mem_map)
|
allocateChunk(mMap)
|
||||||
} else {
|
} else {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -91,7 +91,7 @@ func (mem_map *MemoryMap) WriteMemory(offset uint64, length uint64, data []byte)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
write_end := min((remaining_length + slice_offset), chunk_size)
|
write_end := min((remaining_length + slice_offset), chunk_size)
|
||||||
copy(mem_map.write_map_views[slice_index].Buffer[slice_offset:write_end], data[data_offset:])
|
copy(mMap.write_map_views[slice_index].Buffer[slice_offset:write_end], data[data_offset:])
|
||||||
remaining_length -= (write_end - slice_offset)
|
remaining_length -= (write_end - slice_offset)
|
||||||
data_offset += (write_end - slice_offset)
|
data_offset += (write_end - slice_offset)
|
||||||
|
|
||||||
|
@ -103,13 +103,13 @@ func (mem_map *MemoryMap) WriteMemory(offset uint64, length uint64, data []byte)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if mem_map.End_Of_File < int64(offset+length-1) {
|
if mMap.End_Of_File < int64(offset+length-1) {
|
||||||
mem_map.End_Of_File = int64(offset + length - 1)
|
mMap.End_Of_File = int64(offset + length - 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mem_map *MemoryMap) ReadMemory(offset uint64, length uint64) (MemoryBuffer, error) {
|
func (mMap *MemoryMap) ReadMemory(offset uint64, length uint64) (MemoryBuffer, error) {
|
||||||
return allocate(windows.Handle(mem_map.file_memory_map_handle), offset, length, false)
|
return allocate(windows.Handle(mMap.file_memory_map_handle), offset, length, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mem_buffer *MemoryBuffer) ReleaseMemory() {
|
func (mem_buffer *MemoryBuffer) ReleaseMemory() {
|
||||||
|
@ -122,13 +122,13 @@ func (mem_buffer *MemoryBuffer) ReleaseMemory() {
|
||||||
mem_buffer.Buffer = nil
|
mem_buffer.Buffer = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func allocateChunk(mem_map *MemoryMap) {
|
func allocateChunk(mMap *MemoryMap) {
|
||||||
|
|
||||||
start := uint64(len(mem_map.write_map_views)) * chunk_size
|
start := uint64(len(mMap.write_map_views)) * chunk_size
|
||||||
mem_buffer, err := allocate(windows.Handle(mem_map.file_memory_map_handle), start, chunk_size, true)
|
mem_buffer, err := allocate(windows.Handle(mMap.file_memory_map_handle), start, chunk_size, true)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
mem_map.write_map_views = append(mem_map.write_map_views, mem_buffer)
|
mMap.write_map_views = append(mMap.write_map_views, mem_buffer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -129,7 +129,7 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
|
||||||
|
|
||||||
func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) {
|
func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) {
|
||||||
|
|
||||||
mem_map, exists := memory_map.FileMemoryMap[w.Name()]
|
mMap, exists := memory_map.FileMemoryMap[w.Name()]
|
||||||
if !exists {
|
if !exists {
|
||||||
if end, e := w.Seek(0, io.SeekEnd); e == nil {
|
if end, e := w.Seek(0, io.SeekEnd); e == nil {
|
||||||
defer func(w *os.File, off int64) {
|
defer func(w *os.File, off int64) {
|
||||||
|
@ -145,14 +145,14 @@ func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
offset = uint64(mem_map.End_Of_File + 1)
|
offset = uint64(mMap.End_Of_File + 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version)
|
bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if exists {
|
if exists {
|
||||||
mem_map.WriteMemory(offset, uint64(len(bytesToWrite)), bytesToWrite)
|
mMap.WriteMemory(offset, uint64(len(bytesToWrite)), bytesToWrite)
|
||||||
} else {
|
} else {
|
||||||
_, err = w.Write(bytesToWrite)
|
_, err = w.Write(bytesToWrite)
|
||||||
}
|
}
|
||||||
|
@ -166,11 +166,11 @@ func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dat
|
||||||
dataSize := GetActualSize(size, version)
|
dataSize := GetActualSize(size, version)
|
||||||
dataSlice = make([]byte, dataSize)
|
dataSlice = make([]byte, dataSize)
|
||||||
|
|
||||||
mem_map, exists := memory_map.FileMemoryMap[r.Name()]
|
mMap, exists := memory_map.FileMemoryMap[r.Name()]
|
||||||
if exists {
|
if exists {
|
||||||
mem_buffer, err := mem_map.ReadMemory(uint64(offset), uint64(dataSize))
|
mBuffer, err := mMap.ReadMemory(uint64(offset), uint64(dataSize))
|
||||||
copy(dataSlice, mem_buffer.Buffer)
|
copy(dataSlice, mBuffer.Buffer)
|
||||||
mem_buffer.ReleaseMemory()
|
mBuffer.ReleaseMemory()
|
||||||
return dataSlice, err
|
return dataSlice, err
|
||||||
} else {
|
} else {
|
||||||
_, err = r.ReadAt(dataSlice, offset)
|
_, err = r.ReadAt(dataSlice, offset)
|
||||||
|
@ -289,9 +289,9 @@ func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, byt
|
||||||
if version == Version1 || version == Version2 || version == Version3 {
|
if version == Version1 || version == Version2 || version == Version3 {
|
||||||
bytes = make([]byte, NeedleHeaderSize)
|
bytes = make([]byte, NeedleHeaderSize)
|
||||||
|
|
||||||
mem_map, exists := memory_map.FileMemoryMap[r.Name()]
|
mMap, exists := memory_map.FileMemoryMap[r.Name()]
|
||||||
if exists {
|
if exists {
|
||||||
mem_buffer, err := mem_map.ReadMemory(uint64(offset), NeedleHeaderSize)
|
mem_buffer, err := mMap.ReadMemory(uint64(offset), NeedleHeaderSize)
|
||||||
copy(bytes, mem_buffer.Buffer)
|
copy(bytes, mem_buffer.Buffer)
|
||||||
mem_buffer.ReleaseMemory()
|
mem_buffer.ReleaseMemory()
|
||||||
|
|
||||||
|
|
|
@ -101,14 +101,14 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, in_memory bool) error {
|
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapped bool) error {
|
||||||
if s.findVolume(vid) != nil {
|
if s.findVolume(vid) != nil {
|
||||||
return fmt.Errorf("Volume Id %d already exists!", vid)
|
return fmt.Errorf("Volume Id %d already exists!", vid)
|
||||||
}
|
}
|
||||||
if location := s.FindFreeLocation(); location != nil {
|
if location := s.FindFreeLocation(); location != nil {
|
||||||
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
|
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
|
||||||
location.Directory, vid, collection, replicaPlacement, ttl)
|
location.Directory, vid, collection, replicaPlacement, ttl)
|
||||||
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, in_memory); err == nil {
|
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapped); err == nil {
|
||||||
location.SetVolume(vid, volume)
|
location.SetVolume(vid, volume)
|
||||||
glog.V(0).Infof("add volume %d", vid)
|
glog.V(0).Infof("add volume %d", vid)
|
||||||
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
|
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
|
||||||
|
|
|
@ -14,9 +14,9 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
|
||||||
}
|
}
|
||||||
return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId)
|
return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId)
|
||||||
}
|
}
|
||||||
func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64, in_memory bool) error {
|
func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error {
|
||||||
if v := s.findVolume(vid); v != nil {
|
if v := s.findVolume(vid); v != nil {
|
||||||
return v.Compact(preallocate, compactionBytePerSecond, in_memory)
|
return v.Compact(preallocate, compactionBytePerSecond)
|
||||||
}
|
}
|
||||||
return fmt.Errorf("volume id %d is not found during compact", vid)
|
return fmt.Errorf("volume id %d is not found during compact", vid)
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ type Volume struct {
|
||||||
nm NeedleMapper
|
nm NeedleMapper
|
||||||
needleMapKind NeedleMapType
|
needleMapKind NeedleMapType
|
||||||
readOnly bool
|
readOnly bool
|
||||||
|
MemoryMapped bool
|
||||||
|
|
||||||
SuperBlock
|
SuperBlock
|
||||||
|
|
||||||
|
@ -38,12 +39,12 @@ type Volume struct {
|
||||||
isCompacting bool
|
isCompacting bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, in_memory bool) (v *Volume, e error) {
|
func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapped bool) (v *Volume, e error) {
|
||||||
// if replicaPlacement is nil, the superblock will be loaded from disk
|
// if replicaPlacement is nil, the superblock will be loaded from disk
|
||||||
v = &Volume{dir: dirname, Collection: collection, Id: id}
|
v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapped: memoryMapped}
|
||||||
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
|
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
|
||||||
v.needleMapKind = needleMapKind
|
v.needleMapKind = needleMapKind
|
||||||
e = v.load(true, true, needleMapKind, preallocate, in_memory)
|
e = v.load(true, true, needleMapKind, preallocate)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (v *Volume) String() string {
|
func (v *Volume) String() string {
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
"github.com/joeslay/seaweedfs/weed/glog"
|
"github.com/joeslay/seaweedfs/weed/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
func createVolumeFile(fileName string, preallocate int64) (file *os.File, e error) {
|
func createVolumeFile(fileName string, preallocate int64, useMemoryMap bool) (*os.File, error) {
|
||||||
file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||||
if preallocate > 0 {
|
if preallocate > 0 {
|
||||||
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
|
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"github.com/joeslay/seaweedfs/weed/glog"
|
"github.com/joeslay/seaweedfs/weed/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
func createVolumeFile(fileName string, preallocate int64) (file *os.File, e error) {
|
func createVolumeFile(fileName string, preallocate int64, useMemoryMap bool) (file *os.File, e error) {
|
||||||
file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||||
if preallocate != 0 {
|
if preallocate != 0 {
|
||||||
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
|
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
|
||||||
|
|
|
@ -12,16 +12,17 @@ import (
|
||||||
"github.com/joeslay/seaweedfs/weed/os_overloads"
|
"github.com/joeslay/seaweedfs/weed/os_overloads"
|
||||||
)
|
)
|
||||||
|
|
||||||
func createVolumeFile(fileName string, preallocate int64, in_memory bool) (*os.File, error) {
|
func createVolumeFile(fileName string, preallocate int64, useMemoryMap bool) (*os.File, error) {
|
||||||
|
|
||||||
mem_map, exists := memory_map.FileMemoryMap[fileName]
|
useMemoryMap = true
|
||||||
|
mMap, exists := memory_map.FileMemoryMap[fileName]
|
||||||
if !exists {
|
if !exists {
|
||||||
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, in_memory)
|
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, useMemoryMap)
|
||||||
if in_memory {
|
if useMemoryMap {
|
||||||
memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap)
|
memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap)
|
||||||
|
|
||||||
new_mem_map := memory_map.FileMemoryMap[fileName]
|
new_mMap := memory_map.FileMemoryMap[fileName]
|
||||||
new_mem_map.CreateMemoryMap(file, 1024*1024*1024*2)
|
new_mMap.CreateMemoryMap(file, 1024*1024*1024*2)
|
||||||
}
|
}
|
||||||
|
|
||||||
if preallocate > 0 {
|
if preallocate > 0 {
|
||||||
|
@ -29,6 +30,6 @@ func createVolumeFile(fileName string, preallocate int64, in_memory bool) (*os.F
|
||||||
}
|
}
|
||||||
return file, e
|
return file, e
|
||||||
} else {
|
} else {
|
||||||
return mem_map.File, nil
|
return mMap.File, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,11 +16,11 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI
|
||||||
v = &Volume{dir: dirname, Collection: collection, Id: id}
|
v = &Volume{dir: dirname, Collection: collection, Id: id}
|
||||||
v.SuperBlock = SuperBlock{}
|
v.SuperBlock = SuperBlock{}
|
||||||
v.needleMapKind = needleMapKind
|
v.needleMapKind = needleMapKind
|
||||||
e = v.load(false, false, needleMapKind, 0, false)
|
e = v.load(false, false, needleMapKind, 0)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64, in_memory bool) error {
|
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) error {
|
||||||
var e error
|
var e error
|
||||||
fileName := v.FileName()
|
fileName := v.FileName()
|
||||||
alreadyHasSuperBlock := false
|
alreadyHasSuperBlock := false
|
||||||
|
@ -42,7 +42,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if createDatIfMissing {
|
if createDatIfMissing {
|
||||||
v.dataFile, e = createVolumeFile(fileName+".dat", preallocate, in_memory)
|
v.dataFile, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapped)
|
||||||
} else {
|
} else {
|
||||||
return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
|
return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,9 +49,9 @@ func (v *Volume) Destroy() (err error) {
|
||||||
err = fmt.Errorf("volume %d is compacting", v.Id)
|
err = fmt.Errorf("volume %d is compacting", v.Id)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
mem_map, exists := memory_map.FileMemoryMap[v.dataFile.Name()]
|
mMap, exists := memory_map.FileMemoryMap[v.dataFile.Name()]
|
||||||
if exists {
|
if exists {
|
||||||
mem_map.DeleteFileAndMemoryMap()
|
mMap.DeleteFileAndMemoryMap()
|
||||||
delete(memory_map.FileMemoryMap, v.dataFile.Name())
|
delete(memory_map.FileMemoryMap, v.dataFile.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,11 +73,11 @@ func (s *SuperBlock) Bytes() []byte {
|
||||||
|
|
||||||
func (v *Volume) maybeWriteSuperBlock() error {
|
func (v *Volume) maybeWriteSuperBlock() error {
|
||||||
|
|
||||||
mem_map, exists := memory_map.FileMemoryMap[v.dataFile.Name()]
|
mMap, exists := memory_map.FileMemoryMap[v.dataFile.Name()]
|
||||||
if exists {
|
if exists {
|
||||||
if mem_map.End_Of_File == -1 {
|
if mMap.End_Of_File == -1 {
|
||||||
v.SuperBlock.version = needle.CurrentVersion
|
v.SuperBlock.version = needle.CurrentVersion
|
||||||
mem_map.WriteMemory(0, uint64(len(v.SuperBlock.Bytes())), v.SuperBlock.Bytes())
|
mMap.WriteMemory(0, uint64(len(v.SuperBlock.Bytes())), v.SuperBlock.Bytes())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
} else {
|
} else {
|
||||||
|
@ -111,9 +111,9 @@ func (v *Volume) readSuperBlock() (err error) {
|
||||||
func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) {
|
func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) {
|
||||||
|
|
||||||
header := make([]byte, _SuperBlockSize)
|
header := make([]byte, _SuperBlockSize)
|
||||||
mem_map, exists := memory_map.FileMemoryMap[dataFile.Name()]
|
mMap, exists := memory_map.FileMemoryMap[dataFile.Name()]
|
||||||
if exists {
|
if exists {
|
||||||
mem_buffer, e := mem_map.ReadMemory(0, _SuperBlockSize)
|
mem_buffer, e := mMap.ReadMemory(0, _SuperBlockSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e)
|
err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e)
|
||||||
return
|
return
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
"github.com/joeslay/seaweedfs/weed/glog"
|
"github.com/joeslay/seaweedfs/weed/glog"
|
||||||
"github.com/joeslay/seaweedfs/weed/stats"
|
"github.com/joeslay/seaweedfs/weed/stats"
|
||||||
idx2 "github.com/joeslay/seaweedfs/weed/storage/idx"
|
idx2 "github.com/joeslay/seaweedfs/weed/storage/idx"
|
||||||
"github.com/joeslay/seaweedfs/weed/storage/memory_map"
|
|
||||||
"github.com/joeslay/seaweedfs/weed/storage/needle"
|
"github.com/joeslay/seaweedfs/weed/storage/needle"
|
||||||
"github.com/joeslay/seaweedfs/weed/storage/needle_map"
|
"github.com/joeslay/seaweedfs/weed/storage/needle_map"
|
||||||
. "github.com/joeslay/seaweedfs/weed/storage/types"
|
. "github.com/joeslay/seaweedfs/weed/storage/types"
|
||||||
|
@ -22,10 +21,9 @@ func (v *Volume) garbageLevel() float64 {
|
||||||
return float64(v.DeletedSize()) / float64(v.ContentSize())
|
return float64(v.DeletedSize()) / float64(v.ContentSize())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64, in_memory bool) error {
|
func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error {
|
||||||
|
|
||||||
_, exists := memory_map.FileMemoryMap[v.dataFile.Name()]
|
if !v.MemoryMapped { //it makes no sense to compact in memory
|
||||||
if !exists { //it makes no sense to compact in memory
|
|
||||||
glog.V(3).Infof("Compacting volume %d ...", v.Id)
|
glog.V(3).Infof("Compacting volume %d ...", v.Id)
|
||||||
//no need to lock for copy on write
|
//no need to lock for copy on write
|
||||||
//v.accessLock.Lock()
|
//v.accessLock.Lock()
|
||||||
|
@ -40,16 +38,15 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64, in_me
|
||||||
v.lastCompactIndexOffset = v.IndexFileSize()
|
v.lastCompactIndexOffset = v.IndexFileSize()
|
||||||
v.lastCompactRevision = v.SuperBlock.CompactionRevision
|
v.lastCompactRevision = v.SuperBlock.CompactionRevision
|
||||||
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
|
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
|
||||||
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond, in_memory)
|
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
|
||||||
} else {
|
} else {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) Compact2() error {
|
func (v *Volume) Compact2() error {
|
||||||
_, exists := memory_map.FileMemoryMap[v.dataFile.Name()]
|
|
||||||
if !exists { //it makes no sense to compact in memory
|
|
||||||
|
|
||||||
|
if !v.MemoryMapped { //it makes no sense to compact in memory
|
||||||
glog.V(3).Infof("Compact2 volume %d ...", v.Id)
|
glog.V(3).Infof("Compact2 volume %d ...", v.Id)
|
||||||
|
|
||||||
v.isCompacting = true
|
v.isCompacting = true
|
||||||
|
@ -66,8 +63,7 @@ func (v *Volume) Compact2() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) CommitCompact() error {
|
func (v *Volume) CommitCompact() error {
|
||||||
_, exists := memory_map.FileMemoryMap[v.dataFile.Name()]
|
if !v.MemoryMapped { //it makes no sense to compact in memory
|
||||||
if !exists { //it makes no sense to compact in memory
|
|
||||||
glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
|
glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
|
||||||
|
|
||||||
v.isCompacting = true
|
v.isCompacting = true
|
||||||
|
@ -114,7 +110,7 @@ func (v *Volume) CommitCompact() error {
|
||||||
os.RemoveAll(v.FileName() + ".bdb")
|
os.RemoveAll(v.FileName() + ".bdb")
|
||||||
|
|
||||||
glog.V(3).Infof("Loading volume %d commit file...", v.Id)
|
glog.V(3).Infof("Loading volume %d commit file...", v.Id)
|
||||||
if e = v.load(true, false, v.needleMapKind, 0, false); e != nil {
|
if e = v.load(true, false, v.needleMapKind, 0); e != nil {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -311,11 +307,11 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64, in_memory bool) (err error) {
|
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
|
||||||
var (
|
var (
|
||||||
dst, idx *os.File
|
dst, idx *os.File
|
||||||
)
|
)
|
||||||
if dst, err = createVolumeFile(dstName, preallocate, in_memory); err != nil {
|
if dst, err = createVolumeFile(dstName, preallocate, v.MemoryMapped); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer dst.Close()
|
defer dst.Close()
|
||||||
|
|
|
@ -68,7 +68,7 @@ func TestCompaction(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(dir) // clean up
|
defer os.RemoveAll(dir) // clean up
|
||||||
|
|
||||||
v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &needle.TTL{}, 0)
|
v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &needle.TTL{}, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("volume creation: %v", err)
|
t.Fatalf("volume creation: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -95,7 +95,7 @@ func TestCompaction(t *testing.T) {
|
||||||
|
|
||||||
v.Close()
|
v.Close()
|
||||||
|
|
||||||
v, err = NewVolume(dir, "", 1, NeedleMapInMemory, nil, nil, 0)
|
v, err = NewVolume(dir, "", 1, NeedleMapInMemory, nil, nil, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("volume reloading: %v", err)
|
t.Fatalf("volume reloading: %v", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue