[SPARK-24050][SS] Calculate input / processing rates correctly for DataSourceV2 streaming sources #21126
Conversation
Test build #89702 has finished for PR 21126 at commit
jenkins retest this please.
Test build #89707 has finished for PR 21126 at commit
What is the behavior if there's a self join? Will we double count the numInputRows?
// Check whether the streaming query's logical plan has only V2 data sources
val allStreamingLeaves =
  logicalPlan.collect { case s: StreamingExecutionRelation => s }
allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
we don't have a way to track these for ContinuousProcessing at the moment?
Maybe I can make it work for continuous as well with a small tweak.
A point fix here won't be sufficient - right now the row count metrics don't make it to the driver at all in continuous processing.
Yeah. This code path is not used by continuous processing.
val streamInput2 = MemoryStream[Int]
val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
nit: unpersist later?
really doesn't matter, as the test suite will shut down the SparkContext anyway.
q.recentProgress.filter(_.numInputRows > 0).lastOption
}
nit: extra line
val streamInput2 = MemoryStream[Int]
val staticInputDF2 = staticInputDF.union(staticInputDF).cache()

testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)(
what if you do a stream-stream join?
e.g. self-join?
then there will be two DataSourceV2ScanExecs reading from the same location. So we will be reading the data twice, and the counts will reflect that. But yes, I should add a test for that.
Turns out things were broken for self-joins and self-unions. Updated the logic and added tests for those for v2 sources.
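To see why self-joins and self-unions need the updated logic, here is a minimal, self-contained sketch of grouping per-scan row counts by the identity of the backing source. The names (`ScanNode`, `sourceId`, `inputRowsPerSource`) are illustrative, not Spark's actual `DataSourceV2ScanExec` internals:

```scala
// Illustrative model only: a source that appears several times in the
// executed plan (self-join / self-union) produces several scan nodes.
case class ScanNode(sourceId: Int, numOutputRows: Long)

// Group scan nodes by their source's identity so each source gets one
// combined entry. Since the data really is read once per scan node,
// every read counts toward that source's total.
def inputRowsPerSource(scans: Seq[ScanNode]): Map[Int, Long] =
  scans.groupBy(_.sourceId).map { case (id, nodes) =>
    id -> nodes.map(_.numOutputRows).sum
  }

// A self-union of one source that emitted 3 rows appears as two scan
// nodes; the data is read twice, and the count reflects that.
val perSource = inputRowsPerSource(Seq(ScanNode(0, 3L), ScanNode(0, 3L)))
```

Without the `groupBy` on source identity, the two scan nodes would be attributed to two distinct sources, which is the kind of mismatch the updated logic avoids.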
assert(lastProgress.get.sources(0).numInputRows == 3)
assert(lastProgress.get.sources(1).numInputRows == 0)
true
}
nit: I'd suggest doing an AddData() for the other stream after, to make sure there's not some weird order dependence.
Test build #89766 has finished for PR 21126 at commit
Test build #89777 has finished for PR 21126 at commit
"min" -> stats.min,
"avg" -> stats.avg.toLong).mapValues(formatTimestamp)
}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
The above code stayed the same. The diff is pretty dumb.
lgtm
LGTM. Thanks for all the tests. I'm trusting the unit tests. I don't see a better way of figuring out unique sources.
s" of the execution plan:\n" +
s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
existing nit: maybe we should've just used
"""
|
|
"""
Test build #4159 has finished for PR 21126 at commit
Test build #89810 has started for PR 21126 at commit
jenkins retest this please
Test build #89811 has finished for PR 21126 at commit
jenkins retest this please
Test build #89825 has finished for PR 21126 at commit
jenkins retest this please
Test build #89834 has finished for PR 21126 at commit
What changes were proposed in this pull request?
In some streaming queries, the input and processing rates are not calculated at all (they show up as zero) because MicroBatchExecution fails to associate metrics from the executed plan of a trigger with the sources in the logical plan of the trigger. The executed-plan-leaf-to-logical-source attribution works as follows. With V1 sources, there was no way to identify which execution plan leaves were generated by a streaming source, so we made a best-effort attempt to match logical and execution plan leaves when the number of leaves was the same. In cases where the number of leaves differs, we just give up and report zero rates. An example where this may happen is as follows.
In this case, the cachedStaticDF has multiple logical leaves, but in the trigger's execution plan it has only one leaf, because a cached subplan is represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in the number of leaves, causing the input rates to be computed as zero.

With DataSourceV2, all inputs are represented in the executed plan using DataSourceV2ScanExec, each of which has a reference to the associated logical DataSource and DataSourceReader. So it is easy to associate the metrics to the original streaming sources.

In this PR, the solution is as follows. If all the streaming sources in a streaming query are v2 sources, then a new code path is used where the execution-metrics-to-source mapping is done directly. Otherwise we fall back to the existing mapping logic.
How was this patch tested?