From fe17691f0592eb8e135509736987b65104756da0 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Mon, 15 Oct 2018 16:10:02 -0500
Subject: [PATCH] [SPARK-25738][SQL] Fix LOAD DATA INPATH for hdfs port

LOAD DATA INPATH didn't work if the defaultFS included a port for hdfs.
Handling this just requires a small change to use the correct URI
constructor.
---
 .../apache/spark/sql/execution/command/tables.scala | 11 +++++++----
 .../spark/sql/hive/execution/SQLQuerySuite.scala    |  8 ++++++++
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 2eca1c40a5b3f..64831e5089a67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -306,7 +306,8 @@ case class LoadDataCommand(
     val loadPath = {
       if (isLocal) {
         val localFS = FileContext.getLocalFSFileContext()
-        makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path))
+        LoadDataCommand.makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(),
+          new Path(path))
       } else {
         val loadPath = new Path(path)
         // Follow Hive's behavior:
@@ -323,7 +324,7 @@ case class LoadDataCommand(
         // by considering the wild card scenario in mind.as per old logic query param is
         // been considered while creating URI instance and if path contains wild card char '?'
         // the remaining charecters after '?' will be removed while forming URI instance
-        makeQualified(defaultFS, uriPath, loadPath)
+        LoadDataCommand.makeQualified(defaultFS, uriPath, loadPath)
       }
     }
     val fs = loadPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
@@ -363,7 +364,9 @@ case class LoadDataCommand(
     CommandUtils.updateTableStats(sparkSession, targetTable)
     Seq.empty[Row]
   }
+}
 
+object LoadDataCommand {
   /**
    * Returns a qualified path object. Method ported from org.apache.hadoop.fs.Path class.
    *
@@ -372,7 +375,7 @@ case class LoadDataCommand(
    * @param path Path instance based on the path string specified by the user.
    * @return qualified path object
    */
-  private def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
+  private[sql] def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
     val pathUri = if (path.isAbsolute()) path.toUri() else new Path(workingDir, path).toUri()
     if (pathUri.getScheme == null || pathUri.getAuthority == null &&
         defaultUri.getAuthority != null) {
@@ -383,7 +386,7 @@ case class LoadDataCommand(
         pathUri.getAuthority
       }
       try {
-        val newUri = new URI(scheme, authority, pathUri.getPath, pathUri.getFragment)
+        val newUri = new URI(scheme, authority, pathUri.getPath, null, pathUri.getFragment)
         new Path(newUri)
       } catch {
         case e: URISyntaxException =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e49aea267026e..dfcde8cc0d39f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import java.io.File
+import java.net.URI
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.util.{Locale, Set}
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, Functio
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.command.LoadDataCommand
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
@@ -1985,6 +1987,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("SPARK-25738: defaultFs can have a port") {
+    val defaultURI = new URI("hdfs://fizz.buzz.com:8020")
+    val r = LoadDataCommand.makeQualified(defaultURI, new Path("/foo/bar"), new Path("/flim/flam"))
+    assert(r === new Path("hdfs://fizz.buzz.com:8020/flim/flam"))
+  }
+
  test("Insert overwrite with partition") {
     withTable("tableWithPartition") {
       sql(
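
A minimal standalone sketch of why the constructor swap in tables.scala fixes the port handling. This is illustrative only and not part of the patch; the object name UriConstructorSketch and the printed messages are made up. The four-argument java.net.URI constructor used before treats its second argument as a host, so an authority carrying a port such as "fizz.buzz.com:8020" is bracketed as if it were an IPv6 literal and fails to parse; the five-argument form used after takes an authority and keeps the port intact.

import java.net.{URI, URISyntaxException}

// Illustrative sketch, not part of the patch.
object UriConstructorSketch {
  def main(args: Array[String]): Unit = {
    val authority = "fizz.buzz.com:8020"  // host plus port, as in the new test

    // 5-arg form: URI(scheme, authority, path, query, fragment).
    // The authority string is kept verbatim, so the port survives.
    val qualified = new URI("hdfs", authority, "/flim/flam", null, null)
    println(qualified)  // hdfs://fizz.buzz.com:8020/flim/flam

    // 4-arg form: URI(scheme, host, path, fragment).
    // A "host" containing ':' is wrapped in brackets like an IPv6 literal
    // ("hdfs://[fizz.buzz.com:8020]/flim/flam"), and parsing that fails.
    try {
      new URI("hdfs", authority, "/flim/flam", null)
    } catch {
      case e: URISyntaxException =>
        println(s"4-arg constructor rejects a host:port authority: ${e.getMessage}")
    }
  }
}

That URISyntaxException is what the existing catch block in makeQualified (visible in the last tables.scala hunk) was handling, which is why LOAD DATA INPATH failed whenever defaultFS carried a port; with the five-argument constructor the URI parses, and the new test asserts that the port survives qualification.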