From e16eda8aa2debc92dce4bfeeed0d053c5baed009 Mon Sep 17 00:00:00 2001 From: Rebecca Taft Date: Mon, 15 Jun 2020 20:29:02 -0500 Subject: [PATCH 1/5] opt: move geo functions to a new file in xform This commit does some refactoring by moving several geospatial functions from xform/custom_funcs.go to a new file xform/geo.go. This is in preparation for code changes in the next commit as well as more additions to geo.go. Release note: None --- pkg/sql/opt/xform/custom_funcs.go | 220 --------------------------- pkg/sql/opt/xform/geo.go | 240 ++++++++++++++++++++++++++++++ 2 files changed, 240 insertions(+), 220 deletions(-) create mode 100644 pkg/sql/opt/xform/geo.go diff --git a/pkg/sql/opt/xform/custom_funcs.go b/pkg/sql/opt/xform/custom_funcs.go index 04dce8864402..012e5820e206 100644 --- a/pkg/sql/opt/xform/custom_funcs.go +++ b/pkg/sql/opt/xform/custom_funcs.go @@ -14,7 +14,6 @@ import ( "fmt" "sort" - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -1153,193 +1152,6 @@ func (c *CustomFuncs) canMaybeConstrainIndex( return false } -// getSpanExprForGeoIndexFn is a function that returns a SpanExpression that -// constrains the given geo index according to the given constant and -// geospatial relationship. It is implemented by getSpanExprForGeographyIndex -// and getSpanExprForGeometryIndex and used in constrainGeoIndex. -type getSpanExprForGeoIndexFn func( - tree.Datum, geoindex.RelationshipType, cat.Index, -) *invertedexpr.SpanExpression - -// tryConstrainGeoIndex tries to derive an inverted index constraint for the -// given geospatial index from the specified filters. If a constraint is -// derived, it is returned with ok=true. If no constraint can be derived, -// then tryConstrainGeoIndex returns ok=false. -func (c *CustomFuncs) tryConstrainGeoIndex( - filters memo.FiltersExpr, tabID opt.TableID, index cat.Index, -) (invertedConstraint *invertedexpr.SpanExpression, ok bool) { - config := index.GeoConfig() - var getSpanExpr getSpanExprForGeoIndexFn - if geoindex.IsGeographyConfig(config) { - getSpanExpr = c.getSpanExprForGeographyIndex - } else if geoindex.IsGeometryConfig(config) { - getSpanExpr = c.getSpanExprForGeometryIndex - } else { - return nil, false - } - - var invertedExpr invertedexpr.InvertedExpression - for i := range filters { - invertedExprLocal := c.constrainGeoIndex(filters[i].Condition, tabID, index, getSpanExpr) - if invertedExpr == nil { - invertedExpr = invertedExprLocal - } else { - invertedExpr = invertedexpr.And(invertedExpr, invertedExprLocal) - } - } - - if invertedExpr == nil { - return nil, false - } - - spanExpr, ok := invertedExpr.(*invertedexpr.SpanExpression) - if !ok { - return nil, false - } - - return spanExpr, true -} - -// getSpanExprForGeographyIndex gets a SpanExpression that constrains the given -// geography index according to the given constant and geospatial relationship. 
-func (c *CustomFuncs) getSpanExprForGeographyIndex( - d tree.Datum, relationship geoindex.RelationshipType, index cat.Index, -) *invertedexpr.SpanExpression { - geogIdx := geoindex.NewS2GeographyIndex(*index.GeoConfig().S2Geography) - geog := d.(*tree.DGeography).Geography - var spanExpr *invertedexpr.SpanExpression - - switch relationship { - case geoindex.Covers: - unionKeySpans, err := geogIdx.Covers(c.e.evalCtx.Context, geog) - if err != nil { - panic(err) - } - spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) - - case geoindex.CoveredBy: - rpKeyExpr, err := geogIdx.CoveredBy(c.e.evalCtx.Context, geog) - if err != nil { - panic(err) - } - if spanExpr, err = invertedexpr.GeoRPKeyExprToSpanExpr(rpKeyExpr); err != nil { - panic(err) - } - - case geoindex.Intersects: - unionKeySpans, err := geogIdx.Intersects(c.e.evalCtx.Context, geog) - if err != nil { - panic(err) - } - spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) - - default: - panic(errors.AssertionFailedf("unhandled relationship: %v", relationship)) - } - - return spanExpr -} - -// getSpanExprForGeometryIndex gets a SpanExpression that constrains the given -// geometry index according to the given constant and geospatial relationship. -func (c *CustomFuncs) getSpanExprForGeometryIndex( - d tree.Datum, relationship geoindex.RelationshipType, index cat.Index, -) *invertedexpr.SpanExpression { - geomIdx := geoindex.NewS2GeometryIndex(*index.GeoConfig().S2Geometry) - geom := d.(*tree.DGeometry).Geometry - var spanExpr *invertedexpr.SpanExpression - - switch relationship { - case geoindex.Covers: - unionKeySpans, err := geomIdx.Covers(c.e.evalCtx.Context, geom) - if err != nil { - panic(err) - } - spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) - - case geoindex.CoveredBy: - rpKeyExpr, err := geomIdx.CoveredBy(c.e.evalCtx.Context, geom) - if err != nil { - panic(err) - } - if spanExpr, err = invertedexpr.GeoRPKeyExprToSpanExpr(rpKeyExpr); err != nil { - panic(err) - } - - case geoindex.Intersects: - unionKeySpans, err := geomIdx.Intersects(c.e.evalCtx.Context, geom) - if err != nil { - panic(err) - } - spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) - - default: - panic(errors.AssertionFailedf("unhandled relationship: %v", relationship)) - } - - return spanExpr -} - -// constrainGeoIndex returns an InvertedExpression representing a constraint -// of the given geospatial index. -func (c *CustomFuncs) constrainGeoIndex( - expr opt.ScalarExpr, tabID opt.TableID, index cat.Index, getSpanExpr getSpanExprForGeoIndexFn, -) (_ invertedexpr.InvertedExpression) { - var fn *memo.FunctionExpr - switch t := expr.(type) { - case *memo.AndExpr: - return invertedexpr.And( - c.constrainGeoIndex(t.Left, tabID, index, getSpanExpr), - c.constrainGeoIndex(t.Right, tabID, index, getSpanExpr), - ) - - case *memo.OrExpr: - return invertedexpr.Or( - c.constrainGeoIndex(t.Left, tabID, index, getSpanExpr), - c.constrainGeoIndex(t.Right, tabID, index, getSpanExpr), - ) - - case *memo.FunctionExpr: - fn = t - - default: - return invertedexpr.NonInvertedColExpression{} - } - - if !IsGeoIndexFunction(fn) { - return invertedexpr.NonInvertedColExpression{} - } - - if fn.Args.ChildCount() < 2 { - panic(errors.AssertionFailedf( - "all index-accelerated geospatial functions should have at least two arguments", - )) - } - - // The first argument should be a constant. 
- if !memo.CanExtractConstDatum(fn.Args.Child(0)) { - return invertedexpr.NonInvertedColExpression{} - } - d := memo.ExtractConstDatum(fn.Args.Child(0)) - - // The second argument should be a variable corresponding to the index - // column. - variable, ok := fn.Args.Child(1).(*memo.VariableExpr) - if !ok { - // TODO(rytaft): Commute the geospatial function in this case. - // Covers <-> CoveredBy - // Intersects <-> Intersects - return invertedexpr.NonInvertedColExpression{} - } - if variable.Col != tabID.ColumnID(index.Column(0).Ordinal) { - // The column in the function does not match the index column. - return invertedexpr.NonInvertedColExpression{} - } - - relationship := geoRelationshipMap[fn.Name] - return getSpanExpr(d, relationship, index) -} - // ---------------------------------------------------------------------- // // Limit Rules @@ -2134,44 +1946,12 @@ func (c *CustomFuncs) GenerateGeoLookupJoins( } } -// geoRelationshipMap contains all the geospatial functions that can be index- -// accelerated. Each function implies a certain type of geospatial relationship, -// which affects how the index is queried as part of a constrained scan or -// geospatial lookup join. geoRelationshipMap maps the function name to its -// corresponding relationship (Covers, CoveredBy, or Intersects). -// -// Note that for all of these functions, a geospatial lookup join or constrained -// index scan may produce false positives. Therefore, the original function must -// be called on the output of the index operation to filter the results. -// TODO(rytaft): add ST_DFullyWithin (geoindex.Covers) and ST_DWithin -// (geoindex.Intersects) once we add support for extending a geometry. -var geoRelationshipMap = map[string]geoindex.RelationshipType{ - "st_covers": geoindex.Covers, - "st_coveredby": geoindex.CoveredBy, - "st_contains": geoindex.Covers, - "st_containsproperly": geoindex.Covers, - "st_crosses": geoindex.Intersects, - "st_equals": geoindex.Intersects, - "st_intersects": geoindex.Intersects, - "st_overlaps": geoindex.Intersects, - "st_touches": geoindex.Intersects, - "st_within": geoindex.CoveredBy, -} - // IsGeoIndexFunction returns true if the given function is a geospatial // function that can be index-accelerated. func (c *CustomFuncs) IsGeoIndexFunction(fn opt.ScalarExpr) bool { return IsGeoIndexFunction(fn) } -// IsGeoIndexFunction returns true if the given function is a geospatial -// function that can be index-accelerated. -func IsGeoIndexFunction(fn opt.ScalarExpr) bool { - function := fn.(*memo.FunctionExpr) - _, ok := geoRelationshipMap[function.Name] - return ok -} - // HasAllVariableArgs returns true if all the arguments to the given function // are variables. func (c *CustomFuncs) HasAllVariableArgs(fn opt.ScalarExpr) bool { diff --git a/pkg/sql/opt/xform/geo.go b/pkg/sql/opt/xform/geo.go new file mode 100644 index 000000000000..743e8936aa23 --- /dev/null +++ b/pkg/sql/opt/xform/geo.go @@ -0,0 +1,240 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package xform + +import ( + "github.com/cockroachdb/cockroach/pkg/geo/geoindex" + "github.com/cockroachdb/cockroach/pkg/sql/opt" + "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" + "github.com/cockroachdb/cockroach/pkg/sql/opt/invertedexpr" + "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/errors" +) + +// geoRelationshipMap contains all the geospatial functions that can be index- +// accelerated. Each function implies a certain type of geospatial relationship, +// which affects how the index is queried as part of a constrained scan or +// geospatial lookup join. geoRelationshipMap maps the function name to its +// corresponding relationship (Covers, CoveredBy, or Intersects). +// +// Note that for all of these functions, a geospatial lookup join or constrained +// index scan may produce false positives. Therefore, the original function must +// be called on the output of the index operation to filter the results. +// TODO(rytaft): add ST_DFullyWithin (geoindex.Covers) and ST_DWithin +// (geoindex.Intersects) once we add support for extending a geometry. +var geoRelationshipMap = map[string]geoindex.RelationshipType{ + "st_covers": geoindex.Covers, + "st_coveredby": geoindex.CoveredBy, + "st_contains": geoindex.Covers, + "st_containsproperly": geoindex.Covers, + "st_crosses": geoindex.Intersects, + "st_equals": geoindex.Intersects, + "st_intersects": geoindex.Intersects, + "st_overlaps": geoindex.Intersects, + "st_touches": geoindex.Intersects, + "st_within": geoindex.CoveredBy, +} + +// IsGeoIndexFunction returns true if the given function is a geospatial +// function that can be index-accelerated. +func IsGeoIndexFunction(fn opt.ScalarExpr) bool { + function := fn.(*memo.FunctionExpr) + _, ok := geoRelationshipMap[function.Name] + return ok +} + +// getSpanExprForGeoIndexFn is a function that returns a SpanExpression that +// constrains the given geo index according to the given constant and +// geospatial relationship. It is implemented by getSpanExprForGeographyIndex +// and getSpanExprForGeometryIndex and used in constrainGeoIndex. +type getSpanExprForGeoIndexFn func( + tree.Datum, geoindex.RelationshipType, cat.Index, +) *invertedexpr.SpanExpression + +// tryConstrainGeoIndex tries to derive an inverted index constraint for the +// given geospatial index from the specified filters. If a constraint is +// derived, it is returned with ok=true. If no constraint can be derived, +// then tryConstrainGeoIndex returns ok=false. 
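A hedged usage sketch of tryConstrainGeoIndex (defined just below); the
filters, tabID, and index variables are assumed to come from an exploration
rule that this patch does not show:

    if spanExpr, ok := c.tryConstrainGeoIndex(filters, tabID, index); ok {
        // spanExpr enumerates the inverted-index spans to scan. The rows it
        // yields may contain false positives, so the original filters must
        // still be re-applied on top of the index scan.
        _ = spanExpr
    }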
+func (c *CustomFuncs) tryConstrainGeoIndex( + filters memo.FiltersExpr, tabID opt.TableID, index cat.Index, +) (invertedConstraint *invertedexpr.SpanExpression, ok bool) { + config := index.GeoConfig() + var getSpanExpr getSpanExprForGeoIndexFn + if geoindex.IsGeographyConfig(config) { + getSpanExpr = c.getSpanExprForGeographyIndex + } else if geoindex.IsGeometryConfig(config) { + getSpanExpr = c.getSpanExprForGeometryIndex + } else { + return nil, false + } + + var invertedExpr invertedexpr.InvertedExpression + for i := range filters { + invertedExprLocal := c.constrainGeoIndex(filters[i].Condition, tabID, index, getSpanExpr) + if invertedExpr == nil { + invertedExpr = invertedExprLocal + } else { + invertedExpr = invertedexpr.And(invertedExpr, invertedExprLocal) + } + } + + if invertedExpr == nil { + return nil, false + } + + spanExpr, ok := invertedExpr.(*invertedexpr.SpanExpression) + if !ok { + return nil, false + } + + return spanExpr, true +} + +// getSpanExprForGeographyIndex gets a SpanExpression that constrains the given +// geography index according to the given constant and geospatial relationship. +func (c *CustomFuncs) getSpanExprForGeographyIndex( + d tree.Datum, relationship geoindex.RelationshipType, index cat.Index, +) *invertedexpr.SpanExpression { + geogIdx := geoindex.NewS2GeographyIndex(*index.GeoConfig().S2Geography) + geog := d.(*tree.DGeography).Geography + var spanExpr *invertedexpr.SpanExpression + + switch relationship { + case geoindex.Covers: + unionKeySpans, err := geogIdx.Covers(c.e.evalCtx.Context, geog) + if err != nil { + panic(err) + } + spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) + + case geoindex.CoveredBy: + rpKeyExpr, err := geogIdx.CoveredBy(c.e.evalCtx.Context, geog) + if err != nil { + panic(err) + } + if spanExpr, err = invertedexpr.GeoRPKeyExprToSpanExpr(rpKeyExpr); err != nil { + panic(err) + } + + case geoindex.Intersects: + unionKeySpans, err := geogIdx.Intersects(c.e.evalCtx.Context, geog) + if err != nil { + panic(err) + } + spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) + + default: + panic(errors.AssertionFailedf("unhandled relationship: %v", relationship)) + } + + return spanExpr +} + +// getSpanExprForGeometryIndex gets a SpanExpression that constrains the given +// geometry index according to the given constant and geospatial relationship. 
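For orientation, an illustrative path through the function below (the filter
is hypothetical, not from this patch): a predicate such as
ST_Covers('POINT(1 1)', geom_col) arrives with the constant as d and a
relationship looked up from the map above.

    // d is the constant *tree.DGeometry extracted from the filter;
    // index is the geometry inverted index being considered.
    relationship := geoRelationshipMap["st_covers"] // geoindex.Covers
    spanExpr := c.getSpanExprForGeometryIndex(d, relationship, index)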
+func (c *CustomFuncs) getSpanExprForGeometryIndex(
+	d tree.Datum, relationship geoindex.RelationshipType, index cat.Index,
+) *invertedexpr.SpanExpression {
+	geomIdx := geoindex.NewS2GeometryIndex(*index.GeoConfig().S2Geometry)
+	geom := d.(*tree.DGeometry).Geometry
+	var spanExpr *invertedexpr.SpanExpression
+
+	switch relationship {
+	case geoindex.Covers:
+		unionKeySpans, err := geomIdx.Covers(c.e.evalCtx.Context, geom)
+		if err != nil {
+			panic(err)
+		}
+		spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans)
+
+	case geoindex.CoveredBy:
+		rpKeyExpr, err := geomIdx.CoveredBy(c.e.evalCtx.Context, geom)
+		if err != nil {
+			panic(err)
+		}
+		if spanExpr, err = invertedexpr.GeoRPKeyExprToSpanExpr(rpKeyExpr); err != nil {
+			panic(err)
+		}
+
+	case geoindex.Intersects:
+		unionKeySpans, err := geomIdx.Intersects(c.e.evalCtx.Context, geom)
+		if err != nil {
+			panic(err)
+		}
+		spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans)
+
+	default:
+		panic(errors.AssertionFailedf("unhandled relationship: %v", relationship))
+	}
+
+	return spanExpr
+}
+
+// constrainGeoIndex returns an InvertedExpression representing a constraint
+// of the given geospatial index.
+func (c *CustomFuncs) constrainGeoIndex(
+	expr opt.ScalarExpr, tabID opt.TableID, index cat.Index, getSpanExpr getSpanExprForGeoIndexFn,
+) (_ invertedexpr.InvertedExpression) {
+	var fn *memo.FunctionExpr
+	switch t := expr.(type) {
+	case *memo.AndExpr:
+		return invertedexpr.And(
+			c.constrainGeoIndex(t.Left, tabID, index, getSpanExpr),
+			c.constrainGeoIndex(t.Right, tabID, index, getSpanExpr),
+		)
+
+	case *memo.OrExpr:
+		return invertedexpr.Or(
+			c.constrainGeoIndex(t.Left, tabID, index, getSpanExpr),
+			c.constrainGeoIndex(t.Right, tabID, index, getSpanExpr),
+		)
+
+	case *memo.FunctionExpr:
+		fn = t
+
+	default:
+		return invertedexpr.NonInvertedColExpression{}
+	}
+
+	if !IsGeoIndexFunction(fn) {
+		return invertedexpr.NonInvertedColExpression{}
+	}
+
+	if fn.Args.ChildCount() < 2 {
+		panic(errors.AssertionFailedf(
+			"all index-accelerated geospatial functions should have at least two arguments",
+		))
+	}
+
+	// The first argument should be a constant.
+	if !memo.CanExtractConstDatum(fn.Args.Child(0)) {
+		return invertedexpr.NonInvertedColExpression{}
+	}
+	d := memo.ExtractConstDatum(fn.Args.Child(0))
+
+	// The second argument should be a variable corresponding to the index
+	// column.
+	variable, ok := fn.Args.Child(1).(*memo.VariableExpr)
+	if !ok {
+		// TODO(rytaft): Commute the geospatial function in this case.
+		// Covers <-> CoveredBy
+		// Intersects <-> Intersects
+		return invertedexpr.NonInvertedColExpression{}
+	}
+	if variable.Col != tabID.ColumnID(index.Column(0).Ordinal) {
+		// The column in the function does not match the index column.
+		return invertedexpr.NonInvertedColExpression{}
+	}
+
+	relationship := geoRelationshipMap[fn.Name]
+	return getSpanExpr(d, relationship, index)
+}

From e63a7b20dae57351ad2cb05481f59a812ed44960 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Sat, 20 Jun 2020 14:45:17 -0700
Subject: [PATCH 2/5] sql: minor cleanup of joiner planning

`joinNode.mergeJoinOrdering` is now set to non-zero length by the
optimizer only when we can use a merge join (meaning that the number of
equality columns is non-zero and equals the length of the ordering we
have). This allows us to slightly simplify the setup of the merge
joiners.
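Concretely, choosing between the hash and merge joiner cores now reduces to a
single length check (a sketch; the full version appears in the diff below):

    if len(n.mergeJoinOrdering) == 0 {
        core.HashJoiner = &execinfrapb.HashJoinerSpec{ /* ... */ }
    } else {
        leftMergeOrd = distsqlOrdering(n.mergeJoinOrdering, leftEqCols)
        rightMergeOrd = distsqlOrdering(n.mergeJoinOrdering, rightEqCols)
        core.MergeJoiner = &execinfrapb.MergeJoinerSpec{ /* ... */ }
    }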
Additionally, this commit switches to using `[]exec.NodeColumnOrdinal`
instead of `int` for the equality columns in `joinPredicate`, which
allows us to remove one conversion step when planning hash joiners.

Also, we introduce a small helper that will be reused by follow-up
work.

Release note: None
---
 pkg/sql/distsql_physical_planner.go | 42 ++++++++----------
 pkg/sql/distsql_plan_join.go        | 69 ++++++++++++++++-----------
 pkg/sql/distsql_plan_join_test.go   | 11 +++--
 pkg/sql/join_predicate.go           | 19 +++++---
 pkg/sql/opt_exec_factory.go         | 16 +++----
 5 files changed, 86 insertions(+), 71 deletions(-)

diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 3eb5936558e0..fb406f38e130 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -2159,6 +2159,19 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
 		return nil, err
 	}
 
+	leftMap, rightMap := leftPlan.PlanToStreamColMap, rightPlan.PlanToStreamColMap
+	helper := &joinPlanningHelper{
+		numLeftCols:             n.pred.numLeftCols,
+		numRightCols:            n.pred.numRightCols,
+		leftPlanToStreamColMap:  leftMap,
+		rightPlanToStreamColMap: rightMap,
+	}
+	post, joinToStreamColMap := helper.joinOutColumns(n.joinType, n.columns)
+	onExpr, err := helper.remapOnExpr(planCtx, n.pred.onCond)
+	if err != nil {
+		return nil, err
+	}
+
 	// Nodes where we will run the join processors.
 	var nodes []roachpb.NodeID
 
@@ -2174,8 +2187,8 @@
 
 	// Set up the equality columns.
 	if numEq := len(n.pred.leftEqualityIndices); numEq != 0 {
-		leftEqCols = eqCols(n.pred.leftEqualityIndices, leftPlan.PlanToStreamColMap)
-		rightEqCols = eqCols(n.pred.rightEqualityIndices, rightPlan.PlanToStreamColMap)
+		leftEqCols = eqCols(n.pred.leftEqualityIndices, leftMap)
+		rightEqCols = eqCols(n.pred.rightEqualityIndices, rightMap)
 	}
 
 	p := MakePhysicalPlan(dsp.gatewayNodeID)
@@ -2186,20 +2199,6 @@
 	// Set up the output columns.
 	if numEq := len(n.pred.leftEqualityIndices); numEq != 0 {
 		nodes = findJoinProcessorNodes(leftRouters, rightRouters, p.Processors)
-
-		if len(n.mergeJoinOrdering) > 0 {
-			// TODO(radu): we currently only use merge joins when we have an ordering on
-			// all equality columns. We should relax this by either:
-			//  - implementing a hybrid hash/merge processor which implements merge
-			//    logic on the columns we have an ordering on, and within each merge
-			//    group uses a hashmap on the remaining columns
-			//  - or: adding a sort processor to complete the order
-			if len(n.mergeJoinOrdering) == len(n.pred.leftEqualityIndices) {
-				// Excellent! We can use the merge joiner.
-				leftMergeOrd = distsqlOrdering(n.mergeJoinOrdering, leftEqCols)
-				rightMergeOrd = distsqlOrdering(n.mergeJoinOrdering, rightEqCols)
-			}
-		}
 	} else {
 		// Without column equality, we cannot distribute the join. Run a
 		// single processor.
@@ -2214,16 +2213,9 @@
 		}
 	}
 
-	rightMap := rightPlan.PlanToStreamColMap
-	post, joinToStreamColMap := joinOutColumns(n, leftPlan.PlanToStreamColMap, rightMap)
-	onExpr, err := remapOnExpr(planCtx, n, leftPlan.PlanToStreamColMap, rightMap)
-	if err != nil {
-		return nil, err
-	}
-
 	// Create the Core spec.
var core execinfrapb.ProcessorCoreUnion - if leftMergeOrd.Columns == nil { + if len(n.mergeJoinOrdering) == 0 { core.HashJoiner = &execinfrapb.HashJoinerSpec{ LeftEqColumns: leftEqCols, RightEqColumns: rightEqCols, @@ -2233,6 +2225,8 @@ func (dsp *DistSQLPlanner) createPlanForJoin( RightEqColumnsAreKey: n.pred.rightEqKey, } } else { + leftMergeOrd = distsqlOrdering(n.mergeJoinOrdering, leftEqCols) + rightMergeOrd = distsqlOrdering(n.mergeJoinOrdering, rightEqCols) core.MergeJoiner = &execinfrapb.MergeJoinerSpec{ LeftOrdering: leftMergeOrd, RightOrdering: rightMergeOrd, diff --git a/pkg/sql/distsql_plan_join.go b/pkg/sql/distsql_plan_join.go index 948adbf3bd98..8de2816244f3 100644 --- a/pkg/sql/distsql_plan_join.go +++ b/pkg/sql/distsql_plan_join.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" @@ -56,7 +57,7 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin( var totalLimitHint int64 for i, t := range []struct { scan *scanNode - eqIndices []int + eqIndices []exec.NodeColumnOrdinal }{ { scan: leftScan, @@ -105,8 +106,15 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin( joinType := n.joinType - post, joinToStreamColMap := joinOutColumns(n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap) - onExpr, err := remapOnExpr(planCtx, n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap) + leftMap, rightMap := plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap + helper := &joinPlanningHelper{ + numLeftCols: n.pred.numLeftCols, + numRightCols: n.pred.numRightCols, + leftPlanToStreamColMap: leftMap, + rightPlanToStreamColMap: rightMap, + } + post, joinToStreamColMap := helper.joinOutColumns(n.joinType, n.columns) + onExpr, err := helper.remapOnExpr(planCtx, n.pred.onCond) if err != nil { return nil, false, err } @@ -228,10 +236,17 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin( return plan, true, nil } -func joinOutColumns( - n *joinNode, leftPlanToStreamColMap, rightPlanToStreamColMap []int, +// joinPlanningHelper is a utility struct that helps with the physical planning +// of joins. 
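A usage sketch of the struct defined below, mirroring its two call sites in
this patch (regular and interleaved join planning): the helper is filled in
from the input plans and then yields the output-column spec and the remapped
ON condition.

    helper := &joinPlanningHelper{
        numLeftCols:             n.pred.numLeftCols,
        numRightCols:            n.pred.numRightCols,
        leftPlanToStreamColMap:  leftMap,
        rightPlanToStreamColMap: rightMap,
    }
    post, joinToStreamColMap := helper.joinOutColumns(n.joinType, n.columns)
    onExpr, err := helper.remapOnExpr(planCtx, n.pred.onCond)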
+type joinPlanningHelper struct { + numLeftCols, numRightCols int + leftPlanToStreamColMap, rightPlanToStreamColMap []int +} + +func (h *joinPlanningHelper) joinOutColumns( + joinType sqlbase.JoinType, columns sqlbase.ResultColumns, ) (post execinfrapb.PostProcessSpec, joinToStreamColMap []int) { - joinToStreamColMap = makePlanToStreamColMap(len(n.columns)) + joinToStreamColMap = makePlanToStreamColMap(len(columns)) post.Projection = true // addOutCol appends to post.OutputColumns and returns the index @@ -245,14 +260,14 @@ func joinOutColumns( // The join columns are in two groups: // - the columns on the left side (numLeftCols) // - the columns on the right side (numRightCols) - for i := 0; i < n.pred.numLeftCols; i++ { - joinToStreamColMap[i] = addOutCol(uint32(leftPlanToStreamColMap[i])) + for i := 0; i < h.numLeftCols; i++ { + joinToStreamColMap[i] = addOutCol(uint32(h.leftPlanToStreamColMap[i])) } - if n.pred.joinType != sqlbase.LeftSemiJoin && n.pred.joinType != sqlbase.LeftAntiJoin { - for i := 0; i < n.pred.numRightCols; i++ { - joinToStreamColMap[n.pred.numLeftCols+i] = addOutCol( - uint32(n.pred.numLeftCols + rightPlanToStreamColMap[i]), + if joinType != sqlbase.LeftSemiJoin && joinType != sqlbase.LeftAntiJoin { + for i := 0; i < h.numRightCols; i++ { + joinToStreamColMap[h.numLeftCols+i] = addOutCol( + uint32(h.numLeftCols + h.rightPlanToStreamColMap[i]), ) } } @@ -263,29 +278,29 @@ func joinOutColumns( // remapOnExpr remaps ordinal references in the on condition (which refer to the // join columns as described above) to values that make sense in the joiner (0 // to N-1 for the left input columns, N to N+M-1 for the right input columns). -func remapOnExpr( - planCtx *PlanningCtx, n *joinNode, leftPlanToStreamColMap, rightPlanToStreamColMap []int, +func (h *joinPlanningHelper) remapOnExpr( + planCtx *PlanningCtx, onCond tree.TypedExpr, ) (execinfrapb.Expression, error) { - if n.pred.onCond == nil { + if onCond == nil { return execinfrapb.Expression{}, nil } - joinColMap := make([]int, n.pred.numLeftCols+n.pred.numRightCols) + joinColMap := make([]int, h.numLeftCols+h.numRightCols) idx := 0 leftCols := 0 - for i := 0; i < n.pred.numLeftCols; i++ { - joinColMap[idx] = leftPlanToStreamColMap[i] - if leftPlanToStreamColMap[i] != -1 { + for i := 0; i < h.numLeftCols; i++ { + joinColMap[idx] = h.leftPlanToStreamColMap[i] + if h.leftPlanToStreamColMap[i] != -1 { leftCols++ } idx++ } - for i := 0; i < n.pred.numRightCols; i++ { - joinColMap[idx] = leftCols + rightPlanToStreamColMap[i] + for i := 0; i < h.numRightCols; i++ { + joinColMap[idx] = leftCols + h.rightPlanToStreamColMap[i] idx++ } - return physicalplan.MakeExpression(n.pred.onCond, planCtx, joinColMap) + return physicalplan.MakeExpression(onCond, planCtx, joinColMap) } // eqCols produces a slice of ordinal references for the plan columns specified @@ -293,7 +308,7 @@ func remapOnExpr( // That is: eqIndices contains a slice of plan column indexes and planToColMap // maps the plan column indexes to the ordinal references (index of the // intermediate row produced). 
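A small worked example with assumed values:

    eqIndices := []exec.NodeColumnOrdinal{2, 0}
    planToColMap := []int{3, -1, 1}
    // eqCols(eqIndices, planToColMap) returns []uint32{1, 3}: plan column 2
    // is produced as stream column 1, and plan column 0 as stream column 3.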
-func eqCols(eqIndices, planToColMap []int) []uint32 { +func eqCols(eqIndices []exec.NodeColumnOrdinal, planToColMap []int) []uint32 { eqCols := make([]uint32, len(eqIndices)) for i, planCol := range eqIndices { eqCols[i] = uint32(planToColMap[planCol]) @@ -343,8 +358,8 @@ func useInterleavedJoin(n *joinNode) bool { return false } - var ancestorEqIndices []int - var descendantEqIndices []int + var ancestorEqIndices []exec.NodeColumnOrdinal + var descendantEqIndices []exec.NodeColumnOrdinal // We are guaranteed that both of the sources are scan nodes from // n.interleavedNodes(). if ancestor == n.left.plan.(*scanNode) { @@ -377,8 +392,8 @@ func useInterleavedJoin(n *joinNode) bool { // the index in scanNode.resultColumns. To convert the colID // from the index descriptor, we can use the map provided by // colIdxMap. - if ancestorEqIndices[info.ColIdx] != ancestor.colIdxMap[colID] || - descendantEqIndices[info.ColIdx] != descendant.colIdxMap[colID] { + if int(ancestorEqIndices[info.ColIdx]) != ancestor.colIdxMap[colID] || + int(descendantEqIndices[info.ColIdx]) != descendant.colIdxMap[colID] { // The column in the ordering does not correspond to // the column in the interleave prefix. // We should not try to do an interleaved join. diff --git a/pkg/sql/distsql_plan_join_test.go b/pkg/sql/distsql_plan_join_test.go index 34bc1e88ab89..68d0cce00824 100644 --- a/pkg/sql/distsql_plan_join_test.go +++ b/pkg/sql/distsql_plan_join_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -30,12 +31,14 @@ import ( "github.com/cockroachdb/errors" ) -func setTestEqColForSide(colName string, side *scanNode, equalityIndices *[]int) error { +func setTestEqColForSide( + colName string, side *scanNode, equalityIndices *[]exec.NodeColumnOrdinal, +) error { colFound := false for i, leftCol := range side.cols { if colName == leftCol.Name { - *equalityIndices = append(*equalityIndices, i) + *equalityIndices = append(*equalityIndices, exec.NodeColumnOrdinal(i)) colFound = true break } @@ -774,7 +777,7 @@ func TestAlignInterleavedSpans(t *testing.T) { // The returned ordering can be partial, i.e. only contains a subset of the // equality columns. 
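For example (assumed inputs, directions matching on both sides):

    colA := []exec.NodeColumnOrdinal{0, 1}
    colB := []exec.NodeColumnOrdinal{2, 0}
    a := sqlbase.ColumnOrdering{{ColIdx: 1, Direction: encoding.Ascending}}
    b := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
    // computeMergeJoinOrdering(a, b, colA, colB) yields the single entry
    // {ColIdx: 1, Direction: Ascending}: only equality pair 1 is covered,
    // so the result is a subset of the two equality pairs.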
func computeMergeJoinOrdering( - a, b sqlbase.ColumnOrdering, colA, colB []int, + a, b sqlbase.ColumnOrdering, colA, colB []exec.NodeColumnOrdinal, ) sqlbase.ColumnOrdering { if len(colA) != len(colB) { panic(fmt.Sprintf("invalid column lists %v; %v", colA, colB)) @@ -786,7 +789,7 @@ func computeMergeJoinOrdering( break } for j := range colA { - if colA[j] == a[i].ColIdx && colB[j] == b[i].ColIdx { + if int(colA[j]) == a[i].ColIdx && int(colB[j]) == b[i].ColIdx { result = append(result, sqlbase.ColumnOrderInfo{ ColIdx: j, Direction: a[i].Direction, diff --git a/pkg/sql/join_predicate.go b/pkg/sql/join_predicate.go index b71d969ffc0c..277406adb53a 100644 --- a/pkg/sql/join_predicate.go +++ b/pkg/sql/join_predicate.go @@ -11,6 +11,7 @@ package sql import ( + "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -36,8 +37,8 @@ type joinPredicate struct { // on the left and right input row arrays, respectively. // Only columns with the same left and right value types can be equality // columns. - leftEqualityIndices []int - rightEqualityIndices []int + leftEqualityIndices []exec.NodeColumnOrdinal + rightEqualityIndices []exec.NodeColumnOrdinal // The list of names for the columns listed in leftEqualityIndices. // Used mainly for pretty-printing. @@ -67,9 +68,10 @@ type joinPredicate struct { rightEqKey bool } -// makePredicate constructs a joinPredicate object for joins. The equality -// columns / on condition must be initialized separately. -func makePredicate(joinType sqlbase.JoinType, left, right sqlbase.ResultColumns) *joinPredicate { +// getJoinResultColumns returns the result columns of a join. +func getJoinResultColumns( + joinType sqlbase.JoinType, left, right sqlbase.ResultColumns, +) sqlbase.ResultColumns { // For anti and semi joins, the right columns are omitted from the output (but // they must be available internally for the ON condition evaluation). omitRightColumns := joinType == sqlbase.LeftSemiJoin || joinType == sqlbase.LeftAntiJoin @@ -83,14 +85,19 @@ func makePredicate(joinType sqlbase.JoinType, left, right sqlbase.ResultColumns) if !omitRightColumns { columns = append(columns, right...) } + return columns +} +// makePredicate constructs a joinPredicate object for joins. The equality +// columns / on condition must be initialized separately. +func makePredicate(joinType sqlbase.JoinType, left, right sqlbase.ResultColumns) *joinPredicate { pred := &joinPredicate{ joinType: joinType, numLeftCols: len(left), numRightCols: len(right), leftCols: left, rightCols: right, - cols: columns, + cols: getJoinResultColumns(joinType, left, right), } // We must initialize the indexed var helper in all cases, even when // there is no on condition, so that getNeededColumns() does not get diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 82797b947367..9aaaa4dd4490 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -355,17 +355,13 @@ func (ef *execFactory) ConstructHashJoin( pred := makePredicate(joinType, leftSrc.columns, rightSrc.columns) numEqCols := len(leftEqCols) - // Save some allocations by putting both sides in the same slice. 
-	intBuf := make([]int, 2*numEqCols)
-	pred.leftEqualityIndices = intBuf[:numEqCols:numEqCols]
-	pred.rightEqualityIndices = intBuf[numEqCols:]
+	pred.leftEqualityIndices = leftEqCols
+	pred.rightEqualityIndices = rightEqCols
 	nameBuf := make(tree.NameList, 2*numEqCols)
 	pred.leftColNames = nameBuf[:numEqCols:numEqCols]
 	pred.rightColNames = nameBuf[numEqCols:]
 
 	for i := range leftEqCols {
-		pred.leftEqualityIndices[i] = int(leftEqCols[i])
-		pred.rightEqualityIndices[i] = int(rightEqCols[i])
 		pred.leftColNames[i] = tree.Name(leftSrc.columns[leftEqCols[i]].Name)
 		pred.rightColNames[i] = tree.Name(rightSrc.columns[rightEqCols[i]].Name)
 	}
@@ -412,14 +408,14 @@ func (ef *execFactory) ConstructMergeJoin(
 	if n == 0 || len(rightOrdering) != n {
 		return nil, errors.Errorf("orderings from the left and right side must be the same non-zero length")
 	}
-	pred.leftEqualityIndices = make([]int, n)
-	pred.rightEqualityIndices = make([]int, n)
+	pred.leftEqualityIndices = make([]exec.NodeColumnOrdinal, n)
+	pred.rightEqualityIndices = make([]exec.NodeColumnOrdinal, n)
 	pred.leftColNames = make(tree.NameList, n)
 	pred.rightColNames = make(tree.NameList, n)
 	for i := 0; i < n; i++ {
 		leftColIdx, rightColIdx := leftOrdering[i].ColIdx, rightOrdering[i].ColIdx
-		pred.leftEqualityIndices[i] = leftColIdx
-		pred.rightEqualityIndices[i] = rightColIdx
+		pred.leftEqualityIndices[i] = exec.NodeColumnOrdinal(leftColIdx)
+		pred.rightEqualityIndices[i] = exec.NodeColumnOrdinal(rightColIdx)
 		pred.leftColNames[i] = tree.Name(leftSrc.columns[leftColIdx].Name)
 		pred.rightColNames[i] = tree.Name(rightSrc.columns[rightColIdx].Name)
 	}

From d55f5eb675dbb2a374537f0d628bd2e16b36df2c Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 24 Jun 2020 13:57:40 -0700
Subject: [PATCH 3/5] sql: add support for hash and merge joins in the new factory

This commit adds implementations of `ConstructHashJoin` and
`ConstructMergeJoin` in the new factory, mostly by refactoring and
reusing existing code in the physical planner. Notably, interleaved
joins are not supported yet.

Release note: None
---
 pkg/sql/distsql_physical_planner.go           | 149 ++++++++----------
 pkg/sql/distsql_plan_join.go                  |  55 +++++++
 pkg/sql/distsql_spec_exec_factory.go          |  82 +++++++++-
 pkg/sql/exec_factory_util.go                  |  32 ++++
 pkg/sql/join_predicate.go                     |   1 -
 .../testdata/logic_test/distsql_join          |   5 -
 .../logic_test/experimental_distsql_planning  |  12 ++
 .../experimental_distsql_planning_5node       |  20 +++
 .../logictest/testdata/logic_test/hash_join   |   2 +
 .../testdata/logic_test/hash_join_dist        |   5 +
 .../logictest/testdata/logic_test/merge_join  |   2 +
 pkg/sql/opt_exec_factory.go                   |  23 +--
 pkg/sql/physicalplan/physical_plan.go         |  17 +-
 13 files changed, 293 insertions(+), 112 deletions(-)

diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index fb406f38e130..d27bf4e8b8f1 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -2131,25 +2131,6 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
 		}
 	}
 
-	// Outline of the planning process for joins:
-	//
-	// - We create PhysicalPlans for the left and right side. Each plan has a set
-	//   of output routers with result that will serve as input for the join.
-	//
-	// - We merge the list of processors and streams into a single plan. We keep
-	//   track of the output routers for the left and right results.
-	//
-	// - We add a set of joiner processors (say K of them).
- // - // - We configure the left and right output routers to send results to - // these joiners, distributing rows by hash (on the join equality columns). - // We are thus breaking up all input rows into K buckets such that rows - // that match on the equality columns end up in the same bucket. If there - // are no equality columns, we cannot distribute rows so we use a single - // joiner. - // - // - The routers of the joiner processors are the result routers of the plan. - leftPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.left.plan) if err != nil { return nil, err @@ -2172,32 +2153,71 @@ func (dsp *DistSQLPlanner) createPlanForJoin( return nil, err } - // Nodes where we will run the join processors. - var nodes []roachpb.NodeID - // We initialize these properties of the joiner. They will then be used to // fill in the processor spec. See descriptions for HashJoinerSpec. - var leftEqCols, rightEqCols []uint32 - var leftMergeOrd, rightMergeOrd execinfrapb.Ordering - joinType := n.joinType + // Set up the equality columns and the merge ordering. + leftEqCols := eqCols(n.pred.leftEqualityIndices, leftMap) + rightEqCols := eqCols(n.pred.rightEqualityIndices, rightMap) + leftMergeOrd := distsqlOrdering(n.mergeJoinOrdering, leftEqCols) + rightMergeOrd := distsqlOrdering(n.mergeJoinOrdering, rightEqCols) - // Figure out the left and right types. - leftTypes := leftPlan.ResultTypes - rightTypes := rightPlan.ResultTypes - - // Set up the equality columns. - if numEq := len(n.pred.leftEqualityIndices); numEq != 0 { - leftEqCols = eqCols(n.pred.leftEqualityIndices, leftMap) - rightEqCols = eqCols(n.pred.rightEqualityIndices, rightMap) + joinResultTypes, err := getTypesForPlanResult(n, joinToStreamColMap) + if err != nil { + return nil, err } + return dsp.planJoiners(&joinPlanningInfo{ + leftPlan: leftPlan, + rightPlan: rightPlan, + joinType: n.joinType, + joinResultTypes: joinResultTypes, + onExpr: onExpr, + post: post, + joinToStreamColMap: joinToStreamColMap, + leftEqCols: leftEqCols, + rightEqCols: rightEqCols, + leftEqColsAreKey: n.pred.leftEqKey, + rightEqColsAreKey: n.pred.rightEqKey, + leftMergeOrd: leftMergeOrd, + rightMergeOrd: rightMergeOrd, + // In the old execFactory we can only have either local or fully + // distributed plans, so checking the last stage is sufficient to get + // the distribution of the whole plans. + leftPlanDistribution: leftPlan.GetLastStageDistribution(), + rightPlanDistribution: rightPlan.GetLastStageDistribution(), + }, n.reqOrdering), nil +} + +func (dsp *DistSQLPlanner) planJoiners( + info *joinPlanningInfo, reqOrdering ReqOrdering, +) *PhysicalPlan { + // Outline of the planning process for joins when given PhysicalPlans for + // the left and right side (with each plan having a set of output routers + // with result that will serve as input for the join). + // + // - We merge the list of processors and streams into a single plan. We keep + // track of the output routers for the left and right results. + // + // - We add a set of joiner processors (say K of them). + // + // - We configure the left and right output routers to send results to + // these joiners, distributing rows by hash (on the join equality columns). + // We are thus breaking up all input rows into K buckets such that rows + // that match on the equality columns end up in the same bucket. If there + // are no equality columns, we cannot distribute rows so we use a single + // joiner. + // + // - The routers of the joiner processors are the result routers of the plan. 
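A toy, self-contained illustration of the routing invariant described above
(assumed and simplified, not the actual router code): rows that agree on the
equality columns always hash to the same joiner bucket, so matching rows
from both sides meet at the same joiner.

    package main

    import (
        "fmt"
        "hash/fnv"
    )

    func main() {
        row := []string{"a", "b", "c"}
        eqCols := []int{0, 2}  // equality columns of this row
        const numJoiners = 4   // K joiner processors
        h := fnv.New32a()
        for _, col := range eqCols {
            h.Write([]byte(row[col]))
        }
        fmt.Println(h.Sum32() % numJoiners) // same bucket for matching rows
    }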
+ p := MakePhysicalPlan(dsp.gatewayNodeID) leftRouters, rightRouters := physicalplan.MergePlans( - &p.PhysicalPlan, &leftPlan.PhysicalPlan, &rightPlan.PhysicalPlan, + &p.PhysicalPlan, &info.leftPlan.PhysicalPlan, &info.rightPlan.PhysicalPlan, + info.leftPlanDistribution, info.rightPlanDistribution, ) - // Set up the output columns. - if numEq := len(n.pred.leftEqualityIndices); numEq != 0 { + // Nodes where we will run the join processors. + var nodes []roachpb.NodeID + if numEq := len(info.leftEqCols); numEq != 0 { nodes = findJoinProcessorNodes(leftRouters, rightRouters, p.Processors) } else { // Without column equality, we cannot distribute the join. Run a @@ -2213,48 +2233,21 @@ func (dsp *DistSQLPlanner) createPlanForJoin( } } - // Create the Core spec. - var core execinfrapb.ProcessorCoreUnion - if len(n.mergeJoinOrdering) == 0 { - core.HashJoiner = &execinfrapb.HashJoinerSpec{ - LeftEqColumns: leftEqCols, - RightEqColumns: rightEqCols, - OnExpr: onExpr, - Type: joinType, - LeftEqColumnsAreKey: n.pred.leftEqKey, - RightEqColumnsAreKey: n.pred.rightEqKey, - } - } else { - leftMergeOrd = distsqlOrdering(n.mergeJoinOrdering, leftEqCols) - rightMergeOrd = distsqlOrdering(n.mergeJoinOrdering, rightEqCols) - core.MergeJoiner = &execinfrapb.MergeJoinerSpec{ - LeftOrdering: leftMergeOrd, - RightOrdering: rightMergeOrd, - OnExpr: onExpr, - Type: joinType, - LeftEqColumnsAreKey: n.pred.leftEqKey, - RightEqColumnsAreKey: n.pred.rightEqKey, - } - } - p.AddJoinStage( - nodes, core, post, leftEqCols, rightEqCols, leftTypes, rightTypes, - leftMergeOrd, rightMergeOrd, leftRouters, rightRouters, + nodes, info.makeCoreSpec(), info.post, + info.leftEqCols, info.rightEqCols, + info.leftPlan.ResultTypes, info.rightPlan.ResultTypes, + info.leftMergeOrd, info.rightMergeOrd, + leftRouters, rightRouters, ) - p.PlanToStreamColMap = joinToStreamColMap - p.ResultTypes, err = getTypesForPlanResult(n, joinToStreamColMap) - if err != nil { - return nil, err - } + p.PlanToStreamColMap = info.joinToStreamColMap + p.ResultTypes = info.joinResultTypes // Joiners may guarantee an ordering to outputs, so we ensure that // ordering is propagated through the input synchronizer of the next stage. - // We can propagate the ordering from either side, we use the left side here. - // Note that n.props only has a non-empty ordering for inner joins, where it - // uses the mergeJoinOrdering. - p.SetMergeOrdering(dsp.convertOrdering(n.reqOrdering, p.PlanToStreamColMap)) - return &p, nil + p.SetMergeOrdering(dsp.convertOrdering(reqOrdering, p.PlanToStreamColMap)) + return &p } func (dsp *DistSQLPlanner) createPhysPlan( @@ -2938,6 +2931,11 @@ func (dsp *DistSQLPlanner) createPlanForSetOp( // Merge processors, streams, result routers, and stage counter. leftRouters, rightRouters := physicalplan.MergePlans( &p.PhysicalPlan, &leftPlan.PhysicalPlan, &rightPlan.PhysicalPlan, + // In the old execFactory we can only have either local or fully + // distributed plans, so checking the last stage is sufficient to get + // the distribution of the whole plans. + leftPlan.GetLastStageDistribution(), + rightPlan.GetLastStageDistribution(), ) if n.unionType == tree.UnionOp { @@ -2992,13 +2990,6 @@ func (dsp *DistSQLPlanner) createPlanForSetOp( copy(post.OutputColumns, streamCols) // Create the Core spec. - // - // TODO(radu): we currently only use merge joins when we have an ordering on - // all equality columns. 
We should relax this by either: - // - implementing a hybrid hash/merge processor which implements merge - // logic on the columns we have an ordering on, and within each merge - // group uses a hashmap on the remaining columns - // - or: adding a sort processor to complete the order var core execinfrapb.ProcessorCoreUnion if len(mergeOrdering.Columns) < len(streamCols) { core.HashJoiner = &execinfrapb.HashJoinerSpec{ diff --git a/pkg/sql/distsql_plan_join.go b/pkg/sql/distsql_plan_join.go index 8de2816244f3..da3cf712bae9 100644 --- a/pkg/sql/distsql_plan_join.go +++ b/pkg/sql/distsql_plan_join.go @@ -23,10 +23,65 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/errors" ) +// joinPlanningInfo is a utility struct that contains the information needed to +// perform the physical planning of hash and merge joins. +type joinPlanningInfo struct { + leftPlan, rightPlan *PhysicalPlan + joinType sqlbase.JoinType + joinResultTypes []*types.T + onExpr execinfrapb.Expression + post execinfrapb.PostProcessSpec + joinToStreamColMap []int + // leftEqCols and rightEqCols are the indices of equality columns. These + // are only used when planning a hash join. + leftEqCols, rightEqCols []uint32 + leftEqColsAreKey, rightEqColsAreKey bool + // leftMergeOrd and rightMergeOrd are the orderings on both inputs to a + // merge join. They must be of the same length, and if the length is 0, + // then a hash join is planned. + leftMergeOrd, rightMergeOrd execinfrapb.Ordering + leftPlanDistribution, rightPlanDistribution physicalplan.PlanDistribution +} + +// makeCoreSpec creates a processor core for hash and merge joins based on the +// join planning information. Merge ordering fields of info determine which +// kind of join is being planned. +func (info *joinPlanningInfo) makeCoreSpec() execinfrapb.ProcessorCoreUnion { + var core execinfrapb.ProcessorCoreUnion + if len(info.leftMergeOrd.Columns) != len(info.rightMergeOrd.Columns) { + panic(fmt.Sprintf( + "unexpectedly different merge join ordering lengths: left %d, right %d", + len(info.leftMergeOrd.Columns), len(info.rightMergeOrd.Columns), + )) + } + if len(info.leftMergeOrd.Columns) == 0 { + // There is no required ordering on the columns, so we plan a hash join. 
+ core.HashJoiner = &execinfrapb.HashJoinerSpec{ + LeftEqColumns: info.leftEqCols, + RightEqColumns: info.rightEqCols, + OnExpr: info.onExpr, + Type: info.joinType, + LeftEqColumnsAreKey: info.leftEqColsAreKey, + RightEqColumnsAreKey: info.rightEqColsAreKey, + } + } else { + core.MergeJoiner = &execinfrapb.MergeJoinerSpec{ + LeftOrdering: info.leftMergeOrd, + RightOrdering: info.rightMergeOrd, + OnExpr: info.onExpr, + Type: info.joinType, + LeftEqColumnsAreKey: info.leftEqColsAreKey, + RightEqColumnsAreKey: info.rightEqColsAreKey, + } + } + return core +} + var planInterleavedJoins = settings.RegisterBoolSetting( "sql.distsql.interleaved_joins.enabled", "if set we plan interleaved table joins instead of merge joins when possible", diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 0677a69789af..c115edc8c70a 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -244,8 +244,7 @@ func (e *distSQLSpecExecFactory) checkExprsAndMaybeMergeLastStage( func (e *distSQLSpecExecFactory) ConstructFilter( n exec.Node, filter tree.TypedExpr, reqOrdering exec.OutputOrdering, ) (exec.Node, error) { - plan := n.(planMaybePhysical) - physPlan := plan.physPlan + physPlan, plan := getPhysPlan(n) recommendation := e.checkExprsAndMaybeMergeLastStage(physPlan, []tree.TypedExpr{filter}) // AddFilter will attempt to push the filter into the last stage of // processors. @@ -259,8 +258,7 @@ func (e *distSQLSpecExecFactory) ConstructFilter( func (e *distSQLSpecExecFactory) ConstructSimpleProject( n exec.Node, cols []exec.NodeColumnOrdinal, colNames []string, reqOrdering exec.OutputOrdering, ) (exec.Node, error) { - plan := n.(planMaybePhysical) - physPlan := plan.physPlan + physPlan, plan := getPhysPlan(n) projection := make([]uint32, len(cols)) for i := range cols { projection[i] = uint32(cols[i]) @@ -285,8 +283,7 @@ func (e *distSQLSpecExecFactory) ConstructRender( exprs tree.TypedExprs, reqOrdering exec.OutputOrdering, ) (exec.Node, error) { - plan := n.(planMaybePhysical) - physPlan := plan.physPlan + physPlan, plan := getPhysPlan(n) recommendation := e.checkExprsAndMaybeMergeLastStage(physPlan, exprs) if err := physPlan.AddRendering( exprs, e.getPlanCtx(recommendation), physPlan.PlanToStreamColMap, getTypesFromResultColumns(columns), @@ -309,6 +306,9 @@ func (e *distSQLSpecExecFactory) ConstructApplyJoin( return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") } +// TODO(yuzefovich): move the decision whether to use an interleaved join from +// the physical planner into the execbuilder. 
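A hedged usage sketch of makeCoreSpec from above (the joinPlanningInfo
literal is abbreviated; zero-length merge orderings select the hash joiner):

    info := &joinPlanningInfo{
        joinType:    sqlbase.InnerJoin,
        leftEqCols:  []uint32{0},
        rightEqCols: []uint32{0},
        // leftMergeOrd and rightMergeOrd left zero => hash join
    }
    spec := info.makeCoreSpec() // spec.HashJoiner != nil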
+ func (e *distSQLSpecExecFactory) ConstructHashJoin( joinType sqlbase.JoinType, left, right exec.Node, @@ -316,7 +316,11 @@ func (e *distSQLSpecExecFactory) ConstructHashJoin( leftEqColsAreKey, rightEqColsAreKey bool, extraOnCond tree.TypedExpr, ) (exec.Node, error) { - return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") + return e.constructHashOrMergeJoin( + joinType, left, right, extraOnCond, leftEqCols, rightEqCols, + leftEqColsAreKey, rightEqColsAreKey, + ReqOrdering{} /* mergeJoinOrdering */, exec.OutputOrdering{}, /* reqOrdering */ + ) } func (e *distSQLSpecExecFactory) ConstructMergeJoin( @@ -327,7 +331,14 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin( reqOrdering exec.OutputOrdering, leftEqColsAreKey, rightEqColsAreKey bool, ) (exec.Node, error) { - return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") + leftEqCols, rightEqCols, mergeJoinOrdering, err := getEqualityIndicesAndMergeJoinOrdering(leftOrdering, rightOrdering) + if err != nil { + return nil, err + } + return e.constructHashOrMergeJoin( + joinType, left, right, onCond, leftEqCols, rightEqCols, + leftEqColsAreKey, rightEqColsAreKey, mergeJoinOrdering, reqOrdering, + ) } func (e *distSQLSpecExecFactory) ConstructGroupBy( @@ -673,3 +684,58 @@ func (e *distSQLSpecExecFactory) ConstructExport( ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") } + +func getPhysPlan(n exec.Node) (*PhysicalPlan, planMaybePhysical) { + plan := n.(planMaybePhysical) + return plan.physPlan, plan +} + +func (e *distSQLSpecExecFactory) constructHashOrMergeJoin( + joinType sqlbase.JoinType, + left, right exec.Node, + onCond tree.TypedExpr, + leftEqCols, rightEqCols []exec.NodeColumnOrdinal, + leftEqColsAreKey, rightEqColsAreKey bool, + mergeJoinOrdering sqlbase.ColumnOrdering, + reqOrdering exec.OutputOrdering, +) (exec.Node, error) { + leftPlan, _ := getPhysPlan(left) + rightPlan, _ := getPhysPlan(right) + resultColumns := getJoinResultColumns(joinType, leftPlan.ResultColumns, rightPlan.ResultColumns) + leftMap, rightMap := leftPlan.PlanToStreamColMap, rightPlan.PlanToStreamColMap + helper := &joinPlanningHelper{ + numLeftCols: len(leftPlan.ResultColumns), + numRightCols: len(rightPlan.ResultColumns), + leftPlanToStreamColMap: leftMap, + rightPlanToStreamColMap: rightMap, + } + post, joinToStreamColMap := helper.joinOutColumns(joinType, resultColumns) + // We always try to distribute the join, but planJoiners() itself might + // decide not to. 
+ onExpr, err := helper.remapOnExpr(e.getPlanCtx(shouldDistribute), onCond) + if err != nil { + return nil, err + } + + leftEqColsRemapped := eqCols(leftEqCols, leftMap) + rightEqColsRemapped := eqCols(rightEqCols, rightMap) + p := e.dsp.planJoiners(&joinPlanningInfo{ + leftPlan: leftPlan, + rightPlan: rightPlan, + joinType: joinType, + joinResultTypes: getTypesFromResultColumns(resultColumns), + onExpr: onExpr, + post: post, + joinToStreamColMap: joinToStreamColMap, + leftEqCols: leftEqColsRemapped, + rightEqCols: rightEqColsRemapped, + leftEqColsAreKey: leftEqColsAreKey, + rightEqColsAreKey: rightEqColsAreKey, + leftMergeOrd: distsqlOrdering(mergeJoinOrdering, leftEqColsRemapped), + rightMergeOrd: distsqlOrdering(mergeJoinOrdering, rightEqColsRemapped), + leftPlanDistribution: leftPlan.Distribution, + rightPlanDistribution: rightPlan.Distribution, + }, ReqOrdering(reqOrdering)) + p.ResultColumns = resultColumns + return planMaybePhysical{physPlan: p}, nil +} diff --git a/pkg/sql/exec_factory_util.go b/pkg/sql/exec_factory_util.go index b7f24c0cafc5..044564da19b3 100644 --- a/pkg/sql/exec_factory_util.go +++ b/pkg/sql/exec_factory_util.go @@ -177,3 +177,35 @@ func getResultColumnsForSimpleProject( } return resultCols } + +func getEqualityIndicesAndMergeJoinOrdering( + leftOrdering, rightOrdering sqlbase.ColumnOrdering, +) ( + leftEqualityIndices, rightEqualityIndices []exec.NodeColumnOrdinal, + mergeJoinOrdering sqlbase.ColumnOrdering, + err error, +) { + n := len(leftOrdering) + if n == 0 || len(rightOrdering) != n { + return nil, nil, nil, errors.Errorf( + "orderings from the left and right side must be the same non-zero length", + ) + } + leftEqualityIndices = make([]exec.NodeColumnOrdinal, n) + rightEqualityIndices = make([]exec.NodeColumnOrdinal, n) + for i := 0; i < n; i++ { + leftColIdx, rightColIdx := leftOrdering[i].ColIdx, rightOrdering[i].ColIdx + leftEqualityIndices[i] = exec.NodeColumnOrdinal(leftColIdx) + rightEqualityIndices[i] = exec.NodeColumnOrdinal(rightColIdx) + } + + mergeJoinOrdering = make(sqlbase.ColumnOrdering, n) + for i := 0; i < n; i++ { + // The mergeJoinOrdering "columns" are equality column indices. Because of + // the way we constructed the equality indices, the ordering will always be + // 0,1,2,3.. + mergeJoinOrdering[i].ColIdx = i + mergeJoinOrdering[i].Direction = leftOrdering[i].Direction + } + return leftEqualityIndices, rightEqualityIndices, mergeJoinOrdering, nil +} diff --git a/pkg/sql/join_predicate.go b/pkg/sql/join_predicate.go index 277406adb53a..9c84f1522515 100644 --- a/pkg/sql/join_predicate.go +++ b/pkg/sql/join_predicate.go @@ -76,7 +76,6 @@ func getJoinResultColumns( // they must be available internally for the ON condition evaluation). omitRightColumns := joinType == sqlbase.LeftSemiJoin || joinType == sqlbase.LeftAntiJoin - // Prepare the metadata for the result columns. // The structure of the join data source results is like this: // - all the left columns, // - then all the right columns (except for anti/semi join). 
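An illustration of the getJoinResultColumns helper extracted from
makePredicate earlier in this series (hypothetical column sets): the right
side is dropped for semi/anti joins.

    left := sqlbase.ResultColumns{{Name: "a"}, {Name: "b"}}
    right := sqlbase.ResultColumns{{Name: "x"}}
    getJoinResultColumns(sqlbase.InnerJoin, left, right)    // (a, b, x)
    getJoinResultColumns(sqlbase.LeftSemiJoin, left, right) // (a, b)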
diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_join b/pkg/sql/logictest/testdata/logic_test/distsql_join index 18621c83cd9e..e887ba7f3346 100644 --- a/pkg/sql/logictest/testdata/logic_test/distsql_join +++ b/pkg/sql/logictest/testdata/logic_test/distsql_join @@ -54,8 +54,3 @@ query III SELECT pk, a, b FROM tab0 WHERE a < 10 AND b = 2 ORDER BY a DESC, pk; ---- 0 1 2 - -query T -SELECT feature_name FROM crdb_internal.feature_usage WHERE feature_name='sql.exec.query.is-distributed' AND usage_count > 0 ----- -sql.exec.query.is-distributed diff --git a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning index 759be4efda96..fb5ab3231dc0 100644 --- a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning +++ b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning @@ -60,3 +60,15 @@ SELECT * FROM kv WHERE k > v ---- 2 1 3 2 + +statement ok +INSERT INTO kv VALUES (4, NULL), (5, 3) + +query I +SELECT v FROM kv ORDER BY k +---- +1 +1 +2 +NULL +3 diff --git a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node index e6ecfa31c73b..f18025bb4619 100644 --- a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node +++ b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node @@ -142,3 +142,23 @@ EXPLAIN SELECT k::REGCLASS FROM kv ---- · distribution partial · vectorized true + +# Check that hash join is supported by the new factory. +query II rowsort +SELECT kv.k, v FROM kv, kw WHERE v = w +---- +1 1 +2 2 +3 3 +4 4 +5 5 + +# Check that merge join is supported by the new factory. +query I +SELECT kv.k FROM kv, kw WHERE kv.k = kw.k ORDER BY 1 +---- +1 +2 +3 +4 +5 diff --git a/pkg/sql/logictest/testdata/logic_test/hash_join b/pkg/sql/logictest/testdata/logic_test/hash_join index 736ff50ee9d6..a3d68ebba65b 100644 --- a/pkg/sql/logictest/testdata/logic_test/hash_join +++ b/pkg/sql/logictest/testdata/logic_test/hash_join @@ -1,3 +1,5 @@ +# LogicTest: default-configs local-spec-planning fakedist-spec-planning + statement ok CREATE TABLE t1 (k INT PRIMARY KEY, v INT) diff --git a/pkg/sql/logictest/testdata/logic_test/hash_join_dist b/pkg/sql/logictest/testdata/logic_test/hash_join_dist index 5729c5de53c1..d98690996c4d 100644 --- a/pkg/sql/logictest/testdata/logic_test/hash_join_dist +++ b/pkg/sql/logictest/testdata/logic_test/hash_join_dist @@ -66,3 +66,8 @@ SELECT small.b, large.d FROM large RIGHT HASH JOIN small ON small.b = large.c AN 9 NULL 12 24 15 NULL + +query T +SELECT feature_name FROM crdb_internal.feature_usage WHERE feature_name='sql.exec.query.is-distributed' AND usage_count > 0 +---- +sql.exec.query.is-distributed diff --git a/pkg/sql/logictest/testdata/logic_test/merge_join b/pkg/sql/logictest/testdata/logic_test/merge_join index d3ea189fc18f..14de3c603a13 100644 --- a/pkg/sql/logictest/testdata/logic_test/merge_join +++ b/pkg/sql/logictest/testdata/logic_test/merge_join @@ -1,3 +1,5 @@ +# LogicTest: default-configs local-spec-planning fakedist-spec-planning + # Basic tables, no nulls statement ok diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 9aaaa4dd4490..4b31227f2ba8 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -396,40 +396,29 @@ func (ef *execFactory) ConstructMergeJoin( reqOrdering exec.OutputOrdering, leftEqColsAreKey, rightEqColsAreKey bool, ) (exec.Node, error) { + var err 
error p := ef.planner leftSrc := asDataSource(left) rightSrc := asDataSource(right) pred := makePredicate(joinType, leftSrc.columns, rightSrc.columns) pred.onCond = pred.iVarHelper.Rebind(onCond) + node := p.makeJoinNode(leftSrc, rightSrc, pred) pred.leftEqKey = leftEqColsAreKey pred.rightEqKey = rightEqColsAreKey - n := len(leftOrdering) - if n == 0 || len(rightOrdering) != n { - return nil, errors.Errorf("orderings from the left and right side must be the same non-zero length") + pred.leftEqualityIndices, pred.rightEqualityIndices, node.mergeJoinOrdering, err = getEqualityIndicesAndMergeJoinOrdering(leftOrdering, rightOrdering) + if err != nil { + return nil, err } - pred.leftEqualityIndices = make([]exec.NodeColumnOrdinal, n) - pred.rightEqualityIndices = make([]exec.NodeColumnOrdinal, n) + n := len(leftOrdering) pred.leftColNames = make(tree.NameList, n) pred.rightColNames = make(tree.NameList, n) for i := 0; i < n; i++ { leftColIdx, rightColIdx := leftOrdering[i].ColIdx, rightOrdering[i].ColIdx - pred.leftEqualityIndices[i] = exec.NodeColumnOrdinal(leftColIdx) - pred.rightEqualityIndices[i] = exec.NodeColumnOrdinal(rightColIdx) pred.leftColNames[i] = tree.Name(leftSrc.columns[leftColIdx].Name) pred.rightColNames[i] = tree.Name(rightSrc.columns[rightColIdx].Name) } - node := p.makeJoinNode(leftSrc, rightSrc, pred) - node.mergeJoinOrdering = make(sqlbase.ColumnOrdering, n) - for i := 0; i < n; i++ { - // The mergeJoinOrdering "columns" are equality column indices. Because of - // the way we constructed the equality indices, the ordering will always be - // 0,1,2,3.. - node.mergeJoinOrdering[i].ColIdx = i - node.mergeJoinOrdering[i].Direction = leftOrdering[i].Direction - } - // Set up node.props, which tells the distsql planner to maintain the // resulting ordering (if needed). node.reqOrdering = ReqOrdering(reqOrdering) diff --git a/pkg/sql/physicalplan/physical_plan.go b/pkg/sql/physicalplan/physical_plan.go index 2a43c43d6027..a06dfed1983f 100644 --- a/pkg/sql/physicalplan/physical_plan.go +++ b/pkg/sql/physicalplan/physical_plan.go @@ -959,7 +959,9 @@ func (p *PhysicalPlan) GenerateFlowSpecs() map[roachpb.NodeID]*execinfrapb.FlowS // The result routers for each side are returned (they point at processors in // the merged plan). func MergePlans( - mergedPlan *PhysicalPlan, left, right *PhysicalPlan, + mergedPlan *PhysicalPlan, + left, right *PhysicalPlan, + leftPlanDistribution, rightPlanDistribution PlanDistribution, ) (leftRouters []ProcessorIdx, rightRouters []ProcessorIdx) { mergedPlan.Processors = append(left.Processors, right.Processors...) rightProcStart := ProcessorIdx(len(left.Processors)) @@ -1003,6 +1005,7 @@ func MergePlans( mergedPlan.MaxEstimatedRowCount = left.MaxEstimatedRowCount } + mergedPlan.Distribution = leftPlanDistribution.compose(rightPlanDistribution) return leftRouters, rightRouters } @@ -1277,10 +1280,20 @@ func (p *PhysicalPlan) EnsureSingleStreamPerNode() { } } +// GetLastStageDistribution returns the distribution *only* of the last stage. +// Note that if the last stage consists of a single processor planned on a +// remote node, such a stage is considered distributed. +func (p *PhysicalPlan) GetLastStageDistribution() PlanDistribution { + if len(p.ResultRouters) == 1 && p.Processors[p.ResultRouters[0]].Node == p.GatewayNodeID { + return LocalPlan + } + return FullyDistributedPlan +} + // IsLastStageDistributed returns whether the last stage of processors is // distributed (meaning that it contains at least one remote processor).
func (p *PhysicalPlan) IsLastStageDistributed() bool { - return len(p.ResultRouters) > 1 || p.Processors[p.ResultRouters[0]].Node != p.GatewayNodeID + return p.GetLastStageDistribution() != LocalPlan } // PlanDistribution describes the distribution of the physical plan. From cfae01b5170ca55de3a086756fea81f5c635f3c3 Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Thu, 25 Jun 2020 08:40:17 -0700 Subject: [PATCH 4/5] build: update instructions for updating dependencies Release note: None --- build/README.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/build/README.md b/build/README.md index c4c94e95e046..0cd00c3db4ef 100644 --- a/build/README.md +++ b/build/README.md @@ -90,6 +90,9 @@ Dependencies are managed using `go mod`. We use `go mod vendor` so that we can i Run `go get -u `. To get a specific version, run `go get -u @`. +When updating a dependency, you should run `go mod tidy` after `go get` to ensure the old entries +are removed from go.sum. + You must then run `make -k vendor_rebuild` to ensure the modules are installed. These changes must then be committed in the submodule directory (see Working with Submodules). @@ -160,6 +163,18 @@ is important to re-run `go mod tidy` and `make -k vendor_rebuild` against the fetched, updated `vendor` ref, thus generating a new commit in the submodule that has as its parent the one from the earlier change. +### Recovering from a broken vendor directory + +If you happen to run into a broken `vendor` directory which is irrecoverable, +you can run the following commands which will restore the directory in +working order: + +``` +rm -rf vendor +git checkout HEAD vendor # you can replace HEAD with any branch/sha +git submodule update --init --recursive +``` + ### Repository Name We only want the vendor directory used by builds when it is explicitly checked From 3ad1f87b78fd6d38c8fbc615490c4cf403030ddd Mon Sep 17 00:00:00 2001 From: Rebecca Taft Date: Mon, 15 Jun 2020 21:02:53 -0500 Subject: [PATCH 5/5] opt: change GeoLookupJoin to InvertedLookupJoin This commit converts the GeoLookupJoin operator in the optimizer into a more general operator, InvertedLookupJoin. This new operator maps directly to the invertedJoiner DistSQL processor. In the future, the InvertedLookupJoin operator can also be used for lookup joins on inverted JSON and array indexes in addition to geospatial indexes. This commit also adds a new structure, geoDatumToInvertedExpr, which implements the DatumToInvertedExpr interface for geospatial data types. This will enable the optimized plan to be easily converted to a DistSQL plan for execution by the invertedJoiner.
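As a schematic illustration of the interface shape this commit introduces (the real DatumToInvertedExpr, added in pkg/sql/opt/invertedexpr below, takes a sqlbase.EncDatum and returns a *SpanExpressionProto), here is a self-contained sketch with placeholder types; the toy converter is not the real geospatial logic, which covers the datum with s2 cells:

```
package main

import (
	"context"
	"fmt"
)

// encDatum and spanExpressionProto are placeholders for sqlbase.EncDatum
// and invertedexpr.SpanExpressionProto.
type encDatum struct{ val string }

type spanExpressionProto struct{ spansToRead []string }

// datumToInvertedExpr mirrors the new invertedexpr.DatumToInvertedExpr
// interface: the invertedJoiner feeds it one datum per input row and then
// scans the spans named in the result.
type datumToInvertedExpr interface {
	Convert(context.Context, encDatum) (*spanExpressionProto, error)
}

// toyGeoConverter stands in for geoDatumToInvertedExpr; a real
// implementation would cover the geospatial datum with s2 cells and emit
// their key spans.
type toyGeoConverter struct{}

func (toyGeoConverter) Convert(_ context.Context, d encDatum) (*spanExpressionProto, error) {
	return &spanExpressionProto{spansToRead: []string{"cell/" + d.val}}, nil
}

func main() {
	var conv datumToInvertedExpr = toyGeoConverter{}
	expr, err := conv.Convert(context.Background(), encDatum{val: "POINT(1 2)"})
	if err != nil {
		panic(err)
	}
	fmt.Println(expr.spansToRead) // [cell/POINT(1 2)]
}
```

The invertedJoiner depends only on the interface, which is what makes the new operator reusable for JSON and array inverted indexes later.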
Release note: None --- pkg/sql/distsql_spec_exec_factory.go | 7 +- pkg/sql/execinfrapb/processors_sql.pb.go | 87 ++++++------ pkg/sql/execinfrapb/processors_sql.proto | 23 ++-- pkg/sql/opt/bench/stub_factory.go | 7 +- pkg/sql/opt/exec/execbuilder/relational.go | 16 ++- pkg/sql/opt/exec/factory.go | 19 ++- pkg/sql/opt/invertedexpr/expression.go | 13 ++ pkg/sql/opt/memo/expr.go | 2 +- pkg/sql/opt/memo/expr_format.go | 11 +- pkg/sql/opt/memo/interner.go | 9 -- pkg/sql/opt/memo/logical_props_builder.go | 16 +-- pkg/sql/opt/memo/statistics_builder.go | 55 ++++---- pkg/sql/opt/ops/relational.opt | 57 +++----- pkg/sql/opt/optgen/cmd/optgen/exprs_gen.go | 1 - pkg/sql/opt/optgen/cmd/optgen/metadata.go | 99 +++++++------- pkg/sql/opt/optgen/cmd/optgen/testdata/exprs | 2 - pkg/sql/opt/xform/coster.go | 10 +- pkg/sql/opt/xform/custom_funcs.go | 45 +++++-- pkg/sql/opt/xform/geo.go | 133 +++++++++++++++---- pkg/sql/opt/xform/testdata/rules/join | 9 +- pkg/sql/opt_exec_factory.go | 7 +- pkg/sql/rowexec/inverted_joiner.go | 26 ++-- pkg/sql/rowexec/inverted_joiner_test.go | 16 ++- 23 files changed, 380 insertions(+), 290 deletions(-) diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 0677a69789af..70188d41312f 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -11,7 +11,6 @@ package sql import ( - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/opt" @@ -398,13 +397,13 @@ func (e *distSQLSpecExecFactory) ConstructLookupJoin( return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") } -func (e *distSQLSpecExecFactory) ConstructGeoLookupJoin( +func (e *distSQLSpecExecFactory) ConstructInvertedJoin( joinType sqlbase.JoinType, - geoRelationshipType geoindex.RelationshipType, + invertedExpr tree.TypedExpr, input exec.Node, table cat.Table, index cat.Index, - geoCol exec.NodeColumnOrdinal, + inputCol exec.NodeColumnOrdinal, lookupCols exec.TableColumnOrdinalSet, onCond tree.TypedExpr, reqOrdering exec.OutputOrdering, diff --git a/pkg/sql/execinfrapb/processors_sql.pb.go b/pkg/sql/execinfrapb/processors_sql.pb.go index cf5b02d4bd4c..6257c8074125 100644 --- a/pkg/sql/execinfrapb/processors_sql.pb.go +++ b/pkg/sql/execinfrapb/processors_sql.pb.go @@ -64,7 +64,7 @@ func (x *ScanVisibility) UnmarshalJSON(data []byte) error { return nil } func (ScanVisibility) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{0} + return fileDescriptor_processors_sql_684af93f61811791, []int{0} } // These mirror the aggregate functions supported by sql/parser. 
See @@ -181,7 +181,7 @@ func (x *AggregatorSpec_Func) UnmarshalJSON(data []byte) error { return nil } func (AggregatorSpec_Func) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{12, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{12, 0} } type AggregatorSpec_Type int32 @@ -227,7 +227,7 @@ func (x *AggregatorSpec_Type) UnmarshalJSON(data []byte) error { return nil } func (AggregatorSpec_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{12, 1} + return fileDescriptor_processors_sql_684af93f61811791, []int{12, 1} } type WindowerSpec_WindowFunc int32 @@ -291,7 +291,7 @@ func (x *WindowerSpec_WindowFunc) UnmarshalJSON(data []byte) error { return nil } func (WindowerSpec_WindowFunc) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 0} } // Mode indicates which mode of framing is used. @@ -335,7 +335,7 @@ func (x *WindowerSpec_Frame_Mode) UnmarshalJSON(data []byte) error { return nil } func (WindowerSpec_Frame_Mode) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1, 0} } // BoundType indicates which type of boundary is used. @@ -382,7 +382,7 @@ func (x *WindowerSpec_Frame_BoundType) UnmarshalJSON(data []byte) error { return nil } func (WindowerSpec_Frame_BoundType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1, 1} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1, 1} } // Exclusion specifies the type of frame exclusion. 
@@ -425,7 +425,7 @@ func (x *WindowerSpec_Frame_Exclusion) UnmarshalJSON(data []byte) error { return nil } func (WindowerSpec_Frame_Exclusion) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1, 2} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1, 2} } // ValuesCoreSpec is the core of a processor that has no inputs and generates @@ -445,7 +445,7 @@ func (m *ValuesCoreSpec) Reset() { *m = ValuesCoreSpec{} } func (m *ValuesCoreSpec) String() string { return proto.CompactTextString(m) } func (*ValuesCoreSpec) ProtoMessage() {} func (*ValuesCoreSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{0} + return fileDescriptor_processors_sql_684af93f61811791, []int{0} } func (m *ValuesCoreSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -550,7 +550,7 @@ func (m *TableReaderSpec) Reset() { *m = TableReaderSpec{} } func (m *TableReaderSpec) String() string { return proto.CompactTextString(m) } func (*TableReaderSpec) ProtoMessage() {} func (*TableReaderSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{1} + return fileDescriptor_processors_sql_684af93f61811791, []int{1} } func (m *TableReaderSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -608,7 +608,7 @@ func (m *IndexSkipTableReaderSpec) Reset() { *m = IndexSkipTableReaderSp func (m *IndexSkipTableReaderSpec) String() string { return proto.CompactTextString(m) } func (*IndexSkipTableReaderSpec) ProtoMessage() {} func (*IndexSkipTableReaderSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{2} + return fileDescriptor_processors_sql_684af93f61811791, []int{2} } func (m *IndexSkipTableReaderSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -710,7 +710,7 @@ func (m *JoinReaderSpec) Reset() { *m = JoinReaderSpec{} } func (m *JoinReaderSpec) String() string { return proto.CompactTextString(m) } func (*JoinReaderSpec) ProtoMessage() {} func (*JoinReaderSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{3} + return fileDescriptor_processors_sql_684af93f61811791, []int{3} } func (m *JoinReaderSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -754,7 +754,7 @@ func (m *SorterSpec) Reset() { *m = SorterSpec{} } func (m *SorterSpec) String() string { return proto.CompactTextString(m) } func (*SorterSpec) ProtoMessage() {} func (*SorterSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{4} + return fileDescriptor_processors_sql_684af93f61811791, []int{4} } func (m *SorterSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -816,7 +816,7 @@ func (m *DistinctSpec) Reset() { *m = DistinctSpec{} } func (m *DistinctSpec) String() string { return proto.CompactTextString(m) } func (*DistinctSpec) ProtoMessage() {} func (*DistinctSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{5} + return fileDescriptor_processors_sql_684af93f61811791, []int{5} } func (m *DistinctSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -851,7 +851,7 @@ func (m *OrdinalitySpec) Reset() { *m = OrdinalitySpec{} } func (m *OrdinalitySpec) String() string { return proto.CompactTextString(m) } func (*OrdinalitySpec) ProtoMessage() {} func (*OrdinalitySpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{6} + return 
fileDescriptor_processors_sql_684af93f61811791, []int{6} } func (m *OrdinalitySpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -909,7 +909,7 @@ func (m *ZigzagJoinerSpec) Reset() { *m = ZigzagJoinerSpec{} } func (m *ZigzagJoinerSpec) String() string { return proto.CompactTextString(m) } func (*ZigzagJoinerSpec) ProtoMessage() {} func (*ZigzagJoinerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{7} + return fileDescriptor_processors_sql_684af93f61811791, []int{7} } func (m *ZigzagJoinerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -985,7 +985,7 @@ func (m *MergeJoinerSpec) Reset() { *m = MergeJoinerSpec{} } func (m *MergeJoinerSpec) String() string { return proto.CompactTextString(m) } func (*MergeJoinerSpec) ProtoMessage() {} func (*MergeJoinerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{8} + return fileDescriptor_processors_sql_684af93f61811791, []int{8} } func (m *MergeJoinerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1072,7 +1072,7 @@ func (m *HashJoinerSpec) Reset() { *m = HashJoinerSpec{} } func (m *HashJoinerSpec) String() string { return proto.CompactTextString(m) } func (*HashJoinerSpec) ProtoMessage() {} func (*HashJoinerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{9} + return fileDescriptor_processors_sql_684af93f61811791, []int{9} } func (m *HashJoinerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1133,16 +1133,18 @@ type InvertedJoinerSpec struct { // Index of the column in the input stream that is to be joined with // the inverted index. LookupColumn uint32 `protobuf:"varint,3,opt,name=lookup_column,json=lookupColumn" json:"lookup_column"` - // Expression involving only the indexed column and the lookup column, where - // @1 refers to the lookup column and @2 to the indexed column. The - // expression is used to construct an implementation of - // RowToInvertedIndexExpr which will be fed each input row and output - // an expression to evaluate over the inverted index. + // Expression involving only the indexed column and the lookup column. + // Assuming that the input stream has N columns and the table that has been + // indexed has M columns, in this expression variables @1 to @N refer to + // columns of the input stream and variables @(N+1) to @(N+M) refer to + // columns in the table. Although the numbering includes all columns, only + // columns corresponding to the indexed column and the lookup column may be + // present in this expression. Note that the column numbering matches the + // numbering used below by the on expression. // - // TODO(sumeer): RowToInvertedIndexExpr will be added with the - // invertedJoiner implementation. And update this comment when all the - // expression generation machinery is in place to refer to actual code - // abstractions. + // The expression is passed to xform.NewDatumToInvertedExpr to construct an + // implementation of invertedexpr.DatumToInvertedExpr, which will be fed each + // input row and output an expression to evaluate over the inverted index. InvertedExpr Expression `protobuf:"bytes,4,opt,name=inverted_expr,json=invertedExpr" json:"inverted_expr"` // Optional expression involving the columns in the index (other than the // inverted column) and the columns in the input stream. 
Assuming that the @@ -1151,7 +1153,8 @@ type InvertedJoinerSpec struct { // input stream and variables @(N+1) to @(N+M) refer to columns in the // table. The numbering does not omit the column in the table corresponding // to the inverted column, or other table columns absent from the index, but - // they cannot be present in this expression. + // they cannot be present in this expression. Note that the column numbering + // matches the numbering used above by the inverted expression. OnExpr Expression `protobuf:"bytes,5,opt,name=on_expr,json=onExpr" json:"on_expr"` // Only INNER, LEFT_OUTER, LEFT_SEMI, LEFT_ANTI are supported. For indexes // that produce false positives for user expressions, like geospatial @@ -1166,7 +1169,7 @@ func (m *InvertedJoinerSpec) Reset() { *m = InvertedJoinerSpec{} } func (m *InvertedJoinerSpec) String() string { return proto.CompactTextString(m) } func (*InvertedJoinerSpec) ProtoMessage() {} func (*InvertedJoinerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{10} + return fileDescriptor_processors_sql_684af93f61811791, []int{10} } func (m *InvertedJoinerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1218,7 +1221,7 @@ func (m *InvertedFiltererSpec) Reset() { *m = InvertedFiltererSpec{} } func (m *InvertedFiltererSpec) String() string { return proto.CompactTextString(m) } func (*InvertedFiltererSpec) ProtoMessage() {} func (*InvertedFiltererSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{11} + return fileDescriptor_processors_sql_684af93f61811791, []int{11} } func (m *InvertedFiltererSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1264,7 +1267,7 @@ func (m *AggregatorSpec) Reset() { *m = AggregatorSpec{} } func (m *AggregatorSpec) String() string { return proto.CompactTextString(m) } func (*AggregatorSpec) ProtoMessage() {} func (*AggregatorSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{12} + return fileDescriptor_processors_sql_684af93f61811791, []int{12} } func (m *AggregatorSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1315,7 +1318,7 @@ func (m *AggregatorSpec_Aggregation) Reset() { *m = AggregatorSpec_Aggre func (m *AggregatorSpec_Aggregation) String() string { return proto.CompactTextString(m) } func (*AggregatorSpec_Aggregation) ProtoMessage() {} func (*AggregatorSpec_Aggregation) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{12, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{12, 0} } func (m *AggregatorSpec_Aggregation) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1390,7 +1393,7 @@ func (m *InterleavedReaderJoinerSpec) Reset() { *m = InterleavedReaderJo func (m *InterleavedReaderJoinerSpec) String() string { return proto.CompactTextString(m) } func (*InterleavedReaderJoinerSpec) ProtoMessage() {} func (*InterleavedReaderJoinerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{13} + return fileDescriptor_processors_sql_684af93f61811791, []int{13} } func (m *InterleavedReaderJoinerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1446,7 +1449,7 @@ func (m *InterleavedReaderJoinerSpec_Table) Reset() { *m = InterleavedRe func (m *InterleavedReaderJoinerSpec_Table) String() string { return proto.CompactTextString(m) } func (*InterleavedReaderJoinerSpec_Table) ProtoMessage() {} func 
(*InterleavedReaderJoinerSpec_Table) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{13, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{13, 0} } func (m *InterleavedReaderJoinerSpec_Table) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1486,7 +1489,7 @@ func (m *ProjectSetSpec) Reset() { *m = ProjectSetSpec{} } func (m *ProjectSetSpec) String() string { return proto.CompactTextString(m) } func (*ProjectSetSpec) ProtoMessage() {} func (*ProjectSetSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{14} + return fileDescriptor_processors_sql_684af93f61811791, []int{14} } func (m *ProjectSetSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1528,7 +1531,7 @@ func (m *WindowerSpec) Reset() { *m = WindowerSpec{} } func (m *WindowerSpec) String() string { return proto.CompactTextString(m) } func (*WindowerSpec) ProtoMessage() {} func (*WindowerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15} + return fileDescriptor_processors_sql_684af93f61811791, []int{15} } func (m *WindowerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1564,7 +1567,7 @@ func (m *WindowerSpec_Func) Reset() { *m = WindowerSpec_Func{} } func (m *WindowerSpec_Func) String() string { return proto.CompactTextString(m) } func (*WindowerSpec_Func) ProtoMessage() {} func (*WindowerSpec_Func) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 0} } func (m *WindowerSpec_Func) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1600,7 +1603,7 @@ func (m *WindowerSpec_Frame) Reset() { *m = WindowerSpec_Frame{} } func (m *WindowerSpec_Frame) String() string { return proto.CompactTextString(m) } func (*WindowerSpec_Frame) ProtoMessage() {} func (*WindowerSpec_Frame) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1} } func (m *WindowerSpec_Frame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1641,7 +1644,7 @@ func (m *WindowerSpec_Frame_Bound) Reset() { *m = WindowerSpec_Frame_Bou func (m *WindowerSpec_Frame_Bound) String() string { return proto.CompactTextString(m) } func (*WindowerSpec_Frame_Bound) ProtoMessage() {} func (*WindowerSpec_Frame_Bound) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1, 0} } func (m *WindowerSpec_Frame_Bound) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1677,7 +1680,7 @@ func (m *WindowerSpec_Frame_Bounds) Reset() { *m = WindowerSpec_Frame_Bo func (m *WindowerSpec_Frame_Bounds) String() string { return proto.CompactTextString(m) } func (*WindowerSpec_Frame_Bounds) ProtoMessage() {} func (*WindowerSpec_Frame_Bounds) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1, 1} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1, 1} } func (m *WindowerSpec_Frame_Bounds) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1727,7 +1730,7 @@ func (m *WindowerSpec_WindowFn) Reset() { *m = WindowerSpec_WindowFn{} } func (m *WindowerSpec_WindowFn) String() string { return proto.CompactTextString(m) } func (*WindowerSpec_WindowFn) ProtoMessage() {} func 
(*WindowerSpec_WindowFn) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 2} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 2} } func (m *WindowerSpec_WindowFn) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -7930,10 +7933,10 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/processors_sql.proto", fileDescriptor_processors_sql_fccc336145676e72) + proto.RegisterFile("sql/execinfrapb/processors_sql.proto", fileDescriptor_processors_sql_684af93f61811791) } -var fileDescriptor_processors_sql_fccc336145676e72 = []byte{ +var fileDescriptor_processors_sql_684af93f61811791 = []byte{ // 2645 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x5a, 0x4b, 0x73, 0x1b, 0xc7, 0x11, 0xe6, 0xe2, 0x41, 0x02, 0x8d, 0x07, 0x57, 0x23, 0xda, 0x82, 0x21, 0x17, 0x45, 0xc1, 0x2f, diff --git a/pkg/sql/execinfrapb/processors_sql.proto b/pkg/sql/execinfrapb/processors_sql.proto index d997c266ba74..7711e1c5f233 100644 --- a/pkg/sql/execinfrapb/processors_sql.proto +++ b/pkg/sql/execinfrapb/processors_sql.proto @@ -499,16 +499,18 @@ message InvertedJoinerSpec { // The join expression is a conjunction of inverted_expr and on_expr. - // Expression involving only the indexed column and the lookup column, where - // @1 refers to the lookup column and @2 to the indexed column. The - // expression is used to construct an implementation of - // RowToInvertedIndexExpr which will be fed each input row and output - // an expression to evaluate over the inverted index. + // Expression involving only the indexed column and the lookup column. + // Assuming that the input stream has N columns and the table that has been + // indexed has M columns, in this expression variables @1 to @N refer to + // columns of the input stream and variables @(N+1) to @(N+M) refer to + // columns in the table. Although the numbering includes all columns, only + // columns corresponding to the indexed column and the lookup column may be + // present in this expression. Note that the column numbering matches the + // numbering used below by the on expression. // - // TODO(sumeer): RowToInvertedIndexExpr will be added with the - // invertedJoiner implementation. And update this comment when all the - // expression generation machinery is in place to refer to actual code - // abstractions. + // The expression is passed to xform.NewDatumToInvertedExpr to construct an + // implementation of invertedexpr.DatumToInvertedExpr, which will be fed each + // input row and output an expression to evaluate over the inverted index. optional Expression inverted_expr = 4 [(gogoproto.nullable) = false]; // Optional expression involving the columns in the index (other than the @@ -518,7 +520,8 @@ message InvertedJoinerSpec { // input stream and variables @(N+1) to @(N+M) refer to columns in the // table. The numbering does not omit the column in the table corresponding // to the inverted column, or other table columns absent from the index, but - // they cannot be present in this expression. + // they cannot be present in this expression. Note that the column numbering + // matches the numbering used above by the inverted expression. optional Expression on_expr = 5 [(gogoproto.nullable) = false]; // Only INNER, LEFT_OUTER, LEFT_SEMI, LEFT_ANTI are supported. 
For indexes diff --git a/pkg/sql/opt/bench/stub_factory.go b/pkg/sql/opt/bench/stub_factory.go index d984e42627f9..bfe965dfb5c9 100644 --- a/pkg/sql/opt/bench/stub_factory.go +++ b/pkg/sql/opt/bench/stub_factory.go @@ -11,7 +11,6 @@ package bench import ( - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -165,13 +164,13 @@ func (f *stubFactory) ConstructLookupJoin( return struct{}{}, nil } -func (f *stubFactory) ConstructGeoLookupJoin( +func (f *stubFactory) ConstructInvertedJoin( joinType sqlbase.JoinType, - geoRelationshipType geoindex.RelationshipType, + invertedExpr tree.TypedExpr, input exec.Node, table cat.Table, index cat.Index, - geoCol exec.NodeColumnOrdinal, + inputCol exec.NodeColumnOrdinal, lookupCols exec.TableColumnOrdinalSet, onCond tree.TypedExpr, reqOrdering exec.OutputOrdering, diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index d89ab2562618..9badf367b049 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -215,8 +215,8 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { case *memo.LookupJoinExpr: ep, err = b.buildLookupJoin(t) - case *memo.GeoLookupJoinExpr: - ep, err = b.buildGeoLookupJoin(t) + case *memo.InvertedJoinExpr: + ep, err = b.buildInvertedJoin(t) case *memo.ZigzagJoinExpr: ep, err = b.buildZigzagJoin(t) @@ -1361,7 +1361,7 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) { return res, nil } -func (b *Builder) buildGeoLookupJoin(join *memo.GeoLookupJoinExpr) (execPlan, error) { +func (b *Builder) buildInvertedJoin(join *memo.InvertedJoinExpr) (execPlan, error) { input, err := b.buildRelational(join.Input) if err != nil { return execPlan{}, err @@ -1385,6 +1385,10 @@ func (b *Builder) buildGeoLookupJoin(join *memo.GeoLookupJoinExpr) (execPlan, er ivh: tree.MakeIndexedVarHelper(nil /* container */, allCols.Len()), ivarMap: allCols, } + invertedExpr, err := b.buildScalar(&ctx, join.InvertedExpr) + if err != nil { + return execPlan{}, err + } onExpr, err := b.buildScalar(&ctx, &join.On) if err != nil { return execPlan{}, err @@ -1393,13 +1397,13 @@ func (b *Builder) buildGeoLookupJoin(join *memo.GeoLookupJoinExpr) (execPlan, er tab := md.Table(join.Table) idx := tab.Index(join.Index) - res.root, err = b.factory.ConstructGeoLookupJoin( + res.root, err = b.factory.ConstructInvertedJoin( joinOpToJoinType(join.JoinType), - join.GeoRelationshipType, + invertedExpr, input.root, tab, idx, - input.getNodeColumnOrdinal(join.GeoCol), + input.getNodeColumnOrdinal(join.InputCol), lookupOrdinals, onExpr, res.reqOrdering(join), diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index f8c9ae5bd6eb..2de8a6920507 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -13,7 +13,6 @@ package exec import ( "context" - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -232,22 +231,22 @@ type Factory interface { reqOrdering OutputOrdering, ) (Node, error) - // ConstructGeoLookupJoin returns a node that performs a geospatial lookup - // join. geoRelationshipType describes the type of geospatial relationship - // represented by the join. 
geoCol is the geospatial column from the input - // that will be used to look up into the index; lookupCols are ordinals for - // the table columns we are retrieving. + // ConstructInvertedJoin returns a node that performs an inverted join. + // invertedExpr is used along with inputCol (a column from the input) to + // find the keys to look up in the index; lookupCols are ordinals for the + // table columns we are retrieving. // // The node produces the columns in the input and (unless join type is // LeftSemiJoin or LeftAntiJoin) the lookupCols, ordered by ordinal. The ON - // condition can refer to these using IndexedVars. - ConstructGeoLookupJoin( + // condition can refer to these using IndexedVars. Note that lookupCols does + // not include the inverted column. + ConstructInvertedJoin( joinType sqlbase.JoinType, - geoRelationshipType geoindex.RelationshipType, + invertedExpr tree.TypedExpr, input Node, table cat.Table, index cat.Index, - geoCol NodeColumnOrdinal, + inputCol NodeColumnOrdinal, lookupCols TableColumnOrdinalSet, onCond tree.TypedExpr, reqOrdering OutputOrdering, diff --git a/pkg/sql/opt/invertedexpr/expression.go b/pkg/sql/opt/invertedexpr/expression.go index d25717ac5e5b..9c16194ae50b 100644 --- a/pkg/sql/opt/invertedexpr/expression.go +++ b/pkg/sql/opt/invertedexpr/expression.go @@ -12,13 +12,26 @@ package invertedexpr import ( "bytes" + "context" "fmt" "strconv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/treeprinter" ) +// DatumToInvertedExpr is an interface that is used by the +// rowexec.invertedJoiner to extract a SpanExpressionProto given an +// input row. The rowexec.invertedJoiner calls Convert and uses the resulting +// SpanExpressionProto.SpansToRead to determine which spans to read from the +// inverted index. Then it computes a set expression on the scanned rows as +// defined by the SpanExpressionProto.Node. +type DatumToInvertedExpr interface { + // Convert uses the lookup column to construct an inverted expression. + Convert(context.Context, sqlbase.EncDatum) (*SpanExpressionProto, error) +} + // EncInvertedVal is the encoded form of a value in the inverted column. // This library does not care about how the value is encoded. The following // encoding comment is only relevant for integration purposes, and to justify diff --git a/pkg/sql/opt/memo/expr.go b/pkg/sql/opt/memo/expr.go index 8b3f0626bf4e..3470dbe619a6 100644 --- a/pkg/sql/opt/memo/expr.go +++ b/pkg/sql/opt/memo/expr.go @@ -453,7 +453,7 @@ func (lj *LookupJoinExpr) initUnexportedFields(mem *Memo) { // lookupProps are initialized as necessary by the logical props builder. } -func (gj *GeoLookupJoinExpr) initUnexportedFields(mem *Memo) { +func (gj *InvertedJoinExpr) initUnexportedFields(mem *Memo) { // lookupProps are initialized as necessary by the logical props builder. 
} diff --git a/pkg/sql/opt/memo/expr_format.go b/pkg/sql/opt/memo/expr_format.go index 6068a7937b2f..728f916d41db 100644 --- a/pkg/sql/opt/memo/expr_format.go +++ b/pkg/sql/opt/memo/expr_format.go @@ -192,8 +192,8 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) { FormatPrivate(f, e.Private(), required) f.Buffer.WriteByte(')') - case *GeoLookupJoinExpr: - fmt.Fprintf(f.Buffer, "%v (geo-lookup", t.JoinType) + case *InvertedJoinExpr: + fmt.Fprintf(f.Buffer, "%v (inverted-lookup", t.JoinType) FormatPrivate(f, e.Private(), required) f.Buffer.WriteByte(')') @@ -451,11 +451,12 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) { tp.Childf("lookup columns are key") } - case *GeoLookupJoinExpr: + case *InvertedJoinExpr: if !t.Flags.Empty() { tp.Childf("flags: %s", t.Flags.String()) } - tp.Childf("geo-relationship: %v", t.GeoRelationshipType) + n := tp.Child("inverted-expr") + f.formatExpr(t.InvertedExpr, n) case *ZigzagJoinExpr: if !f.HasFlags(ExprFmtHideColumns) { @@ -1259,7 +1260,7 @@ func FormatPrivate(f *ExprFmtCtx, private interface{}, physProps *physical.Requi fmt.Fprintf(f.Buffer, " %s@%s", tab.Name(), tab.Index(t.Index).Name()) } - case *GeoLookupJoinPrivate: + case *InvertedJoinPrivate: tab := f.Memo.metadata.Table(t.Table) fmt.Fprintf(f.Buffer, " %s@%s", tab.Name(), tab.Index(t.Index).Name()) diff --git a/pkg/sql/opt/memo/interner.go b/pkg/sql/opt/memo/interner.go index e73474d5c053..c88a58cfd573 100644 --- a/pkg/sql/opt/memo/interner.go +++ b/pkg/sql/opt/memo/interner.go @@ -17,7 +17,6 @@ import ( "reflect" "unsafe" - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/props/physical" @@ -561,10 +560,6 @@ func (h *hasher) HashLockingItem(val *tree.LockingItem) { } } -func (h *hasher) HashGeoRelationshipType(val geoindex.RelationshipType) { - h.HashUint64(uint64(val)) -} - func (h *hasher) HashRelExpr(val RelExpr) { h.HashUint64(uint64(reflect.ValueOf(val).Pointer())) } @@ -906,10 +901,6 @@ func (h *hasher) IsLockingItemEqual(l, r *tree.LockingItem) bool { return l.Strength == r.Strength && l.WaitPolicy == r.WaitPolicy } -func (h *hasher) IsGeoRelationshipTypeEqual(l, r geoindex.RelationshipType) bool { - return l == r -} - func (h *hasher) IsPointerEqual(l, r unsafe.Pointer) bool { return l == r } diff --git a/pkg/sql/opt/memo/logical_props_builder.go b/pkg/sql/opt/memo/logical_props_builder.go index da289f11e9a5..a5791ff978bc 100644 --- a/pkg/sql/opt/memo/logical_props_builder.go +++ b/pkg/sql/opt/memo/logical_props_builder.go @@ -400,8 +400,8 @@ func (b *logicalPropsBuilder) buildLookupJoinProps(join *LookupJoinExpr, rel *pr b.buildJoinProps(join, rel) } -func (b *logicalPropsBuilder) buildGeoLookupJoinProps( - join *GeoLookupJoinExpr, rel *props.Relational, +func (b *logicalPropsBuilder) buildInvertedJoinProps( + join *InvertedJoinExpr, rel *props.Relational, ) { b.buildJoinProps(join, rel) } @@ -1702,11 +1702,9 @@ func ensureLookupJoinInputProps(join *LookupJoinExpr, sb *statisticsBuilder) *pr return relational } -// ensureGeoLookupJoinInputProps lazily populates the relational properties +// ensureInvertedJoinInputProps lazily populates the relational properties // that apply to the lookup side of the join, as if it were a Scan operator. 
-func ensureGeoLookupJoinInputProps( - join *GeoLookupJoinExpr, sb *statisticsBuilder, -) *props.Relational { +func ensureInvertedJoinInputProps(join *InvertedJoinExpr, sb *statisticsBuilder) *props.Relational { relational := &join.lookupProps if relational.OutputCols.Empty() { md := join.Memo().Metadata() @@ -1844,16 +1842,16 @@ func (h *joinPropsHelper) init(b *logicalPropsBuilder, joinExpr RelExpr) { h.filterIsTrue = false h.filterIsFalse = h.filters.IsFalse() - case *GeoLookupJoinExpr: + case *InvertedJoinExpr: h.leftProps = joinExpr.Child(0).(RelExpr).Relational() - ensureGeoLookupJoinInputProps(join, &b.sb) + ensureInvertedJoinInputProps(join, &b.sb) h.joinType = join.JoinType h.rightProps = &join.lookupProps h.filters = join.On b.addFiltersToFuncDep(h.filters, &h.filtersFD) h.filterNotNullCols = b.rejectNullCols(h.filters) - // Geospatial lookup join always has a filter condition on the index keys. + // Inverted join always has a filter condition on the index keys. h.filterIsTrue = false h.filterIsFalse = h.filters.IsFalse() diff --git a/pkg/sql/opt/memo/statistics_builder.go b/pkg/sql/opt/memo/statistics_builder.go index 5b59194ed265..38ad49004409 100644 --- a/pkg/sql/opt/memo/statistics_builder.go +++ b/pkg/sql/opt/memo/statistics_builder.go @@ -210,8 +210,8 @@ func (sb *statisticsBuilder) availabilityFromInput(e RelExpr) bool { ensureLookupJoinInputProps(t, sb) return t.lookupProps.Stats.Available && t.Input.Relational().Stats.Available - case *GeoLookupJoinExpr: - ensureGeoLookupJoinInputProps(t, sb) + case *InvertedJoinExpr: + ensureInvertedJoinInputProps(t, sb) return t.lookupProps.Stats.Available && t.Input.Relational().Stats.Available case *ZigzagJoinExpr: @@ -240,7 +240,7 @@ func (sb *statisticsBuilder) colStatFromInput( colSet opt.ColSet, e RelExpr, ) (*props.ColumnStatistic, *props.Statistics) { var lookupJoin *LookupJoinExpr - var geospatialLookupJoin *GeoLookupJoinExpr + var invertedJoin *InvertedJoinExpr var zigzagJoin *ZigzagJoinExpr switch t := e.(type) { @@ -254,16 +254,16 @@ func (sb *statisticsBuilder) colStatFromInput( lookupJoin = t ensureLookupJoinInputProps(lookupJoin, sb) - case *GeoLookupJoinExpr: - geospatialLookupJoin = t - ensureGeoLookupJoinInputProps(geospatialLookupJoin, sb) + case *InvertedJoinExpr: + invertedJoin = t + ensureInvertedJoinInputProps(invertedJoin, sb) case *ZigzagJoinExpr: zigzagJoin = t ensureZigzagJoinInputProps(zigzagJoin, sb) } - if lookupJoin != nil || geospatialLookupJoin != nil || zigzagJoin != nil || + if lookupJoin != nil || invertedJoin != nil || zigzagJoin != nil || opt.IsJoinOp(e) || e.Op() == opt.MergeJoinOp { var leftProps *props.Relational if zigzagJoin != nil { @@ -276,8 +276,8 @@ func (sb *statisticsBuilder) colStatFromInput( var intersectsRight bool if lookupJoin != nil { intersectsRight = lookupJoin.lookupProps.OutputCols.Intersects(colSet) - } else if geospatialLookupJoin != nil { - intersectsRight = geospatialLookupJoin.lookupProps.OutputCols.Intersects(colSet) + } else if invertedJoin != nil { + intersectsRight = invertedJoin.lookupProps.OutputCols.Intersects(colSet) } else if zigzagJoin != nil { intersectsRight = zigzagJoin.rightProps.OutputCols.Intersects(colSet) } else { @@ -300,10 +300,10 @@ func (sb *statisticsBuilder) colStatFromInput( return sb.colStatTable(lookupJoin.Table, colSet), sb.makeTableStatistics(lookupJoin.Table) } - if geospatialLookupJoin != nil { + if invertedJoin != nil { // TODO(rytaft): use inverted index stats when available. 
- return sb.colStatTable(geospatialLookupJoin.Table, colSet), - sb.makeTableStatistics(geospatialLookupJoin.Table) + return sb.colStatTable(invertedJoin.Table, colSet), + sb.makeTableStatistics(invertedJoin.Table) } if zigzagJoin != nil { return sb.colStatTable(zigzagJoin.RightTable, colSet), @@ -360,7 +360,7 @@ func (sb *statisticsBuilder) colStat(colSet opt.ColSet, e RelExpr) *props.Column case opt.InnerJoinOp, opt.LeftJoinOp, opt.RightJoinOp, opt.FullJoinOp, opt.SemiJoinOp, opt.AntiJoinOp, opt.InnerJoinApplyOp, opt.LeftJoinApplyOp, opt.SemiJoinApplyOp, opt.AntiJoinApplyOp, opt.MergeJoinOp, opt.LookupJoinOp, - opt.GeoLookupJoinOp, opt.ZigzagJoinOp: + opt.InvertedJoinOp, opt.ZigzagJoinOp: return sb.colStatJoin(colSet, e) case opt.IndexJoinOp: @@ -1009,8 +1009,8 @@ func (sb *statisticsBuilder) buildJoin( s.ApplySelectivity(sb.selectivityFromEquivalencies(equivReps, &h.filtersFD, join, s)) } - if join.Op() == opt.GeoLookupJoinOp { - s.ApplySelectivity(sb.selectivityFromGeoRelationship(join, s)) + if join.Op() == opt.InvertedJoinOp { + s.ApplySelectivity(sb.selectivityFromInvertedJoinCondition(join, s)) } s.ApplySelectivity(sb.selectivityFromHistograms(histCols, join, s)) s.ApplySelectivity(sb.selectivityFromMultiColDistinctCounts( @@ -1154,10 +1154,10 @@ func (sb *statisticsBuilder) colStatJoin(colSet opt.ColSet, join RelExpr) *props ensureLookupJoinInputProps(j, sb) rightProps = &j.lookupProps - case *GeoLookupJoinExpr: + case *InvertedJoinExpr: joinType = j.JoinType leftProps = j.Input.Relational() - ensureGeoLookupJoinInputProps(j, sb) + ensureInvertedJoinInputProps(j, sb) rightProps = &j.lookupProps case *ZigzagJoinExpr: @@ -2466,13 +2466,13 @@ func (sb *statisticsBuilder) rowsProcessed(e RelExpr) float64 { withoutOn := e.Memo().MemoizeLookupJoin(t.Input, nil /* on */, lookupJoinPrivate) return withoutOn.Relational().Stats.RowCount - case *GeoLookupJoinExpr: - var lookupJoinPrivate *GeoLookupJoinPrivate + case *InvertedJoinExpr: + var lookupJoinPrivate *InvertedJoinPrivate switch t.JoinType { case opt.SemiJoinOp, opt.SemiJoinApplyOp, opt.AntiJoinOp, opt.AntiJoinApplyOp: // The number of rows processed for semi and anti joins is closer to the // number of output rows for an equivalent inner join. - copy := t.GeoLookupJoinPrivate + copy := t.InvertedJoinPrivate copy.JoinType = semiAntiJoinToInnerJoin(t.JoinType) lookupJoinPrivate = &copy @@ -2482,12 +2482,12 @@ func (sb *statisticsBuilder) rowsProcessed(e RelExpr) float64 { // equals the number of output rows. return e.Relational().Stats.RowCount } - lookupJoinPrivate = &t.GeoLookupJoinPrivate + lookupJoinPrivate = &t.InvertedJoinPrivate } // We need to determine the row count of the join before the // ON conditions are applied. - withoutOn := e.Memo().MemoizeGeoLookupJoin(t.Input, nil /* on */, lookupJoinPrivate) + withoutOn := e.Memo().MemoizeInvertedJoin(t.Input, nil /* on */, lookupJoinPrivate) return withoutOn.Relational().Stats.RowCount case *MergeJoinExpr: @@ -2627,10 +2627,9 @@ const ( // it worth adding the overhead of using a histogram. minCardinalityForHistogram = 100 - // This is the default selectivity estimated for geospatial lookup joins - // until we can get better statistics on inverted indexes and geospatial - // columns. - unknownGeoRelationshipSelectivity = 1.0 / 100.0 + // This is the default selectivity estimated for inverted joins until we can + // get better statistics on inverted indexes.
+ unknownInvertedJoinSelectivity = 1.0 / 100.0 // multiColWeight is the weight to assign the selectivity calculation using // multi-column statistics versus the calculation using single-column @@ -3568,10 +3567,10 @@ func (sb *statisticsBuilder) selectivityFromEquivalencySemiJoin( return fraction(minDistinctCountRight, maxDistinctCountLeft) } -func (sb *statisticsBuilder) selectivityFromGeoRelationship( +func (sb *statisticsBuilder) selectivityFromInvertedJoinCondition( e RelExpr, s *props.Statistics, ) (selectivity float64) { - return unknownGeoRelationshipSelectivity + return unknownInvertedJoinSelectivity } func (sb *statisticsBuilder) selectivityFromUnappliedConjuncts( diff --git a/pkg/sql/opt/ops/relational.opt b/pkg/sql/opt/ops/relational.opt index aee61f6cd9a7..3dc074b6c64c 100644 --- a/pkg/sql/opt/ops/relational.opt +++ b/pkg/sql/opt/ops/relational.opt @@ -302,38 +302,16 @@ define LookupJoinPrivate { _ JoinPrivate } -# GeoLookupJoin represents a join between an input expression and an index, -# where the index is an inverted index on a Geometry or Geography column. -# -# A GeoLookupJoin can be generated for queries containing a join where one of -# the join conditions is a geospatial binary function such as ST_Covers or -# ST_CoveredBy, and at least one of the two inputs to the function is an -# indexed geospatial column. The type of geospatial function implies the -# GeoRelationshipType (Covers, CoveredBy or Intersects) for the join, which is -# stored in the GeoLookupJoinPrivate and affects how the join is executed. For -# a full list of the geospatial functions that can be index-accelerated and -# their corresponding GeoRelationshipTypes, see geoRelationshipMap in -# xform/custom_funcs.go. -# -# The GeoLookupJoin has no false negatives, but it may return false positives -# that would not have been returned by the original geospatial function -# join predicate. Therefore, the original function must still be applied on -# the output of the join. Since the inverted index does not actually include -# the geospatial column (or any other columns besides the primary key columns), -# the GeoLookupJoin will be wrapped in an index join. The geospatial function -# and any other filters on non-key columns will be appied as filters on the -# outer index join. +# InvertedJoin represents a join between an input expression and an inverted +# index. The type of join is in the InvertedJoinPrivate field. [Relational] -define GeoLookupJoin { +define InvertedJoin { Input RelExpr # On only contains filters on the input columns and primary key columns of - # the inverted index's base table. (Since the indexed geospatial column is - # not actually included in the index, the GeoLookupJoin must be wrapped in - # an index join, which will contain the original geospatial function as one - # of its On conditions.) + # the inverted index's base table. On FiltersExpr - _ GeoLookupJoinPrivate + _ InvertedJoinPrivate # lookupProps caches relational properties for the "table" side of the lookup # join, treating it as if it were another relational input. This makes the @@ -342,28 +320,29 @@ define GeoLookupJoin { } [Private] -define GeoLookupJoinPrivate { +define InvertedJoinPrivate { # JoinType is InnerJoin, LeftJoin, SemiJoin, or AntiJoin. JoinType Operator - # GeoRelationshipType is Covers, CoveredBy, or Intersects. - GeoRelationshipType GeoRelationshipType + # InvertedExpr is the inverted join condition. It is used to get the keys + # to look up in the inverted index based on the value of the input column.
+ InvertedExpr ScalarExpr # Table identifies the table to do lookups in. Table TableID - # Index identifies the geospatial inverted index to do lookups in. It can - # be passed to the cat.Table.Index() method in order to fetch the cat.Index - # metadata. + # Index identifies the inverted index to do lookups in. It can be passed to + # the cat.Table.Index() method in order to fetch the cat.Index metadata. Index IndexOrdinal - # GeoCol is the geospatial column (produced by the input) used to - # determine the keys (i.e., s2 CellIDs) to scan in the inverted index. - GeoCol ColumnID + # InputCol is the column (produced by the input) that will be bound to + # InvertedExpr and used to determine the keys to scan in the inverted + # index. + InputCol ColumnID - # Cols is the set of columns produced by the geospatial lookup join. This - # set can contain columns from the input and columns from the index. Any - # columns not in the input are retrieved from the index. + # Cols is the set of columns produced by the inverted join. This set can + # contain columns from the input and columns from the index. Any columns + # not in the input are retrieved from the index. Cols ColSet _ JoinPrivate } diff --git a/pkg/sql/opt/optgen/cmd/optgen/exprs_gen.go b/pkg/sql/opt/optgen/cmd/optgen/exprs_gen.go index 7000680399ee..323780f1be6f 100644 --- a/pkg/sql/opt/optgen/cmd/optgen/exprs_gen.go +++ b/pkg/sql/opt/optgen/cmd/optgen/exprs_gen.go @@ -35,7 +35,6 @@ func (g *exprsGen) generate(compiled *lang.CompiledExpr, w io.Writer) { fmt.Fprintf(g.w, "import (\n") fmt.Fprintf(g.w, " \"unsafe\"\n") fmt.Fprintf(g.w, "\n") - fmt.Fprintf(g.w, " \"github.com/cockroachdb/cockroach/pkg/geo/geoindex\"\n") fmt.Fprintf(g.w, " \"github.com/cockroachdb/cockroach/pkg/sql/opt\"\n") fmt.Fprintf(g.w, " \"github.com/cockroachdb/cockroach/pkg/sql/opt/cat\"\n") fmt.Fprintf(g.w, " \"github.com/cockroachdb/cockroach/pkg/sql/opt/constraint\"\n") diff --git a/pkg/sql/opt/optgen/cmd/optgen/metadata.go b/pkg/sql/opt/optgen/cmd/optgen/metadata.go index 01345fddc995..49e0db977586 100644 --- a/pkg/sql/opt/optgen/cmd/optgen/metadata.go +++ b/pkg/sql/opt/optgen/cmd/optgen/metadata.go @@ -185,56 +185,55 @@ func newMetadata(compiled *lang.CompiledExpr, pkg string) *metadata { // Add all types used in Optgen defines here.
md.types = map[string]*typeDef{ - "RelExpr": {fullName: "memo.RelExpr", isExpr: true, isInterface: true}, - "Expr": {fullName: "opt.Expr", isExpr: true, isInterface: true}, - "ScalarExpr": {fullName: "opt.ScalarExpr", isExpr: true, isInterface: true}, - "Operator": {fullName: "opt.Operator", passByVal: true}, - "ColumnID": {fullName: "opt.ColumnID", passByVal: true}, - "ColSet": {fullName: "opt.ColSet", passByVal: true}, - "ColList": {fullName: "opt.ColList", passByVal: true}, - "TableID": {fullName: "opt.TableID", passByVal: true}, - "SchemaID": {fullName: "opt.SchemaID", passByVal: true}, - "SequenceID": {fullName: "opt.SequenceID", passByVal: true}, - "UniqueID": {fullName: "opt.UniqueID", passByVal: true}, - "WithID": {fullName: "opt.WithID", passByVal: true}, - "Ordering": {fullName: "opt.Ordering", passByVal: true}, - "OrderingChoice": {fullName: "physical.OrderingChoice", passByVal: true}, - "TupleOrdinal": {fullName: "memo.TupleOrdinal", passByVal: true}, - "ScanLimit": {fullName: "memo.ScanLimit", passByVal: true}, - "ScanFlags": {fullName: "memo.ScanFlags", passByVal: true}, - "JoinFlags": {fullName: "memo.JoinFlags", passByVal: true}, - "WindowFrame": {fullName: "memo.WindowFrame", passByVal: true}, - "FKCascades": {fullName: "memo.FKCascades", passByVal: true}, - "ExplainOptions": {fullName: "tree.ExplainOptions", passByVal: true}, - "StatementType": {fullName: "tree.StatementType", passByVal: true}, - "ShowTraceType": {fullName: "tree.ShowTraceType", passByVal: true}, - "bool": {fullName: "bool", passByVal: true}, - "int": {fullName: "int", passByVal: true}, - "string": {fullName: "string", passByVal: true}, - "Type": {fullName: "types.T", isPointer: true}, - "Datum": {fullName: "tree.Datum", isInterface: true}, - "TypedExpr": {fullName: "tree.TypedExpr", isInterface: true}, - "Statement": {fullName: "tree.Statement", isInterface: true}, - "Subquery": {fullName: "tree.Subquery", isPointer: true, usePointerIntern: true}, - "CreateTable": {fullName: "tree.CreateTable", isPointer: true, usePointerIntern: true}, - "Constraint": {fullName: "constraint.Constraint", isPointer: true, usePointerIntern: true}, - "FuncProps": {fullName: "tree.FunctionProperties", isPointer: true, usePointerIntern: true}, - "FuncOverload": {fullName: "tree.Overload", isPointer: true, usePointerIntern: true}, - "PhysProps": {fullName: "physical.Required", isPointer: true}, - "Presentation": {fullName: "physical.Presentation", passByVal: true}, - "RelProps": {fullName: "props.Relational"}, - "RelPropsPtr": {fullName: "props.Relational", isPointer: true, usePointerIntern: true}, - "ScalarProps": {fullName: "props.Scalar"}, - "FuncDepSet": {fullName: "props.FuncDepSet"}, - "JoinMultiplicity": {fullName: "props.JoinMultiplicity"}, - "OpaqueMetadata": {fullName: "opt.OpaqueMetadata", isInterface: true}, - "JobCommand": {fullName: "tree.JobCommand", passByVal: true}, - "IndexOrdinal": {fullName: "cat.IndexOrdinal", passByVal: true}, - "ViewDeps": {fullName: "opt.ViewDeps", passByVal: true}, - "LockingItem": {fullName: "tree.LockingItem", isPointer: true}, - "MaterializeClause": {fullName: "tree.MaterializeClause", passByVal: true}, - "GeoRelationshipType": {fullName: "geoindex.RelationshipType", passByVal: true}, - "SpanExpression": {fullName: "invertedexpr.SpanExpression", isPointer: true, usePointerIntern: true}, + "RelExpr": {fullName: "memo.RelExpr", isExpr: true, isInterface: true}, + "Expr": {fullName: "opt.Expr", isExpr: true, isInterface: true}, + "ScalarExpr": {fullName: "opt.ScalarExpr", isExpr: true, 
isInterface: true}, + "Operator": {fullName: "opt.Operator", passByVal: true}, + "ColumnID": {fullName: "opt.ColumnID", passByVal: true}, + "ColSet": {fullName: "opt.ColSet", passByVal: true}, + "ColList": {fullName: "opt.ColList", passByVal: true}, + "TableID": {fullName: "opt.TableID", passByVal: true}, + "SchemaID": {fullName: "opt.SchemaID", passByVal: true}, + "SequenceID": {fullName: "opt.SequenceID", passByVal: true}, + "UniqueID": {fullName: "opt.UniqueID", passByVal: true}, + "WithID": {fullName: "opt.WithID", passByVal: true}, + "Ordering": {fullName: "opt.Ordering", passByVal: true}, + "OrderingChoice": {fullName: "physical.OrderingChoice", passByVal: true}, + "TupleOrdinal": {fullName: "memo.TupleOrdinal", passByVal: true}, + "ScanLimit": {fullName: "memo.ScanLimit", passByVal: true}, + "ScanFlags": {fullName: "memo.ScanFlags", passByVal: true}, + "JoinFlags": {fullName: "memo.JoinFlags", passByVal: true}, + "WindowFrame": {fullName: "memo.WindowFrame", passByVal: true}, + "FKCascades": {fullName: "memo.FKCascades", passByVal: true}, + "ExplainOptions": {fullName: "tree.ExplainOptions", passByVal: true}, + "StatementType": {fullName: "tree.StatementType", passByVal: true}, + "ShowTraceType": {fullName: "tree.ShowTraceType", passByVal: true}, + "bool": {fullName: "bool", passByVal: true}, + "int": {fullName: "int", passByVal: true}, + "string": {fullName: "string", passByVal: true}, + "Type": {fullName: "types.T", isPointer: true}, + "Datum": {fullName: "tree.Datum", isInterface: true}, + "TypedExpr": {fullName: "tree.TypedExpr", isInterface: true}, + "Statement": {fullName: "tree.Statement", isInterface: true}, + "Subquery": {fullName: "tree.Subquery", isPointer: true, usePointerIntern: true}, + "CreateTable": {fullName: "tree.CreateTable", isPointer: true, usePointerIntern: true}, + "Constraint": {fullName: "constraint.Constraint", isPointer: true, usePointerIntern: true}, + "FuncProps": {fullName: "tree.FunctionProperties", isPointer: true, usePointerIntern: true}, + "FuncOverload": {fullName: "tree.Overload", isPointer: true, usePointerIntern: true}, + "PhysProps": {fullName: "physical.Required", isPointer: true}, + "Presentation": {fullName: "physical.Presentation", passByVal: true}, + "RelProps": {fullName: "props.Relational"}, + "RelPropsPtr": {fullName: "props.Relational", isPointer: true, usePointerIntern: true}, + "ScalarProps": {fullName: "props.Scalar"}, + "FuncDepSet": {fullName: "props.FuncDepSet"}, + "JoinMultiplicity": {fullName: "props.JoinMultiplicity"}, + "OpaqueMetadata": {fullName: "opt.OpaqueMetadata", isInterface: true}, + "JobCommand": {fullName: "tree.JobCommand", passByVal: true}, + "IndexOrdinal": {fullName: "cat.IndexOrdinal", passByVal: true}, + "ViewDeps": {fullName: "opt.ViewDeps", passByVal: true}, + "LockingItem": {fullName: "tree.LockingItem", isPointer: true}, + "MaterializeClause": {fullName: "tree.MaterializeClause", passByVal: true}, + "SpanExpression": {fullName: "invertedexpr.SpanExpression", isPointer: true, usePointerIntern: true}, } // Add types of generated op and private structs. 
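The md.types table above is what tells optgen, for each Optgen type name, which Go type to emit and how it is passed. A minimal sketch of that kind of lookup, assuming a pared-down typeDef with only two of the real fields, might look like this:

```
package main

import "fmt"

// typeDef here keeps only two of the fields from optgen's metadata.go (the
// real struct also has passByVal, isExpr, isInterface, and usePointerIntern).
type typeDef struct {
	fullName  string
	isPointer bool
}

// fieldType shows the kind of decision the generator derives from the table:
// pointer-typed entries become pointer fields in the generated structs.
func fieldType(types map[string]*typeDef, name string) (string, error) {
	td, ok := types[name]
	if !ok {
		return "", fmt.Errorf("unknown optgen type %q", name)
	}
	if td.isPointer {
		return "*" + td.fullName, nil
	}
	return td.fullName, nil
}

func main() {
	types := map[string]*typeDef{
		"ColumnID":       {fullName: "opt.ColumnID"},
		"SpanExpression": {fullName: "invertedexpr.SpanExpression", isPointer: true},
	}
	for _, name := range []string{"ColumnID", "SpanExpression"} {
		ft, err := fieldType(types, name)
		if err != nil {
			panic(err)
		}
		fmt.Println(name, "->", ft) // e.g. SpanExpression -> *invertedexpr.SpanExpression
	}
}
```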
diff --git a/pkg/sql/opt/optgen/cmd/optgen/testdata/exprs b/pkg/sql/opt/optgen/cmd/optgen/testdata/exprs index c2fbab376808..c69d21555683 100644 --- a/pkg/sql/opt/optgen/cmd/optgen/testdata/exprs +++ b/pkg/sql/opt/optgen/cmd/optgen/testdata/exprs @@ -49,7 +49,6 @@ package memo import ( "unsafe" - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -524,7 +523,6 @@ package memo import ( "unsafe" - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" diff --git a/pkg/sql/opt/xform/coster.go b/pkg/sql/opt/xform/coster.go index 634e81d184d4..15f20c4547f9 100644 --- a/pkg/sql/opt/xform/coster.go +++ b/pkg/sql/opt/xform/coster.go @@ -187,8 +187,8 @@ func (c *coster) ComputeCost(candidate memo.RelExpr, required *physical.Required case opt.LookupJoinOp: cost = c.computeLookupJoinCost(candidate.(*memo.LookupJoinExpr), required) - case opt.GeoLookupJoinOp: - cost = c.computeGeoLookupJoinCost(candidate.(*memo.GeoLookupJoinExpr), required) + case opt.InvertedJoinOp: + cost = c.computeInvertedJoinCost(candidate.(*memo.InvertedJoinExpr), required) case opt.ZigzagJoinOp: cost = c.computeZigzagJoinCost(candidate.(*memo.ZigzagJoinExpr)) @@ -483,8 +483,8 @@ func (c *coster) computeLookupJoinCost( return cost } -func (c *coster) computeGeoLookupJoinCost( - join *memo.GeoLookupJoinExpr, required *physical.Required, +func (c *coster) computeInvertedJoinCost( + join *memo.InvertedJoinExpr, required *physical.Required, ) memo.Cost { lookupCount := join.Input.Relational().Stats.RowCount @@ -501,7 +501,7 @@ func (c *coster) computeGeoLookupJoinCost( // We shouldn't ever get here. Since we don't allow the memo // to be optimized twice, the coster should never be used after // logPropsBuilder.clear() is called. - panic(errors.AssertionFailedf("could not get rows processed for geolookup join")) + panic(errors.AssertionFailedf("could not get rows processed for inverted join")) } // Lookup joins can return early if enough rows have been found. An otherwise diff --git a/pkg/sql/opt/xform/custom_funcs.go b/pkg/sql/opt/xform/custom_funcs.go index 012e5820e206..e2d349f685de 100644 --- a/pkg/sql/opt/xform/custom_funcs.go +++ b/pkg/sql/opt/xform/custom_funcs.go @@ -14,6 +14,7 @@ import ( "fmt" "sort" + "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -974,7 +975,9 @@ func (c *CustomFuncs) GenerateInvertedIndexScans( // Check whether the filter can constrain the index. // TODO(rytaft): Unify these two cases so both return an invertedConstraint. - invertedConstraint, geoOk = c.tryConstrainGeoIndex(filters, scanPrivate.Table, iter.Index()) + invertedConstraint, geoOk = tryConstrainGeoIndex( + c.e.evalCtx.Context, filters, scanPrivate.Table, iter.Index(), + ) if geoOk { // Geo index scans can never be tight, so remaining filters is always the // same as filters. @@ -1826,6 +1829,10 @@ func (c *CustomFuncs) GenerateLookupJoins( // covering, all geospatial lookup joins must be wrapped in an index join with // the primary index of the table. See the description of Case 2 in the comment // above GenerateLookupJoins for details about how this works. 
+// TODO(rytaft): generalize this function to be GenerateInvertedJoins and add +// support for JSON and array inverted indexes. +// TODO(rytaft): handle more complicated geospatial expressions +// e.g. ST_Intersects(x, y) AND ST_Covers(x, y) where y is the indexed value. func (c *CustomFuncs) GenerateGeoLookupJoins( grp memo.RelExpr, joinType opt.Operator, @@ -1845,18 +1852,16 @@ func (c *CustomFuncs) GenerateGeoLookupJoins( return } - function := fn.(*memo.FunctionExpr) - inputProps := input.Relational() - - // Extract the geospatial relationship as well as the variable inputs to - // the geospatial function. - relationship, ok := geoRelationshipMap[function.Name] - if !ok { + if !IsGeoIndexFunction(fn) { panic(errors.AssertionFailedf( "GenerateGeoLookupJoins called on a function that cannot be index-accelerated", )) } + function := fn.(*memo.FunctionExpr) + inputProps := input.Relational() + + // Extract the variable inputs to the geospatial function. if function.Args.ChildCount() < 2 { panic(errors.AssertionFailedf( "all index-accelerated geospatial functions should have at least two arguments", @@ -1912,27 +1917,27 @@ func (c *CustomFuncs) GenerateGeoLookupJoins( // primary key columns from it. indexCols := pkCols.ToSet() - lookupJoin := memo.GeoLookupJoinExpr{Input: input} + lookupJoin := memo.InvertedJoinExpr{Input: input} lookupJoin.JoinPrivate = *joinPrivate lookupJoin.JoinType = joinType lookupJoin.Table = scanPrivate.Table lookupJoin.Index = iter.IndexOrdinal() - lookupJoin.GeoRelationshipType = relationship - lookupJoin.GeoCol = inputGeoCol + lookupJoin.InvertedExpr = function + lookupJoin.InputCol = inputGeoCol lookupJoin.Cols = indexCols.Union(inputProps.OutputCols) var indexJoin memo.LookupJoinExpr // ON may have some conditions that are bound by the columns in the index // and some conditions that refer to other columns. We can put the former - // in the GeospatialLookupJoin and the latter in the index join. + // in the InvertedJoin and the latter in the index join. lookupJoin.On = c.ExtractBoundConditions(on, lookupJoin.Cols) indexJoin.On = c.ExtractUnboundConditions(on, lookupJoin.Cols) - indexJoin.Input = c.e.f.ConstructGeoLookupJoin( + indexJoin.Input = c.e.f.ConstructInvertedJoin( lookupJoin.Input, lookupJoin.On, - &lookupJoin.GeoLookupJoinPrivate, + &lookupJoin.InvertedJoinPrivate, ) indexJoin.JoinType = joinType indexJoin.Table = scanPrivate.Table @@ -3033,3 +3038,15 @@ func (c *CustomFuncs) AddPrimaryKeyColsToScanPrivate(sp *memo.ScanPrivate) *memo Locking: sp.Locking, } } + +// NewDatumToInvertedExpr returns a new DatumToInvertedExpr. Currently there +// is only one possible implementation returned, geoDatumToInvertedExpr.
+func NewDatumToInvertedExpr( + expr tree.TypedExpr, desc *sqlbase.IndexDescriptor, +) (invertedexpr.DatumToInvertedExpr, error) { + if geoindex.IsEmptyConfig(&desc.GeoConfig) { + return nil, fmt.Errorf("inverted joins are currently only supported for geospatial indexes") + } + + return NewGeoDatumToInvertedExpr(expr, &desc.GeoConfig) +} diff --git a/pkg/sql/opt/xform/geo.go b/pkg/sql/opt/xform/geo.go index 743e8936aa23..3e847467a61e 100644 --- a/pkg/sql/opt/xform/geo.go +++ b/pkg/sql/opt/xform/geo.go @@ -11,15 +11,23 @@ package xform import ( + "context" + "fmt" + "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/invertedexpr" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" ) +// This file contains functions for building geospatial inverted index scans +// and joins that are used throughout the xform package. + // geoRelationshipMap contains all the geospatial functions that can be index- // accelerated. Each function implies a certain type of geospatial relationship, // which affects how the index is queried as part of a constrained scan or @@ -57,29 +65,31 @@ func IsGeoIndexFunction(fn opt.ScalarExpr) bool { // geospatial relationship. It is implemented by getSpanExprForGeographyIndex // and getSpanExprForGeometryIndex and used in constrainGeoIndex. type getSpanExprForGeoIndexFn func( - tree.Datum, geoindex.RelationshipType, cat.Index, + context.Context, tree.Datum, geoindex.RelationshipType, *geoindex.Config, ) *invertedexpr.SpanExpression // tryConstrainGeoIndex tries to derive an inverted index constraint for the // given geospatial index from the specified filters. If a constraint is // derived, it is returned with ok=true. If no constraint can be derived, // then tryConstrainGeoIndex returns ok=false. -func (c *CustomFuncs) tryConstrainGeoIndex( - filters memo.FiltersExpr, tabID opt.TableID, index cat.Index, +func tryConstrainGeoIndex( + ctx context.Context, filters memo.FiltersExpr, tabID opt.TableID, index cat.Index, ) (invertedConstraint *invertedexpr.SpanExpression, ok bool) { config := index.GeoConfig() var getSpanExpr getSpanExprForGeoIndexFn if geoindex.IsGeographyConfig(config) { - getSpanExpr = c.getSpanExprForGeographyIndex + getSpanExpr = getSpanExprForGeographyIndex } else if geoindex.IsGeometryConfig(config) { - getSpanExpr = c.getSpanExprForGeometryIndex + getSpanExpr = getSpanExprForGeometryIndex } else { return nil, false } var invertedExpr invertedexpr.InvertedExpression for i := range filters { - invertedExprLocal := c.constrainGeoIndex(filters[i].Condition, tabID, index, getSpanExpr) + invertedExprLocal := constrainGeoIndex( + ctx, filters[i].Condition, tabID, index, getSpanExpr, + ) if invertedExpr == nil { invertedExpr = invertedExprLocal } else { @@ -101,23 +111,26 @@ func (c *CustomFuncs) tryConstrainGeoIndex( // getSpanExprForGeographyIndex gets a SpanExpression that constrains the given // geography index according to the given constant and geospatial relationship. 
-func (c *CustomFuncs) getSpanExprForGeographyIndex( - d tree.Datum, relationship geoindex.RelationshipType, index cat.Index, +func getSpanExprForGeographyIndex( + ctx context.Context, + d tree.Datum, + relationship geoindex.RelationshipType, + indexConfig *geoindex.Config, ) *invertedexpr.SpanExpression { - geogIdx := geoindex.NewS2GeographyIndex(*index.GeoConfig().S2Geography) + geogIdx := geoindex.NewS2GeographyIndex(*indexConfig.S2Geography) geog := d.(*tree.DGeography).Geography var spanExpr *invertedexpr.SpanExpression switch relationship { case geoindex.Covers: - unionKeySpans, err := geogIdx.Covers(c.e.evalCtx.Context, geog) + unionKeySpans, err := geogIdx.Covers(ctx, geog) if err != nil { panic(err) } spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) case geoindex.CoveredBy: - rpKeyExpr, err := geogIdx.CoveredBy(c.e.evalCtx.Context, geog) + rpKeyExpr, err := geogIdx.CoveredBy(ctx, geog) if err != nil { panic(err) } @@ -126,7 +139,7 @@ func (c *CustomFuncs) getSpanExprForGeographyIndex( } case geoindex.Intersects: - unionKeySpans, err := geogIdx.Intersects(c.e.evalCtx.Context, geog) + unionKeySpans, err := geogIdx.Intersects(ctx, geog) if err != nil { panic(err) } @@ -141,23 +154,26 @@ func (c *CustomFuncs) getSpanExprForGeographyIndex( // getSpanExprForGeometryIndex gets a SpanExpression that constrains the given // geometry index according to the given constant and geospatial relationship. -func (c *CustomFuncs) getSpanExprForGeometryIndex( - d tree.Datum, relationship geoindex.RelationshipType, index cat.Index, +func getSpanExprForGeometryIndex( + ctx context.Context, + d tree.Datum, + relationship geoindex.RelationshipType, + indexConfig *geoindex.Config, ) *invertedexpr.SpanExpression { - geomIdx := geoindex.NewS2GeometryIndex(*index.GeoConfig().S2Geometry) + geomIdx := geoindex.NewS2GeometryIndex(*indexConfig.S2Geometry) geom := d.(*tree.DGeometry).Geometry var spanExpr *invertedexpr.SpanExpression switch relationship { case geoindex.Covers: - unionKeySpans, err := geomIdx.Covers(c.e.evalCtx.Context, geom) + unionKeySpans, err := geomIdx.Covers(ctx, geom) if err != nil { panic(err) } spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) case geoindex.CoveredBy: - rpKeyExpr, err := geomIdx.CoveredBy(c.e.evalCtx.Context, geom) + rpKeyExpr, err := geomIdx.CoveredBy(ctx, geom) if err != nil { panic(err) } @@ -166,7 +182,7 @@ func (c *CustomFuncs) getSpanExprForGeometryIndex( } case geoindex.Intersects: - unionKeySpans, err := geomIdx.Intersects(c.e.evalCtx.Context, geom) + unionKeySpans, err := geomIdx.Intersects(ctx, geom) if err != nil { panic(err) } @@ -181,21 +197,25 @@ func (c *CustomFuncs) getSpanExprForGeometryIndex( // constrainGeoIndex returns an InvertedExpression representing a constraint // of the given geospatial index. 
-func (c *CustomFuncs) constrainGeoIndex( - expr opt.ScalarExpr, tabID opt.TableID, index cat.Index, getSpanExpr getSpanExprForGeoIndexFn, +func constrainGeoIndex( + ctx context.Context, + expr opt.ScalarExpr, + tabID opt.TableID, + index cat.Index, + getSpanExpr getSpanExprForGeoIndexFn, ) (_ invertedexpr.InvertedExpression) { var fn *memo.FunctionExpr switch t := expr.(type) { case *memo.AndExpr: return invertedexpr.And( - c.constrainGeoIndex(t.Left, tabID, index, getSpanExpr), - c.constrainGeoIndex(t.Right, tabID, index, getSpanExpr), + constrainGeoIndex(ctx, t.Left, tabID, index, getSpanExpr), + constrainGeoIndex(ctx, t.Right, tabID, index, getSpanExpr), ) case *memo.OrExpr: return invertedexpr.Or( - c.constrainGeoIndex(t.Left, tabID, index, getSpanExpr), - c.constrainGeoIndex(t.Right, tabID, index, getSpanExpr), + constrainGeoIndex(ctx, t.Left, tabID, index, getSpanExpr), + constrainGeoIndex(ctx, t.Right, tabID, index, getSpanExpr), ) case *memo.FunctionExpr: @@ -236,5 +256,68 @@ func (c *CustomFuncs) constrainGeoIndex( } relationship := geoRelationshipMap[fn.Name] - return getSpanExpr(d, relationship, index) + return getSpanExpr(ctx, d, relationship, index.GeoConfig()) +} + +// geoDatumToInvertedExpr implements invertedexpr.DatumToInvertedExpr for +// geospatial columns. +type geoDatumToInvertedExpr struct { + relationship geoindex.RelationshipType + indexConfig *geoindex.Config + typ *types.T + getSpanExpr getSpanExprForGeoIndexFn + alloc sqlbase.DatumAlloc +} + +var _ invertedexpr.DatumToInvertedExpr = &geoDatumToInvertedExpr{} + +// NewGeoDatumToInvertedExpr returns a new geoDatumToInvertedExpr. +func NewGeoDatumToInvertedExpr( + expr tree.TypedExpr, config *geoindex.Config, +) (invertedexpr.DatumToInvertedExpr, error) { + if geoindex.IsEmptyConfig(config) { + return nil, fmt.Errorf("inverted joins are currently only supported for geospatial indexes") + } + + fn, ok := expr.(*tree.FuncExpr) + if !ok { + return nil, fmt.Errorf("inverted joins are currently only supported for single geospatial functions") + } + + name := fn.Func.FunctionReference.String() + relationship, ok := geoRelationshipMap[name] + if !ok { + return nil, fmt.Errorf("%s cannot be index-accelerated", name) + } + + g := &geoDatumToInvertedExpr{ + relationship: relationship, + indexConfig: config, + } + if geoindex.IsGeographyConfig(config) { + g.typ = types.Geography + g.getSpanExpr = getSpanExprForGeographyIndex + } else if geoindex.IsGeometryConfig(config) { + g.typ = types.Geometry + g.getSpanExpr = getSpanExprForGeometryIndex + } else { + panic(errors.AssertionFailedf("not a geography or geometry index")) + } + + return g, nil +} + +// Convert implements the invertedexpr.DatumToInvertedExpr interface. 
+func (g *geoDatumToInvertedExpr) Convert( + ctx context.Context, d sqlbase.EncDatum, +) (*invertedexpr.SpanExpressionProto, error) { + if err := d.EnsureDecoded(g.typ, &g.alloc); err != nil { + return nil, err + } + spanExpr := g.getSpanExpr(ctx, d.Datum, g.relationship, g.indexConfig) + return spanExpr.ToProto(), nil +} + +func (g *geoDatumToInvertedExpr) String() string { + return fmt.Sprintf("geo-relationship: %v", g.relationship) } diff --git a/pkg/sql/opt/xform/testdata/rules/join b/pkg/sql/opt/xform/testdata/rules/join index f137659be5d3..9a8973dc56a2 100644 --- a/pkg/sql/opt/xform/testdata/rules/join +++ b/pkg/sql/opt/xform/testdata/rules/join @@ -1668,9 +1668,10 @@ project │ │ ├── lookup columns are key │ │ ├── immutable │ │ ├── fd: (9)==(12), (12)==(9) - │ │ ├── inner-join (geo-lookup nyc_census_blocks@nyc_census_blocks_geo_idx) + │ │ ├── inner-join (inverted-lookup nyc_census_blocks@nyc_census_blocks_geo_idx) │ │ │ ├── columns: c.gid:1!null n.boroname:12 name:13!null n.geom:14 - │ │ │ ├── geo-relationship: intersects + │ │ │ ├── inverted-expr + │ │ │ │ └── st_intersects(n.geom:14, c.geom:10) │ │ │ ├── select │ │ │ │ ├── columns: n.boroname:12 name:13!null n.geom:14 │ │ │ │ ├── scan n @@ -1723,9 +1724,9 @@ memo (optimized, ~23KB, required=[presentation: name:13,popn_per_sqkm:16]) │ ├── best: (select G14 G15) │ └── cost: 139.35 ├── G9: (filters G16 G17) - ├── G10: (geo-lookup-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx) + ├── G10: (inverted-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx) │ └── [] - │ ├── best: (geo-lookup-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx) + │ ├── best: (inverted-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx) │ └── cost: 1754.40 ├── G11: (sum G19) ├── G12: (variable sum) diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 82797b947367..b14d539312af 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -19,7 +19,6 @@ import ( "net/url" "strings" - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -744,13 +743,13 @@ func (ef *execFactory) constructVirtualTableLookupJoin( return n, nil } -func (ef *execFactory) ConstructGeoLookupJoin( +func (ef *execFactory) ConstructInvertedJoin( joinType sqlbase.JoinType, - geoRelationshipType geoindex.RelationshipType, + invertedExpr tree.TypedExpr, input exec.Node, table cat.Table, index cat.Index, - geoCol exec.NodeColumnOrdinal, + inputCol exec.NodeColumnOrdinal, lookupCols exec.TableColumnOrdinalSet, onCond tree.TypedExpr, reqOrdering exec.OutputOrdering, diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index a925a88852b4..7ba9b39a78bf 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/opt/invertedexpr" + "github.com/cockroachdb/cockroach/pkg/sql/opt/xform" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/scrub" @@ -54,14 +55,6 @@ const ( ijEmittingRows ) -// DatumToInvertedExpr is constructed by the caller using -// InvertedJoinerSpec.InvertedExpr -- the invertedJoiner computes the returned -// expression. 
-type DatumToInvertedExpr interface { - // Convert uses the lookup column to construct an inverted expression. - Convert(sqlbase.EncDatum) (*invertedexpr.SpanExpressionProto, error) -} - type invertedJoiner struct { execinfra.ProcessorBase @@ -110,7 +103,7 @@ type invertedJoiner struct { input execinfra.RowSource inputTypes []*types.T lookupColumnIdx uint32 - datumToInvertedExpr DatumToInvertedExpr + datumToInvertedExpr invertedexpr.DatumToInvertedExpr // Batch size for fetches. Not a constant so we can lower for testing. batchSize int @@ -158,7 +151,7 @@ func newInvertedJoiner( flowCtx *execinfra.FlowCtx, processorID int32, spec *execinfrapb.InvertedJoinerSpec, - datumToInvertedExpr DatumToInvertedExpr, + datumToInvertedExpr invertedexpr.DatumToInvertedExpr, input execinfra.RowSource, post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver, @@ -231,6 +224,17 @@ func newInvertedJoiner( } ij.combinedRow = make(sqlbase.EncDatumRow, 0, len(onExprColTypes)) + if ij.datumToInvertedExpr == nil { + var invertedExprHelper execinfra.ExprHelper + if err := invertedExprHelper.Init(spec.InvertedExpr, onExprColTypes, ij.EvalCtx); err != nil { + return nil, err + } + ij.datumToInvertedExpr, err = xform.NewDatumToInvertedExpr(invertedExprHelper.Expr, ij.index) + if err != nil { + return nil, err + } + } + var fetcher row.Fetcher // In general we need all the columns in the index to compute the set // expression. There may be InvertedJoinerSpec.InvertedExpr that are known @@ -387,7 +391,7 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce // result in an empty set as the evaluation result. ij.batchedExprEval.exprs = append(ij.batchedExprEval.exprs, nil) } else { - expr, err := ij.datumToInvertedExpr.Convert(row[ij.lookupColumnIdx]) + expr, err := ij.datumToInvertedExpr.Convert(ij.Ctx, row[ij.lookupColumnIdx]) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() diff --git a/pkg/sql/rowexec/inverted_joiner_test.go b/pkg/sql/rowexec/inverted_joiner_test.go index 320f81b536f2..6be51ef731de 100644 --- a/pkg/sql/rowexec/inverted_joiner_test.go +++ b/pkg/sql/rowexec/inverted_joiner_test.go @@ -59,10 +59,10 @@ const numRows = 99 // 50, since 50%10 = 0, 50/10 = 5. type arrayIntersectionExpr struct{} -var _ DatumToInvertedExpr = &arrayIntersectionExpr{} +var _ invertedexpr.DatumToInvertedExpr = &arrayIntersectionExpr{} func (arrayIntersectionExpr) Convert( - datum sqlbase.EncDatum, + ctx context.Context, datum sqlbase.EncDatum, ) (*invertedexpr.SpanExpressionProto, error) { d := int64(*(datum.Datum.(*tree.DInt))) d1Span := invertedexpr.MakeSingleInvertedValSpan(intToEncodedInvertedVal(d / 10)) @@ -79,10 +79,10 @@ func (arrayIntersectionExpr) Convert( // match a right side row with row index d. type jsonIntersectionExpr struct{} -var _ DatumToInvertedExpr = &jsonIntersectionExpr{} +var _ invertedexpr.DatumToInvertedExpr = &jsonIntersectionExpr{} func (jsonIntersectionExpr) Convert( - datum sqlbase.EncDatum, + ctx context.Context, datum sqlbase.EncDatum, ) (*invertedexpr.SpanExpressionProto, error) { d := int64(*(datum.Datum.(*tree.DInt))) d1 := d / 10 @@ -112,9 +112,11 @@ func (jsonIntersectionExpr) Convert( // {1..9, 15, 25, 35, ..., 95}. 
type jsonUnionExpr struct{} -var _ DatumToInvertedExpr = &jsonUnionExpr{} +var _ invertedexpr.DatumToInvertedExpr = &jsonUnionExpr{} -func (jsonUnionExpr) Convert(datum sqlbase.EncDatum) (*invertedexpr.SpanExpressionProto, error) { +func (jsonUnionExpr) Convert( + ctx context.Context, datum sqlbase.EncDatum, +) (*invertedexpr.SpanExpressionProto, error) { d := int64(*(datum.Datum.(*tree.DInt))) d1 := d / 10 d2 := d % 10 @@ -173,7 +175,7 @@ func TestInvertedJoiner(t *testing.T) { onExpr string input [][]tree.Datum lookupCol uint32 - datumToExpr DatumToInvertedExpr + datumToExpr invertedexpr.DatumToInvertedExpr joinType sqlbase.JoinType inputTypes []*types.T outputTypes []*types.T
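To make the relocated interface concrete, here is a toy implementation of invertedexpr.DatumToInvertedExpr in the style of the test expressions above, using the context-aware Convert signature this patch introduces. It is a sketch, not production code: intToEncodedInvertedVal is the helper already used in this test file, and ExprForInvertedSpan is assumed to be the same invertedexpr constructor the existing test expressions rely on.

// toySingleKeyExpr maps each input datum to a single inverted key span.
// Illustrative only; a real implementation (like geoDatumToInvertedExpr
// above) derives spans from the indexed expression.
type toySingleKeyExpr struct{}

var _ invertedexpr.DatumToInvertedExpr = &toySingleKeyExpr{}

func (toySingleKeyExpr) Convert(
	ctx context.Context, datum sqlbase.EncDatum,
) (*invertedexpr.SpanExpressionProto, error) {
	d := int64(*(datum.Datum.(*tree.DInt)))
	span := invertedexpr.MakeSingleInvertedValSpan(intToEncodedInvertedVal(d))
	// Tightness only affects the optimizer, so arbitrarily use true.
	expr := invertedexpr.ExprForInvertedSpan(span, true /* tight */)
	return expr.ToProto(), nil
}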