Commit 111fe81
[Spark] Support building against both Spark 3.0 and Spark 3.1. (#2512)
Code changes that allow spark3 and spark3-extensions to be tested against both Spark 3.0 and Spark 3.1 while still being built against a single Spark 3.0 version. Although additional tests are created, we still produce only a single set of Spark3 binaries, compatible with both Spark 3.0 and 3.1.
1 parent 92a264b commit 111fe81
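
The build.gradle diff below spreads one pattern across many hunks: each Spark 3 module gains a spark31 source set that reuses the existing test sources, a pair of configurations that inherit the normal test dependencies but substitute Spark 3.1, and a dedicated Test task wired into test/check. As a reading aid only (this consolidated snippet is distilled from the hunks below and is not a file in the commit), the per-module shape is roughly:

project(':iceberg-spark3') {
  // Second source set: same test sources, separate classpath.
  sourceSets {
    spark31 {
      java.srcDir "$projectDir/src/test/java"
      resources.srcDir "$projectDir/src/test/resources"
      compileClasspath += sourceSets.test.output + sourceSets.main.output
      runtimeClasspath += sourceSets.test.output
    }
  }

  // Inherit everything the regular test configurations declare...
  configurations {
    spark31Implementation.extendsFrom testImplementation
    spark31RuntimeOnly.extendsFrom testRuntimeOnly
  }

  dependencies {
    // ...keep compiling main and test classes against Spark 3.0...
    compileOnly("org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}") { /* excludes */ }
    // ...but run the spark31 source set against Spark 3.1.
    spark31Implementation("org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}") { /* excludes */ }
  }

  // Run the same test classes on the Spark 3.1 classpath as part of the normal test task.
  task testSpark31(type: Test) {
    dependsOn classes
    testClassesDirs = sourceSets.spark31.output.classesDirs
    classpath = sourceSets.spark31.runtimeClasspath + sourceSets.main.output
  }
  test.dependsOn testSpark31
}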

File tree

18 files changed: +314 -47 lines changed

build.gradle

Lines changed: 89 additions & 9 deletions
@@ -64,6 +64,10 @@ allprojects {
     mavenCentral()
     mavenLocal()
   }
+  project.ext {
+    Spark30Version = '3.0.1'
+    Spark31Version = '3.1.1'
+  }
 }
 
 subprojects {
@@ -977,6 +981,21 @@ if (jdkVersion == '8') {
 }
 
 project(':iceberg-spark3') {
+  sourceSets {
+    // Compile test source against Spark 3.1 and main classes compiled against Spark 3.0
+    spark31 {
+      java.srcDir "$projectDir/src/test/java"
+      resources.srcDir "$projectDir/src/test/resources"
+      compileClasspath += sourceSets.test.output + sourceSets.main.output
+      runtimeClasspath += sourceSets.test.output
+    }
+  }
+
+  configurations {
+    spark31Implementation.extendsFrom testImplementation
+    spark31RuntimeOnly.extendsFrom testRuntimeOnly
+  }
+
   dependencies {
     compile project(':iceberg-api')
     compile project(':iceberg-common')
@@ -989,7 +1008,7 @@ project(':iceberg-spark3') {
     compile project(':iceberg-spark')
 
     compileOnly "org.apache.avro:avro"
-    compileOnly("org.apache.spark:spark-hive_2.12") {
+    compileOnly("org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
     }
@@ -1003,9 +1022,14 @@ project(':iceberg-spark3') {
     testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
     testCompile "org.xerial:sqlite-jdbc"
+
+    spark31Implementation("org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}") {
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.apache.arrow'
+    }
   }
 
-  test {
+  tasks.withType(Test) {
     // For vectorized reads
     // Allow unsafe memory access to avoid the costly check arrow does to check if index is within bounds
     systemProperty("arrow.enable_unsafe_memory_access", "true")
@@ -1014,16 +1038,39 @@ project(':iceberg-spark3') {
     systemProperty("arrow.enable_null_check_for_get", "false")
 
     // Vectorized reads need more memory
-    maxHeapSize '2500m'
+    maxHeapSize '2560m'
+  }
+
+  task testSpark31(type: Test) {
+    dependsOn classes
+    description = "Test against Spark 3.1"
+    testClassesDirs = sourceSets.spark31.output.classesDirs
+    classpath = sourceSets.spark31.runtimeClasspath + sourceSets.main.output
   }
+
+  test.dependsOn testSpark31
 }
 
 project(":iceberg-spark3-extensions") {
   apply plugin: 'java'
   apply plugin: 'scala'
   apply plugin: 'antlr'
 
+  sourceSets {
+    // Compile test source against Spark 3.1 and main classes compiled against Spark 3.0
+    spark31 {
+      // Main source is in scala, but test source is only in java
+      java.srcDir "$projectDir/src/test/java"
+      resources.srcDir "$projectDir/src/test/resources"
+      compileClasspath += sourceSets.test.output + sourceSets.main.output
+      runtimeClasspath += sourceSets.test.output
+    }
+  }
+
   configurations {
+    spark31Implementation.extendsFrom testImplementation
+    spark31RuntimeOnly.extendsFrom testRuntimeOnly
+
     /*
      The Gradle Antlr plugin erroneously adds both antlr-build and runtime dependencies to the runtime path. This
      bug https://github.com/gradle/gradle/issues/820 exists because older versions of Antlr do not have separate
@@ -1037,10 +1084,9 @@ project(":iceberg-spark3-extensions") {
   }
 
   dependencies {
-    compileOnly project(':iceberg-spark3')
-
     compileOnly "org.scala-lang:scala-library"
-    compileOnly("org.apache.spark:spark-hive_2.12") {
+    compileOnly project(':iceberg-spark3')
+    compileOnly("org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
     }
@@ -1050,6 +1096,11 @@ project(":iceberg-spark3-extensions") {
     testCompile project(path: ':iceberg-spark', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-spark3', configuration: 'testArtifacts')
 
+    spark31Implementation("org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}") {
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.apache.arrow'
+    }
+
     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
     // We shade this in Spark3 Runtime to avoid issues with Spark's Antlr Runtime
    runtime "org.antlr:antlr4-runtime:4.7.1"
@@ -1060,6 +1111,15 @@ project(":iceberg-spark3-extensions") {
     maxHeapSize = "64m"
     arguments += ['-visitor', '-package', 'org.apache.spark.sql.catalyst.parser.extensions']
   }
+
+  task testSpark31(type: Test) {
+    dependsOn classes
+    description = "Test against Spark 3.1"
+    testClassesDirs = sourceSets.spark31.output.classesDirs
+    classpath = sourceSets.spark31.runtimeClasspath + sourceSets.main.output
+  }
+
+  test.dependsOn testSpark31
 }
 
 project(':iceberg-spark3-runtime') {
@@ -1072,6 +1132,12 @@ project(':iceberg-spark3-runtime') {
      java.srcDir "$projectDir/src/integration/java"
      resources.srcDir "$projectDir/src/integration/resources"
    }
+    spark31 {
+      java.srcDir "$projectDir/src/integration/java"
+      resources.srcDir "$projectDir/src/integration/resources"
+      compileClasspath += sourceSets.integration.output
+      runtimeClasspath += sourceSets.integration.output
+    }
   }
 
   configurations {
@@ -1086,6 +1152,8 @@ project(':iceberg-spark3-runtime') {
      exclude group: 'javax.xml.bind'
      exclude group: 'javax.annotation'
    }
+    spark31Implementation.extendsFrom integrationImplementation
+    spark31CompileOnly.extendsFrom integrationCompileOnly
   }
 
   dependencies {
@@ -1096,7 +1164,7 @@ project(':iceberg-spark3-runtime') {
      exclude group: 'com.google.code.findbugs', module: 'jsr305'
    }
 
-    integrationImplementation 'org.apache.spark:spark-hive_2.12'
+    integrationImplementation "org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}"
     integrationImplementation 'junit:junit'
     integrationImplementation 'org.slf4j:slf4j-simple'
     integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
@@ -1107,6 +1175,8 @@ project(':iceberg-spark3-runtime') {
     // Not allowed on our classpath, only the runtime jar is allowed
     integrationCompileOnly project(':iceberg-spark3-extensions')
     integrationCompileOnly project(':iceberg-spark3')
+
+    spark31Implementation "org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}"
   }
 
   shadowJar {
@@ -1144,14 +1214,24 @@ project(':iceberg-spark3-runtime') {
   }
 
   task integrationTest(type: Test) {
-    description = "Test Spark3 Runtime Jar"
+    description = "Test Spark3 Runtime Jar against Spark 3.0"
     group = "verification"
     testClassesDirs = sourceSets.integration.output.classesDirs
     classpath = sourceSets.integration.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path)
     inputs.file(shadowJar.archiveFile.get().asFile.path)
   }
   integrationTest.dependsOn shadowJar
-  check.dependsOn integrationTest
+
+  task spark31IntegrationTest(type: Test) {
+    dependsOn classes
+    description = "Test Spark3 Runtime Jar against Spark 3.1"
+    group = "verification"
+    testClassesDirs = sourceSets.spark31.output.classesDirs
+    classpath = sourceSets.spark31.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path)
  }
+  spark31IntegrationTest.dependsOn shadowJar
+
+  check.dependsOn integrationTest, spark31IntegrationTest
 
   jar {
     enabled = false
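
Because testSpark31 is attached to test and spark31IntegrationTest to check, the Spark 3.1 runs are part of the normal build. Assuming standard Gradle task addressing (these invocations are not part of the diff), the new tasks could presumably also be run individually:

./gradlew :iceberg-spark3:testSpark31
./gradlew :iceberg-spark3-extensions:testSpark31
./gradlew :iceberg-spark3-runtime:spark31IntegrationTest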

core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java

Lines changed: 8 additions & 0 deletions
@@ -33,6 +33,7 @@ private DateTimeUtil() {
 
   public static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
   public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+  public static final long MICROS_PER_MILLIS = 1000L;
 
   public static LocalDate dateFromDays(int daysFromEpoch) {
     return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch);
@@ -66,6 +67,13 @@ public static long microsFromTimestamp(LocalDateTime dateTime) {
     return ChronoUnit.MICROS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
   }
 
+  public static long microsToMillis(long micros) {
+    // When the timestamp is negative, i.e before 1970, we need to adjust the milliseconds portion.
+    // Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
+    // In millis precision the above needs to be represented as (-157700927877).
+    return Math.floorDiv(micros, MICROS_PER_MILLIS);
+  }
+
   public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
     return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch);
   }
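
A small standalone illustration (not part of the commit) of why microsToMillis uses Math.floorDiv rather than plain division: long division in Java truncates toward zero, which shifts pre-1970 timestamps forward by one millisecond, while floorDiv rounds toward negative infinity. Using the example value from the comment above:

public class MicrosToMillisDemo {
  public static void main(String[] args) {
    long micros = -157700927876544L;                   // 1965-01-01 10:11:12.123456 as microseconds
    System.out.println(micros / 1000L);                // -157700927876 (truncated toward zero, off by one)
    System.out.println(Math.floorDiv(micros, 1000L));  // -157700927877 (floored, the expected millis value)
  }
}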

spark3-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
     // analyzer extensions
     extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
     extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
-    extensions.injectPostHocResolutionRule { spark => AlignRowLevelOperations(spark.sessionState.conf)}
+    extensions.injectPostHocResolutionRule { spark => AlignRowLevelOperations }
     extensions.injectCheckRule { _ => RowLevelOperationsPredicateCheck }
 
     // optimizer extensions

spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignRowLevelOperations.scala

Lines changed: 4 additions & 1 deletion
@@ -31,7 +31,10 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.internal.SQLConf
 
-case class AlignRowLevelOperations(conf: SQLConf) extends Rule[LogicalPlan] with AssignmentAlignmentSupport {
+case object AlignRowLevelOperations extends Rule[LogicalPlan]
+  with AssignmentAlignmentSupport with CastSupport {
+
+  override def conf: SQLConf = SQLConf.get
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case u: UpdateTable if u.resolved && isIcebergRelation(u.table)=>

spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala

Lines changed: 6 additions & 2 deletions
@@ -32,13 +32,17 @@ import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.Assignment
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper.createAlias
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.types.StructType
 import scala.collection.mutable
 
-trait AssignmentAlignmentSupport extends CastSupport {
+trait AssignmentAlignmentSupport {
+
+  def conf: SQLConf
 
   private case class ColumnUpdate(ref: Seq[String], expr: Expression)
 
@@ -96,7 +100,7 @@ trait AssignmentAlignmentSupport extends CastSupport {
       case StructType(fields) =>
         // build field expressions
         val fieldExprs = fields.zipWithIndex.map { case (field, ordinal) =>
-          Alias(GetStructField(col, ordinal, Some(field.name)), field.name)()
+          createAlias(GetStructField(col, ordinal, Some(field.name)), field.name)
        }
 
        // recursively apply this method on nested fields

spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala

Lines changed: 7 additions & 4 deletions
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions.EqualNullSafe
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.Not
-import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
 import org.apache.spark.sql.catalyst.plans.logical.Filter
@@ -38,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
 import org.apache.spark.sql.catalyst.plans.logical.Sort
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils
 import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper
 import org.apache.spark.sql.connector.catalog.Table
@@ -52,6 +52,9 @@ case class RewriteDelete(spark: SparkSession) extends Rule[LogicalPlan] with Rew
 
   import ExtendedDataSourceV2Implicits._
   import RewriteRowLevelOperationHelper._
+  import DistributionAndOrderingUtils._
+
+  override def conf: SQLConf = SQLConf.get
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
@@ -66,7 +69,7 @@ case class RewriteDelete(spark: SparkSession) extends Rule[LogicalPlan] with Rew
       val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)
 
       val matchingRowsPlanBuilder = scanRelation => Filter(cond, scanRelation)
-      val scanPlan = buildDynamicFilterScanPlan(spark, r.table, r.output, mergeBuilder, cond, matchingRowsPlanBuilder)
+      val scanPlan = buildDynamicFilterScanPlan(spark, r, r.output, mergeBuilder, cond, matchingRowsPlanBuilder)
 
       val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
       val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)
@@ -91,11 +94,11 @@ case class RewriteDelete(spark: SparkSession) extends Rule[LogicalPlan] with Rew
         remainingRowsPlan
       case _ =>
         // apply hash partitioning by file if the distribution mode is hash or range
-        val numShufflePartitions = SQLConf.get.numShufflePartitions
+        val numShufflePartitions = conf.numShufflePartitions
         RepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, numShufflePartitions)
     }
 
-    val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol, Ascending))
+    val order = Seq(createSortOrder(fileNameCol, Ascending), createSortOrder(rowPosCol, Ascending))
     val sort = Sort(order, global = false, planWithDistribution)
     Project(output, sort)
   }

spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala

Lines changed: 9 additions & 6 deletions
@@ -55,9 +55,12 @@ import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implici
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.BooleanType
 
-case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with RewriteRowLevelOperationHelper {
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with RewriteRowLevelOperationHelper {
   import ExtendedDataSourceV2Implicits._
   import RewriteMergeInto._
+  import RewriteRowLevelOperationHelper._
+
+  override def conf: SQLConf = SQLConf.get
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan transform {
@@ -79,7 +82,7 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
 
         val outputExprs = insertAction.assignments.map(_.value)
         val outputColNames = target.output.map(_.name)
-        val outputCols = outputExprs.zip(outputColNames).map { case (expr, name) => Alias(expr, name)() }
+        val outputCols = outputExprs.zip(outputColNames).map { case (expr, name) => createAlias(expr, name) }
        val mergePlan = Project(outputCols, joinPlan)
 
        val writePlan = buildWritePlan(mergePlan, target.table)
@@ -121,7 +124,7 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
 
     // when there are no not-matched actions, use a right outer join to ignore source rows that do not match, but
     // keep all unmatched target rows that must be preserved.
-    val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_SOURCE)())
+    val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_SOURCE))
     val newSourceTableScan = Project(sourceTableProj, source)
     val targetTableScan = buildDynamicFilterTargetScan(mergeBuilder, target, source, cond, matchedActions)
     val joinPlan = Join(newSourceTableScan, targetTableScan, RightOuter, Some(cond), JoinHint.NONE)
@@ -151,10 +154,10 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     val (matchedConditions, matchedOutputs) = rewriteMatchedActions(matchedActions, target.output)
 
     // use a full outer join because there are both matched and not matched actions
-    val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_SOURCE)())
+    val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_SOURCE))
     val newSourceTableScan = Project(sourceTableProj, source)
     val targetTableScan = buildDynamicFilterTargetScan(mergeBuilder, target, source, cond, matchedActions)
-    val targetTableProj = targetTableScan.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_TARGET)())
+    val targetTableProj = targetTableScan.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_TARGET))
     val newTargetTableScan = Project(targetTableProj, targetTableScan)
     val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
 
@@ -202,7 +205,7 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     val output = target.output
     val matchingRowsPlanBuilder = rel => Join(source, rel, Inner, Some(cond), JoinHint.NONE)
     val runCardinalityCheck = isCardinalityCheckEnabled(table) && isCardinalityCheckNeeded(matchedActions)
-    buildDynamicFilterScanPlan(spark, table, output, mergeBuilder, cond, matchingRowsPlanBuilder, runCardinalityCheck)
+    buildDynamicFilterScanPlan(spark, target, output, mergeBuilder, cond, matchingRowsPlanBuilder, runCardinalityCheck)
   }
 
   private def rewriteMatchedActions(