181 lines
3.9 KiB
Go
181 lines
3.9 KiB
Go
package tdb
|
|
|
|
import (
|
|
"sort"
|
|
|
|
"git.keganmyers.com/terribleplan/tdb/stringy"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
)
|
|
|
|
type queryData struct {
|
|
err error
|
|
table *table
|
|
ops []queryOpish
|
|
sr uint64
|
|
}
|
|
|
|
// NB: "Where" operations should be expected to mutate the underlying query.
|
|
type Query interface {
|
|
Iterable
|
|
debugLogger
|
|
Run(txs ...*Tx) ([]proto.Message, error)
|
|
RunOrPanic(txs ...*Tx) []proto.Message
|
|
First(txs ...*Tx) (proto.Message, error)
|
|
Where(fieldName, op string, value interface{}) Query
|
|
}
|
|
|
|
func (q *queryData) debugLog(message string) {
|
|
q.table.db.debugLog(message)
|
|
}
|
|
|
|
func (q *queryData) debugLogf(f string, args ...interface{}) {
|
|
q.table.db.debugLogf(f, args...)
|
|
}
|
|
|
|
func (q *queryData) Where(fieldName, op string, value interface{}) Query {
|
|
if q.err != nil {
|
|
return q
|
|
}
|
|
|
|
qop, err := createQueryOp(q.table, fieldName, op, value)
|
|
if q.err != nil {
|
|
q.err = err
|
|
return q
|
|
}
|
|
|
|
q.ops = append(q.ops, qop)
|
|
|
|
return q
|
|
}
|
|
|
|
func (q *queryData) Ok() error {
|
|
return q.err
|
|
}
|
|
|
|
func (q *queryData) Iterate(i Iterator, txs ...*Tx) error {
|
|
return q.iterateRaw(func(pv dbPtrValue) (IterationSignal, error) {
|
|
return i(pv.Proto())
|
|
}, txs...)
|
|
}
|
|
|
|
func (q *queryData) IterateKeys(i KeyIterator, txs ...*Tx) error {
|
|
return q.iterateRaw(func(pv dbPtrValue) (IterationSignal, error) {
|
|
return i([]byte(stringy.LiteralUintToString(pv.dangerous_Field(q.table.idField).Uint())))
|
|
}, txs...)
|
|
}
|
|
|
|
func (q *queryData) iterateRaw(i rawIterator, txs ...*Tx) error {
|
|
q.sr = 0
|
|
lenOps := len(q.ops)
|
|
// straight iteration
|
|
if lenOps == 0 {
|
|
q.debugLog("[query] No ops, doing table scan")
|
|
return q.table.iterateRaw(i, txs...)
|
|
}
|
|
|
|
if lenOps == 1 {
|
|
q.debugLog("[query] Single op")
|
|
op := q.ops[0]
|
|
if op.indexed() {
|
|
q.debugLog("[query] Op has index")
|
|
return op.iterateRaw(func(v dbPtrValue) (IterationSignal, error) {
|
|
q.sr++
|
|
return i(v)
|
|
}, txs...)
|
|
} else {
|
|
q.debugLog("[query] Op missing index, doing table scan")
|
|
return q.table.iterateRaw(func(v dbPtrValue) (IterationSignal, error) {
|
|
q.sr++
|
|
if op.match(v) {
|
|
return i(v)
|
|
}
|
|
return ContinueIteration, nil
|
|
}, txs...)
|
|
}
|
|
}
|
|
|
|
anyHaveIndex := false
|
|
sort.SliceStable(q.ops, func(i, j int) bool {
|
|
ihi := q.ops[i].indexed()
|
|
jhi := q.ops[j].indexed()
|
|
|
|
anyHaveIndex = anyHaveIndex || ihi || jhi
|
|
|
|
if ihi {
|
|
return true
|
|
}
|
|
return false
|
|
})
|
|
|
|
var source rawIterable = q.table
|
|
conditions := q.ops
|
|
|
|
if anyHaveIndex {
|
|
q.debugLogf("[query] Using index for '%s' to scan", conditions[0].String())
|
|
// first condition is iterated over, others are executed as conditions
|
|
source = conditions[0]
|
|
conditions = conditions[1:]
|
|
} else {
|
|
q.debugLog("[query] No index, using table scan")
|
|
}
|
|
|
|
return source.iterateRaw(func(v dbPtrValue) (IterationSignal, error) {
|
|
matches := true
|
|
q.sr++
|
|
for _, op := range conditions {
|
|
if !op.match(v) {
|
|
matches = false
|
|
break
|
|
}
|
|
}
|
|
if matches {
|
|
return i(v)
|
|
}
|
|
return ContinueIteration, nil
|
|
}, txs...)
|
|
}
|
|
|
|
func (query *queryData) Run(txs ...*Tx) ([]proto.Message, error) {
|
|
res := make([]proto.Message, 0)
|
|
if err := query.Iterate(func(item proto.Message) (IterationSignal, error) {
|
|
res = append(res, item)
|
|
return ContinueIteration, nil
|
|
}, txs...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (query *queryData) RunOrPanic(txs ...*Tx) []proto.Message {
|
|
res, err := query.Run(txs...)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return res
|
|
}
|
|
|
|
func (query *queryData) First(txs ...*Tx) (proto.Message, error) {
|
|
var rm proto.Message
|
|
if err := query.Iterate(func(m proto.Message) (IterationSignal, error) {
|
|
rm = m
|
|
return StopIteration, nil
|
|
}, txs...); err != nil {
|
|
return nil, err
|
|
}
|
|
return rm, nil
|
|
}
|
|
|
|
func (query *queryData) Update(txs ...*Tx) ([]proto.Message, error) {
|
|
res := make([]proto.Message, 0)
|
|
if err := query.Iterate(func(item proto.Message) (IterationSignal, error) {
|
|
res = append(res, item)
|
|
return ContinueIteration, nil
|
|
}, txs...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res, nil
|
|
}
|