Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

builtins: use ST_Union as aggregate #53127

Merged
merged 1 commit into from
Aug 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/generated/sql/aggregates.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@
</span></td></tr>
<tr><td><a name="st_makeline"></a><code>st_makeline(arg1: geometry) &rarr; geometry</code></td><td><span class="funcdesc"><p>Forms a LineString from Point, MultiPoint or LineStrings. Other shapes will be ignored.</p>
</span></td></tr>
<tr><td><a name="st_union"></a><code>st_union(arg1: geometry) &rarr; geometry</code></td><td><span class="funcdesc"><p>Applies a spatial union to the geometries provided.</p>
</span></td></tr>
<tr><td><a name="stddev"></a><code>stddev(arg1: <a href="decimal.html">decimal</a>) &rarr; <a href="decimal.html">decimal</a></code></td><td><span class="funcdesc"><p>Calculates the standard deviation of the selected values.</p>
</span></td></tr>
<tr><td><a name="stddev"></a><code>stddev(arg1: <a href="float.html">float</a>) &rarr; <a href="float.html">float</a></code></td><td><span class="funcdesc"><p>Calculates the standard deviation of the selected values.</p>
Expand Down
3 changes: 0 additions & 3 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1605,9 +1605,6 @@ Negative azimuth values and values greater than 2π (360 degrees) are supported.
</span></td></tr>
<tr><td><a name="st_translate"></a><code>st_translate(g: geometry, deltaX: <a href="float.html">float</a>, deltaY: <a href="float.html">float</a>) &rarr; geometry</code></td><td><span class="funcdesc"><p>Returns a modified Geometry translated by the given deltas</p>
</span></td></tr>
<tr><td><a name="st_union"></a><code>st_union(geometry_a: geometry, geometry_b: geometry) &rarr; geometry</code></td><td><span class="funcdesc"><p>Returns the union of the given geometries as a single Geometry object.</p>
<p>This function utilizes the GEOS module.</p>
</span></td></tr>
<tr><td><a name="st_within"></a><code>st_within(geometry_a: geometry, geometry_b: geometry) &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>Returns true if geometry_a is completely inside geometry_b.</p>
<p>This function utilizes the GEOS module.</p>
<p>This function variant will attempt to utilize any available geospatial index.</p>
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/distsql/columnar_operators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var aggregateFuncToNumArguments = map[execinfrapb.AggregatorSpec_Func]int{
execinfrapb.AggregatorSpec_STDDEV_POP: 1,
execinfrapb.AggregatorSpec_ST_MAKELINE: 1,
execinfrapb.AggregatorSpec_ST_EXTENT: 1,
execinfrapb.AggregatorSpec_ST_UNION: 1,
}

// TestAggregateFuncToNumArguments ensures that all aggregate functions are
Expand Down Expand Up @@ -183,9 +184,9 @@ func TestAggregatorAgainstProcessor(t *testing.T) {
for j := range aggFnInputTypes {
aggFnInputTypes[j] = sqlbase.RandType(rng)
}
// There is a special case for concat_agg, string_agg,
// st_makeline, and st_extent when at least one argument is a
// tuple. Such cases pass GetAggregateInfo check below,
// There is a special case for some functions when at
// least one argument is a tuple.
// Such cases pass GetAggregateInfo check below,
// but they are actually invalid, and during normal
// execution it is caught during type-checking.
// However, we don't want to do fully-fledged type
Expand All @@ -195,7 +196,8 @@ func TestAggregatorAgainstProcessor(t *testing.T) {
case execinfrapb.AggregatorSpec_CONCAT_AGG,
execinfrapb.AggregatorSpec_STRING_AGG,
execinfrapb.AggregatorSpec_ST_MAKELINE,
execinfrapb.AggregatorSpec_ST_EXTENT:
execinfrapb.AggregatorSpec_ST_EXTENT,
execinfrapb.AggregatorSpec_ST_UNION:
for _, typ := range aggFnInputTypes {
if typ.Family() == types.TupleFamily {
invalid = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import (
//
// ATTENTION: When updating these fields, add to version_history.txt explaining
// what changed.
const Version execinfrapb.DistSQLVersion = 32
const Version execinfrapb.DistSQLVersion = 33

// MinAcceptedVersion is the oldest version that the server is
// compatible with; see above.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfra/version_history.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,6 @@
- Version: 32 (MinAcceptedVersion: 30)
- Added an aggregator for ST_Extent. The change is backwards compatible
(mixed versions will prevent parallelization).
- Version: 33 (MinAcceptedVersion: 30)
- Added an aggregator for ST_Union. The change is backwards compatible
(mixed versions will prevent parallelization).
415 changes: 209 additions & 206 deletions pkg/sql/execinfrapb/processors_sql.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ message AggregatorSpec {
STDDEV_POP = 30;
ST_MAKELINE = 31;
ST_EXTENT = 32;
ST_UNION = 33;
}

enum Type {
Expand Down
170 changes: 17 additions & 153 deletions pkg/sql/logictest/testdata/logic_test/geospatial

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions pkg/sql/opt/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ var AggregateOpReverseMap = map[Operator]string{
VarPopOp: "var_pop",
StdDevPopOp: "stddev_pop",
STMakeLineOp: "st_makeline",
STUnionOp: "st_union",
STExtentOp: "st_extent",
}

Expand Down Expand Up @@ -305,7 +306,7 @@ func AggregateIgnoresNulls(op Operator) bool {
case AnyNotNullAggOp, AvgOp, BitAndAggOp, BitOrAggOp, BoolAndOp, BoolOrOp,
ConstNotNullAggOp, CorrOp, CountOp, MaxOp, MinOp, SqrDiffOp, StdDevOp,
StringAggOp, SumOp, SumIntOp, VarianceOp, XorAggOp, PercentileDiscOp,
PercentileContOp, STMakeLineOp, STExtentOp, StdDevPopOp, VarPopOp:
PercentileContOp, STMakeLineOp, STExtentOp, STUnionOp, StdDevPopOp, VarPopOp:
return true

case ArrayAggOp, ConcatAggOp, ConstAggOp, CountRowsOp, FirstAggOp, JsonAggOp,
Expand All @@ -328,7 +329,7 @@ func AggregateIsNullOnEmpty(op Operator) bool {
ConstNotNullAggOp, CorrOp, FirstAggOp, JsonAggOp, JsonbAggOp,
MaxOp, MinOp, SqrDiffOp, StdDevOp, STMakeLineOp, StringAggOp, SumOp, SumIntOp,
VarianceOp, XorAggOp, PercentileDiscOp, PercentileContOp,
JsonObjectAggOp, JsonbObjectAggOp, StdDevPopOp, STExtentOp, VarPopOp:
JsonObjectAggOp, JsonbObjectAggOp, StdDevPopOp, STExtentOp, STUnionOp, VarPopOp:
return true

case CountOp, CountRowsOp:
Expand All @@ -354,7 +355,7 @@ func AggregateIsNeverNullOnNonNullInput(op Operator) bool {
ConstNotNullAggOp, CountOp, CountRowsOp, FirstAggOp,
JsonAggOp, JsonbAggOp, MaxOp, MinOp, SqrDiffOp, STMakeLineOp,
StringAggOp, SumOp, SumIntOp, XorAggOp, PercentileDiscOp, PercentileContOp,
JsonObjectAggOp, JsonbObjectAggOp, StdDevPopOp, STExtentOp, VarPopOp:
JsonObjectAggOp, JsonbObjectAggOp, StdDevPopOp, STExtentOp, STUnionOp, VarPopOp:
return true

case VarianceOp, StdDevOp, CorrOp:
Expand Down Expand Up @@ -395,7 +396,7 @@ func AggregatesCanMerge(inner, outer Operator) bool {

case AnyNotNullAggOp, BitAndAggOp, BitOrAggOp, BoolAndOp,
BoolOrOp, ConstAggOp, ConstNotNullAggOp, FirstAggOp,
MaxOp, MinOp, STMakeLineOp, STExtentOp, SumOp, SumIntOp, XorAggOp:
MaxOp, MinOp, STMakeLineOp, STExtentOp, STUnionOp, SumOp, SumIntOp, XorAggOp:
return inner == outer

case CountOp, CountRowsOp:
Expand All @@ -418,7 +419,7 @@ func AggregatesCanMerge(inner, outer Operator) bool {
func AggregateIgnoresDuplicates(op Operator) bool {
switch op {
case AnyNotNullAggOp, BitAndAggOp, BitOrAggOp, BoolAndOp, BoolOrOp,
ConstAggOp, ConstNotNullAggOp, FirstAggOp, MaxOp, MinOp, STExtentOp:
ConstAggOp, ConstNotNullAggOp, FirstAggOp, MaxOp, MinOp, STExtentOp, STUnionOp:
return true

case ArrayAggOp, AvgOp, ConcatAggOp, CountOp, CorrOp, CountRowsOp, SumIntOp,
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/opt/ops/scalar.opt
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,11 @@ define STExtent {
Input ScalarExpr
}

[Scalar, Aggregate]
define STUnion {
Input ScalarExpr
}

[Scalar, Aggregate]
define XorAgg {
Input ScalarExpr
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/opt/optbuilder/groupby.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,8 @@ func (b *Builder) constructAggregate(name string, args []opt.ScalarExpr) opt.Sca
return b.factory.ConstructSTMakeLine(args[0])
case "st_extent":
return b.factory.ConstructSTExtent(args[0])
case "st_union":
return b.factory.ConstructSTUnion(args[0])
case "xor_agg":
return b.factory.ConstructXorAgg(args[0])
case "json_agg":
Expand Down
84 changes: 84 additions & 0 deletions pkg/sql/sem/builtins/aggregate_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/cockroachdb/apd/v2"
"github.com/cockroachdb/cockroach/pkg/geo"
"github.com/cockroachdb/cockroach/pkg/geo/geopb"
"github.com/cockroachdb/cockroach/pkg/geo/geos"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -398,6 +400,26 @@ var aggregates = map[string]builtinDefinition{
tree.VolatilityImmutable,
),
),
"st_union": makeBuiltin(
tree.FunctionProperties{
Class: tree.AggregateClass,
NullableArgs: true,
AvailableOnPublicSchema: true,
},
makeAggOverload(
[]*types.T{types.Geometry},
types.Geometry,
func(
params []*types.T, evalCtx *tree.EvalContext, arguments tree.Datums,
) tree.AggregateFunc {
return &stUnionAgg{}
},
infoBuilder{
info: "Applies a spatial union to the geometries provided.",
}.String(),
tree.VolatilityImmutable,
),
),

AnyNotNull: makePrivate(makeBuiltin(aggProps(),
makeAggOverloadWithReturnType(
Expand Down Expand Up @@ -649,6 +671,66 @@ func (agg *stMakeLineAgg) Size() int64 {
return sizeOfSTMakeLineAggregate
}

type stUnionAgg struct {
srid geopb.SRID
// TODO(#geo): store the current union object in C memory, to avoid the EWKB round trips.
ewkb geopb.EWKB
set bool
}

// Add implements the AggregateFunc interface.
func (agg *stUnionAgg) Add(_ context.Context, firstArg tree.Datum, otherArgs ...tree.Datum) error {
if firstArg == tree.DNull {
return nil
}
geomArg := tree.MustBeDGeometry(firstArg)
if !agg.set {
agg.ewkb = geomArg.EWKB()
agg.set = true
agg.srid = geomArg.SRID()
return nil
}
if agg.srid != geomArg.SRID() {
c, err := geo.ParseGeometryFromEWKB(agg.ewkb)
if err != nil {
return err
}
return geo.NewMismatchingSRIDsError(geomArg.Geometry, c)
}
var err error
// TODO(#geo):We are allocating a slice for the result each time we
// call geos.Union in cStringToSafeGoBytes.
// We could change geos.Union to accept the existing slice.
agg.ewkb, err = geos.Union(agg.ewkb, geomArg.EWKB())
return err
}

// Result implements the AggregateFunc interface.
func (agg *stUnionAgg) Result() (tree.Datum, error) {
if !agg.set {
return tree.DNull, nil
}
g, err := geo.ParseGeometryFromEWKB(agg.ewkb)
if err != nil {
return nil, err
}
return tree.NewDGeometry(g), nil
}

// Reset implements the AggregateFunc interface.
func (agg *stUnionAgg) Reset(context.Context) {
agg.ewkb = nil
agg.set = false
}

// Close implements the AggregateFunc interface.
func (agg *stUnionAgg) Close(context.Context) {}

// Size implements the AggregateFunc interface.
func (agg *stUnionAgg) Size() int64 {
return sizeOfSTUnionAggregate
}

type stExtentAgg struct {
bbox *geo.CartesianBoundingBox
}
Expand Down Expand Up @@ -739,6 +821,7 @@ var _ tree.AggregateFunc = &bitBitOrAggregate{}
var _ tree.AggregateFunc = &percentileDiscAggregate{}
var _ tree.AggregateFunc = &percentileContAggregate{}
var _ tree.AggregateFunc = &stMakeLineAgg{}
var _ tree.AggregateFunc = &stUnionAgg{}
var _ tree.AggregateFunc = &stExtentAgg{}

const sizeOfArrayAggregate = int64(unsafe.Sizeof(arrayAggregate{}))
Expand Down Expand Up @@ -779,6 +862,7 @@ const sizeOfBitBitOrAggregate = int64(unsafe.Sizeof(bitBitOrAggregate{}))
const sizeOfPercentileDiscAggregate = int64(unsafe.Sizeof(percentileDiscAggregate{}))
const sizeOfPercentileContAggregate = int64(unsafe.Sizeof(percentileContAggregate{}))
const sizeOfSTMakeLineAggregate = int64(unsafe.Sizeof(stMakeLineAgg{}))
const sizeOfSTUnionAggregate = int64(unsafe.Sizeof(stUnionAgg{}))
const sizeOfSTExtentAggregate = int64(unsafe.Sizeof(stExtentAgg{}))

// singleDatumAggregateBase is a utility struct that helps aggregate builtins
Expand Down
18 changes: 0 additions & 18 deletions pkg/sql/sem/builtins/geo_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -2973,24 +2973,6 @@ For flags=1, validity considers self-intersecting rings forming holes as valid a
tree.VolatilityImmutable,
),
),
"st_union": makeBuiltin(
defProps(),
geometryOverload2(
func(ctx *tree.EvalContext, a *tree.DGeometry, b *tree.DGeometry) (tree.Datum, error) {
union, err := geomfn.Union(a.Geometry, b.Geometry)
if err != nil {
return nil, err
}
return tree.NewDGeometry(union), err
},
types.Geometry,
infoBuilder{
info: "Returns the union of the given geometries as a single Geometry object.",
libraryUsage: usesGEOS,
},
tree.VolatilityImmutable,
),
),

//
// Transformations
Expand Down