From 869161a2613a64bc5cf15f7841607f2721b8ef40 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 26 May 2018 22:02:49 -0700 Subject: [PATCH] support both mysql and postgres --- .../filer2/abstract_sql/abstract_sql_store.go | 54 ++++++++------- weed/filer2/configuration.go | 9 +-- weed/filer2/mysql/mysql_store.go | 8 +++ weed/filer2/postgres/README.txt | 17 +++++ weed/filer2/postgres/postgres_store.go | 68 +++++++++++++++++++ weed/server/filer_server.go | 1 + 6 files changed, 127 insertions(+), 30 deletions(-) create mode 100644 weed/filer2/postgres/README.txt create mode 100644 weed/filer2/postgres/postgres_store.go diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go index 900056dae..bfc76fbc0 100644 --- a/weed/filer2/abstract_sql/abstract_sql_store.go +++ b/weed/filer2/abstract_sql/abstract_sql_store.go @@ -9,7 +9,13 @@ import ( ) type AbstractSqlStore struct { - DB *sql.DB + DB *sql.DB + SqlInsert string + SqlUpdate string + SqlFind string + SqlDelete string + SqlListExclusive string + SqlListInclusive string } func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) { @@ -17,18 +23,17 @@ func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() if err != nil { - return fmt.Errorf("mysql encode %s: %s", entry.FullPath, err) + return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - res, err := store.DB.Exec("INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)", - hashToLong(dir), name, dir, meta) + res, err := store.DB.Exec(store.SqlInsert, hashToLong(dir), name, dir, meta) if err != nil { - return fmt.Errorf("mysql insert %s: %s", entry.FullPath, err) + return fmt.Errorf("insert %s: %s", entry.FullPath, err) } _, err = res.RowsAffected() if err != nil { - return fmt.Errorf("mysql insert %s but no rows affected: %s", entry.FullPath, err) + return fmt.Errorf("insert %s but no rows affected: %s", entry.FullPath, err) } return nil } @@ -38,18 +43,17 @@ func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() if err != nil { - return fmt.Errorf("mysql encode %s: %s", entry.FullPath, err) + return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - res, err := store.DB.Exec("UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?", - meta, hashToLong(dir), name, dir) + res, err := store.DB.Exec(store.SqlUpdate, meta, hashToLong(dir), name, dir) if err != nil { - return fmt.Errorf("mysql update %s: %s", entry.FullPath, err) + return fmt.Errorf("update %s: %s", entry.FullPath, err) } _, err = res.RowsAffected() if err != nil { - return fmt.Errorf("mysql update %s but no rows affected: %s", entry.FullPath, err) + return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err) } return nil } @@ -57,18 +61,17 @@ func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) { func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entry, error) { dir, name := fullpath.DirAndName() - row := store.DB.QueryRow("SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?", - hashToLong(dir), name, dir) + row := store.DB.QueryRow(store.SqlFind, hashToLong(dir), name, dir) var data []byte if err := row.Scan(&data); err != nil { - return nil, fmt.Errorf("mysql read entry %s: %v", fullpath, err) + return nil, fmt.Errorf("read entry %s: %v", fullpath, err) } entry := &filer2.Entry{ FullPath: fullpath, } if err := entry.DecodeAttributesAndChunks(data); err != nil { - return entry, fmt.Errorf("mysql decode %s : %v", entry.FullPath, err) + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } return entry, nil @@ -80,15 +83,14 @@ func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) (*filer2.En dir, name := fullpath.DirAndName() - res, err := store.DB.Exec("DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?", - hashToLong(dir), name, dir) + res, err := store.DB.Exec(store.SqlDelete, hashToLong(dir), name, dir) if err != nil { - return nil, fmt.Errorf("mysql delete %s: %s", fullpath, err) + return nil, fmt.Errorf("delete %s: %s", fullpath, err) } _, err = res.RowsAffected() if err != nil { - return nil, fmt.Errorf("mysql delete %s but no rows affected: %s", fullpath, err) + return nil, fmt.Errorf("delete %s but no rows affected: %s", fullpath, err) } return entry, nil @@ -96,14 +98,14 @@ func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) (*filer2.En func (store *AbstractSqlStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { - sqlText := "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? LIMIT ?" + sqlText := store.SqlListExclusive if inclusive { - sqlText = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? LIMIT ?" + sqlText = store.SqlListInclusive } rows, err := store.DB.Query(sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit) if err != nil { - return nil, fmt.Errorf("mysql list %s : %v", fullpath, err) + return nil, fmt.Errorf("list %s : %v", fullpath, err) } defer rows.Close() @@ -111,16 +113,16 @@ func (store *AbstractSqlStore) ListDirectoryEntries(fullpath filer2.FullPath, st var name string var data []byte if err = rows.Scan(&name, &data); err != nil { - glog.V(0).Infof("mysql scan %s : %v", fullpath, err) - return nil, fmt.Errorf("mysql scan %s: %v", fullpath, err) + glog.V(0).Infof("scan %s : %v", fullpath, err) + return nil, fmt.Errorf("scan %s: %v", fullpath, err) } entry := &filer2.Entry{ FullPath: filer2.NewFullPath(string(fullpath), name), } if err = entry.DecodeAttributesAndChunks(data); err != nil { - glog.V(0).Infof("mysql scan decode %s : %v", entry.FullPath, err) - return nil, fmt.Errorf("mysql scan decode %s : %v", entry.FullPath, err) + glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) + return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) } entries = append(entries, entry) diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go index 14ca61745..91a0d6f46 100644 --- a/weed/filer2/configuration.go +++ b/weed/filer2/configuration.go @@ -35,11 +35,11 @@ dir = "." # directory to store level db files # ) DEFAULT CHARSET=utf8; # enabled = true -server = "localhost" +hostname = "localhost" port = 3306 username = "root" password = "" -database = "" # create or use an existing database, create the seaweedfs table. +database = "" # create or use an existing database connection_max_idle = 2 connection_max_open = 100 @@ -52,11 +52,12 @@ connection_max_open = 100 # PRIMARY KEY (dirhash, name) # ); enabled = false -server = "localhost" +hostname = "localhost" port = 5432 username = "postgres" password = "" -database = "" +database = "" # create or use an existing database +sslmode = "disable" connection_max_idle = 100 connection_max_open = 100 diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go index 16fd4311f..38afb99f1 100644 --- a/weed/filer2/mysql/mysql_store.go +++ b/weed/filer2/mysql/mysql_store.go @@ -39,6 +39,14 @@ func (store *MysqlStore) Initialize(viper *viper.Viper) (err error) { } func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int) (err error) { + + store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)" + store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?" + store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?" + store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?" + store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? LIMIT ?" + store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? LIMIT ?" + sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) var dbErr error store.DB, dbErr = sql.Open("mysql", sqlUrl) diff --git a/weed/filer2/postgres/README.txt b/weed/filer2/postgres/README.txt new file mode 100644 index 000000000..ef2ef683b --- /dev/null +++ b/weed/filer2/postgres/README.txt @@ -0,0 +1,17 @@ + +1. create "seaweedfs" database + +export PGHOME=/Library/PostgreSQL/10 +$PGHOME/bin/createdb --username=postgres --password seaweedfs + +2. create "filemeta" table +$PGHOME/bin/psql --username=postgres --password seaweedfs + +CREATE TABLE IF NOT EXISTS filemeta ( + dirhash BIGINT, + name VARCHAR(1000), + directory VARCHAR(4096), + meta bytea, + PRIMARY KEY (dirhash, name) +); + diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go new file mode 100644 index 000000000..387b8f58d --- /dev/null +++ b/weed/filer2/postgres/postgres_store.go @@ -0,0 +1,68 @@ +package postgres + +import ( + "fmt" + "database/sql" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/spf13/viper" + _ "github.com/lib/pq" + "github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql" +) + +const ( + CONNECTION_URL_PATTERN = "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30" +) + +func init() { + filer2.Stores = append(filer2.Stores, &PostgresStore{}) +} + +type PostgresStore struct { + abstract_sql.AbstractSqlStore +} + +func (store *PostgresStore) GetName() string { + return "postgres" +} + +func (store *PostgresStore) Initialize(viper *viper.Viper) (err error) { + return store.initialize( + viper.GetString("username"), + viper.GetString("password"), + viper.GetString("hostname"), + viper.GetInt("port"), + viper.GetString("database"), + viper.GetString("sslmode"), + viper.GetInt("connection_max_idle"), + viper.GetInt("connection_max_open"), + ) +} + +func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) { + + store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)" + store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4" + store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" + store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" + store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 LIMIT $4" + store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 LIMIT $4" + + sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, password, database, sslmode) + var dbErr error + store.DB, dbErr = sql.Open("postgres", sqlUrl) + if dbErr != nil { + store.DB.Close() + store.DB = nil + return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) + } + + store.DB.SetMaxIdleConns(maxIdle) + store.DB.SetMaxOpenConns(maxOpen) + + if err = store.DB.Ping(); err != nil { + return fmt.Errorf("connect to %s error:%v", sqlUrl, err) + } + + return nil +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 2e88252e9..4517733ae 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -15,6 +15,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb" _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" + _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" ) type FilerServer struct {