Merge pull request #832 from PapaYofen/fix-830

Fix https://github.com/chrislusf/seaweedfs/issues/830
This commit is contained in:
Chris Lu 2019-01-12 17:26:51 -08:00 committed by GitHub
commit 2ff95ead57
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 0 deletions

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"net" "net"
"strings" "strings"
"time"
"github.com/chrislusf/raft" "github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -172,6 +173,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
} }
}() }()
ticker := time.NewTicker(5 * time.Second)
for { for {
select { select {
case message := <-messageChan: case message := <-messageChan:
@ -179,6 +181,10 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
glog.V(0).Infof("=> client %v: %+v", clientName, message) glog.V(0).Infof("=> client %v: %+v", clientName, message)
return err return err
} }
case <-ticker.C:
if !ms.Topo.IsLeader() {
return raft.NotLeaderError
}
case <-stopChan: case <-stopChan:
return nil return nil
} }

View file

@ -4,12 +4,18 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/topology"
) )
func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupVolumeRequest) (*master_pb.LookupVolumeResponse, error) { func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupVolumeRequest) (*master_pb.LookupVolumeResponse, error) {
if !ms.Topo.IsLeader() {
return nil, raft.NotLeaderError
}
resp := &master_pb.LookupVolumeResponse{} resp := &master_pb.LookupVolumeResponse{}
volumeLocations := ms.lookupVolumeId(req.VolumeIds, req.Collection) volumeLocations := ms.lookupVolumeId(req.VolumeIds, req.Collection)
@ -33,6 +39,10 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV
func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest) (*master_pb.AssignResponse, error) { func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest) (*master_pb.AssignResponse, error) {
if !ms.Topo.IsLeader() {
return nil, raft.NotLeaderError
}
if req.Count == 0 { if req.Count == 0 {
req.Count = 1 req.Count = 1
} }
@ -87,6 +97,10 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.StatisticsRequest) (*master_pb.StatisticsResponse, error) { func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.StatisticsRequest) (*master_pb.StatisticsResponse, error) {
if !ms.Topo.IsLeader() {
return nil, raft.NotLeaderError
}
if req.Replication == "" { if req.Replication == "" {
req.Replication = ms.defaultReplicaPlacement req.Replication = ms.defaultReplicaPlacement
} }