mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #1784 from bingoohuang/master
This commit is contained in:
commit
7fca4324f9
|
@ -24,7 +24,7 @@ type VolumeServer struct {
|
||||||
guard *security.Guard
|
guard *security.Guard
|
||||||
grpcDialOption grpc.DialOption
|
grpcDialOption grpc.DialOption
|
||||||
|
|
||||||
needleMapKind storage.NeedleMapType
|
needleMapKind storage.NeedleMapKind
|
||||||
FixJpgOrientation bool
|
FixJpgOrientation bool
|
||||||
ReadRedirect bool
|
ReadRedirect bool
|
||||||
compactionBytePerSecond int64
|
compactionBytePerSecond int64
|
||||||
|
@ -39,7 +39,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
|
||||||
port int, publicUrl string,
|
port int, publicUrl string,
|
||||||
folders []string, maxCounts []int, minFreeSpacePercents []float32,
|
folders []string, maxCounts []int, minFreeSpacePercents []float32,
|
||||||
idxFolder string,
|
idxFolder string,
|
||||||
needleMapKind storage.NeedleMapType,
|
needleMapKind storage.NeedleMapKind,
|
||||||
masterNodes []string, pulseSeconds int,
|
masterNodes []string, pulseSeconds int,
|
||||||
dataCenter string, rack string,
|
dataCenter string, rack string,
|
||||||
whiteList []string,
|
whiteList []string,
|
||||||
|
|
|
@ -82,7 +82,7 @@ func getValidVolumeName(basename string) string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool {
|
func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapKind) bool {
|
||||||
basename := fileInfo.Name()
|
basename := fileInfo.Name()
|
||||||
if fileInfo.IsDir() {
|
if fileInfo.IsDir() {
|
||||||
return false
|
return false
|
||||||
|
@ -133,7 +133,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) {
|
func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) {
|
||||||
|
|
||||||
task_queue := make(chan os.FileInfo, 10*concurrency)
|
task_queue := make(chan os.FileInfo, 10*concurrency)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -167,7 +167,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
|
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
|
||||||
|
|
||||||
l.concurrentLoadingVolumes(needleMapKind, 10)
|
l.concurrentLoadingVolumes(needleMapKind, 10)
|
||||||
glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
|
glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
|
||||||
|
@ -237,7 +237,7 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e erro
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool {
|
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
|
||||||
if fileInfo, found := l.LocateVolume(vid); found {
|
if fileInfo, found := l.LocateVolume(vid); found {
|
||||||
return l.loadExistingVolume(fileInfo, needleMapKind)
|
return l.loadExistingVolume(fileInfo, needleMapKind)
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,10 +11,10 @@ import (
|
||||||
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type NeedleMapType int
|
type NeedleMapKind int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
NeedleMapInMemory NeedleMapType = iota
|
NeedleMapInMemory NeedleMapKind = iota
|
||||||
NeedleMapLevelDb // small memory footprint, 4MB total, 1 write buffer, 3 block buffer
|
NeedleMapLevelDb // small memory footprint, 4MB total, 1 write buffer, 3 block buffer
|
||||||
NeedleMapLevelDbMedium // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer
|
NeedleMapLevelDbMedium // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer
|
||||||
NeedleMapLevelDbLarge // large memory footprint, 12MB total, 4write buffer, 8 block buffer
|
NeedleMapLevelDbLarge // large memory footprint, 12MB total, 4write buffer, 8 block buffer
|
||||||
|
|
|
@ -40,7 +40,7 @@ type Store struct {
|
||||||
dataCenter string // optional informaton, overwriting master setting if exists
|
dataCenter string // optional informaton, overwriting master setting if exists
|
||||||
rack string // optional information, overwriting master setting if exists
|
rack string // optional information, overwriting master setting if exists
|
||||||
connected bool
|
connected bool
|
||||||
NeedleMapType NeedleMapType
|
NeedleMapKind NeedleMapKind
|
||||||
NewVolumesChan chan master_pb.VolumeShortInformationMessage
|
NewVolumesChan chan master_pb.VolumeShortInformationMessage
|
||||||
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
|
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
|
||||||
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
|
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
|
||||||
|
@ -52,8 +52,8 @@ func (s *Store) String() (str string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapType) (s *Store) {
|
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind) (s *Store) {
|
||||||
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
|
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind}
|
||||||
s.Locations = make([]*DiskLocation, 0)
|
s.Locations = make([]*DiskLocation, 0)
|
||||||
for i := 0; i < len(dirnames); i++ {
|
for i := 0; i < len(dirnames); i++ {
|
||||||
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder)
|
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder)
|
||||||
|
@ -69,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
|
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
|
||||||
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
|
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return e
|
return e
|
||||||
|
@ -114,7 +114,7 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
|
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
|
||||||
if s.findVolume(vid) != nil {
|
if s.findVolume(vid) != nil {
|
||||||
return fmt.Errorf("Volume Id %d already exists!", vid)
|
return fmt.Errorf("Volume Id %d already exists!", vid)
|
||||||
}
|
}
|
||||||
|
@ -362,7 +362,7 @@ func (s *Store) MarkVolumeWritable(i needle.VolumeId) error {
|
||||||
|
|
||||||
func (s *Store) MountVolume(i needle.VolumeId) error {
|
func (s *Store) MountVolume(i needle.VolumeId) error {
|
||||||
for _, location := range s.Locations {
|
for _, location := range s.Locations {
|
||||||
if found := location.LoadVolume(i, s.NeedleMapType); found == true {
|
if found := location.LoadVolume(i, s.NeedleMapKind); found == true {
|
||||||
glog.V(0).Infof("mount volume %d", i)
|
glog.V(0).Infof("mount volume %d", i)
|
||||||
v := s.findVolume(i)
|
v := s.findVolume(i)
|
||||||
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
|
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
|
||||||
|
|
|
@ -25,7 +25,7 @@ type Volume struct {
|
||||||
Collection string
|
Collection string
|
||||||
DataBackend backend.BackendStorageFile
|
DataBackend backend.BackendStorageFile
|
||||||
nm NeedleMapper
|
nm NeedleMapper
|
||||||
needleMapKind NeedleMapType
|
needleMapKind NeedleMapKind
|
||||||
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
|
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
|
||||||
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
|
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
|
||||||
noWriteLock sync.RWMutex
|
noWriteLock sync.RWMutex
|
||||||
|
@ -50,7 +50,7 @@ type Volume struct {
|
||||||
lastIoError error
|
lastIoError error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
|
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
|
||||||
// if replicaPlacement is nil, the superblock will be loaded from disk
|
// if replicaPlacement is nil, the superblock will be loaded from disk
|
||||||
v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
|
v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
|
||||||
asyncRequestsChan: make(chan *needle.AsyncRequest, 128)}
|
asyncRequestsChan: make(chan *needle.AsyncRequest, 128)}
|
||||||
|
|
|
@ -14,7 +14,7 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, err error) {
|
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind) (v *Volume, err error) {
|
||||||
v = &Volume{dir: dirname, Collection: collection, Id: id}
|
v = &Volume{dir: dirname, Collection: collection, Id: id}
|
||||||
v.SuperBlock = super_block.SuperBlock{}
|
v.SuperBlock = super_block.SuperBlock{}
|
||||||
v.needleMapKind = needleMapKind
|
v.needleMapKind = needleMapKind
|
||||||
|
@ -22,7 +22,7 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) {
|
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64) (err error) {
|
||||||
alreadyHasSuperBlock := false
|
alreadyHasSuperBlock := false
|
||||||
|
|
||||||
hasLoadedVolume := false
|
hasLoadedVolume := false
|
||||||
|
|
|
@ -410,7 +410,7 @@ type VolumeFileScanner interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
|
func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
|
||||||
needleMapKind NeedleMapType,
|
needleMapKind NeedleMapKind,
|
||||||
volumeFileScanner VolumeFileScanner) (err error) {
|
volumeFileScanner VolumeFileScanner) (err error) {
|
||||||
var v *Volume
|
var v *Volume
|
||||||
if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
|
if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
|
||||||
|
|
Loading…
Reference in a new issue