SQLConf.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 import scala.collection.immutable
 
+import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.internal.Logging
@@ -55,7 +56,7 @@ object SQLConf {
   val WAREHOUSE_PATH = SQLConfigBuilder("spark.sql.warehouse.dir")
     .doc("The default location for managed databases and tables.")
     .stringConf
-    .createWithDefault("file:${system:user.dir}/spark-warehouse")
+    .createWithDefault("${system:user.dir}/spark-warehouse")

Member:

@avulanov can I call on your expertise here? @koertkuipers and I noticed that this causes a problem: this path is intended to be a local file system path in the local home dir, but it will now be interpreted as a path on HDFS for HDFS deployments.

If this is intended to always be a local path, and it seems like it is, then the usages of the new makeQualifiedPath are a bit wrong, in that they explicitly resolve the path against the Hadoop file system, which can be HDFS.

Alternatively, just removing user.dir kind of works too, in that it will at least become a path relative to the HDFS user dir, I think. Do you know which is better?
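
To make the failure mode concrete, here is a minimal sketch (not from the patch; the cluster URI and path are invented) of how an unqualified path gets resolved once the default filesystem is HDFS:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
// On an HDFS deployment, fs.defaultFS points at the namenode.
conf.set("fs.defaultFS", "hdfs://namenode:8020")

// makeQualified resolves against the *default* filesystem, so a warehouse
// dir that was meant to be local silently becomes an HDFS location
// (assuming an HDFS client is on the classpath):
FileSystem.get(conf).makeQualified(new Path("/home/alice/spark-warehouse"))
// -> hdfs://namenode:8020/home/alice/spark-warehouse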

Contributor:

Or use FileSystem.getHomeDirectory?

Member:

Yes, that would probably resolve it, if the intent is to let this become a directory on HDFS. I think it was supposed to be a local file, so maybe we have to find a Windows-friendly way to bring back the file: prefix.

Maybe make the default value just "spark-warehouse", and then below, in def warehousePath, add logic to resolve it explicitly against the LocalFileSystem? I'll give that a shot soon if nobody has better ideas.
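
A rough sketch of that idea, assuming the resolution happens in def warehousePath (the helper name and placement are illustrative, not the final patch):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Qualify the configured value against the *local* filesystem explicitly,
// instead of against fs.defaultFS (which may be HDFS):
def resolveWarehousePath(configured: String): String = {
  val localFs = FileSystem.getLocal(new Configuration())
  // A relative default like "spark-warehouse" resolves against the process
  // working directory, yielding e.g. file:/<cwd>/spark-warehouse.
  localFs.makeQualified(new Path(configured)).toString
}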

Contributor:

If the goal is to always have a local directory here, you could add this to the config constant:

.transform(new File(_).toURI().toString())

which should be Windows-compatible, right?
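
For reference, what java.io.File.toURI() produces for the two kinds of value discussed in this thread (example paths are invented):

import java.io.File

// A relative default is resolved against the working directory:
new File("spark-warehouse").toURI.toString
// e.g. file:/home/alice/project/spark-warehouse

// An absolute user-supplied path just gains the file: scheme:
new File("/my/local/path").toURI.toString
// -> file:/my/local/path

// On Windows, an absolute path such as C:\my\path becomes file:/C:/my/path,
// which is why this approach is Windows-friendly.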

Member:

I've filed https://issues.apache.org/jira/browse/SPARK-17810 and am about to open a PR for the fix I proposed. I think you're right, though then I wonder: what if I set the value to "/my/local/path"? It will still get interpreted later as an HDFS path, when, as I understand it, it's always supposed to be treated as a local path.

Contributor:

.transform also applies to user-provided values, so "/my/local/path" would become "file:/my/local/path" or something similar.

Contributor (author):

Yes, we can use File.toURI() for WAREHOUSE_PATH if we are sure that it is always a local path. However, I remember someone in this thread mentioned that the path might be an Amazon S3 path. Is that supposed to be possible?


   val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
     .internal()
@@ -679,7 +680,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)
 
-  def warehousePath: String = getConf(WAREHOUSE_PATH)
+  def warehousePath: String = new Path(getConf(WAREHOUSE_PATH)).toString
 
   override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)


DDLSuite.scala
@@ -111,36 +111,40 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
   }
 
-  private def appendTrailingSlash(path: String): String = {
-    if (!path.endsWith(File.separator)) path + File.separator else path
-  }

test("the qualified path of a database is stored in the catalog") {
val catalog = spark.sessionState.catalog

withTempDir { tmpDir =>
val path = tmpDir.toString
// The generated temp path is not qualified.
assert(!path.startsWith("file:/"))
sql(s"CREATE DATABASE db1 LOCATION '$path'")
val uri = tmpDir.toURI
sql(s"CREATE DATABASE db1 LOCATION '$uri'")
val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
assert("file" === pathInCatalog.getScheme)
val expectedPath = if (path.endsWith(File.separator)) path.dropRight(1) else path
assert(expectedPath === pathInCatalog.getPath)
val expectedPath = new Path(path).toUri
assert(expectedPath.getPath === pathInCatalog.getPath)

       withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
         sql(s"CREATE DATABASE db2")
-        val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
-        assert("file" === pathInCatalog.getScheme)
-        val expectedPath = appendTrailingSlash(spark.sessionState.conf.warehousePath) + "db2.db"
-        assert(expectedPath === pathInCatalog.getPath)
+        val pathInCatalog2 = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
+        assert("file" === pathInCatalog2.getScheme)
+        val expectedPath2 = new Path(spark.sessionState.conf.warehousePath + "/" + "db2.db").toUri
+        assert(expectedPath2.getPath === pathInCatalog2.getPath)
       }
 
       sql("DROP DATABASE db1")
       sql("DROP DATABASE db2")
     }
   }

+  private def makeQualifiedPath(path: String): String = {
+    // copy-paste from SessionCatalog
+    val hadoopPath = new Path(path)
+    val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration)
+    fs.makeQualified(hadoopPath).toString
+  }

test("Create/Drop Database") {
withTempDir { tmpDir =>
val path = tmpDir.toString
@@ -154,8 +158,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {

sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
val expectedLocation =
"file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
val expectedLocation = makeQualifiedPath(path + "/" + s"$dbNameWithoutBackTicks.db")
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
@@ -181,8 +184,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbName)
val expectedLocation =
"file:" + appendTrailingSlash(System.getProperty("user.dir")) +
s"spark-warehouse/$dbName.db"
makeQualifiedPath(s"${System.getProperty("user.dir")}/spark-warehouse" +
"/" + s"$dbName.db")
assert(db1 == CatalogDatabase(
dbName,
"",
@@ -200,17 +203,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val catalog = spark.sessionState.catalog
     val databaseNames = Seq("db1", "`database`")
     withTempDir { tmpDir =>
-      val path = tmpDir.toString
-      val dbPath = "file:" + path
+      val path = new Path(tmpDir.toString).toUri.toString
       databaseNames.foreach { dbName =>
         try {
           val dbNameWithoutBackTicks = cleanIdentifier(dbName)
           sql(s"CREATE DATABASE $dbName Location '$path'")
           val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+          val expPath = makeQualifiedPath(tmpDir.toString)
           assert(db1 == CatalogDatabase(
             dbNameWithoutBackTicks,
             "",
-            if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
+            expPath,
             Map.empty))
           sql(s"DROP DATABASE $dbName CASCADE")
           assert(!catalog.databaseExists(dbNameWithoutBackTicks))
@@ -233,8 +236,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
           val dbNameWithoutBackTicks = cleanIdentifier(dbName)
           sql(s"CREATE DATABASE $dbName")
           val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
-          val expectedLocation =
-            "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+          val expectedLocation = makeQualifiedPath(path + "/" + s"$dbNameWithoutBackTicks.db")
           assert(db1 == CatalogDatabase(
             dbNameWithoutBackTicks,
             "",
@@ -263,12 +265,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         val partitionClause =
           userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
         val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
+        val uri = path.toURI
         sql(
           s"""
              |CREATE TABLE $tabName $schemaClause
              |USING parquet
              |OPTIONS (
-             |  path '$path'
+             |  path '$uri'
              |)
              |$partitionClause
            """.stripMargin)
@@ -367,14 +370,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val path = dir.getCanonicalPath
       val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2")
       df.write.format("json").save(path)
+      val uri = dir.toURI
 
       withTable(tabName) {
         sql(
           s"""
              |CREATE TABLE $tabName
              |USING json
              |OPTIONS (
-             |  path '$path'
+             |  path '$uri'
              |)
            """.stripMargin)
 
@@ -407,14 +411,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
.add("col2", StringType).add("col4", LongType)
.add("col1", IntegerType).add("col3", IntegerType)
val partitionCols = Seq("col1", "col3")
val uri = dir.toURI

withTable(tabName) {
spark.sql(
s"""
|CREATE TABLE $tabName
|USING json
|OPTIONS (
| path '$path'
| path '$uri'
|)
""".stripMargin)
val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))
@@ -474,7 +479,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       databaseNames.foreach { dbName =>
         try {
           val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-          val location = "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+          val location = makeQualifiedPath(path + "/" + s"$dbNameWithoutBackTicks.db")
 
           sql(s"CREATE DATABASE $dbName")
 

SQLConfSuite.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.internal
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
@@ -214,7 +216,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
       // to get the default value, always unset it
       spark.conf.unset(SQLConf.WAREHOUSE_PATH.key)
       assert(spark.sessionState.conf.warehousePath
-        === s"file:${System.getProperty("user.dir")}/spark-warehouse")
+        === new Path(s"${System.getProperty("user.dir")}/spark-warehouse").toString)
     } finally {
       sql(s"set ${SQLConf.WAREHOUSE_PATH}=$original")
     }