Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: add support for aggregations and values in the new factory #50560

Merged
merged 8 commits into from
Jul 1, 2020

Conversation

yuzefovich
Copy link
Member

@yuzefovich yuzefovich commented Jun 23, 2020

sql: remove some unnecessary things around aggregations

Release note: None

sql: add support for aggregations in the new factory

This commit implements ConstructGroupBy and ConstructScalarGroupBy
methods in the new factory by populating the specs upfront and reusing
DistSQLPlanner to do the actual planning.

Fixes: #50290.

Release note: None

sql: minor cleanup of planNodeToRowSource

This commit cleans up planNodeToRowSource to be properly closed
(similar to other processors).

Release note: None

sql: implement ConstructValues in the new factory

This commit adds implementation of ConstructValues in the new factory.
In some cases, a valuesNode is the only way to "construct values" - when
the node must be wrapped (see createPhysPlanForValuesNode for more
details). In other cases, a valuesNode is used to construct the values
processor spec. The latter usage is refactored to avoid valuesNode
creation.

This decision also prompts us to add a separate "side" list of
planNodes that are part of the physical plan and need to be closed
when the whole plan is closed. This is done by introducing a utility
wrapper around PhysicalPlan. This approach was chosen after
considering an alternative in which planNodeToRowSource adapter would
be the one closing the planNode it is wrapping because
planNode.Close contract currently prohibits the execution from closing
any of the planNodes, and changined that would be quite invasive at
this point. We might reevaluate this decision later, once we've made
more progress on the distsql spec work.

Addresses: #47473.

Release note: None

logictest: add spec-planning configs to the default ones

We have now implemented noticeable chunk of methods in the new factory,
so I think it makes to run all logic tests with the spec-planning
configs by default.

The only logic test that is currently skipped is interleaved_join
because the new factory doesn't plan interleaved joins yet.

Release note: None

sql: enhance unsupported error message in the new factory

Release note: None

sql: support simple projection on top of planNode in the new factory

The new factory still constructs some of the planNodes (for example,
explain variants), and we should be able to handle simple projections on
top of those. This is handled by reusing the logic from the old factory.
The issue was hidden by the fallback to the old factory since EXPLAIN
statements don't have "SELECT" statement tag.

Release note: None

sql: implement ConstructLimit in the new factory

This commit additionally removes the requirement that limitExpr and
offsetExpr must be distributable in order for the whole plan to be
distributable in the old path because those expressions are evaluated
during the physical planning, locally.

Release note: None

@yuzefovich yuzefovich added the do-not-merge bors won't merge a PR with this label. label Jun 23, 2020
@yuzefovich yuzefovich requested a review from a team June 23, 2020 18:26
@yuzefovich yuzefovich requested a review from a team as a code owner June 23, 2020 18:26
@yuzefovich yuzefovich requested review from dt and removed request for a team and dt June 23, 2020 18:26
@cockroach-teamcity
Copy link
Member

This change is Reviewable

@yuzefovich yuzefovich force-pushed the distsql-agg branch 7 times, most recently from 75d7bb8 to d20fb18 Compare June 25, 2020 18:11
@yuzefovich yuzefovich removed the do-not-merge bors won't merge a PR with this label. label Jun 25, 2020
@yuzefovich
Copy link
Member Author

I've added 3 commits on top of the original 5 in this PR. Please let me know if you think it's worth separating them out and whether my approach with multiple commits (that lean on a smaller side) per a single PR has been helpful during the review.

Copy link
Contributor

@asubiotto asubiotto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused by what the point of commit 3 is and planNode closing in general. Does commit 3 fix anything? If so, shouldn't that be accompanied by the removal of the custom closing logic we discussed in our 1:1? If it doesn't, I think it's worth investigating more what's going on.

Reviewed 4 of 4 files at r1, 9 of 9 files at r2, 1 of 1 files at r3, 4 of 4 files at r4, 7 of 7 files at r5, 1 of 1 files at r6, 3 of 3 files at r7, 4 of 4 files at r8.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)


pkg/sql/distsql_physical_planner.go, line 1288 at r2 (raw file):

		groupColOrdering:     n.groupColOrdering,
		getInputMergeOrdering: func() execinfrapb.Ordering {
			return dsp.convertOrdering(planReqOrdering(n.plan), p.PlanToStreamColMap)

Can you call this information here and store as a field instead of using a function?


pkg/sql/distsql_physical_planner.go, line 1304 at r2 (raw file):

//        COUNT(k + v), we assume a stream of evaluated 'k + v' values.
//      - Expressions that CONTAIN an aggregation function, e.g. 'COUNT(k) + 1'.
//        This is evaluated in the post aggregation evaluator attached after.

Since you're here, nit: s/This/These. Maybe remove or expand on attached after? It seems vague


pkg/sql/distsql_spec_exec_factory.go, line 397 at r2 (raw file):

		// In order to reuse populateAggregationSpec method we instantiate a "fake" aggregate
		// info for the grouping column.
		fakeGroupColAgg := exec.AggInfo{

Is there a way to not have to do construct a "fake" aggregate info? It seems hacky.


pkg/sql/distsql_spec_exec_factory.go, line 471 at r2 (raw file):

		aggregations,
		exec.OutputOrdering{}, /* reqOrdering */
		true,                  /* isScalar */

Do we need this argument or can contructAggregators determine whether this is true based on the len(groupCols). If it can, I'd be in favor of removing this redundant information.


pkg/sql/distsql_spec_exec_factory.go, line 106 at r4 (raw file):

	// creation, but for simplicity we choose to create the planNode for both
	// scenarios.
	v := &valuesNode{

I would prefer we developed a straight values spec planning approach. Does it save that much complexity to use a valuesNode?


pkg/sql/distsql_spec_exec_factory.go, line 167 at r4 (raw file):

	if indexConstraint != nil && indexConstraint.IsContradiction() {
		return e.ConstructValues([][]tree.TypedExpr{}, p.ResultColumns)

This will construct a values with specifiedInQuery == true is that correct?


pkg/sql/exec_factory_util.go, line 234 at r2 (raw file):

	ints := make([]int, len(ordinals))
	for i := range ordinals {
		ints[i] = int(ordinals[i])

You had a PR that converted slices of []ints to []exec.NodeColumnOrdinal, right? Shouldn't we be converting []int slices to NodeColumnOrdinals instead of introducing this helper?


pkg/sql/opt_exec_factory.go, line 436 at r2 (raw file):

		funcs: make([]*aggregateFuncHolder, 0, len(aggregations)),
		// There are no grouping columns with scalar GroupBy, so we pass in
		// nil for first two arguments.

Can you just make a zero-length slice up front? I find it easier to understand.


pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning, line 65 at r2 (raw file):


statement ok
INSERT INTO kv VALUES (4, NULL), (5, 3)

I'm a bit confused, how does this work in the always mode if it isn't a select statement? Can we add a statement that we expect not to be supported by the factory just to ensure the expected path is executed?

Copy link
Member Author

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Responded to some of the questions and will address the rest of the comments tomorrow.

Does commit 3 fix anything?

Philosophically speaking, yes, it makes it so that planNodeToRowSource processor is actually closed like all other processors. However, this doesn't have any changes to the visible behavior (because currently this wrapper is not responsible for actually closing anything).

If so, shouldn't that be accompanied by the removal of the custom closing logic we discussed in our 1:1? If it doesn't, I think it's worth investigating more what's going on.

I did look into this. The problem with closing the wrapped planNode when we're closing the corresponding wrapper processor is that the plan nodes can still be accessed after the query execution. Namely, we have savePlanInfo method that is called right before the whole plan is closed and collects a query fingerprint for statement statistics. We could get that fingerprint earlier (by calling this savePlanInfo after the planning is done but before dispatching to the execution engine).

However, then we encounter a bigger issue - some of the plan nodes are put into the sync.Pools when they are closed (e.g. insertNode, deleteNode). This is according to the current planNode.Close contract:

	// This method must not be called during execution - the planNode
	// tree must remain "live" and readable via walk() even after
	// execution completes.
	//
	// The node must not be used again after this method is called. Some nodes put
	// themselves back into memory pools on Close.
	Close(ctx context.Context)

That said, I did hack around a few things (moving that savePlanInfo call, adding a few if planNode.input != nil { planNode.input.Close(); planNode.input = nil} blocks, etc), and some tests started working. Maybe we should change the planNode contract so that on Close every planNode is required to reset references to other planNodes it has links to and that it is ok to call Close when cleaning up the execution infra. But that would be a more invasive change than this side "planNodesToClose" business. We probably should talk about it tomorrow.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @jordanlewis, and @yuzefovich)


pkg/sql/distsql_spec_exec_factory.go, line 471 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Do we need this argument or can contructAggregators determine whether this is true based on the len(groupCols). If it can, I'd be in favor of removing this redundant information.

Yes, we do need this argument - scalar aggregation with no grouping columns and non-scalar aggregation with no grouping columns return different results (the former always returns exactly one row and always has no grouping columns whereas the latter can have any number of grouping columns and returns no rows if there are no input rows).


pkg/sql/distsql_spec_exec_factory.go, line 106 at r4 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I would prefer we developed a straight values spec planning approach. Does it save that much complexity to use a valuesNode?

As the comment above says, (at least currently) in some cases we simply cannot construct a processor spec that would be equivalent to a valuesNode. We have this conditional:

	if !n.specifiedInQuery || planCtx.isLocal || planCtx.noEvalSubqueries {
		return dsp.wrapPlan(planCtx, n)
	}

which forces us to wrap a planNode (meaning that we must have a planNode to be wrapped).

Let's examine all 3 parts (my rough guess of why they were added):

  1. specifiedInQuery == false: this appears to be necessary because valuesNodes are wrapped by delayedNodes in some cases which is needed because we're not able to construct certain planNodes until the execution. I'm not very familiar with this code and unsure whether it is possible / how hard it is to refactor this use case. cc @jordanlewis
  2. isLocal==true: this allows us to have an optimization to not serialize the expressions when we're performing local execution. I think this can be easily removed because with the new factory we know whether a particular expression will be distributed or not during the exec-building and will serialize only if necessary.
  3. noEvalSubqueries==true: this is a special case that tells us to not attempt to replace subqueries with their results when serializing expressions because we don't have those results since we don't intend to evaluate subqueries (we're in EXPLAIN queries). This is probably somewhat related to the issue I was mentioning during the standup, and I think in this particular case it should be easy to address as well.

Summarizing these observations, it's the first part of the conditional that I'm worried about. Probably it's worth discussing it tomorrow as well.

Copy link
Contributor

@asubiotto asubiotto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: commit 3 I think this is big enough that we should tackle this in another PR. Up to you if you want to leave this small fix in this PR or put it in another one.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jordanlewis and @yuzefovich)


pkg/sql/distsql_spec_exec_factory.go, line 106 at r4 (raw file):

Previously, yuzefovich wrote…

As the comment above says, (at least currently) in some cases we simply cannot construct a processor spec that would be equivalent to a valuesNode. We have this conditional:

	if !n.specifiedInQuery || planCtx.isLocal || planCtx.noEvalSubqueries {
		return dsp.wrapPlan(planCtx, n)
	}

which forces us to wrap a planNode (meaning that we must have a planNode to be wrapped).

Let's examine all 3 parts (my rough guess of why they were added):

  1. specifiedInQuery == false: this appears to be necessary because valuesNodes are wrapped by delayedNodes in some cases which is needed because we're not able to construct certain planNodes until the execution. I'm not very familiar with this code and unsure whether it is possible / how hard it is to refactor this use case. cc @jordanlewis
  2. isLocal==true: this allows us to have an optimization to not serialize the expressions when we're performing local execution. I think this can be easily removed because with the new factory we know whether a particular expression will be distributed or not during the exec-building and will serialize only if necessary.
  3. noEvalSubqueries==true: this is a special case that tells us to not attempt to replace subqueries with their results when serializing expressions because we don't have those results since we don't intend to evaluate subqueries (we're in EXPLAIN queries). This is probably somewhat related to the issue I was mentioning during the standup, and I think in this particular case it should be easy to address as well.

Summarizing these observations, it's the first part of the conditional that I'm worried about. Probably it's worth discussing it tomorrow as well.

Yeah, I'm specifically talking about the non-wrapping case. It looks like you're using a valuesNode as planning information. I think it would be best to have a separate info struct like you do with the joiners, and then either create a values node if it needs to be wrapped, or create the spec directly in the non-wrapping case.

Copy link
Member Author

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, definitely that big change should be in a separate PR. I'll keep the present third commit in this one, why not. Also, I'm assuming this is a non-blocker for this PR from your perspective, right? I think it should be ok to merge this PR as is (with planNodesToClose slice) and remove that later, once we figure out the closing story.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jordanlewis and @yuzefovich)

This commit implements `ConstructGroupBy` and `ConstructScalarGroupBy`
methods in the new factory by populating the specs upfront and reusing
DistSQLPlanner to do the actual planning.

Release note: None
This commit cleans up `planNodeToRowSource` to be properly closed
(similar to other processors).

Release note: None
This commit adds implementation of `ConstructValues` in the new factory.
In some cases, a valuesNode is the only way to "construct values" - when
the node must be wrapped (see createPhysPlanForValuesNode for more
details). In other cases, a valuesNode is used to construct the values
processor spec. The latter usage is refactored to avoid valuesNode
creation.

This decision also prompts us to add a separate "side" list of
`planNode`s that are part of the physical plan and need to be closed
when the whole plan is closed. This is done by introducing a utility
wrapper around `PhysicalPlan`. This approach was chosen after
considering an alternative in which `planNodeToRowSource` adapter would
be the one closing the `planNode` it is wrapping because
`planNode.Close` contract currently prohibits the execution from closing
any of the `planNode`s, and changined that would be quite invasive at
this point. We might reevaluate this decision later, once we've made
more progress on the distsql spec work.

Release note: None
We have now implemented noticeable chunk of methods in the new factory,
so I think it makes to run all logic tests with the spec-planning
configs by default.

The only logic test that is currently skipped is interleaved_join
because the new factory doesn't plan interleaved joins yet.

Release note: None
The new factory still constructs some of the `planNode`s (for example,
explain variants), and we should be able to handle simple projections on
top of those. This is handled by reusing the logic from the old factory.
The issue was hidden by the fallback to the old factory since EXPLAIN
statements don't have "SELECT" statement tag.

Release note: None
This commit additionally removes the requirement that `limitExpr` and
`offsetExpr` must be distributable in order for the whole plan to be
distributable in the old path because those expressions are evaluated
during the physical planning, locally.

Release note: None
Copy link
Member Author

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @jordanlewis)


pkg/sql/distsql_physical_planner.go, line 1288 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Can you call this information here and store as a field instead of using a function?

Fixed. I think for some reason I thought that there was a runtime parameter that is not available at the time we call planAggregators, not sure why that was the case.


pkg/sql/distsql_physical_planner.go, line 1304 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Since you're here, nit: s/This/These. Maybe remove or expand on attached after? It seems vague

Done.


pkg/sql/distsql_spec_exec_factory.go, line 397 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Is there a way to not have to do construct a "fake" aggregate info? It seems hacky.

Initially I didn't want to introduce five arguments when we have a struct that contains all of them, but I don't have a strong preference here, refactored.


pkg/sql/distsql_spec_exec_factory.go, line 106 at r4 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Yeah, I'm specifically talking about the non-wrapping case. It looks like you're using a valuesNode as planning information. I think it would be best to have a separate info struct like you do with the joiners, and then either create a values node if it needs to be wrapped, or create the spec directly in the non-wrapping case.

Thanks for pushing with this. I refactored the code a bit, and I think it is a lot cleaner now.


pkg/sql/distsql_spec_exec_factory.go, line 167 at r4 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

This will construct a values with specifiedInQuery == true is that correct?

Yes, added a clarifying comment.


pkg/sql/exec_factory_util.go, line 234 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

You had a PR that converted slices of []ints to []exec.NodeColumnOrdinal, right? Shouldn't we be converting []int slices to NodeColumnOrdinals instead of introducing this helper?

That PR was for joiners in particular and removed unnecessary conversion (i.e. it was exec.NodeColumnOrdinal -> int -> uint32 conversion, and I removed the intermediate step). Here, we have a similar two-step conversion conversion, but the situation is slightly different: in the old path we pass through groupCols via groupNode.groupCols which is []int, also, we use util.FastIntSet, so in planAggregators we actually need both int and uint32.


pkg/sql/opt_exec_factory.go, line 436 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Can you just make a zero-length slice up front? I find it easier to understand.

Done.


pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning, line 65 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I'm a bit confused, how does this work in the always mode if it isn't a select statement? Can we add a statement that we expect not to be supported by the factory just to ensure the expected path is executed?

Currently, only unsupported SELECT statements return an error with always, all others fallback transparently to the old factory. I added a query that does return an error.

Copy link
Contributor

@asubiotto asubiotto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewed 21 of 21 files at r9, 9 of 9 files at r10, 1 of 1 files at r11, 4 of 4 files at r12, 8 of 8 files at r13, 2 of 2 files at r14, 3 of 3 files at r15, 4 of 4 files at r16.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @jordanlewis and @yuzefovich)


pkg/sql/distsql_spec_exec_factory.go, line 106 at r4 (raw file):

Previously, yuzefovich wrote…

Thanks for pushing with this. I refactored the code a bit, and I think it is a lot cleaner now.

Nice, thanks for changing it!


pkg/sql/exec_factory_util.go, line 234 at r2 (raw file):

Previously, yuzefovich wrote…

That PR was for joiners in particular and removed unnecessary conversion (i.e. it was exec.NodeColumnOrdinal -> int -> uint32 conversion, and I removed the intermediate step). Here, we have a similar two-step conversion conversion, but the situation is slightly different: in the old path we pass through groupCols via groupNode.groupCols which is []int, also, we use util.FastIntSet, so in planAggregators we actually need both int and uint32.

Do we want to change that or not worth it?


pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning, line 65 at r2 (raw file):

Previously, yuzefovich wrote…

Currently, only unsupported SELECT statements return an error with always, all others fallback transparently to the old factory. I added a query that does return an error.

I guess it makes sense, so that we can write logic tests easily. But maybe it would be better to explicitly allow some statements to write logic tests easily (e.g. create table, insert) so that we get an error for anything else that is unsupported. I find that falling back if it's not a select statement could lead to confusion (i.e. I thought this was supported, why is it falling back to the other factory?). Not saying we should do this in this PR but maybe in another one.

Copy link
Member Author

@yuzefovich yuzefovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TFTR!

bors r+

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto and @jordanlewis)


pkg/sql/exec_factory_util.go, line 234 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Do we want to change that or not worth it?

I think it's not worth it at this point - we can clean this up once the old factory is removed.


pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning, line 65 at r2 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I guess it makes sense, so that we can write logic tests easily. But maybe it would be better to explicitly allow some statements to write logic tests easily (e.g. create table, insert) so that we get an error for anything else that is unsupported. I find that falling back if it's not a select statement could lead to confusion (i.e. I thought this was supported, why is it falling back to the other factory?). Not saying we should do this in this PR but maybe in another one.

I see your point, but also this behavior will be changing soon - once we support more things I think it'll make sense to always return an error (except for SET statements maybe). At least currently I don't see anyone apart from me using this setting, and I know that anything apart from SELECTs and some EXPLAIN variants is unsupported, so it's easier to have this fallback behavior.

I do agree that the current way we treat always might be confusing, but this is an area of active work-in-progress, and I think it's an acceptable tradeoff if it makes the development easier.

Copy link
Contributor

@asubiotto asubiotto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto, @jordanlewis, and @yuzefovich)


pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning, line 65 at r2 (raw file):

I think it'll make sense to always return an error (except for SET statements maybe)

We'd have to wait for insert and create table to be supported then. What I'm suggesting is a stop-gap until then, but I see your point to just wait until more stuff is supported.

I don't see anyone apart from me using this setting

Anyone interested in the current state of support of this factory would be interested (including me). It's the same reasoning for adding you EXPLAIN (VEC, DEBUG) statement.

@craig
Copy link
Contributor

craig bot commented Jul 1, 2020

Build succeeded

@craig craig bot merged commit e99e7a8 into cockroachdb:master Jul 1, 2020
@yuzefovich yuzefovich deleted the distsql-agg branch July 1, 2020 15:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

sql: implement ConstructGroupBy in the new factory
3 participants