seaweedfs/weed/storage/store.go

437 lines
14 KiB
Go
Raw Normal View History

package storage
import (
2013-07-04 05:14:16 +00:00
"fmt"
"path/filepath"
"strings"
2019-04-15 06:00:37 +00:00
"sync/atomic"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2019-12-23 20:48:20 +00:00
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
const (
// MAX_TTL_VOLUME_REMOVAL_DELAY is the grace period handed to
// Volume.expiredLongEnough by CollectHeartbeat: an expired volume is only
// deleted once it has been expired for at least this long.
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)
/*
* A VolumeServer contains one Store
*/
type Store struct {
2019-05-28 05:54:58 +00:00
MasterAddress string
grpcDialOption grpc.DialOption
volumeSizeLimit uint64 //read from the master
Ip string
Port int
PublicUrl string
Locations []*DiskLocation
dataCenter string //optional informaton, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists
connected bool
NeedleMapType NeedleMapType
NewVolumesChan chan master_pb.VolumeShortInformationMessage
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
}
func (s *Store) String() (str string) {
2019-04-15 06:00:37 +00:00
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
return
}
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
2014-03-26 20:22:27 +00:00
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
2016-04-27 03:45:35 +00:00
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
location.loadExistingVolumes(needleMapKind)
2014-03-26 20:22:27 +00:00
s.Locations = append(s.Locations, location)
2019-06-18 04:02:50 +00:00
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))
}
2019-04-20 18:35:20 +00:00
s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
return
}
2019-10-22 05:57:01 +00:00
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
2019-12-23 20:48:20 +00:00
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
2012-09-26 08:55:56 +00:00
if e != nil {
return e
}
2019-04-19 04:43:36 +00:00
ttl, e := needle.ReadTTL(ttlString)
if e != nil {
return e
}
2019-10-22 05:57:01 +00:00
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb)
2012-09-13 07:04:56 +00:00
return e
}
func (s *Store) DeleteCollection(collection string) (e error) {
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
2016-04-27 03:45:35 +00:00
e = location.DeleteCollectionFromDiskLocation(collection)
if e != nil {
return
}
2019-04-20 18:35:20 +00:00
// let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumesChan
}
return
}
2016-04-27 03:45:35 +00:00
2019-04-19 04:43:36 +00:00
func (s *Store) findVolume(vid needle.VolumeId) *Volume {
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
if v, found := location.FindVolume(vid); found {
return v
}
}
return nil
}
func (s *Store) FindFreeLocation() (ret *DiskLocation) {
max := 0
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
currentFreeCount *= erasure_coding.DataShardsCount
currentFreeCount -= location.EcVolumesLen()
currentFreeCount /= erasure_coding.DataShardsCount
if currentFreeCount > max {
max = currentFreeCount
ret = location
}
}
return ret
}
2019-12-23 20:48:20 +00:00
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.FindFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
2019-10-30 06:16:43 +00:00
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
location.SetVolume(vid, volume)
2019-04-21 17:14:17 +00:00
glog.V(0).Infof("add volume %d", vid)
2019-04-20 18:35:20 +00:00
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
Id: uint32(vid),
Collection: collection,
ReplicaPlacement: uint32(replicaPlacement.Byte()),
2019-04-21 06:53:37 +00:00
Version: uint32(volume.Version()),
2019-04-20 18:35:20 +00:00
Ttl: ttl.ToUint32(),
}
return nil
2013-07-20 03:38:00 +00:00
} else {
return err
2013-07-20 03:38:00 +00:00
}
}
return fmt.Errorf("No more free space left")
}
2012-11-07 09:51:43 +00:00
func (s *Store) VolumeInfos() (allStats []*VolumeInfo) {
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
stats := collectStatsForOneLocation(location)
allStats = append(allStats, stats...)
}
sortVolumeInfos(allStats)
return allStats
}
// collectStatsForOneLocation returns one VolumeInfo per volume held by the
// location. The location's volumesLock is read-locked for the whole walk.
func collectStatsForOneLocation(location *DiskLocation) (stats []*VolumeInfo) {
	location.volumesLock.RLock()
	defer location.volumesLock.RUnlock()

	for vid, vol := range location.volumes {
		stats = append(stats, collectStatForOneVolume(vid, vol))
	}
	return stats
}
// collectStatForOneVolume snapshots a single volume into a VolumeInfo.
//
// The descriptive fields and the remote storage name/key are copied first.
// The counters (file count, delete count, deleted bytes, content size) are
// then read from the needle map under the volume's dataFileAccessLock read
// lock, and are left at their zero values when the volume has no needle map.
func collectStatForOneVolume(vid needle.VolumeId, v *Volume) (s *VolumeInfo) {
	s = &VolumeInfo{
		Id:               vid,
		Collection:       v.Collection,
		ReplicaPlacement: v.ReplicaPlacement,
		Version:          v.Version(),
		ReadOnly:         v.IsReadOnly(),
		Ttl:              v.Ttl,
		CompactRevision:  uint32(v.CompactionRevision),
	}
	s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey()

	v.dataFileAccessLock.RLock()
	defer v.dataFileAccessLock.RUnlock()

	if nm := v.nm; nm != nil {
		s.FileCount = nm.FileCount()
		s.DeleteCount = nm.DeletedCount()
		s.DeletedByteCount = nm.DeletedSize()
		s.Size = nm.ContentSize()
	}
	return s
}
// SetDataCenter records the data center name that CollectHeartbeat reports
// to the master in the heartbeat's DataCenter field.
func (s *Store) SetDataCenter(dataCenter string) {
s.dataCenter = dataCenter
}
// SetRack records the rack name that CollectHeartbeat reports to the master
// in the heartbeat's Rack field.
func (s *Store) SetRack(rack string) {
s.rack = rack
}
2018-05-10 06:11:54 +00:00
func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage
maxVolumeCount := 0
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
2014-03-26 20:22:27 +00:00
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
2019-12-03 04:49:50 +00:00
location.volumesLock.RLock()
2019-03-18 03:27:08 +00:00
for _, v := range location.volumes {
if maxFileKey < v.MaxFileKey() {
maxFileKey = v.MaxFileKey()
}
2019-04-15 06:00:37 +00:00
if !v.expired(s.GetVolumeSizeLimit()) {
2019-03-18 03:27:08 +00:00
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
} else {
2019-01-17 01:17:19 +00:00
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
deleteVids = append(deleteVids, v.Id)
} else {
glog.V(0).Infoln("volume", v.Id, "is expired.")
}
}
2019-06-16 09:44:20 +00:00
fileSize, _, _ := v.FileStat()
collectionVolumeSize[v.Collection] += fileSize
}
2019-12-03 04:49:50 +00:00
location.volumesLock.RUnlock()
if len(deleteVids) > 0 {
2019-11-09 06:47:50 +00:00
// delete expired volumes.
2019-12-03 04:49:50 +00:00
location.volumesLock.Lock()
for _, vid := range deleteVids {
found, err := location.deleteVolumeById(vid)
if found {
if err == nil {
glog.V(0).Infof("volume %d is deleted", vid)
} else {
glog.V(0).Infof("delete volume %d: %v", vid, err)
}
}
}
2019-12-03 04:49:50 +00:00
location.volumesLock.Unlock()
}
}
for col, size := range collectionVolumeSize {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
}
2018-05-10 06:11:54 +00:00
return &master_pb.Heartbeat{
2017-01-10 09:01:12 +00:00
Ip: s.Ip,
Port: uint32(s.Port),
PublicUrl: s.PublicUrl,
MaxVolumeCount: uint32(maxVolumeCount),
MaxFileKey: NeedleIdToUint64(maxFileKey),
2017-01-10 09:01:12 +00:00
DataCenter: s.dataCenter,
Rack: s.rack,
Volumes: volumeMessages,
2019-06-05 20:32:33 +00:00
HasNoVolumes: len(volumeMessages) == 0,
}
}
func (s *Store) Close() {
2014-03-26 20:22:27 +00:00
for _, location := range s.Locations {
location.Close()
}
}
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, fsync bool) (isUnchanged bool, err error) {
if v := s.findVolume(i); v != nil {
2020-03-17 17:01:24 +00:00
if v.IsReadOnly() {
2019-05-26 08:14:42 +00:00
err = fmt.Errorf("volume %d is read only", i)
return
2015-03-10 07:20:31 +00:00
}
2020-03-15 10:11:26 +00:00
// using len(n.Data) here instead of n.Size before n.Size is populated in v.writeNeedle(n)
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(uint32(len(n.Data)), v.Version())) {
_, _, isUnchanged, err = v.writeNeedle(n, fsync)
2013-04-15 02:34:37 +00:00
} else {
2019-07-21 18:21:30 +00:00
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
2015-03-10 07:20:31 +00:00
}
return
2012-09-11 00:08:52 +00:00
}
glog.V(0).Infoln("volume", i, "not found!")
2019-04-21 17:14:17 +00:00
err = fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
return
}
2019-07-18 06:43:48 +00:00
func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) {
2019-07-21 18:21:30 +00:00
if v := s.findVolume(i); v != nil {
if v.noWriteOrDelete {
2019-07-21 18:21:30 +00:00
return 0, fmt.Errorf("volume %d is read only", i)
}
2019-12-23 20:48:20 +00:00
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.Version())) {
2019-07-21 18:21:30 +00:00
return v.deleteNeedle(n)
} else {
return 0, fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
}
2012-09-11 00:08:52 +00:00
}
2019-07-21 18:21:30 +00:00
return 0, fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
}
2019-04-19 04:43:36 +00:00
func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle) (int, error) {
if v := s.findVolume(i); v != nil {
return v.readNeedle(n)
2012-09-11 00:08:52 +00:00
}
2019-05-26 08:14:42 +00:00
return 0, fmt.Errorf("volume %d not found", i)
2012-09-11 00:08:52 +00:00
}
2019-04-19 04:43:36 +00:00
func (s *Store) GetVolume(i needle.VolumeId) *Volume {
return s.findVolume(i)
2012-09-21 00:58:29 +00:00
}
2012-09-11 00:08:52 +00:00
2019-04-19 04:43:36 +00:00
// HasVolume reports whether this store holds a volume with id i.
func (s *Store) HasVolume(i needle.VolumeId) bool {
	return s.findVolume(i) != nil
}
2019-06-27 19:18:59 +00:00
// MarkVolumeReadonly sets the noWriteOrDelete flag on volume i.
// It returns an error when the volume is not in this store.
func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
	if v := s.findVolume(i); v != nil {
		v.noWriteOrDelete = true
		return nil
	}
	return fmt.Errorf("volume %d not found", i)
}
2019-04-19 04:43:36 +00:00
func (s *Store) MountVolume(i needle.VolumeId) error {
for _, location := range s.Locations {
if found := location.LoadVolume(i, s.NeedleMapType); found == true {
2019-04-20 18:35:20 +00:00
glog.V(0).Infof("mount volume %d", i)
v := s.findVolume(i)
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
Id: uint32(v.Id),
Collection: v.Collection,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
2019-04-21 06:53:37 +00:00
Version: uint32(v.Version()),
2019-04-20 18:35:20 +00:00
Ttl: v.Ttl.ToUint32(),
}
return nil
}
}
2019-05-26 08:14:42 +00:00
return fmt.Errorf("volume %d not found on disk", i)
}
2019-04-19 04:43:36 +00:00
func (s *Store) UnmountVolume(i needle.VolumeId) error {
2019-04-20 18:35:20 +00:00
v := s.findVolume(i)
if v == nil {
return nil
}
message := master_pb.VolumeShortInformationMessage{
Id: uint32(v.Id),
Collection: v.Collection,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
2019-04-21 06:53:37 +00:00
Version: uint32(v.Version()),
2019-04-20 18:35:20 +00:00
Ttl: v.Ttl.ToUint32(),
}
2019-05-26 06:23:19 +00:00
for _, location := range s.Locations {
if err := location.UnloadVolume(i); err == nil {
2019-04-20 18:35:20 +00:00
glog.V(0).Infof("UnmountVolume %d", i)
s.DeletedVolumesChan <- message
return nil
}
}
2019-05-26 08:14:42 +00:00
return fmt.Errorf("volume %d not found on disk", i)
}
2019-04-19 04:43:36 +00:00
func (s *Store) DeleteVolume(i needle.VolumeId) error {
2019-04-20 18:35:20 +00:00
v := s.findVolume(i)
if v == nil {
return fmt.Errorf("delete volume %d not found on disk", i)
2019-04-20 18:35:20 +00:00
}
message := master_pb.VolumeShortInformationMessage{
Id: uint32(v.Id),
Collection: v.Collection,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
2019-04-21 06:53:37 +00:00
Version: uint32(v.Version()),
2019-04-20 18:35:20 +00:00
Ttl: v.Ttl.ToUint32(),
}
for _, location := range s.Locations {
if found, error := location.deleteVolumeById(i); found && error == nil {
2019-04-20 18:35:20 +00:00
glog.V(0).Infof("DeleteVolume %d", i)
s.DeletedVolumesChan <- message
return nil
}
}
2019-05-26 08:14:42 +00:00
return fmt.Errorf("volume %d not found on disk", i)
}
2019-04-15 06:00:37 +00:00
func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
for _, location := range s.Locations {
fileInfo, found := location.LocateVolume(i)
if !found {
continue
}
// load, modify, save
baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name()))
2020-02-27 00:52:57 +00:00
vifFile := filepath.Join(location.Directory, baseFileName+".vif")
volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile)
if err != nil {
return fmt.Errorf("volume %d fail to load vif", i)
}
volumeInfo.Replication = replication
err = pb.SaveVolumeInfo(vifFile, volumeInfo)
if err != nil {
return fmt.Errorf("volume %d fail to save vif", i)
}
return nil
}
return fmt.Errorf("volume %d not found on disk", i)
}
2019-04-15 06:00:37 +00:00
// SetVolumeSizeLimit atomically stores x as the store's volume size limit.
func (s *Store) SetVolumeSizeLimit(x uint64) {
atomic.StoreUint64(&s.volumeSizeLimit, x)
}
// GetVolumeSizeLimit atomically loads and returns the store's volume size
// limit.
func (s *Store) GetVolumeSizeLimit() uint64 {
return atomic.LoadUint64(&s.volumeSizeLimit)
}
func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
volumeSizeLimit := s.GetVolumeSizeLimit()
for _, diskLocation := range s.Locations {
if diskLocation.MaxVolumeCount == 0 {
diskStatus := stats.NewDiskStatus(diskLocation.Directory)
unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit)
unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace)
volCount := diskLocation.VolumesLen()
maxVolumeCount := volCount
if unclaimedSpaces > int64(volumeSizeLimit) {
maxVolumeCount += int(uint64(unclaimedSpaces)/volumeSizeLimit) - 1
}
diskLocation.MaxVolumeCount = maxVolumeCount
2020-04-10 06:43:09 +00:00
glog.V(0).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB",
diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024)
hasChanges = true
}
}
return
}