Skip to content

Commit

Permalink
sql: implement ConstructDistinct in the new factory
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
yuzefovich committed Aug 5, 2020
1 parent 85b5eab commit 229df27
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 14 deletions.
43 changes: 30 additions & 13 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2862,19 +2862,25 @@ func (dsp *DistSQLPlanner) createPlanForZero(
return dsp.createValuesPlan(types, 0 /* numRows */, nil /* rawBytes */)
}

func createDistinctSpec(n *distinctNode, cols []int) *execinfrapb.DistinctSpec {
func createDistinctSpec(
distinctOnColIdxs util.FastIntSet,
columnsInOrder util.FastIntSet,
nullsAreDistinct bool,
errorOnDup string,
cols []int,
) *execinfrapb.DistinctSpec {
var orderedColumns []uint32
if !n.columnsInOrder.Empty() {
orderedColumns = make([]uint32, 0, n.columnsInOrder.Len())
for i, ok := n.columnsInOrder.Next(0); ok; i, ok = n.columnsInOrder.Next(i + 1) {
if !columnsInOrder.Empty() {
orderedColumns = make([]uint32, 0, columnsInOrder.Len())
for i, ok := columnsInOrder.Next(0); ok; i, ok = columnsInOrder.Next(i + 1) {
orderedColumns = append(orderedColumns, uint32(cols[i]))
}
}

var distinctColumns []uint32
if !n.distinctOnColIdxs.Empty() {
if !distinctOnColIdxs.Empty() {
for planCol, streamCol := range cols {
if streamCol != -1 && n.distinctOnColIdxs.Contains(planCol) {
if streamCol != -1 && distinctOnColIdxs.Contains(planCol) {
distinctColumns = append(distinctColumns, uint32(streamCol))
}
}
Expand All @@ -2890,8 +2896,8 @@ func createDistinctSpec(n *distinctNode, cols []int) *execinfrapb.DistinctSpec {
return &execinfrapb.DistinctSpec{
OrderedColumns: orderedColumns,
DistinctColumns: distinctColumns,
NullsAreDistinct: n.nullsAreDistinct,
ErrorOnDup: n.errorOnDup,
NullsAreDistinct: nullsAreDistinct,
ErrorOnDup: errorOnDup,
}
}

Expand All @@ -2902,19 +2908,31 @@ func (dsp *DistSQLPlanner) createPlanForDistinct(
if err != nil {
return nil, err
}
spec := createDistinctSpec(
n.distinctOnColIdxs,
n.columnsInOrder,
n.nullsAreDistinct,
n.errorOnDup,
plan.PlanToStreamColMap,
)
dsp.addDistinctProcessors(plan, spec, n.reqOrdering)
return plan, nil
}

func (dsp *DistSQLPlanner) addDistinctProcessors(
plan *PhysicalPlan, spec *execinfrapb.DistinctSpec, reqOrdering ReqOrdering,
) {
distinctSpec := execinfrapb.ProcessorCoreUnion{
Distinct: createDistinctSpec(n, plan.PlanToStreamColMap),
Distinct: spec,
}
newMergeOrdering := dsp.convertOrdering(n.reqOrdering, plan.PlanToStreamColMap)
defer func() {
plan.SetMergeOrdering(newMergeOrdering)
plan.SetMergeOrdering(dsp.convertOrdering(reqOrdering, plan.PlanToStreamColMap))
}()

// Add distinct processors local to each existing current result processor.
plan.AddNoGroupingStage(distinctSpec, execinfrapb.PostProcessSpec{}, plan.ResultTypes, plan.MergeOrdering)
if !plan.IsLastStageDistributed() {
return plan, nil
return
}

nodes := getNodesOfRouters(plan.ResultRouters, plan.Processors)
Expand All @@ -2923,7 +2941,6 @@ func (dsp *DistSQLPlanner) createPlanForDistinct(
distinctSpec.Distinct.DistinctColumns, plan.ResultTypes,
plan.MergeOrdering, plan.ResultRouters,
)
return plan, nil
}

func (dsp *DistSQLPlanner) createPlanForOrdinality(
Expand Down
13 changes: 12 additions & 1 deletion pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,18 @@ func (e *distSQLSpecExecFactory) ConstructDistinct(
nullsAreDistinct bool,
errorOnDup string,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: distinct")
physPlan, plan := getPhysPlan(input)
spec := createDistinctSpec(
distinctCols,
orderedCols,
nullsAreDistinct,
errorOnDup,
physPlan.PlanToStreamColMap,
)
e.dsp.addDistinctProcessors(physPlan, spec, ReqOrdering(reqOrdering))
// Since addition of distinct processors doesn't change any properties of
// the physical plan, we don't need to update any of those.
return plan, nil
}

func (e *distSQLSpecExecFactory) ConstructSetOp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,12 @@ generate_series
1
2
3

# Check that distinct is supported.
query I rowsort
SELECT DISTINCT v FROM kv
----
1
2
NULL
3

0 comments on commit 229df27

Please sign in to comment.