seaweedfs/weed/filer/postgres_store/postgres_store.go

150 lines
4 KiB
Go
Raw Normal View History

2016-12-08 00:24:40 +00:00
package postgres_store
import (
"database/sql"
"errors"
2016-12-08 00:24:40 +00:00
"fmt"
"sync"
"github.com/chrislusf/seaweedfs/weed/filer"
2016-12-08 00:24:40 +00:00
"github.com/chrislusf/seaweedfs/weed/glog"
_ "github.com/lib/pq"
_ "path/filepath"
2018-05-05 09:01:50 +00:00
"path/filepath"
2016-12-08 00:24:40 +00:00
)
// Package-level defaults and table names for the Postgres filer store.
const (
	// Default connection-pool sizes.
	// NOTE(review): presumably used when PostgresConf leaves the matching
	// field unset — confirm where the pool is configured. The idle default is
	// larger than the open default; database/sql caps idle connections at the
	// open limit, so confirm these values are intended.
	default_maxIdleConnections = 100
	default_maxOpenConnections = 50

	// Names of the SQL tables holding file and directory records.
	filesTableName       = "files"
	directoriesTableName = "directories"
)
// Package-level connection state.
// NOTE(review): presumably _init_db guards the one-time creation of
// _db_connection inside getDbConnection — confirm against that function,
// which is not visible from this part of the file.
var (
	_init_db       sync.Once
	_db_connection *sql.DB
)
// PostgresConf carries the settings handed to getDbConnection when a
// PostgresStore is constructed.
// NOTE(review): the field meanings below are read off the field names; the
// code that consumes them is not visible here — confirm against
// getDbConnection.
type PostgresConf struct {
	User     string
	Password string
	HostName string
	Port     int
	DataBase string
	SslMode  string // presumably the driver's sslmode setting — confirm

	// Connection-pool limits.
	MaxIdleConnections int
	MaxOpenConnections int
}
// PostgresStore is the filer store whose methods in this file delegate to
// helpers operating on a *sql.DB handle.
type PostgresStore struct {
	// db is the handle set by NewPostgresStore and closed by Close.
	db *sql.DB

	// NOTE(review): server, user and password are not assigned by the
	// constructor in this file — confirm whether they are still needed.
	server   string
	user     string
	password string
}
2018-04-16 08:16:06 +00:00
func (s *PostgresStore) CreateFile(fullFilePath string, fid string) (err error) {
2016-12-08 00:24:40 +00:00
var old_fid string
if old_fid, err = s.query(fullFilePath); err != nil && err != sql.ErrNoRows {
2016-12-08 00:24:40 +00:00
return fmt.Errorf("PostgresStore Put operation failed when querying path %s: err is %v", fullFilePath, err)
} else {
if len(old_fid) == 0 {
err = s.insert(fullFilePath, fid)
2016-12-08 00:24:40 +00:00
if err != nil {
return fmt.Errorf("PostgresStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err)
}
} else {
err = s.update(fullFilePath, fid)
2016-12-08 00:24:40 +00:00
if err != nil {
return fmt.Errorf("PostgresStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err)
}
}
}
return
2018-04-16 08:16:06 +00:00
2016-12-08 00:24:40 +00:00
}
2018-04-16 08:16:06 +00:00
func (s *PostgresStore) FindFile(fullFilePath string) (fid string, err error) {
2016-12-08 00:24:40 +00:00
if err != nil {
2018-04-16 08:16:06 +00:00
return "", fmt.Errorf("PostgresStore Get operation can not parse file path %s: err is %v", fullFilePath, err)
}
fid, err = s.query(fullFilePath)
return fid, err
}
2018-05-05 09:01:50 +00:00
// LookupDirectoryEntry reports whether name exists under dirPath, either as a
// file or as a directory.
//
// The joined path is first looked up as a file; on success found is true and
// fileId is the file's id. Otherwise the path is looked up as a directory; on
// success found is true and fileId is empty. If both lookups fail, found is
// false and err is the error from the file lookup.
func (s *PostgresStore) LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error) {
	fullPath := filepath.Join(dirPath, name)

	fileId, err = s.FindFile(fullPath)
	if err == nil {
		return true, fileId, nil
	}

	// dirErr is deliberately separate from err: when the directory lookup
	// fails too, the caller receives the FindFile error, not this one.
	_, _, dirErr := s.lookupDirectory(fullPath)
	if dirErr == nil {
		return true, "", nil
	}

	return false, "", err
}
2018-04-16 08:16:06 +00:00
func (s *PostgresStore) DeleteFile(fullFilePath string) (fid string, err error) {
if err != nil {
return "", fmt.Errorf("PostgresStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
2016-12-08 00:24:40 +00:00
}
if fid, err = s.query(fullFilePath); err != nil {
2018-04-16 08:16:06 +00:00
return "", fmt.Errorf("PostgresStore Delete operation failed when querying path %s: err is %v", fullFilePath, err)
2016-12-08 00:24:40 +00:00
} else if fid == "" {
2018-04-16 08:16:06 +00:00
return "", nil
2016-12-08 00:24:40 +00:00
}
if err = s.delete(fullFilePath); err != nil {
2018-04-16 08:16:06 +00:00
return "", fmt.Errorf("PostgresStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err)
2016-12-08 00:24:40 +00:00
} else {
2018-04-16 08:16:06 +00:00
return "", nil
2016-12-08 00:24:40 +00:00
}
}
2018-05-05 09:01:50 +00:00
func (s *PostgresStore) ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error) {
2018-04-16 08:16:06 +00:00
2018-05-05 09:01:50 +00:00
dirs, err = s.findDirectories(dirPath, 1000)
glog.V(3).Infof("Postgres ListDirs = found %d directories under %s", len(dirs), dirPath)
return dirs, err
}
2018-04-16 08:16:06 +00:00
// ListFiles hands dirPath, lastFileName and limit to the store's findFiles
// helper and returns its results unchanged.
func (s *PostgresStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
	return s.findFiles(dirPath, lastFileName, limit)
}
2018-04-16 08:16:06 +00:00
func (s *PostgresStore) DeleteDirectory(dirPath string, recursive bool) (err error) {
err = s.deleteDirectory(dirPath, recursive)
if err != nil {
2018-04-16 08:16:06 +00:00
glog.V(0).Infof("Error in Postgres DeleteDir '%s' (recursive = '%t'): %s", err)
}
2018-04-16 08:16:06 +00:00
return err
}
2018-04-16 08:16:06 +00:00
// Move is not implemented for this store. It logs the call at verbosity 3 and
// always returns an error; fromPath and toPath are ignored.
func (s *PostgresStore) Move(fromPath string, toPath string) (err error) {
	glog.V(3).Infoln("Calling postgres_store Move")
	// The error text is left untouched in case callers match on it.
	return errors.New("Move is not yet implemented for the PostgreSQL store.")
}
2018-04-16 08:16:06 +00:00
//func NewPostgresStore(master string, confs []PostgresConf, isSharding bool, shardCount int) *PostgresStore {
func NewPostgresStore(master string, conf PostgresConf) *PostgresStore {
pg := &PostgresStore{
db: getDbConnection(conf),
}
2018-04-16 08:16:06 +00:00
pg.createDirectoriesTable()
2018-04-16 08:16:06 +00:00
if err := pg.createFilesTable(); err != nil {
fmt.Printf("create table failed %v", err)
}
2018-04-16 08:16:06 +00:00
return pg
}
2018-04-16 08:16:06 +00:00
// Close closes the store's database handle. It is a no-op when the store has
// no handle, so closing a zero-value PostgresStore does not panic.
func (s *PostgresStore) Close() {
	if s.db == nil {
		return
	}
	// Close has no error result in its signature, so a failure from the
	// underlying handle cannot be propagated and is deliberately dropped.
	_ = s.db.Close()
}