@@ -69,6 +69,7 @@ statement
: CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
| ALTER TABLE multipartIdentifier ADD PARTITION FIELD transform (AS name=identifier)? #addPartitionField
| ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform #dropPartitionField
| ALTER TABLE multipartIdentifier REPLACE PARTITION FIELD transform WITH transform (AS name=identifier)? #replacePartitionField
| ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering
;
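// For example, the new alternative accepts statements like (illustrative table name):
//   ALTER TABLE db.sample REPLACE PARTITION FIELD days(ts) WITH hours(ts) AS hour_col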

@@ -179,8 +180,10 @@ LOCALLY: 'LOCALLY';
NULLS: 'NULLS';
ORDERED: 'ORDERED';
PARTITION: 'PARTITION';
REPLACE: 'REPLACE';
TABLE: 'TABLE';
UNORDERED: 'UNORDERED';
WITH: 'WITH';
WRITE: 'WRITE';

TRUE: 'TRUE';
@@ -114,6 +114,7 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
normalized.startsWith("alter table") && (
normalized.contains("add partition field") ||
normalized.contains("drop partition field") ||
normalized.contains("replace partition field") ||
normalized.contains("write ordered by") ||
normalized.contains("write locally ordered by") ||
normalized.contains("write distributed by") ||
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.connector.expressions
import org.apache.spark.sql.connector.expressions.ApplyTransform
@@ -80,6 +81,18 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
typedVisit[Transform](ctx.transform))
}


/**
 * Create a [[ReplacePartitionField]] logical command.
 */
override def visitReplacePartitionField(ctx: ReplacePartitionFieldContext): ReplacePartitionField = withOrigin(ctx) {
ReplacePartitionField(
typedVisit[Seq[String]](ctx.multipartIdentifier),
typedVisit[Transform](ctx.transform(0)),
typedVisit[Transform](ctx.transform(1)),
Option(ctx.name).map(_.getText))
}
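
// For example, "ALTER TABLE db.sample REPLACE PARTITION FIELD days(ts) WITH hours(ts) AS hour_col"
// is translated (roughly) to:
//   ReplacePartitionField(Seq("db", "sample"), days(ts), hours(ts), Some("hour_col"))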

/**
* Create a [[SetWriteDistributionAndOrdering]] for changing the write distribution and ordering.
*/
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.expressions.Transform

case class ReplacePartitionField(
table: Seq[String],
transformFrom: Transform,
transformTo: Transform,
name: Option[String]) extends Command {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override def simpleString(maxFields: Int): String = {
s"ReplacePartitionField ${table.quoted} ${transformFrom.describe} " +
s"with ${name.map(n => s"$n=").getOrElse("")}${transformTo.describe}"
}
}
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilterWithCardinal
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.MergeInto
import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog
@@ -62,6 +63,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy {
case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) =>
DropPartitionFieldExec(catalog, ident, transform) :: Nil

case ReplacePartitionField(IcebergCatalogAndIdentifier(catalog, ident), transformFrom, transformTo, name) =>
ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, name) :: Nil

case SetWriteDistributionAndOrdering(
IcebergCatalogAndIdentifier(catalog, ident), distributionMode, ordering) =>
SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode, ordering) :: Nil
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.connector.expressions.IdentityTransform
import org.apache.spark.sql.connector.expressions.Transform

case class ReplacePartitionFieldExec(
catalog: TableCatalog,
ident: Identifier,
transformFrom: Transform,
transformTo: Transform,
name: Option[String]) extends V2CommandExec {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.loadTable(ident) match {
case iceberg: SparkTable =>
val schema = iceberg.table.schema
transformFrom match {
case IdentityTransform(FieldReference(parts)) if parts.size == 1 && schema.findField(parts.head) == null =>
Review comment (Contributor): Does it work to match on FieldReference(Seq(name)) instead of checking parts?

Review comment (Contributor): I just thought it might be shorter. Not a problem to have it this way.
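
A minimal sketch of what the suggested match could look like (assuming Spark's FieldReference case-class extractor; removeByName and removeByTransform are hypothetical helpers, and this is not the code that was merged):

transformFrom match {
  case IdentityTransform(FieldReference(Seq(fieldName))) if schema.findField(fieldName) == null =>
    removeByName(fieldName)     // a single-part reference that is not a column: a partition field name
  case other =>
    removeByTransform(other)    // anything else is resolved as a source transform
}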

// the name is not present in the Iceberg schema, so it must be a partition field name, not a column name
iceberg.table.updateSpec()
.removeField(parts.head)
.addField(name.orNull, Spark3Util.toIcebergTerm(transformTo))
.commit()

case _ =>
iceberg.table.updateSpec()
.removeField(Spark3Util.toIcebergTerm(transformFrom))
.addField(name.orNull, Spark3Util.toIcebergTerm(transformTo))
.commit()
}

case table =>
throw new UnsupportedOperationException(s"Cannot replace partition field in non-Iceberg table: $table")
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"ReplacePartitionField ${catalog.name}.${ident.quoted} ${transformFrom.describe} " +
s"with ${name.map(n => s"$n=").getOrElse("")}${transformTo.describe}"
}
}
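For context, ReplacePartitionFieldExec is a thin wrapper over Iceberg's UpdatePartitionSpec API. A minimal sketch of the equivalent direct API calls, assuming a loaded org.apache.iceberg.Table in scope as table (field names taken from the tests below):

import org.apache.iceberg.expressions.Expressions

table.updateSpec()
  .removeField("day_col")                        // drop the old daily partition field
  .addField("hour_col", Expressions.hour("ts"))  // add the hourly replacement
  .commit()                                      // both changes land in a single new spec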
@@ -272,4 +272,100 @@ public void testDropPartitionByName() {

Assert.assertEquals("Should have new spec field", expected, table.spec());
}

@Test
public void testReplacePartition() {
sql("CREATE TABLE %s (id bigint NOT NULL, category string, ts timestamp, data string) USING iceberg", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertTrue("Table should start unpartitioned", table.spec().isUnpartitioned());

sql("ALTER TABLE %s ADD PARTITION FIELD days(ts)", tableName);
table.refresh();
PartitionSpec expected = PartitionSpec.builderFor(table.schema())
.withSpecId(1)
.day("ts")
.build();
Assert.assertEquals("Should have new spec field", expected, table.spec());

sql("ALTER TABLE %s REPLACE PARTITION FIELD days(ts) WITH hours(ts)", tableName);
table.refresh();
expected = PartitionSpec.builderFor(table.schema())
.withSpecId(2)
.alwaysNull("ts", "ts_day")
.hour("ts")
.build();
Assert.assertEquals("Should changed from daily to hourly partitioned field", expected, table.spec());
}

@Test
public void testReplacePartitionAndRename() {
sql("CREATE TABLE %s (id bigint NOT NULL, category string, ts timestamp, data string) USING iceberg", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertTrue("Table should start unpartitioned", table.spec().isUnpartitioned());

sql("ALTER TABLE %s ADD PARTITION FIELD days(ts)", tableName);
table.refresh();
PartitionSpec expected = PartitionSpec.builderFor(table.schema())
.withSpecId(1)
.day("ts")
.build();
Assert.assertEquals("Should have new spec field", expected, table.spec());

sql("ALTER TABLE %s REPLACE PARTITION FIELD days(ts) WITH hours(ts) AS hour_col", tableName);
table.refresh();
expected = PartitionSpec.builderFor(table.schema())
.withSpecId(2)
.alwaysNull("ts", "ts_day")
.hour("ts", "hour_col")
.build();
Assert.assertEquals("Should changed from daily to hourly partitioned field", expected, table.spec());
}

@Test
public void testReplaceNamedPartition() {
sql("CREATE TABLE %s (id bigint NOT NULL, category string, ts timestamp, data string) USING iceberg", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertTrue("Table should start unpartitioned", table.spec().isUnpartitioned());

sql("ALTER TABLE %s ADD PARTITION FIELD days(ts) AS day_col", tableName);
table.refresh();
PartitionSpec expected = PartitionSpec.builderFor(table.schema())
.withSpecId(1)
.day("ts", "day_col")
.build();
Assert.assertEquals("Should have new spec field", expected, table.spec());

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_col WITH hours(ts)", tableName);
table.refresh();
expected = PartitionSpec.builderFor(table.schema())
.withSpecId(2)
.alwaysNull("ts", "day_col")
.hour("ts")
.build();
Assert.assertEquals("Should changed from daily to hourly partitioned field", expected, table.spec());
}

@Test
public void testReplaceNamedPartitionAndRenameDifferently() {
sql("CREATE TABLE %s (id bigint NOT NULL, category string, ts timestamp, data string) USING iceberg", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertTrue("Table should start unpartitioned", table.spec().isUnpartitioned());

sql("ALTER TABLE %s ADD PARTITION FIELD days(ts) AS day_col", tableName);
table.refresh();
PartitionSpec expected = PartitionSpec.builderFor(table.schema())
.withSpecId(1)
.day("ts", "day_col")
.build();
Assert.assertEquals("Should have new spec field", expected, table.spec());

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_col WITH hours(ts) AS hour_col", tableName);
table.refresh();
expected = PartitionSpec.builderFor(table.schema())
.withSpecId(2)
.alwaysNull("ts", "day_col")
.hour("ts", "hour_col")
.build();
Assert.assertEquals("Should changed from daily to hourly partitioned field", expected, table.spec());
}
}