diff --git a/weed/server/master_grpc_server_assign.go b/weed/server/master_grpc_server_assign.go new file mode 100644 index 000000000..b55d6a80e --- /dev/null +++ b/weed/server/master_grpc_server_assign.go @@ -0,0 +1,103 @@ +package weed_server + +import ( + "context" + "fmt" + "time" + + "github.com/seaweedfs/raft" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/topology" +) + +func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest) (*master_pb.AssignResponse, error) { + + if !ms.Topo.IsLeader() { + return nil, raft.NotLeaderError + } + + if req.Count == 0 { + req.Count = 1 + } + + if req.Replication == "" { + req.Replication = ms.option.DefaultReplicaPlacement + } + replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication) + if err != nil { + return nil, err + } + ttl, err := needle.ReadTTL(req.Ttl) + if err != nil { + return nil, err + } + diskType := types.ToDiskType(req.DiskType) + + option := &topology.VolumeGrowOption{ + Collection: req.Collection, + ReplicaPlacement: replicaPlacement, + Ttl: ttl, + DiskType: diskType, + Preallocate: ms.preallocateSize, + DataCenter: req.DataCenter, + Rack: req.Rack, + DataNode: req.DataNode, + MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb, + } + + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) + + if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { + if ms.Topo.AvailableSpaceFor(option) <= 0 { + return nil, fmt.Errorf("no free volumes left for " + option.String()) + } + vl.AddGrowRequest() + ms.vgCh <- &topology.VolumeGrowRequest{ + Option: option, + Count: int(req.WritableVolumeCount), + } + } + + var ( + lastErr error + maxTimeout = time.Second * 10 + startTime = time.Now() + ) + + for time.Now().Sub(startTime) < maxTimeout { + fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option) + if err == nil { + dn := dnList.Head() + var replicas []*master_pb.Location + for _, r := range dnList.Rest() { + replicas = append(replicas, &master_pb.Location{ + Url: r.Url(), + PublicUrl: r.PublicUrl, + GrpcPort: uint32(r.GrpcPort), + DataCenter: r.GetDataCenterId(), + }) + } + return &master_pb.AssignResponse{ + Fid: fid, + Location: &master_pb.Location{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + GrpcPort: uint32(dn.GrpcPort), + DataCenter: dn.GetDataCenterId(), + }, + Count: count, + Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), + Replicas: replicas, + }, nil + } + //glog.V(4).Infoln("waiting for volume growing...") + lastErr = err + time.Sleep(200 * time.Millisecond) + } + return nil, lastErr +} diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 87c7b9990..4fa6406a7 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -16,7 +16,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" - "github.com/seaweedfs/seaweedfs/weed/topology" ) func (ms *MasterServer) ProcessGrowRequest() { @@ -113,93 +112,6 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV return resp, nil } -func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest) (*master_pb.AssignResponse, error) { - - if !ms.Topo.IsLeader() { - return nil, raft.NotLeaderError - } - - if req.Count == 0 { - req.Count = 1 - } - - if req.Replication == "" { - req.Replication = ms.option.DefaultReplicaPlacement - } - replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication) - if err != nil { - return nil, err - } - ttl, err := needle.ReadTTL(req.Ttl) - if err != nil { - return nil, err - } - diskType := types.ToDiskType(req.DiskType) - - option := &topology.VolumeGrowOption{ - Collection: req.Collection, - ReplicaPlacement: replicaPlacement, - Ttl: ttl, - DiskType: diskType, - Preallocate: ms.preallocateSize, - DataCenter: req.DataCenter, - Rack: req.Rack, - DataNode: req.DataNode, - MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb, - } - - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) - - if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { - if ms.Topo.AvailableSpaceFor(option) <= 0 { - return nil, fmt.Errorf("no free volumes left for " + option.String()) - } - vl.AddGrowRequest() - ms.vgCh <- &topology.VolumeGrowRequest{ - Option: option, - Count: int(req.WritableVolumeCount), - } - } - - var ( - lastErr error - maxTimeout = time.Second * 10 - startTime = time.Now() - ) - - for time.Now().Sub(startTime) < maxTimeout { - fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option) - if err == nil { - dn := dnList.Head() - var replicas []*master_pb.Location - for _, r := range dnList.Rest() { - replicas = append(replicas, &master_pb.Location{ - Url: r.Url(), - PublicUrl: r.PublicUrl, - GrpcPort: uint32(r.GrpcPort), - DataCenter: r.GetDataCenterId(), - }) - } - return &master_pb.AssignResponse{ - Fid: fid, - Location: &master_pb.Location{ - Url: dn.Url(), - PublicUrl: dn.PublicUrl, - GrpcPort: uint32(dn.GrpcPort), - DataCenter: dn.GetDataCenterId(), - }, - Count: count, - Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), - Replicas: replicas, - }, nil - } - //glog.V(4).Infoln("waiting for volume growing...") - lastErr = err - time.Sleep(200 * time.Millisecond) - } - return nil, lastErr -} - func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.StatisticsRequest) (*master_pb.StatisticsResponse, error) { if !ms.Topo.IsLeader() {