seaweedfs/weed/operation/assign_file_id.go

110 lines
2.9 KiB
Go
Raw Normal View History

package operation
import (
2020-02-10 17:13:29 +00:00
"bytes"
2018-11-23 08:26:15 +00:00
"context"
"fmt"
2020-02-10 17:13:29 +00:00
"github.com/valyala/fasthttp"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
2019-02-15 08:09:19 +00:00
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
2016-06-26 02:50:18 +00:00
// VolumeAssignRequest carries the parameters of one file id assignment
// attempt. Assign copies each field into the master_pb.AssignRequest field of
// the same name before calling the master.
type VolumeAssignRequest struct {
	Count               uint64 // sent as AssignRequest.Count
	Replication         string // sent as AssignRequest.Replication
	Collection          string // sent as AssignRequest.Collection
	Ttl                 string // sent as AssignRequest.Ttl
	DataCenter          string // sent as AssignRequest.DataCenter
	Rack                string // sent as AssignRequest.Rack
	DataNode            string // sent as AssignRequest.DataNode
	WritableVolumeCount uint32 // sent as AssignRequest.WritableVolumeCount
}
type AssignResult struct {
2019-02-15 08:09:19 +00:00
Fid string `json:"fid,omitempty"`
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
Count uint64 `json:"count,omitempty"`
Error string `json:"error,omitempty"`
Auth security.EncodedJwt `json:"auth,omitempty"`
}
2019-02-18 20:11:52 +00:00
func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
requests = append(requests, alternativeRequests...)
2016-06-23 03:19:09 +00:00
var lastError error
ret := &AssignResult{}
for i, request := range requests {
if request == nil {
continue
}
lastError = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: primaryRequest.Count,
Replication: primaryRequest.Replication,
Collection: primaryRequest.Collection,
Ttl: primaryRequest.Ttl,
DataCenter: primaryRequest.DataCenter,
Rack: primaryRequest.Rack,
DataNode: primaryRequest.DataNode,
WritableVolumeCount: primaryRequest.WritableVolumeCount,
}
resp, grpcErr := masterClient.Assign(context.Background(), req)
if grpcErr != nil {
return grpcErr
}
ret.Count = resp.Count
ret.Fid = resp.Fid
ret.Url = resp.Url
ret.PublicUrl = resp.PublicUrl
ret.Error = resp.Error
2019-02-15 08:09:19 +00:00
ret.Auth = security.EncodedJwt(resp.Auth)
return nil
})
if lastError != nil {
continue
}
if ret.Count <= 0 {
lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
continue
}
}
return ret, lastError
}
2019-02-15 08:09:19 +00:00
func LookupJwt(master string, fileId string) security.EncodedJwt {
tokenStr := ""
2020-02-10 17:13:29 +00:00
lookupUrl := fmt.Sprintf("http://%s/dir/lookup?fileId=%s", master, fileId)
2019-02-15 08:09:19 +00:00
err := util.Head(lookupUrl, func(header *fasthttp.ResponseHeader) {
2020-02-10 17:13:29 +00:00
bearer := header.Peek("Authorization")
if len(bearer) > 7 && string(bytes.ToUpper(bearer[0:6])) == "BEARER" {
tokenStr = string(bearer[7:])
2019-02-15 08:09:19 +00:00
}
2020-02-10 17:13:29 +00:00
})
if err != nil {
glog.V(0).Infof("failed to lookup jwt %s: %v", lookupUrl, err)
2019-02-15 08:09:19 +00:00
}
return security.EncodedJwt(tokenStr)
}