diff --git a/docker/Makefile b/docker/Makefile index fe278f9b4..5949842f1 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -21,6 +21,9 @@ k8s: build dev_registry: build docker-compose -f local-registry-compose.yml -p seaweedfs up +dev_replicate: build + docker-compose -f local-replicate-compose.yml -p seaweedfs up + cluster: build docker-compose -f local-cluster-compose.yml -p seaweedfs up diff --git a/docker/local-replicate-compose.yml b/docker/local-replicate-compose.yml new file mode 100644 index 000000000..a8e0f808e --- /dev/null +++ b/docker/local-replicate-compose.yml @@ -0,0 +1,59 @@ +version: '2' + +services: + master: + image: chrislusf/seaweedfs:local + ports: + - 9333:9333 + - 19333:19333 + command: "master -ip=master" + volume: + image: chrislusf/seaweedfs:local + ports: + - 8080:8080 + - 18080:18080 + command: "volume -mserver=master:9333 -port=8080 -ip=volume -preStopSeconds=1" + depends_on: + - master + filer: + image: chrislusf/seaweedfs:local + ports: + - 8888:8888 + - 18888:18888 + command: '-v=9 filer -master="master:9333"' + restart: on-failure + volumes: + - ./notification.toml:/etc/seaweedfs/notification.toml + depends_on: + - master + - volume + - rabbitmq + - replicate + environment: + RABBIT_SERVER_URL: "amqp://guest:guest@rabbitmq:5672/" + replicate: + image: chrislusf/seaweedfs:local + command: '-v=9 filer.replicate' + restart: on-failure + volumes: + - ./notification.toml:/etc/seaweedfs/notification.toml + - ./replication.toml:/etc/seaweedfs/replication.toml + depends_on: + - rabbitmq + environment: + RABBIT_SERVER_URL: "amqp://guest:guest@rabbitmq:5672/" + s3: + image: chrislusf/seaweedfs:local + ports: + - 8333:8333 + command: 's3 -filer="filer:8888"' + depends_on: + - master + - volume + - filer + rabbitmq: + image: rabbitmq:3.8.10-management-alpine + ports: + - 5672:5672 + - 15671:15671 + - 15672:15672 \ No newline at end of file diff --git a/docker/notification.toml b/docker/notification.toml new file mode 100644 index 000000000..4ed76825e --- /dev/null +++ b/docker/notification.toml @@ -0,0 +1,16 @@ +[notification.log] +# this is only for debugging perpose and does not work with "weed filer.replicate" +enabled = false + + +[notification.gocdk_pub_sub] +# The Go Cloud Development Kit (https://gocloud.dev). +# PubSub API (https://godoc.org/gocloud.dev/pubsub). +# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ. +enabled = true +# This URL will Dial the RabbitMQ server at the URL in the environment +# variable RABBIT_SERVER_URL and open the exchange "myexchange". +# The exchange must have already been created by some other means, like +# the RabbitMQ management plugin. +topic_url = "rabbit://swexchange" +sub_url = "rabbit://swqueue" \ No newline at end of file diff --git a/docker/replication.toml b/docker/replication.toml new file mode 100644 index 000000000..84c0bec28 --- /dev/null +++ b/docker/replication.toml @@ -0,0 +1,11 @@ +[source.filer] +enabled = true +grpcAddress = "filer:18888" +# all files under this directory tree are replicated. +# this is not a directory on your hard drive, but on your filer. +# i.e., all files with this "prefix" are sent to notification message queue. +directory = "/buckets" + +[sink.local] +enabled = true +directory = "/data" \ No newline at end of file diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 4f698e375..b6515e505 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -11,6 +11,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" "github.com/chrislusf/seaweedfs/weed/replication/sub" "github.com/chrislusf/seaweedfs/weed/util" diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 8b74274e5..415e0cba3 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -350,6 +350,10 @@ grpcAddress = "localhost:18888" # i.e., all files with this "prefix" are sent to notification message queue. directory = "/buckets" +[sink.local] +enabled = false +directory = "/backup" + [sink.filer] enabled = false grpcAddress = "localhost:18888" diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go new file mode 100644 index 000000000..76e0384e5 --- /dev/null +++ b/weed/replication/sink/localsink/local_sink.go @@ -0,0 +1,101 @@ +package localsink + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/repl_util" + "github.com/chrislusf/seaweedfs/weed/replication/sink" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" + "io/ioutil" + "os" + "path/filepath" + "strings" +) + +type LocalSink struct { + dir string + filerSource *source.FilerSource +} + +func init() { + sink.Sinks = append(sink.Sinks, &LocalSink{}) +} + +func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) { + localsink.filerSource = s +} + +func (localsink *LocalSink) GetName() string { + return "local" +} + +func (localsink *LocalSink) isMultiPartEntry(key string) bool { + return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/") +} + +func (localsink *LocalSink) initialize(dir string) error { + localsink.dir = dir + return nil +} + +func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error { + dir := configuration.GetString(prefix + "directory") + glog.V(4).Infof("sink.local.directory: %v", dir) + return localsink.initialize(dir) +} + +func (localsink *LocalSink) GetSinkToDirectory() string { + return localsink.dir +} + +func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { + if localsink.isMultiPartEntry(key) { + return nil + } + glog.V(4).Infof("Delete Entry key: %s", key) + if err := os.Remove(key); err != nil { + return err + } + return nil +} + +func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { + if entry.IsDirectory || localsink.isMultiPartEntry(key) { + return nil + } + glog.V(4).Infof("Create Entry key: %s", key) + + totalSize := filer.FileSize(entry) + chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) + + dir := filepath.Dir(key) + + if _, err := os.Stat(dir); os.IsNotExist(err) { + glog.V(4).Infof("Create Direcotry key: %s", dir) + if err = os.MkdirAll(dir, 0); err != nil { + return err + } + } + + writeFunc := func(data []byte) error { + writeErr := ioutil.WriteFile(key, data, 0) + return writeErr + } + + if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil { + return err + } + + return nil +} + +func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { + if localsink.isMultiPartEntry(key) { + return true, nil + } + glog.V(4).Infof("Update Entry key: %s", key) + // do delete and create + return false, nil +}