mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
master: add cluster wide lock/unlock operation in weed shell
fix https://github.com/chrislusf/seaweedfs/issues/1286
This commit is contained in:
parent
bdc337a719
commit
73564e6a01
|
@ -205,6 +205,10 @@ func (ms *MasterServer) startAdminScripts() {
|
||||||
filerHostPort := v.GetString("master.filer.default")
|
filerHostPort := v.GetString("master.filer.default")
|
||||||
|
|
||||||
scriptLines := strings.Split(adminScripts, "\n")
|
scriptLines := strings.Split(adminScripts, "\n")
|
||||||
|
if !strings.Contains(adminScripts, "lock") {
|
||||||
|
scriptLines = append(append([]string{}, "lock"), scriptLines...)
|
||||||
|
scriptLines = append(scriptLines, "unlock")
|
||||||
|
}
|
||||||
|
|
||||||
masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
|
masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package weed_server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/chrislusf/raft"
|
"github.com/chrislusf/raft"
|
||||||
ui "github.com/chrislusf/seaweedfs/weed/server/master_ui"
|
ui "github.com/chrislusf/seaweedfs/weed/server/master_ui"
|
||||||
|
@ -11,7 +12,7 @@ import (
|
||||||
|
|
||||||
func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
|
func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
infos := make(map[string]interface{})
|
infos := make(map[string]interface{})
|
||||||
infos["Version"] = util.VERSION
|
infos["Up Time"] = time.Now().Sub(startTime).String()
|
||||||
args := struct {
|
args := struct {
|
||||||
Version string
|
Version string
|
||||||
Topology interface{}
|
Topology interface{}
|
||||||
|
|
|
@ -98,6 +98,10 @@ func (c *commandEcBalance) Help() string {
|
||||||
|
|
||||||
func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||||
collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
|
collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
|
||||||
dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
|
dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
|
||||||
|
|
|
@ -36,6 +36,10 @@ func (c *commandEcDecode) Help() string {
|
||||||
|
|
||||||
func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||||
volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
|
volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
|
||||||
collection := encodeCommand.String("collection", "", "the collection name")
|
collection := encodeCommand.String("collection", "", "the collection name")
|
||||||
|
|
|
@ -54,6 +54,10 @@ func (c *commandEcEncode) Help() string {
|
||||||
|
|
||||||
func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||||
volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
|
volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
|
||||||
collection := encodeCommand.String("collection", "", "the collection name")
|
collection := encodeCommand.String("collection", "", "the collection name")
|
||||||
|
|
|
@ -56,6 +56,10 @@ func (c *commandEcRebuild) Help() string {
|
||||||
|
|
||||||
func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||||
collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
|
collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
|
||||||
applyChanges := fixCommand.Bool("force", false, "apply the changes")
|
applyChanges := fixCommand.Bool("force", false, "apply the changes")
|
||||||
|
|
|
@ -60,6 +60,10 @@ func (c *commandVolumeBalance) Help() string {
|
||||||
|
|
||||||
func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||||
collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection")
|
collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection")
|
||||||
dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
|
dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
|
||||||
|
|
|
@ -35,6 +35,10 @@ func (c *commandVolumeConfigureReplication) Help() string {
|
||||||
|
|
||||||
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||||
volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
|
volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
|
||||||
replicationString := configureReplicationCommand.String("replication", "", "the intended replication value")
|
replicationString := configureReplicationCommand.String("replication", "", "the intended replication value")
|
||||||
|
|
|
@ -31,6 +31,10 @@ func (c *commandVolumeCopy) Help() string {
|
||||||
|
|
||||||
func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if len(args) != 3 {
|
if len(args) != 3 {
|
||||||
fmt.Fprintf(writer, "received args: %+v\n", args)
|
fmt.Fprintf(writer, "received args: %+v\n", args)
|
||||||
return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>")
|
return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>")
|
||||||
|
|
|
@ -30,6 +30,10 @@ func (c *commandVolumeDelete) Help() string {
|
||||||
|
|
||||||
func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if len(args) != 2 {
|
if len(args) != 2 {
|
||||||
fmt.Fprintf(writer, "received args: %+v\n", args)
|
fmt.Fprintf(writer, "received args: %+v\n", args)
|
||||||
return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
|
return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
|
||||||
|
|
|
@ -44,6 +44,10 @@ func (c *commandVolumeFixReplication) Help() string {
|
||||||
|
|
||||||
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
takeAction := true
|
takeAction := true
|
||||||
if len(args) > 0 && args[0] == "-n" {
|
if len(args) > 0 && args[0] == "-n" {
|
||||||
takeAction = false
|
takeAction = false
|
||||||
|
|
|
@ -48,6 +48,10 @@ func (c *commandVolumeFsck) Help() string {
|
||||||
|
|
||||||
func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||||
verbose := fsckCommand.Bool("v", false, "verbose mode")
|
verbose := fsckCommand.Bool("v", false, "verbose mode")
|
||||||
applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
|
applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
|
||||||
|
|
|
@ -34,6 +34,10 @@ func (c *commandVolumeMount) Help() string {
|
||||||
|
|
||||||
func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if len(args) != 2 {
|
if len(args) != 2 {
|
||||||
fmt.Fprintf(writer, "received args: %+v\n", args)
|
fmt.Fprintf(writer, "received args: %+v\n", args)
|
||||||
return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
|
return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
|
||||||
|
|
|
@ -44,6 +44,10 @@ func (c *commandVolumeMove) Help() string {
|
||||||
|
|
||||||
func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if len(args) != 3 {
|
if len(args) != 3 {
|
||||||
fmt.Fprintf(writer, "received args: %+v\n", args)
|
fmt.Fprintf(writer, "received args: %+v\n", args)
|
||||||
return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>")
|
return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>")
|
||||||
|
|
|
@ -42,6 +42,10 @@ func (c *commandVolumeTierDownload) Help() string {
|
||||||
|
|
||||||
func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||||
volumeId := tierCommand.Int("volumeId", 0, "the volume id")
|
volumeId := tierCommand.Int("volumeId", 0, "the volume id")
|
||||||
collection := tierCommand.String("collection", "", "the collection name")
|
collection := tierCommand.String("collection", "", "the collection name")
|
||||||
|
|
|
@ -56,6 +56,10 @@ func (c *commandVolumeTierUpload) Help() string {
|
||||||
|
|
||||||
func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||||
volumeId := tierCommand.Int("volumeId", 0, "the volume id")
|
volumeId := tierCommand.Int("volumeId", 0, "the volume id")
|
||||||
collection := tierCommand.String("collection", "", "the collection name")
|
collection := tierCommand.String("collection", "", "the collection name")
|
||||||
|
|
|
@ -34,6 +34,10 @@ func (c *commandVolumeUnmount) Help() string {
|
||||||
|
|
||||||
func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||||
|
|
||||||
|
if err = commandEnv.confirmIsLocked(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if len(args) != 2 {
|
if len(args) != 2 {
|
||||||
fmt.Fprintf(writer, "received args: %+v\n", args)
|
fmt.Fprintf(writer, "received args: %+v\n", args)
|
||||||
return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
|
return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
|
||||||
|
|
|
@ -68,6 +68,16 @@ func (ce *CommandEnv) isDirectory(path string) bool {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ce *CommandEnv) confirmIsLocked() error {
|
||||||
|
|
||||||
|
if ce.locker.isLocking {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("need to lock to continue")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (ce *CommandEnv) checkDirectory(path string) error {
|
func (ce *CommandEnv) checkDirectory(path string) error {
|
||||||
|
|
||||||
dir, name := util.FullPath(path).DirAndName()
|
dir, name := util.FullPath(path).DirAndName()
|
||||||
|
|
Loading…
Reference in a new issue