@@ -149,37 +149,18 @@ case class CreateViewCommand(
   * SQL based on the analyzed plan, and also creates the proper schema for the view.
   */
  private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
-    val viewSQL: String =
-      if (sparkSession.sessionState.conf.canonicalView) {
-        val logicalPlan =
-          if (tableDesc.schema.isEmpty) {
-            analyzedPlan
-          } else {
-            val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
-              case (attr, col) => Alias(attr, col.name)()
-            }
-            sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
-          }
-        new SQLBuilder(logicalPlan).toSQL
-      } else {
-        // When user specified column names for view, we should create a project to do the renaming.
-        // When no column name specified, we still need to create a project to declare the columns
-        // we need, to make us more robust to top level `*`s.
-        val viewOutput = {
-          val columnNames = analyzedPlan.output.map(f => quote(f.name))
-          if (tableDesc.schema.isEmpty) {
-            columnNames.mkString(", ")
-          } else {
-            columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
-              case (name, alias) => s"$name AS $alias"
-            }.mkString(", ")
-          }
-        }
-
-        val viewText = tableDesc.viewText.get
-        val viewName = quote(tableDesc.identifier.table)
-        s"SELECT $viewOutput FROM ($viewText) $viewName"
-      }
+    val viewSQL: String = {
+      val logicalPlan =
+        if (tableDesc.schema.isEmpty) {
+          analyzedPlan
+        } else {
+          val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
+            case (attr, col) => Alias(attr, col.name)()
+          }
+          sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
+        }
+      new SQLBuilder(logicalPlan).toSQL
+    }
 
     // Validate the view SQL - make sure we can parse it and analyze it.
    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
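For context, the removed `else` branch was the non-canonical fallback guarded by `spark.sql.nativeView`: it spliced the user's original view text into a wrapper SELECT instead of regenerating SQL from the analyzed plan. A minimal, self-contained sketch of that splicing strategy (toy helper, not Spark code; `wrapperSql` and the sample values are illustrative):

  object ViewSqlSketch {
    private def quote(name: String): String = s"`$name`"

    // Mirrors the removed fallback: wrap the raw view text in an outer SELECT,
    // aliasing output columns when the user declared a schema for the view.
    def wrapperSql(
        viewText: String,
        viewName: String,
        output: Seq[String],
        schema: Seq[String]): String = {
      val viewOutput =
        if (schema.isEmpty) {
          output.map(quote).mkString(", ")
        } else {
          output.map(quote).zip(schema.map(quote)).map {
            case (name, alias) => s"$name AS $alias"
          }.mkString(", ")
        }
      s"SELECT $viewOutput FROM ($viewText) ${quote(viewName)}"
    }

    def main(args: Array[String]): Unit = {
      // Prints: SELECT `id` AS `c1` FROM (SELECT * FROM jt) `testView`
      println(wrapperSql("SELECT * FROM jt", "testView", Seq("id"), Seq("c1")))
    }
  }

Because the wrapper re-evaluates the stored view text, a top-level `*` in it could silently change meaning when the base table gained columns; generating SQL from the analyzed plan (the surviving branch) pins the column list instead, which is why the flag and the fallback can go away.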
@@ -258,25 +258,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView")
-    .internal()
-    .doc("When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
-      "Note that this function is experimental and should ony be used when you are using " +
-      "non-hive-compatible tables written by Spark SQL. The SQL string used to create " +
-      "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
-      "possible, or you may get wrong result.")
-    .booleanConf
-    .createWithDefault(true)
-
-  val CANONICAL_NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView.canonical")
-    .internal()
-    .doc("When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
-      "CREATE VIEW statement using SQL query string generated from view definition logical " +
-      "plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
-      "original native view implementation.")
-    .booleanConf
-    .createWithDefault(true)
-
   val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord")
     .doc("The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
    .stringConf
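For readers skimming the removals above, the `.internal().doc(...).booleanConf.createWithDefault(...)` chain is Spark's config-entry builder. A rough, self-contained miniature of the shape (toy types only; the real `SQLConfigBuilder`/`ConfigEntry` machinery is richer than this sketch):

  final case class ConfEntry[T](key: String, default: T, doc: String, internal: Boolean)

  final class ConfBuilder(key: String, internal: Boolean = false, doc: String = "") {
    def internalConf: ConfBuilder = new ConfBuilder(key, internal = true, doc)  // analogue of .internal()
    def withDoc(d: String): ConfBuilder = new ConfBuilder(key, internal, d)     // analogue of .doc(...)
    def createBoolean(default: Boolean): ConfEntry[Boolean] =                   // analogue of .booleanConf.createWithDefault(...)
      ConfEntry(key, default, doc, internal)
  }

  object ConfSketch extends App {
    // Shape of the entries deleted above; dropping the `val` removes the key's
    // typed accessor, which is why the two getters are deleted in the next hunks too.
    val example = new ConfBuilder("spark.sql.example.flag").internalConf
      .withDoc("Illustrative only.")
      .createBoolean(default = true)
    println(example)
  }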
@@ -613,8 +594,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
-  def nativeView: Boolean = getConf(NATIVE_VIEW)
-
   def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
 
  def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
@@ -625,8 +604,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
-  def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW)
-
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
  def subexpressionEliminationEnabled: Boolean =
@@ -126,17 +126,17 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
 
   test("reset - internal conf") {
     spark.sessionState.conf.clear()
-    val original = spark.conf.get(SQLConf.NATIVE_VIEW)
+    val original = spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS)
     try {
-      assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true)
-      sql(s"set ${SQLConf.NATIVE_VIEW.key}=false")
-      assert(spark.conf.get(SQLConf.NATIVE_VIEW) === false)
-      assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() == 1)
+      assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
+      sql(s"set ${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}=10")
+      assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 10)
+      assert(sql(s"set").where(s"key = '${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}'").count() == 1)
       sql(s"reset")
-      assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true)
-      assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() == 0)
+      assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
+      assert(sql(s"set").where(s"key = '${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}'").count() == 0)
     } finally {
-      sql(s"set ${SQLConf.NATIVE_VIEW}=$original")
+      sql(s"set ${SQLConf.OPTIMIZER_MAX_ITERATIONS}=$original")
     }
   }
 
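The rewritten test exercises the same SET/RESET lifecycle as before, just against a config that still exists (`spark.sql.optimizer.maxIterations`, default 100 per the asserts). Roughly, in any Spark 2.x session (illustrative sketch; assumes a local master):

  import org.apache.spark.sql.SparkSession

  object SetResetSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[*]").appName("set-reset").getOrCreate()
      // An explicit SET overrides the default, and the key then appears in `SET` output...
      spark.sql("SET spark.sql.optimizer.maxIterations=10")
      // ...while RESET drops every session-set value, falling back to the defaults.
      spark.sql("RESET")
      spark.stop()
    }
  }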
@@ -95,15 +95,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("correctly parse CREATE VIEW statement") {
-    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      sql(
-        """CREATE VIEW IF NOT EXISTS
-          |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
-          |TBLPROPERTIES ('a' = 'b')
-          |AS SELECT * FROM jt""".stripMargin)
-      checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
-      sql("DROP VIEW testView")
-    }
+    sql(
+      """CREATE VIEW IF NOT EXISTS
+        |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+        |TBLPROPERTIES ('a' = 'b')
+        |AS SELECT * FROM jt""".stripMargin)
+    checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+    sql("DROP VIEW testView")
   }
 
  test("correctly parse CREATE TEMPORARY VIEW statement") {
@@ -145,18 +143,16 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("correctly handle CREATE VIEW IF NOT EXISTS") {
-    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt2") {
-        withView("testView") {
-          sql("CREATE VIEW testView AS SELECT id FROM jt")
+    withTable("jt2") {
+      withView("testView") {
+        sql("CREATE VIEW testView AS SELECT id FROM jt")
 
-          val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-          df.write.format("json").saveAsTable("jt2")
-          sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt2")
+        sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
 
-          // make sure our view doesn't change.
-          checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-        }
+        // make sure our view doesn't change.
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
       }
     }
  }
@@ -174,134 +170,108 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  Seq(true, false).foreach { enabled =>
-    val prefix = (if (enabled) "With" else "Without") + " canonical native view: "
-    test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
-      withSQLConf(
-        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt2") {
-          sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
-          checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-
-          val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-          df.write.format("json").saveAsTable("jt2")
-          sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
-          // make sure the view has been changed.
-          checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
-
-          sql("DROP VIEW testView")
-
-          val e = intercept[AnalysisException] {
-            sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
-          }
-          assert(e.message.contains(
-            "CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed"))
-        }
-      }
+  test("correctly handle CREATE OR REPLACE VIEW") {
+    withTable("jt2") {
+      sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
+      checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+
+      val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+      df.write.format("json").saveAsTable("jt2")
+      sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
+      // make sure the view has been changed.
+      checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+      sql("DROP VIEW testView")
+
+      val e = intercept[AnalysisException] {
+        sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+      }
+      assert(e.message.contains(
+        "CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed"))
     }
+  }
 
-    test(s"$prefix correctly handle ALTER VIEW") {
-      withSQLConf(
-        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt2") {
-          withView("testView") {
-            sql("CREATE VIEW testView AS SELECT id FROM jt")
-
-            val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-            df.write.format("json").saveAsTable("jt2")
-            sql("ALTER VIEW testView AS SELECT * FROM jt2")
-            // make sure the view has been changed.
-            checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
-          }
-        }
+  test("correctly handle ALTER VIEW") {
+    withTable("jt2") {
+      withView("testView") {
+        sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt2")
+        sql("ALTER VIEW testView AS SELECT * FROM jt2")
+        // make sure the view has been changed.
+        checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
       }
     }
+  }
 
-    test(s"$prefix create hive view for json table") {
-      // json table is not hive-compatible, make sure the new flag fix it.
-      withSQLConf(
-        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withView("testView") {
-          sql("CREATE VIEW testView AS SELECT id FROM jt")
-          checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-        }
-      }
+  test("create hive view for json table") {
+    // json table is not hive-compatible, make sure the new flag fix it.
+    withView("testView") {
+      sql("CREATE VIEW testView AS SELECT id FROM jt")
+      checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
     }
+  }
 
-    test(s"$prefix create hive view for partitioned parquet table") {
-      // partitioned parquet table is not hive-compatible, make sure the new flag fix it.
-      withSQLConf(
-        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("parTable") {
-          withView("testView") {
-            val df = Seq(1 -> "a").toDF("i", "j")
-            df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
-            sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
-            checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
-          }
-        }
+  test("create hive view for partitioned parquet table") {
+    // partitioned parquet table is not hive-compatible, make sure the new flag fix it.
+    withTable("parTable") {
+      withView("testView") {
+        val df = Seq(1 -> "a").toDF("i", "j")
+        df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
+        sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
+        checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
       }
     }
   }
 
   test("CTE within view") {
-    withSQLConf(
-      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
-      withView("cte_view") {
-        sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
-        checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
-      }
+    withView("cte_view") {
+      sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
+      checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
     }
   }
 
   test("Using view after switching current database") {
-    withSQLConf(
-      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
-      withView("v") {
-        sql("CREATE VIEW v AS SELECT * FROM src")
-        withTempDatabase { db =>
-          activateDatabase(db) {
-            // Should look up table `src` in database `default`.
-            checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
-
-            // The new `src` table shouldn't be scanned.
-            sql("CREATE TABLE src(key INT, value STRING)")
-            checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
-          }
+    withView("v") {
+      sql("CREATE VIEW v AS SELECT * FROM src")
+      withTempDatabase { db =>
+        activateDatabase(db) {
+          // Should look up table `src` in database `default`.
+          checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+
+          // The new `src` table shouldn't be scanned.
+          sql("CREATE TABLE src(key INT, value STRING)")
+          checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
        }
      }
    }
  }
 
   test("Using view after adding more columns") {
-    withSQLConf(
-      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
-      withTable("add_col") {
-        spark.range(10).write.saveAsTable("add_col")
-        withView("v") {
-          sql("CREATE VIEW v AS SELECT * FROM add_col")
-          spark.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
-          checkAnswer(sql("SELECT * FROM v"), spark.range(10).toDF())
-        }
+    withTable("add_col") {
+      spark.range(10).write.saveAsTable("add_col")
+      withView("v") {
+        sql("CREATE VIEW v AS SELECT * FROM add_col")
+        spark.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
+        checkAnswer(sql("SELECT * FROM v"), spark.range(10).toDF())
      }
    }
  }
 
   test("create hive view for joined tables") {
     // make sure the new flag can handle some complex cases like join and schema change.
-    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt1", "jt2") {
-        spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1")
-        spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2")
-        sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2")
-        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
-
-        val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol")
-        df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1")
-        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
-
-        sql("DROP VIEW testView")
-      }
+    withTable("jt1", "jt2") {
+      spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1")
+      spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2")
+      sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2")
+      checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+      val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol")
+      df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1")
+      checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+      sql("DROP VIEW testView")
    }
  }
 
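Taken together, the test changes mean CREATE VIEW over non-Hive-compatible tables simply works, with no session flags. A hedged end-to-end sketch (assumes a Spark 2.x build with Hive support on a local master; the table and view names are illustrative, not from the PR):

  import org.apache.spark.sql.SparkSession

  object NativeViewExample {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .master("local[*]")
        .appName("native-view-example")
        .enableHiveSupport()
        .getOrCreate()
      import spark.implicits._

      // A JSON-backed table is not Hive-compatible, which previously required
      // spark.sql.nativeView=true; after this patch no switch is needed.
      Seq(1, 2, 3).toDF("id").write.format("json").saveAsTable("jt")
      spark.sql("CREATE VIEW v AS SELECT id FROM jt")
      spark.sql("SELECT * FROM v").show()
      spark.stop()
    }
  }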