diff --git a/weed/election/cluster.go b/weed/election/cluster.go new file mode 100644 index 000000000..7c7c1089b --- /dev/null +++ b/weed/election/cluster.go @@ -0,0 +1,155 @@ +package election + +import ( + "github.com/chrislusf/seaweedfs/weed/pb" + "math" + "sync" + "time" +) + +type ClusterNode struct { + Address pb.ServerAddress + Version string + counter int + createdTs time.Time +} + +type Leaders struct { + leaders [3]pb.ServerAddress +} + +type Cluster struct { + nodes map[pb.ServerAddress]*ClusterNode + nodesLock sync.RWMutex + leaders *Leaders +} + +func NewCluster() *Cluster { + return &Cluster{ + nodes: make(map[pb.ServerAddress]*ClusterNode), + leaders: &Leaders{}, + } +} + +func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) { + switch nodeType { + case "filer": + cluster.nodesLock.Lock() + defer cluster.nodesLock.Unlock() + if existingNode, found := cluster.nodes[address]; found { + existingNode.counter++ + return + } + cluster.nodes[address] = &ClusterNode{ + Address: address, + Version: version, + counter: 1, + createdTs: time.Now(), + } + cluster.ensureLeader(true, address) + case "master": + } +} + +func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) { + switch nodeType { + case "filer": + cluster.nodesLock.Lock() + defer cluster.nodesLock.Unlock() + if existingNode, found := cluster.nodes[address]; !found { + return + } else { + existingNode.counter-- + if existingNode.counter <= 0 { + delete(cluster.nodes, address) + cluster.ensureLeader(false, address) + } + } + case "master": + } +} + +func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { + switch nodeType { + case "filer": + cluster.nodesLock.RLock() + defer cluster.nodesLock.RUnlock() + for _, node := range cluster.nodes { + nodes = append(nodes, node) + } + case "master": + } + return +} + +func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) { + if isAdd { + if cluster.leaders.addLeaderIfVacant(address) { + // has added the address as one leader + } + } else { + if cluster.leaders.removeLeaderIfExists(address) { + // pick the freshest one, since it is less likely to go away + var shortestDuration int64 = math.MaxInt64 + now := time.Now() + var candidateAddress pb.ServerAddress + for _, node := range cluster.nodes { + if cluster.leaders.isOneLeader(node.Address) { + continue + } + duration := now.Sub(node.createdTs).Nanoseconds() + if duration < shortestDuration { + shortestDuration = duration + candidateAddress = node.Address + } + } + if candidateAddress != "" { + cluster.leaders.addLeaderIfVacant(candidateAddress) + } + // removed the leader, and maybe added a new leader + } + } +} + +func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) { + if leaders.isOneLeader(address) { + return + } + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] == "" { + leaders.leaders[i] = address + hasChanged = true + return + } + } + return +} +func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) { + if !leaders.isOneLeader(address) { + return + } + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] == address { + leaders.leaders[i] = "" + hasChanged = true + return + } + } + return +} +func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool { + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] == address { + return true + } + } + return false +} +func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) { + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] != "" { + addresses = append(addresses, leaders.leaders[i]) + } + } + return +} diff --git a/weed/election/cluster_test.go b/weed/election/cluster_test.go new file mode 100644 index 000000000..624ff27d6 --- /dev/null +++ b/weed/election/cluster_test.go @@ -0,0 +1,47 @@ +package election + +import ( + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestClusterAddRemoveNodes(t *testing.T) { + c := NewCluster() + + c.AddClusterNode("filer", pb.ServerAddress("111:1"), "23.45") + c.AddClusterNode("filer", pb.ServerAddress("111:2"), "23.45") + assert.Equal(t, []pb.ServerAddress{ + pb.ServerAddress("111:1"), + pb.ServerAddress("111:2"), + }, c.leaders.GetLeaders()) + + c.AddClusterNode("filer", pb.ServerAddress("111:3"), "23.45") + c.AddClusterNode("filer", pb.ServerAddress("111:4"), "23.45") + assert.Equal(t, []pb.ServerAddress{ + pb.ServerAddress("111:1"), + pb.ServerAddress("111:2"), + pb.ServerAddress("111:3"), + }, c.leaders.GetLeaders()) + + c.AddClusterNode("filer", pb.ServerAddress("111:5"), "23.45") + c.AddClusterNode("filer", pb.ServerAddress("111:6"), "23.45") + c.RemoveClusterNode("filer", pb.ServerAddress("111:4")) + assert.Equal(t, []pb.ServerAddress{ + pb.ServerAddress("111:1"), + pb.ServerAddress("111:2"), + pb.ServerAddress("111:3"), + }, c.leaders.GetLeaders()) + + // remove oldest + c.RemoveClusterNode("filer", pb.ServerAddress("111:1")) + assert.Equal(t, []pb.ServerAddress{ + pb.ServerAddress("111:6"), + pb.ServerAddress("111:2"), + pb.ServerAddress("111:3"), + }, c.leaders.GetLeaders()) + + // remove oldest + c.RemoveClusterNode("filer", pb.ServerAddress("111:1")) + +} diff --git a/weed/server/master_grpc_server_cluster.go b/weed/server/master_grpc_server_cluster.go index 68801a3ba..8e80cade3 100644 --- a/weed/server/master_grpc_server_cluster.go +++ b/weed/server/master_grpc_server_cluster.go @@ -12,8 +12,8 @@ func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.Lis for _, node := range clusterNodes { resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{ - Address: string(node.address), - Version: node.version, + Address: string(node.Address), + Version: node.Version, }) } return resp, nil diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 3f1f1c082..af2f7ddd5 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -2,6 +2,7 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/election" "github.com/chrislusf/seaweedfs/weed/pb" "net/http" "net/http/httputil" @@ -68,7 +69,7 @@ type MasterServer struct { adminLocks *AdminLocks - Cluster *Cluster + Cluster *election.Cluster } func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer { @@ -105,7 +106,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre grpcDialOption: grpcDialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers), adminLocks: NewAdminLocks(), - Cluster: NewCluster(), + Cluster: election.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) diff --git a/weed/server/master_server_cluster.go b/weed/server/master_server_cluster.go deleted file mode 100644 index fa5280ccd..000000000 --- a/weed/server/master_server_cluster.go +++ /dev/null @@ -1,77 +0,0 @@ -package weed_server - -import ( - "github.com/chrislusf/seaweedfs/weed/pb" - "sync" -) - -type NodeType int - -const ( - filerNodeType NodeType = iota -) - -type ClusterNode struct { - address pb.ServerAddress - version string - counter int -} - -type Cluster struct { - filers map[pb.ServerAddress]*ClusterNode - filersLock sync.RWMutex -} - -func NewCluster() *Cluster { - return &Cluster{ - filers: make(map[pb.ServerAddress]*ClusterNode), - } -} - -func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) { - switch nodeType { - case "filer": - cluster.filersLock.Lock() - defer cluster.filersLock.Unlock() - if existingNode, found := cluster.filers[address]; found { - existingNode.counter++ - return - } - cluster.filers[address] = &ClusterNode{ - address: address, - version: version, - counter: 1, - } - case "master": - } -} - -func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) { - switch nodeType { - case "filer": - cluster.filersLock.Lock() - defer cluster.filersLock.Unlock() - if existingNode, found := cluster.filers[address]; !found { - return - } else { - existingNode.counter-- - if existingNode.counter <= 0 { - delete(cluster.filers, address) - } - } - case "master": - } -} - -func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { - switch nodeType { - case "filer": - cluster.filersLock.RLock() - defer cluster.filersLock.RUnlock() - for _, node := range cluster.filers { - nodes = append(nodes, node) - } - case "master": - } - return -}