mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Add support for distributed filer metadata store.
This commit is contained in:
parent
165734ce11
commit
49784d7f28
87
go/filer/cassandra_store/cassandra_store.go
Normal file
87
go/filer/cassandra_store/cassandra_store.go
Normal file
|
@ -0,0 +1,87 @@
|
||||||
|
package cassandra_store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/chrislusf/weed-fs/go/glog"
|
||||||
|
|
||||||
|
"github.com/gocql/gocql"
|
||||||
|
)
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
Basically you need a table just like this:
|
||||||
|
|
||||||
|
CREATE TABLE seaweed_files (
|
||||||
|
path varchar,
|
||||||
|
fids list<varchar>,
|
||||||
|
PRIMARY KEY (path)
|
||||||
|
);
|
||||||
|
|
||||||
|
Need to match flat_namespace.FlatNamespaceStore interface
|
||||||
|
Put(fullFileName string, fid string) (err error)
|
||||||
|
Get(fullFileName string) (fid string, err error)
|
||||||
|
Delete(fullFileName string) (fid string, err error)
|
||||||
|
|
||||||
|
*/
|
||||||
|
type CassandraStore struct {
|
||||||
|
cluster *gocql.ClusterConfig
|
||||||
|
session *gocql.Session
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCassandraStore(keyspace string, hosts ...string) (c *CassandraStore, err error) {
|
||||||
|
c = &CassandraStore{}
|
||||||
|
c.cluster = gocql.NewCluster(hosts...)
|
||||||
|
c.cluster.Keyspace = keyspace
|
||||||
|
c.cluster.Consistency = gocql.Quorum
|
||||||
|
c.session, err = c.cluster.CreateSession()
|
||||||
|
if err != nil {
|
||||||
|
glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CassandraStore) Put(fullFileName string, fid string) (err error) {
|
||||||
|
var input []string
|
||||||
|
input = append(input, fid)
|
||||||
|
if err := c.session.Query(
|
||||||
|
`INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`,
|
||||||
|
fullFileName, input).Exec(); err != nil {
|
||||||
|
glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
|
||||||
|
var output []string
|
||||||
|
if err := c.session.Query(
|
||||||
|
`select fids FROM seaweed_files WHERE path = ? LIMIT 1`,
|
||||||
|
fullFileName).Consistency(gocql.One).Scan(&output); err != nil {
|
||||||
|
if err != gocql.ErrNotFound {
|
||||||
|
glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(output) == 0 {
|
||||||
|
return "", fmt.Errorf("No file id found for %s", fullFileName)
|
||||||
|
}
|
||||||
|
return output[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Currently the fid is not returned
|
||||||
|
func (c *CassandraStore) Delete(fullFileName string) (fid string, err error) {
|
||||||
|
if err := c.session.Query(
|
||||||
|
`DELETE FROM seaweed_files WHERE path = ?`,
|
||||||
|
fullFileName).Exec(); err != nil {
|
||||||
|
if err != gocql.ErrNotFound {
|
||||||
|
glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, err)
|
||||||
|
}
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CassandraStore) Close() {
|
||||||
|
if c.session != nil {
|
||||||
|
c.session.Close()
|
||||||
|
}
|
||||||
|
}
|
22
go/filer/cassandra_store/schema.cql
Normal file
22
go/filer/cassandra_store/schema.cql
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
/*
|
||||||
|
|
||||||
|
Here is the CQL to create the table used by CassandraStore.
|
||||||
|
|
||||||
|
Optionally you can adjust the keyspace name and replication settings.
|
||||||
|
|
||||||
|
For production server, very likely you want to set replication_factor to 3
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
create keyspace seaweed WITH replication = {
|
||||||
|
'class':'SimpleStrategy',
|
||||||
|
'replication_factor':1
|
||||||
|
};
|
||||||
|
|
||||||
|
use seaweed;
|
||||||
|
|
||||||
|
CREATE TABLE seaweed_files (
|
||||||
|
path varchar,
|
||||||
|
fids list<varchar>,
|
||||||
|
PRIMARY KEY (path)
|
||||||
|
);
|
50
go/filer/flat_namespace/flat_namespace_filer.go
Normal file
50
go/filer/flat_namespace/flat_namespace_filer.go
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
package flat_namespace
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/chrislusf/weed-fs/go/filer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FlatNamesapceFiler struct {
|
||||||
|
master string
|
||||||
|
store FlatNamespaceStore
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotImplemented is returned by the filer operations that the flat
// namespace metadata store does not provide.
var NotImplemented = errors.New("Not Implemented for flat namespace meta data store!")
|
||||||
|
|
||||||
|
func NewFlatNamesapceFiler(master string, store FlatNamespaceStore) *FlatNamesapceFiler {
|
||||||
|
return &FlatNamesapceFiler{
|
||||||
|
master: master,
|
||||||
|
store: store,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (filer *FlatNamesapceFiler) CreateFile(fullFileName string, fid string) (err error) {
|
||||||
|
return filer.store.Put(fullFileName, fid)
|
||||||
|
}
|
||||||
|
func (filer *FlatNamesapceFiler) FindFile(fullFileName string) (fid string, err error) {
|
||||||
|
return filer.store.Get(fullFileName)
|
||||||
|
}
|
||||||
|
func (filer *FlatNamesapceFiler) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
|
||||||
|
return 0, NotImplemented
|
||||||
|
}
|
||||||
|
func (filer *FlatNamesapceFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
|
||||||
|
return nil, NotImplemented
|
||||||
|
}
|
||||||
|
func (filer *FlatNamesapceFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
|
||||||
|
return nil, NotImplemented
|
||||||
|
}
|
||||||
|
func (filer *FlatNamesapceFiler) DeleteDirectory(dirPath string, recursive bool) (err error) {
|
||||||
|
return NotImplemented
|
||||||
|
}
|
||||||
|
|
||||||
|
func (filer *FlatNamesapceFiler) DeleteFile(fullFileName string) (fid string, err error) {
|
||||||
|
return filer.store.Delete(fullFileName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (filer *FlatNamesapceFiler) Move(fromPath string, toPath string) error {
|
||||||
|
return NotImplemented
|
||||||
|
}
|
9
go/filer/flat_namespace/flat_namespace_store.go
Normal file
9
go/filer/flat_namespace/flat_namespace_store.go
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
package flat_namespace
|
||||||
|
|
||||||
|
import ()
|
||||||
|
|
||||||
|
// FlatNamespaceStore is the storage backend used by FlatNamesapceFiler: a
// mapping from a full file name to a file id.
type FlatNamespaceStore interface {
	// Put stores fid under fullFileName.
	Put(fullFileName string, fid string) (err error)
	// Get returns the fid stored under fullFileName.
	Get(fullFileName string) (fid string, err error)
	// Delete removes the entry for fullFileName and returns a fid and an
	// error; implementations may leave the fid empty.
	Delete(fullFileName string) (fid string, err error)
}
|
|
@ -22,6 +22,8 @@ type FilerOptions struct {
|
||||||
defaultReplicaPlacement *string
|
defaultReplicaPlacement *string
|
||||||
dir *string
|
dir *string
|
||||||
redirectOnRead *bool
|
redirectOnRead *bool
|
||||||
|
cassandra_server *string
|
||||||
|
cassandra_keyspace *string
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -32,13 +34,15 @@ func init() {
|
||||||
f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data")
|
f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data")
|
||||||
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
|
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
|
||||||
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
|
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
|
||||||
|
f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server")
|
||||||
|
f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
|
||||||
}
|
}
|
||||||
|
|
||||||
var cmdFiler = &Command{
|
var cmdFiler = &Command{
|
||||||
UsageLine: "filer -port=8888 -dir=/tmp -master=<ip:port>",
|
UsageLine: "filer -port=8888 -dir=/tmp -master=<ip:port>",
|
||||||
Short: "start a file server that points to a master server",
|
Short: "start a file server that points to a master server",
|
||||||
Long: `start a file server which accepts REST operation for any files.
|
Long: `start a file server which accepts REST operation for any files.
|
||||||
|
|
||||||
//create or overwrite the file, the directories /path/to will be automatically created
|
//create or overwrite the file, the directories /path/to will be automatically created
|
||||||
POST /path/to/file
|
POST /path/to/file
|
||||||
//get the file content
|
//get the file content
|
||||||
|
@ -47,10 +51,10 @@ var cmdFiler = &Command{
|
||||||
POST /path/to/
|
POST /path/to/
|
||||||
//return a json format subdirectory and files listing
|
//return a json format subdirectory and files listing
|
||||||
GET /path/to/
|
GET /path/to/
|
||||||
|
|
||||||
Current <fullpath~fileid> mapping metadata store is local embedded leveldb.
|
Current <fullpath~fileid> mapping metadata store is local embedded leveldb.
|
||||||
It should be highly scalable to hundreds of millions of files on a modest machine.
|
It should be highly scalable to hundreds of millions of files on a modest machine.
|
||||||
|
|
||||||
Future we will ensure it can avoid of being SPOF.
|
Future we will ensure it can avoid of being SPOF.
|
||||||
|
|
||||||
`,
|
`,
|
||||||
|
@ -63,8 +67,9 @@ func runFiler(cmd *Command, args []string) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
r := http.NewServeMux()
|
r := http.NewServeMux()
|
||||||
_, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection,
|
_, nfs_err := weed_server.NewEmbeddedFilerServer(r, *f.port, *f.master, *f.dir, *f.collection,
|
||||||
*f.defaultReplicaPlacement, *f.redirectOnRead,
|
*f.defaultReplicaPlacement, *f.redirectOnRead,
|
||||||
|
*f.cassandra_server, *f.cassandra_keyspace,
|
||||||
)
|
)
|
||||||
if nfs_err != nil {
|
if nfs_err != nil {
|
||||||
glog.Fatalf(nfs_err.Error())
|
glog.Fatalf(nfs_err.Error())
|
||||||
|
|
|
@ -157,8 +157,9 @@ func runServer(cmd *Command, args []string) bool {
|
||||||
if *isStartingFiler {
|
if *isStartingFiler {
|
||||||
go func() {
|
go func() {
|
||||||
r := http.NewServeMux()
|
r := http.NewServeMux()
|
||||||
_, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
|
_, nfs_err := weed_server.NewEmbeddedFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
|
||||||
*filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead,
|
*filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead,
|
||||||
|
"", "",
|
||||||
)
|
)
|
||||||
if nfs_err != nil {
|
if nfs_err != nil {
|
||||||
glog.Fatalf(nfs_err.Error())
|
glog.Fatalf(nfs_err.Error())
|
||||||
|
|
|
@ -5,7 +5,9 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/chrislusf/weed-fs/go/filer"
|
"github.com/chrislusf/weed-fs/go/filer"
|
||||||
|
"github.com/chrislusf/weed-fs/go/filer/cassandra_store"
|
||||||
"github.com/chrislusf/weed-fs/go/filer/embedded_filer"
|
"github.com/chrislusf/weed-fs/go/filer/embedded_filer"
|
||||||
|
"github.com/chrislusf/weed-fs/go/filer/flat_namespace"
|
||||||
"github.com/chrislusf/weed-fs/go/glog"
|
"github.com/chrislusf/weed-fs/go/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -18,8 +20,9 @@ type FilerServer struct {
|
||||||
filer filer.Filer
|
filer filer.Filer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string,
|
func NewEmbeddedFilerServer(r *http.ServeMux, port int, master string, dir string, collection string,
|
||||||
replication string, redirectOnRead bool,
|
replication string, redirectOnRead bool,
|
||||||
|
cassandra_server string, cassandra_keyspace string,
|
||||||
) (fs *FilerServer, err error) {
|
) (fs *FilerServer, err error) {
|
||||||
fs = &FilerServer{
|
fs = &FilerServer{
|
||||||
master: master,
|
master: master,
|
||||||
|
@ -29,12 +32,21 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle
|
||||||
port: ":" + strconv.Itoa(port),
|
port: ":" + strconv.Itoa(port),
|
||||||
}
|
}
|
||||||
|
|
||||||
if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
|
if cassandra_server == "" {
|
||||||
glog.Fatal("Can not start filer in dir", dir, ": ", err.Error())
|
if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
|
||||||
return
|
glog.Fatalf("Can not start filer in dir %s : %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.HandleFunc("/admin/mv", fs.moveHandler)
|
||||||
|
} else {
|
||||||
|
cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)
|
||||||
|
}
|
||||||
|
fs.filer = flat_namespace.NewFlatNamesapceFiler(master, cassandra_store)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.HandleFunc("/admin/mv", fs.moveHandler)
|
|
||||||
r.HandleFunc("/", fs.filerHandler)
|
r.HandleFunc("/", fs.filerHandler)
|
||||||
|
|
||||||
return fs, nil
|
return fs, nil
|
||||||
|
|
|
@ -179,7 +179,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
|
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
|
||||||
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
|
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
|
||||||
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
|
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
|
||||||
glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error())
|
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
|
||||||
writeJsonError(w, r, db_err)
|
writeJsonError(w, r, db_err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue