mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
fix master maintenance logic
Signed-off-by: Lei Liu <lei01.liu@horizon.ai>
This commit is contained in:
parent
ee90236a97
commit
46755ea1e1
|
@ -26,9 +26,9 @@ type VolumeFileScanner4SeeDat struct {
|
||||||
version needle.Version
|
version needle.Version
|
||||||
block storage.SuperBlock
|
block storage.SuperBlock
|
||||||
|
|
||||||
dir string
|
dir string
|
||||||
hashes map[string]bool
|
hashes map[string]bool
|
||||||
dat *os.File
|
dat *os.File
|
||||||
datBackend backend.DataStorageBackend
|
datBackend backend.DataStorageBackend
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -346,6 +346,8 @@ scripts = """
|
||||||
"""
|
"""
|
||||||
sleep_minutes = 17 # sleep minutes between each script execution
|
sleep_minutes = 17 # sleep minutes between each script execution
|
||||||
|
|
||||||
|
filer_url = "http://localhost:8888/"
|
||||||
|
|
||||||
sequencer_type = memory # Choose [memory|etcd] type for storing the file id sequence
|
sequencer_type = memory # Choose [memory|etcd] type for storing the file id sequence
|
||||||
|
|
||||||
# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
|
# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
|
||||||
|
|
|
@ -2,9 +2,6 @@ package command
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/security"
|
"github.com/chrislusf/seaweedfs/weed/security"
|
||||||
"github.com/chrislusf/seaweedfs/weed/shell"
|
"github.com/chrislusf/seaweedfs/weed/shell"
|
||||||
|
@ -37,7 +34,7 @@ func runShell(command *Command, args []string) bool {
|
||||||
shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
|
shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
|
||||||
|
|
||||||
var filerPwdErr error
|
var filerPwdErr error
|
||||||
shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = parseFilerUrl(*shellInitialFilerUrl)
|
shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = util.ParseFilerUrl(*shellInitialFilerUrl)
|
||||||
if filerPwdErr != nil {
|
if filerPwdErr != nil {
|
||||||
fmt.Printf("failed to parse url filer.url=%s : %v\n", *shellInitialFilerUrl, filerPwdErr)
|
fmt.Printf("failed to parse url filer.url=%s : %v\n", *shellInitialFilerUrl, filerPwdErr)
|
||||||
return false
|
return false
|
||||||
|
@ -48,22 +45,3 @@ func runShell(command *Command, args []string) bool {
|
||||||
return true
|
return true
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
|
|
||||||
if !strings.HasPrefix(entryPath, "http://") && !strings.HasPrefix(entryPath, "https://") {
|
|
||||||
entryPath = "http://" + entryPath
|
|
||||||
}
|
|
||||||
|
|
||||||
var u *url.URL
|
|
||||||
u, err = url.Parse(entryPath)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
filerServer = u.Hostname()
|
|
||||||
portString := u.Port()
|
|
||||||
if portString != "" {
|
|
||||||
filerPort, err = strconv.ParseInt(portString, 10, 32)
|
|
||||||
}
|
|
||||||
path = u.Path
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
|
@ -47,13 +47,13 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum
|
||||||
lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
||||||
|
|
||||||
req := &master_pb.AssignRequest{
|
req := &master_pb.AssignRequest{
|
||||||
Count: primaryRequest.Count,
|
Count: primaryRequest.Count,
|
||||||
Replication: primaryRequest.Replication,
|
Replication: primaryRequest.Replication,
|
||||||
Collection: primaryRequest.Collection,
|
Collection: primaryRequest.Collection,
|
||||||
Ttl: primaryRequest.Ttl,
|
Ttl: primaryRequest.Ttl,
|
||||||
DataCenter: primaryRequest.DataCenter,
|
DataCenter: primaryRequest.DataCenter,
|
||||||
Rack: primaryRequest.Rack,
|
Rack: primaryRequest.Rack,
|
||||||
DataNode: primaryRequest.DataNode,
|
DataNode: primaryRequest.DataNode,
|
||||||
WritableVolumeCount: primaryRequest.WritableVolumeCount,
|
WritableVolumeCount: primaryRequest.WritableVolumeCount,
|
||||||
}
|
}
|
||||||
resp, grpcErr := masterClient.Assign(context.Background(), req)
|
resp, grpcErr := masterClient.Assign(context.Background(), req)
|
||||||
|
|
|
@ -182,16 +182,21 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MasterServer) startAdminScripts() {
|
func (ms *MasterServer) startAdminScripts() {
|
||||||
|
var err error
|
||||||
|
|
||||||
v := viper.GetViper()
|
v := viper.GetViper()
|
||||||
adminScripts := v.GetString("master.maintenance.scripts")
|
adminScripts := v.GetString("master.maintenance.scripts")
|
||||||
v.SetDefault("master.maintenance.sleep_minutes", 17)
|
|
||||||
sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
|
|
||||||
|
|
||||||
glog.V(0).Infof("adminScripts:\n%v", adminScripts)
|
glog.V(0).Infof("adminScripts:\n%v", adminScripts)
|
||||||
if adminScripts == "" {
|
if adminScripts == "" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
v.SetDefault("master.maintenance.sleep_minutes", 17)
|
||||||
|
sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
|
||||||
|
|
||||||
|
v.SetDefault("master.maintenance.filer_url", "http://localhost:8888/")
|
||||||
|
filerURL := v.GetString("master.maintenance.filer_url")
|
||||||
|
|
||||||
scriptLines := strings.Split(adminScripts, "\n")
|
scriptLines := strings.Split(adminScripts, "\n")
|
||||||
|
|
||||||
masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
|
masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
|
||||||
|
@ -199,9 +204,12 @@ func (ms *MasterServer) startAdminScripts() {
|
||||||
var shellOptions shell.ShellOptions
|
var shellOptions shell.ShellOptions
|
||||||
shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")
|
shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")
|
||||||
shellOptions.Masters = &masterAddress
|
shellOptions.Masters = &masterAddress
|
||||||
shellOptions.FilerHost = "localhost"
|
|
||||||
shellOptions.FilerPort = 8888
|
shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, err = util.ParseFilerUrl(filerURL)
|
||||||
shellOptions.Directory = "/"
|
if err != nil {
|
||||||
|
glog.V(0).Infof("failed to parse master.maintenance.filer_url=%s : %v\n", filerURL, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
commandEnv := shell.NewCommandEnv(shellOptions)
|
commandEnv := shell.NewCommandEnv(shellOptions)
|
||||||
|
|
||||||
|
|
|
@ -17,14 +17,14 @@ type MemoryMappedFile struct {
|
||||||
|
|
||||||
func NewMemoryMappedFile(f *os.File, memoryMapSizeMB uint32) *MemoryMappedFile {
|
func NewMemoryMappedFile(f *os.File, memoryMapSizeMB uint32) *MemoryMappedFile {
|
||||||
mmf := &MemoryMappedFile{
|
mmf := &MemoryMappedFile{
|
||||||
mm : new(MemoryMap),
|
mm: new(MemoryMap),
|
||||||
}
|
}
|
||||||
mmf.mm.CreateMemoryMap(f, 1024*1024*uint64(memoryMapSizeMB))
|
mmf.mm.CreateMemoryMap(f, 1024*1024*uint64(memoryMapSizeMB))
|
||||||
return mmf
|
return mmf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mmf *MemoryMappedFile) ReadAt(p []byte, off int64) (n int, err error) {
|
func (mmf *MemoryMappedFile) ReadAt(p []byte, off int64) (n int, err error) {
|
||||||
readBytes, e := mmf.mm.ReadMemory(uint64(off), uint64(len(p)))
|
readBytes, e := mmf.mm.ReadMemory(uint64(off), uint64(len(p)))
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return 0, e
|
return 0, e
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,4 +45,3 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
|
||||||
func Config() Configuration {
|
func Config() Configuration {
|
||||||
return viper.GetViper()
|
return viper.GetViper()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ParseInt(text string, defaultValue int) int {
|
func ParseInt(text string, defaultValue int) int {
|
||||||
|
@ -24,3 +26,22 @@ func ParseUint64(text string, defaultValue uint64) uint64 {
|
||||||
}
|
}
|
||||||
return count
|
return count
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ParseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
|
||||||
|
if !strings.HasPrefix(entryPath, "http://") && !strings.HasPrefix(entryPath, "https://") {
|
||||||
|
entryPath = "http://" + entryPath
|
||||||
|
}
|
||||||
|
|
||||||
|
var u *url.URL
|
||||||
|
u, err = url.Parse(entryPath)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
filerServer = u.Hostname()
|
||||||
|
portString := u.Port()
|
||||||
|
if portString != "" {
|
||||||
|
filerPort, err = strconv.ParseInt(portString, 10, 32)
|
||||||
|
}
|
||||||
|
path = u.Path
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue