Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d51c0dc
[SPARK-28227][SQL] Support TRANSFORM with aggregation
AngersZhuuuu Jul 13, 2020
5d85160
Fix UT
AngersZhuuuu Jul 14, 2020
dbb4d04
Merge branch 'master' into SPARK-28227-NEW
AngersZhuuuu Sep 4, 2020
2b8912e
Update SparkSqlParserSuite.scala
AngersZhuuuu Sep 4, 2020
d89afa9
Update SparkSqlParserSuite.scala
AngersZhuuuu Sep 4, 2020
b1cc739
Merge branch 'master' into SPARK-28227-NEW
AngersZhuuuu Dec 22, 2020
f2a640b
solve import
AngersZhuuuu Dec 22, 2020
b04909c
follow comment
AngersZhuuuu Dec 22, 2020
671711b
Update SQLQuerySuite.scala
AngersZhuuuu Dec 22, 2020
1a4262b
Merge branch 'master' into SPARK-28227-NEW
AngersZhuuuu Dec 23, 2020
8df104b
Fix UT
AngersZhuuuu Dec 23, 2020
1b4e0c1
update
AngersZhuuuu Dec 23, 2020
a85753f
Merge branch 'master' into SPARK-28227-NEW
AngersZhuuuu Dec 23, 2020
3eb8d11
follow commnet
AngersZhuuuu Jan 4, 2021
2e146c3
follow comment
AngersZhuuuu Jan 4, 2021
ca5a032
follow comment
AngersZhuuuu Jan 5, 2021
614f8f9
Add UT
AngersZhuuuu Jan 5, 2021
ee16a2f
follow comment
AngersZhuuuu Jan 5, 2021
9ef73b6
Update SparkSqlParserSuite.scala
AngersZhuuuu Jan 5, 2021
707f1e6
Merge branch 'master' into SPARK-28227-NEW
AngersZhuuuu Jan 6, 2021
327566f
Update transform.sql.out
AngersZhuuuu Jan 6, 2021
fa293cd
Update AstBuilder.scala
AngersZhuuuu Jan 6, 2021
a8233d4
Remove input parameters
AngersZhuuuu Jan 7, 2021
c3d423a
Revert "Remove input parameters"
AngersZhuuuu Jan 7, 2021
cf4085a
follow comment
AngersZhuuuu Jan 7, 2021
1278705
Merge branch 'master' into SPARK-28227-NEW
AngersZhuuuu Mar 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,11 @@ fromStatementBody
querySpecification
: transformClause
fromClause?
whereClause? #transformQuerySpecification
lateralView*
whereClause?
aggregationClause?
havingClause?
windowClause? #transformQuerySpecification
| selectClause
fromClause?
lateralView*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1558,14 +1558,9 @@ class Analyzer(override val catalogManager: CatalogManager)
} else {
a.copy(aggregateExpressions = buildExpandedProjectList(a.aggregateExpressions, a.child))
}
// If the script transformation input contains Stars, expand it.
// TODO: Remove this logic and see SPARK-34035
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not forget to fix this, @AngersZhuuuu

case t: ScriptTransformation if containsStar(t.input) =>
t.copy(
input = t.input.flatMap {
case s: Star => s.expand(t.child, resolver)
case o => o :: Nil
}
)
t.copy(input = t.child.output)
case g: Generate if containsStar(g.generator.children) =>
throw QueryCompilationErrors.invalidStarUsageError("explode/json_tuple/UDTF")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
withTransformQuerySpecification(
ctx,
ctx.transformClause,
ctx.lateralView,
ctx.whereClause,
ctx.aggregationClause,
ctx.havingClause,
ctx.windowClause,
plan
)
} else {
Expand Down Expand Up @@ -587,7 +591,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
val from = OneRowRelation().optional(ctx.fromClause) {
visitFromClause(ctx.fromClause)
}
withTransformQuerySpecification(ctx, ctx.transformClause, ctx.whereClause, from)
withTransformQuerySpecification(
ctx,
ctx.transformClause,
ctx.lateralView,
ctx.whereClause,
ctx.aggregationClause,
ctx.havingClause,
ctx.windowClause,
from
)
}

override def visitRegularQuerySpecification(
Expand Down Expand Up @@ -641,14 +654,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
private def withTransformQuerySpecification(
ctx: ParserRuleContext,
transformClause: TransformClauseContext,
lateralView: java.util.List[LateralViewContext],
whereClause: WhereClauseContext,
relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
// Add where.
val withFilter = relation.optionalMap(whereClause)(withWhereClause)

// Create the transform.
val expressions = visitNamedExpressionSeq(transformClause.namedExpressionSeq)

aggregationClause: AggregationClauseContext,
havingClause: HavingClauseContext,
windowClause: WindowClauseContext,
relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
// Create the attributes.
val (attributes, schemaLess) = if (transformClause.colTypeList != null) {
// Typed return columns.
Expand All @@ -664,12 +675,22 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
AttributeReference("value", StringType)()), true)
}

// Create the transform.
val plan = visitCommonSelectQueryClausePlan(
relation,
lateralView,
transformClause.namedExpressionSeq,
whereClause,
aggregationClause,
havingClause,
windowClause,
isDistinct = false)

ScriptTransformation(
expressions,
// TODO: Remove this logic and see SPARK-34035
Seq(UnresolvedStar(None)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, on second thought, can't we remove the `input` param from ScriptTransformation in this PR? Since the input exprs of the current ScriptTransformation implementation always come from the child's output, IIUC we don't need this param anymore.

Copy link
Contributor Author

@AngersZhuuuu AngersZhuuuu Jan 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, on second thought, can't we remove the `input` param from ScriptTransformation in this PR? Since the input exprs of the current ScriptTransformation implementation always come from the child's output, IIUC we don't need this param anymore.

That's right. We can replace input with child.output directly. That's a really nice suggestion, since the current approach (converting LogicalPlan) looks a little weird. If we remove this input parameter and add a correct comment,
the whole process looks more natural. I have tried it locally, and it produces a big new diff. How about a new ticket for this refactoring?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. Does just removing the param cause a big diff? I'd like to remove the current weird Analyzer code that handles the unresolved star in this PR, though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. Does just removing the param cause a big diff? I'd like to remove the current weird Analyzer code that handles the unresolved star in this PR, though.

Yea, it will change a lot of files. Compared to the original code here (the current weird Analyzer code), the current change seems not so weird, but for the whole process it is really weird.

Create a ticket for this https://issues.apache.org/jira/browse/SPARK-34035
image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it will change a lot of files.

How many lines of code are changed there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it will change a lot of files.

How many lines of code are changed there?

Nearly 100 lines; most of the changes are in UTs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, I don't have a strong opinion on it, so please follow other reviewer's comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, I don't have a strong opinion on it, so please follow other reviewer's comment.

Yea

string(transformClause.script),
attributes,
withFilter,
plan,
withScriptIOSchema(
ctx,
transformClause.inRowFormat,
Expand Down Expand Up @@ -697,13 +718,40 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
havingClause: HavingClauseContext,
windowClause: WindowClauseContext,
relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
val isDistinct = selectClause.setQuantifier() != null &&
selectClause.setQuantifier().DISTINCT() != null

val plan = visitCommonSelectQueryClausePlan(
relation,
lateralView,
selectClause.namedExpressionSeq,
whereClause,
aggregationClause,
havingClause,
windowClause,
isDistinct)

// Hint
selectClause.hints.asScala.foldRight(plan)(withHints)
}

def visitCommonSelectQueryClausePlan(
relation: LogicalPlan,
lateralView: java.util.List[LateralViewContext],
namedExpressionSeq: NamedExpressionSeqContext,
whereClause: WhereClauseContext,
aggregationClause: AggregationClauseContext,
havingClause: HavingClauseContext,
windowClause: WindowClauseContext,
isDistinct: Boolean): LogicalPlan = {
// Add lateral views.
val withLateralView = lateralView.asScala.foldLeft(relation)(withGenerate)

// Add where.
val withFilter = withLateralView.optionalMap(whereClause)(withWhereClause)

val expressions = visitNamedExpressionSeq(selectClause.namedExpressionSeq)
val expressions = visitNamedExpressionSeq(namedExpressionSeq)

// Add aggregation or a project.
val namedExpressions = expressions.map {
case e: NamedExpression => e
Expand Down Expand Up @@ -737,9 +785,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

// Distinct
val withDistinct = if (
selectClause.setQuantifier() != null &&
selectClause.setQuantifier().DISTINCT() != null) {
val withDistinct = if (isDistinct) {
Distinct(withProject)
} else {
withProject
Expand All @@ -748,8 +794,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
// Window
val withWindow = withDistinct.optionalMap(windowClause)(withWindowClause)

// Hint
selectClause.hints.asScala.foldRight(withWindow)(withHints)
withWindow
}

// Script Transform's input/output format.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.parser

import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedStar, UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -1061,11 +1061,11 @@ class PlanParserSuite extends AnalysisTest {
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
Seq(UnresolvedStar(None)),
"cat",
Seq(AttributeReference("key", StringType)(),
AttributeReference("value", StringType)()),
UnresolvedRelation(TableIdentifier("testData")),
Project(Seq('a, 'b, 'c), UnresolvedRelation(TableIdentifier("testData"))),
ScriptInputOutputSchema(List.empty, List.empty, None, None,
List.empty, List.empty, None, None, true))
)
Expand All @@ -1078,12 +1078,12 @@ class PlanParserSuite extends AnalysisTest {
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
Seq(UnresolvedStar(None)),
"cat",
Seq(AttributeReference("a", StringType)(),
AttributeReference("b", StringType)(),
AttributeReference("c", StringType)()),
UnresolvedRelation(TableIdentifier("testData")),
Project(Seq('a, 'b, 'c), UnresolvedRelation(TableIdentifier("testData"))),
ScriptInputOutputSchema(List.empty, List.empty, None, None,
List.empty, List.empty, None, None, false)))

Expand All @@ -1095,12 +1095,12 @@ class PlanParserSuite extends AnalysisTest {
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
Seq(UnresolvedStar(None)),
"cat",
Seq(AttributeReference("a", IntegerType)(),
AttributeReference("b", StringType)(),
AttributeReference("c", LongType)()),
UnresolvedRelation(TableIdentifier("testData")),
Project(Seq('a, 'b, 'c), UnresolvedRelation(TableIdentifier("testData"))),
ScriptInputOutputSchema(List.empty, List.empty, None, None,
List.empty, List.empty, None, None, false)))

Expand All @@ -1124,12 +1124,12 @@ class PlanParserSuite extends AnalysisTest {
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
Seq(UnresolvedStar(None)),
"cat",
Seq(AttributeReference("a", StringType)(),
AttributeReference("b", StringType)(),
AttributeReference("c", StringType)()),
UnresolvedRelation(TableIdentifier("testData")),
Project(Seq('a, 'b, 'c), UnresolvedRelation(TableIdentifier("testData"))),
ScriptInputOutputSchema(
Seq(("TOK_TABLEROWFORMATFIELD", "\t"),
("TOK_TABLEROWFORMATCOLLITEMS", "\u0002"),
Expand Down
132 changes: 132 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/transform.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES
('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03'))
AS t(a, b, c, d, e, f, g, h, i, j, k, l);

CREATE OR REPLACE TEMPORARY VIEW script_trans AS SELECT * FROM VALUES
(1, 2, 3),
(4, 5, 6),
(7, 8, 9)
AS script_trans(a, b, c);

SELECT TRANSFORM(a)
USING 'cat' AS (a)
FROM t;
Expand Down Expand Up @@ -184,6 +190,132 @@ SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
FROM t
) tmp;

SELECT TRANSFORM(b, a, CAST(c AS STRING))
USING 'cat' AS (a, b, c)
FROM script_trans
WHERE a <= 4;

SELECT TRANSFORM(1, 2, 3)
USING 'cat' AS (a, b, c)
FROM script_trans
WHERE a <= 4;

SELECT TRANSFORM(1, 2)
USING 'cat' AS (a INT, b INT)
FROM script_trans
LIMIT 1;

SELECT TRANSFORM(
b AS d5, a,
CASE
WHEN c > 100 THEN 1
WHEN c < 100 THEN 2
ELSE 3 END)
USING 'cat' AS (a, b, c)
FROM script_trans
WHERE a <= 4;

SELECT TRANSFORM(b, a, c + 1)
USING 'cat' AS (a, b, c)
FROM script_trans
WHERE a <= 4;

SELECT TRANSFORM(*)
USING 'cat' AS (a, b, c)
FROM script_trans
WHERE a <= 4;
Comment on lines +193 to +226
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the queries above not supported in the current master?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the queries above not supported in the current master?

They are supported. What's wrong? They return correct answers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just want to know why these tests are added in this PR... That's because these tests seem to be unrelated to aggregation/window/lateralView.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just want to know why these tests are added in this PR... That's because these tests seem to be unrelated to aggregation/window/lateralView.

Yea, here I want to show that after this PR's change, each kind of expression works well, such as alias, CASE WHEN, binary arithmetic, etc.


SELECT TRANSFORM(b AS d, MAX(a) as max_a, CAST(SUM(c) AS STRING))
USING 'cat' AS (a, b, c)
FROM script_trans
WHERE a <= 4
GROUP BY b;

SELECT TRANSFORM(b AS d, MAX(a) FILTER (WHERE a > 3) AS max_a, CAST(SUM(c) AS STRING))
USING 'cat' AS (a,b,c)
FROM script_trans
WHERE a <= 4
GROUP BY b;

SELECT TRANSFORM(b, MAX(a) as max_a, CAST(sum(c) AS STRING))
USING 'cat' AS (a, b, c)
FROM script_trans
WHERE a <= 2
GROUP BY b;

SELECT TRANSFORM(b, MAX(a) as max_a, CAST(SUM(c) AS STRING))
USING 'cat' AS (a, b, c)
FROM script_trans
WHERE a <= 4
GROUP BY b
HAVING max_a > 0;

SELECT TRANSFORM(b, MAX(a) as max_a, CAST(SUM(c) AS STRING))
USING 'cat' AS (a, b, c)
FROM script_trans
WHERE a <= 4
GROUP BY b
HAVING max(a) > 1;

SELECT TRANSFORM(b, MAX(a) OVER w as max_a, CAST(SUM(c) OVER w AS STRING))
USING 'cat' AS (a, b, c)
FROM script_trans
WHERE a <= 4
WINDOW w AS (PARTITION BY b ORDER BY a);

SELECT TRANSFORM(b, MAX(a) as max_a, CAST(SUM(c) AS STRING), myCol, myCol2)
USING 'cat' AS (a, b, c, d, e)
FROM script_trans
LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol
LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2
WHERE a <= 4
GROUP BY b, myCol, myCol2
HAVING max(a) > 1;

FROM(
FROM script_trans
SELECT TRANSFORM(a, b)
USING 'cat' AS (`a` INT, b STRING)
) t
SELECT a + 1;

FROM(
SELECT TRANSFORM(a, SUM(b) b)
USING 'cat' AS (`a` INT, b STRING)
FROM script_trans
GROUP BY a
) t
SELECT (b + 1) AS result
ORDER BY result;

MAP k / 10 USING 'cat' AS (one) FROM (SELECT 10 AS k);

FROM (SELECT 1 AS key, 100 AS value) src
MAP src.*, src.key, CAST(src.key / 10 AS INT), CAST(src.key % 10 AS INT), src.value
USING 'cat' AS (k, v, tkey, ten, one, tvalue);

SELECT TRANSFORM(1)
USING 'cat' AS (a)
FROM script_trans
HAVING true;

SET spark.sql.legacy.parser.havingWithoutGroupByAsWhere=true;

SELECT TRANSFORM(1)
USING 'cat' AS (a)
FROM script_trans
HAVING true;

SET spark.sql.legacy.parser.havingWithoutGroupByAsWhere=false;

SET spark.sql.parser.quotedRegexColumnNames=true;

SELECT TRANSFORM(`(a|b)?+.+`)
USING 'cat' AS (c)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 2 spaces indentation

FROM script_trans;

SET spark.sql.parser.quotedRegexColumnNames=false;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we test something like TRANSFORM(distinct a, b) and check the error message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will raise a follow up soon

-- SPARK-34634: self join using CTE contains transform
WITH temp AS (
SELECT TRANSFORM(a) USING 'cat' AS (b string) FROM t
Expand Down
Loading