From ede876cfdb0116557dd197a7951957dab6745c24 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 5 Jun 2019 01:30:24 -0700 Subject: [PATCH] periodic scripts exeuction from leader master --- .../repeated_vacuum/repeated_vacuum.go | 8 +-- unmaintained/volume_tailer/volume_tailer.go | 3 +- weed/command/backup.go | 4 +- weed/command/benchmark.go | 8 +-- weed/command/filer.go | 7 +- weed/command/filer_copy.go | 18 ++--- weed/command/filer_replication.go | 8 +-- weed/command/master.go | 14 ++-- weed/command/mount_std.go | 3 +- weed/command/s3.go | 9 +-- weed/command/scaffold.go | 23 ++++++- weed/command/server.go | 10 +-- weed/command/shell.go | 4 +- weed/command/upload.go | 9 +-- weed/command/volume.go | 7 +- weed/command/webdav.go | 2 +- weed/filer2/stream.go | 41 ++++++++++++ weed/server/filer_server.go | 33 +-------- weed/server/filer_server_handlers_read.go | 34 +--------- weed/server/master_grpc_server.go | 2 +- weed/server/master_server.go | 67 +++++++++++++++++++ weed/shell/command_collection_delete.go | 6 +- weed/shell/command_collection_list.go | 8 +-- weed/shell/command_ec_balance.go | 16 ++--- weed/shell/command_ec_common.go | 6 +- weed/shell/command_ec_encode.go | 14 ++-- weed/shell/command_ec_rebuild.go | 10 +-- weed/shell/command_fs_cat.go | 7 +- weed/shell/command_fs_cd.go | 4 +- weed/shell/command_fs_du.go | 6 +- weed/shell/command_fs_ls.go | 4 +- weed/shell/command_fs_meta_load.go | 4 +- weed/shell/command_fs_meta_notify.go | 8 +-- weed/shell/command_fs_meta_save.go | 4 +- weed/shell/command_fs_pwd.go | 4 +- weed/shell/command_fs_tree.go | 4 +- weed/shell/command_volume_balance.go | 12 ++-- weed/shell/command_volume_copy.go | 4 +- weed/shell/command_volume_delete.go | 4 +- weed/shell/command_volume_fix_replication.go | 6 +- weed/shell/command_volume_list.go | 6 +- weed/shell/command_volume_mount.go | 4 +- weed/shell/command_volume_move.go | 4 +- weed/shell/command_volume_unmount.go | 4 +- weed/shell/commands.go | 33 ++++++--- weed/shell/shell_liner.go | 32 ++++----- weed/util/config.go | 34 ++++++++++ 47 files changed, 337 insertions(+), 225 deletions(-) create mode 100644 weed/filer2/stream.go diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go index 90bdeb5e8..28bcabb9b 100644 --- a/unmaintained/repeated_vacuum/repeated_vacuum.go +++ b/unmaintained/repeated_vacuum/repeated_vacuum.go @@ -4,12 +4,12 @@ import ( "bytes" "flag" "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" - "github.com/spf13/viper" "log" "math/rand" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -22,7 +22,7 @@ var ( func main() { flag.Parse() - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") for i := 0; i < *repeat; i++ { diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go index 408f8dfec..f0ef51c09 100644 --- a/unmaintained/volume_tailer/volume_tailer.go +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -7,7 +7,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" - weed_server "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/storage/needle" util2 "github.com/chrislusf/seaweedfs/weed/util" "github.com/spf13/viper" @@ -25,7 +24,7 @@ var ( func main() { flag.Parse() - weed_server.LoadConfiguration("security", false) + util2.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") vid := needle.VolumeId(*volumeId) diff --git a/weed/command/backup.go b/weed/command/backup.go index 022e784c7..31e146965 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -4,8 +4,8 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/spf13/viper" "github.com/chrislusf/seaweedfs/weed/operation" @@ -52,7 +52,7 @@ var cmdBackup = &Command{ func runBackup(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") if *s.volumeId == -1 { diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 6c64c4591..dd0fdb88e 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -4,9 +4,6 @@ import ( "bufio" "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/server" - "github.com/spf13/viper" - "google.golang.org/grpc" "io" "math" "math/rand" @@ -18,6 +15,9 @@ import ( "sync" "time" + "github.com/spf13/viper" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" @@ -108,7 +108,7 @@ var ( func runBenchmark(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) b.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) diff --git a/weed/command/filer.go b/weed/command/filer.go index d82781765..2aa022cd0 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -1,13 +1,14 @@ package command import ( - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" "net/http" "strconv" "strings" "time" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/server" @@ -77,7 +78,7 @@ var cmdFiler = &Command{ func runFiler(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) f.startFiler() diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index c18b9f055..8d8fead62 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -3,14 +3,6 @@ package command import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/wdclient" - "github.com/spf13/viper" - "google.golang.org/grpc" "io" "io/ioutil" "net/http" @@ -21,6 +13,14 @@ import ( "strings" "sync" "time" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "github.com/spf13/viper" + "google.golang.org/grpc" ) var ( @@ -74,7 +74,7 @@ var cmdCopy = &Command{ func runCopy(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) if len(args) <= 1 { return false diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 82576afe6..c6e7f5dba 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -13,7 +13,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" "github.com/chrislusf/seaweedfs/weed/replication/sub" - "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/spf13/viper" ) @@ -36,9 +36,9 @@ var cmdFilerReplicate = &Command{ func runFilerReplicate(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) - weed_server.LoadConfiguration("replication", true) - weed_server.LoadConfiguration("notification", true) + util.LoadConfiguration("security", false) + util.LoadConfiguration("replication", true) + util.LoadConfiguration("notification", true) config := viper.GetViper() var notificationInput sub.NotificationInput diff --git a/weed/command/master.go b/weed/command/master.go index 5a5bf2c0a..bda8493ed 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -1,6 +1,12 @@ package command import ( + "net/http" + "os" + "runtime" + "strconv" + "strings" + "github.com/chrislusf/raft/protobuf" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -10,11 +16,6 @@ import ( "github.com/gorilla/mux" "github.com/spf13/viper" "google.golang.org/grpc/reflection" - "net/http" - "os" - "runtime" - "strconv" - "strings" ) func init() { @@ -56,7 +57,8 @@ var ( func runMaster(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) + util.LoadConfiguration("master", false) if *mMaxCpu < 1 { *mMaxCpu = runtime.NumCPU() diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 36c1f97a3..1d1214266 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -13,7 +13,6 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" "github.com/jacobsa/daemonize" "github.com/spf13/viper" @@ -45,7 +44,7 @@ func runMount(cmd *Command, args []string) bool { func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int, allowOthers bool, ttlSec int, dirListingLimit int) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) if dir == "" { diff --git a/weed/command/s3.go b/weed/command/s3.go index 014342766..e004bb066 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -1,13 +1,14 @@ package command import ( - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" - "github.com/spf13/viper" "net/http" "time" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/spf13/viper" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/s3api" "github.com/chrislusf/seaweedfs/weed/util" @@ -47,7 +48,7 @@ var cmdS3 = &Command{ func runS3(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) return s3StandaloneOptions.startS3Server() diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index e24d7b56a..2282658bb 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -10,7 +10,7 @@ func init() { } var cmdScaffold = &Command{ - UsageLine: "scaffold -config=[filer|notification|replication|security]", + UsageLine: "scaffold -config=[filer|notification|replication|security|master]", Short: "generate basic configuration files", Long: `Generate filer.toml with all possible configurations for you to customize. @@ -19,7 +19,7 @@ var cmdScaffold = &Command{ var ( outputPath = cmdScaffold.Flag.String("output", "", "if not empty, save the configuration file to this directory") - config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication|security] the configuration file to generate") + config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication|security|master] the configuration file to generate") ) func runScaffold(cmd *Command, args []string) bool { @@ -34,6 +34,8 @@ func runScaffold(cmd *Command, args []string) bool { content = REPLICATION_TOML_EXAMPLE case "security": content = SECURITY_TOML_EXAMPLE + case "master": + content = MASTER_TOML_EXAMPLE } if content == "" { println("need a valid -config option") @@ -309,5 +311,22 @@ cert = "" key = "" +` + + MASTER_TOML_EXAMPLE = ` +# Put this file to one of the location, with descending priority +# ./master.toml +# $HOME/.seaweedfs/master.toml +# /etc/seaweedfs/master.toml +# this file is read by master + +[master.maintenance] +scripts = """ + ec.encode -fullPercent=95 -quietFor=1h + ec.rebuild -force + ec.balance -force + volume.balance -force +""" + ` ) diff --git a/weed/command/server.go b/weed/command/server.go index 630ba72a7..437b0ad83 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -2,9 +2,6 @@ package command import ( "fmt" - "github.com/chrislusf/raft/protobuf" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" "net/http" "os" "runtime" @@ -14,6 +11,10 @@ import ( "sync" "time" + "github.com/chrislusf/raft/protobuf" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/server" @@ -107,7 +108,8 @@ func init() { func runServer(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) + util.LoadConfiguration("master", false) if *serverOptions.cpuprofile != "" { f, err := os.Create(*serverOptions.cpuprofile) diff --git a/weed/command/shell.go b/weed/command/shell.go index 95b62f0b5..79f8b8bf9 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -2,8 +2,8 @@ package command import ( "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/shell" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/spf13/viper" ) @@ -28,7 +28,7 @@ var () func runShell(command *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") shellOptions.FilerHost = "localhost" diff --git a/weed/command/upload.go b/weed/command/upload.go index 1271725ba..25e938d9b 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -3,12 +3,13 @@ package command import ( "encoding/json" "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" - "github.com/spf13/viper" "os" "path/filepath" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/operation" ) @@ -61,7 +62,7 @@ var cmdUpload = &Command{ func runUpload(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") if len(args) == 0 { diff --git a/weed/command/volume.go b/weed/command/volume.go index 4e350b08d..c775ac5cf 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -1,8 +1,6 @@ package command import ( - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" "net/http" "os" "runtime" @@ -11,6 +9,9 @@ import ( "strings" "time" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/server" @@ -83,7 +84,7 @@ var ( func runVolume(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) if *v.maxCpu < 1 { *v.maxCpu = runtime.NumCPU() diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 125893dfa..371c4a9ad 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -45,7 +45,7 @@ var cmdWebDav = &Command{ func runWebDav(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("security", false) + util.LoadConfiguration("security", false) glog.V(0).Infof("Starting Seaweed WebDav Server %s at https port %d", util.VERSION, *webDavStandaloneOptions.port) diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go new file mode 100644 index 000000000..01b87cad1 --- /dev/null +++ b/weed/filer2/stream.go @@ -0,0 +1,41 @@ +package filer2 + +import ( + "io" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient" +) + +func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int) error { + + chunkViews := ViewFromChunks(chunks, offset, size) + + fileId2Url := make(map[string]string) + + for _, chunkView := range chunkViews { + + urlString, err := masterClient.LookupFileId(chunkView.FileId) + if err != nil { + glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) + return err + } + fileId2Url[chunkView.FileId] = urlString + } + + for _, chunkView := range chunkViews { + urlString := fileId2Url[chunkView.FileId] + _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) { + w.Write(data) + }) + if err != nil { + glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) + return err + } + } + + return nil + +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 40be2d7cf..5d2a54f4d 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -4,6 +4,7 @@ import ( "net/http" "os" + "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/filer2" @@ -60,7 +61,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) go fs.filer.KeepConnectedToMaster() v := viper.GetViper() - if !LoadConfiguration("filer", false) { + if !util.LoadConfiguration("filer", false) { v.Set("leveldb.enabled", true) v.Set("leveldb.dir", option.DefaultLevelDbDir) _, err := os.Stat(option.DefaultLevelDbDir) @@ -68,7 +69,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) os.MkdirAll(option.DefaultLevelDbDir, 0755) } } - LoadConfiguration("notification", false) + util.LoadConfiguration("notification", false) fs.filer.LoadConfiguration(v) @@ -85,31 +86,3 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) return fs, nil } -func LoadConfiguration(configFileName string, required bool) (loaded bool) { - - // find a filer store - viper.SetConfigName(configFileName) // name of config file (without extension) - viper.AddConfigPath(".") // optionally look for config in the working directory - viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths - viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in - - glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed()) - - if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file - glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) - if required { - glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ - "\n\nPlease follow this example and add a filer.toml file to "+ - "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+ - " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+ - "\nOr use this command to generate the default toml file\n"+ - " weed scaffold -config=%s -output=.\n\n\n", - configFileName, configFileName, configFileName) - } else { - return false - } - } - - return true - -} diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index ad057f1d3..8477c3783 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -14,9 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/wdclient" ) func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { @@ -233,37 +231,7 @@ func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Reque func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int64, size int) error { - return StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size) + return filer2.StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size) } -func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int) error { - - chunkViews := filer2.ViewFromChunks(chunks, offset, size) - - fileId2Url := make(map[string]string) - - for _, chunkView := range chunkViews { - - urlString, err := masterClient.LookupFileId(chunkView.FileId) - if err != nil { - glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) - return err - } - fileId2Url[chunkView.FileId] = urlString - } - - for _, chunkView := range chunkViews { - urlString := fileId2Url[chunkView.FileId] - _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) { - w.Write(data) - }) - if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) - return err - } - } - - return nil - -} diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 506b9782d..0e41a7baa 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -125,7 +125,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(heartbeat.EcShards) > 0 { - glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) + glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn) // broadcast the ec vid changes to master clients diff --git a/weed/server/master_server.go b/weed/server/master_server.go index e78bd58dc..9ba9546fd 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -2,11 +2,17 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/shell" "google.golang.org/grpc" "net/http" "net/http/httputil" "net/url" + "os" + "regexp" + "strconv" + "strings" "sync" + "time" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" @@ -99,6 +105,8 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, garbageThreshold, ms.preallocate) + ms.startAdminScripts() + return ms } @@ -153,3 +161,62 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ } } } + +func (ms *MasterServer) startAdminScripts() { + v := viper.GetViper() + adminScripts := v.GetString("master.maintenance.scripts") + + glog.V(0).Infof("adminScripts:\n%v", adminScripts) + if adminScripts == "" { + return + } + + scriptLines := strings.Split(adminScripts, "\n") + + masterAddress := "localhost:" + strconv.Itoa(ms.port) + + var shellOptions shell.ShellOptions + shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master") + shellOptions.Masters = &masterAddress + shellOptions.FilerHost = "localhost" + shellOptions.FilerPort = 8888 + shellOptions.Directory = "/" + + commandEnv := shell.NewCommandEnv(shellOptions) + + + reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`) + + go commandEnv.MasterClient.KeepConnectedToMaster() + + go func() { + commandEnv.MasterClient.WaitUntilConnected() + + c := time.Tick(17 * time.Second) + for _ = range c { + if ms.Topo.IsLeader() { + for _, line := range scriptLines { + + cmds := reg.FindAllString(line, -1) + if len(cmds) == 0 { + continue + } + args := make([]string, len(cmds[1:])) + for i := range args { + args[i] = strings.Trim(string(cmds[1+i]), "\"'") + } + cmd := strings.ToLower(cmds[0]) + + for _, c := range shell.Commands { + if c.Name() == cmd { + glog.V(0).Infof("executing: %s %v", cmd, args) + if err := c.Do(args, commandEnv, os.Stdout); err != nil { + glog.V(0).Infof("error: %v", err) + } + } + } + } + } + } + }() +} diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go index e3356aba4..fbaddcd51 100644 --- a/weed/shell/command_collection_delete.go +++ b/weed/shell/command_collection_delete.go @@ -8,7 +8,7 @@ import ( ) func init() { - commands = append(commands, &commandCollectionDelete{}) + Commands = append(Commands, &commandCollectionDelete{}) } type commandCollectionDelete struct { @@ -26,7 +26,7 @@ func (c *commandCollectionDelete) Help() string { ` } -func (c *commandCollectionDelete) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { if len(args) == 0 { return nil @@ -35,7 +35,7 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *commandEnv, writ collectionName := args[0] ctx := context.Background() - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { _, err = client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{ Name: collectionName, }) diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go index d1124adde..c4325c66f 100644 --- a/weed/shell/command_collection_list.go +++ b/weed/shell/command_collection_list.go @@ -8,7 +8,7 @@ import ( ) func init() { - commands = append(commands, &commandCollectionList{}) + Commands = append(Commands, &commandCollectionList{}) } type commandCollectionList struct { @@ -22,7 +22,7 @@ func (c *commandCollectionList) Help() string { return `list all collections` } -func (c *commandCollectionList) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { collections, err := ListCollectionNames(commandEnv, true, true) @@ -39,10 +39,10 @@ func (c *commandCollectionList) Do(args []string, commandEnv *commandEnv, writer return nil } -func ListCollectionNames(commandEnv *commandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) { +func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) { var resp *master_pb.CollectionListResponse ctx := context.Background() - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { resp, err = client.CollectionList(ctx, &master_pb.CollectionListRequest{ IncludeNormalVolumes: includeNormalVolumes, IncludeEcVolumes: includeEcVolumes, diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 6d44a184b..424b63d9d 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -13,7 +13,7 @@ import ( ) func init() { - commands = append(commands, &commandEcBalance{}) + Commands = append(Commands, &commandEcBalance{}) } type commandEcBalance struct { @@ -53,7 +53,7 @@ func (c *commandEcBalance) Help() string { ` } -func (c *commandEcBalance) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection") @@ -65,7 +65,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *commandEnv, writer io.W var resp *master_pb.VolumeListResponse ctx := context.Background() - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) return err }) @@ -104,7 +104,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *commandEnv, writer io.W return nil } -func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing bool) error { +func balanceEcVolumes(commandEnv *CommandEnv, collection string, applyBalancing bool) error { ctx := context.Background() @@ -142,7 +142,7 @@ func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing return nil } -func doBalanceEcShards(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId, locations []*EcNode, allEcNodes []*EcNode, applyBalancing bool) error { +func doBalanceEcShards(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, allEcNodes []*EcNode, applyBalancing bool) error { // collect all ec nodes with at least one free slot var possibleDestinationEcNodes []*EcNode for _, ecNode := range allEcNodes { @@ -171,7 +171,7 @@ func doBalanceEcShards(ctx context.Context, commandEnv *commandEnv, collection s return nil } -func doDeduplicateEcShards(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error { +func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error { // check whether this volume has ecNodes that are over average shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount) @@ -205,7 +205,7 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *commandEnv, collecti return nil } -func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error { +func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error { for _, ecNode := range existingLocations { @@ -232,7 +232,7 @@ func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv, return nil } -func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error { +func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error { sortEcNodes(possibleDestinationEcNodes) diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index b5560b560..4c53ba43b 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -14,7 +14,7 @@ import ( "google.golang.org/grpc" ) -func moveMountedShardToEcNode(ctx context.Context, commandEnv *commandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error { +func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error { fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id) @@ -124,11 +124,11 @@ type EcNode struct { freeEcSlot int } -func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { +func collectEcNodes(ctx context.Context, commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { // list all possible locations var resp *master_pb.VolumeListResponse - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) return err }) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index e0d4cf380..8b01f6cfc 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -18,7 +18,7 @@ import ( ) func init() { - commands = append(commands, &commandEcEncode{}) + Commands = append(Commands, &commandEcEncode{}) } type commandEcEncode struct { @@ -51,7 +51,7 @@ func (c *commandEcEncode) Help() string { ` } -func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeId := encodeCommand.Int("volumeId", 0, "the volume id") @@ -85,9 +85,9 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr return nil } -func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId) (err error) { +func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) { // find volume location - locations := commandEnv.masterClient.GetLocations(uint32(vid)) + locations := commandEnv.MasterClient.GetLocations(uint32(vid)) if len(locations) == 0 { return fmt.Errorf("volume %d not found", vid) } @@ -121,7 +121,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum } -func spreadEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { +func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv) if err != nil { @@ -228,10 +228,10 @@ func balancedEcDistribution(servers []*EcNode) (allocated []int) { return allocated } -func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { +func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { var resp *master_pb.VolumeListResponse - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) return err }) diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 4b82f96c0..9a5ea3ca9 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -14,7 +14,7 @@ import ( ) func init() { - commands = append(commands, &commandEcRebuild{}) + Commands = append(Commands, &commandEcRebuild{}) } type commandEcRebuild struct { @@ -54,7 +54,7 @@ func (c *commandEcRebuild) Help() string { ` } -func (c *commandEcRebuild) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection") @@ -90,7 +90,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *commandEnv, writer io.W return nil } -func rebuildEcVolumes(commandEnv *commandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error { +func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error { ctx := context.Background() @@ -125,7 +125,7 @@ func rebuildEcVolumes(commandEnv *commandEnv, allEcNodes []*EcNode, collection s return nil } -func rebuildOneEcVolume(ctx context.Context, commandEnv *commandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error { +func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error { fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId) @@ -182,7 +182,7 @@ func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption, return } -func prepareDataToRecover(ctx context.Context, commandEnv *commandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) { +func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) { needEcxFile := true var localShardBits erasure_coding.ShardBits diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index a6c308d2a..66ced46c5 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -8,11 +8,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - weed_server "github.com/chrislusf/seaweedfs/weed/server" ) func init() { - commands = append(commands, &commandFsCat{}) + Commands = append(Commands, &commandFsCat{}) } type commandFsCat struct { @@ -34,7 +33,7 @@ func (c *commandFsCat) Help() string { ` } -func (c *commandFsCat) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { input := findInputDirectory(args) @@ -62,7 +61,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *commandEnv, writer io.Write return err } - return weed_server.StreamContent(commandEnv.masterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt32) + return filer2.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt32) }) diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go index f14350f02..408ec86c8 100644 --- a/weed/shell/command_fs_cd.go +++ b/weed/shell/command_fs_cd.go @@ -6,7 +6,7 @@ import ( ) func init() { - commands = append(commands, &commandFsCd{}) + Commands = append(Commands, &commandFsCd{}) } type commandFsCd struct { @@ -29,7 +29,7 @@ func (c *commandFsCd) Help() string { ` } -func (c *commandFsCd) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { input := findInputDirectory(args) diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index f305cabdc..5e634c82a 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -11,7 +11,7 @@ import ( ) func init() { - commands = append(commands, &commandFsDu{}) + Commands = append(Commands, &commandFsDu{}) } type commandFsDu struct { @@ -30,7 +30,7 @@ func (c *commandFsDu) Help() string { ` } -func (c *commandFsDu) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { @@ -106,7 +106,7 @@ func paginateDirectory(ctx context.Context, writer io.Writer, client filer_pb.Se } -func (env *commandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error { +func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error { filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000) return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go index 5c5130c53..6979635e1 100644 --- a/weed/shell/command_fs_ls.go +++ b/weed/shell/command_fs_ls.go @@ -13,7 +13,7 @@ import ( ) func init() { - commands = append(commands, &commandFsLs{}) + Commands = append(Commands, &commandFsLs{}) } type commandFsLs struct { @@ -35,7 +35,7 @@ func (c *commandFsLs) Help() string { ` } -func (c *commandFsLs) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { var isLongFormat, showHidden bool for _, arg := range args { diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index 2c5d19015..5ea8de9f5 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -13,7 +13,7 @@ import ( ) func init() { - commands = append(commands, &commandFsMetaLoad{}) + Commands = append(Commands, &commandFsMetaLoad{}) } type commandFsMetaLoad struct { @@ -31,7 +31,7 @@ func (c *commandFsMetaLoad) Help() string { ` } -func (c *commandFsMetaLoad) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { if len(args) == 0 { fmt.Fprintf(writer, "missing a metadata file\n") diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go index ca4d8da5b..13b272fbf 100644 --- a/weed/shell/command_fs_meta_notify.go +++ b/weed/shell/command_fs_meta_notify.go @@ -8,12 +8,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/notification" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - weed_server "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/spf13/viper" ) func init() { - commands = append(commands, &commandFsMetaNotify{}) + Commands = append(Commands, &commandFsMetaNotify{}) } type commandFsMetaNotify struct { @@ -33,14 +33,14 @@ func (c *commandFsMetaNotify) Help() string { ` } -func (c *commandFsMetaNotify) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { return err } - weed_server.LoadConfiguration("notification", true) + util.LoadConfiguration("notification", true) v := viper.GetViper() notification.LoadConfiguration(v.Sub("notification")) diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index 47426a858..6ca395fae 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -14,7 +14,7 @@ import ( ) func init() { - commands = append(commands, &commandFsMetaSave{}) + Commands = append(Commands, &commandFsMetaSave{}) } type commandFsMetaSave struct { @@ -40,7 +40,7 @@ func (c *commandFsMetaSave) Help() string { ` } -func (c *commandFsMetaSave) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { diff --git a/weed/shell/command_fs_pwd.go b/weed/shell/command_fs_pwd.go index 0b0a7f176..084a5e90a 100644 --- a/weed/shell/command_fs_pwd.go +++ b/weed/shell/command_fs_pwd.go @@ -6,7 +6,7 @@ import ( ) func init() { - commands = append(commands, &commandFsPwd{}) + Commands = append(Commands, &commandFsPwd{}) } type commandFsPwd struct { @@ -20,7 +20,7 @@ func (c *commandFsPwd) Help() string { return `print out current directory` } -func (c *commandFsPwd) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandFsPwd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fmt.Fprintf(writer, "http://%s:%d%s\n", commandEnv.option.FilerHost, diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go index f1ffc9e4b..8474e43ea 100644 --- a/weed/shell/command_fs_tree.go +++ b/weed/shell/command_fs_tree.go @@ -10,7 +10,7 @@ import ( ) func init() { - commands = append(commands, &commandFsTree{}) + Commands = append(Commands, &commandFsTree{}) } type commandFsTree struct { @@ -27,7 +27,7 @@ func (c *commandFsTree) Help() string { ` } -func (c *commandFsTree) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 683e2dbe8..1ae031658 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -15,7 +15,7 @@ import ( ) func init() { - commands = append(commands, &commandVolumeBalance{}) + Commands = append(Commands, &commandVolumeBalance{}) } type commandVolumeBalance struct { @@ -59,7 +59,7 @@ func (c *commandVolumeBalance) Help() string { ` } -func (c *commandVolumeBalance) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection") @@ -71,7 +71,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *commandEnv, writer var resp *master_pb.VolumeListResponse ctx := context.Background() - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) return err }) @@ -108,7 +108,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *commandEnv, writer return nil } -func balanceVolumeServers(commandEnv *commandEnv, dataNodeInfos []*master_pb.DataNodeInfo, volumeSizeLimit uint64, collection string, applyBalancing bool) error { +func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.DataNodeInfo, volumeSizeLimit uint64, collection string, applyBalancing bool) error { var nodes []*Node for _, dn := range dataNodeInfos { nodes = append(nodes, &Node{ @@ -181,7 +181,7 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) { }) } -func balanceSelectedVolume(commandEnv *commandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) error { +func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) error { selectedVolumeCount := 0 for _, dn := range nodes { selectedVolumeCount += len(dn.selectedVolumes) @@ -223,7 +223,7 @@ func balanceSelectedVolume(commandEnv *commandEnv, nodes []*Node, sortCandidates return nil } -func moveVolume(commandEnv *commandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error { +func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error { collectionPrefix := v.Collection + "_" if v.Collection == "" { collectionPrefix = "" diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go index 8f14e25b3..1c83ba655 100644 --- a/weed/shell/command_volume_copy.go +++ b/weed/shell/command_volume_copy.go @@ -9,7 +9,7 @@ import ( ) func init() { - commands = append(commands, &commandVolumeCopy{}) + Commands = append(Commands, &commandVolumeCopy{}) } type commandVolumeCopy struct { @@ -30,7 +30,7 @@ func (c *commandVolumeCopy) Help() string { ` } -func (c *commandVolumeCopy) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { if len(args) != 3 { fmt.Fprintf(writer, "received args: %+v\n", args) diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go index 748c89eed..17d27ea3a 100644 --- a/weed/shell/command_volume_delete.go +++ b/weed/shell/command_volume_delete.go @@ -9,7 +9,7 @@ import ( ) func init() { - commands = append(commands, &commandVolumeDelete{}) + Commands = append(Commands, &commandVolumeDelete{}) } type commandVolumeDelete struct { @@ -29,7 +29,7 @@ func (c *commandVolumeDelete) Help() string { ` } -func (c *commandVolumeDelete) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { if len(args) != 2 { fmt.Fprintf(writer, "received args: %+v\n", args) diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index cff4adf89..09e1c19eb 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -13,7 +13,7 @@ import ( ) func init() { - commands = append(commands, &commandVolumeFixReplication{}) + Commands = append(Commands, &commandVolumeFixReplication{}) } type commandVolumeFixReplication struct { @@ -41,7 +41,7 @@ func (c *commandVolumeFixReplication) Help() string { ` } -func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { takeAction := true if len(args) > 0 && args[0] == "-n" { @@ -50,7 +50,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, var resp *master_pb.VolumeListResponse ctx := context.Background() - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) return err }) diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go index a402c1e51..134580ffe 100644 --- a/weed/shell/command_volume_list.go +++ b/weed/shell/command_volume_list.go @@ -11,7 +11,7 @@ import ( ) func init() { - commands = append(commands, &commandVolumeList{}) + Commands = append(Commands, &commandVolumeList{}) } type commandVolumeList struct { @@ -29,11 +29,11 @@ func (c *commandVolumeList) Help() string { ` } -func (c *commandVolumeList) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { var resp *master_pb.VolumeListResponse ctx := context.Background() - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) return err }) diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go index 37dd7765f..50a307492 100644 --- a/weed/shell/command_volume_mount.go +++ b/weed/shell/command_volume_mount.go @@ -12,7 +12,7 @@ import ( ) func init() { - commands = append(commands, &commandVolumeMount{}) + Commands = append(Commands, &commandVolumeMount{}) } type commandVolumeMount struct { @@ -32,7 +32,7 @@ func (c *commandVolumeMount) Help() string { ` } -func (c *commandVolumeMount) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { if len(args) != 2 { fmt.Fprintf(writer, "received args: %+v\n", args) diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 03b1446e6..08d87c988 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -14,7 +14,7 @@ import ( ) func init() { - commands = append(commands, &commandVolumeMove{}) + Commands = append(Commands, &commandVolumeMove{}) } type commandVolumeMove struct { @@ -42,7 +42,7 @@ func (c *commandVolumeMove) Help() string { ` } -func (c *commandVolumeMove) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { if len(args) != 3 { fmt.Fprintf(writer, "received args: %+v\n", args) diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go index ce9f666af..8096f34d8 100644 --- a/weed/shell/command_volume_unmount.go +++ b/weed/shell/command_volume_unmount.go @@ -12,7 +12,7 @@ import ( ) func init() { - commands = append(commands, &commandVolumeUnmount{}) + Commands = append(Commands, &commandVolumeUnmount{}) } type commandVolumeUnmount struct { @@ -32,7 +32,7 @@ func (c *commandVolumeUnmount) Help() string { ` } -func (c *commandVolumeUnmount) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { if len(args) != 2 { fmt.Fprintf(writer, "received args: %+v\n", args) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 6fade6e9d..ade8acd89 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -3,15 +3,16 @@ package shell import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/wdclient" - "google.golang.org/grpc" "io" "net/url" "path/filepath" "strconv" "strings" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "google.golang.org/grpc" ) type ShellOptions struct { @@ -23,23 +24,33 @@ type ShellOptions struct { Directory string } -type commandEnv struct { +type CommandEnv struct { env map[string]string - masterClient *wdclient.MasterClient + MasterClient *wdclient.MasterClient option ShellOptions } type command interface { Name() string Help() string - Do([]string, *commandEnv, io.Writer) error + Do([]string, *CommandEnv, io.Writer) error } var ( - commands = []command{} + Commands = []command{} ) -func (ce *commandEnv) parseUrl(input string) (filerServer string, filerPort int64, path string, err error) { + +func NewCommandEnv(options ShellOptions) *CommandEnv { + return &CommandEnv{ + env: make(map[string]string), + MasterClient: wdclient.NewMasterClient(context.Background(), + options.GrpcDialOption, "shell", strings.Split(*options.Masters, ",")), + option: options, + } +} + +func (ce *CommandEnv) parseUrl(input string) (filerServer string, filerPort int64, path string, err error) { if strings.HasPrefix(input, "http") { return parseFilerUrl(input) } @@ -49,13 +60,13 @@ func (ce *commandEnv) parseUrl(input string) (filerServer string, filerPort int6 return ce.option.FilerHost, ce.option.FilerPort, input, err } -func (ce *commandEnv) isDirectory(ctx context.Context, filerServer string, filerPort int64, path string) bool { +func (ce *CommandEnv) isDirectory(ctx context.Context, filerServer string, filerPort int64, path string) bool { return ce.checkDirectory(ctx, filerServer, filerPort, path) == nil } -func (ce *commandEnv) checkDirectory(ctx context.Context, filerServer string, filerPort int64, path string) error { +func (ce *CommandEnv) checkDirectory(ctx context.Context, filerServer string, filerPort int64, path string) error { dir, name := filer2.FullPath(path).DirAndName() diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go index 2c3058cbf..a4f17e0fa 100644 --- a/weed/shell/shell_liner.go +++ b/weed/shell/shell_liner.go @@ -1,17 +1,16 @@ package shell import ( - "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/wdclient" "io" "os" "path" "regexp" "strings" - "github.com/peterh/liner" "sort" + + "github.com/peterh/liner" ) var ( @@ -33,15 +32,10 @@ func RunShell(options ShellOptions) { reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`) - commandEnv := &commandEnv{ - env: make(map[string]string), - masterClient: wdclient.NewMasterClient(context.Background(), - options.GrpcDialOption, "shell", strings.Split(*options.Masters, ",")), - option: options, - } + commandEnv := NewCommandEnv(options) - go commandEnv.masterClient.KeepConnectedToMaster() - commandEnv.masterClient.WaitUntilConnected() + go commandEnv.MasterClient.KeepConnectedToMaster() + commandEnv.MasterClient.WaitUntilConnected() for { cmd, err := line.Prompt("> ") @@ -71,7 +65,7 @@ func RunShell(options ShellOptions) { return } else { foundCommand := false - for _, c := range commands { + for _, c := range Commands { if c.Name() == cmd { if err := c.Do(args, commandEnv, os.Stdout); err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) @@ -94,10 +88,10 @@ func printGenericHelp() { ` fmt.Print(msg) - sort.Slice(commands, func(i, j int) bool { - return strings.Compare(commands[i].Name(), commands[j].Name()) < 0 + sort.Slice(Commands, func(i, j int) bool { + return strings.Compare(Commands[i].Name(), Commands[j].Name()) < 0 }) - for _, c := range commands { + for _, c := range Commands { helpTexts := strings.SplitN(c.Help(), "\n", 2) fmt.Printf(" %-30s\t# %s \n", c.Name(), helpTexts[0]) } @@ -112,11 +106,11 @@ func printHelp(cmds []string) { } else { cmd := strings.ToLower(args[0]) - sort.Slice(commands, func(i, j int) bool { - return strings.Compare(commands[i].Name(), commands[j].Name()) < 0 + sort.Slice(Commands, func(i, j int) bool { + return strings.Compare(Commands[i].Name(), Commands[j].Name()) < 0 }) - for _, c := range commands { + for _, c := range Commands { if c.Name() == cmd { fmt.Printf(" %s\t# %s\n", c.Name(), c.Help()) } @@ -126,7 +120,7 @@ func printHelp(cmds []string) { func setCompletionHandler() { line.SetCompleter(func(line string) (c []string) { - for _, i := range commands { + for _, i := range Commands { if strings.HasPrefix(i.Name(), strings.ToLower(line)) { c = append(c, i.Name()) } diff --git a/weed/util/config.go b/weed/util/config.go index 77cab3019..1ea833d1f 100644 --- a/weed/util/config.go +++ b/weed/util/config.go @@ -1,5 +1,10 @@ package util +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/spf13/viper" +) + type Configuration interface { GetString(key string) string GetBool(key string) bool @@ -8,3 +13,32 @@ type Configuration interface { GetFloat64(key string) float64 GetStringSlice(key string) []string } + +func LoadConfiguration(configFileName string, required bool) (loaded bool) { + + // find a filer store + viper.SetConfigName(configFileName) // name of config file (without extension) + viper.AddConfigPath(".") // optionally look for config in the working directory + viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths + viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in + + glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed()) + + if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file + glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) + if required { + glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ + "\n\nPlease follow this example and add a filer.toml file to "+ + "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+ + " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+ + "\nOr use this command to generate the default toml file\n"+ + " weed scaffold -config=%s -output=.\n\n\n", + configFileName, configFileName, configFileName) + } else { + return false + } + } + + return true + +}