diff --git a/weed-fs/note/startup_process.txt b/weed-fs/note/startup_process.txt deleted file mode 100644 index 558955005..000000000 --- a/weed-fs/note/startup_process.txt +++ /dev/null @@ -1,7 +0,0 @@ -1. clients report its own server info, volumes info, -2. master collect all volumes, separated into readable volumes, writable volumes, volume2machine mapping - machines is an array of machine info - writable volumes is an array of vids - vid2machineId maps volume id to machineId, which is the index of machines array - - diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go index cc2cccdca..2d1538bbe 100644 --- a/weed-fs/src/pkg/storage/volume_info.go +++ b/weed-fs/src/pkg/storage/volume_info.go @@ -1,10 +1,50 @@ package storage -import ( - -) +import () type VolumeInfo struct { - Id VolumeId - Size int64 + Id VolumeId + Size int64 + ReplicationType ReplicationType +} +type ReplicationType int + +const ( + Copy00 = ReplicationType(00) // single copy + Copy01 = ReplicationType(01) // 2 copies, each on different racks, same data center + Copy10 = ReplicationType(10) // 2 copies, each on different data center + Copy11 = ReplicationType(11) // 3 copies, 2 on different racks and local data center, 1 on different data center + Copy20 = ReplicationType(20) // 3 copies, each on dffereint data center + LengthRelicationType = 5 +) + +func GetReplicationLevelIndex(v *VolumeInfo) int { + switch v.ReplicationType { + case Copy00: + return 0 + case Copy01: + return 1 + case Copy10: + return 2 + case Copy11: + return 3 + case Copy20: + return 4 + } + return -1 +} +func GetCopyCount(v *VolumeInfo) int { + switch v.ReplicationType { + case Copy00: + return 1 + case Copy01: + return 2 + case Copy10: + return 2 + case Copy11: + return 3 + case Copy20: + return 3 + } + return 0 } diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go index 8085bba29..ea74dd8e0 100644 --- a/weed-fs/src/pkg/topology/data_node.go +++ b/weed-fs/src/pkg/topology/data_node.go @@ -8,6 +8,10 @@ import ( type DataNode struct { NodeImpl volumes map[storage.VolumeId]*storage.VolumeInfo + ip string + port int + publicUrl string + lastSeen int64 // unix time in seconds } func NewDataNode(id string) *DataNode { @@ -17,12 +21,21 @@ func NewDataNode(id string) *DataNode { s.volumes = make(map[storage.VolumeId]*storage.VolumeInfo) return s } -func (s *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId { - s.AddVolume(&storage.VolumeInfo{Id: vid, Size: 32 * 1024 * 1024 * 1024}) +func (dn *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId { + dn.AddVolume(&storage.VolumeInfo{Id: vid, Size: 32 * 1024 * 1024 * 1024}) return vid } -func (s *DataNode) AddVolume(v *storage.VolumeInfo) { - s.volumes[v.Id] = v - s.UpAdjustActiveVolumeCountDelta(1) - s.UpAdjustMaxVolumeId(v.Id) +func (dn *DataNode) AddVolume(v *storage.VolumeInfo) { + dn.volumes[v.Id] = v + dn.UpAdjustActiveVolumeCountDelta(1) + dn.UpAdjustMaxVolumeId(v.Id) + dn.GetTopology().RegisterVolume(v,dn) +} +func (dn *DataNode) GetTopology() *Topology { + p := dn.parent + for p.Parent()!=nil{ + p = p.Parent() + } + t := p.(*Topology) + return t } diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go index 9815727ff..fb610bd73 100644 --- a/weed-fs/src/pkg/topology/node.go +++ b/weed-fs/src/pkg/topology/node.go @@ -20,10 +20,11 @@ type Node interface { setParent(Node) LinkChildNode(node Node) UnlinkChildNode(nodeId NodeId) + CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId IsDataNode() bool Children() map[NodeId]Node - Parent() Node + Parent() Node } type NodeImpl struct { id NodeId @@ -65,7 +66,7 @@ func (n *NodeImpl) Children() map[NodeId]Node { return n.children } func (n *NodeImpl) Parent() Node { - return n.parent + return n.parent } func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) { ret := false @@ -146,3 +147,26 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount) } } + +func (n *NodeImpl) CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId { + var ret []storage.VolumeId + if n.IsRack() { + for _, c := range n.Children() { + dn := c.(*DataNode) //can not cast n to DataNode + if dn.lastSeen > freshThreshHold { + continue + } + for _, v := range dn.volumes { + if uint64(v.Size) < volumeSizeLimit { + ret = append(ret, v.Id) + } + } + } + } else { + for _, c := range n.Children() { + ret = append(ret, c.CollectWritableVolumes(freshThreshHold, volumeSizeLimit)...) + } + } + + return ret +} diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index 9e5f7bab4..2e617c927 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -3,37 +3,59 @@ package topology import ( _ "fmt" "math/rand" + "pkg/sequence" "pkg/storage" ) type Topology struct { NodeImpl + + //transient vid~servers mapping for each replication type + replicaType2VolumeLayout []*VolumeLayout + + pulse int64 + + volumeSizeLimit uint64 + + sequence sequence.Sequencer } -func NewTopology(id string) *Topology { +func NewTopology(id string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology { t := &Topology{} t.id = NodeId(id) t.nodeType = "Topology" t.children = make(map[NodeId]Node) + t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) + t.pulse = int64(pulse) + t.volumeSizeLimit = volumeSizeLimit + t.sequence = sequence.NewSequencer(dirname, filename) return t } func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, storage.VolumeId) { vid := t.NextVolumeId() ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid) - return ret, node, vid + return ret, node, vid } func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, storage.VolumeId) { - freeSpace := t.FreeSpace() - for _, node := range except { - freeSpace -= node.FreeSpace() - } - vid := t.NextVolumeId() - ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid) - return ret, node, vid + freeSpace := t.FreeSpace() + for _, node := range except { + freeSpace -= node.FreeSpace() + } + vid := t.NextVolumeId() + ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid) + return ret, node, vid } func (t *Topology) NextVolumeId() storage.VolumeId { vid := t.GetMaxVolumeId() return vid.Next() } + +func (t *Topology) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { + replicationTypeIndex := storage.GetReplicationLevelIndex(v) + if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { + t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(t.volumeSizeLimit, t.pulse) + } + t.replicaType2VolumeLayout[replicationTypeIndex].RegisterVolume(v, dn) +} diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go new file mode 100644 index 000000000..f11ea430a --- /dev/null +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -0,0 +1,48 @@ +package topology + +import ( + "errors" + "fmt" + "math/rand" + "pkg/storage" +) + +type VolumeLayout struct { + vid2location map[storage.VolumeId]*DataNodeLocationList + writables []storage.VolumeId // transient array of writable volume id + pulse int64 + volumeSizeLimit uint64 +} + +func NewVolumeLayout(volumeSizeLimit uint64, pulse int64) *VolumeLayout { + return &VolumeLayout{ + vid2location: make(map[storage.VolumeId]*DataNodeLocationList), + writables: *new([]storage.VolumeId), + pulse: pulse, + volumeSizeLimit: volumeSizeLimit, + } +} + +func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { + if _, ok := vl.vid2location[v.Id]; !ok { + vl.vid2location[v.Id] = NewDataNodeLocationList() + } + vl.vid2location[v.Id].Add(dn) + if len(vl.vid2location[v.Id].list) >= storage.GetCopyCount(v) { + vl.writables = append(vl.writables,v.Id) + } +} + +func (vl *VolumeLayout) PickForWrite(count int) (int, *DataNodeLocationList, error) { + len_writers := len(vl.writables) + if len_writers <= 0 { + fmt.Println("No more writable volumes!") + return 0, nil, errors.New("No more writable volumes!") + } + vid := vl.writables[rand.Intn(len_writers)] + locationList := vl.vid2location[vid] + if locationList != nil { + return count, locationList, nil + } + return 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") +} diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go new file mode 100644 index 000000000..c61ee119b --- /dev/null +++ b/weed-fs/src/pkg/topology/volume_location.go @@ -0,0 +1,21 @@ +package topology + +import ( +) + +type DataNodeLocationList struct { + list []*DataNode +} + +func NewDataNodeLocationList() *DataNodeLocationList { + return &DataNodeLocationList{} +} + +func (dnll *DataNodeLocationList) Add(loc *DataNode){ + for _, dnl := range dnll.list { + if loc.ip == dnl.ip && loc.port == dnl.port { + break + } + } + dnll.list = append(dnll.list, loc) +}