@xuanyuanking
Member

xuanyuanking commented Apr 24, 2020

Credit to @LiangchangZ: this PR reuses the UT as well as the integration test from #24457. Thanks, Liangchang, for your solid work.

What changes were proposed in this pull request?

Make metadata propagatable between Aliases.

Why are the changes needed?

In Structured Streaming, we added an Alias for TimeWindow by default.

TimeWindow(timeColumn.expr, windowDuration, slideDuration, startTime)
}.as("window")

In some cases, such as a stream join with a watermark and a window, users need to add an alias for convenience (we also added one in StreamingJoinSuite). The current metadata handling logic for as (implemented in Column.name) will lose the watermark metadata:
def name(alias: String): Column = withExpr {
  normalizedExpr() match {
    case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
    case other => Alias(other, alias)()
  }
}

and finally cause the AnalysisException:

Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition

Does this PR introduce any user-facing change?

Bugfix for an alias on time window with watermark.

How was this patch tested?

New UTs added. One for the functionality and one for explaining the common scenario.

xuanyuanking and others added 2 commits April 24, 2020 16:32
Co-authored-by: zhuliangchang <zhuliangchang@baidu.com>
@SparkQA

SparkQA commented Apr 24, 2020

Test build #121746 has finished for PR 28326 at commit 3fd8ce2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

dongjoon-hyun commented Apr 24, 2020

Thank you, @xuanyuanking . This is a different approach with the same test cases, right?

@xuanyuanking
Member Author

@dongjoon-hyun Yes, that's right. The approach in #24457 has a side effect in the scenario where a new NamedExpression is created.
Also cc @cloud-fan @tdas @zsxwing @HeartSaVioR @LinhongLiu for taking a look. Thanks!

@HeartSaVioR
Contributor

(Please keep in mind that I'm not an expert in the SQL area.)

I've read through the code around Alias and played with the reproducer (test), and I feel #24457 is the fix that addresses the root cause.

Looking into the reproducer with a debugger:

val left = df1.select('key, window('leftTime, "10 second") as 'leftWindow, 'leftValue)

Here 'leftTime is unresolved when as is applied (hence window is unresolved as well). as sets the metadata to whatever the original column has at that point, whereas the metadata of 'leftWindow cannot be determined yet; hence the problem arises.

Even without the analysis, thinking about it logically, I wonder why Alias has explicit metadata that hides the actual attribute's metadata, except as an optimization, which should have no side effects.

Sorry for the dumb question, but is there any real case for doing this? And even if it's valid, is it intentional to hide the actual attribute's metadata? Shouldn't we retain the actual attribute's metadata as well?

If we are concerned about the performance cost of losing the Alias metadata shortcut, the fix below may have the same effect as #24457, while only changing the behavior when Alias renames a Column that has an unresolved expression:

def name(alias: String): Column = withExpr {
  normalizedExpr() match {
    case ne: NamedExpression if ne.resolved =>
      Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
    case other => Alias(other, alias)()
  }
}

@cloud-fan
Contributor

> I'm wondering why Alias has an explicit metadata and hides the actual attribute's metadata

Sometimes we have to. For example, with a + b as c, what should the metadata of c be?

I think the root cause is that we rely on column metadata to store some important information, so we should make column metadata as reliable as we can. For example, with a as c, if users do not specify the metadata of c explicitly, it should inherit the metadata from a.

case a: Alias =>
a.copy(child = trimAliases(a.child))(
val newChild = trimAliases(a.child)
// Specific logic for keeping the eventTime watermark metadata in the top level alias.
Contributor

can we make it general? I think what we need to do is to propagate the alias metadata when there are contiguous Alias nodes and we merge them into one.

@HeartSaVioR
Contributor

HeartSaVioR commented Apr 27, 2020

> Sometimes we have to, for example, a + b as c, what should be the metadata of c?

The metadata of the expression a + b, if no metadata is set explicitly. It's the role of the + operator to define how to merge the two metadata into one, or to simply discard both.

I'm not referring to explicitly set metadata: it's natural for that to overwrite the child's, because that's what the user wants. That is not the case at issue here.

> For example, a as c, if users do not specify the metadata of c explicitly, it should inherit metadata from a.

Here a is an unresolved attribute, which will be replaced with the actual attribute later. Can the metadata of Alias be resolved while the original attribute is not resolved yet? (Except in the case where explicit metadata is available.)

Retrieving the metadata of chained Aliases simply works, as it finds the underlying attribute recursively. That said, I'm not seeing the bug in the rule that the PR is about to fix. If Alias provides metadata correctly, it should work and the fix would be simpler (a one-liner).

@cloud-fan
Contributor

Seems we don't have any rule for column metadata propagation. Each Attribute/Alias has its own metadata and can easily be hidden by the outer-most Alias.

The only "propagation" I know of is in Column.name, where we keep the column metadata when adding a new Alias. We need a clearer rule about how to propagate column metadata.

@HeartSaVioR
Contributor

HeartSaVioR commented Apr 27, 2020

> Each Attribute/Alias has its own metadata and can easily be hidden by the outer-most Alias.

Yeah, I see the concern. I'm not sure column metadata was considered critical information when it was first introduced (was that before Structured Streaming was introduced?), but it has become critical for Structured Streaming queries. If the query fails during analysis, that might be the happy case: if the query doesn't fail, the state will grow without evicting anything (end users may not notice if they don't watch the status from a streaming listener), which causes runtime issues in production.

I also see the benefit of this patch: it "ignores" the explicit metadata in Alias, which helps to mitigate unintended explicit metadata in Aliases. But that only avoids the impact of possible bugs and still doesn't sound like a fix for the root cause.

> The only "propagation" I know is in Column.name, where we keep the column metadata when adding a new Alias.

That sounds like event-time metadata could be lost when we apply non-Alias operations. I roughly remember running into this when I played with flatMapGroupsWithState (where I had to convert the untyped Dataset to a typed one, specifically to Dataset[<case class>]), but I just struggled with a workaround at the time and I don't have a reproducer as of now.

@xuanyuanking
Member Author

Changed the approach in 05ed338 to delete the metadata propagation logic in Column.
At first, I just followed the meaning of the name API and did the fix in an analyzer rule.
But after discussing with @cloud-fan and checking #8215, which introduced the changes for Column.as, reverting the propagation here should be the right way.
For the watermark metadata, we need to keep it in the top-level alias. Since the TimeWindow is resolved in TimeWindowingRule, and only then does the metadata take effect, the existing logic in Alias.metadata already ensures the propagation of the metadata.
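
To make the difference between the approaches discussed in this thread concrete, here is a minimal, self-contained sketch. It is a toy model rather than Catalyst code: `Attr`, `Alias`, `resolve`, `eagerAlias`, `lazyAlias`, and `guardedAlias` are hypothetical stand-ins, the metadata key is illustrative, and metadata is modeled as a plain `Map[String, String]`. It assumes only what the discussion states: an alias with explicit metadata hides its child's metadata, and an alias without it falls back to its child.

```scala
// Toy model of alias metadata; none of these types are Spark's Catalyst classes.
object AliasMetadataSketch {
  type Metadata = Map[String, String]

  sealed trait Expr {
    def resolved: Boolean
    def metadata: Metadata
  }

  // An attribute has no metadata until "analysis" fills it in.
  final case class Attr(name: String, resolvedMetadata: Option[Metadata]) extends Expr {
    def resolved: Boolean = resolvedMetadata.isDefined
    def metadata: Metadata = resolvedMetadata.getOrElse(Map.empty)
  }

  // An alias either carries explicit metadata or defers to its child.
  final case class Alias(child: Expr, name: String, explicitMetadata: Option[Metadata] = None)
      extends Expr {
    def resolved: Boolean = child.resolved
    def metadata: Metadata = explicitMetadata.getOrElse(child.metadata)
  }

  // Stand-in for analysis: resolve the attribute underneath any aliases.
  def resolve(e: Expr, md: Metadata): Expr = e match {
    case a: Attr   => a.copy(resolvedMetadata = Some(md))
    case al: Alias => al.copy(child = resolve(al.child, md))
  }

  // Snapshot the child's metadata when the alias is created (the old behavior).
  def eagerAlias(e: Expr, name: String): Alias = Alias(e, name, Some(e.metadata))

  // Never snapshot; every lookup defers to the child (the reverted behavior).
  def lazyAlias(e: Expr, name: String): Alias = Alias(e, name)

  // Snapshot only when the child is already resolved (the guarded suggestion).
  def guardedAlias(e: Expr, name: String): Alias =
    if (e.resolved) eagerAlias(e, name) else lazyAlias(e, name)

  def main(args: Array[String]): Unit = {
    val watermark: Metadata = Map("watermarkDelayMs" -> "10000") // illustrative key
    val window = Alias(Attr("leftTime", None), "window")         // still unresolved
    println(resolve(eagerAlias(window, "leftWindow"), watermark).metadata)
    println(resolve(lazyAlias(window, "leftWindow"), watermark).metadata)
    println(resolve(guardedAlias(window, "leftWindow"), watermark).metadata)
  }
}
```

Under this model, only the eager variant ends up with empty metadata after resolution, because it froze the empty metadata of an unresolved child. The other two variants see the watermark entry once the child is resolved.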

*/
def name(alias: String): Column = withExpr {
normalizedExpr() match {
case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
Contributor

This was added by #8215. But looking at the tests added at that time, we just wanted to carry over the metadata from the child of the Alias, which doesn't need this change, as Alias.metadata can inherit its child's metadata.

}
}

test("SPARK-27340 Windowed left out join with Alias on TimeWindow") {
Member

super nit. out -> outer, Let's ignore for now.

Member Author

Thanks, this test is deleted in #28390.

@SparkQA

SparkQA commented Apr 27, 2020

Test build #121905 has finished for PR 28326 at commit 05ed338.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@HeartSaVioR HeartSaVioR left a comment

The code change looks great. Left comments to simplify the new test to verify this.

}
}

test("SPARK-27340 Windowed left out join with Alias on TimeWindow") {
Contributor

I guess this is here to retain the effort of the original PR, but given the root cause, it should be much easier to reproduce (and you actually did so in EventTimeWatermarkSuite).

Let's remove this test in a new commit (so that we can still retain the credit) and add more code to the new UT to make it an E2E test. I'll comment there with the code we need to add.

Member Author

Thanks, addressed in #28390.

.select(window($"eventTime", "5 seconds") as 'aliasWindow)
// Check the eventTime metadata is kept in the top level alias.
assert(aliasWindow.logicalPlan.output.exists(
_.metadata.contains(EventTimeWatermark.delayKey)))
Contributor


    val windowedAggregation = aliasWindow
      .groupBy('aliasWindow)
      .agg(count("*") as 'count)
      .select($"aliasWindow".getField("start").cast("long").as[Long], $"count".as[Long])

    testStream(windowedAggregation)(
      AddData(inputData, 10, 11, 12, 13, 14, 15),
      CheckNewAnswer(),
      AddData(inputData, 25),   // Advance watermark to 15 seconds
      CheckNewAnswer((10, 5)),
      assertNumStateRows(2),
      AddData(inputData, 10),   // Should not emit anything as data less than watermark
      CheckNewAnswer(),
      assertNumStateRows(2)
    )

Let's append this so that the UT verifies the behavior end to end. (Yes, this is the same as other UTs in this suite, and the revised UT fails on the master branch even without the assertion that checks the metadata directly.) Then we no longer need the complicated stream-stream join UT.
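
The expected answers in the suggested test can be traced by hand with a small simulation. The sketch below is a simplified model, not Spark's implementation: `WatermarkSketch`, `State`, and `processBatch` are hypothetical names, the 10-second watermark delay is an assumption inferred from the comment "Advance watermark to 15 seconds" (the `withWatermark` call is not shown in this thread), and emission is folded into the same batch that advances the watermark.

```scala
// Toy model of tumbling-window counts with a watermark; not Spark code.
object WatermarkSketch {
  val windowSec = 5L  // from window($"eventTime", "5 seconds")
  val delaySec = 10L  // assumption: a watermark delay of 10 seconds

  // counts maps each open window's start to its row count.
  final case class State(counts: Map[Long, Long], watermark: Long, maxEventTime: Long)

  val empty: State = State(Map.empty, 0L, 0L)

  private def windowStart(t: Long): Long = t - t % windowSec

  /** Processes one batch; returns the new state and the finalized (windowStart, count) pairs. */
  def processBatch(state: State, batch: Seq[Long]): (State, Seq[(Long, Long)]) = {
    // Drop rows whose window already ended at or before the current watermark.
    val onTime = batch.filter(t => windowStart(t) + windowSec > state.watermark)
    val counts = onTime.foldLeft(state.counts) { (acc, t) =>
      val start = windowStart(t)
      acc.updated(start, acc.getOrElse(start, 0L) + 1L)
    }
    // The watermark trails the largest event time seen so far and never moves back.
    val maxSeen = (state.maxEventTime +: onTime).max
    val watermark = math.max(state.watermark, maxSeen - delaySec)
    // A window is final, and evicted from state, once its end is at or below the watermark.
    val (closed, open) = counts.partition { case (start, _) => start + windowSec <= watermark }
    (State(open, watermark, maxSeen), closed.toSeq.sorted)
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq(10L, 11L, 12L, 13L, 14L, 15L), Seq(25L), Seq(10L))
    batches.foldLeft(empty) { (state, batch) =>
      val (next, emitted) = processBatch(state, batch)
      println(s"emitted=$emitted openWindows=${next.counts.size} watermark=${next.watermark}")
      next
    }
    ()
  }
}
```

With the three batches from the test, this model emits nothing, then the pair (10, 5) with two windows left in state, then nothing again with two windows still in state. If the watermark metadata were lost, the eviction step would never run and the state would only grow, which is the failure mode described earlier in the thread.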

@dongjoon-hyun
Member

Thank you, @xuanyuanking , @cloud-fan , @HeartSaVioR .

The tests almost passed, but the run timed out after 6 hr 42 min near the end of the Python tests. This happens frequently these days. I'll merge this first.

@xuanyuanking . Please try to address @HeartSaVioR 's comment as a follow-up [TESTS] PR. Thanks.

dongjoon-hyun pushed a commit that referenced this pull request Apr 27, 2020
…data lost

Credit to LiangchangZ, this PR reuses the UT as well as integrate test in #24457. Thanks Liangchang for your solid work.

### What changes were proposed in this pull request?
Make metadata propagatable between Aliases.

### Why are the changes needed?
In Structured Streaming, we added an Alias for TimeWindow by default.
https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3272-L3273
For some cases like stream join with watermark and window, users need to add an alias for convenience(we also added one in StreamingJoinSuite). The current metadata handling logic for `as` will lose the watermark metadata
https://github.com/apache/spark/blob/590b9a0132b68d9523e663997def957b2e46dfb1/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L1049-L1054
 and finally cause the AnalysisException:
```
Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition
```

### Does this PR introduce any user-facing change?
Bugfix for an alias on time window with watermark.

### How was this patch tested?
New UTs added. One for the functionality and one for explaining the common scenario.

Closes #28326 from xuanyuanking/SPARK-27340.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit ba7adc4)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@dongjoon-hyun
Member

Also, could you make a backporting PR to branch-2.4 please, @xuanyuanking ?

cc @holdenk since she is the release manager for 2.4.6.

@zsxwing
Member

zsxwing commented Apr 27, 2020

Is it safe for a maintenance branch?

@HeartSaVioR
Contributor

Just FYI, it looks like the merged commit doesn't reflect the credit properly (it is not shown as having 2 authors), maybe because of the authorship of the first commit: author and committer were swapped. It may be ideal to ask @LiangchangZ whether that's OK.

Btw, I'd like to be sure about how to handle the follow-up PR across all branches, as this leaves two tasks: a backport PR for branch-2.4 and a follow-up PR for master. Ideally we'd keep the commits in sync, so that the backport PR doesn't contain the follow-up and the follow-up PR is ported back as well. (Ideally it would be nice to wait a couple more days to address all valid comments and avoid such a situation.)

@dongjoon-hyun
Member

dongjoon-hyun commented Apr 27, 2020

@zsxwing . For branch-2.4, shall we talk on the backporting PR? I didn't backport it directly because we need further discussion about the critical bug fix and the behavior change. BTW, without this, we cannot run long-running Structured Streaming queries with state.

@HeartSaVioR . Ya. Unfortunately, it looks like that. When this PR was made, I checked that the three commits had two authors, and they still do, but the merge script seems to have a corner case and missed it. :(

@zsxwing
Member

zsxwing commented Apr 27, 2020

> For branch-2.4, shall we talk on the backporting PR?

IMO, it's better to discuss here, because submitting a backporting PR is not zero work. It's better to make a decision here so that we avoid wasting the contributor's time if a backporting PR turns out not to be necessary.

@dongjoon-hyun
Member

Okay. I'll make a PR to save the contributor's time because I was the requestor.

@dongjoon-hyun
Member

* }}}
*
* If the current column has metadata associated with it, this metadata will be propagated
* to the new column. If this not desired, use `as` with explicitly empty metadata.
Member

Why are these comments removed?

Member Author

These comments were added together with the changes we just reverted. But it's good to have clear comments; I'll rephrase them and add them back in the follow-up.

@HeartSaVioR
Contributor

HeartSaVioR commented Apr 28, 2020

Sorry if this is off-topic, but I'm seeing that column metadata may have more issues, because its usage and how it should be handled are not properly defined.

I'm actually a bit surprised that both Spark and end users share metadata and can overwrite or hide each other's. I thought it was only used internally by Spark, and wasn't aware that Spark exposes a public API to modify metadata.

I'm not sure this really needs to be provided to end users (or even 3rd parties), because end users (and 3rd parties) have no way to retrieve metadata using only public APIs. From an end user's perspective, retrieving metadata requires pattern matching col.expr against NamedExpression and calling metadata, which is already in the catalyst area (not a public API), or a package hack to call the named method (not a public API). That means they may just blindly overwrite it and hide the metadata of the underlying attribute. Do we have actual usage of it?

Also, as @cloud-fan commented earlier in #28326 (comment), metadata propagation doesn't seem to be clearly defined. Alias keeps the metadata propagated, but I'm not sure which other operations take metadata propagation into account when determining their output.

@xuanyuanking xuanyuanking deleted the SPARK-27340 branch April 28, 2020 09:40
cloud-fan pushed a commit that referenced this pull request Apr 30, 2020
…y tests

### What changes were proposed in this pull request?

- Rephrase the API doc for `Column.as`
- Simplify the UTs

### Why are the changes needed?
Address comments in #28326

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
New UT added.

Closes #28390 from xuanyuanking/SPARK-27340-follow.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Apr 30, 2020
…y tests

(cherry picked from commit 7195a18)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
dongjoon-hyun pushed a commit that referenced this pull request Jun 10, 2020
### What changes were proposed in this pull request?

This is a followup of #28695 , to fix the problem completely.

The root cause is that, `df("col").as("name")` is not a column reference anymore, and should not have the special column metadata. However, this was broken in ba7adc4#diff-ac415c903887e49486ba542a65eec980L1050-L1053

This PR fixes the regression, by strip the special column metadata in `Column.name`, which is the behavior before #28326 .

### Why are the changes needed?

Fix a regression. We shouldn't fail if there is no ambiguous self-join.

### Does this PR introduce _any_ user-facing change?

Yes, the query in the test can run now.

### How was this patch tested?

updated test

Closes #28783 from cloud-fan/self-join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
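
The idea described in this follow-up, that an aliased column keeps ordinary metadata but drops the markers identifying it as a direct column reference, can be sketched in isolation. This is an illustration under stated assumptions, not the code from #28783: `StripReferenceMetadataSketch`, `referenceKeys`, and `inheritedByAlias` are hypothetical names, the key strings are placeholders, and metadata is again modeled as a plain `Map[String, String]`.

```scala
// Toy illustration of stripping reference-only metadata; not the actual Spark change.
object StripReferenceMetadataSketch {
  type Metadata = Map[String, String]

  // Hypothetical reserved keys that mark an expression as a direct column reference.
  val referenceKeys: Set[String] = Set("__dataset_id", "__col_position")

  /** Metadata an alias should inherit: everything except the reference markers. */
  def inheritedByAlias(child: Metadata): Metadata =
    child.filterNot { case (key, _) => referenceKeys.contains(key) }

  def main(args: Array[String]): Unit = {
    val child: Metadata = Map(
      "__dataset_id" -> "1",
      "__col_position" -> "0",
      "watermarkDelayMs" -> "10000") // illustrative non-reserved entry
    println(inheritedByAlias(child))
  }
}
```

In this model the watermark-style entry survives the alias while the two marker entries do not, which is the combination of behaviors that #28326 and this follow-up aim for together.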
dongjoon-hyun pushed a commit that referenced this pull request Jun 10, 2020
(cherry picked from commit c400519)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
holdenk pushed a commit to holdenk/spark that referenced this pull request Jun 25, 2020
(cherry picked from commit c400519)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
rshkv pushed a commit to palantir/spark that referenced this pull request Jan 28, 2021
…data lost

rshkv pushed a commit to palantir/spark that referenced this pull request Jan 28, 2021