Conversation

@imback82 (Contributor) commented Nov 19, 2019

### What changes were proposed in this pull request?

`DataFrameNaFunctions.fill` doesn't handle duplicate columns even when column names are not specified.

```scala
val left = Seq(("1", null), ("3", "4")).toDF("col1", "col2")
val right = Seq(("1", "2"), ("3", null)).toDF("col1", "col2")
val df = left.join(right, Seq("col1"))
df.printSchema
df.na.fill("hello").show
```

produces

```
root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col2: string (nullable = true)

org.apache.spark.sql.AnalysisException: Reference 'col2' is ambiguous, could be: col2, col2.;
  at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:259)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:121)
  at org.apache.spark.sql.Dataset.resolve(Dataset.scala:221)
  at org.apache.spark.sql.Dataset.col(Dataset.scala:1268)
```

The reason for the above failure is that columns are looked up with `Dataset.col()`, which tries to resolve a column by name; if there are multiple columns with the same name, it fails due to ambiguity.

This PR updates `DataFrameNaFunctions.fill` so that if the columns to fill are not specified, it resolves the ambiguity gracefully by applying `fill` to all the eligible columns. (Note that if the user specifies the columns, it will still fail due to ambiguity.)
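For illustration, here is a minimal sketch of the idea (not the actual patch): when no columns are given, walk the plan's output attributes by position instead of resolving names through `Dataset.col()`, so duplicate names never need to be disambiguated. The helper name `fillAllStringColumns` is hypothetical.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, lit}
import org.apache.spark.sql.types.StringType

// Hypothetical sketch, not the actual patch: fill every string column
// by position, so two columns both named "col2" are handled independently.
def fillAllStringColumns(df: DataFrame, value: String): DataFrame = {
  val projection = df.queryExecution.analyzed.output.map { attr =>
    if (attr.dataType == StringType) {
      // Wrap the attribute itself, so no name lookup is involved.
      coalesce(new Column(attr), lit(value)).as(attr.name)
    } else {
      new Column(attr)
    }
  }
  df.select(projection: _*)
}
```

On the joined DataFrame above, `fillAllStringColumns(df, "hello")` would produce the table shown under "Does this PR introduce any user-facing change?" below.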

### Why are the changes needed?

If column names are not specified, `fill` should not fail due to ambiguity, since it can still be applied to all the eligible columns.

### Does this PR introduce any user-facing change?

Yes, now the above example displays the following:

```
+----+-----+-----+
|col1| col2| col2|
+----+-----+-----+
|   1|hello|    2|
|   3|    4|hello|
+----+-----+-----+
```

### How was this patch tested?

Added new unit tests.

@imback82 (Author)

cc: @cloud-fan

@SparkQA commented Nov 19, 2019

Test build #114071 has finished for PR 26593 at commit 623099e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 20, 2019

Test build #114118 has finished for PR 26593 at commit 5642d9e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 20, 2019

Test build #114120 has finished for PR 26593 at commit b0f5f5c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 20, 2019

Test build #114121 has finished for PR 26593 at commit 204bb10.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
private def toAttributes(cols: Seq[String]): Seq[Attribute] = {
  cols.map(df.col(_).named.toAttribute)
}
```
Contributor:

We should be stricter:

```scala
cols.map(resolve).map {
  case a: Attribute => a
  case other => throw new IllegalArgumentException(s"'$other' is not a top-level column")
}
```

Contributor Author:

Changed as suggested. One naive question: do we need to be more restrictive here even if we use the result of `df.col(_)`, which uses the output attributes of the plan?

Contributor:

`df.col` may return a nested field, which is not an attribute.

```scala
    }
  }
  cols.map(resolve)
}
```
Contributor Author:

@cloud-fan I noticed that `drop()` resolves column names with `cols.map(name => df.resolve(name))` instead of `df.col(name)`. The difference (other than the return type) is that `df.col()` will try to resolve using regex and add metadata. Do you think we need to make this consistent?

Contributor:

We should. To avoid a breaking change, I think we should change `drop` to follow `fill` and make it more powerful.

Contributor Author:

OK. I will do this in a separate PR, which will address the issue of handling duplicate columns in `drop` when no columns are specified.

@imback82 (Author)

@dongjoon-hyun I think this PR may have been mislabelled. Thanks!

@SparkQA commented Nov 20, 2019

Test build #114178 has finished for PR 26593 at commit 3efcf13.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan added the SQL label and removed the SPARK SHELL label on Nov 21, 2019
```scala
def resolve(colName: String): Attribute = {
  df.col(colName).named.toAttribute match {
    case a: Attribute => a
    case _ => throw new IllegalArgumentException(s"'$colName' is not a top level column.")
  }
}
```
Contributor:

Can we add a test for this branch? The code seems not to work as expected: `toAttribute` always returns an `Attribute`, so we never hit this branch.

Contributor Author:

(Let me merge different threads here)

The previous implementation resolved the column names as follows:

  1. Check the column names against the schema (`df.schema.fields`).
  2. Pass only the column names that matched top-level fields to `df.col`.

So, if we want to keep the previous behavior of `fill` (for handling `*`, etc.), we first need to check the column names against the schema and then call `df.col(colName).named.toAttribute`.

Then a nested field will not pass the schema check (since the check is against the top-level fields), and `toAttribute` will always return an `Attribute`. So I am not sure how this branch can be tested. What do you think? Roughly, the order looks like the sketch below.
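A minimal sketch of that two-step lookup, with illustrative names and case sensitivity ignored (this sits inside `DataFrameNaFunctions`, where the package-private `Column.named` is visible):

```scala
// Illustrative sketch only, not the exact patch.
val topLevelNames = df.schema.fields.map(_.name).toSet
val attributes = cols
  .filter(topLevelNames.contains)                // step 1: schema check
  .map(name => df.col(name).named.toAttribute)   // step 2: resolve
// A nested path like "c1.c1-1" fails step 1, so toAttribute only ever
// sees top-level columns and always returns an Attribute.
```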

Contributor:

Yeah, we should call `df.col` to handle `*` etc. But we shouldn't call `.named.toAttribute`, which turns everything into an attribute and makes us unable to detect nested fields.

Can we add a test to fill nested fields and see the result?

Contributor Author:

Nested fields are not supported since the name is checked first against `df.schema.fields`, which contains only the top-level fields, so they never become candidates for `fill`. `*` is handled the same way and is ignored. I can capture this behavior in a unit test.

Also, what's the best way to convert a `Column` to an `Attribute`? Now that we have a single point of entry, `def fillValue[T](value: T, cols: Seq[Attribute])`, we need to convert the `Column` (the result of `df.col`) to an `Attribute`.

Contributor:

Yes, nested fields should be ignored, and we should have a test to verify it. How about:

```scala
cols.map(name => df.col(name).expr).collect {
  case a: Attribute => a
}
```

Contributor Author:

Great, thanks!

@SparkQA commented Nov 22, 2019

Test build #114277 has finished for PR 26593 at commit 702897c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82 (Author)

retest this please

```scala
private def toAttributes(cols: Seq[String]): Seq[Attribute] = {
  cols.flatMap { colName =>
    df.col(colName).expr.collect {
      case a: Attribute => a
    }
  }
}
```
Contributor:

Is this correct? If `df.col` returns a nested field `GetStructField("a", Attribute("b"))`, then we will return `Attribute("b")`, which is unexpected.

Contributor:

So this works, but we can make it clearer.

```scala
cols.map(name => df.col(name).expr).collect {
  case a: Attribute => a
}
```

Since we know that struct-type columns will be ignored later, we don't need to collect them here.

Contributor Author:

You are absolutely right. Thanks for the suggestion!

```scala
  Row(null) :: Row("b1") :: Nil)

// Nested columns are ignored for fill().
checkAnswer(df.na.fill("a1", Seq("c1.c1-1")), data)
```
Contributor:

We can expose the bug if the data is:

```scala
val data = Seq(
  Row(Row(null, "a2")),
  Row(Row("b1", "b2")),
  Row(null))
```

Contributor Author:

This example works fine, and you also need to update the previous line to:

```scala
checkAnswer(df.select("c1.c1-1"),
  Row(null) :: Row("b1") :: Row(null) :: Nil)
```

(This is just to illustrate the nested type, but I can remove it if you think it's confusing.)

The reason the nested types are ignored is the following check:

```scala
case (NumericType, dt) => dt.isInstanceOf[NumericType]
case (StringType, dt) => dt == StringType
case (BooleanType, dt) => dt == BooleanType
```

The data type of a nested column that is resolved to an `Attribute` is `StructType`, so it will not match any of these cases. A self-contained illustration follows.
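Here is a hedged, self-contained illustration of that behavior (it assumes a `SparkSession` named `spark` is in scope, as in a test suite or the shell; the data and schema mirror the test above):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("c1", StructType(Seq(
    StructField("c1-1", StringType),
    StructField("c1-2", StringType))))))
val data = Seq(Row(Row(null, "a2")), Row(Row("b1", "b2")), Row(null))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// "c1" resolves to an Attribute of StructType, which matches none of the
// type checks above, so fill leaves the rows unchanged.
df.na.fill("a1").collect()  // same rows as `data`
```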

@SparkQA commented Nov 22, 2019

Test build #114285 has finished for PR 26593 at commit 702897c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

@SparkQA commented Nov 25, 2019

Test build #114376 has finished for PR 26593 at commit 033beb5.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

retest this please

@SparkQA commented Nov 25, 2019

Test build #114388 has finished for PR 26593 at commit 033beb5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan closed this in f09c1a3 on Nov 25, 2019
@dongjoon-hyun (Member)

Hi, all. Can we have this in branch-2.4? There have been some requests for this.

@imback82 (Author)

Sure. Do you want me to handle back-porting this to branch-2.4? If so, do we a doc on the procedure?

@dongjoon-hyun (Member)

Thank you, @imback82. It would be great.
BTW, I didn't catch the meaning of the following. Could you ask again?

> do we a doc on the procedure?

We usually make another PR against branch-2.4. We will review it there.

@dongjoon-hyun (Member)

Also, cc @PavithraRamachandran

@imback82 (Author)

Got it. I will work on this today.

dongjoon-hyun pushed a commit that referenced this pull request Jan 31, 2020
…cate columns

(Backport of #26593)


Closes #27407 from imback82/backport-SPARK-29890.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>