Added treat_replication_as_minimums master toml option

This commit is contained in:
James Hartig 2020-04-01 15:18:40 -04:00
parent 9dc0b1df8f
commit eae3f27c80
7 changed files with 39 additions and 13 deletions

View file

@ -394,5 +394,13 @@ copy_2 = 6 # create 2 x 6 = 12 actual volumes
copy_3 = 3 # create 3 x 3 = 9 actual volumes
copy_other = 1 # create n x 1 = n actual volumes
# configuration flags for replication
[master.replication]
# any replication counts should be considered minimums. If you specify 010 and
# have 3 different racks, that's still considered writable. Writes will still
# try to replicate to all available volumes. You should only use this option
# if you are doing your own replication or periodic sync of volumes.
treat_replication_as_minimums = false
`
)

View file

@ -77,6 +77,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
v.SetDefault("master.replication.treat_replication_as_minimums", false)
replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")
var preallocateSize int64
if option.VolumePreallocate {
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
@ -96,7 +99,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
if nil == seq {
glog.Fatalf("create sequencer failed.")
}
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds, replicationAsMin)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")

View file

@ -11,11 +11,16 @@ import (
type Collection struct {
Name string
volumeSizeLimit uint64
replicationAsMin bool
storageType2VolumeLayout *util.ConcurrentReadMap
}
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
func NewCollection(name string, volumeSizeLimit uint64, replicationAsMin bool) *Collection {
c := &Collection{
Name: name,
volumeSizeLimit: volumeSizeLimit,
replicationAsMin: replicationAsMin,
}
c.storageType2VolumeLayout = util.NewConcurrentReadMap()
return c
}
@ -30,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, t
keyString += ttl.String()
}
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
return NewVolumeLayout(rp, ttl, c.volumeSizeLimit, c.replicationAsMin)
})
return vl.(*VolumeLayout)
}

View file

@ -27,7 +27,8 @@ type Topology struct {
pulse int64
volumeSizeLimit uint64
volumeSizeLimit uint64
replicationAsMin bool
Sequence sequence.Sequencer
@ -38,7 +39,7 @@ type Topology struct {
RaftServer raft.Server
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology {
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
@ -48,6 +49,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations)
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
t.replicationAsMin = replicationAsMin
t.Sequence = seq
@ -138,7 +140,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
return NewCollection(collectionName, t.volumeSizeLimit)
return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
}

View file

@ -23,7 +23,7 @@ func TestRemoveDataCenter(t *testing.T) {
}
func TestHandlingVolumeServerHeartbeat(t *testing.T) {
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
@ -140,7 +140,7 @@ func assert(t *testing.T, message string, actual, expected int) {
func TestAddRemoveVolume(t *testing.T) {
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")

View file

@ -81,7 +81,7 @@ func setup(topologyLayout string) *Topology {
fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology {
dc := NewDataCenter(dcKey)

View file

@ -22,6 +22,7 @@ type VolumeLayout struct {
readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes
oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes
volumeSizeLimit uint64
replicationAsMin bool
accessLock sync.RWMutex
}
@ -31,7 +32,7 @@ type VolumeLayoutStats struct {
FileCount uint64
}
func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout {
func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,
@ -40,6 +41,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi
readonlyVolumes: make(map[needle.VolumeId]bool),
oversizedVolumes: make(map[needle.VolumeId]bool),
volumeSizeLimit: volumeSizeLimit,
replicationAsMin: replicationAsMin,
}
}
@ -107,7 +109,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
if vl.enoughCopies(v.Id) && vl.isWritable(v) {
if _, ok := vl.oversizedVolumes[v.Id]; !ok {
vl.setVolumeWritable(v.Id)
}
@ -272,12 +274,18 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, is
return false
}
if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() {
if vl.enoughCopies(vid) {
return vl.setVolumeWritable(vid)
}
return false
}
func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool {
locations := vl.vid2location[vid].Length()
desired := vl.rp.GetCopyCount()
return locations == desired || (vl.replicationAsMin && locations > desired)
}
func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()