[SPARK-19497][SS]Implement streaming deduplication #16970
Conversation
Test build #73028 has finished for PR 16970 at commit
outputMode = Update,
expectedMsgs = Seq("multiple streaming aggregations"))

assertSupportedInStreamingPlan(
Added some missing tests.
great!!
val key = getKey(row)
val value = store.get(key)
if (value.isEmpty) {
  store.put(key.copy(), row.copy())
I don't know how to create an empty UnsafeRow. Right now the value is not needed, but it doubles the size of the state store.
can you store a null?
No. StateStore assumes value is not null.
naah. the HDFSBasedStateStore can't handle nulls. How about using UnsafeRow.createFromByteArray(0, 0)? We can reuse this immutable object.
Doesn't work :(
java.lang.AssertionError: index (0) should < 0
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.assertIndexIsValid(UnsafeRow.java:133)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.isNullAt(UnsafeRow.java:352)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:113)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
How about this.
val row = InternalRow.apply(null)
val unsafeRow = UnsafeProjection.create(Array[DataType](NullType)).apply(row)
This is a valid generated unsafe row that can be created once and reused.
Found this in the UnsafeRowSuite.
Cool! Updated.
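For reference, the reused dummy value can be built once along these lines (a sketch; only the two lines quoted above come from this thread, while the constant name and the usage comment are illustrative):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{DataType, NullType}

// Built once and reused: a valid UnsafeRow with a single null column, used as
// the dummy value so the state store effectively stores only the keys.
val EMPTY_ROW: UnsafeRow =
  UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))

// Hypothetical use inside the dedup operator's per-row loop:
//   if (store.get(key).isEmpty) store.put(key.copy(), EMPTY_ROW)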
brkyvz
left a comment
Very excited about this. Left some comments about some extra things that we can support
| throwError("Queries with streaming sources must be executed with writeStream.start()")(p) | ||
|
|
||
| case p: Deduplication => | ||
| throwError("Batch queries should not use Deduplication")(p) |
because of this, I would prefer the naming to imply that as well. Maybe rename Deduplication to StreamingDeduplication or something.
Why is deduplication exclusive to streaming? Even if we don't want to implement a batch operator, I'd still allow it in the logical plan and just translate it to normal aggregation.
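At the Dataset level the equivalence is roughly the following (a sketch with a made-up DataFrame and column names, not code from this PR):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.first

// A batch dropDuplicates("k") on a DataFrame with columns (k, v) behaves like
// grouping on the key and taking first() of every other column.
def dedupAsAggregate(df: DataFrame): DataFrame =
  df.groupBy("k").agg(first("v").as("v"))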
| "streaming DataFrames/Datasets")(plan) | ||
| } | ||
|
|
||
| // Disallow multiple streaming deduplications |
we should support these. Example use case:
I dedup on some higher-level columns to gain exactly-once semantics (infrastructure/application specific). Then I do data transformations, and then I dedup on some more specific columns, e.g. region (query specific).
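Roughly like this (a sketch; all column names and the transformation are made up):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical pipeline: the first dropDuplicates gives exactly-once ingestion on
// infrastructure-level columns; after a transformation, a second dropDuplicates
// removes query-specific duplicates.
def dedupTwice(inputStream: DataFrame): DataFrame =
  inputStream
    .dropDuplicates("sourceId", "offset")                // infra/application-level dedup
    .withColumn("region", col("payload").substr(0, 2))   // some transformation
    .dropDuplicates("region", "eventId")                 // query-specific dedup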
throwErrorIf(
  outputMode == InternalOutputModes.Complete
    && collectStreamingDeduplications(subPlan).nonEmpty,
  "Aggregation on dropDuplicates DataFrame/Dataset in Complete output mode " +
why not?
}

/** Streaming dropDuplicates */
case class Deduplication(
ditto: IMHO Name should reflect that it is streaming
val inputData = MemoryStream[(String, Int)]
val result = inputData.toDS().dropDuplicates("_1")

testStream(result, Append)(
I know the semantics are the same for Append and Update but just so that no one breaks it in the future, should we wrap these tests with:
Seq(Append, Update).foreach { mode =>
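i.e. something like this (a sketch that reuses the suite's existing helpers and implicits; the data and test name are illustrative):

Seq(Append, Update).foreach { mode =>
  test(s"deduplication in $mode mode") {
    val inputData = MemoryStream[(String, Int)]
    val result = inputData.toDS().dropDuplicates("_1")
    testStream(result, mode)(
      AddData(inputData, "a" -> 1),
      CheckLastBatch("a" -> 1),
      AddData(inputData, "a" -> 2), // duplicate key, should be dropped
      CheckLastBatch()
    )
  }
}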
@brkyvz looks like you were looking at my old changes. I pushed a new commit and updated the PR description to reflect the latest supported queries.

aw man. I should always refresh before starting a review

Test build #73064 has finished for PR 16970 at commit
if (groupColExprIds.contains(attr.exprId)) {
  attr
} else {
  Alias(new First(attr).toAggregateExpression(), attr.name)()
@marmbrus I tried to move this to SparkPlanner but failed because Alias(new First(attr).toAggregateExpression(), attr.name)() needs to be resolved before planning. Thoughts?
You could do this construction at planning time if you preserve the attribute ids?
Yeah, it works.
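The planning/optimization-time construction then looks roughly like this (a sketch, not the exact code in this PR; the helper name is made up):

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.First
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}

// Keep the grouping attributes as-is and wrap every other attribute in first(),
// reusing the original exprId so downstream operators still resolve correctly.
def dedupToAggregate(keys: Seq[Attribute], child: LogicalPlan): Aggregate = {
  val keyIds = keys.map(_.exprId).toSet
  val aggExprs: Seq[NamedExpression] = child.output.map { attr =>
    if (keyIds.contains(attr.exprId)) {
      attr
    } else {
      Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
    }
  }
  Aggregate(keys, aggExprs, child)
}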
| .format("memory") | ||
| .queryName("testquery") | ||
| .outputMode("complete") | ||
| .outputMode("append") |
This is a behavior change: the user can no longer use dropDuplicates with complete mode without an aggregation, because dropDuplicates is not an aggregation any more.
i see. This was allowed earlier because dropDuplicates was an aggregate, but not any more. I think this is not consistent with the fact that we don't allow complete mode in map-like queries.
@marmbrus any thoughts?
/** A logical plan for `dropDuplicates`. */
case class Deduplication(
  keys: Seq[Attribute],
indent. can be on same line i think.
*/
object Aggregation extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case Deduplication(keys, child) =>
Shouldn't there be a new strategy? After all, dropping duplicates is not conceptually an aggregation; it just so happens that it can be implemented as an aggregation.
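A dedicated strategy would be shaped roughly like this (a sketch; the object name is made up, the DeduplicationExec constructor is simplified, and it is assumed to sit next to the existing strategies in SparkStrategies so planLater and the new plan nodes are in scope):

// Hypothetical planner strategy kept apart from the Aggregation strategy, since
// dropping duplicates is not conceptually an aggregation.
object StreamingDeduplicationStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case Deduplication(keys, child) =>
      DeduplicationExec(keys, planLater(child)) :: Nil   // constructor args simplified
    case _ => Nil
  }
}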
| "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows")) | ||
| } | ||
|
|
||
| trait WatermarkSupport extends SparkPlan { |
docs.
trait WatermarkSupport extends SparkPlan {

  def keyExpressions: Seq[Attribute]
docs
val numTotalStateRows = longMetric("numTotalStateRows")
val numUpdatedStateRows = longMetric("numUpdatedStateRows")

extra line
child.execute().mapPartitionsWithStateStore(
  getStateId.checkpointLocation,
  operatorId = getStateId.operatorId,
nit: why are these two specified with param names?
Test build #73076 has finished for PR 16970 at commit
)
}

private def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery =
These methods seem to be common across StreamingAggregationSuite, MapGroupsWithStateSuite and this one. Can you make a trait?
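e.g. something like this (trait name and body are hypothetical; it assumes the stateOperators metrics exposed by the streaming progress API):

// Hypothetical shared trait so StreamingAggregationSuite, MapGroupsWithStateSuite
// and the new deduplication suite don't each re-implement the same assertion.
trait StateStoreMetricsTest extends StreamTest {
  def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery =
    AssertOnQuery { q =>
      val stateOperators = q.lastProgress.stateOperators
      assert(stateOperators.map(_.numRowsTotal).toSeq === total)
      assert(stateOperators.map(_.numRowsUpdated).toSeq === updated)
      true
    }
}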
StateStore.stop()
}

test("deduplication") {
nit: deduplication with all columns
)
}

test("deduplication with columns") {
nit: deduplication with some columns
)
}

test("deduplication with aggregation - update") {
nit: update mode
val inputData = MemoryStream[(String, Int)]
val result = inputData.toDS()
  .dropDuplicates()
  .groupBy($"_1")
nit: why not name the columns!?
val inputData = MemoryStream[(String, Int)]
val result = inputData.toDS()
  .dropDuplicates()
  .groupBy($"_1")
nit: same as above.
overall looks good. just a bunch of nits.
outputMode = Complete,
expectedMsgs = Seq("(map/flatMap)GroupsWithState"))

// Deduplication: Not supported after a streaming aggregation
actually can you add a test for both dropDuplicates and mapGroupsWithState that checks that these operations are allowed on a batch subplan inside a streaming plan. That is,
assertSupportedInStreamingPlan(
"Deduplication - Deduplication on batch relation",
Deduplication(Seq(att), batchRelation),
outputMode = Append
)
rewrittenResultExpressions,
planLater(child))

case Deduplication(keys, child) =>
same thought as below. this is not really aggregation so should be a different strategy.
val resolver = sparkSession.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
val groupCols = colNames.flatMap { colName =>
val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
Fixed an issue where groupCols may contain duplicated columns. It's not strictly necessary because the optimizer will remove the duplicated columns, but it's better to make fewer assumptions.
was this a bug with batch queries as well? and what would the result be without this fix?
The results will be the same. It's just pretty weird to depend on the optimizer to remove the duplicated columns.
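For example (a made-up DataFrame df with an id column), a call like this used to put the same grouping column into groupCols twice, and only the optimizer would collapse it:

// Repeated column names are now de-duplicated up front by colNames.toSet.toSeq
// instead of relying on the optimizer to remove the duplicated grouping column.
val deduped = df.dropDuplicates("id", "id")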
Test build #73225 has finished for PR 16970 at commit

Test build #73236 has finished for PR 16970 at commit

@tdas I created https://issues.apache.org/jira/browse/SPARK-19690 to track the issue when joining a batch DataFrame with a streaming DataFrame. I will fix it in a separate PR to unblock this one as it touches many files.

Test build #73247 has finished for PR 16970 at commit
}

/** A logical plan for `dropDuplicates`. */
case class Deduplication(
Most names are like "verbs" - aggregate, project, intersect. I think it's best to name this "Deduplicate".
/**
 * Replaces logical [[Deduplication]] operator with an [[Aggregate]] operator.
 */
ReplaceDeduplicateWithAggregate. see comment below.
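i.e. an optimizer rule shaped roughly like this (a sketch; it reuses the dedupToAggregate helper sketched earlier in this thread and assumes the node exposes keys and child):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Batch rewrite: a Deduplication over a non-streaming child becomes a plain
// Aggregate (group by the keys, first() for every other column).
object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Deduplication(keys, child) if !child.isStreaming =>
      dedupToAggregate(keys, child)
  }
}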
outputMode = Append
)

// Deduplication: Not supported after a streaming aggregation
nit: Change this comment to just // Deduplication to reflect the whole subsection
| expectedMsgs = Seq("dropDuplicates")) | ||
|
|
||
| assertSupportedInStreamingPlan( | ||
| "Deduplication - Deduplication on batch relation inside streaming relation", |
nit: inside a streaming query.
sounds weird otherwise.
AddData(inputData, 40), // Emit items less than watermark and drop their state
CheckLastBatch((15 -> 1), (25 -> 1)),
// states in aggregation in [40, 45)
nit: indent.
 * @group typedrel
 * @since 2.0.0
 */
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
You have to add more documentation for streaming usage! Especially, you have to document that this will keep all past data as intermediate state, and that you can use withWatermark to limit how late the duplicate data can be, so the system can limit the state accordingly.
Also, double-check the docs on withWatermark and make sure they are consistent.
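The documented pattern would then look roughly like this (column names are illustrative): declare the watermark first so the engine can drop deduplication state for keys older than the watermark.

// Without a watermark, dropDuplicates keeps all past keys as state indefinitely.
// With one, state for keys older than the watermark can be dropped and late
// duplicates beyond the watermark are ignored.
val deduped = events
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("eventId", "eventTime")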
Done
you have not updated the docs for dropDuplicates! You should at least point to withWatermark to limit the state, and mention its semantics (all data later than the watermark will be ignored).
Done
Test build #73265 has started for PR 16970 at commit

retest this please

Test build #73285 has finished for PR 16970 at commit

Test build #73297 has finished for PR 16970 at commit
## What changes were proposed in this pull request?

This PR adds a special streaming deduplication operator to support `dropDuplicates` with `aggregation` and watermark. It reuses the `dropDuplicates` API but creates new logical plan `Deduplication` and new physical plan `DeduplicationExec`.

The following cases are supported:
- one or multiple `dropDuplicates()` without aggregation (with or without watermark)
- `dropDuplicates` before aggregation

Not supported cases:
- `dropDuplicates` after aggregation

Breaking changes:
- `dropDuplicates` without aggregation doesn't work with `complete` or `update` mode.

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#16970 from zsxwing/dedup.
without aggregation, how to drop duplicates between partitions?

@lw-lin Thanks, got it
What changes were proposed in this pull request?

This PR adds a special streaming deduplication operator to support dropDuplicates with aggregation and watermark. It reuses the dropDuplicates API but creates new logical plan Deduplication and new physical plan DeduplicationExec. The following cases are supported:
- one or multiple dropDuplicates() without aggregation (with or without watermark)
- dropDuplicates before aggregation

Not supported cases:
- dropDuplicates after aggregation

Breaking changes:
- dropDuplicates without aggregation doesn't work with complete or update mode.

How was this patch tested?

The new unit tests.
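A rough end-to-end illustration of the supported shape (deduplication before aggregation; the source, columns and sink are made up for the example):

import org.apache.spark.sql.SparkSession

object StreamingDedupExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("streaming-dedup-example").getOrCreate()
    import spark.implicits._

    // Hypothetical socket source producing lines of the form "id,word".
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    val events = lines.as[String].map { s =>
      val Array(id, word) = s.split(",", 2)
      (id, word)
    }.toDF("id", "word")

    // Supported case: dropDuplicates before the aggregation.
    val counts = events.dropDuplicates("id").groupBy("word").count()

    val query = counts.writeStream
      .outputMode("update")   // update mode is allowed because of the downstream aggregation
      .format("console")
      .start()
    query.awaitTermination()
  }
}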