|
17 | 17 |
|
18 | 18 | package org.apache.spark.sql.hive.execution |
19 | 19 |
|
| 20 | +import java.net.URI |
20 | 21 | import java.util.Locale |
21 | 22 |
|
22 | 23 | import org.apache.hadoop.conf.Configuration |
23 | | -import org.apache.hadoop.fs.Path |
| 24 | +import org.apache.hadoop.fs.{FileSystem, Path} |
24 | 25 | import org.apache.hadoop.hive.ql.ErrorMsg |
25 | 26 | import org.apache.hadoop.hive.ql.plan.TableDesc |
26 | 27 |
|
27 | 28 | import org.apache.spark.SparkException |
28 | 29 | import org.apache.spark.sql.{AnalysisException, Row, SparkSession} |
29 | | -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils} |
| 30 | +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils, ExternalCatalogWithListener} |
30 | 31 | import org.apache.spark.sql.catalyst.expressions.Attribute |
31 | 32 | import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan |
32 | 33 | import org.apache.spark.sql.execution.SparkPlan |
33 | 34 | import org.apache.spark.sql.execution.command.CommandUtils |
| 35 | +import org.apache.spark.sql.hive.HiveExternalCatalog |
34 | 36 | import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} |
35 | | -import org.apache.spark.sql.hive.client.HiveClientImpl |
| 37 | +import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion} |
| 38 | +import org.apache.spark.sql.hive.client.hive._ |
36 | 39 |
|
37 | 40 |
|
38 | 41 | /** |
@@ -281,11 +284,27 @@ case class InsertIntoHiveTable( |
281 | 284 | oldPart.flatMap(_.storage.locationUri.map(uri => new Path(uri))) |
282 | 285 | } |
283 | 286 |
|
| 287 | + val hiveVersion = externalCatalog.asInstanceOf[ExternalCatalogWithListener] |
| 288 | + .unwrapped.asInstanceOf[HiveExternalCatalog] |
| 289 | + .client |
| 290 | + .version |
| 291 | + // See https://issues.apache.org/jira/browse/SPARK-31684.
| 292 | + // For Hive 2.0.0 and onwards, https://issues.apache.org/jira/browse/HIVE-11940 has been
| 293 | + // fixed, so the insert overwrite performance issue no longer exists and we should leave
| 294 | + // the overwrite logic to Hive to avoid failures in `FileSystem#checkPath`.
| 295 | + // For Hive versions before 2.0.0, we also leave the replace work to Hive when the table
| 296 | + // and partition locations do not belong to the same FileSystem.
| 297 | + // TODO(SPARK-31675): for Hive 2.2.0 and earlier, if the table and partition locations do
| 298 | + // not belong to the same FileSystem, we will still get the same error thrown by Hive's
| 299 | + // encryption check.
| 300 | + val hiveVersDoHiveOverwrite: Set[HiveVersion] = Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1) |
| 301 | + val canDisable = !hiveVersDoHiveOverwrite.contains(hiveVersion) && |
| 302 | + canDisableHiveOverwrite(table.location, partitionPath.map(_.toUri).orNull, hadoopConf) |
284 | 303 | // SPARK-18107: Insert overwrite runs much slower than hive-client. |
285 | 304 | // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive |
286 | 305 | // version and we may not want to catch up new Hive version every time. We delete the |
287 | 306 | // Hive partition first and then load data file into the Hive partition. |
288 | | - if (partitionPath.nonEmpty && overwrite) { |
| 307 | + if (partitionPath.nonEmpty && overwrite && canDisable) { |
289 | 308 | partitionPath.foreach { path => |
290 | 309 | val fs = path.getFileSystem(hadoopConf) |
291 | 310 | if (fs.exists(path)) { |
@@ -321,4 +340,42 @@ case class InsertIntoHiveTable( |
321 | 340 | isSrcLocal = false) |
322 | 341 | } |
323 | 342 | } |
| 343 | + |
| 344 | + // scalastyle:off line.size.limit |
| 345 | + /** |
| 346 | + * If the table location and partition location do not belong to the same [[FileSystem]], we
| 347 | + * should not disable Hive overwrite. Otherwise, Hive will use the [[FileSystem]] instance that
| 348 | + * belongs to the table location to copy files, which will fail in [[FileSystem#checkPath]].
| 349 | + * See https://github.com/apache/hive/blob/rel/release-2.3.7/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L1648-L1659
| 350 | + */ |
| 351 | + // scalastyle:on line.size.limit |
| 352 | + private def canDisableHiveOverwrite( |
| 353 | + tableLocation: URI, |
| 354 | + partitionLocation: URI, |
| 355 | + hadoopConf: Configuration): Boolean = { |
| 356 | + if (tableLocation == null || partitionLocation == null) return true |
| 357 | + val partScheme = partitionLocation.getScheme |
| 358 | + if (partScheme == null) return true // relative path |
| 359 | + |
| 360 | + val tblScheme = tableLocation.getScheme |
| 361 | + // Scheme and authority comparisons are case-insensitive.
| 362 | + if (partScheme.equalsIgnoreCase(tblScheme)) { |
| 363 | + val partAuthority = partitionLocation.getAuthority |
| 364 | + val tblAuthority = tableLocation.getAuthority |
| 365 | + if (partAuthority != null && tblAuthority != null) { |
| 366 | + tblAuthority.equalsIgnoreCase(partAuthority) |
| 367 | + } else { |
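| | + // If only one location specifies an authority, compare it against the default
| | + // FileSystem's authority; if neither does, assume they resolve to the same FileSystem.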
| 368 | + val defaultUri = FileSystem.getDefaultUri(hadoopConf) |
| 369 | + if (tblAuthority != null) { |
| 370 | + tblAuthority.equalsIgnoreCase(defaultUri.getAuthority) |
| 371 | + } else if (partAuthority != null) { |
| 372 | + partAuthority.equalsIgnoreCase(defaultUri.getAuthority) |
| 373 | + } else { |
| 374 | + true |
| 375 | + } |
| 376 | + } |
| 377 | + } else { |
| 378 | + false |
| 379 | + } |
| 380 | + } |
324 | 381 | } |