diff --git a/weed/command/backup.go b/weed/command/backup.go index 167f07225..64924c3c6 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -112,14 +112,14 @@ func runBackup(cmd *Command, args []string) bool { return true } } - v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0) + v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, false) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true } if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { - if err = v.Compact(0, 0); err != nil { + if err = v.Compact(0, 0, false); err != nil { fmt.Printf("Compact Volume before synchronizing %v\n", err) return true } @@ -137,7 +137,7 @@ func runBackup(cmd *Command, args []string) bool { // remove the old data v.Destroy() // recreate an empty volume - v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0) + v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, false) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true diff --git a/weed/command/compact.go b/weed/command/compact.go index df4bcdd6e..7a61f0b95 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -26,6 +26,7 @@ var ( compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.") compactMethod = cmdCompact.Flag.Int("method", 0, "option to choose which compact method. use 0 or 1.") compactVolumePreallocate = cmdCompact.Flag.Int64("preallocateMB", 0, "preallocate volume disk space") + compactVolumeInMemory = cmdCompact.Flag.Bool("volumeInMemory", false, "create the volume in memory") ) func runCompact(cmd *Command, args []string) bool { @@ -35,15 +36,16 @@ func runCompact(cmd *Command, args []string) bool { } preallocate := *compactVolumePreallocate * (1 << 20) + inMemory := *compactVolumeInMemory vid := needle.VolumeId(*compactVolumeId) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, - storage.NeedleMapInMemory, nil, nil, preallocate) + storage.NeedleMapInMemory, nil, nil, preallocate, inMemory) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } if *compactMethod == 0 { - if err = v.Compact(preallocate, 0); err != nil { + if err = v.Compact(preallocate, 0, inMemory); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } else { diff --git a/weed/pb/master.proto b/weed/pb/master.proto index 9cf46ab92..2bc05e2f0 100644 --- a/weed/pb/master.proto +++ b/weed/pb/master.proto @@ -139,6 +139,7 @@ message AssignRequest { string data_center = 5; string rack = 6; string data_node = 7; + bool in_memory = 8; } message AssignResponse { string fid = 1; diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go index f2d9420f0..ed7ef8082 100644 --- a/weed/pb/master_pb/master.pb.go +++ b/weed/pb/master_pb/master.pb.go @@ -44,12 +44,15 @@ It has these top-level messages: */ package master_pb -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - import ( + fmt "fmt" + + proto "github.com/golang/protobuf/proto" + + math "math" + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" ) @@ -655,6 +658,7 @@ type AssignRequest struct { DataCenter string `protobuf:"bytes,5,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"` Rack string `protobuf:"bytes,6,opt,name=rack" json:"rack,omitempty"` DataNode string `protobuf:"bytes,7,opt,name=data_node,json=dataNode" json:"data_node,omitempty"` + InMemory bool `protobuf:"bytes,4,opt,name=inmemory" json:"inmemory,omitempty"` } func (m *AssignRequest) Reset() { *m = AssignRequest{} } @@ -711,6 +715,13 @@ func (m *AssignRequest) GetDataNode() string { return "" } +func (m *AssignRequest) GetInMemory() bool { + if m != nil { + return m.InMemory + } + return false +} + type AssignResponse struct { Fid string `protobuf:"bytes,1,opt,name=fid" json:"fid,omitempty"` Url string `protobuf:"bytes,2,opt,name=url" json:"url,omitempty"` diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index 4004875ed..f2d1f61bd 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -99,6 +99,7 @@ message VacuumVolumeCheckResponse { message VacuumVolumeCompactRequest { uint32 volume_id = 1; int64 preallocate = 2; + bool inmemory = 3; } message VacuumVolumeCompactResponse { } @@ -127,6 +128,7 @@ message AllocateVolumeRequest { int64 preallocate = 3; string replication = 4; string ttl = 5; + bool inmemory = 6; } message AllocateVolumeResponse { } diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index 13d14b1e5..02aa78989 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -68,12 +68,15 @@ It has these top-level messages: */ package volume_server_pb -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - import ( + fmt "fmt" + + proto "github.com/golang/protobuf/proto" + + math "math" + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" ) @@ -211,6 +214,7 @@ func (m *VacuumVolumeCheckResponse) GetGarbageRatio() float64 { type VacuumVolumeCompactRequest struct { VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` Preallocate int64 `protobuf:"varint,2,opt,name=preallocate" json:"preallocate,omitempty"` + InMemory bool `protobuf:"varint,2,opt,name=inmemory" json:"inmemory,omitempty"` } func (m *VacuumVolumeCompactRequest) Reset() { *m = VacuumVolumeCompactRequest{} } @@ -232,6 +236,13 @@ func (m *VacuumVolumeCompactRequest) GetPreallocate() int64 { return 0 } +func (m *VacuumVolumeCompactRequest) GetInMemory() bool { + if m != nil { + return m.InMemory + } + return false +} + type VacuumVolumeCompactResponse struct { } @@ -318,6 +329,7 @@ type AllocateVolumeRequest struct { Preallocate int64 `protobuf:"varint,3,opt,name=preallocate" json:"preallocate,omitempty"` Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"` Ttl string `protobuf:"bytes,5,opt,name=ttl" json:"ttl,omitempty"` + InMemory bool `protobuf:"varint,6,opt,name=inmemory" json:"inmemory,omitempty"` } func (m *AllocateVolumeRequest) Reset() { *m = AllocateVolumeRequest{} } @@ -360,6 +372,13 @@ func (m *AllocateVolumeRequest) GetTtl() string { return "" } +func (m *AllocateVolumeRequest) GetInMemory() bool { + if m != nil { + return m.InMemory + } + return false +} + type AllocateVolumeResponse struct { } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 67196211f..93d8c4709 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -35,6 +35,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p req.Replication, req.Ttl, req.Preallocate, + req.GetInMemory(), ) if err != nil { diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go index 205843496..020b9347c 100644 --- a/weed/server/volume_grpc_vacuum.go +++ b/weed/server/volume_grpc_vacuum.go @@ -28,7 +28,7 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser resp := &volume_server_pb.VacuumVolumeCompactResponse{} - err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond) + err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, req.InMemory) if err != nil { glog.Errorf("compact volume %d: %v", req.VolumeId, err) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 5148dfa2d..a1104ed8e 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -60,7 +60,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne _, found := l.volumes[vid] l.RUnlock() if !found { - if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, false); e == nil { l.Lock() l.volumes[vid] = v l.Unlock() diff --git a/weed/storage/store.go b/weed/storage/store.go index d11c74d50..e9f1065e7 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -59,7 +59,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di return } -func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error { +func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, in_memory bool) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -68,7 +68,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap if e != nil { return e } - e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate) + e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, in_memory) return e } func (s *Store) DeleteCollection(collection string) (e error) { @@ -101,14 +101,14 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, in_memory bool) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.FindFreeLocation(); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil { + if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, in_memory); err == nil { location.SetVolume(vid, volume) glog.V(0).Infof("add volume %d", vid) s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index 6acf5b10e..3e244c832 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -14,9 +14,9 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { } return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId) } -func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error { +func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64, in_memory bool) error { if v := s.findVolume(vid); v != nil { - return v.Compact(preallocate, compactionBytePerSecond) + return v.Compact(preallocate, compactionBytePerSecond, in_memory) } return fmt.Errorf("volume id %d is not found during compact", vid) } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 4a89667f0..59cb6d504 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -38,12 +38,12 @@ type Volume struct { isCompacting bool } -func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, in_memory bool) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind - e = v.load(true, true, needleMapKind, preallocate) + e = v.load(true, true, needleMapKind, preallocate, in_memory) return } func (v *Volume) String() string { diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go index 658086079..6e8627feb 100644 --- a/weed/storage/volume_create_windows.go +++ b/weed/storage/volume_create_windows.go @@ -12,15 +12,17 @@ import ( "github.com/joeslay/seaweedfs/weed/os_overloads" ) -func createVolumeFile(fileName string, preallocate int64) (*os.File, error) { +func createVolumeFile(fileName string, preallocate int64, in_memory bool) (*os.File, error) { mem_map, exists := memory_map.FileMemoryMap[fileName] if !exists { - file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true) - memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap) + file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, in_memory) + if in_memory { + memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap) - new_mem_map := memory_map.FileMemoryMap[fileName] - new_mem_map.CreateMemoryMap(file, 1024*1024*1024*2) + new_mem_map := memory_map.FileMemoryMap[fileName] + new_mem_map.CreateMemoryMap(file, 1024*1024*1024*2) + } if preallocate > 0 { glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 27878befd..d90439822 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -16,11 +16,11 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{} v.needleMapKind = needleMapKind - e = v.load(false, false, needleMapKind, 0) + e = v.load(false, false, needleMapKind, 0, false) return } -func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) error { +func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64, in_memory bool) error { var e error fileName := v.FileName() alreadyHasSuperBlock := false @@ -42,7 +42,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } } else { if createDatIfMissing { - v.dataFile, e = createVolumeFile(fileName+".dat", preallocate) + v.dataFile, e = createVolumeFile(fileName+".dat", preallocate, in_memory) } else { return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName) } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index fd43ae855..2b2b37633 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -22,7 +22,7 @@ func (v *Volume) garbageLevel() float64 { return float64(v.DeletedSize()) / float64(v.ContentSize()) } -func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error { +func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64, in_memory bool) error { _, exists := memory_map.FileMemoryMap[v.dataFile.Name()] if !exists { //it makes no sense to compact in memory @@ -40,7 +40,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) - return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond) + return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond, in_memory) } else { return nil } @@ -114,7 +114,7 @@ func (v *Volume) CommitCompact() error { os.RemoveAll(v.FileName() + ".bdb") glog.V(3).Infof("Loading volume %d commit file...", v.Id) - if e = v.load(true, false, v.needleMapKind, 0); e != nil { + if e = v.load(true, false, v.needleMapKind, 0, false); e != nil { return e } } @@ -311,11 +311,11 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in return nil } -func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) { +func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64, in_memory bool) (err error) { var ( dst, idx *os.File ) - if dst, err = createVolumeFile(dstName, preallocate); err != nil { + if dst, err = createVolumeFile(dstName, preallocate, in_memory); err != nil { return } defer dst.Close()