Skip to content

Commit 1460743

Browse files
author
Jack Ye
authored
Spark: Add extensions DDL to set identifier fields (#2560)
1 parent 63392d9 commit 1460743

File tree

9 files changed

+370
-3
lines changed

9 files changed

+370
-3
lines changed

spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ statement
7171
| ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform #dropPartitionField
7272
| ALTER TABLE multipartIdentifier REPLACE PARTITION FIELD transform WITH transform (AS name=identifier)? #replacePartitionField
7373
| ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering
74+
| ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields
75+
| ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields
7476
;
7577

7678
writeSpec
@@ -157,9 +159,13 @@ quotedIdentifier
157159
: BACKQUOTED_IDENTIFIER
158160
;
159161

162+
fieldList
163+
: fields+=multipartIdentifier (',' fields+=multipartIdentifier)*
164+
;
165+
160166
nonReserved
161167
: ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST | NULLS | ORDERED | PARTITION | TABLE | WRITE
162-
| DISTRIBUTED | LOCALLY | UNORDERED
168+
| DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | FIELDS | SET
163169
| TRUE | FALSE
164170
| MAP
165171
;
@@ -174,13 +180,16 @@ DESC: 'DESC';
174180
DISTRIBUTED: 'DISTRIBUTED';
175181
DROP: 'DROP';
176182
FIELD: 'FIELD';
183+
FIELDS: 'FIELDS';
177184
FIRST: 'FIRST';
178185
LAST: 'LAST';
179186
LOCALLY: 'LOCALLY';
180187
NULLS: 'NULLS';
181188
ORDERED: 'ORDERED';
182189
PARTITION: 'PARTITION';
183190
REPLACE: 'REPLACE';
191+
IDENTIFIER_KW: 'IDENTIFIER';
192+
SET: 'SET';
184193
TABLE: 'TABLE';
185194
UNORDERED: 'UNORDERED';
186195
WITH: 'WITH';

spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
116116
normalized.contains("write ordered by") ||
117117
normalized.contains("write locally ordered by") ||
118118
normalized.contains("write distributed by") ||
119-
normalized.contains("write unordered")))
119+
normalized.contains("write unordered") ||
120+
normalized.contains("set identifier fields") ||
121+
normalized.contains("drop identifier fields")))
120122
}
121123

122124
protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = {

spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,13 @@ import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParse
3737
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
3838
import org.apache.spark.sql.catalyst.plans.logical.CallArgument
3939
import org.apache.spark.sql.catalyst.plans.logical.CallStatement
40+
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
4041
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
4142
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
4243
import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
4344
import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
4445
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
46+
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
4547
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
4648
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
4749
import org.apache.spark.sql.catalyst.trees.Origin
@@ -85,7 +87,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
8587

8688

8789
/**
88-
* Create an CHANGE PARTITION FIELD logical command.
90+
* Create a REPLACE PARTITION FIELD logical command.
8991
*/
9092
override def visitReplacePartitionField(ctx: ReplacePartitionFieldContext): ReplacePartitionField = withOrigin(ctx) {
9193
ReplacePartitionField(
@@ -95,6 +97,24 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
9597
Option(ctx.name).map(_.getText))
9698
}
9799

100+
/**
 * Create a SET IDENTIFIER FIELDS logical command.
 */
override def visitSetIdentifierFields(ctx: SetIdentifierFieldsContext): SetIdentifierFields = withOrigin(ctx) {
  // Table name comes from the multipart identifier; fields from the parsed field list.
  val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
  val fieldNames = ctx.fieldList.fields.asScala.map(_.getText)
  SetIdentifierFields(tableName, fieldNames)
}
108+
109+
/**
 * Create a DROP IDENTIFIER FIELDS logical command.
 */
override def visitDropIdentifierFields(ctx: DropIdentifierFieldsContext): DropIdentifierFields = withOrigin(ctx) {
  // Table name comes from the multipart identifier; fields from the parsed field list.
  val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
  val fieldNames = ctx.fieldList.fields.asScala.map(_.getText)
  DropIdentifierFields(tableName, fieldNames)
}
117+
98118
/**
99119
* Create a [[SetWriteDistributionAndOrdering]] for changing the write distribution and ordering.
100120
*/
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.catalyst.plans.logical
21+
22+
import org.apache.spark.sql.catalyst.expressions.Attribute
23+
24+
/**
 * Logical command for ALTER TABLE ... DROP IDENTIFIER FIELDS.
 *
 * @param table multipart table identifier
 * @param fields identifier field names to drop
 */
case class DropIdentifierFields(
    table: Seq[String],
    fields: Seq[String]) extends Command {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  // A DDL command returns no rows.
  override lazy val output: Seq[Attribute] = Nil

  override def simpleString(maxFields: Int): String =
    s"DropIdentifierFields ${table.quoted} (${fields.quoted})"
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.catalyst.plans.logical
21+
22+
import org.apache.spark.sql.catalyst.expressions.Attribute
23+
import org.apache.spark.sql.connector.expressions.Transform
24+
25+
/**
 * Logical command for ALTER TABLE ... SET IDENTIFIER FIELDS.
 *
 * @param table multipart table identifier
 * @param fields identifier field names to set
 */
case class SetIdentifierFields(
    table: Seq[String],
    fields: Seq[String]) extends Command {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  // A DDL command returns no rows.
  override lazy val output: Seq[Attribute] = Nil

  override def simpleString(maxFields: Int): String =
    s"SetIdentifierFields ${table.quoted} (${fields.quoted})"
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.execution.datasources.v2
21+
22+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions
23+
import org.apache.iceberg.relocated.com.google.common.collect.Sets
24+
import org.apache.iceberg.spark.source.SparkTable
25+
import org.apache.spark.sql.catalyst.InternalRow
26+
import org.apache.spark.sql.catalyst.expressions.Attribute
27+
import org.apache.spark.sql.connector.catalog.Identifier
28+
import org.apache.spark.sql.connector.catalog.TableCatalog
29+
30+
/**
 * Physical node for ALTER TABLE ... DROP IDENTIFIER FIELDS.
 *
 * Loads the table from the catalog; for Iceberg tables, validates that every
 * requested field exists in the schema and is currently an identifier field,
 * then commits a schema update carrying the remaining identifier fields.
 * Throws UnsupportedOperationException for non-Iceberg tables.
 */
case class DropIdentifierFieldsExec(
    catalog: TableCatalog,
    ident: Identifier,
    fields: Seq[String]) extends V2CommandExec {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  // A DDL command produces no output rows.
  override lazy val output: Seq[Attribute] = Nil

  override protected def run(): Seq[InternalRow] = {
    catalog.loadTable(ident) match {
      case iceberg: SparkTable =>
        val schema = iceberg.table.schema
        // Mutable copy of the current identifier fields so validated names can be removed.
        val identifierFieldNames = Sets.newHashSet(schema.identifierFieldNames)

        fields.foreach { name =>
          Preconditions.checkArgument(schema.findField(name) != null,
            "Cannot complete drop identifier fields operation: field %s not found", name)
          Preconditions.checkArgument(identifierFieldNames.contains(name),
            "Cannot complete drop identifier fields operation: %s is not an identifier field", name)
          identifierFieldNames.remove(name)
        }

        iceberg.table.updateSchema()
          .setIdentifierFields(identifierFieldNames)
          .commit()

      case table =>
        throw new UnsupportedOperationException(s"Cannot drop identifier fields in non-Iceberg table: $table")
    }

    Nil
  }

  override def simpleString(maxFields: Int): String = {
    s"DropIdentifierFields ${catalog.name}.${ident.quoted} (${fields.quoted})"
  }
}

spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@ import org.apache.spark.sql.catalyst.expressions.NamedExpression
3333
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
3434
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
3535
import org.apache.spark.sql.catalyst.plans.logical.Call
36+
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
3637
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
3738
import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilter
3839
import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilterWithCardinalityCheck
3940
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
4041
import org.apache.spark.sql.catalyst.plans.logical.MergeInto
4142
import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
4243
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
44+
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
4345
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
4446
import org.apache.spark.sql.connector.catalog.Identifier
4547
import org.apache.spark.sql.connector.catalog.TableCatalog
@@ -66,6 +68,12 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy {
6668
case ReplacePartitionField(IcebergCatalogAndIdentifier(catalog, ident), transformFrom, transformTo, name) =>
6769
ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, name) :: Nil
6870

71+
case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) =>
72+
SetIdentifierFieldsExec(catalog, ident, fields) :: Nil
73+
74+
case DropIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) =>
75+
DropIdentifierFieldsExec(catalog, ident, fields) :: Nil
76+
6977
case SetWriteDistributionAndOrdering(
7078
IcebergCatalogAndIdentifier(catalog, ident), distributionMode, ordering) =>
7179
SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode, ordering) :: Nil
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.execution.datasources.v2
21+
22+
import org.apache.iceberg.spark.source.SparkTable
23+
import org.apache.spark.sql.catalyst.InternalRow
24+
import org.apache.spark.sql.catalyst.expressions.Attribute
25+
import org.apache.spark.sql.connector.catalog.Identifier
26+
import org.apache.spark.sql.connector.catalog.TableCatalog
27+
28+
/**
 * Physical node for ALTER TABLE ... SET IDENTIFIER FIELDS.
 *
 * Loads the table from the catalog; for Iceberg tables, commits a schema update
 * replacing the identifier fields with the given names. Throws
 * UnsupportedOperationException for non-Iceberg tables.
 */
case class SetIdentifierFieldsExec(
    catalog: TableCatalog,
    ident: Identifier,
    fields: Seq[String]) extends V2CommandExec {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
  import scala.collection.JavaConverters._

  // A DDL command produces no output rows.
  override lazy val output: Seq[Attribute] = Nil

  override protected def run(): Seq[InternalRow] = {
    catalog.loadTable(ident) match {
      case iceberg: SparkTable =>
        iceberg.table.updateSchema()
          .setIdentifierFields(fields.asJava)
          .commit()
      case table =>
        throw new UnsupportedOperationException(s"Cannot set identifier fields in non-Iceberg table: $table")
    }

    Nil
  }

  override def simpleString(maxFields: Int): String = {
    s"SetIdentifierFields ${catalog.name}.${ident.quoted} (${fields.quoted})"
  }
}

0 commit comments

Comments
 (0)