merge notification config with filer.toml

This commit is contained in:
Chris Lu 2018-08-19 15:17:55 -07:00
parent c91372daa6
commit f827ada811
17 changed files with 70 additions and 236 deletions

View file

@ -50,7 +50,6 @@ func init() {
f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 1000, "limit sub dir listing size") f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 1000, "limit sub dir listing size")
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center") f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center")
f.enableNotification = cmdFiler.Flag.Bool("notify", false, "send file updates to the queue defined in message_queue.toml")
} }
var cmdFiler = &Command{ var cmdFiler = &Command{
@ -100,7 +99,6 @@ func (fo *FilerOptions) start() {
SecretKey: *fo.secretKey, SecretKey: *fo.secretKey,
DirListingLimit: *fo.dirListingLimit, DirListingLimit: *fo.dirListingLimit,
DataCenter: *fo.dataCenter, DataCenter: *fo.dataCenter,
EnableNotification: *fo.enableNotification,
}) })
if nfs_err != nil { if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err) glog.Fatalf("Filer startup error: %v", nfs_err)

View file

@ -91,7 +91,6 @@ func init() {
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit") filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size") filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
filerOptions.enableNotification = cmdServer.Flag.Bool("filer.notify", false, "send file updates to the queue defined in message_queue.toml")
} }
func runServer(cmd *Command, args []string) bool { func runServer(cmd *Command, args []string) bool {

View file

@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/gocql/gocql" "github.com/gocql/gocql"
"github.com/chrislusf/seaweedfs/weed/util"
) )
func init() { func init() {
@ -20,7 +21,7 @@ func (store *CassandraStore) GetName() string {
return "cassandra" return "cassandra"
} }
func (store *CassandraStore) Initialize(configuration filer2.Configuration) (err error) { func (store *CassandraStore) Initialize(configuration util.Configuration) (err error) {
return store.initialize( return store.initialize(
configuration.GetString("keyspace"), configuration.GetString("keyspace"),
configuration.GetStringSlice("hosts"), configuration.GetStringSlice("hosts"),

View file

@ -4,7 +4,7 @@ import (
"os" "os"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
const ( const (
@ -91,29 +91,17 @@ var (
Stores []FilerStore Stores []FilerStore
) )
func (f *Filer) LoadConfiguration() { func (f *Filer) LoadConfiguration(config *viper.Viper) {
// find a filer store
viper.SetConfigName("filer") // name of config file (without extension)
viper.AddConfigPath(".") // optionally look for config in the working directory
viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file
glog.Fatalf("Failed to load filer.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/" +
"\n\nPlease follow this example and add a filer.toml file to " +
"current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n" + FILER_TOML_EXAMPLE)
}
glog.V(0).Infof("Reading filer configuration from %s", viper.ConfigFileUsed())
for _, store := range Stores { for _, store := range Stores {
if viper.GetBool(store.GetName() + ".enabled") { if config.GetBool(store.GetName() + ".enabled") {
viperSub := viper.Sub(store.GetName()) viperSub := config.Sub(store.GetName())
if err := store.Initialize(viperSub); err != nil { if err := store.Initialize(viperSub); err != nil {
glog.Fatalf("Failed to initialize store for %s: %+v", glog.Fatalf("Failed to initialize store for %s: %+v",
store.GetName(), err) store.GetName(), err)
} }
f.SetStore(store) f.SetStore(store)
glog.V(0).Infof("Configure filer for %s from %s", store.GetName(), viper.ConfigFileUsed()) glog.V(0).Infof("Configure filer for %s", store.GetName())
return return
} }
} }
@ -124,19 +112,5 @@ func (f *Filer) LoadConfiguration() {
println(" " + store.GetName()) println(" " + store.GetName())
} }
println()
println("Please configure a supported filer store in", viper.ConfigFileUsed())
println()
os.Exit(-1) os.Exit(-1)
} }
// A simplified interface to decouple from Viper
type Configuration interface {
GetString(key string) string
GetBool(key string) bool
GetInt(key string) int
GetInt64(key string) int64
GetFloat64(key string) float64
GetStringSlice(key string) []string
}

View file

@ -2,13 +2,14 @@ package filer2
import ( import (
"errors" "errors"
"github.com/chrislusf/seaweedfs/weed/util"
) )
type FilerStore interface { type FilerStore interface {
// GetName gets the name to locate the configuration in filer.toml file // GetName gets the name to locate the configuration in filer.toml file
GetName() string GetName() string
// Initialize initializes the file store // Initialize initializes the file store
Initialize(configuration Configuration) error Initialize(configuration util.Configuration) error
InsertEntry(*Entry) error InsertEntry(*Entry) error
UpdateEntry(*Entry) (err error) UpdateEntry(*Entry) (err error)
// err == filer2.ErrNotFound if not found // err == filer2.ErrNotFound if not found

View file

@ -27,7 +27,7 @@ func (store *LevelDBStore) GetName() string {
return "leveldb" return "leveldb"
} }
func (store *LevelDBStore) Initialize(configuration filer2.Configuration) (err error) { func (store *LevelDBStore) Initialize(configuration weed_util.Configuration) (err error) {
dir := configuration.GetString("dir") dir := configuration.GetString("dir")
return store.initialize(dir) return store.initialize(dir)
} }

View file

@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/google/btree" "github.com/google/btree"
"strings" "strings"
"github.com/chrislusf/seaweedfs/weed/util"
) )
func init() { func init() {
@ -27,7 +28,7 @@ func (store *MemDbStore) GetName() string {
return "memory" return "memory"
} }
func (store *MemDbStore) Initialize(configuration filer2.Configuration) (err error) { func (store *MemDbStore) Initialize(configuration util.Configuration) (err error) {
store.tree = btree.New(8) store.tree = btree.New(8)
return nil return nil
} }

View file

@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql" "github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/chrislusf/seaweedfs/weed/util"
) )
const ( const (
@ -25,7 +26,7 @@ func (store *MysqlStore) GetName() string {
return "mysql" return "mysql"
} }
func (store *MysqlStore) Initialize(configuration filer2.Configuration) (err error) { func (store *MysqlStore) Initialize(configuration util.Configuration) (err error) {
return store.initialize( return store.initialize(
configuration.GetString("username"), configuration.GetString("username"),
configuration.GetString("password"), configuration.GetString("password"),

View file

@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql" "github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql"
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/chrislusf/seaweedfs/weed/util"
) )
const ( const (
@ -25,7 +26,7 @@ func (store *PostgresStore) GetName() string {
return "postgres" return "postgres"
} }
func (store *PostgresStore) Initialize(configuration filer2.Configuration) (err error) { func (store *PostgresStore) Initialize(configuration util.Configuration) (err error) {
return store.initialize( return store.initialize(
configuration.GetString("username"), configuration.GetString("username"),
configuration.GetString("password"), configuration.GetString("password"),

View file

@ -3,6 +3,7 @@ package redis
import ( import (
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/go-redis/redis" "github.com/go-redis/redis"
"github.com/chrislusf/seaweedfs/weed/util"
) )
func init() { func init() {
@ -17,7 +18,7 @@ func (store *RedisClusterStore) GetName() string {
return "redis_cluster" return "redis_cluster"
} }
func (store *RedisClusterStore) Initialize(configuration filer2.Configuration) (err error) { func (store *RedisClusterStore) Initialize(configuration util.Configuration) (err error) {
return store.initialize( return store.initialize(
configuration.GetStringSlice("addresses"), configuration.GetStringSlice("addresses"),
) )

View file

@ -3,6 +3,7 @@ package redis
import ( import (
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/go-redis/redis" "github.com/go-redis/redis"
"github.com/chrislusf/seaweedfs/weed/util"
) )
func init() { func init() {
@ -17,7 +18,7 @@ func (store *RedisStore) GetName() string {
return "redis" return "redis"
} }
func (store *RedisStore) Initialize(configuration filer2.Configuration) (err error) { func (store *RedisStore) Initialize(configuration util.Configuration) (err error) {
return store.initialize( return store.initialize(
configuration.GetString("address"), configuration.GetString("address"),
configuration.GetString("password"), configuration.GetString("password"),

View file

@ -1,82 +1,29 @@
package msgqueue package msgqueue
import ( import (
"os"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
const (
MSG_QUEUE_TOML_EXAMPLE = `
# A sample TOML config file for SeaweedFS message queue
[log]
enabled = true
[kafka]
enabled = false
hosts = [
"localhost:9092"
]
topic = "seaweedfs_filer"
`
)
var ( var (
MessageQueues []MessageQueue MessageQueues []MessageQueue
Queue MessageQueue Queue MessageQueue
) )
func LoadConfiguration() { func LoadConfiguration(config *viper.Viper) {
// find a filer store
viper.SetConfigName("message_queue") // name of config file (without extension)
viper.AddConfigPath(".") // optionally look for config in the working directory
viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file
glog.Fatalf("Failed to load message_queue.toml file from current directory, or $HOME/.seaweedfs/, "+
"or /etc/seaweedfs/"+
"\n\nPlease follow this example and add a message_queue.toml file to "+
"current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+MSG_QUEUE_TOML_EXAMPLE)
}
glog.V(0).Infof("Reading message queue configuration from %s", viper.ConfigFileUsed())
for _, store := range MessageQueues { for _, store := range MessageQueues {
if viper.GetBool(store.GetName() + ".enabled") { if config.GetBool(store.GetName() + ".enabled") {
viperSub := viper.Sub(store.GetName()) viperSub := config.Sub(store.GetName())
if err := store.Initialize(viperSub); err != nil { if err := store.Initialize(viperSub); err != nil {
glog.Fatalf("Failed to initialize store for %s: %+v", glog.Fatalf("Failed to initialize store for %s: %+v",
store.GetName(), err) store.GetName(), err)
} }
Queue = store Queue = store
glog.V(0).Infof("Configure message queue for %s from %s", store.GetName(), viper.ConfigFileUsed()) glog.V(0).Infof("Configure message queue for %s", store.GetName())
return return
} }
} }
println()
println("Supported message queues are:")
for _, store := range MessageQueues {
println(" " + store.GetName())
}
println()
println("Please configure a supported message queue in", viper.ConfigFileUsed())
println()
os.Exit(-1)
}
// A simplified interface to decouple from Viper
type Configuration interface {
GetString(key string) string
GetBool(key string) bool
GetInt(key string) int
GetInt64(key string) int64
GetFloat64(key string) float64
GetStringSlice(key string) []string
} }

View file

@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/msgqueue" "github.com/chrislusf/seaweedfs/weed/msgqueue"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/util"
) )
func init() { func init() {
@ -20,7 +21,7 @@ func (k *KafkaQueue) GetName() string {
return "kafka" return "kafka"
} }
func (k *KafkaQueue) Initialize(configuration msgqueue.Configuration) (err error) { func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) {
glog.V(0).Infof("filer.msgqueue.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) glog.V(0).Infof("filer.msgqueue.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
glog.V(0).Infof("filer.msgqueue.kafka.topic: %v\n", configuration.GetString("topic")) glog.V(0).Infof("filer.msgqueue.kafka.topic: %v\n", configuration.GetString("topic"))
return k.initialize( return k.initialize(

View file

@ -4,6 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/msgqueue" "github.com/chrislusf/seaweedfs/weed/msgqueue"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/util"
) )
func init() { func init() {
@ -17,7 +18,7 @@ func (k *LogQueue) GetName() string {
return "log" return "log"
} }
func (k *LogQueue) Initialize(configuration msgqueue.Configuration) (err error) { func (k *LogQueue) Initialize(configuration util.Configuration) (err error) {
return nil return nil
} }

View file

@ -1,11 +1,14 @@
package msgqueue package msgqueue
import "github.com/golang/protobuf/proto" import (
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/util"
)
type MessageQueue interface { type MessageQueue interface {
// GetName gets the name to locate the configuration in message_queue.toml file // GetName gets the name to locate the configuration in message_queue.toml file
GetName() string GetName() string
// Initialize initializes the file store // Initialize initializes the file store
Initialize(configuration Configuration) error Initialize(configuration util.Configuration) error
SendMessage(key string, message proto.Message) error SendMessage(key string, message proto.Message) error
} }

View file

@ -11,10 +11,11 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis" _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/msgqueue"
_ "github.com/chrislusf/seaweedfs/weed/msgqueue/kafka" _ "github.com/chrislusf/seaweedfs/weed/msgqueue/kafka"
_ "github.com/chrislusf/seaweedfs/weed/msgqueue/log" _ "github.com/chrislusf/seaweedfs/weed/msgqueue/log"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/msgqueue"
) )
type FilerOption struct { type FilerOption struct {
@ -27,7 +28,6 @@ type FilerOption struct {
SecretKey string SecretKey string
DirListingLimit int DirListingLimit int
DataCenter string DataCenter string
EnableNotification bool
} }
type FilerServer struct { type FilerServer struct {
@ -49,11 +49,12 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
go fs.filer.KeepConnectedToMaster() go fs.filer.KeepConnectedToMaster()
fs.filer.LoadConfiguration() loadConfiguration("filer", true)
v := viper.GetViper()
if fs.option.EnableNotification { fs.filer.LoadConfiguration(v)
msgqueue.LoadConfiguration()
} msgqueue.LoadConfiguration(v.Sub("notification"))
defaultMux.HandleFunc("/favicon.ico", faviconHandler) defaultMux.HandleFunc("/favicon.ico", faviconHandler)
defaultMux.HandleFunc("/", fs.filerHandler) defaultMux.HandleFunc("/", fs.filerHandler)
@ -67,3 +68,26 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
return security.GenJwt(fs.secret, fileId) return security.GenJwt(fs.secret, fileId)
} }
func loadConfiguration(configFileName string, required bool) {
// find a filer store
viper.SetConfigName(configFileName) // name of config file (without extension)
viper.AddConfigPath(".") // optionally look for config in the working directory
viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
glog.V(0).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file
glog.V(0).Infof("Reading %s: %v", configFileName, viper.ConfigFileUsed(), err)
if required {
glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
"\n\nPlease follow this example and add a filer.toml file to "+
"current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+
" https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n",
configFileName, configFileName)
}
}
}

View file

@ -1,130 +1,10 @@
package util package util
// Copyright 2011 Numerotron Inc. type Configuration interface {
// Use of this source code is governed by an MIT-style license GetString(key string) string
// that can be found in the LICENSE file. GetBool(key string) bool
// GetInt(key string) int
// Developed at www.stathat.com by Patrick Crosby GetInt64(key string) int64
// Contact us on twitter with any questions: twitter.com/stat_hat GetFloat64(key string) float64
GetStringSlice(key string) []string
// The jconfig package provides a simple, basic configuration file parser using JSON.
import (
"bytes"
"encoding/json"
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
)
type Config struct {
data map[string]interface{}
filename string
}
func newConfig() *Config {
result := new(Config)
result.data = make(map[string]interface{})
return result
}
// Loads config information from a JSON file
func LoadConfig(filename string) *Config {
result := newConfig()
result.filename = filename
err := result.parse()
if err != nil {
glog.Fatalf("error loading config file %s: %s", filename, err)
}
return result
}
// Loads config information from a JSON string
func LoadConfigString(s string) *Config {
result := newConfig()
err := json.Unmarshal([]byte(s), &result.data)
if err != nil {
glog.Fatalf("error parsing config string %s: %s", s, err)
}
return result
}
func (c *Config) StringMerge(s string) {
next := LoadConfigString(s)
c.merge(next.data)
}
func (c *Config) LoadMerge(filename string) {
next := LoadConfig(filename)
c.merge(next.data)
}
func (c *Config) merge(ndata map[string]interface{}) {
for k, v := range ndata {
c.data[k] = v
}
}
func (c *Config) parse() error {
f, err := os.Open(c.filename)
if err != nil {
return err
}
defer f.Close()
b := new(bytes.Buffer)
_, err = b.ReadFrom(f)
if err != nil {
return err
}
err = json.Unmarshal(b.Bytes(), &c.data)
if err != nil {
return err
}
return nil
}
// Returns a string for the config variable key
func (c *Config) GetString(key string) string {
result, present := c.data[key]
if !present {
return ""
}
return result.(string)
}
// Returns an int for the config variable key
func (c *Config) GetInt(key string) int {
x, ok := c.data[key]
if !ok {
return -1
}
return int(x.(float64))
}
// Returns a float for the config variable key
func (c *Config) GetFloat(key string) float64 {
x, ok := c.data[key]
if !ok {
return -1
}
return x.(float64)
}
// Returns a bool for the config variable key
func (c *Config) GetBool(key string) bool {
x, ok := c.data[key]
if !ok {
return false
}
return x.(bool)
}
// Returns an array for the config variable key
func (c *Config) GetArray(key string) []interface{} {
result, present := c.data[key]
if !present {
return []interface{}(nil)
}
return result.([]interface{})
} }