mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #1255 from levenlabs/ignore
Added treat_replication_as_minimums master toml option
This commit is contained in:
commit
c446438ca5
|
@ -394,5 +394,13 @@ copy_2 = 6 # create 2 x 6 = 12 actual volumes
|
|||
copy_3 = 3 # create 3 x 3 = 9 actual volumes
|
||||
copy_other = 1 # create n x 1 = n actual volumes
|
||||
|
||||
# configuration flags for replication
|
||||
[master.replication]
|
||||
# any replication counts should be considered minimums. If you specify 010 and
|
||||
# have 3 different racks, that's still considered writable. Writes will still
|
||||
# try to replicate to all available volumes. You should only use this option
|
||||
# if you are doing your own replication or periodic sync of volumes.
|
||||
treat_replication_as_minimums = false
|
||||
|
||||
`
|
||||
)
|
||||
|
|
|
@ -77,6 +77,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
|
|||
v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
|
||||
readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
|
||||
|
||||
v.SetDefault("master.replication.treat_replication_as_minimums", false)
|
||||
replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")
|
||||
|
||||
var preallocateSize int64
|
||||
if option.VolumePreallocate {
|
||||
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
|
||||
|
@ -96,7 +99,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
|
|||
if nil == seq {
|
||||
glog.Fatalf("create sequencer failed.")
|
||||
}
|
||||
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
|
||||
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds, replicationAsMin)
|
||||
ms.vg = topology.NewDefaultVolumeGrowth()
|
||||
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
|
||||
|
||||
|
|
|
@ -11,11 +11,16 @@ import (
|
|||
type Collection struct {
|
||||
Name string
|
||||
volumeSizeLimit uint64
|
||||
replicationAsMin bool
|
||||
storageType2VolumeLayout *util.ConcurrentReadMap
|
||||
}
|
||||
|
||||
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
|
||||
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
|
||||
func NewCollection(name string, volumeSizeLimit uint64, replicationAsMin bool) *Collection {
|
||||
c := &Collection{
|
||||
Name: name,
|
||||
volumeSizeLimit: volumeSizeLimit,
|
||||
replicationAsMin: replicationAsMin,
|
||||
}
|
||||
c.storageType2VolumeLayout = util.NewConcurrentReadMap()
|
||||
return c
|
||||
}
|
||||
|
@ -30,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, t
|
|||
keyString += ttl.String()
|
||||
}
|
||||
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
|
||||
return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
|
||||
return NewVolumeLayout(rp, ttl, c.volumeSizeLimit, c.replicationAsMin)
|
||||
})
|
||||
return vl.(*VolumeLayout)
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ type Topology struct {
|
|||
pulse int64
|
||||
|
||||
volumeSizeLimit uint64
|
||||
replicationAsMin bool
|
||||
|
||||
Sequence sequence.Sequencer
|
||||
|
||||
|
@ -38,7 +39,7 @@ type Topology struct {
|
|||
RaftServer raft.Server
|
||||
}
|
||||
|
||||
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology {
|
||||
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
|
||||
t := &Topology{}
|
||||
t.id = NodeId(id)
|
||||
t.nodeType = "Topology"
|
||||
|
@ -48,6 +49,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
|
|||
t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations)
|
||||
t.pulse = int64(pulse)
|
||||
t.volumeSizeLimit = volumeSizeLimit
|
||||
t.replicationAsMin = replicationAsMin
|
||||
|
||||
t.Sequence = seq
|
||||
|
||||
|
@ -138,7 +140,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
|
|||
|
||||
func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
|
||||
return t.collectionMap.Get(collectionName, func() interface{} {
|
||||
return NewCollection(collectionName, t.volumeSizeLimit)
|
||||
return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin)
|
||||
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ func TestRemoveDataCenter(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHandlingVolumeServerHeartbeat(t *testing.T) {
|
||||
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
|
||||
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
|
||||
|
||||
dc := topo.GetOrCreateDataCenter("dc1")
|
||||
rack := dc.GetOrCreateRack("rack1")
|
||||
|
@ -140,7 +140,7 @@ func assert(t *testing.T, message string, actual, expected int) {
|
|||
|
||||
func TestAddRemoveVolume(t *testing.T) {
|
||||
|
||||
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
|
||||
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
|
||||
|
||||
dc := topo.GetOrCreateDataCenter("dc1")
|
||||
rack := dc.GetOrCreateRack("rack1")
|
||||
|
|
|
@ -81,7 +81,7 @@ func setup(topologyLayout string) *Topology {
|
|||
fmt.Println("data:", data)
|
||||
|
||||
//need to connect all nodes first before server adding volumes
|
||||
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
|
||||
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
|
||||
mTopology := data.(map[string]interface{})
|
||||
for dcKey, dcValue := range mTopology {
|
||||
dc := NewDataCenter(dcKey)
|
||||
|
|
|
@ -22,6 +22,7 @@ type VolumeLayout struct {
|
|||
readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes
|
||||
oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes
|
||||
volumeSizeLimit uint64
|
||||
replicationAsMin bool
|
||||
accessLock sync.RWMutex
|
||||
}
|
||||
|
||||
|
@ -31,7 +32,7 @@ type VolumeLayoutStats struct {
|
|||
FileCount uint64
|
||||
}
|
||||
|
||||
func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout {
|
||||
func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout {
|
||||
return &VolumeLayout{
|
||||
rp: rp,
|
||||
ttl: ttl,
|
||||
|
@ -40,6 +41,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi
|
|||
readonlyVolumes: make(map[needle.VolumeId]bool),
|
||||
oversizedVolumes: make(map[needle.VolumeId]bool),
|
||||
volumeSizeLimit: volumeSizeLimit,
|
||||
replicationAsMin: replicationAsMin,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,7 +109,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
|
|||
}
|
||||
|
||||
func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
|
||||
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
|
||||
if vl.enoughCopies(v.Id) && vl.isWritable(v) {
|
||||
if _, ok := vl.oversizedVolumes[v.Id]; !ok {
|
||||
vl.setVolumeWritable(v.Id)
|
||||
}
|
||||
|
@ -272,12 +274,18 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, is
|
|||
return false
|
||||
}
|
||||
|
||||
if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() {
|
||||
if vl.enoughCopies(vid) {
|
||||
return vl.setVolumeWritable(vid)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool {
|
||||
locations := vl.vid2location[vid].Length()
|
||||
desired := vl.rp.GetCopyCount()
|
||||
return locations == desired || (vl.replicationAsMin && locations > desired)
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
|
||||
vl.accessLock.Lock()
|
||||
defer vl.accessLock.Unlock()
|
||||
|
|
Loading…
Reference in a new issue