Skip to content

Commit d64b355

Browse files
squitoMarcelo Vanzin
authored andcommitted
[SPARK-25738][SQL] Fix LOAD DATA INPATH for hdfs port
## What changes were proposed in this pull request? LOAD DATA INPATH didn't work if the defaultFS included a port for hdfs. Handling this just requires a small change to use the correct URI constructor. ## How was this patch tested? Added a unit test, ran all tests via jenkins Closes #22733 from squito/SPARK-25738. Authored-by: Imran Rashid <irashid@cloudera.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> (cherry picked from commit fdaa998) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent b6e4aca commit d64b355

File tree

2 files changed

+15
-4
lines changed

2 files changed

+15
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ case class LoadDataCommand(
306306
val loadPath = {
307307
if (isLocal) {
308308
val localFS = FileContext.getLocalFSFileContext()
309-
makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path))
309+
LoadDataCommand.makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(),
310+
new Path(path))
310311
} else {
311312
val loadPath = new Path(path)
312313
// Follow Hive's behavior:
@@ -323,7 +324,7 @@ case class LoadDataCommand(
323324
// by considering the wild card scenario in mind.as per old logic query param is
324325
// been considered while creating URI instance and if path contains wild card char '?'
325326
// the remaining charecters after '?' will be removed while forming URI instance
326-
makeQualified(defaultFS, uriPath, loadPath)
327+
LoadDataCommand.makeQualified(defaultFS, uriPath, loadPath)
327328
}
328329
}
329330
val fs = loadPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
@@ -363,7 +364,9 @@ case class LoadDataCommand(
363364
CommandUtils.updateTableStats(sparkSession, targetTable)
364365
Seq.empty[Row]
365366
}
367+
}
366368

369+
object LoadDataCommand {
367370
/**
368371
* Returns a qualified path object. Method ported from org.apache.hadoop.fs.Path class.
369372
*
@@ -372,7 +375,7 @@ case class LoadDataCommand(
372375
* @param path Path instance based on the path string specified by the user.
373376
* @return qualified path object
374377
*/
375-
private def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
378+
private[sql] def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
376379
val pathUri = if (path.isAbsolute()) path.toUri() else new Path(workingDir, path).toUri()
377380
if (pathUri.getScheme == null || pathUri.getAuthority == null &&
378381
defaultUri.getAuthority != null) {
@@ -383,7 +386,7 @@ case class LoadDataCommand(
383386
pathUri.getAuthority
384387
}
385388
try {
386-
val newUri = new URI(scheme, authority, pathUri.getPath, pathUri.getFragment)
389+
val newUri = new URI(scheme, authority, pathUri.getPath, null, pathUri.getFragment)
387390
new Path(newUri)
388391
} catch {
389392
case e: URISyntaxException =>

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.hive.execution
1919

2020
import java.io.File
21+
import java.net.URI
2122
import java.nio.charset.StandardCharsets
2223
import java.sql.{Date, Timestamp}
2324
import java.util.{Locale, Set}
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, Functio
3233
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
3334
import org.apache.spark.sql.catalyst.parser.ParseException
3435
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
36+
import org.apache.spark.sql.execution.command.LoadDataCommand
3537
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
3638
import org.apache.spark.sql.functions._
3739
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
@@ -1985,6 +1987,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
19851987
}
19861988
}
19871989

1990+
test("SPARK-25738: defaultFs can have a port") {
1991+
val defaultURI = new URI("hdfs://fizz.buzz.com:8020")
1992+
val r = LoadDataCommand.makeQualified(defaultURI, new Path("/foo/bar"), new Path("/flim/flam"))
1993+
assert(r === new Path("hdfs://fizz.buzz.com:8020/flim/flam"))
1994+
}
1995+
19881996
test("Insert overwrite with partition") {
19891997
withTable("tableWithPartition") {
19901998
sql(

0 commit comments

Comments
 (0)