diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go index 9ed9a0c90..8f4a347d7 100644 --- a/weed/server/raft_server_handlers.go +++ b/weed/server/raft_server_handlers.go @@ -1,9 +1,12 @@ package weed_server import ( + "github.com/cenkalti/backoff/v4" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "net/http" + "time" ) type ClusterStatusResult struct { @@ -27,12 +30,24 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) { } func (s *RaftServer) HealthzHandler(w http.ResponseWriter, r *http.Request) { - _, err := s.topo.Leader() + leader, err := s.topo.Leader() if err != nil { w.WriteHeader(http.StatusServiceUnavailable) - } else { - w.WriteHeader(http.StatusOK) + return } + if s.serverAddr == leader { + expBackoff := backoff.NewExponentialBackOff() + expBackoff.InitialInterval = 20 * time.Millisecond + expBackoff.MaxInterval = 1 * time.Second + expBackoff.MaxElapsedTime = 5 * time.Second + isLocked, err := backoff.RetryWithData(s.topo.IsChildLocked, expBackoff) + glog.Errorf("HealthzHandler: %+v", err) + if isLocked { + w.WriteHeader(http.StatusLocked) + return + } + } + w.WriteHeader(http.StatusOK) } func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) { diff --git a/weed/topology/node.go b/weed/topology/node.go index 1c7e3e468..851e71385 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -34,11 +34,13 @@ type Node interface { IsDataNode() bool IsRack() bool IsDataCenter() bool + IsLocked() bool Children() []Node Parent() Node GetValue() interface{} //get reference to the topology,dc,rack,datanode } + type NodeImpl struct { diskUsages *DiskUsages id NodeId @@ -122,24 +124,37 @@ func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption func (n *NodeImpl) IsDataNode() bool { return n.nodeType == "DataNode" } + func (n *NodeImpl) IsRack() bool { return n.nodeType == "Rack" } + func (n *NodeImpl) IsDataCenter() bool { return n.nodeType == "DataCenter" } + +func (n *NodeImpl) IsLocked() (isTryLock bool) { + if isTryLock = n.TryRLock(); isTryLock { + n.RUnlock() + } + return !isTryLock +} + func (n *NodeImpl) String() string { if n.parent != nil { return n.parent.String() + ":" + string(n.id) } return string(n.id) } + func (n *NodeImpl) Id() NodeId { return n.id } + func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts { return n.diskUsages.getOrCreateDisk(diskType) } + func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 { t := n.getOrCreateDisk(option.DiskType) freeVolumeSlotCount := atomic.LoadInt64(&t.maxVolumeCount) + atomic.LoadInt64(&t.remoteVolumeCount) - atomic.LoadInt64(&t.volumeCount) @@ -152,6 +167,7 @@ func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 { func (n *NodeImpl) SetParent(node Node) { n.parent = node } + func (n *NodeImpl) Children() (ret []Node) { n.RLock() defer n.RUnlock() @@ -160,12 +176,15 @@ func (n *NodeImpl) Children() (ret []Node) { } return ret } + func (n *NodeImpl) Parent() Node { return n.parent } + func (n *NodeImpl) GetValue() interface{} { return n.value } + func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) { n.RLock() defer n.RUnlock() diff --git a/weed/topology/topology.go b/weed/topology/topology.go index fee4d6a03..017208b81 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -76,6 +76,28 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls return t } +func (t *Topology) IsChildLocked() (bool, error) { + if t.IsLocked() { + return true, errors.New("topology is locked") + } + for _, dcNode := range t.Children() { + if dcNode.IsLocked() { + return true, fmt.Errorf("topology child %s is locked", dcNode.String()) + } + for _, rackNode := range dcNode.Children() { + if rackNode.IsLocked() { + return true, fmt.Errorf("dc %s child %s is locked", dcNode.String(), rackNode.String()) + } + for _, dataNode := range rackNode.Children() { + if dataNode.IsLocked() { + return true, fmt.Errorf("rack %s child %s is locked", rackNode.String(), dataNode.Id()) + } + } + } + } + return false, nil +} + func (t *Topology) IsLeader() bool { t.RaftServerAccessLock.RLock() defer t.RaftServerAccessLock.RUnlock()