refactoring mysql store code

This commit is contained in:
霍晓栋 2016-09-05 14:10:22 +08:00
parent e7b237c8da
commit 3aa021a812
5 changed files with 141 additions and 71 deletions

View file

@ -0,0 +1,66 @@
#MySQL filer mapping store
## Schema format
Basically, uriPath and fid are the key elements stored in MySQL. In view of the optimization and user's usage,
adding primary key with integer type and involving createTime, updateTime, status fields should be somewhat meaningful.
Of course, you could customize the schema per your concretely circumstance freely.
<pre><code>
CREATE TABLE IF NOT EXISTS `filer_mapping` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
`fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
`createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
`updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
`remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
`status` tinyint(2) DEFAULT '1' COMMENT 'resource status',
PRIMARY KEY (`id`),
UNIQUE KEY `index_uriPath` (`uriPath`)
) DEFAULT CHARSET=utf8;
</code></pre>
The MySQL 's config params is not added into the weed command option as other stores(redis,cassandra). Instead,
We created a config file(json format) for them. TOML,YAML or XML also should be OK. But TOML and YAML need import thirdparty package
while XML is a little bit complex.
The sample config file's content is below:
<pre><code>
{
"mysql": [
{
"User": "root",
"Password": "root",
"HostName": "127.0.0.1",
"Port": 3306,
"DataBase": "seaweedfs"
},
{
"User": "root",
"Password": "root",
"HostName": "127.0.0.2",
"Port": 3306,
"DataBase": "seaweedfs"
}
],
"IsSharding":true,
"ShardingNum":1024
}
</code></pre>
The "mysql" field in above conf file is an array which include all mysql instances you prepared to store sharding data.
1. If one mysql instance is enough, just keep one instance in "mysql" field.
2. If table sharding at a specific mysql instance is needed , mark "IsSharding" field with true and specify total table
sharding numbers using "ShardingNum" field.
3. If the mysql service could be auto scaled transparently in your environment, just config one mysql instance(usually it's a frondend proxy or VIP),
and mark "IsSharding" with false value
4. If your prepare more than one mysql instances and have no plan to use table sharding for any instance(mark isSharding with false), instance sharding
will still be done implicitly

View file

@ -1,11 +0,0 @@
CREATE TABLE IF NOT EXISTS `filer_mapping` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
`fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
`createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
`updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
`remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
`status` tinyint(2) DEFAULT '1' COMMENT 'resource status',
PRIMARY KEY (`id`),
UNIQUE KEY `index_uriPath` (`uriPath`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

View file

@ -11,11 +11,11 @@ import (
)
const (
sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
maxIdleConnections = 100
maxOpenConnections = 50
maxTableNums = 1024
tableName = "filer_mapping"
sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
default_maxIdleConnections = 100
default_maxOpenConnections = 50
default_maxTableNums = 1024
tableName = "filer_mapping"
)
var (
@ -24,15 +24,24 @@ var (
)
type MySqlConf struct {
User string
Password string
HostName string
Port int
DataBase string
User string
Password string
HostName string
Port int
DataBase string
MaxIdleConnections int
MaxOpenConnections int
}
type ShardingConf struct {
IsSharding bool `json:"isSharding"`
ShardingNum int `json:"shardingNum"`
}
type MySqlStore struct {
dbs []*sql.DB
dbs []*sql.DB
isSharding bool
shardingNum int
}
func getDbConnection(confs []MySqlConf) []*sql.DB {
@ -47,6 +56,19 @@ func getDbConnection(confs []MySqlConf) []*sql.DB {
_db_connection = nil
panic(dbErr)
}
var maxIdleConnections, maxOpenConnections int
if conf.MaxIdleConnections != 0 {
maxIdleConnections = conf.MaxIdleConnections
} else {
maxIdleConnections = default_maxIdleConnections
}
if conf.MaxOpenConnections != 0 {
maxOpenConnections = conf.MaxOpenConnections
} else {
maxOpenConnections = default_maxOpenConnections
}
_db_connection.SetMaxIdleConns(maxIdleConnections)
_db_connection.SetMaxOpenConns(maxOpenConnections)
_db_connections = append(_db_connections, _db_connection)
@ -55,15 +77,24 @@ func getDbConnection(confs []MySqlConf) []*sql.DB {
return _db_connections
}
func NewMysqlStore(confs []MySqlConf) *MySqlStore {
func NewMysqlStore(confs []MySqlConf, isSharding bool, shardingNum int) *MySqlStore {
ms := &MySqlStore{
dbs: getDbConnection(confs),
dbs: getDbConnection(confs),
isSharding: isSharding,
shardingNum: shardingNum,
}
for _, db := range ms.dbs {
for i := 0; i < maxTableNums; i++ {
if !isSharding {
ms.shardingNum = 1
} else {
if ms.shardingNum == 0 {
ms.shardingNum = default_maxTableNums
}
}
for i := 0; i < ms.shardingNum; i++ {
if err := ms.createTables(db, tableName, i); err != nil {
fmt.Printf("create table failed %s", err.Error())
fmt.Printf("create table failed %v", err)
}
}
}
@ -74,21 +105,25 @@ func NewMysqlStore(confs []MySqlConf) *MySqlStore {
func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) {
hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
instance_offset = int(hash_value) % len(s.dbs)
table_postfix = int(hash_value) % maxTableNums
table_postfix = int(hash_value) % s.shardingNum
return
}
func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) {
instance_offset, table_postfix := s.hash(path)
instanceId = instance_offset
tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
if s.isSharding {
tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
} else {
tableFullName = tableName
}
return
}
func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) {
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return "", err
return "", fmt.Errorf("MySqlStore Get operation can not parse file path %s: err is %v", fullFilePath, err)
}
fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
if err == sql.ErrNoRows {
@ -103,16 +138,18 @@ func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) {
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return err
return fmt.Errorf("MySqlStore Put operation can not parse file path %s: err is %v", fullFilePath, err)
}
if old_fid, localErr := s.query(fullFilePath, s.dbs[instance_offset], tableFullName); localErr != nil && localErr != sql.ErrNoRows {
err = localErr
return
var old_fid string
if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows {
return fmt.Errorf("MySqlStore Put operation failed when querying path %s: err is %v", fullFilePath, err)
} else {
if len(old_fid) == 0 {
err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
err = fmt.Errorf("MySqlStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err)
} else {
err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
err = fmt.Errorf("MySqlStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err)
}
}
return
@ -122,15 +159,15 @@ func (s *MySqlStore) Delete(fullFilePath string) (err error) {
var fid string
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return err
return fmt.Errorf("MySqlStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
}
if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
return err
return fmt.Errorf("MySqlStore Delete operation failed when querying path %s: err is %v", fullFilePath, err)
} else if fid == "" {
return nil
}
if err := s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
return err
if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
return fmt.Errorf("MySqlStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err)
} else {
return nil
}
@ -143,7 +180,7 @@ func (s *MySqlStore) Close() {
}
var createTable = `
CREATE TABLE IF NOT EXISTS %s_%04d (
CREATE TABLE IF NOT EXISTS %s (
id bigint(20) NOT NULL AUTO_INCREMENT,
uriPath char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
@ -153,11 +190,18 @@ CREATE TABLE IF NOT EXISTS %s_%04d (
status tinyint(2) DEFAULT '1' COMMENT 'resource status',
PRIMARY KEY (id),
UNIQUE KEY index_uriPath (uriPath)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
) DEFAULT CHARSET=utf8;
`
func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error {
stmt, err := db.Prepare(fmt.Sprintf(createTable, tableName, postfix))
var realTableName string
if s.isSharding {
realTableName = fmt.Sprintf("%s_%4d", tableName, postfix)
} else {
realTableName = tableName
}
stmt, err := db.Prepare(fmt.Sprintf(createTable, realTableName))
if err != nil {
return err
}

View file

@ -6,36 +6,6 @@ import (
"testing"
)
/*
To improve performance when storing billion of files, you could shar
At each mysql instance, we will try to create 1024 tables if not exist, table name will be something like:
filer_mapping_0000
filer_mapping_0001
.....
filer_mapping_1023
sample conf should be
>$cat filer_conf.json
{
"mysql": [
{
"User": "root",
"Password": "root",
"HostName": "127.0.0.1",
"Port": 3306,
"DataBase": "seaweedfs"
},
{
"User": "root",
"Password": "root",
"HostName": "127.0.0.2",
"Port": 3306,
"DataBase": "seaweedfs"
}
]
}
*/
func TestGenerateMysqlConf(t *testing.T) {
var conf []MySqlConf
conf = append(conf, MySqlConf{

View file

@ -23,6 +23,7 @@ import (
type filerConf struct {
MysqlConf []mysql_store.MySqlConf `json:"mysql"`
mysql_store.ShardingConf
}
func parseConfFile(confPath string) (*filerConf, error) {
@ -83,7 +84,7 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
}
if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 {
mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf)
mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardingNum)
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store)
} else if cassandra_server != "" {
cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)