prepare to register ec shard info in master

This commit is contained in:
Chris Lu 2019-05-23 00:42:28 -07:00
parent e913ee380a
commit 4659d80035
5 changed files with 65 additions and 5 deletions

View file

@ -87,7 +87,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
} }
// update master internal volume layouts // update master internal volume layouts
t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn) t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
} else if len(heartbeat.Volumes) > 0 { }
if len(heartbeat.Volumes) > 0 {
// process heartbeat.Volumes // process heartbeat.Volumes
newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn) newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
@ -99,7 +101,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url()) glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
} }
} else if len(heartbeat.EcShards) > 0 { }
if len(heartbeat.EcShards) > 0 {
glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
} }

View file

@ -12,6 +12,13 @@ type EcVolumeInfo struct {
shardIds uint16 // use bits to indicate the shard id shardIds uint16 // use bits to indicate the shard id
} }
func NewEcVolumeInfo(collection string, vid needle.VolumeId) *EcVolumeInfo {
return &EcVolumeInfo{
Collection: collection,
VolumeId: vid,
}
}
func (ecInfo *EcVolumeInfo) AddShardId(id ShardId) { func (ecInfo *EcVolumeInfo) AddShardId(id ShardId) {
ecInfo.shardIds |= (1 << id) ecInfo.shardIds |= (1 << id)
} }

View file

@ -21,7 +21,7 @@ type DataNode struct {
Port int Port int
PublicUrl string PublicUrl string
LastSeen int64 // unix time in seconds LastSeen int64 // unix time in seconds
ecShards map[needle.VolumeId]erasure_coding.EcVolumeInfo ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
ecShardsLock sync.RWMutex ecShardsLock sync.RWMutex
} }
@ -30,7 +30,7 @@ func NewDataNode(id string) *DataNode {
s.id = NodeId(id) s.id = NodeId(id)
s.nodeType = "DataNode" s.nodeType = "DataNode"
s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) s.volumes = make(map[needle.VolumeId]storage.VolumeInfo)
s.ecShards = make(map[needle.VolumeId]erasure_coding.EcVolumeInfo) s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
s.NodeImpl.value = s s.NodeImpl.value = s
return s return s
} }

View file

@ -4,7 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
) )
func (dn *DataNode) GetEcShards() (ret []erasure_coding.EcVolumeInfo) { func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
dn.RLock() dn.RLock()
for _, ecVolumeInfo := range dn.ecShards { for _, ecVolumeInfo := range dn.ecShards {
ret = append(ret, ecVolumeInfo) ret = append(ret, ecVolumeInfo)
@ -12,3 +12,9 @@ func (dn *DataNode) GetEcShards() (ret []erasure_coding.EcVolumeInfo) {
dn.RUnlock() dn.RUnlock()
return ret return ret
} }
func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
dn.ecShardsLock.Lock()
dn.ecShardsLock.Unlock()
return
}

View file

@ -0,0 +1,43 @@
package topology
import (
"sort"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
// convert into in memory struct storage.VolumeInfo
var shards []*erasure_coding.EcVolumeInfo
sort.Slice(shardInfos, func(i, j int) bool {
return shardInfos[i].Id < shardInfos[j].Id
})
var prevVolumeId uint32
var ecVolumeInfo *erasure_coding.EcVolumeInfo
for _, shardInfo := range shardInfos {
if shardInfo.Id != prevVolumeId {
ecVolumeInfo = erasure_coding.NewEcVolumeInfo(shardInfo.Collection, needle.VolumeId(shardInfo.Id))
shards = append(shards, ecVolumeInfo)
}
ecVolumeInfo.AddShardId(erasure_coding.ShardId(shardInfo.EcIndex))
}
// find out the delta volumes
newShards, deletedShards = dn.UpdateEcShards(shards)
for _, v := range newShards {
t.RegisterEcShards(v, dn)
}
for _, v := range deletedShards {
t.UnRegisterEcShards(v, dn)
}
return
}
func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
}
func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
glog.Infof("removing ec shard info:%+v", ecShardInfos)
}