Commit 5ff6397

[Test Only] Don't stream from MV in SQLPipelineSuite (apache#127)
### What changes were proposed in this pull request?

Calling `STREAM(mv)` to populate a streaming table from a materialized view is incorrect usage, and it is buggy when called multiple times; future PRs will also add explicit validation against it. This PR corrects the SQLPipelineSuite tests to stream from an external regular table instead.

In the future these tests could stream from a temporary file rather than an external table, but SQL does not currently support streaming from files.
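For illustration, here is a minimal, hypothetical sketch of the pattern the tests move to, assuming a local `SparkSession`. The table name `src` and the object name are invented for this sketch, and the pipeline SQL only mirrors the shape used in the suite; it is not the suite's actual harness:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: stream from a regular external table instead of a
// pipeline-defined materialized view. Names here (`src`, the object) are
// illustrative and not part of the actual suite.
object StreamFromExternalTableSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // An ordinary (non-pipeline) table serves as the streaming source,
    // mirroring the suite's new beforeEach setup.
    spark.sql("CREATE TABLE src AS SELECT * FROM RANGE(3)")

    // Pipeline SQL: STREAM over the external table is valid usage;
    // STREAM(mv) over a materialized view defined in the same pipeline is
    // the incorrect usage this commit removes from the tests.
    val pipelineSql =
      """
        |CREATE MATERIALIZED VIEW mv AS SELECT 1;
        |CREATE STREAMING TABLE st AS SELECT * FROM STREAM src;
        |""".stripMargin
    println(pipelineSql) // In the suite, such text is fed to the graph builder.

    // Tear down the source table, mirroring the suite's afterEach.
    spark.sql("DROP TABLE IF EXISTS src")
    spark.stop()
  }
}
```

The suite itself parameterizes the source tables via `TableIdentifier` fixtures created in `beforeEach` and dropped in `afterEach`, as the diff below shows.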
1 parent a794557 commit 5ff6397

File tree

1 file changed: +55 −26 lines


sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SQLPipelineSuite.scala

Lines changed: 55 additions & 26 deletions
```diff
@@ -17,19 +17,45 @@
 package org.apache.spark.sql.pipelines.graph
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.pipelines.utils.{PipelineTest, TestGraphRegistrationContext}
 import org.apache.spark.sql.types.{LongType, StructType}
 import org.apache.spark.util.Utils
 
 class SQLPipelineSuite extends PipelineTest {
+  private val externalTable1Ident = TableIdentifier(
+    table = "external_t1",
+    database = Option(TestGraphRegistrationContext.DEFAULT_DATABASE),
+    catalog = Option(TestGraphRegistrationContext.DEFAULT_CATALOG)
+  )
+  private val externalTable2Ident = TableIdentifier(
+    table = "external_t2",
+    database = Option(TestGraphRegistrationContext.DEFAULT_DATABASE),
+    catalog = Option(TestGraphRegistrationContext.DEFAULT_CATALOG)
+  )
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    // Create mock external tables that tests can reference, ex. to stream from.
+    spark.sql(s"CREATE TABLE $externalTable1Ident AS SELECT * FROM RANGE(3)")
+    spark.sql(s"CREATE TABLE $externalTable2Ident AS SELECT * FROM RANGE(4)")
+  }
+
+  override def afterEach(): Unit = {
+    spark.sql(s"DROP TABLE IF EXISTS $externalTable1Ident")
+    spark.sql(s"DROP TABLE IF EXISTS $externalTable2Ident")
+    super.afterEach()
+  }
 
   test("Simple register SQL dataset test") {
     val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
-      sqlText = """
+      sqlText = s"""
           |CREATE MATERIALIZED VIEW mv AS SELECT 1;
-          |CREATE STREAMING TABLE st AS SELECT * FROM STREAM mv;
+          |CREATE STREAMING TABLE st AS SELECT * FROM STREAM $externalTable1Ident;
           |CREATE VIEW v AS SELECT * FROM mv;
-          |CREATE FLOW f AS INSERT INTO st BY NAME SELECT * FROM v;
+          |CREATE FLOW f AS INSERT INTO st BY NAME
+          |SELECT * FROM STREAM $externalTable2Ident;
           |""".stripMargin
     )
     val resolvedDataflowGraph = unresolvedDataflowGraph.resolve()
@@ -49,7 +75,9 @@ class SQLPipelineSuite extends PipelineTest {
       resolvedDataflowGraph.resolvedFlows
         .filter(_.identifier == fullyQualifiedIdentifier("st"))
         .head
-    assert(stFlow.inputs == Set(fullyQualifiedIdentifier("mv")))
+    // The streaming table has 1 external input, and no internal (defined within pipeline) inputs
+    assert(stFlow.funcResult.usedExternalInputs == Set(externalTable1Ident.quotedString))
+    assert(stFlow.inputs.isEmpty)
     assert(stFlow.destinationIdentifier == fullyQualifiedIdentifier("st"))
 
     val viewFlow =
@@ -61,7 +89,8 @@ class SQLPipelineSuite extends PipelineTest {
 
     val namedFlow =
       resolvedDataflowGraph.resolvedFlows.filter(_.identifier == fullyQualifiedIdentifier("f")).head
-    assert(namedFlow.inputs == Set(fullyQualifiedIdentifier("v")))
+    assert(namedFlow.funcResult.usedExternalInputs == Set(externalTable2Ident.quotedString))
+    assert(namedFlow.inputs.isEmpty)
     assert(namedFlow.destinationIdentifier == fullyQualifiedIdentifier("st"))
   }
 
@@ -117,7 +146,7 @@ class SQLPipelineSuite extends PipelineTest {
     val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
       sqlText = """
           |CREATE MATERIALIZED VIEW `hyphen-mv` AS SELECT * FROM range(1, 4);
-          |CREATE STREAMING TABLE `hyphen-st` AS SELECT * FROM STREAM(`hyphen-mv`)
+          |CREATE MATERIALIZED VIEW `other-hyphen-mv` AS SELECT * FROM `hyphen-mv`
          |""".stripMargin
     )
 
@@ -130,7 +159,8 @@ class SQLPipelineSuite extends PipelineTest {
 
     assert(
       resolvedDataflowGraph.resolvedFlows
-        .exists(f => f.identifier == fullyQualifiedIdentifier("hyphen-st") && f.df.isStreaming)
+        .exists(f =>
+          f.identifier == fullyQualifiedIdentifier("other-hyphen-mv") && !f.df.isStreaming)
     )
   }
 
@@ -179,20 +209,19 @@
 
   test("Pipeline datasets can have dependency on streaming table") {
     val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
-      sqlText = """
-          |CREATE MATERIALIZED VIEW a AS SELECT * FROM RANGE(5);
-          |CREATE STREAMING TABLE b AS SELECT * FROM STREAM(a);
-          |CREATE MATERIALIZED VIEW c AS SELECT * FROM b;
+      sqlText = s"""
+          |CREATE STREAMING TABLE a AS SELECT * FROM STREAM($externalTable1Ident);
+          |CREATE MATERIALIZED VIEW b AS SELECT * FROM a;
           |""".stripMargin
     )
 
     startPipelineAndWaitForCompletion(unresolvedDataflowGraph)
 
     assert(
       spark
-        .sql(s"SELECT * FROM ${fullyQualifiedIdentifier("c").quotedString}")
+        .sql(s"SELECT * FROM ${fullyQualifiedIdentifier("b").quotedString}")
         .collect()
-        .toSet == Set(0, 1, 2, 3, 4).map(Row(_))
+        .toSet == Set(0, 1, 2).map(Row(_))
     )
   }
 
@@ -577,9 +606,10 @@ class SQLPipelineSuite extends PipelineTest {
 
     val ex = intercept[AnalysisException] {
       sqlGraphRegistrationContext.processSqlFile(
-        sqlText = """
-            |CREATE MATERIALIZED VIEW mv AS SELECT 1;
-            |CREATE FLOW some_database.f AS INSERT INTO mv BY NAME SELECT 2;
+        sqlText = s"""
+            |CREATE STREAMING TABLE st;
+            |CREATE FLOW some_database.f AS INSERT INTO st BY NAME
+            |SELECT * FROM STREAM $externalTable1Ident;
            |""".stripMargin,
        sqlFilePath = "a.sql",
        spark = spark
@@ -653,19 +683,19 @@
        |-- catalog and database, regardless of what the active catalog/database are.
        |CREATE TEMPORARY VIEW tv AS SELECT * FROM $pipelineCatalog.$pipelineDatabase.mv;
        |
-        |CREATE STREAMING TABLE st AS
+        |CREATE MATERIALIZED VIEW mv4 AS
        |WITH mv2 AS (SELECT * FROM $pipelineCatalog.$otherDatabase2.mv2)
        |SELECT * FROM STREAM(mv2) WHERE mv2.id % 2 == 0;
        |
        |-- Use namespace command should also work, setting both catalog and database.
        |USE NAMESPACE $pipelineCatalog.$otherDatabase2;
        |-- mv2 was originally created in this same namespace, so implicit qualification
        |-- should work.
-        |CREATE MATERIALIZED VIEW mv4 AS SELECT * FROM mv2;
+        |CREATE MATERIALIZED VIEW mv5 AS SELECT * FROM mv2;
        |
        |-- Temp views, which don't support name qualification, should always resolve to
        |-- pipeline catalog and database despite the active catalog/database
-        |CREATE MATERIALIZED VIEW mv5 AS SELECT * FROM tv;
+        |CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM tv;
        |""".stripMargin,
      sqlFilePath = "file1.sql"
    ),
@@ -677,7 +707,7 @@ class SQLPipelineSuite extends PipelineTest {
        |--
        |-- Should also be able to read dataset created in other file with custom catalog
        |-- and database.
-        |CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM $pipelineCatalog.$otherDatabase2.mv4;
+        |CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM $pipelineCatalog.$otherDatabase2.mv5;
        |""".stripMargin,
      sqlFilePath = "file2.sql"
    )
@@ -689,13 +719,13 @@
    assert(spark.sql(s"SELECT * FROM $pipelineCatalog.$otherDatabase2.mv3")
      .collect().toSet == Set(1, 2).map(Row(_)))
 
-    assert(spark.sql(s"SELECT * FROM $otherCatalog.$otherDatabase.st")
+    assert(spark.sql(s"SELECT * FROM $otherCatalog.$otherDatabase.mv4")
      .collect().toSet == Set(2, 4).map(Row(_)))
 
-    assert(spark.sql(s"SELECT * FROM $otherDatabase2.mv4")
+    assert(spark.sql(s"SELECT * FROM $otherDatabase2.mv5")
      .collect().toSet == Set(1, 2, 3, 4).map(Row(_)))
 
-    assert(spark.sql(s"SELECT * FROM $otherDatabase2.mv5")
+    assert(spark.sql(s"SELECT * FROM $otherDatabase2.mv6")
      .collect().toSet == Set(0, 1, 2).map(Row(_)))
 
    assert(spark.sql(s"SELECT * FROM $pipelineCatalog.$pipelineDatabase.mv6")
@@ -751,9 +781,9 @@
 
   test("Creating streaming table without subquery works if streaming table is backed by flows") {
     val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
-      sqlText = """
+      sqlText = s"""
          |CREATE STREAMING TABLE st;
-          |CREATE FLOW f AS INSERT INTO st BY NAME SELECT * FROM RANGE(3);
+          |CREATE FLOW f AS INSERT INTO st BY NAME SELECT * FROM STREAM $externalTable1Ident;
          |""".stripMargin
    )
 
@@ -781,5 +811,4 @@
       parameters = Map("identifier" -> fullyQualifiedIdentifier("st").quotedString)
     )
   }
-  // TODO: add persisted view test once implemented
 }
```
