seaweedfs/weed/shell/commands.go

194 lines
4.9 KiB
Go
Raw Permalink Normal View History

package shell
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"io"
"net/url"
"strconv"
"strings"
2019-12-13 08:22:37 +00:00
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/seaweedfs/seaweedfs/weed/wdclient/exclusive_locks"
)
type ShellOptions struct {
2019-10-25 14:45:12 +00:00
Masters *string
GrpcDialOption grpc.DialOption
// shell transient context
2021-08-04 19:30:18 +00:00
FilerHost string
FilerPort int64
2022-05-02 04:59:16 +00:00
FilerGroup *string
FilerAddress pb.ServerAddress
2021-08-04 19:30:18 +00:00
Directory string
}
type CommandEnv struct {
env map[string]string
MasterClient *wdclient.MasterClient
option *ShellOptions
2020-05-26 07:03:31 +00:00
locker *exclusive_locks.ExclusiveLocker
}
type command interface {
Name() string
Help() string
Do([]string, *CommandEnv, io.Writer) error
}
var (
Commands = []command{}
)
func NewCommandEnv(options *ShellOptions) *CommandEnv {
2020-04-23 09:31:04 +00:00
ce := &CommandEnv{
2020-03-02 06:39:08 +00:00
env: make(map[string]string),
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", *pb.ServerAddresses(*options.Masters).ToServiceDiscovery()),
2020-03-02 06:39:08 +00:00
option: options,
}
2023-03-26 19:17:23 +00:00
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "shell")
2020-04-23 09:31:04 +00:00
return ce
}
2020-03-24 04:26:15 +00:00
func (ce *CommandEnv) parseUrl(input string) (path string, err error) {
if strings.HasPrefix(input, "http") {
err = fmt.Errorf("http://<filer>:<port> prefix is not supported any more")
return
}
if !strings.HasPrefix(input, "/") {
2020-04-05 20:11:43 +00:00
input = util.Join(ce.option.Directory, input)
}
2020-03-24 04:26:15 +00:00
return input, err
}
2020-03-24 04:26:15 +00:00
func (ce *CommandEnv) isDirectory(path string) bool {
2020-03-24 04:26:15 +00:00
return ce.checkDirectory(path) == nil
}
2021-12-10 21:24:38 +00:00
func (ce *CommandEnv) confirmIsLocked(args []string) error {
2022-08-22 21:11:13 +00:00
if ce.locker.IsLocked() {
return nil
}
2021-12-10 21:24:38 +00:00
ce.locker.SetMessage(fmt.Sprintf("%v", args))
2021-08-06 04:06:55 +00:00
return fmt.Errorf("need to run \"lock\" first to continue")
}
2022-08-22 21:11:13 +00:00
func (ce *CommandEnv) isLocked() bool {
2022-08-23 08:52:29 +00:00
if ce == nil {
return true
}
return ce.locker.IsLocked()
2022-08-22 21:11:13 +00:00
}
2020-03-24 04:26:15 +00:00
func (ce *CommandEnv) checkDirectory(path string) error {
2020-03-23 07:01:34 +00:00
dir, name := util.FullPath(path).DirAndName()
2020-03-24 04:36:39 +00:00
exists, err := filer_pb.Exists(ce, dir, name, true)
if !exists {
return fmt.Errorf("%s is not a directory", path)
}
2020-03-24 04:26:15 +00:00
return err
}
2020-04-29 20:26:02 +00:00
var _ = filer_pb.FilerClient(&CommandEnv{})
func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
2020-03-24 05:54:02 +00:00
return pb.WithGrpcFilerClient(streamingMode, 0, ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
2020-03-24 05:54:02 +00:00
}
2021-01-28 22:36:29 +00:00
func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
func (ce *CommandEnv) GetDataCenter() string {
return ce.MasterClient.DataCenter
}
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
if strings.HasPrefix(entryPath, "http") {
var u *url.URL
u, err = url.Parse(entryPath)
if err != nil {
return
}
filerServer = u.Hostname()
portString := u.Port()
if portString != "" {
filerPort, err = strconv.ParseInt(portString, 10, 32)
}
path = u.Path
} else {
err = fmt.Errorf("path should have full url /path/to/dirOrFile : %s", entryPath)
}
return
}
func findInputDirectory(args []string) (input string) {
input = "."
if len(args) > 0 {
input = args[len(args)-1]
if strings.HasPrefix(input, "-") {
input = "."
}
}
return input
}
func readNeedleMeta(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.ReadNeedleMetaResponse, err error) {
err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {
if resp, err = client.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
VolumeId: volumeId,
NeedleId: uint64(needleValue.Key),
Offset: needleValue.Offset.ToActualOffset(),
Size: int32(needleValue.Size),
}); err != nil {
return err
}
return nil
},
)
return
}
func readNeedleStatus(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.VolumeNeedleStatusResponse, err error) {
err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {
if resp, err = client.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
VolumeId: volumeId,
NeedleId: uint64(needleValue.Key),
}); err != nil {
return err
}
return nil
},
)
return
}
func getCollectionName(commandEnv *CommandEnv, bucket string) string {
if *commandEnv.option.FilerGroup != "" {
return fmt.Sprintf("%s_%s", *commandEnv.option.FilerGroup, bucket)
}
return bucket
}