feat: make sure the query plan nodes have unique ids (#3297)
ethanyzhang authored Nov 2, 2020
1 parent 15923a0 commit 958736c
Showing 12 changed files with 4,455 additions and 13 deletions.
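In outline (an editorial sketch, not part of the commit): a single integer counter is stored in the query context under plan.NextPlanNodeIDKey, and every component that mints plan-node IDs — the operation-spec builder in internal/spec/build.go and planner rules via the new plan.CreateUniquePhysicalNode — increments that shared counter. A minimal standalone Go program showing the pattern; nextPlanNodeIDKey and nextID are illustrative names, not flux API:

package main

import (
	"context"
	"fmt"
)

// nextPlanNodeIDKey stands in for plan.NextPlanNodeIDKey: one counter per query,
// seeded once in lang/compiler.go and read by everything that mints node IDs.
const nextPlanNodeIDKey = "NextPlanNodeID"

// nextID hands out the next value of the counter carried in ctx.
func nextID(ctx context.Context, prefix string) string {
	counter := ctx.Value(nextPlanNodeIDKey).(*int)
	id := fmt.Sprintf("%s%d", prefix, *counter)
	*counter++
	return id
}

func main() {
	ctx := context.WithValue(context.Background(), nextPlanNodeIDKey, new(int))
	// A tableFind()/chain() sub-query and the main query draw from the same
	// counter, so their plan-node IDs can never collide.
	fmt.Println(nextID(ctx, "from"), nextID(ctx, "range"), nextID(ctx, "from"))
	// prints: from0 range1 from2
}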
14 changes: 9 additions & 5 deletions execute/profiler.go
@@ -402,21 +402,25 @@ func (s *QueryProfiler) getTableBuilder(q flux.Query, alloc *memory.Allocator) (
 		stats.TotalAllocated,
 		strings.Join(stats.RuntimeErrors, "\n"),
 	}
-	stats.Metadata.Range(func(key string, value interface{}) bool {
+	for key, values := range stats.Metadata {
 		var ty flux.ColType
-		if intValue, ok := value.(int); ok {
+		if intValue, ok := values[0].(int); ok {
 			ty = flux.TInt
 			colData = append(colData, int64(intValue))
 		} else {
 			ty = flux.TString
-			colData = append(colData, fmt.Sprintf("%v", value))
+			var data string
+			for _, value := range values {
+				valueStr := fmt.Sprintf("%v", value)
+				data += valueStr + "\n"
+			}
+			colData = append(colData, data)
 		}
 		colMeta = append(colMeta, flux.ColMeta{
 			Label: key,
 			Type:  ty,
 		})
-		return true
-	})
+	}
 	for _, col := range colMeta {
 		if _, err := b.AddCol(col); err != nil {
 			return nil, err
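An editorial note on the change above: stats.Metadata is now iterated as a multimap — each key maps to a list of values — because a query that runs table functions or chain() records one query plan per sub-query under flux/query-plan, and the profiler joins string values for a key with newlines into a single column value. A minimal sketch of that joining, with hypothetical data:

package main

import "fmt"

func main() {
	// Hypothetical metadata shaped like flux statistics metadata after this
	// change: every key carries a list of values, one per recorded plan.
	metadata := map[string][]interface{}{
		"flux/query-plan": {"digraph { main }", "digraph { sub-query }"},
	}
	for key, values := range metadata {
		var data string
		for _, value := range values {
			data += fmt.Sprintf("%v", value) + "\n"
		}
		fmt.Printf("%s:\n%s", key, data)
	}
}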
3 changes: 2 additions & 1 deletion execute/profiler_test.go
@@ -123,7 +123,8 @@ func TestQueryProfiler_GetResult(t *testing.T) {
 #default,_profiler,,,,,,,,,,,,,,,
 ,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,flux/query-plan,influxdb/scanned-bytes,influxdb/scanned-values
 ,,0,profiler/query,1,2,3,4,5,6,7,8,9,"1
-2","query plan",10,11
+2","query plan
+",10,11
 `
 	q.Done()
 	tbl, err := p.GetResult(q, &memory.Allocator{})
15 changes: 11 additions & 4 deletions internal/spec/build.go
@@ -10,17 +10,18 @@ import (
 	"github.com/influxdata/flux/execute"
 	"github.com/influxdata/flux/internal/errors"
 	"github.com/influxdata/flux/interpreter"
+	"github.com/influxdata/flux/plan"
 	"github.com/opentracing/opentracing-go"
 )

 type ider struct {
-	id     int
+	id     *int
 	lookup map[*flux.TableObject]flux.OperationID
 }

 func (i *ider) nextID() int {
-	next := i.id
-	i.id++
+	next := *i.id
+	*i.id++
 	return next
 }

@@ -44,8 +45,14 @@ func (i *ider) ID(t *flux.TableObject) flux.OperationID {
 }

 func FromEvaluation(ctx context.Context, ses []interpreter.SideEffect, now time.Time) (*flux.Spec, error) {
+	var nextNodeID *int
+	if value := ctx.Value(plan.NextPlanNodeIDKey); value != nil {
+		nextNodeID = value.(*int)
+	} else {
+		nextNodeID = new(int)
+	}
 	ider := &ider{
-		id:     0,
+		id:     nextNodeID,
 		lookup: make(map[*flux.TableObject]flux.OperationID),
 	}

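A standalone sketch (simplified types, not the real flux code) of why the counter became a pointer: spec building runs once for the main query and again for each table-function sub-query, and each run constructs its own ider. With a plain int every ider would restart at 0, while a shared *int drawn from the context keeps one global sequence:

package main

import "fmt"

type ider struct{ id *int }

func (i *ider) nextID() int {
	next := *i.id
	*i.id++
	return next
}

func main() {
	counter := new(int)        // seeded once per query, e.g. via the context in lang/compiler.go
	sub := &ider{id: counter}  // ider built while evaluating a tableFind() sub-query
	mainQ := &ider{id: counter} // ider built for the main query's spec
	fmt.Println(sub.nextID(), sub.nextID(), mainQ.nextID()) // 0 1 2 — no IDs are reused
}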
2 changes: 2 additions & 0 deletions lang/compiler.go
@@ -457,6 +457,8 @@ func (p *AstProgram) Start(ctx context.Context, alloc *memory.Allocator) (flux.Q
 	// function calls during the evaluation phase (see `tableFind`).
 	deps := execute.NewExecutionDependencies(alloc, &p.Now, p.Logger)
 	ctx = deps.Inject(ctx)
+	nextPlanNodeID := new(int)
+	ctx = context.WithValue(ctx, plan.NextPlanNodeIDKey, nextPlanNodeID)

 	// Evaluation.
 	sp, scope, err := p.getSpec(ctx, alloc)
142 changes: 142 additions & 0 deletions lang/query_test.go
@@ -2,9 +2,11 @@ package lang_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux"
_ "github.com/influxdata/flux/builtin"
"github.com/influxdata/flux/execute/executetest"
@@ -167,3 +169,143 @@ csv.from(csv: data) |> map(fn: (r) => r.nonexistent)`
t.Fatalf("unexpected error from query execution: %s", q.Err())
}
}

// This test verifies that when a query involves table functions or chain(), the plan nodes
// generated for the main query do not reuse node IDs that are already taken by the table
// functions or chain().
func TestPlanNodeUniqueness(t *testing.T) {
prelude := `
import "experimental/array"
import "experimental"
data = array.from(rows: [{
_measurement: "command",
_field: "id",
_time: 2018-12-19T22:13:30Z,
_value: 12,
}, {
_measurement: "command",
_field: "id",
_time: 2018-12-19T22:13:40Z,
_value: 23,
}, {
_measurement: "command",
_field: "id",
_time: 2018-12-19T22:13:50Z,
_value: 34,
}, {
_measurement: "command",
_field: "guild",
_time: 2018-12-19T22:13:30Z,
_value: 12,
}, {
_measurement: "command",
_field: "guild",
_time: 2018-12-19T22:13:40Z,
_value: 23,
}, {
_measurement: "command",
_field: "guild",
_time: 2018-12-19T22:13:50Z,
_value: 34,
}])
`
tcs := []struct {
name string
script string
want string
}{
{
name: "chain",
script: `
id = data
|> range(start: 2018-12-19T22:13:30Z, stop: 2018-12-19T22:14:21Z)
|> filter(fn: (r) => r["_field"] == "id")
guild = data
|> range(start: 2018-12-19T22:13:30Z, stop: 2018-12-19T22:14:21Z)
|> filter(fn: (r) => r["_field"] == "guild")
experimental.chain(first: id, second: guild)
`,
want: `[digraph {
experimental/array.from0
range1
filter2
// r._field == "id"
generated_yield
experimental/array.from0 -> range1
range1 -> filter2
filter2 -> generated_yield
}
digraph {
experimental/array.from3
range4
filter5
// r._field == "guild"
generated_yield
experimental/array.from3 -> range4
range4 -> filter5
filter5 -> generated_yield
}
]`,
},
{
name: "tableFns",
script: `
ids = data
|> range(start: 2018-12-19T22:13:30Z, stop: 2018-12-19T22:14:21Z)
|> filter(fn: (r) => r["_field"] == "id")
|> sort()
|> tableFind(fn: (key) => true)
|> getColumn(column: "_field")
id = ids[0]
data
|> range(start: 2018-12-19T22:13:30Z, stop: 2018-12-19T22:14:21Z)
|> filter(fn: (r) => r["_field"] == id)
`,
want: `[digraph {
experimental/array.from0
range1
filter2
// r._field == "id"
sort3
generated_yield
experimental/array.from0 -> range1
range1 -> filter2
filter2 -> sort3
sort3 -> generated_yield
}
digraph {
experimental/array.from4
range5
filter6
// r._field == "id"
generated_yield
experimental/array.from4 -> range5
range5 -> filter6
filter6 -> generated_yield
}
]`,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
if q, err := runQuery(prelude + tc.script); err != nil {
t.Error(err)
} else {
got := fmt.Sprintf("%v", q.Statistics().Metadata["flux/query-plan"])
if !cmp.Equal(tc.want, got) {
t.Errorf("unexpected value -want/+got\n%s", cmp.Diff(tc.want, got))
}
}
})
}
}
1 change: 1 addition & 0 deletions libflux/go/libflux/buildinfo.gen.go
@@ -423,6 +423,7 @@ var sourceHashes = map[string]string{
 	"stdlib/universe/join_agg_test.flux": "8ab6a33469a50645e41deaaa1f87c1e4c4180b5d79dd87b03d6b4b1012f8ade9",
 	"stdlib/universe/join_missing_on_col_test.flux": "98f4ca3999b1379d3a35f836449232e3b664fe312e5485179be57e4cc64e6ef4",
 	"stdlib/universe/join_test.flux": "bdbbc60918fb9b683d9975816a9a9c59e2d7d1436847696b01414d740beffec3",
+	"stdlib/universe/join_two_same_sources_test.flux": "0d598a25c72d00ea9b06b9bfbdf8ff790e8b6ffd3aacd267a483438727c0f9c5",
 	"stdlib/universe/join_use_previous_test.flux": "81fcaa31ff9a9a2a06a35a9275daf73cef0434061e8e81cb8317ccae172f7378",
 	"stdlib/universe/kama_test.flux": "c84bfe6689f42f8bba75b9e06a4b1cb8e441615266ad0d12201d9fff27e34994",
 	"stdlib/universe/kama_v2_test.flux": "f9d073089fdfd51c2260167d5e30b05de67ee3e4d91bc5e4261ed1ca722dfbc6",
12 changes: 12 additions & 0 deletions plan/physical.go
@@ -273,6 +273,18 @@ func CreatePhysicalNode(id NodeID, spec PhysicalProcedureSpec) *PhysicalPlanNode
 	}
 }

+const NextPlanNodeIDKey = "NextPlanNodeID"
+
+func CreateUniquePhysicalNode(ctx context.Context, prefix string, spec PhysicalProcedureSpec) *PhysicalPlanNode {
+	if value := ctx.Value(NextPlanNodeIDKey); value != nil {
+		nextNodeID := value.(*int)
+		id := NodeID(fmt.Sprintf("%s%d", prefix, *nextNodeID))
+		*nextNodeID++
+		return CreatePhysicalNode(id, spec)
+	}
+	return CreatePhysicalNode(NodeID(prefix), spec)
+}
+
 // PostPhysicalValidator provides an interface that can be implemented by PhysicalProcedureSpecs for any
 // validation checks to be performed post-physical planning.
 type PostPhysicalValidator interface {
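A usage sketch of the new helper (the setup is illustrative only: an empty BucketsRemoteProcedureSpec stands in for whatever PhysicalProcedureSpec a rule builds). With a seeded counter the IDs keep incrementing across calls; without one, the bare prefix is used, which preserves the old behavior:

package main

import (
	"context"
	"fmt"

	"github.com/influxdata/flux/plan"
	"github.com/influxdata/flux/stdlib/influxdata/influxdb"
)

func main() {
	spec := &influxdb.BucketsRemoteProcedureSpec{} // placeholder spec; any PhysicalProcedureSpec works

	nextPlanNodeID := 3 // e.g. three nodes were already created for a table-function sub-plan
	ctx := context.WithValue(context.Background(), plan.NextPlanNodeIDKey, &nextPlanNodeID)

	fmt.Println(plan.CreateUniquePhysicalNode(ctx, "bucketsRemote", spec).ID()) // bucketsRemote3
	fmt.Println(plan.CreateUniquePhysicalNode(ctx, "bucketsRemote", spec).ID()) // bucketsRemote4

	// No counter in the context: the prefix alone becomes the ID (old behavior).
	fmt.Println(plan.CreateUniquePhysicalNode(context.Background(), "bucketsRemote", spec).ID()) // bucketsRemote
}

The rules_test.go changes below exercise exactly this: the counter starts at 3, so the two rewritten nodes come out as bucketsRemote3 and bucketsRemote4.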
4 changes: 2 additions & 2 deletions stdlib/influxdata/influxdb/rules.go
@@ -37,7 +37,7 @@ func (p FromRemoteRule) Rewrite(ctx context.Context, node plan.Node) (plan.Node,
 		config.Token = *spec.Token
 	}

-	return plan.CreatePhysicalNode("fromRemote", &FromRemoteProcedureSpec{
+	return plan.CreateUniquePhysicalNode(ctx, "fromRemote", &FromRemoteProcedureSpec{
 		Config: config,
 	}), true, nil
 }
@@ -127,7 +127,7 @@ func (p BucketsRemoteRule) Rewrite(ctx context.Context, node plan.Node) (plan.No
 		return node, false, nil
 	}

-	return plan.CreatePhysicalNode("bucketsRemote", &BucketsRemoteProcedureSpec{
+	return plan.CreateUniquePhysicalNode(ctx, "bucketsRemote", &BucketsRemoteProcedureSpec{
 		BucketsProcedureSpec: spec,
 	}), true, nil
 }
77 changes: 77 additions & 0 deletions stdlib/influxdata/influxdb/rules_test.go
@@ -18,6 +18,83 @@ import (
"github.com/influxdata/flux/values/valuestest"
)

func TestRuleCreatedNodeUniqueness(t *testing.T) {
nextPlanNodeID := 3
ctx := context.WithValue(context.Background(), plan.NextPlanNodeIDKey, &nextPlanNodeID)
host, token := "localhost", "token"
bucketsProcedureSpec := &influxdb.BucketsProcedureSpec{
Org: &influxdb.NameOrID{Name: "influxdata"},
Host: &host,
Token: &token,
}
joinSpec := &universe.MergeJoinProcedureSpec{
TableNames: []string{"a", "b"},
On: []string{"_value"},
}
fromSpec := &influxdb.FromProcedureSpec{
Bucket: influxdb.NameOrID{Name: "my-bucket"},
Host: &host,
}
fromRemoteSpec := &influxdb.FromRemoteProcedureSpec{
Config: influxdb.Config{Bucket: influxdb.NameOrID{Name: "my-bucket"}, Host: "localhost"},
}
joinEdges := [][2]int{{0, 2}, {1, 2}}
tcs := []plantest.RuleTestCase{
{
Name: "BucketsRemoteJoin",
Context: ctx,
Rules: []plan.Rule{influxdb.BucketsRemoteRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("buckets0", bucketsProcedureSpec),
plan.CreateLogicalNode("buckets1", bucketsProcedureSpec),
plan.CreateLogicalNode("join2", joinSpec),
},
Edges: joinEdges,
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("bucketsRemote3", &influxdb.BucketsRemoteProcedureSpec{
BucketsProcedureSpec: bucketsProcedureSpec,
}),
plan.CreatePhysicalNode("bucketsRemote4", &influxdb.BucketsRemoteProcedureSpec{
BucketsProcedureSpec: bucketsProcedureSpec,
}),
plan.CreatePhysicalNode("join2", joinSpec),
},
Edges: joinEdges,
},
},
{
Name: "FromRemoteTableJoin",
Context: ctx,
Rules: []plan.Rule{influxdb.FromRemoteRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("from0", fromSpec),
plan.CreateLogicalNode("from1", fromSpec),
plan.CreateLogicalNode("join2", joinSpec),
},
Edges: joinEdges,
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("fromRemote5", fromRemoteSpec),
plan.CreatePhysicalNode("fromRemote6", fromRemoteSpec),
plan.CreatePhysicalNode("join2", joinSpec),
},
Edges: joinEdges,
},
},
}

for _, tc := range tcs {
t.Run(tc.Name, func(t *testing.T) {
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}

func TestFromRemoteRule_WithHost(t *testing.T) {
fromSpec := influxdb.FromProcedureSpec{
Org: &influxdb.NameOrID{Name: "influxdata"},
2 changes: 1 addition & 1 deletion stdlib/influxdata/influxdb/v1/rules.go
@@ -22,7 +22,7 @@ func (p DatabasesRemoteRule) Rewrite(ctx context.Context, node plan.Node) (plan.
 		return node, false, nil
 	}

-	return plan.CreatePhysicalNode("databasesRemote", &DatabasesRemoteProcedureSpec{
+	return plan.CreateUniquePhysicalNode(ctx, "databasesRemote", &DatabasesRemoteProcedureSpec{
 		DatabasesProcedureSpec: spec,
 	}), true, nil
 }
