427 lines
9.7 KiB
Go
427 lines
9.7 KiB
Go
package tdb
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
|
|
"git.keganmyers.com/terribleplan/tdb/stringy"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
bolt "go.etcd.io/bbolt"
|
|
)
|
|
|
|
type Table interface {
|
|
debugLogger
|
|
Iterable
|
|
Transactable
|
|
|
|
// New
|
|
Create(proto.Message, ...*Tx) (uint64, error)
|
|
CreateOrPanic(proto.Message, ...*Tx) uint64
|
|
|
|
// Read
|
|
Get(uint64, ...*Tx) (proto.Message, error)
|
|
Query() Query
|
|
Where(fieldName, op string, value interface{}) Query
|
|
|
|
// Modify
|
|
Put(value proto.Message, txs ...*Tx) error
|
|
Update(id uint64, action func(proto.Message) error, txs ...*Tx) error
|
|
}
|
|
type TableSetup interface {
|
|
debugLogger
|
|
AddIndex(options SimpleIndexOptions) error
|
|
AddIndexOrPanic(options SimpleIndexOptions)
|
|
AddArrayIndex(options ArrayIndexOptions) error
|
|
AddArrayIndexOrPanic(options ArrayIndexOptions)
|
|
}
|
|
|
|
// IndexQueryOpts carries options for querying an index.
type IndexQueryOpts struct {
	// Desc presumably requests descending order; the consumer of this flag
	// is not in this file, so confirm there.
	Desc bool
}
|
|
|
|
// CreateTableSchema is the callback newTable invokes with the table under
// construction so that indexes can be registered on it. A non-nil error
// aborts table creation.
type CreateTableSchema func(setup TableSetup) error
|
|
|
|
type table struct {
|
|
db *db
|
|
name string
|
|
nameBytes []byte
|
|
t *dbType
|
|
tPtr *dbPtrType
|
|
idField dbField
|
|
indicies map[string]indexish
|
|
constraints map[string]constraintish
|
|
}
|
|
|
|
func newTable(db *db, t *dbType, idField dbField, createSchema CreateTableSchema) (*table, error) {
|
|
db.debugLogf("Creating table for %s", t.Name)
|
|
ktbl := &table{
|
|
db: db,
|
|
name: t.Name,
|
|
nameBytes: []byte(t.Name),
|
|
t: t,
|
|
tPtr: t.PtrType(),
|
|
idField: idField,
|
|
indicies: make(map[string]indexish),
|
|
constraints: make(map[string]constraintish),
|
|
}
|
|
|
|
err := createSchema(ktbl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return ktbl, nil
|
|
}
|
|
|
|
func (t *table) debugLog(message string) {
|
|
t.db.debugLog(message)
|
|
}
|
|
|
|
func (t *table) debugLogf(f string, args ...interface{}) {
|
|
t.db.debugLogf(f, args...)
|
|
}
|
|
|
|
func (t *table) bucket(tx *Tx) *bolt.Bucket {
|
|
return tx.tx().Bucket(t.nameBytes)
|
|
}
|
|
|
|
func (t *table) AddIndex(options SimpleIndexOptions) error {
|
|
if options.Table != "" && options.Table != t.name {
|
|
t.debugLogf("warn: ignoring table name in index creation options, leave blank to disable this warning (got '%s')", options.Table)
|
|
}
|
|
|
|
if _, exists := t.indicies[options.Field]; exists {
|
|
return fmt.Errorf("There is already an index on '%s'.'%s'", t.name, options.Field)
|
|
}
|
|
|
|
if _, exists := t.constraints[options.Field]; exists {
|
|
return fmt.Errorf("There are already constraints on '%s'.'%s'", t.name, options.Field)
|
|
}
|
|
|
|
options.Table = t.name
|
|
index, err := newSimpleIndex(t, options)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.indicies[options.Field] = index
|
|
t.constraints[options.Field] = index
|
|
return nil
|
|
}
|
|
|
|
func (t *table) AddIndexOrPanic(options SimpleIndexOptions) {
|
|
if err := t.AddIndex(options); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (t *table) AddArrayIndex(options ArrayIndexOptions) error {
|
|
if options.Table != "" && options.Table != t.name {
|
|
t.debugLogf("warn: ignoring table name in index creation options, leave blank to disable this warning (got '%s')", options.Table)
|
|
}
|
|
|
|
if _, exists := t.indicies[options.Field]; exists {
|
|
return fmt.Errorf("There is already an index on '%s'.'%s'", t.name, options.Field)
|
|
}
|
|
|
|
if _, exists := t.constraints[options.Field]; exists {
|
|
return fmt.Errorf("There are already constraints on '%s'.'%s'", t.name, options.Field)
|
|
}
|
|
|
|
options.Table = t.name
|
|
index, err := newArrayIndex(t, options)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.indicies[options.Field] = index
|
|
t.constraints[options.Field] = index
|
|
return nil
|
|
}
|
|
|
|
func (t *table) AddArrayIndexOrPanic(options ArrayIndexOptions) {
|
|
if err := t.AddArrayIndex(options); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// Create will insert a record with the next available ID in sequence
|
|
func (t *table) Create(thing proto.Message, txs ...*Tx) (uint64, error) {
|
|
t.debugLogf("[table.Create] Creating '%s'", t.name)
|
|
var id uint64
|
|
pv := dbPtrValueOf(thing)
|
|
|
|
if !pv.IsOfPtrType(t.tPtr) {
|
|
return 0, fmt.Errorf("[table.Create] Expected type '%s' in call (got '%s')", t.tPtr.String(), pv.PtrType().String())
|
|
}
|
|
|
|
if err := t.writeTxHelper(func(tx *Tx) error {
|
|
var idString []byte
|
|
b := t.bucket(tx)
|
|
for {
|
|
id, _ = b.NextSequence()
|
|
idString = []byte(stringy.LiteralUintToString(id))
|
|
if b.Get(idString) == nil {
|
|
break
|
|
}
|
|
}
|
|
t.debugLogf("[table.Create] New '%s' will have Id '%d'", t.name, id)
|
|
|
|
pv.dangerous_Field(t.idField).SetUint(id)
|
|
|
|
if err := t.validate(pv, txs...); err != nil {
|
|
return err
|
|
}
|
|
|
|
data, err := pv.Marshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b.Put(idString, data)
|
|
|
|
t.updateIndicies(tx, t.tPtr.Zero(), pv)
|
|
t.debugLogf("[table.Create] Created '%s' with Id '%d'", t.name, id)
|
|
return nil
|
|
}, txs...); err != nil {
|
|
return 0, err
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
func (t *table) Put(thing proto.Message, txs ...*Tx) error {
|
|
t.debugLogf("[table.Put] Putting '%s'", t.name)
|
|
pv := dbPtrValueOf(thing)
|
|
|
|
if !pv.IsOfPtrType(t.tPtr) {
|
|
return fmt.Errorf("[table.Put] Expected type '%s' in call (got '%s')", t.tPtr.String(), pv.PtrType().String())
|
|
}
|
|
|
|
id := pv.dangerous_Field(t.idField).Uint()
|
|
idString := []byte(stringy.LiteralUintToString(id))
|
|
|
|
data, err := pv.Marshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := t.writeTxHelper(func(tx *Tx) error {
|
|
b := t.bucket(tx)
|
|
old, err := t.getValWithinTx(b, idString)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := t.validate(pv, tx); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := b.Put(idString, data); err != nil {
|
|
return err
|
|
}
|
|
|
|
t.updateIndicies(tx, old, pv)
|
|
return nil
|
|
}, txs...); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *table) CreateOrPanic(thing proto.Message, txs ...*Tx) uint64 {
|
|
id, err := t.Create(thing, txs...)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return id
|
|
}
|
|
|
|
func (t *table) Get(id uint64, txs ...*Tx) (proto.Message, error) {
|
|
//todo: replace with a query? (once the query engine can optimize .Id = x)
|
|
return t.getRaw([]byte(stringy.ToStringOrPanic(id)), txs...)
|
|
}
|
|
|
|
func (t *table) getRaw(id []byte, txs ...*Tx) (vProtoMessage proto.Message, err error) {
|
|
return vProtoMessage, t.readTxHelper(func(tx *Tx) error {
|
|
vProtoMessage, err = t.getWithinTx(t.bucket(tx), id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}, txs...)
|
|
}
|
|
|
|
func (t *table) getValWithinTx(b *bolt.Bucket, id []byte) (dbPtrValue, error) {
|
|
t.debugLogf("[table.getValWithinTx] looking up '%s'", id)
|
|
//todo: replace with a query? (once the query engine can optimize .Id = x)
|
|
v := b.Get([]byte(id))
|
|
if v == nil {
|
|
t.debugLogf("got nil for '%s'", id)
|
|
return t.tPtr.Zero(), nil
|
|
}
|
|
|
|
return t.tPtr.Unmarshal(v)
|
|
}
|
|
|
|
func (t *table) getWithinTx(b *bolt.Bucket, id []byte) (proto.Message, error) {
|
|
pv, err := t.getValWithinTx(b, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return pv.Proto(), nil
|
|
}
|
|
|
|
func (t *table) Query() Query {
|
|
return &queryData{
|
|
table: t,
|
|
ops: make([]queryOpish, 0),
|
|
}
|
|
}
|
|
|
|
func (t *table) Where(fieldName, op string, value interface{}) Query {
|
|
return t.Query().Where(fieldName, op, value)
|
|
}
|
|
|
|
func (t *table) Update(id uint64, action func(proto.Message) error, txs ...*Tx) error {
|
|
idBytes := []byte(stringy.LiteralUintToString(id))
|
|
return t.writeTxHelper(func(tx *Tx) error {
|
|
b := t.bucket(tx)
|
|
|
|
v := b.Get(idBytes)
|
|
if v == nil {
|
|
t.debugLogf("got nil for '%s'", idBytes)
|
|
return fmt.Errorf("No such entry '%d' in table '%s'", id, t.name)
|
|
}
|
|
|
|
original, err := t.tPtr.Unmarshal(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
updated, err := t.tPtr.Unmarshal(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := action(updated.Proto()); err != nil {
|
|
return err
|
|
}
|
|
|
|
data, err := updated.Marshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = b.Put(idBytes, data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.updateIndicies(tx, original, updated)
|
|
return nil
|
|
}, txs...)
|
|
}
|
|
|
|
func (t *table) iterateRaw(i rawIterator, txs ...*Tx) error {
|
|
return t.readTxHelper(func(tx *Tx) error {
|
|
c := t.bucket(tx).Cursor()
|
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
|
t.debugLogf("iterating over '%s' '%s'", t.name, k)
|
|
pv, err := t.tPtr.Unmarshal(v)
|
|
if err != nil {
|
|
t.debugLogf("[table.iterateRaw] error while iterating over '%s' '%s'", t.name, k)
|
|
}
|
|
|
|
signal, err := i(pv)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if signal == StopIteration {
|
|
break
|
|
}
|
|
}
|
|
return nil
|
|
}, txs...)
|
|
}
|
|
|
|
func (t *table) Iterate(i Iterator, txs ...*Tx) error {
|
|
return t.iterateRaw(func(pv dbPtrValue) (IterationSignal, error) {
|
|
return i(pv.Proto())
|
|
}, txs...)
|
|
}
|
|
|
|
func (t *table) IterateKeys(i KeyIterator, txs ...*Tx) error {
|
|
panic(errors.New("unimplemented"))
|
|
}
|
|
|
|
func (t *table) initialize(tx *Tx) error {
|
|
_, err := tx.tx().CreateBucketIfNotExists(t.nameBytes)
|
|
for _, index := range t.indicies {
|
|
if err := index.initialize(tx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (t *table) validate(pv dbPtrValue, txs ...*Tx) error {
|
|
if pv.IsNil() {
|
|
return nil
|
|
}
|
|
|
|
return t.readTxHelper(func(tx *Tx) error {
|
|
for _, c := range t.constraints {
|
|
if err := c.validate(tx, pv); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}, txs...)
|
|
}
|
|
|
|
func (t *table) putIndicies(tx *Tx, after dbPtrValue) {
|
|
for _, index := range t.indicies {
|
|
index.put(tx, after)
|
|
}
|
|
}
|
|
|
|
func (t *table) deleteIndicies(tx *Tx, before dbPtrValue) {
|
|
for _, index := range t.indicies {
|
|
index.delete(tx, before)
|
|
}
|
|
}
|
|
|
|
func (t *table) updateIndiciesRaw(tx *Tx, before, after dbPtrValue) {
|
|
for _, index := range t.indicies {
|
|
index.update(tx, before, after)
|
|
}
|
|
}
|
|
|
|
func (t *table) updateIndicies(tx *Tx, before, after dbPtrValue) {
|
|
if before.IsNil() {
|
|
t.putIndicies(tx, after)
|
|
return
|
|
}
|
|
if after.IsNil() {
|
|
t.deleteIndicies(tx, before)
|
|
return
|
|
}
|
|
t.updateIndiciesRaw(tx, before, after)
|
|
return
|
|
}
|
|
|
|
func (t *table) ReadTx(ta Transaction) error {
|
|
return t.db.ReadTx(ta)
|
|
}
|
|
|
|
func (t *table) readTxHelper(ta Transaction, txs ...*Tx) error {
|
|
return t.db.readTxHelper(ta, txs...)
|
|
}
|
|
|
|
func (t *table) WriteTx(ta Transaction) error {
|
|
return t.db.WriteTx(ta)
|
|
}
|
|
|
|
func (t *table) writeTxHelper(ta Transaction, txs ...*Tx) error {
|
|
return t.db.writeTxHelper(ta, txs...)
|
|
}
|