mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
refactoring
This commit is contained in:
parent
018a9a20be
commit
0d83c1b91e
|
@ -6,7 +6,6 @@ import (
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/topology"
|
"github.com/chrislusf/seaweedfs/weed/topology"
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
)
|
)
|
||||||
|
@ -50,21 +49,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var volumeInfos []storage.VolumeInfo
|
t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
|
||||||
for _, v := range heartbeat.Volumes {
|
|
||||||
if vi, err := storage.NewVolumeInfo(v); err == nil {
|
|
||||||
volumeInfos = append(volumeInfos, vi)
|
|
||||||
} else {
|
|
||||||
glog.V(0).Infof("Fail to convert joined volume information: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
deletedVolumes := dn.UpdateVolumes(volumeInfos)
|
|
||||||
for _, v := range volumeInfos {
|
|
||||||
t.RegisterVolumeLayout(v, dn)
|
|
||||||
}
|
|
||||||
for _, v := range deletedVolumes {
|
|
||||||
t.UnRegisterVolumeLayout(v, dn)
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/sequence"
|
"github.com/chrislusf/seaweedfs/weed/sequence"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Topology struct {
|
type Topology struct {
|
||||||
|
@ -145,3 +146,21 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
|
||||||
t.LinkChildNode(dc)
|
t.LinkChildNode(dc)
|
||||||
return dc
|
return dc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) {
|
||||||
|
var volumeInfos []storage.VolumeInfo
|
||||||
|
for _, v := range volumes {
|
||||||
|
if vi, err := storage.NewVolumeInfo(v); err == nil {
|
||||||
|
volumeInfos = append(volumeInfos, vi)
|
||||||
|
} else {
|
||||||
|
glog.V(0).Infof("Fail to convert joined volume information: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
deletedVolumes := dn.UpdateVolumes(volumeInfos)
|
||||||
|
for _, v := range volumeInfos {
|
||||||
|
t.RegisterVolumeLayout(v, dn)
|
||||||
|
}
|
||||||
|
for _, v := range deletedVolumes {
|
||||||
|
t.UnRegisterVolumeLayout(v, dn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"github.com/chrislusf/seaweedfs/weed/sequence"
|
"github.com/chrislusf/seaweedfs/weed/sequence"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRemoveDataCenter(t *testing.T) {
|
func TestRemoveDataCenter(t *testing.T) {
|
||||||
|
@ -44,20 +43,8 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
|
||||||
}
|
}
|
||||||
volumeMessages = append(volumeMessages, volumeMessage)
|
volumeMessages = append(volumeMessages, volumeMessage)
|
||||||
}
|
}
|
||||||
var volumeInfos []storage.VolumeInfo
|
|
||||||
for _, v := range volumeMessages {
|
|
||||||
if vi, err := storage.NewVolumeInfo(v); err == nil {
|
|
||||||
volumeInfos = append(volumeInfos, vi)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
deletedVolumes := dn.UpdateVolumes(volumeInfos)
|
topo.SyncDataNodeRegistration(volumeMessages, dn)
|
||||||
for _, v := range volumeInfos {
|
|
||||||
topo.RegisterVolumeLayout(v, dn)
|
|
||||||
}
|
|
||||||
for _, v := range deletedVolumes {
|
|
||||||
topo.UnRegisterVolumeLayout(v, dn)
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
|
assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
|
||||||
assert(t, "volumeCount", topo.volumeCount, volumeCount)
|
assert(t, "volumeCount", topo.volumeCount, volumeCount)
|
||||||
|
@ -81,20 +68,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
|
||||||
}
|
}
|
||||||
volumeMessages = append(volumeMessages, volumeMessage)
|
volumeMessages = append(volumeMessages, volumeMessage)
|
||||||
}
|
}
|
||||||
var volumeInfos []storage.VolumeInfo
|
topo.SyncDataNodeRegistration(volumeMessages, dn)
|
||||||
for _, v := range volumeMessages {
|
|
||||||
if vi, err := storage.NewVolumeInfo(v); err == nil {
|
|
||||||
volumeInfos = append(volumeInfos, vi)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
deletedVolumes := dn.UpdateVolumes(volumeInfos)
|
|
||||||
for _, v := range volumeInfos {
|
|
||||||
topo.RegisterVolumeLayout(v, dn)
|
|
||||||
}
|
|
||||||
for _, v := range deletedVolumes {
|
|
||||||
topo.UnRegisterVolumeLayout(v, dn)
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
|
assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
|
||||||
assert(t, "volumeCount", topo.volumeCount, volumeCount)
|
assert(t, "volumeCount", topo.volumeCount, volumeCount)
|
||||||
|
|
Loading…
Reference in a new issue