Skip to content

Conversation

@viirya
Copy link
Member

@viirya viirya commented Sep 30, 2019

What changes were proposed in this pull request?

This patch proposes to delete old Hive external partition directory even the partition does not exist in Hive, when insert overwrite Hive external table partition.

Why are the changes needed?

When insert overwrite to a Hive external table partition, if the partition does not exist, Hive will not check if the external partition directory exists or not before copying files. So if users drop the partition, and then do insert overwrite to the same partition, the partition will have both old and new data.

For example:

withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
  // test is an external Hive table.
  sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1")
  sql("ALTER TABLE test DROP PARTITION(name='n1')")
  sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2")
  sql("SELECT id FROM test WHERE name = 'n1' ORDER BY id") // Got both 1 and 2.
}

Does this PR introduce any user-facing change?

Yes. This fix a correctness issue when users drop partition on a Hive external table partition and then insert overwrite it.

How was this patch tested?

Added test.

// insert overwrite to the same partition, the partition will have both old and new
// data.
val updatedPart = if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
AlterTableAddPartitionCommand(
Copy link
Member Author

@viirya viirya Sep 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of constructing partition directory manually, I let hive add the target parititon to be overwritten. We will delete the partition directory below.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we rollback if the follow loadPartition operation fails?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we provide rollback for load partition before?

@viirya
Copy link
Member Author

viirya commented Sep 30, 2019

@viirya
Copy link
Member Author

viirya commented Sep 30, 2019

cc @felixcheung

@SparkQA
Copy link

SparkQA commented Sep 30, 2019

Test build #111616 has finished for PR 25979 at commit 5e5c6ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

How does hive handle it? Does it also produce duplicated data?

@viirya
Copy link
Member Author

viirya commented Oct 2, 2019

As I quickly tried on Hive, Hive does not delete existing directory with "INSERT OVERWRITE", if the external partition was dropped by "DROP PARTITION" before. Hive just moves data from staging into the directory in this case.

Whether it produces duplicated data, depending on the filename generated. I tested two version of Hive locally. I do not know the logic Hive produces data filename.

On Hive 2.1.0, two "INSERT OVERWRITE" produces data file with same name like 000000_0. The second "INSERT OVERWRITE" moves the file into and overwrite old file.

On Hive 2.3.2, the second "INSERT OVERWRITE" causes following failure when moving file with same name:

0: jdbc:hive2://localhost:10000> CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (name string) STORED AS TEXTFILE LOCATION '/tmp/test';
No rows affected (0.074 seconds)
0: jdbc:hive2://localhost:10000> INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
No rows affected (1.809 seconds)
0: jdbc:hive2://localhost:10000> ALTER TABLE test DROP PARTITION(name='n1');
No rows affected (0.175 seconds)
0: jdbc:hive2://localhost:10000> INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Error: org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask. java.io.IOException: rename for src path: hdfs://namenode:8020/tmp/test/name=n1/.hive-staging_hive_2019-10-02_16-01-28_570_7296144361967433532-1/-ext-10000/000000_0 to dest path:hdfs://namenode:8020/tmp/test/name=n1/000000_0 returned false
	at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:380)
	at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:257)
	at org.apache.hive.service.cli.operation.SQLOperation.access$800(SQLOperation.java:91)
	at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:348)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
	at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:362)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

So, I think it is sure that Hive does not delete existing directory when "INSERT OVERWRITE" into a non-existing partition. Whether duplicated data are seen, it depends on if filenames are unique or not between different "INSERT OVERWRITE".

If your first "INSERT OVERWRITE" produces filename "000000_0" and "000001_0", but the second "INSERT OVERWRITE" produces just "000000_0", you will read partial old data "000001_0" with new "000000_0" together.

Above tests are using MR as Hive execution engine. I am not sure if Hive has different output filename in different execution engine like Spark or Tez.

@cloud-fan
Copy link
Contributor

Do you mean the current behavior is consistent with some Hive versions?

@viirya
Copy link
Member Author

viirya commented Oct 3, 2019

Currently behavior of INSERT OVERWRITE into a Hive external table partition is consistent with Hive. If the partition was dropped by DROP PARTITION before, INSERT OVERWRITE will not overwrite existing partition directory.

If there are files in the directory, conflicting files will be overwritten (Hive 2.1) or exception thrown (Hive 2.3.2). Unique files will be read as duplicated data.

@felixcheung
Copy link
Member

yeah, this is a hard one, obviously the behavior is buggy, hard to detect etc. but that's how Hive is designed. I think we should log a warn in Spark at least so interested folks (like us) can detect this after the job is run

On Hive 2.1.0, two "INSERT OVERWRITE" produces data file with same name like 000000_0. The second "INSERT OVERWRITE" moves the file into and overwrite old file.

On Hive 2.3.2, the second "INSERT OVERWRITE" causes following failure when moving file with same name

we can't really rely on the name being the same to overwrite. it depends on a number of things. for instance, if the original partition has 10B row and 1M file, overwritten with new partition having 1B and 100k file, then a lot of files are not going to be overwritten (like 900k)

@felixcheung
Copy link
Member

or we do

exception thrown (Hive 2.3.2)

@viirya
Copy link
Member Author

viirya commented Oct 3, 2019

I am not sure if this is designed behavior of Hive.

You can see a similar issue at Hive https://jira.apache.org/jira/browse/HIVE-18702. INSERT OVERWRITE should clean table directory.

The last comment in that ticket, someone left a comment describing the same case as this. He said he will create a ticket, but I do not see that.

@viirya
Copy link
Member Author

viirya commented Oct 3, 2019

I think the semantics of INSERT OVERWRITE should be overwriting existing data with new data. By no means it leaves old data intact with new data.

I think there are few options in this case:

  1. delete old data like this PR currently did
  2. no delete old data, throw an exception to user, saying there are old data. (I think this might be confusing to user. I am also wondering how user can react to this, if they do not access filesystem)

Any ideas? @cloud-fan @felixcheung

@felixcheung
Copy link
Member

felixcheung commented Oct 6, 2019 via email

@SparkQA
Copy link

SparkQA commented Oct 8, 2019

Test build #111876 has finished for PR 25979 at commit 4df8a79.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 8, 2019

Test build #111900 has finished for PR 25979 at commit 1618e56.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 8, 2019

Test build #111914 has finished for PR 25979 at commit 78beb8a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val splitPart = part.split("=")
assert(splitPart.size == 2, s"Invalid written partition path: $part")
ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
ExternalCatalogUtils.unescapePathName(splitPart(1))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding some partition values that get escaped, we need to unescape them.

@SparkQA
Copy link

SparkQA commented Oct 8, 2019

Test build #111916 has finished for PR 25979 at commit 965dc4a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Oct 10, 2019

@cloud-fan @felixcheung Do you have more comments on this?

@viirya
Copy link
Member Author

viirya commented Oct 14, 2019

retest this please

@SparkQA
Copy link

SparkQA commented Oct 14, 2019

Test build #112053 has finished for PR 25979 at commit 965dc4a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Oct 16, 2019

cc @dongjoon-hyun @HyukjinKwon too

@SparkQA
Copy link

SparkQA commented Oct 17, 2019

Test build #112203 has finished for PR 25979 at commit d15576f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 17, 2019

Test build #112204 has finished for PR 25979 at commit e395fac.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 17, 2019

Test build #112209 has finished for PR 25979 at commit 8337870.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan closed this in 5692680 Oct 18, 2019
@cloud-fan
Copy link
Contributor

thanks, merging to master!

@HyukjinKwon
Copy link
Member

Sorry for late response, @viirya. Looks making sense to me too.

@AngersZhuuuu
Copy link
Contributor

AngersZhuuuu commented Nov 12, 2019

As I quickly tried on Hive, Hive does not delete existing directory with "INSERT OVERWRITE", if the external partition was dropped by "DROP PARTITION" before. Hive just moves data from staging into the directory in this case.

Whether it produces duplicated data, depending on the filename generated. I tested two version of Hive locally. I do not know the logic Hive produces data filename.

On Hive 2.1.0, two "INSERT OVERWRITE" produces data file with same name like 000000_0. The second "INSERT OVERWRITE" moves the file into and overwrite old file.

On Hive 2.3.2, the second "INSERT OVERWRITE" causes following failure when moving file with same name:

0: jdbc:hive2://localhost:10000> CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (name string) STORED AS TEXTFILE LOCATION '/tmp/test';
No rows affected (0.074 seconds)
0: jdbc:hive2://localhost:10000> INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
No rows affected (1.809 seconds)
0: jdbc:hive2://localhost:10000> ALTER TABLE test DROP PARTITION(name='n1');
No rows affected (0.175 seconds)
0: jdbc:hive2://localhost:10000> INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Error: org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask. java.io.IOException: rename for src path: hdfs://namenode:8020/tmp/test/name=n1/.hive-staging_hive_2019-10-02_16-01-28_570_7296144361967433532-1/-ext-10000/000000_0 to dest path:hdfs://namenode:8020/tmp/test/name=n1/000000_0 returned false
	at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:380)
	at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:257)
	at org.apache.hive.service.cli.operation.SQLOperation.access$800(SQLOperation.java:91)
	at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:348)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
	at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:362)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

So, I think it is sure that Hive does not delete existing directory when "INSERT OVERWRITE" into a non-existing partition. Whether duplicated data are seen, it depends on if filenames are unique or not between different "INSERT OVERWRITE".

If your first "INSERT OVERWRITE" produces filename "000000_0" and "000001_0", but the second "INSERT OVERWRITE" produces just "000000_0", you will read partial old data "000001_0" with new "000000_0" together.

Above tests are using MR as Hive execution engine. I am not sure if Hive has different output filename in different execution engine like Spark or Tez.

Hive use mv here , so if name is same, it will be replaced, if your partition was dropped remain two files

0000000_0
0000001_0

Then you insert overwrite into this partition with one file 0000000_0, then the old data 0000001_0 still remains.

In spark, each result's file name is different....so, data dunplicted.

@dongjoon-hyun
Copy link
Member

Hi, @viirya . Since this seems to be a correctness fix, could you make a backport PR to branch-2.4?

Hi, @AngersZhuuuu . Do you mean that is another corner case existing still?

@viirya
Copy link
Member Author

viirya commented Mar 11, 2020

@dongjoon-hyun Ok. I can prepare a backport PR for 2.4.

@dongjoon-hyun
Copy link
Member

Thank you so much, @viirya .

dongjoon-hyun pushed a commit that referenced this pull request Mar 12, 2020
…tion should delete old data

### What changes were proposed in this pull request?

This patch proposes to delete old Hive external partition directory even the partition does not exist in Hive, when insert overwrite Hive external table partition.

This is backport of #25979 to branch-2.4.

### Why are the changes needed?

When insert overwrite to a Hive external table partition, if the partition does not exist, Hive will not check if the external partition directory exists or not before copying files. So if users drop the partition, and then do insert overwrite to the same partition, the partition will have both old and new data.

For example:
```scala
withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
  // test is an external Hive table.
  sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1")
  sql("ALTER TABLE test DROP PARTITION(name='n1')")
  sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2")
  sql("SELECT id FROM test WHERE name = 'n1' ORDER BY id") // Got both 1 and 2.
}
```

### Does this PR introduce any user-facing change?

Yes. This fix a correctness issue when users drop partition on a Hive external table partition and then insert overwrite it.

### How was this patch tested?

Added test.

Closes #27887 from viirya/SPARK-29295-2.4.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
cloud-fan pushed a commit that referenced this pull request Jun 9, 2020
…ion path should be case insensitive

### What changes were proposed in this pull request?

This is a follow up of #25979.
When we inserting overwrite  an external hive partitioned table with upper case dynamic partition key, exception thrown.

like:
```
org.apache.spark.SparkException: Dynamic partition key P1 is not among written partition paths.
```
The root cause is that Hive metastore is not case preserving and keeps partition columns with lower cased names, see details in:

https://github.com/apache/spark/blob/ddd8d5f5a0b6db17babc201ba4b73f7df91df1a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L895-L901
https://github.com/apache/spark/blob/e28914095aa1fa7a4680b5e4fcf69e3ef64b3dbc/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala#L228-L234

In this PR, we convert the dynamic partition map to a case insensitive map.
### Why are the changes needed?

To fix the issue when inserting overwrite into external hive partitioned table with upper case dynamic partition key.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT.

Closes #28765 from turboFei/SPARK-29295-follow-up.

Authored-by: turbofei <fwang12@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Jun 9, 2020
…ion path should be case insensitive

### What changes were proposed in this pull request?

This is a follow up of #25979.
When we inserting overwrite  an external hive partitioned table with upper case dynamic partition key, exception thrown.

like:
```
org.apache.spark.SparkException: Dynamic partition key P1 is not among written partition paths.
```
The root cause is that Hive metastore is not case preserving and keeps partition columns with lower cased names, see details in:

https://github.com/apache/spark/blob/ddd8d5f5a0b6db17babc201ba4b73f7df91df1a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L895-L901
https://github.com/apache/spark/blob/e28914095aa1fa7a4680b5e4fcf69e3ef64b3dbc/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala#L228-L234

In this PR, we convert the dynamic partition map to a case insensitive map.
### Why are the changes needed?

To fix the issue when inserting overwrite into external hive partitioned table with upper case dynamic partition key.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT.

Closes #28765 from turboFei/SPARK-29295-follow-up.

Authored-by: turbofei <fwang12@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 717ec5e)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@viirya viirya deleted the SPARK-29295 branch December 27, 2023 18:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants