// Source: seaweedfs/weed/command/filer_replication.go

package command
import (
2019-03-16 00:20:24 +00:00
"context"
2018-10-06 20:01:38 +00:00
"strings"
2019-09-02 10:28:40 +00:00
"github.com/joeslay/seaweedfs/weed/glog"
"github.com/joeslay/seaweedfs/weed/replication"
"github.com/joeslay/seaweedfs/weed/replication/sink"
_ "github.com/joeslay/seaweedfs/weed/replication/sink/azuresink"
_ "github.com/joeslay/seaweedfs/weed/replication/sink/b2sink"
_ "github.com/joeslay/seaweedfs/weed/replication/sink/filersink"
_ "github.com/joeslay/seaweedfs/weed/replication/sink/gcssink"
_ "github.com/joeslay/seaweedfs/weed/replication/sink/s3sink"
"github.com/joeslay/seaweedfs/weed/replication/sub"
"github.com/joeslay/seaweedfs/weed/util"
2018-10-11 07:08:13 +00:00
"github.com/spf13/viper"
2018-09-17 07:27:56 +00:00
)
// init attaches the command's Run handler at package-initialization time.
// The handler is wired here instead of in the cmdFilerReplicate literal to
// break an initialization cycle between the variable and the function.
func init() {
	cmdFilerReplicate.Run = runFilerReplicate
}
// cmdFilerReplicate describes the "filer.replicate" sub-command: its usage
// line and the short and long help texts. The Run handler is not set in this
// literal; it is assigned separately in init.
var cmdFilerReplicate = &Command{
	UsageLine: "filer.replicate",
	Short:     "replicate file changes to another destination",
	Long: `replicate file changes to another destination
filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination.
Run "weed scaffold -config=replication" to generate a replication.toml file and customize the parameters.
`,
}
func runFilerReplicate(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadConfiguration("replication", true)
util.LoadConfiguration("notification", true)
2018-09-17 07:27:56 +00:00
config := viper.GetViper()
var notificationInput sub.NotificationInput
2018-09-17 07:27:56 +00:00
2018-12-06 08:44:41 +00:00
validateOneEnabledInput(config)
for _, input := range sub.NotificationInputs {
2018-09-17 07:27:56 +00:00
if config.GetBool("notification." + input.GetName() + ".enabled") {
viperSub := config.Sub("notification." + input.GetName())
if err := input.Initialize(viperSub); err != nil {
glog.Fatalf("Failed to initialize notification input for %s: %+v",
input.GetName(), err)
}
glog.V(0).Infof("Configure notification input to %s", input.GetName())
notificationInput = input
break
}
}
2018-10-30 09:29:11 +00:00
if notificationInput == nil {
println("No notification is defined in notification.toml file.")
println("Please follow 'weed scaffold -config=notification' to see example notification configurations.")
2018-10-30 09:29:11 +00:00
return true
}
2018-09-23 07:40:36 +00:00
// avoid recursive replication
if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
sourceConfig, sinkConfig := config.Sub("source.filer"), config.Sub("sink.filer")
if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") {
fromDir := sourceConfig.GetString("directory")
toDir := sinkConfig.GetString("directory")
if strings.HasPrefix(toDir, fromDir) {
glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
}
}
}
2018-10-04 06:36:52 +00:00
var dataSink sink.ReplicationSink
for _, sk := range sink.Sinks {
if config.GetBool("sink." + sk.GetName() + ".enabled") {
viperSub := config.Sub("sink." + sk.GetName())
if err := sk.Initialize(viperSub); err != nil {
glog.Fatalf("Failed to initialize sink for %s: %+v",
sk.GetName(), err)
}
glog.V(0).Infof("Configure sink to %s", sk.GetName())
dataSink = sk
break
}
}
2018-10-06 20:01:38 +00:00
if dataSink == nil {
println("no data sink configured in replication.toml:")
2018-10-06 20:01:38 +00:00
for _, sk := range sink.Sinks {
println(" " + sk.GetName())
}
return true
}
2018-10-04 06:36:52 +00:00
replicator := replication.NewReplicator(config.Sub("source.filer"), dataSink)
2018-09-17 07:27:56 +00:00
for {
key, m, err := notificationInput.ReceiveMessage()
if err != nil {
2018-09-17 09:23:21 +00:00
glog.Errorf("receive %s: %+v", key, err)
2018-09-17 07:27:56 +00:00
continue
}
if key == "" {
// long poll received no messages
continue
}
2018-09-22 07:12:10 +00:00
if m.OldEntry != nil && m.NewEntry == nil {
glog.V(1).Infof("delete: %s", key)
2018-09-22 07:12:10 +00:00
} else if m.OldEntry == nil && m.NewEntry != nil {
glog.V(1).Infof(" add: %s", key)
2018-09-22 07:12:10 +00:00
} else {
glog.V(1).Infof("modify: %s", key)
}
2019-03-16 00:20:24 +00:00
if err = replicator.Replicate(context.Background(), key, m); err != nil {
2018-09-17 09:23:21 +00:00
glog.Errorf("replicate %s: %+v", key, err)
2018-11-04 19:58:59 +00:00
} else {
2018-11-04 20:07:33 +00:00
glog.V(1).Infof("replicated %s", key)
2018-09-17 07:27:56 +00:00
}
}
return true
}
2018-12-06 08:44:41 +00:00
// validateOneEnabledInput walks the registered notification inputs and aborts
// via glog.Fatalf as soon as a second input is found whose
// "notification.<name>.enabled" setting is true. Zero or one enabled input is
// accepted silently.
func validateOneEnabledInput(config *viper.Viper) {
	firstEnabled := ""
	for _, candidate := range sub.NotificationInputs {
		if !config.GetBool("notification." + candidate.GetName() + ".enabled") {
			continue
		}
		if firstEnabled == "" {
			// Remember the first enabled input so a later one can be reported against it.
			firstEnabled = candidate.GetName()
			continue
		}
		glog.Fatalf("Notification input is enabled for both %s and %s", firstEnabled, candidate.GetName())
	}
}