seaweedfs/weed/operation/assign_file_id.go

278 lines
7.7 KiB
Go
Raw Permalink Normal View History

package operation
import (
2018-11-23 08:26:15 +00:00
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
2023-08-23 07:31:33 +00:00
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
2023-09-11 05:03:44 +00:00
"sync"
)
2016-06-26 02:50:18 +00:00
type VolumeAssignRequest struct {
Count uint64
Replication string
Collection string
Ttl string
2020-12-16 17:14:05 +00:00
DiskType string
DataCenter string
Rack string
DataNode string
WritableVolumeCount uint32
2016-06-26 02:50:18 +00:00
}
type AssignResult struct {
Fid string `json:"fid,omitempty"`
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
GrpcPort int `json:"grpcPort,omitempty"`
Count uint64 `json:"count,omitempty"`
Error string `json:"error,omitempty"`
Auth security.EncodedJwt `json:"auth,omitempty"`
Replicas []Location `json:"replicas,omitempty"`
}
2023-08-23 07:31:33 +00:00
// This is a proxy to the master server, only for assigning volume ids.
// It runs via grpc to the master server in streaming mode.
// The connection to the master would only be re-established when the last connection has error.
type AssignProxy struct {
grpcConnection *grpc.ClientConn
pool chan *singleThreadAssignProxy
}
func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error) {
ap = &AssignProxy{
pool: make(chan *singleThreadAssignProxy, concurrency),
}
ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption)
if err != nil {
return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err)
}
for i := 0; i < concurrency; i++ {
ap.pool <- &singleThreadAssignProxy{}
}
return ap, nil
}
func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
p := <-ap.pool
defer func() {
ap.pool <- p
}()
return p.doAssign(ap.grpcConnection, primaryRequest, alternativeRequests...)
}
type singleThreadAssignProxy struct {
assignClient master_pb.Seaweed_StreamAssignClient
2023-09-11 05:03:44 +00:00
sync.Mutex
2023-08-23 07:31:33 +00:00
}
func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
2023-09-11 05:03:44 +00:00
ap.Lock()
defer ap.Unlock()
2023-08-23 07:31:33 +00:00
if ap.assignClient == nil {
client := master_pb.NewSeaweedClient(grpcConnection)
ap.assignClient, err = client.StreamAssign(context.Background())
if err != nil {
ap.assignClient = nil
return nil, fmt.Errorf("fail to create stream assign client: %v", err)
}
}
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
requests = append(requests, alternativeRequests...)
ret = &AssignResult{}
for _, request := range requests {
if request == nil {
continue
}
req := &master_pb.AssignRequest{
Count: request.Count,
Replication: request.Replication,
Collection: request.Collection,
Ttl: request.Ttl,
DiskType: request.DiskType,
DataCenter: request.DataCenter,
Rack: request.Rack,
DataNode: request.DataNode,
WritableVolumeCount: request.WritableVolumeCount,
}
if err = ap.assignClient.Send(req); err != nil {
return nil, fmt.Errorf("StreamAssignSend: %v", err)
}
resp, grpcErr := ap.assignClient.Recv()
if grpcErr != nil {
return nil, grpcErr
}
if resp.Error != "" {
return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error)
}
ret.Count = resp.Count
ret.Fid = resp.Fid
ret.Url = resp.Location.Url
ret.PublicUrl = resp.Location.PublicUrl
ret.GrpcPort = int(resp.Location.GrpcPort)
ret.Error = resp.Error
ret.Auth = security.EncodedJwt(resp.Auth)
for _, r := range resp.Replicas {
ret.Replicas = append(ret.Replicas, Location{
Url: r.Url,
PublicUrl: r.PublicUrl,
DataCenter: r.DataCenter,
})
}
if ret.Count <= 0 {
continue
}
break
}
return
}
func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
requests = append(requests, alternativeRequests...)
2016-06-23 03:19:09 +00:00
var lastError error
ret := &AssignResult{}
for i, request := range requests {
if request == nil {
continue
}
lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
2020-11-17 08:36:21 +00:00
Count: request.Count,
Replication: request.Replication,
Collection: request.Collection,
Ttl: request.Ttl,
2020-12-16 17:14:05 +00:00
DiskType: request.DiskType,
2020-11-17 08:36:21 +00:00
DataCenter: request.DataCenter,
Rack: request.Rack,
DataNode: request.DataNode,
WritableVolumeCount: request.WritableVolumeCount,
}
resp, grpcErr := masterClient.Assign(context.Background(), req)
if grpcErr != nil {
return grpcErr
}
if resp.Error != "" {
return fmt.Errorf("assignRequest: %v", resp.Error)
}
ret.Count = resp.Count
ret.Fid = resp.Fid
ret.Url = resp.Location.Url
ret.PublicUrl = resp.Location.PublicUrl
ret.GrpcPort = int(resp.Location.GrpcPort)
ret.Error = resp.Error
2019-02-15 08:09:19 +00:00
ret.Auth = security.EncodedJwt(resp.Auth)
2021-09-08 22:54:55 +00:00
for _, r := range resp.Replicas {
ret.Replicas = append(ret.Replicas, Location{
Url: r.Url,
PublicUrl: r.PublicUrl,
DataCenter: r.DataCenter,
})
}
return nil
})
if lastError != nil {
continue
}
if ret.Count <= 0 {
lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
continue
}
2021-04-14 10:29:28 +00:00
break
}
return ret, lastError
}
2019-02-15 08:09:19 +00:00
func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
2019-02-15 08:09:19 +00:00
WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
2019-02-15 08:09:19 +00:00
2021-08-13 04:40:33 +00:00
resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
})
if grpcErr != nil {
return grpcErr
2019-02-15 08:09:19 +00:00
}
2021-08-13 04:40:33 +00:00
if len(resp.VolumeIdLocations) == 0 {
return nil
}
token = security.EncodedJwt(resp.VolumeIdLocations[0].Auth)
return nil
})
return
2019-02-15 08:09:19 +00:00
}
2020-11-16 00:58:48 +00:00
type StorageOption struct {
Replication string
2020-12-16 17:14:05 +00:00
DiskType string
Collection string
DataCenter string
Rack string
DataNode string
TtlSeconds int32
Fsync bool
VolumeGrowthCount uint32
SaveInside bool
2020-11-16 00:58:48 +00:00
}
func (so *StorageOption) TtlString() string {
return needle.SecondsToTTL(so.TtlSeconds)
}
func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) {
ar = &VolumeAssignRequest{
Count: uint64(count),
Replication: so.Replication,
Collection: so.Collection,
Ttl: so.TtlString(),
2020-12-16 17:14:05 +00:00
DiskType: so.DiskType,
DataCenter: so.DataCenter,
Rack: so.Rack,
DataNode: so.DataNode,
WritableVolumeCount: so.VolumeGrowthCount,
2020-11-16 00:58:48 +00:00
}
if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" {
2020-11-16 00:58:48 +00:00
altRequest = &VolumeAssignRequest{
Count: uint64(count),
Replication: so.Replication,
Collection: so.Collection,
Ttl: so.TtlString(),
2020-12-16 17:14:05 +00:00
DiskType: so.DiskType,
DataCenter: "",
Rack: "",
DataNode: "",
WritableVolumeCount: so.VolumeGrowthCount,
2020-11-16 00:58:48 +00:00
}
}
return
}