
Commit 83d87bd

making ANSI store assignment policy as default
1 parent ef81525 commit 83d87bd

File tree

4 files changed, +17 -28 lines changed

4 files changed

+17
-28
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 8 additions & 10 deletions
@@ -2503,9 +2503,9 @@ class Analyzer(
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case append @ AppendData(table, query, _, isByName)
           if table.resolved && query.resolved && !append.outputResolved =>
+        validateStoreAssignmentPolicy()
         val projection =
-          TableOutputResolver.resolveOutputColumns(
-            table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
+          TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
 
         if (projection != query) {
           append.copy(query = projection)
@@ -2515,9 +2515,9 @@
 
       case overwrite @ OverwriteByExpression(table, _, query, _, isByName)
           if table.resolved && query.resolved && !overwrite.outputResolved =>
+        validateStoreAssignmentPolicy()
         val projection =
-          TableOutputResolver.resolveOutputColumns(
-            table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
+          TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
 
         if (projection != query) {
           overwrite.copy(query = projection)
@@ -2527,9 +2527,9 @@
 
       case overwrite @ OverwritePartitionsDynamic(table, query, _, isByName)
           if table.resolved && query.resolved && !overwrite.outputResolved =>
+        validateStoreAssignmentPolicy()
         val projection =
-          TableOutputResolver.resolveOutputColumns(
-            table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
+          TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
 
         if (projection != query) {
           overwrite.copy(query = projection)
@@ -2539,16 +2539,14 @@
     }
   }
 
-  private def storeAssignmentPolicy: StoreAssignmentPolicy.Value = {
-    val policy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.STRICT)
+  private def validateStoreAssignmentPolicy(): Unit = {
     // SPARK-28730: LEGACY store assignment policy is disallowed in data source v2.
-    if (policy == StoreAssignmentPolicy.LEGACY) {
+    if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
       val configKey = SQLConf.STORE_ASSIGNMENT_POLICY.key
       throw new AnalysisException(s"""
         |"LEGACY" store assignment policy is disallowed in Spark data source V2.
         |Please set the configuration $configKey to other values.""".stripMargin)
     }
-    policy
   }
 
   private def commonNaturalJoinProcessing(
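
Net effect of this rule change: the old helper computed a policy (falling back to STRICT when the conf was unset), while the new conf always yields a concrete value, so the helper only validates that v2 writes do not use LEGACY and TableOutputResolver reads the policy from conf itself. A minimal standalone sketch of that validate-only shape, with MiniConf and the exception type as simplified stand-ins for the real SQLConf and AnalysisException:

    // Stand-ins for illustration only; the real StoreAssignmentPolicy, SQLConf
    // and AnalysisException live in the org.apache.spark.sql packages.
    object StoreAssignmentPolicy extends Enumeration {
      val ANSI, LEGACY, STRICT = Value
    }

    class MiniConf(raw: String = "ANSI") {
      // Mirrors SQLConf.storeAssignmentPolicy after this commit: a concrete
      // value rather than an Option, because the conf entry has a default.
      def storeAssignmentPolicy: StoreAssignmentPolicy.Value =
        StoreAssignmentPolicy.withName(raw)
    }

    object ValidateSketch {
      // Mirrors validateStoreAssignmentPolicy(): validation only, no result;
      // callers no longer receive a policy from it.
      def validateStoreAssignmentPolicy(conf: MiniConf): Unit = {
        if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
          throw new IllegalArgumentException(
            "LEGACY store assignment policy is disallowed in Spark data source V2.")
        }
      }

      def main(args: Array[String]): Unit = {
        validateStoreAssignmentPolicy(new MiniConf("ANSI"))   // passes
        validateStoreAssignmentPolicy(new MiniConf("LEGACY")) // throws
      }
    }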

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala

Lines changed: 4 additions & 7 deletions
@@ -32,8 +32,7 @@ object TableOutputResolver {
       expected: Seq[Attribute],
       query: LogicalPlan,
       byName: Boolean,
-      conf: SQLConf,
-      storeAssignmentPolicy: StoreAssignmentPolicy.Value): LogicalPlan = {
+      conf: SQLConf): LogicalPlan = {
 
     if (expected.size < query.output.size) {
       throw new AnalysisException(
@@ -47,8 +46,7 @@
       expected.flatMap { tableAttr =>
         query.resolve(Seq(tableAttr.name), conf.resolver) match {
           case Some(queryExpr) =>
-            checkField(
-              tableAttr, queryExpr, byName, conf, storeAssignmentPolicy, err => errors += err)
+            checkField(tableAttr, queryExpr, byName, conf, err => errors += err)
           case None =>
             errors += s"Cannot find data for output column '${tableAttr.name}'"
             None
@@ -66,8 +64,7 @@
 
       query.output.zip(expected).flatMap {
         case (queryExpr, tableAttr) =>
-          checkField(
-            tableAttr, queryExpr, byName, conf, storeAssignmentPolicy, err => errors += err)
+          checkField(tableAttr, queryExpr, byName, conf, err => errors += err)
       }
     }
 
@@ -88,9 +85,9 @@
       queryExpr: NamedExpression,
       byName: Boolean,
       conf: SQLConf,
-      storeAssignmentPolicy: StoreAssignmentPolicy.Value,
       addError: String => Unit): Option[NamedExpression] = {
 
+    val storeAssignmentPolicy = conf.storeAssignmentPolicy
     lazy val outputField = if (tableAttr.dataType.sameType(queryExpr.dataType) &&
       tableAttr.name == queryExpr.name &&
      tableAttr.metadata == queryExpr.metadata) {
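
With the explicit storeAssignmentPolicy parameter dropped, anything that already receives a SQLConf can derive the policy itself, which is what checkField now does via val storeAssignmentPolicy = conf.storeAssignmentPolicy. A small sketch of that derive-inside pattern, using hypothetical simplified types:

    object Policy extends Enumeration { val ANSI, LEGACY, STRICT = Value }

    // Hypothetical stand-in for SQLConf: the policy is derivable from the
    // conf, so it need not be threaded through as a separate argument.
    final case class MiniConf(storeAssignmentPolicy: Policy.Value = Policy.ANSI)

    object CheckFieldSketch {
      // Same shape as checkField after this commit: the policy is looked up
      // from conf inside the helper rather than passed by every caller.
      def checkField(name: String, conf: MiniConf, addError: String => Unit): Option[String] = {
        val storeAssignmentPolicy = conf.storeAssignmentPolicy
        if (storeAssignmentPolicy == Policy.LEGACY) {
          addError(s"legacy policy while checking '$name'")
          None
        } else Some(name)
      }

      def main(args: Array[String]): Unit = {
        val errors = scala.collection.mutable.ArrayBuffer.empty[String]
        println(checkField("id", MiniConf(), err => errors += err)) // Some(id) under ANSI
        println(errors.isEmpty)                                     // true
      }
    }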

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 3 deletions
@@ -1731,7 +1731,7 @@
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
     .checkValues(StoreAssignmentPolicy.values.map(_.toString))
-    .createOptional
+    .createWithDefault(StoreAssignmentPolicy.ANSI.toString)
 
   val ANSI_ENABLED = buildConf("spark.sql.ansi.enabled")
     .doc("When true, Spark tries to conform to the ANSI SQL specification: 1. Spark will " +
@@ -2461,8 +2461,8 @@
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
 
-  def storeAssignmentPolicy: Option[StoreAssignmentPolicy.Value] =
-    getConf(STORE_ASSIGNMENT_POLICY).map(StoreAssignmentPolicy.withName)
+  def storeAssignmentPolicy: StoreAssignmentPolicy.Value =
+    StoreAssignmentPolicy.withName(getConf(STORE_ASSIGNMENT_POLICY))
 
   def ansiEnabled: Boolean = getConf(ANSI_ENABLED)
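
The conf entry now carries a default, so the getter can return a value instead of an Option. A quick usage sketch against a local SparkSession (the printed values are the expected behavior implied by this diff, not captured output):

    import org.apache.spark.sql.SparkSession

    object StorePolicyDefaultDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
        // With createWithDefault, get() needs no fallback anymore.
        println(spark.conf.get("spark.sql.storeAssignmentPolicy")) // expected: ANSI
        // Values are upper-cased and checked against StoreAssignmentPolicy.values,
        // per the .transform/.checkValues lines above.
        spark.conf.set("spark.sql.storeAssignmentPolicy", "strict") // stored as STRICT
        spark.stop()
      }
    }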

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 2 additions & 8 deletions
@@ -190,14 +190,11 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[LogicalPlan] {
         query
       }
 
-      // SPARK-28730: for V1 data source, we use the "LEGACY" as default store assignment policy.
-      // TODO: use ANSI store assignment policy by default in SPARK-28495.
-      val storeAssignmentPolicy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.LEGACY)
       c.copy(
         tableDesc = existingTable,
         query = Some(TableOutputResolver.resolveOutputColumns(
           tableDesc.qualifiedName, existingTable.schema.toAttributes, newQuery,
-          byName = true, conf, storeAssignmentPolicy)))
+          byName = true, conf)))
 
     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
     // config, and do various checks:
@@ -403,11 +400,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
         s"including ${staticPartCols.size} partition column(s) having constant value(s).")
     }
 
-    // SPARK-28730: for V1 data source, we use the "LEGACY" as default store assignment policy.
-    // TODO: use ANSI store assignment policy by default in SPARK-28495.
-    val storeAssignmentPolicy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.LEGACY)
     val newQuery = TableOutputResolver.resolveOutputColumns(
-      tblName, expectedColumns, insert.query, byName = false, conf, storeAssignmentPolicy)
+      tblName, expectedColumns, insert.query, byName = false, conf)
     if (normalizedPartSpec.nonEmpty) {
       if (normalizedPartSpec.size != partColNames.length) {
         throw new AnalysisException(
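
With the two LEGACY fallbacks above removed, V1 inserts pick up the ANSI default as well. A hedged sketch of the user-visible consequence (table name hypothetical): an unsafe assignment such as a string into an INT column should now be rejected at analysis time, where the old LEGACY default would have force-cast it:

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object AnsiInsertDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
        spark.sql("CREATE TABLE t (i INT) USING parquet")
        try {
          spark.sql("INSERT INTO t VALUES ('not a number')")
        } catch {
          // Expected under the ANSI default: a "cannot safely cast" style
          // analysis error; under LEGACY this insert would have succeeded
          // and written a null.
          case e: AnalysisException => println(s"rejected: ${e.getMessage}")
        }
        spark.stop()
      }
    }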
