seaweedfs/weed/storage/store.go

343 lines
10 KiB
Go
Raw Normal View History

package storage
import (
2013-07-04 05:14:16 +00:00
"fmt"
2019-04-15 06:00:37 +00:00
"sync/atomic"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"google.golang.org/grpc"
)
const (
	// MAX_TTL_VOLUME_REMOVAL_DELAY is the argument CollectHeartbeat passes to
	// Volume.expiredLongEnough to decide whether an already-expired volume
	// should now be deleted. Per the original note the unit is minutes.
	MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)
/*
* A VolumeServer contains one Store
*/
type Store struct {
2019-05-28 05:54:58 +00:00
MasterAddress string
grpcDialOption grpc.DialOption
volumeSizeLimit uint64 //read from the master
Ip string
Port int
PublicUrl string
Locations []*DiskLocation
dataCenter string //optional informaton, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists
connected bool
NeedleMapType NeedleMapType
NewVolumesChan chan master_pb.VolumeShortInformationMessage
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
}
func (s *Store) String() (str string) {
2019-04-15 06:00:37 +00:00
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
return
}
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
2014-03-26 20:22:27 +00:00
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
2016-04-27 03:45:35 +00:00
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
location.loadExistingVolumes(needleMapKind)
2014-03-26 20:22:27 +00:00
s.Locations = append(s.Locations, location)
2019-06-18 04:02:50 +00:00
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))
}
2019-04-20 18:35:20 +00:00
s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
return
}
2019-10-22 05:57:01 +00:00
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
2012-09-26 08:55:56 +00:00
if e != nil {
return e
}
2019-04-19 04:43:36 +00:00
ttl, e := needle.ReadTTL(ttlString)
if e != nil {
return e
}
2019-10-22 05:57:01 +00:00
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb)
2012-09-13 07:04:56 +00:00
return e
}
func (s *Store) DeleteCollection(collection string) (e error) {
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
2016-04-27 03:45:35 +00:00
e = location.DeleteCollectionFromDiskLocation(collection)
if e != nil {
return
}
2019-04-20 18:35:20 +00:00
// let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumesChan
}
return
}
2016-04-27 03:45:35 +00:00
2019-04-19 04:43:36 +00:00
func (s *Store) findVolume(vid needle.VolumeId) *Volume {
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
if v, found := location.FindVolume(vid); found {
return v
}
}
return nil
}
func (s *Store) FindFreeLocation() (ret *DiskLocation) {
max := 0
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
if currentFreeCount > max {
max = currentFreeCount
ret = location
}
}
return ret
}
2019-10-22 05:57:01 +00:00
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, MemoryMapMaxSizeMb uint32) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.FindFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
2019-10-22 05:57:01 +00:00
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, MemoryMapMaxSizeMb); err == nil {
location.SetVolume(vid, volume)
2019-04-21 17:14:17 +00:00
glog.V(0).Infof("add volume %d", vid)
2019-04-20 18:35:20 +00:00
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
Id: uint32(vid),
Collection: collection,
ReplicaPlacement: uint32(replicaPlacement.Byte()),
2019-04-21 06:53:37 +00:00
Version: uint32(volume.Version()),
2019-04-20 18:35:20 +00:00
Ttl: ttl.ToUint32(),
}
return nil
2013-07-20 03:38:00 +00:00
} else {
return err
2013-07-20 03:38:00 +00:00
}
}
return fmt.Errorf("No more free space left")
}
2012-11-07 09:51:43 +00:00
func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
location.RLock()
for k, v := range location.volumes {
s := &VolumeInfo{
2019-04-19 04:43:36 +00:00
Id: needle.VolumeId(k),
Size: v.ContentSize(),
2013-11-12 10:21:22 +00:00
Collection: v.Collection,
ReplicaPlacement: v.ReplicaPlacement,
2013-11-12 10:21:22 +00:00
Version: v.Version(),
FileCount: int(v.FileCount()),
DeleteCount: int(v.DeletedCount()),
DeletedByteCount: v.DeletedSize(),
2015-03-22 19:50:04 +00:00
ReadOnly: v.readOnly,
Ttl: v.Ttl,
CompactRevision: uint32(v.CompactionRevision),
}
stats = append(stats, s)
}
location.RUnlock()
}
sortVolumeInfos(stats)
return stats
}
// SetDataCenter records the data center name that CollectHeartbeat reports to
// the master.
func (s *Store) SetDataCenter(dataCenter string) {
	s.dataCenter = dataCenter
}
// SetRack records the rack name that CollectHeartbeat reports to the master.
func (s *Store) SetRack(rack string) {
	s.rack = rack
}
2018-05-10 06:11:54 +00:00
func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage
maxVolumeCount := 0
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
location.Lock()
2019-03-18 03:27:08 +00:00
for _, v := range location.volumes {
if maxFileKey < v.MaxFileKey() {
maxFileKey = v.MaxFileKey()
}
2019-04-15 06:00:37 +00:00
if !v.expired(s.GetVolumeSizeLimit()) {
2019-03-18 03:27:08 +00:00
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
} else {
2019-01-17 01:17:19 +00:00
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
2016-04-27 03:45:35 +00:00
location.deleteVolumeById(v.Id)
glog.V(0).Infoln("volume", v.Id, "is deleted.")
} else {
glog.V(0).Infoln("volume", v.Id, "is expired.")
}
}
2019-06-16 09:44:20 +00:00
fileSize, _, _ := v.FileStat()
collectionVolumeSize[v.Collection] += fileSize
}
location.Unlock()
}
for col, size := range collectionVolumeSize {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
}
2018-05-10 06:11:54 +00:00
return &master_pb.Heartbeat{
2017-01-10 09:01:12 +00:00
Ip: s.Ip,
Port: uint32(s.Port),
PublicUrl: s.PublicUrl,
MaxVolumeCount: uint32(maxVolumeCount),
MaxFileKey: NeedleIdToUint64(maxFileKey),
2017-01-10 09:01:12 +00:00
DataCenter: s.dataCenter,
Rack: s.rack,
Volumes: volumeMessages,
2019-06-05 20:32:33 +00:00
HasNoVolumes: len(volumeMessages) == 0,
}
}
func (s *Store) Close() {
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
location.Close()
}
}
2019-07-18 06:43:48 +00:00
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
if v := s.findVolume(i); v != nil {
2013-04-15 02:34:37 +00:00
if v.readOnly {
2019-05-26 08:14:42 +00:00
err = fmt.Errorf("volume %d is read only", i)
return
2015-03-10 07:20:31 +00:00
}
2019-07-21 18:21:30 +00:00
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.version)) {
_, size, isUnchanged, err = v.writeNeedle(n)
2013-04-15 02:34:37 +00:00
} else {
2019-07-21 18:21:30 +00:00
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
2015-03-10 07:20:31 +00:00
}
return
2012-09-11 00:08:52 +00:00
}
glog.V(0).Infoln("volume", i, "not found!")
2019-04-21 17:14:17 +00:00
err = fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
return
}
2019-07-18 06:43:48 +00:00
func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) {
2019-07-21 18:21:30 +00:00
if v := s.findVolume(i); v != nil {
if v.readOnly {
return 0, fmt.Errorf("volume %d is read only", i)
}
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.version)) {
return v.deleteNeedle(n)
} else {
return 0, fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
}
2012-09-11 00:08:52 +00:00
}
2019-07-21 18:21:30 +00:00
return 0, fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
}
2019-04-19 04:43:36 +00:00
func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle) (int, error) {
if v := s.findVolume(i); v != nil {
return v.readNeedle(n)
2012-09-11 00:08:52 +00:00
}
2019-05-26 08:14:42 +00:00
return 0, fmt.Errorf("volume %d not found", i)
2012-09-11 00:08:52 +00:00
}
2019-04-19 04:43:36 +00:00
func (s *Store) GetVolume(i needle.VolumeId) *Volume {
return s.findVolume(i)
2012-09-21 00:58:29 +00:00
}
2012-09-11 00:08:52 +00:00
2019-04-19 04:43:36 +00:00
// HasVolume reports whether any location of this store holds volume i.
func (s *Store) HasVolume(i needle.VolumeId) bool {
	return s.findVolume(i) != nil
}
2019-06-27 19:18:59 +00:00
// MarkVolumeReadonly sets the readOnly flag on volume i. It returns an error
// when the volume is not in this store.
func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
	if v := s.findVolume(i); v != nil {
		v.readOnly = true
		return nil
	}
	return fmt.Errorf("volume %d not found", i)
}
2019-04-19 04:43:36 +00:00
func (s *Store) MountVolume(i needle.VolumeId) error {
for _, location := range s.Locations {
if found := location.LoadVolume(i, s.NeedleMapType); found == true {
2019-04-20 18:35:20 +00:00
glog.V(0).Infof("mount volume %d", i)
v := s.findVolume(i)
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
Id: uint32(v.Id),
Collection: v.Collection,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
2019-04-21 06:53:37 +00:00
Version: uint32(v.Version()),
2019-04-20 18:35:20 +00:00
Ttl: v.Ttl.ToUint32(),
}
return nil
}
}
2019-05-26 08:14:42 +00:00
return fmt.Errorf("volume %d not found on disk", i)
}
2019-04-19 04:43:36 +00:00
func (s *Store) UnmountVolume(i needle.VolumeId) error {
2019-04-20 18:35:20 +00:00
v := s.findVolume(i)
if v == nil {
return nil
}
message := master_pb.VolumeShortInformationMessage{
Id: uint32(v.Id),
Collection: v.Collection,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
2019-04-21 06:53:37 +00:00
Version: uint32(v.Version()),
2019-04-20 18:35:20 +00:00
Ttl: v.Ttl.ToUint32(),
}
2019-05-26 06:23:19 +00:00
for _, location := range s.Locations {
if err := location.UnloadVolume(i); err == nil {
2019-04-20 18:35:20 +00:00
glog.V(0).Infof("UnmountVolume %d", i)
s.DeletedVolumesChan <- message
return nil
}
}
2019-05-26 08:14:42 +00:00
return fmt.Errorf("volume %d not found on disk", i)
}
2019-04-19 04:43:36 +00:00
func (s *Store) DeleteVolume(i needle.VolumeId) error {
2019-04-20 18:35:20 +00:00
v := s.findVolume(i)
if v == nil {
return nil
}
message := master_pb.VolumeShortInformationMessage{
Id: uint32(v.Id),
Collection: v.Collection,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
2019-04-21 06:53:37 +00:00
Version: uint32(v.Version()),
2019-04-20 18:35:20 +00:00
Ttl: v.Ttl.ToUint32(),
}
for _, location := range s.Locations {
if error := location.deleteVolumeById(i); error == nil {
2019-04-20 18:35:20 +00:00
glog.V(0).Infof("DeleteVolume %d", i)
s.DeletedVolumesChan <- message
return nil
}
}
2019-05-26 08:14:42 +00:00
return fmt.Errorf("volume %d not found on disk", i)
}
2019-04-15 06:00:37 +00:00
// SetVolumeSizeLimit atomically stores x as the store's volume size limit.
func (s *Store) SetVolumeSizeLimit(x uint64) {
	atomic.StoreUint64(&s.volumeSizeLimit, x)
}
// GetVolumeSizeLimit atomically loads the store's current volume size limit.
func (s *Store) GetVolumeSizeLimit() uint64 {
	return atomic.LoadUint64(&s.volumeSizeLimit)
}