-
Notifications
You must be signed in to change notification settings - Fork 53
/
Copy pathplanner.go
398 lines (335 loc) · 10.4 KB
/
planner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
// Copyright 2020 Source Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package planner
import (
"context"
"fmt"
"reflect"
"github.com/pkg/errors"
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/query/graphql/parser"
"github.com/graphql-go/graphql/language/ast"
)
// planNode is the interface that every node in the plan tree must implement.
type planNode interface {
// Init initializes or re-initializes an existing planNode.
// Often called internally by Start().
Init() error
// Start starts any internal logic or processes
// required by the plan node.
Start() error
// Next processes the next result doc from
// the query. Can only be called *after*
// Start(). Can't be called again if any
// previous call returns false.
Next() (bool, error)
// Spans sets the planNode's target
// spans. This is primarily only used
// for a scanNode, but based on the tree
// structure, may need to be propagated,
// e.g. from a selectNode -> scanNode.
Spans(core.Spans)
// Values returns the value of the current doc
// processed by the executor.
Values() map[string]interface{}
// Source returns the child planNode that
// generates the source values for this plan.
// If a plan has no source, return nil.
Source() planNode
// Close terminates the planNode execution and releases its resources.
Close()
}
// baseNode is a basic plan node that implements the planNode interface by
// forwarding every call to the wrapped plan. It can be added to any struct
// to turn it into a planNode.
//
// None of the methods check plan for nil, so plan must be set before any
// forwarding method is called.
type baseNode struct {
plan planNode
}
// Init forwards to the wrapped plan's Init.
func (n *baseNode) Init() error { return n.plan.Init() }
// Start forwards to the wrapped plan's Start.
func (n *baseNode) Start() error { return n.plan.Start() }
// Next forwards to the wrapped plan's Next.
func (n *baseNode) Next() (bool, error) { return n.plan.Next() }
// Spans forwards the given spans to the wrapped plan.
func (n *baseNode) Spans(spans core.Spans) { n.plan.Spans(spans) }
// Values forwards to the wrapped plan's Values.
func (n *baseNode) Values() map[string]interface{} { return n.plan.Values() }
// Close forwards to the wrapped plan's Close.
func (n *baseNode) Close() { n.plan.Close() }
// Source returns the wrapped plan itself (not the wrapped plan's own source).
func (n *baseNode) Source() planNode { return n.plan }
// ExecutionContext embeds a context.Context and adds no fields of its own.
// NOTE(review): presumably intended as the context used while running a
// plan; nothing in this file uses it yet - confirm against callers.
type ExecutionContext struct {
context.Context
}
// PlanContext embeds a context.Context and adds no fields of its own.
// NOTE(review): presumably intended as the context used while building a
// plan; nothing in this file uses it yet - confirm against callers.
type PlanContext struct {
context.Context
}
// Statement groups three representations of a single request: the raw
// request string, the GraphQL AST document, and the parser.Query value.
// NOTE(review): the relationship between the three is inferred from the
// field names and types; nothing in this file populates them - confirm.
type Statement struct {
requestString string
requestDocument *ast.Document // parser.Statement -> parser.Query - >
requestQuery parser.Query
}
// Planner combines session state and database state to
// produce a query plan, which is run by the execution context.
type Planner struct {
// txn is the transaction handed to makePlanner.
txn client.Txn
// db is the database handle handed to makePlanner.
db client.DB
// ctx is set to context.Background() by makePlanner.
ctx context.Context
// evalCtx is not initialized by makePlanner and keeps its zero value
// unless it is assigned elsewhere.
evalCtx parser.EvalContext
// isFinalized bool
}
// makePlanner builds a Planner bound to the given database handle and
// transaction. The planner receives a fresh background context; evalCtx is
// left at its zero value.
func makePlanner(db client.DB, txn client.Txn) *Planner {
	return &Planner{
		txn: txn,
		db:  db,
		ctx: context.Background(),
	}
}
// newPlan translates a parsed statement into the root planNode of an
// (unexpanded) plan tree.
//
// Container statements are unwrapped recursively: a *parser.Query delegates
// to its first query (or, if it has no queries, its first mutation), and a
// *parser.OperationDefinition delegates to its first selection. Leaf
// statements are handed to the matching node constructor. An error is
// returned for empty containers and for statement types that are not
// recognized.
func (p *Planner) newPlan(stmt parser.Statement) (planNode, error) {
	switch s := stmt.(type) {
	case *parser.Query:
		switch {
		case len(s.Queries) > 0:
			// @todo, handle multiple query statements
			return p.newPlan(s.Queries[0])
		case len(s.Mutations) > 0:
			// @todo: handle multiple mutation statements
			return p.newPlan(s.Mutations[0])
		}
		return nil, errors.New("Query is missing query or mutation statements")

	case *parser.OperationDefinition:
		if len(s.Selections) > 0 {
			return p.newPlan(s.Selections[0])
		}
		return nil, errors.New("OperationDefinition is missing selections")

	case *parser.Select:
		return p.Select(s)

	case *parser.CommitSelect:
		return p.CommitSelect(s)

	case *parser.Mutation:
		return p.newObjectMutationPlan(s)

	default:
		return nil, errors.Errorf("unknown statement type %T", stmt)
	}
}
// newObjectMutationPlan builds the plan node for a mutation statement,
// dispatching on the mutation's Type: create mutations go to CreateDoc and
// update mutations go to UpdateDocs. Any other mutation type yields an
// error that reports the offending value.
func (p *Planner) newObjectMutationPlan(stmt *parser.Mutation) (planNode, error) {
	switch stmt.Type {
	case parser.CreateObjects:
		return p.CreateDoc(stmt)
	case parser.UpdateObjects:
		return p.UpdateDocs(stmt)
	default:
		// Use %v rather than %T: %T prints the static type name of
		// stmt.Type, which is identical for every unsupported action and
		// therefore useless for diagnosing which action was rejected.
		return nil, errors.Errorf("unknown mutation action %v", stmt.Type)
	}
}
// makePlan builds the plan tree for stmt and then runs the optimization
// (expansion and wiring) pass over it. It returns the root of the finished
// plan, or the first error raised by either step.
func (p *Planner) makePlan(stmt parser.Statement) (planNode, error) {
	root, err := p.newPlan(stmt)
	if err != nil {
		return nil, err
	}

	if err := p.optimizePlan(root); err != nil {
		return nil, err
	}

	return root, nil
}
// optimizePlan runs the optimization pass over plan. At present that pass
// consists solely of plan expansion and wiring.
func (p *Planner) optimizePlan(plan planNode) error {
	return p.expandPlan(plan)
}
// expandPlan performs the full plan graph expansion and optimization,
// dispatching on the concrete type of plan and recursing into whichever
// child holds the rest of the tree. Node types that are not listed are left
// untouched and reported as success.
func (p *Planner) expandPlan(plan planNode) error {
	switch node := plan.(type) {
	case *selectTopNode:
		return p.expandSelectTopNodePlan(node)

	case *commitSelectTopNode:
		return p.expandPlan(node.plan)

	case *selectNode:
		return p.expandPlan(node.source)

	case *typeIndexJoin:
		return p.expandTypeIndexJoinPlan(node)

	case *groupNode:
		// Only the child source is expanded here; the parent source is
		// assumed to be expanded elsewhere/already.
		return p.expandPlan(node.dataSource.childSource)

	case MultiNode:
		return p.expandMultiNode(node)

	case *updateNode:
		return p.expandPlan(node.results)
	}

	// Nothing to expand for any other node type.
	return nil
}
// expandSelectTopNodePlan expands the source of a selectTopNode and then
// chains the optional stages on top of it. Each stage that is present takes
// the current head of the chain as its own plan and becomes the new head,
// in this order: source, group, render, sort, limit. The final head is left
// in plan.plan.
func (p *Planner) expandSelectTopNodePlan(plan *selectTopNode) error {
	err := p.expandPlan(plan.source)
	if err != nil {
		return err
	}

	// The expanded source is the bottom of the chain. This must be wired
	// before the group stage is expanded, because that step reads plan.plan.
	plan.plan = plan.source

	// Grouping stage.
	if plan.group != nil {
		if err = p.expandGroupNodePlan(plan); err != nil {
			return err
		}
		plan.plan = plan.group
	}

	// Render stage.
	if render := plan.render; render != nil {
		render.plan = plan.plan
		plan.plan = render
	}

	// Ordering stage.
	if sort := plan.sort; sort != nil {
		sort.plan = plan.plan
		plan.plan = sort
	}

	// Limit stage.
	if limit := plan.limit; limit != nil {
		limit.plan = plan.plan
		plan.plan = limit
	}

	return nil
}
// expandMultiNode expands every child of a MultiNode in the order returned
// by Children, stopping at and returning the first error.
func (p *Planner) expandMultiNode(plan MultiNode) error {
	for _, node := range plan.Children() {
		err := p.expandPlan(node)
		if err != nil {
			return err
		}
	}
	return nil
}
// func (p *Planner) expandSelectNodePlan(plan *selectNode) error {
// fmt.Println("Expanding select plan")
// return p.expandPlan(plan.source)
// }
// expandTypeIndexJoinPlan expands the sub-type plan of a type index join.
// Both the one and many join variants are supported; any other join plan
// is reported as an error.
func (p *Planner) expandTypeIndexJoinPlan(plan *typeIndexJoin) error {
	switch join := plan.joinPlan.(type) {
	case *typeJoinOne:
		return p.expandPlan(join.subType)
	case *typeJoinMany:
		return p.expandPlan(join.subType)
	default:
		return errors.New("Unknown type index join plan")
	}
}
// expandGroupNodePlan wires up the group node of a selectTopNode.
//
// It locates the scan node feeding the current plan, places a pipe node in
// front of it (reusing an existing pipe node if the plan already contains
// one), builds the child select plan for the group's `_group` selection on
// top of that pipe, and finally replaces the scan node with the pipe in the
// group's own source chain.
//
// An error is returned if the plan contains no scan node, if the child
// select plan cannot be built or has an unexpected type, or if the scan node
// cannot be replaced.
func (p *Planner) expandGroupNodePlan(plan *selectTopNode) error {
	// @todo: this is incorrect for joins within `_group` collections, and should be corrected when possible
	// Find the first scan node in the plan, we assume that it will be for the correct collection.
	// The lookup yields nil when the plan has no scan node, so the assertion
	// must be checked instead of being allowed to panic.
	scan, hasScan := p.walkAndFindPlanType(plan.plan, &scanNode{}).(*scanNode)
	if !hasScan {
		return errors.New("unable to expand group plan: no scan node found")
	}

	// Check for any existing pipe nodes in the plan, we should use it if there is one
	pipe, hasPipe := p.walkAndFindPlanType(plan.plan, &pipeNode{}).(*pipeNode)
	if !hasPipe {
		freshPipe := newPipeNode()
		pipe = &freshPipe
		pipe.source = scan
	}

	// childSource stays nil when the group has no child selection.
	var childSource planNode
	if plan.group.childSelect != nil {
		if plan.group.childSelect.GroupBy != nil {
			// group by fields have to be propagated downwards to ensure correct sub-grouping, otherwise child
			// groups will only group on the fields they explicitly reference
			plan.group.childSelect.GroupBy.Fields = append(
				plan.group.childSelect.GroupBy.Fields,
				plan.group.groupByFields...,
			)
		}

		childSelectNode, err := p.SelectFromSource(plan.group.childSelect, pipe, false)
		if err != nil {
			return err
		}

		childTop, isSelectTop := childSelectNode.(*selectTopNode)
		if !isSelectTop {
			return errors.Errorf(
				"unable to expand group plan: unexpected child select plan type %T",
				childSelectNode,
			)
		}
		// We need to remove the render so that any child records are preserved on arrival at the parent
		childTop.render = nil

		childSource = childSelectNode
	}

	plan.group.dataSource.childSource = childSource
	plan.group.dataSource.parentSource = plan.plan
	plan.group.dataSource.pipeNode = pipe

	if err := p.walkAndReplacePlan(plan.group, scan, pipe); err != nil {
		return err
	}

	return p.expandPlan(childSource)
}
// func (p *Planner) QueryDocs(query parser.Query) {
// }
// walkAndReplacePlan follows the Source() chain starting at plan, looking
// for the node whose direct source is target, and re-points that node at
// replace instead.
//
// If the end of the chain is reached without meeting target, nothing is
// changed and nil is returned. If target is found under a pipe node the
// plan is deliberately left unchanged. If target is found under a node type
// that this function does not know how to rewire, an error is returned.
func (p *Planner) walkAndReplacePlan(plan, target, replace planNode) error {
	// Descend until parent is the node sitting directly above target.
	parent := plan
	for {
		child := parent.Source()
		if child == nil {
			// End of the chain: target is not part of this plan.
			return nil
		}
		if child == target {
			break
		}
		parent = child
	}

	// parent's source is the target; how it is rewired depends on the
	// concrete type of parent.
	switch node := parent.(type) {
	case *selectNode:
		node.source = replace
	case *typeJoinOne:
		node.root = replace
	case *typeJoinMany:
		node.root = replace
	case *pipeNode:
		/* Do nothing - pipe nodes should not be replaced */
		// @todo: add more nodes that apply here
	default:
		return fmt.Errorf("Unknown plan node type to replace: %T", node)
	}

	return nil
}
// walkAndFindPlanType follows the Source() chain starting at plan (plan
// itself included) and returns the first node whose dynamic type is the
// same as that of target. Only the type of target is inspected, never its
// value. It returns nil when the chain ends without a match.
func (p *Planner) walkAndFindPlanType(plan, target planNode) planNode {
	wanted := reflect.TypeOf(target)
	for node := plan; node != nil; node = node.Source() {
		if reflect.TypeOf(node) == wanted {
			return node
		}
	}
	return nil
}
// queryDocs plans and runs query, returning one map per result document.
//
// The plan is built, started, and then drained with Next until it reports
// that no more documents are available. Each non-nil Values() result is
// copied with copyMap before being collected, so the returned maps do not
// alias the maps handed out by the plan. The plan is closed before
// returning. A query that produces no documents yields a nil slice and a
// nil error; any error from planning, starting or iterating is returned
// with a nil slice.
func (p *Planner) queryDocs(query *parser.Query) ([]map[string]interface{}, error) {
	root, err := p.query(query)
	if err != nil {
		return nil, err
	}
	defer root.Close()

	if err = root.Start(); err != nil {
		return nil, err
	}

	var docs []map[string]interface{}
	hasNext, err := root.Next()
	for err == nil && hasNext {
		if doc := root.Values(); doc != nil {
			docs = append(docs, copyMap(doc))
		}
		hasNext, err = root.Next()
	}
	if err != nil {
		return nil, err
	}

	return docs, nil
}
// query builds the plan for the given parsed query by passing it to
// makePlan, and returns the resulting root planNode or makePlan's error.
func (p *Planner) query(query *parser.Query) (planNode, error) {
return p.makePlan(query)
}
// copyMap returns a new map holding the same keys as m. Values that are
// themselves map[string]interface{} are copied recursively; every other
// value (slices included) is stored as-is and therefore shared with m.
// A nil or empty input yields an empty, non-nil map.
func copyMap(m map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{})
	for key, val := range m {
		if nested, isMap := val.(map[string]interface{}); isMap {
			out[key] = copyMap(nested)
			continue
		}
		out[key] = val
	}
	return out
}