181 lines
3.9 KiB
Go
181 lines
3.9 KiB
Go
|
package tdb
|
||
|
|
||
|
import (
|
||
|
"sort"
|
||
|
|
||
|
"git.keganmyers.com/terribleplan/tdb/stringy"
|
||
|
|
||
|
"github.com/golang/protobuf/proto"
|
||
|
)
|
||
|
|
||
|
type queryData struct {
|
||
|
err error
|
||
|
table *table
|
||
|
ops []queryOpish
|
||
|
sr uint64
|
||
|
}
|
||
|
|
||
|
// NB: "Where" operations should be expected to mutate the underlying query.
|
||
|
type Query interface {
|
||
|
Iterable
|
||
|
debugLogger
|
||
|
Run(txs ...*Tx) ([]proto.Message, error)
|
||
|
RunOrPanic(txs ...*Tx) []proto.Message
|
||
|
First(txs ...*Tx) (proto.Message, error)
|
||
|
Where(fieldName, op string, value interface{}) Query
|
||
|
}
|
||
|
|
||
|
func (q *queryData) debugLog(message string) {
|
||
|
q.table.db.debugLog(message)
|
||
|
}
|
||
|
|
||
|
func (q *queryData) debugLogf(f string, args ...interface{}) {
|
||
|
q.table.db.debugLogf(f, args...)
|
||
|
}
|
||
|
|
||
|
func (q *queryData) Where(fieldName, op string, value interface{}) Query {
|
||
|
if q.err != nil {
|
||
|
return q
|
||
|
}
|
||
|
|
||
|
qop, err := createQueryOp(q.table, fieldName, op, value)
|
||
|
if q.err != nil {
|
||
|
q.err = err
|
||
|
return q
|
||
|
}
|
||
|
|
||
|
q.ops = append(q.ops, qop)
|
||
|
|
||
|
return q
|
||
|
}
|
||
|
|
||
|
func (q *queryData) Ok() error {
|
||
|
return q.err
|
||
|
}
|
||
|
|
||
|
func (q *queryData) Iterate(i Iterator, txs ...*Tx) error {
|
||
|
return q.iterateRaw(func(pv dbPtrValue) (IterationSignal, error) {
|
||
|
return i(pv.Proto())
|
||
|
}, txs...)
|
||
|
}
|
||
|
|
||
|
func (q *queryData) IterateKeys(i KeyIterator, txs ...*Tx) error {
|
||
|
return q.iterateRaw(func(pv dbPtrValue) (IterationSignal, error) {
|
||
|
return i([]byte(stringy.LiteralUintToString(pv.dangerous_Field(q.table.idField).Uint())))
|
||
|
}, txs...)
|
||
|
}
|
||
|
|
||
|
func (q *queryData) iterateRaw(i rawIterator, txs ...*Tx) error {
|
||
|
q.sr = 0
|
||
|
lenOps := len(q.ops)
|
||
|
// straight iteration
|
||
|
if lenOps == 0 {
|
||
|
q.debugLog("[query] No ops, doing table scan")
|
||
|
return q.table.iterateRaw(i, txs...)
|
||
|
}
|
||
|
|
||
|
if lenOps == 1 {
|
||
|
q.debugLog("[query] Single op")
|
||
|
op := q.ops[0]
|
||
|
if op.indexed() {
|
||
|
q.debugLog("[query] Op has index")
|
||
|
return op.iterateRaw(func(v dbPtrValue) (IterationSignal, error) {
|
||
|
q.sr++
|
||
|
return i(v)
|
||
|
}, txs...)
|
||
|
} else {
|
||
|
q.debugLog("[query] Op missing index, doing table scan")
|
||
|
return q.table.iterateRaw(func(v dbPtrValue) (IterationSignal, error) {
|
||
|
q.sr++
|
||
|
if op.match(v) {
|
||
|
return i(v)
|
||
|
}
|
||
|
return ContinueIteration, nil
|
||
|
}, txs...)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
anyHaveIndex := false
|
||
|
sort.SliceStable(q.ops, func(i, j int) bool {
|
||
|
ihi := q.ops[i].indexed()
|
||
|
jhi := q.ops[j].indexed()
|
||
|
|
||
|
anyHaveIndex = anyHaveIndex || ihi || jhi
|
||
|
|
||
|
if ihi {
|
||
|
return true
|
||
|
}
|
||
|
return false
|
||
|
})
|
||
|
|
||
|
var source rawIterable = q.table
|
||
|
conditions := q.ops
|
||
|
|
||
|
if anyHaveIndex {
|
||
|
q.debugLogf("[query] Using index for '%s' to scan", conditions[0].String())
|
||
|
// first condition is iterated over, others are executed as conditions
|
||
|
source = conditions[0]
|
||
|
conditions = conditions[1:]
|
||
|
} else {
|
||
|
q.debugLog("[query] No index, using table scan")
|
||
|
}
|
||
|
|
||
|
return source.iterateRaw(func(v dbPtrValue) (IterationSignal, error) {
|
||
|
matches := true
|
||
|
q.sr++
|
||
|
for _, op := range conditions {
|
||
|
if !op.match(v) {
|
||
|
matches = false
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
if matches {
|
||
|
return i(v)
|
||
|
}
|
||
|
return ContinueIteration, nil
|
||
|
}, txs...)
|
||
|
}
|
||
|
|
||
|
func (query *queryData) Run(txs ...*Tx) ([]proto.Message, error) {
|
||
|
res := make([]proto.Message, 0)
|
||
|
if err := query.Iterate(func(item proto.Message) (IterationSignal, error) {
|
||
|
res = append(res, item)
|
||
|
return ContinueIteration, nil
|
||
|
}, txs...); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return res, nil
|
||
|
}
|
||
|
|
||
|
func (query *queryData) RunOrPanic(txs ...*Tx) []proto.Message {
|
||
|
res, err := query.Run(txs...)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
return res
|
||
|
}
|
||
|
|
||
|
func (query *queryData) First(txs ...*Tx) (proto.Message, error) {
|
||
|
var rm proto.Message
|
||
|
if err := query.Iterate(func(m proto.Message) (IterationSignal, error) {
|
||
|
rm = m
|
||
|
return StopIteration, nil
|
||
|
}, txs...); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return rm, nil
|
||
|
}
|
||
|
|
||
|
func (query *queryData) Update(txs ...*Tx) ([]proto.Message, error) {
|
||
|
res := make([]proto.Message, 0)
|
||
|
if err := query.Iterate(func(item proto.Message) (IterationSignal, error) {
|
||
|
res = append(res, item)
|
||
|
return ContinueIteration, nil
|
||
|
}, txs...); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return res, nil
|
||
|
}
|