Commit c411d26

dchvn authored and huaxingao committed
[SPARK-37330][SQL] Migrate ReplaceTableStatement to v2 command
### What changes were proposed in this pull request?

This PR migrates ReplaceTableStatement to the v2 ReplaceTable command.

### Why are the changes needed?

Migrate to the standard v2 command framework.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #34764 from dchvn/migrate-replacetable.

Authored-by: dch nguyen <dgd_contributor@viettel.com.vn>
Signed-off-by: Huaxin Gao <huaxin_gao@apple.com>
1 parent 1f3eb73 commit c411d26
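For context, the statement this migration touches is SQL's REPLACE TABLE, which swaps a table's metadata and clears its rows. Below is a minimal, hypothetical usage sketch; the catalog name `testcat` and the in-memory catalog class are illustrative assumptions, not part of this PR:

import org.apache.spark.sql.SparkSession

object ReplaceTableDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical setup: any TableCatalog implementation registered under
    // "spark.sql.catalog.testcat" would do; the class below is the in-memory
    // catalog Spark uses in its own tests and may not be on a user classpath.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.catalog.testcat",
        "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog")
      .getOrCreate()

    spark.sql("CREATE TABLE testcat.ns.t (id INT) USING foo")
    // After this PR, the plan for the next statement is the v2 ReplaceTable
    // command rather than the parser-side ReplaceTableStatement.
    spark.sql("REPLACE TABLE testcat.ns.t (id BIGINT, name STRING) USING foo")
    spark.stop()
  }
}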

File tree: 9 files changed (+56, −84 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala (0 additions, 12 deletions)

@@ -28,25 +28,13 @@ import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Lo
 class ResolveCatalogs(val catalogManager: CatalogManager)
   extends Rule[LogicalPlan] with LookupCatalog {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-  import org.apache.spark.sql.connector.catalog.CatalogV2Util._
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case UnresolvedDBObjectName(CatalogAndNamespace(catalog, name), isNamespace) if isNamespace =>
       ResolvedDBObjectName(catalog, name)
 
     case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) =>
       ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name())
-
-    case c @ ReplaceTableStatement(
-        NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
-      ReplaceTable(
-        catalog.asTableCatalog,
-        tbl.asIdentifier,
-        c.tableSchema,
-        // convert the bucket spec and add it as a transform
-        c.partitioning ++ c.bucketSpec.map(_.asTransform),
-        convertTableProperties(c),
-        orCreate = c.orCreate)
   }
 
   object NonSessionCatalogAndTable {
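The dedicated ReplaceTableStatement arm disappears because the command's name is now an UnresolvedDBObjectName child, which the generic rules above already resolve. A self-contained sketch of that pattern, using stand-in types rather than Spark's classes:

// Stand-in types: commands carry an unresolved name node as a child, and one
// generic rule resolves names for every command instead of one arm per command.
sealed trait Plan { def resolved: Boolean }
case class UnresolvedName(parts: Seq[String]) extends Plan { val resolved = false }
case class ResolvedName(catalog: String, parts: Seq[String]) extends Plan { val resolved = true }
case class ReplaceTableCmd(name: Plan) extends Plan { def resolved: Boolean = name.resolved }

def resolve(plan: Plan, currentCatalog: String): Plan = plan match {
  case ReplaceTableCmd(UnresolvedName(parts)) =>
    ReplaceTableCmd(ResolvedName(currentCatalog, parts)) // one rule serves all commands
  case other => other
}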

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala (6 additions, 5 deletions)

@@ -3487,7 +3487,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Replace a table, returning a [[ReplaceTableStatement]] or [[ReplaceTableAsSelect]]
+   * Replace a table, returning a [[ReplaceTable]] or [[ReplaceTableAsSelect]]
    * logical plan.
    *
    * Expected format:
@@ -3540,6 +3540,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     }
 
     val partitioning = partitionExpressions(partTransforms, partCols, ctx)
+    val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
+      serdeInfo, false)
 
     Option(ctx.query).map(plan) match {
       case Some(_) if columns.nonEmpty =>
@@ -3554,8 +3556,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
           ctx)
 
       case Some(query) =>
-        val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
-          serdeInfo, false)
         ReplaceTableAsSelect(
           UnresolvedDBObjectName(table, isNamespace = false),
           partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)
@@ -3564,8 +3564,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
         // Note: table schema includes both the table columns list and the partition columns
         // with data type.
         val schema = StructType(columns ++ partCols)
-        ReplaceTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
-          options, location, comment, serdeInfo, orCreate = orCreate)
+        ReplaceTable(
+          UnresolvedDBObjectName(table, isNamespace = false),
+          schema, partitioning, tableSpec, orCreate = orCreate)
     }
   }
 
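Hoisting `tableSpec` above the match lets the REPLACE TABLE and REPLACE TABLE AS SELECT branches share one value. A rough sketch of the shape being built, with stand-in types (BucketSpec and SerdeInfo simplified to strings so the sketch is self-contained):

// Field order mirrors the TableSpec(...) call in the diff above; the trailing
// `false` there is the `external` flag.
case class TableSpecSketch(
    bucketSpec: Option[String],
    properties: Map[String, String],
    provider: Option[String],
    options: Map[String, String],
    location: Option[String],
    comment: Option[String],
    serde: Option[String],
    external: Boolean)

val spec = TableSpecSketch(
  bucketSpec = None,
  properties = Map("owner" -> "etl"),
  provider = Some("parquet"),
  options = Map.empty,
  location = None,
  comment = Some("replaced nightly"),
  serde = None,
  external = false)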

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala (1 addition, 22 deletions)

@@ -18,12 +18,10 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.trees.{LeafLike, UnaryLike}
-import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.DataType
 
 /**
  * A logical plan node that contains exactly what was parsed from SQL.
@@ -123,25 +121,6 @@ object SerdeInfo {
   }
 }
 
-/**
- * A REPLACE TABLE command, as parsed from SQL.
- *
- * If the table exists prior to running this command, executing this statement
- * will replace the table's metadata and clear the underlying rows from the table.
- */
-case class ReplaceTableStatement(
-    tableName: Seq[String],
-    tableSchema: StructType,
-    partitioning: Seq[Transform],
-    bucketSpec: Option[BucketSpec],
-    properties: Map[String, String],
-    provider: Option[String],
-    options: Map[String, String],
-    location: Option[String],
-    comment: Option[String],
-    serde: Option[SerdeInfo],
-    orCreate: Boolean) extends LeafParsedStatement
-
 /**
  * Column data as parsed by ALTER TABLE ... (ADD|REPLACE) COLUMNS.
  */

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala (15 additions, 4 deletions)

@@ -264,12 +264,23 @@ case class CreateTableAsSelect(
  * The persisted table will have no contents as a result of this operation.
  */
 case class ReplaceTable(
-    catalog: TableCatalog,
-    tableName: Identifier,
+    name: LogicalPlan,
     tableSchema: StructType,
     partitioning: Seq[Transform],
-    properties: Map[String, String],
-    orCreate: Boolean) extends LeafCommand with V2CreateTablePlan {
+    tableSpec: TableSpec,
+    orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+
+  override def child: LogicalPlan = name
+
+  override def tableName: Identifier = {
+    assert(child.resolved)
+    child.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan =
+    copy(name = newChild)
+
   override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
     this.copy(partitioning = rewritten)
   }
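This is the core of the migration: ReplaceTable stops carrying a pre-resolved catalog and identifier and instead holds its name as a child plan, so the analyzer resolves it like any other node, and tableName is only legal to call afterwards. A stand-in sketch of that contract (not Spark's classes):

sealed trait LogicalPlanSketch { def children: Seq[LogicalPlanSketch]; def resolved: Boolean }
case class ResolvedNameSketch(nameParts: Seq[String]) extends LogicalPlanSketch {
  val children: Seq[LogicalPlanSketch] = Nil
  val resolved = true
}
case class ReplaceTableSketch(name: LogicalPlanSketch) extends LogicalPlanSketch {
  def children: Seq[LogicalPlanSketch] = Seq(name) // UnaryCommand: the name is the one child
  def resolved: Boolean = name.resolved
  def tableName: String = {
    assert(name.resolved) // mirrors the assert in the real plan
    name.asInstanceOf[ResolvedNameSketch].nameParts.mkString(".")
  }
}

// Usage: the identifier exists only once the child is resolved.
// ReplaceTableSketch(ResolvedNameSketch(Seq("ns", "t"))).tableName == "ns.t"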

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala (1 addition, 5 deletions)

@@ -23,7 +23,7 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
-import org.apache.spark.sql.catalyst.plans.logical.{ReplaceTableStatement, SerdeInfo, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -305,10 +305,6 @@ private[sql] object CatalogV2Util {
     catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
   }
 
-  def convertTableProperties(r: ReplaceTableStatement): Map[String, String] = {
-    convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
-  }
-
   def convertTableProperties(t: TableSpec): Map[String, String] = {
     val props = convertTableProperties(
       t.properties, t.options, t.serde, t.location, t.comment, t.provider, t.external)
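With the statement-specific overload gone, the TableSpec overload is the single entry point for flattening a spec into the string map that catalogs expect. A hedged sketch of what this kind of conversion does; the key names are illustrative, the real ones come from TableCatalog's reserved-property constants:

def convertTablePropertiesSketch(
    properties: Map[String, String],
    options: Map[String, String],
    location: Option[String],
    comment: Option[String],
    provider: Option[String]): Map[String, String] = {
  // options and explicit properties merge first; the optional scalar fields
  // then land under reserved keys.
  options ++ properties ++
    location.map("location" -> _) ++
    comment.map("comment" -> _) ++
    provider.map("provider" -> _)
}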

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala (10 additions, 10 deletions)

@@ -721,7 +721,7 @@ class DDLParserSuite extends AnalysisTest {
         assert(create.ignoreIfExists == expectedIfNotExists)
       case ctas: CreateTableAsSelect if newTableToken == "CREATE" =>
         assert(ctas.ignoreIfExists == expectedIfNotExists)
-      case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
+      case replace: ReplaceTable if newTableToken == "REPLACE" =>
       case replace: ReplaceTableAsSelect if newTableToken == "REPLACE" =>
       case other =>
         fail("First token in statement does not match the expected parsed plan; CREATE TABLE" +
@@ -2298,18 +2298,18 @@ class DDLParserSuite extends AnalysisTest {
           create.tableSpec.comment,
           create.tableSpec.serde,
           create.tableSpec.external)
-      case replace: ReplaceTableStatement =>
+      case replace: ReplaceTable =>
         TableSpec(
-          replace.tableName,
+          replace.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
           Some(replace.tableSchema),
           replace.partitioning,
-          replace.bucketSpec,
-          replace.properties,
-          replace.provider,
-          replace.options,
-          replace.location,
-          replace.comment,
-          replace.serde)
+          replace.tableSpec.bucketSpec,
+          replace.tableSpec.properties,
+          replace.tableSpec.provider,
+          replace.tableSpec.options,
+          replace.tableSpec.location,
+          replace.tableSpec.comment,
+          replace.tableSpec.serde)
       case ctas: CreateTableAsSelect =>
         TableSpec(
           ctas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala (7 additions, 12 deletions)

@@ -188,20 +188,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
     // session catalog and the table provider is not v2.
-    case c @ ReplaceTableStatement(
-        SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
-      val provider = c.provider.getOrElse(conf.defaultDataSourceName)
-      if (!isV2Provider(provider)) {
+    case c @ ReplaceTable(
+        ResolvedDBObjectName(catalog, name), _, _, _, _) =>
+      val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
+      if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
         throw QueryCompilationErrors.replaceTableOnlySupportedWithV2TableError
       } else {
-        ReplaceTable(
-          catalog.asTableCatalog,
-          tbl.asIdentifier,
-          c.tableSchema,
-          // convert the bucket spec and add it as a transform
-          c.partitioning ++ c.bucketSpec.map(_.asTransform),
-          convertTableProperties(c),
-          orCreate = c.orCreate)
+        val newTableSpec = c.tableSpec.copy(bucketSpec = None)
+        c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
+          tableSpec = newTableSpec)
       }
 
     case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _)
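Note the bucket handling: rather than building a new node, the rule copies the existing one, folding the bucket spec into the partitioning list as a transform and blanking it in the spec so it cannot be applied twice. A stand-in sketch of that normalization:

// Stand-in types for "convert the bucket spec and add it as a transform".
case class BucketSpecSketch(numBuckets: Int, columns: Seq[String]) {
  def asTransform: String = s"bucket($numBuckets, ${columns.mkString(", ")})"
}
case class SpecSketch(bucketSpec: Option[BucketSpecSketch])

def normalize(partitioning: Seq[String], spec: SpecSketch): (Seq[String], SpecSketch) =
  // Bucketing moves into partitioning; the spec is copied with bucketSpec = None.
  (partitioning ++ spec.bucketSpec.map(_.asTransform), spec.copy(bucketSpec = None))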

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala (8 additions, 11 deletions)

@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.toPrettySQL
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, Literal => V2Literal, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, EqualNullSafe => V2EqualNullSafe, EqualTo => V2EqualTo, Filter => V2Filter, GreaterThan => V2GreaterThan, GreaterThanOrEqual => V2GreaterThanOrEqual, In => V2In, IsNotNull => V2IsNotNull, IsNull => V2IsNull, LessThan => V2LessThan, LessThanOrEqual => V2LessThanOrEqual, Not => V2Not, Or => V2Or, StringContains => V2StringContains, StringEndsWith => V2StringEndsWith, StringStartsWith => V2StringStartsWith}
@@ -184,19 +184,16 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case RefreshTable(r: ResolvedTable) =>
       RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
 
-    case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
-      val newProps = props.get(TableCatalog.PROP_LOCATION).map { loc =>
-        props + (TableCatalog.PROP_LOCATION -> makeQualifiedDBObjectPath(loc))
-      }.getOrElse(props)
-      val propsWithOwner = CatalogV2Util.withDefaultOwnership(newProps)
+    case ReplaceTable(ResolvedDBObjectName(catalog, ident), schema, parts, tableSpec, orCreate) =>
+      val qualifiedLocation = tableSpec.location.map(makeQualifiedDBObjectPath(_))
       catalog match {
         case staging: StagingTableCatalog =>
-          AtomicReplaceTableExec(
-            staging, ident, schema, parts, propsWithOwner, orCreate = orCreate,
-            invalidateCache) :: Nil
+          AtomicReplaceTableExec(staging, ident.asIdentifier, schema, parts,
+            tableSpec.copy(location = qualifiedLocation),
+            orCreate = orCreate, invalidateCache) :: Nil
         case _ =>
-          ReplaceTableExec(
-            catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate,
+          ReplaceTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, parts,
+            tableSpec.copy(location = qualifiedLocation), orCreate = orCreate,
            invalidateCache) :: Nil
       }
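Planning still splits on catalog capability: a StagingTableCatalog can stage the replacement table and commit the swap atomically, while a plain catalog falls back to a non-atomic drop-then-create. A minimal stand-in sketch of that dispatch:

// Stand-in types illustrating the capability-based split in the strategy above.
trait CatalogSketch
trait StagingCatalogSketch extends CatalogSketch

def planReplace(catalog: CatalogSketch): String = catalog match {
  case _: StagingCatalogSketch => "AtomicReplaceTableExec" // staged, commits atomically
  case _ => "ReplaceTableExec"                             // drop + create, not atomic
}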

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala (8 additions, 3 deletions)

@@ -22,7 +22,8 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
+import org.apache.spark.sql.catalyst.plans.logical.TableSpec
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.StructType
@@ -33,10 +34,12 @@ case class ReplaceTableExec(
     ident: Identifier,
     tableSchema: StructType,
     partitioning: Seq[Transform],
-    tableProperties: Map[String, String],
+    tableSpec: TableSpec,
     orCreate: Boolean,
     invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends LeafV2CommandExec {
 
+  val tableProperties = CatalogV2Util.convertTableProperties(tableSpec)
+
   override protected def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
@@ -57,10 +60,12 @@ case class AtomicReplaceTableExec(
     identifier: Identifier,
     tableSchema: StructType,
     partitioning: Seq[Transform],
-    tableProperties: Map[String, String],
+    tableSpec: TableSpec,
     orCreate: Boolean,
     invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends LeafV2CommandExec {
 
+  val tableProperties = CatalogV2Util.convertTableProperties(tableSpec)
+
   override protected def run(): Seq[InternalRow] = {
     if (catalog.tableExists(identifier)) {
       val table = catalog.loadTable(identifier)
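Passing the whole TableSpec to the exec nodes and deriving tableProperties there keeps the flattening in one place (CatalogV2Util) instead of in the planner. A stand-in sketch of that shape:

// Stand-in types: the exec node receives the structured spec and flattens it
// itself, once, at construction (mirroring `val tableProperties = ...` above).
case class SpecLite(properties: Map[String, String], comment: Option[String])

case class ReplaceTableExecSketch(spec: SpecLite) {
  val tableProperties: Map[String, String] =
    spec.properties ++ spec.comment.map("comment" -> _)
}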
