ListDirectoryPrefixedEntries

This commit is contained in:
Устюжанин Антон Александрович 2020-08-06 00:37:42 +05:00
parent a457c308ad
commit b231f7bdab
3 changed files with 10 additions and 82 deletions

View file

@ -4,8 +4,6 @@ import (
"context" "context"
"database/sql" "database/sql"
"fmt" "fmt"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@ -151,87 +149,13 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
return nil return nil
} }
//func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
// sqlText := store.SqlListExclusive
// if inclusive {
// sqlText = store.SqlListInclusive
// }
//
// rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix, limit)
// if err != nil {
// return nil, fmt.Errorf("list %s : %v", fullpath, err)
// }
// defer rows.Close()
//
// for rows.Next() {
// var name string
// var data []byte
// if err = rows.Scan(&name, &data); err != nil {
// glog.V(0).Infof("scan %s : %v", fullpath, err)
// return nil, fmt.Errorf("scan %s: %v", fullpath, err)
// }
//
// entry := &filer2.Entry{
// FullPath: util.NewFullPath(string(fullpath), name),
// }
// if err = entry.DecodeAttributesAndChunks(data); err != nil {
// glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
// return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
// }
//
// entries = append(entries, entry)
// }
//
// return entries, nil
//}
//func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
// return nil, fmt.Errorf("not implemented")
//
//}
func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
var lastFileName string
for count < limit {
for _, entry := range notPrefixed {
lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, lastFileName, inclusive, limit)
if err != nil {
return nil, err
}
if len(notPrefixed) == 0 {
break
}
}
return entries, nil
}
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
sqlText := store.SqlListExclusive sqlText := store.SqlListExclusive
if inclusive { if inclusive {
sqlText = store.SqlListInclusive sqlText = store.SqlListInclusive
} }
rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), limit) rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix, limit)
if err != nil { if err != nil {
return nil, fmt.Errorf("list %s : %v", fullpath, err) return nil, fmt.Errorf("list %s : %v", fullpath, err)
} }
@ -258,6 +182,10 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat
return entries, nil return entries, nil
} }
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
return nil, fmt.Errorf("not implemented")
}
func (store *AbstractSqlStore) Shutdown() { func (store *AbstractSqlStore) Shutdown() {
store.DB.Close() store.DB.Close()

View file

@ -41,14 +41,14 @@ func (store *MysqlStore) Initialize(configuration util.Configuration, prefix str
func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int, func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int,
interpolateParams bool) (err error) { interpolateParams bool) (err error) {
//AND name like CONCAT(?,'%') //
store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)" store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?" store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?" store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?" store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?" store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? ORDER BY NAME ASC LIMIT ?" store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?"
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? ORDER BY NAME ASC LIMIT ?" store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?"
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
if interpolateParams { if interpolateParams {

View file

@ -46,8 +46,8 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2" store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4" store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like CONCAT($4,'%')ORDER BY NAME ASC LIMIT $5"
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4" store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like CONCAT($4,'%') ORDER BY NAME ASC LIMIT $5"
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, sslmode) sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, sslmode)
if password != "" { if password != "" {