-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-27340][SS] Alias on TimeWindow expression may cause watermark metadata lost #24457
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ne explicitMetadata, so the metadata of Alias object will directly got from its child Change-Id: Ia2246b05688461ad907f1e16c96e7282f655d5a6
Change-Id: Ie04be37fd6a4859b3b0da0c7c07d54558cb23758
| case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata)) | ||
| case other => Alias(other, alias)() | ||
| } | ||
| Alias(expr, alias)() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function introduced in #11908, the change here will also influence def alias(alias: String): Column and def as(alias: String): Column, do you check all the test cases related?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I have successfully run all relevant UT test.
(I‘m sorry that we have a long holiday recently, so the reply is late.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related tests maybe not enough, as your change, as function has a different behavior, metadata no longer pass to explicitMetadata. My suggestion:
- For safety, run all test, not only the related.
- Do the fix just for SS scenario.
|
ok to test |
|
Test build #108285 has finished for PR 24457 at commit
|
|
We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it! |
| .select(window($"eventTime", "5 seconds") as 'aliasWindow) | ||
|
|
||
| assert(aliasWindow.logicalPlan.output.exists( | ||
| _.metadata.contains(EventTimeWatermark.delayKey))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this test case seems to fail on the master branch (as of today), the issue seems to exist still.
|
While preparing at 2.4.5 release, I just noticed that this was closed recently and we might need to fix the underlying issue. The test case failed in both If watermarks are ignored, the internal state grows indefinitely. How do you think about the reported issue, @tdas , @zsxwing , @cloud-fan , @HeartSaVioR , @gatorsmile? |
|
Looking at the test code, the issue seems to be valid and PR fixes the issue correctly. But I'm not sure about the side effect, as @xuanyuanking commented. Btw I think this has been known issue and underlying issue may not just be missing copying metadata. I'm not sure Spark can ensure metadata is propagated correctly during any multiple transformations, including typed -> untyped, and vice versa. It doesn't seem to be a thing we can rely on. I think the root issue is that the event time column and value are open to modify. Other streaming frameworks provide the way to specify the event time per row, and the value is treated as special column which cannot be modified (both column and value) during transformation. I've had a long discussion with @echauchot (working with Spark runner in Beam) regarding this. Please follow the link : #23576 (comment) |
|
Oh bot closed the PR again... Looks like we should also remove |
|
Thank you for feedback, @HeartSaVioR . I also agree with you and was surprised with this. With watermark bugs, Apache Spark structured streaming is not usable at all in case of the state operations. For auto-close, @nchammas , what is the correct reopening process? |
|
Can one of the admins verify this patch? |
|
I think removing the Stale tag should do the trick. If not, I can investigate. We should perhaps update the close message to direct people accordingly. Can PR authors remove the tag themselves, by the way, or does that require a committer? |
|
@dongjoon-hyun @HeartSaVioR @xuanyuanking |
|
Thank you, @nchammas . Tagging requires committership. Thank you, @LinhongLiu . Yes. We can do that while keeping @LiangchangZ 's authorship. The current status of this PR is a stage where we are discussing the validity and impact of this bug. I believe this should be considered as |
|
@LinhongLiu Thanks Linhong, agree to continue this bugfix, please go ahead. |
Indeed, we (Beam) are stuck in the streaming mode implementation of the translation layer (Beam runner) using StructuredStreaming framework. What about re-opening #23576 as well ? CC @arunmahadevan |
|
This was already open. I think the stale tag just has to be removed, done yesterday. |
Follow-on to #26877. ### What changes were proposed in this pull request? This PR tweaks the stale PR message to [clarify](#24457 (comment)) the procedure for reopening a PR after it has been marked as stale. ### Why are the changes needed? This change should clarify the reopening process for contributors. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? N/A Closes #27114 from nchammas/SPARK-30173-stale-tweaks. Authored-by: Nicholas Chammas <nicholas.chammas@gmail.com> Signed-off-by: Sean Owen <srowen@gmail.com>
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
|
@LinhongLiu Are you still working on this? If not, I will take over and ping you for review. |
|
Please take over this, @xuanyuanking . Thanks~ |
|
Thanks, I will submit a new PR today. |
…data lost Credit to LiangchangZ, this PR reuses the UT as well as integrate test in #24457. Thanks Liangchang for your solid work. ### What changes were proposed in this pull request? Make metadata propagatable between Aliases. ### Why are the changes needed? In Structured Streaming, we added an Alias for TimeWindow by default. https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3272-L3273 For some cases like stream join with watermark and window, users need to add an alias for convenience(we also added one in StreamingJoinSuite). The current metadata handling logic for `as` will lose the watermark metadata https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L1049-L1054 and finally cause the AnalysisException: ``` Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition ``` ### Does this PR introduce any user-facing change? Bugfix for an alias on time window with watermark. ### How was this patch tested? New UTs added. One for the functionality and one for explaining the common scenario. Closes #28326 from xuanyuanking/SPARK-27340. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…data lost Credit to LiangchangZ, this PR reuses the UT as well as integrate test in #24457. Thanks Liangchang for your solid work. ### What changes were proposed in this pull request? Make metadata propagatable between Aliases. ### Why are the changes needed? In Structured Streaming, we added an Alias for TimeWindow by default. https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3272-L3273 For some cases like stream join with watermark and window, users need to add an alias for convenience(we also added one in StreamingJoinSuite). The current metadata handling logic for `as` will lose the watermark metadata https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L1049-L1054 and finally cause the AnalysisException: ``` Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition ``` ### Does this PR introduce any user-facing change? Bugfix for an alias on time window with watermark. ### How was this patch tested? New UTs added. One for the functionality and one for explaining the common scenario. Closes #28326 from xuanyuanking/SPARK-27340. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit ba7adc4) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…data lost Credit to LiangchangZ, this PR reuses the UT as well as integrate test in apache#24457. Thanks Liangchang for your solid work. Make metadata propagatable between Aliases. In Structured Streaming, we added an Alias for TimeWindow by default. https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3272-L3273 For some cases like stream join with watermark and window, users need to add an alias for convenience(we also added one in StreamingJoinSuite). The current metadata handling logic for `as` will lose the watermark metadata https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L1049-L1054 and finally cause the AnalysisException: ``` Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition ``` Bugfix for an alias on time window with watermark. New UTs added. One for the functionality and one for explaining the common scenario. Closes apache#28326 from xuanyuanking/SPARK-27340. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
window($"fooTime", "2 seconds").alias("fooWindow")can generate an expression treeAlias(fooWindow) <- TimeWindow. The tree will becomeAlias(fooWindow) <- Alias(window) <- Window(start, end)after analyzed by TimeWindowing rule. TheAlias(window)got metadata of watermark when created:but the
Alias(fooWindow)is created before TimeWindowing rule effected. Its code path is:before TimeWindowing rule effected, the
ne.metadatais None and cause the watermark metadata lostWe make the
def name(alias: String)return aAliaswhich get metadata from its child automatically, when not specifying metadata explicitly.Thank @LinhongLiu for helping analyzing this problem!
How was this patch tested?
Add a UT and do the integration tests by run the example in jira successfully and do not throw org.apache.spark.sql.AnalysisException anymore