mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
68d39c86f1
The existing key-value operations for stores using mysql, postgres, and maybe cassandra are already broken. The kv is used to store hardlinks, the filer store signature, and replication progress. So users who use hardlinks together with mysql, postgres, or cassandra will have broken hard links. Users using filer.sync will need to re-sync the files.
89 lines
2.1 KiB
Go
89 lines
2.1 KiB
Go
package abstract_sql
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/filer"
|
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
|
"github.com/chrislusf/seaweedfs/weed/util"
|
|
)
|
|
|
|
func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
|
|
|
|
dirStr, dirHash, name := genDirAndName(key)
|
|
|
|
res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, dirHash, name, dirStr, value)
|
|
if err != nil {
|
|
if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
|
|
return fmt.Errorf("kv insert: %s", err)
|
|
}
|
|
}
|
|
|
|
// now the insert failed possibly due to duplication constraints
|
|
glog.V(1).Infof("kv insert falls back to update: %s", err)
|
|
|
|
res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, value, dirHash, name, dirStr)
|
|
if err != nil {
|
|
return fmt.Errorf("kv upsert: %s", err)
|
|
}
|
|
|
|
_, err = res.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("kv upsert no rows affected: %s", err)
|
|
}
|
|
return nil
|
|
|
|
}
|
|
|
|
func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
|
|
|
|
dirStr, dirHash, name := genDirAndName(key)
|
|
row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, dirHash, name, dirStr)
|
|
|
|
err = row.Scan(&value)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return nil, filer.ErrKvNotFound
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kv get: %v", err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err error) {
|
|
|
|
dirStr, dirHash, name := genDirAndName(key)
|
|
|
|
res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, dirHash, name, dirStr)
|
|
if err != nil {
|
|
return fmt.Errorf("kv delete: %s", err)
|
|
}
|
|
|
|
_, err = res.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("kv delete no rows affected: %s", err)
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
func genDirAndName(key []byte) (dirStr string, dirHash int64, name string) {
|
|
for len(key) < 8 {
|
|
key = append(key, 0)
|
|
}
|
|
|
|
dirHash = int64(util.BytesToUint64(key[:8]))
|
|
dirStr = base64.StdEncoding.EncodeToString(key[:8])
|
|
name = base64.StdEncoding.EncodeToString(key[8:])
|
|
|
|
return
|
|
}
|