-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-15517][SQL][STREAMING] Add support for complete output mode in Structure Streaming #13286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| if (moreStreamingAggregates.nonEmpty) { | ||
| throwError("Multiple streaming aggregations are not supported with " + | ||
| "streaming DataFrames/Datasets") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been moved around to better consolidate all the logic related to output modes and aggregations.
|
Test build #59232 has finished for PR 13286 at commit
|
| .agg(count("*")) | ||
| .as[(Int, Long)] | ||
|
|
||
| intercept[AnalysisException] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add checks on message
|
Test build #59235 has finished for PR 13286 at commit
|
| if (latestBatchId.isEmpty || batchId > latestBatchId.get) { | ||
| logDebug(s"Committing batch $batchId to $this") | ||
| outputMode match { | ||
| case OutputMode.Append | OutputMode.Update => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Since we don't support OutputMode.Update, could you remove it? I think it will have a different logic even if we add it in future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering whether we should support update mode for memory sink.
| query = | ||
| df.write | ||
| .format("memory") | ||
| .option("checkpointLocation", "memory") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "memory" -> metadataRoot
|
Finished my first round. Looks pretty good. Just some nits. |
|
|
||
| case object Append extends OutputMode | ||
| case object Update extends OutputMode | ||
| public enum OutputMode { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: @Experimental
|
Test build #59236 has finished for PR 13286 at commit
|
|
Test build #59239 has finished for PR 13286 at commit
|
|
Test build #59253 has finished for PR 13286 at commit
|
| * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to | ||
| the sink | ||
| * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink | ||
| every time these is some updates |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
each time the trigger fires?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to write something that makes sense generally, without understanding trigger and all. As is, since the trigger is optional, one does not need to know about triggers at all to start running stuff in structured streaming.
|
LGTM with a few comments. |
|
Test build #59541 has finished for PR 13286 at commit
|
|
Test build #59542 has finished for PR 13286 at commit
|
|
LGTM |
… Structure Streaming ## What changes were proposed in this pull request? Currently structured streaming only supports append output mode. This PR adds the following. - Added support for Complete output mode in the internal state store, analyzer and planner. - Added public API in Scala and Python for users to specify output mode - Added checks for unsupported combinations of output mode and DF operations - Plans with no aggregation should support only Append mode - Plans with aggregation should support only Update and Complete modes - Default output mode is Append mode (**Question: should we change this to automatically set to Complete mode when there is aggregation?**) - Added support for Complete output mode in Memory Sink. So Memory Sink internally supports append and complete, update. But from public API only Complete and Append output modes are supported. ## How was this patch tested? Unit tests in various test suites - StreamingAggregationSuite: tests for complete mode - MemorySinkSuite: tests for checking behavior in Append and Complete modes. - UnsupportedOperationSuite: tests for checking unsupported combinations of DF ops and output modes - DataFrameReaderWriterSuite: tests for checking that output mode cannot be called on static DFs - Python doc test and existing unit tests modified to call write.outputMode. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #13286 from tdas/complete-mode. (cherry picked from commit 90b1143) Signed-off-by: Michael Armbrust <michael@databricks.com>
|
@tdas @marmbrus this is failing I've created a PR to fix this #13464 |
|
The method naming with caps was intentional. We need to introduce exception
|
|
@tdas updated my PR with exclusions for Append and Complete |
| * | ||
| * @since 2.0.0 | ||
| */ | ||
| public static OutputMode Append() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See #13464 -- this fails Java lint. Can this be append() as would be conventional in Java? I don't see that it's there to implement some interface
## What changes were proposed in this pull request? revived #13464 Fix Java Lint errors introduced by #13286 and #13280 Before: ``` Using `mvn` from path: /Users/pichu/Project/spark/build/apache-maven-3.3.9/bin/mvn Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0 Checkstyle checks failed at following occurrences: [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[340,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[341,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[342,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[343,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/sql/streaming/OutputMode.java:[41,28] (naming) MethodName: Method name 'Append' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'. [ERROR] src/main/java/org/apache/spark/sql/streaming/OutputMode.java:[52,28] (naming) MethodName: Method name 'Complete' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'. [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[61,8] (imports) UnusedImports: Unused import - org.apache.parquet.schema.PrimitiveType. [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[62,8] (imports) UnusedImports: Unused import - org.apache.parquet.schema.Type. ``` ## How was this patch tested? ran `dev/lint-java` locally Author: Sandeep Singh <sandeep@techaddict.me> Closes #13559 from techaddict/minor-3.
## What changes were proposed in this pull request? revived #13464 Fix Java Lint errors introduced by #13286 and #13280 Before: ``` Using `mvn` from path: /Users/pichu/Project/spark/build/apache-maven-3.3.9/bin/mvn Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0 Checkstyle checks failed at following occurrences: [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[340,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[341,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[342,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[343,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/sql/streaming/OutputMode.java:[41,28] (naming) MethodName: Method name 'Append' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'. [ERROR] src/main/java/org/apache/spark/sql/streaming/OutputMode.java:[52,28] (naming) MethodName: Method name 'Complete' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'. [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[61,8] (imports) UnusedImports: Unused import - org.apache.parquet.schema.PrimitiveType. [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[62,8] (imports) UnusedImports: Unused import - org.apache.parquet.schema.Type. ``` ## How was this patch tested? ran `dev/lint-java` locally Author: Sandeep Singh <sandeep@techaddict.me> Closes #13559 from techaddict/minor-3. (cherry picked from commit f958c1c) Signed-off-by: Sean Owen <sowen@cloudera.com>
## What changes were proposed in this pull request? revived apache#13464 Fix Java Lint errors introduced by apache#13286 and apache#13280 Before: ``` Using `mvn` from path: /Users/pichu/Project/spark/build/apache-maven-3.3.9/bin/mvn Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0 Checkstyle checks failed at following occurrences: [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[340,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[341,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[342,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/launcher/LauncherServer.java:[343,5] (whitespace) FileTabCharacter: Line contains a tab character. [ERROR] src/main/java/org/apache/spark/sql/streaming/OutputMode.java:[41,28] (naming) MethodName: Method name 'Append' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'. [ERROR] src/main/java/org/apache/spark/sql/streaming/OutputMode.java:[52,28] (naming) MethodName: Method name 'Complete' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'. [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[61,8] (imports) UnusedImports: Unused import - org.apache.parquet.schema.PrimitiveType. [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[62,8] (imports) UnusedImports: Unused import - org.apache.parquet.schema.Type. ``` ## How was this patch tested? ran `dev/lint-java` locally Author: Sandeep Singh <sandeep@techaddict.me> Closes apache#13559 from techaddict/minor-3.
What changes were proposed in this pull request?
Currently structured streaming only supports append output mode. This PR adds the following.
How was this patch tested?
Unit tests in various test suites