diff --git a/go/filer/cassandra_store/cassandra_store.go b/go/filer/cassandra_store/cassandra_store.go new file mode 100644 index 000000000..83face686 --- /dev/null +++ b/go/filer/cassandra_store/cassandra_store.go @@ -0,0 +1,87 @@ +package cassandra_store + +import ( + "fmt" + + "github.com/chrislusf/weed-fs/go/glog" + + "github.com/gocql/gocql" +) + +/* + +Basically you need a table just like this: + +CREATE TABLE seaweed_files ( + path varchar, + fids list, + PRIMARY KEY (path) +); + +Need to match flat_namespace.FlatNamespaceStore interface + Put(fullFileName string, fid string) (err error) + Get(fullFileName string) (fid string, err error) + Delete(fullFileName string) (fid string, err error) + +*/ +type CassandraStore struct { + cluster *gocql.ClusterConfig + session *gocql.Session +} + +func NewCassandraStore(keyspace string, hosts ...string) (c *CassandraStore, err error) { + c = &CassandraStore{} + c.cluster = gocql.NewCluster(hosts...) + c.cluster.Keyspace = keyspace + c.cluster.Consistency = gocql.Quorum + c.session, err = c.cluster.CreateSession() + if err != nil { + glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) + } + return +} + +func (c *CassandraStore) Put(fullFileName string, fid string) (err error) { + var input []string + input = append(input, fid) + if err := c.session.Query( + `INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`, + fullFileName, input).Exec(); err != nil { + glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err) + return err + } + return nil +} +func (c *CassandraStore) Get(fullFileName string) (fid string, err error) { + var output []string + if err := c.session.Query( + `select fids FROM seaweed_files WHERE path = ? LIMIT 1`, + fullFileName).Consistency(gocql.One).Scan(&output); err != nil { + if err != gocql.ErrNotFound { + glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err) + } + } + if len(output) == 0 { + return "", fmt.Errorf("No file id found for %s", fullFileName) + } + return output[0], nil +} + +// Currently the fid is not returned +func (c *CassandraStore) Delete(fullFileName string) (fid string, err error) { + if err := c.session.Query( + `DELETE FROM seaweed_files WHERE path = ?`, + fullFileName).Exec(); err != nil { + if err != gocql.ErrNotFound { + glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, err) + } + return "", err + } + return "", nil +} + +func (c *CassandraStore) Close() { + if c.session != nil { + c.session.Close() + } +} diff --git a/go/filer/cassandra_store/schema.cql b/go/filer/cassandra_store/schema.cql new file mode 100644 index 000000000..d6f2bb093 --- /dev/null +++ b/go/filer/cassandra_store/schema.cql @@ -0,0 +1,22 @@ +/* + +Here is the CQL to create the table.CassandraStore + +Optionally you can adjust the keyspace name and replication settings. + +For production server, very likely you want to set replication_factor to 3 + +*/ + +create keyspace seaweed WITH replication = { + 'class':'SimpleStrategy', + 'replication_factor':1 +}; + +use seaweed; + +CREATE TABLE seaweed_files ( + path varchar, + fids list, + PRIMARY KEY (path) +); diff --git a/go/filer/design.txt b/go/filer/embedded_filer/design.txt similarity index 100% rename from go/filer/design.txt rename to go/filer/embedded_filer/design.txt diff --git a/go/filer/flat_namespace/flat_namespace_filer.go b/go/filer/flat_namespace/flat_namespace_filer.go new file mode 100644 index 000000000..021b809d6 --- /dev/null +++ b/go/filer/flat_namespace/flat_namespace_filer.go @@ -0,0 +1,50 @@ +package flat_namespace + +import ( + "errors" + + "github.com/chrislusf/weed-fs/go/filer" +) + +type FlatNamesapceFiler struct { + master string + store FlatNamespaceStore +} + +var ( + NotImplemented = errors.New("Not Implemented for flat namespace meta data store!") +) + +func NewFlatNamesapceFiler(master string, store FlatNamespaceStore) *FlatNamesapceFiler { + return &FlatNamesapceFiler{ + master: master, + store: store, + } +} + +func (filer *FlatNamesapceFiler) CreateFile(fullFileName string, fid string) (err error) { + return filer.store.Put(fullFileName, fid) +} +func (filer *FlatNamesapceFiler) FindFile(fullFileName string) (fid string, err error) { + return filer.store.Get(fullFileName) +} +func (filer *FlatNamesapceFiler) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { + return 0, NotImplemented +} +func (filer *FlatNamesapceFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { + return nil, NotImplemented +} +func (filer *FlatNamesapceFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { + return nil, NotImplemented +} +func (filer *FlatNamesapceFiler) DeleteDirectory(dirPath string, recursive bool) (err error) { + return NotImplemented +} + +func (filer *FlatNamesapceFiler) DeleteFile(fullFileName string) (fid string, err error) { + return filer.store.Delete(fullFileName) +} + +func (filer *FlatNamesapceFiler) Move(fromPath string, toPath string) error { + return NotImplemented +} diff --git a/go/filer/flat_namespace/flat_namespace_store.go b/go/filer/flat_namespace/flat_namespace_store.go new file mode 100644 index 000000000..832b70e40 --- /dev/null +++ b/go/filer/flat_namespace/flat_namespace_store.go @@ -0,0 +1,9 @@ +package flat_namespace + +import () + +type FlatNamespaceStore interface { + Put(fullFileName string, fid string) (err error) + Get(fullFileName string) (fid string, err error) + Delete(fullFileName string) (fid string, err error) +} diff --git a/go/weed/filer.go b/go/weed/filer.go index 11154f183..767864450 100644 --- a/go/weed/filer.go +++ b/go/weed/filer.go @@ -22,6 +22,8 @@ type FilerOptions struct { defaultReplicaPlacement *string dir *string redirectOnRead *bool + cassandra_server *string + cassandra_keyspace *string } func init() { @@ -32,13 +34,15 @@ func init() { f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data") f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") + f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server") + f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") } var cmdFiler = &Command{ UsageLine: "filer -port=8888 -dir=/tmp -master=", Short: "start a file server that points to a master server", Long: `start a file server which accepts REST operation for any files. - + //create or overwrite the file, the directories /path/to will be automatically created POST /path/to/file //get the file content @@ -47,10 +51,10 @@ var cmdFiler = &Command{ POST /path/to/ //return a json format subdirectory and files listing GET /path/to/ - + Current mapping metadata store is local embedded leveldb. It should be highly scalable to hundreds of millions of files on a modest machine. - + Future we will ensure it can avoid of being SPOF. `, @@ -63,8 +67,9 @@ func runFiler(cmd *Command, args []string) bool { } r := http.NewServeMux() - _, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection, + _, nfs_err := weed_server.NewEmbeddedFilerServer(r, *f.port, *f.master, *f.dir, *f.collection, *f.defaultReplicaPlacement, *f.redirectOnRead, + *f.cassandra_server, *f.cassandra_keyspace, ) if nfs_err != nil { glog.Fatalf(nfs_err.Error()) diff --git a/go/weed/server.go b/go/weed/server.go index 2db251944..3aae2e7fb 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -157,8 +157,9 @@ func runServer(cmd *Command, args []string) bool { if *isStartingFiler { go func() { r := http.NewServeMux() - _, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, + _, nfs_err := weed_server.NewEmbeddedFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, *filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead, + "", "", ) if nfs_err != nil { glog.Fatalf(nfs_err.Error()) diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go index b8cb9bd5d..325c9b626 100644 --- a/go/weed/weed_server/filer_server.go +++ b/go/weed/weed_server/filer_server.go @@ -5,7 +5,9 @@ import ( "strconv" "github.com/chrislusf/weed-fs/go/filer" + "github.com/chrislusf/weed-fs/go/filer/cassandra_store" "github.com/chrislusf/weed-fs/go/filer/embedded_filer" + "github.com/chrislusf/weed-fs/go/filer/flat_namespace" "github.com/chrislusf/weed-fs/go/glog" ) @@ -18,8 +20,9 @@ type FilerServer struct { filer filer.Filer } -func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string, +func NewEmbeddedFilerServer(r *http.ServeMux, port int, master string, dir string, collection string, replication string, redirectOnRead bool, + cassandra_server string, cassandra_keyspace string, ) (fs *FilerServer, err error) { fs = &FilerServer{ master: master, @@ -29,12 +32,21 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle port: ":" + strconv.Itoa(port), } - if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil { - glog.Fatal("Can not start filer in dir", dir, ": ", err.Error()) - return + if cassandra_server == "" { + if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil { + glog.Fatalf("Can not start filer in dir %s : %v", err) + return + } + + r.HandleFunc("/admin/mv", fs.moveHandler) + } else { + cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server) + if err != nil { + glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err) + } + fs.filer = flat_namespace.NewFlatNamesapceFiler(master, cassandra_store) } - r.HandleFunc("/admin/mv", fs.moveHandler) r.HandleFunc("/", fs.filerHandler) return fs, nil diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index 6f22912a7..1afc1f852 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -179,7 +179,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { glog.V(4).Infoln("saving", path, "=>", assignResult.Fid) if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil { operation.DeleteFile(fs.master, assignResult.Fid) //clean up - glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error()) + glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) writeJsonError(w, r, db_err) return }