4 changes: 2 additions & 2 deletions .github/workflows/jmh-benchmarks.yml
@@ -28,8 +28,8 @@ on:
       description: 'The branch name'
       required: true
     spark_version:
-      description: 'The spark project version to use, such as iceberg-spark-3.3'
-      default: 'iceberg-spark-3.3'
+      description: 'The spark project version to use, such as iceberg-spark-3.4'
+      default: 'iceberg-spark-3.4'
       required: true
     benchmarks:
       description: 'A list of comma-separated double-quoted Benchmark names, such as "IcebergSourceFlatParquetDataReadBenchmark", "IcebergSourceFlatParquetDataFilterBenchmark"'
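Because these inputs sit under a manual workflow_dispatch trigger, the benchmark run can be kicked off from the GitHub CLI against the new default. A minimal sketch — the branch input's name is not visible in this hunk, so only the two inputs shown above appear, and the benchmark value reuses a name from the workflow's own description:

    # Hypothetical dispatch using only the inputs visible in this diff
    gh workflow run jmh-benchmarks.yml \
        -f spark_version='iceberg-spark-3.4' \
        -f benchmarks='"IcebergSourceFlatParquetDataReadBenchmark"'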
2 changes: 1 addition & 1 deletion .github/workflows/publish-snapshot.yml
@@ -41,4 +41,4 @@ jobs:
       - run: |
           ./gradlew printVersion
           ./gradlew -DallVersions publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
-          ./gradlew -DflinkVersions= -DsparkVersions=3.2,3.3 -DscalaVersion=2.13 -DhiveVersions= publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
+          ./gradlew -DflinkVersions= -DsparkVersions=3.2,3.3,3.4 -DscalaVersion=2.13 -DhiveVersions= publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
2 changes: 1 addition & 1 deletion .github/workflows/recurring-jmh-benchmarks.yml
@@ -40,7 +40,7 @@ jobs:
"IcebergSourceNestedParquetDataReadBenchmark", "IcebergSourceNestedParquetDataWriteBenchmark",
"IcebergSourceParquetEqDeleteBenchmark", "IcebergSourceParquetMultiDeleteFileBenchmark",
"IcebergSourceParquetPosDeleteBenchmark", "IcebergSourceParquetWithUnrelatedDeleteBenchmark"]
spark_version: ['iceberg-spark-3.3']
spark_version: ['iceberg-spark-3.4']
env:
SPARK_LOCAL_IP: localhost
steps:
4 changes: 2 additions & 2 deletions .github/workflows/spark-ci.yml
@@ -86,7 +86,7 @@ jobs:
     strategy:
       matrix:
         jvm: [8, 11]
-        spark: ['3.1', '3.2', '3.3']
+        spark: ['3.1', '3.2', '3.3', '3.4']
     env:
       SPARK_LOCAL_IP: localhost
     steps:
@@ -116,7 +116,7 @@
     strategy:
       matrix:
         jvm: [8, 11]
-        spark: ['3.2','3.3']
+        spark: ['3.2','3.3', '3.4']
     env:
       SPARK_LOCAL_IP: localhost
     steps:
1 change: 1 addition & 0 deletions .gitignore
@@ -32,6 +32,7 @@ spark/v2.4/spark/benchmark/*
 spark/v3.1/spark/benchmark/*
 spark/v3.2/spark/benchmark/*
 spark/v3.3/spark/benchmark/*
+spark/v3.4/spark/benchmark/*
 data/benchmark/*
 
 __pycache__/
3 changes: 2 additions & 1 deletion dev/stage-binaries.sh
@@ -20,7 +20,7 @@

 SCALA_VERSION=2.12
 FLINK_VERSIONS=1.15,1.16,1.17
-SPARK_VERSIONS=2.4,3.1,3.2,3.3
+SPARK_VERSIONS=2.4,3.1,3.2,3.3,3.4
 HIVE_VERSIONS=2,3
 
 ./gradlew -Prelease -DscalaVersion=$SCALA_VERSION -DflinkVersions=$FLINK_VERSIONS -DsparkVersions=$SPARK_VERSIONS -DhiveVersions=$HIVE_VERSIONS publishApachePublicationToMavenRepository
@@ -29,4 +29,5 @@ HIVE_VERSIONS=2,3
 # Flink does not yet support 2.13 (and is largely dropping a user-facing dependency on Scala). Hive doesn't need a Scala specification.
 ./gradlew -Prelease -DscalaVersion=2.13 -DsparkVersions=3.2 :iceberg-spark:iceberg-spark-3.2_2.13:publishApachePublicationToMavenRepository :iceberg-spark:iceberg-spark-extensions-3.2_2.13:publishApachePublicationToMavenRepository :iceberg-spark:iceberg-spark-runtime-3.2_2.13:publishApachePublicationToMavenRepository
 ./gradlew -Prelease -DscalaVersion=2.13 -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.13:publishApachePublicationToMavenRepository :iceberg-spark:iceberg-spark-extensions-3.3_2.13:publishApachePublicationToMavenRepository :iceberg-spark:iceberg-spark-runtime-3.3_2.13:publishApachePublicationToMavenRepository
+./gradlew -Prelease -DscalaVersion=2.13 -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-3.4_2.13:publishApachePublicationToMavenRepository :iceberg-spark:iceberg-spark-extensions-3.4_2.13:publishApachePublicationToMavenRepository :iceberg-spark:iceberg-spark-runtime-3.4_2.13:publishApachePublicationToMavenRepository

4 changes: 2 additions & 2 deletions gradle.properties
@@ -20,8 +20,8 @@ systemProp.defaultFlinkVersions=1.17
 systemProp.knownFlinkVersions=1.15,1.16,1.17
 systemProp.defaultHiveVersions=2
 systemProp.knownHiveVersions=2,3
-systemProp.defaultSparkVersions=3.3
-systemProp.knownSparkVersions=2.4,3.1,3.2,3.3
+systemProp.defaultSparkVersions=3.4
+systemProp.knownSparkVersions=2.4,3.1,3.2,3.3,3.4
 systemProp.defaultScalaVersion=2.12
 systemProp.knownScalaVersions=2.12,2.13
 org.gradle.parallel=true
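With these defaults in place, a bare Gradle invocation now targets Spark 3.4, while the sparkVersions system property (used the same way in publish-snapshot.yml and stage-binaries.sh above) still selects any of the known versions. A minimal sketch, assuming the standard build task:

    # Builds against the new default, Spark 3.4
    ./gradlew build

    # Builds against an explicit subset of the known Spark versions
    ./gradlew -DsparkVersions=3.3,3.4 build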
4 changes: 4 additions & 0 deletions jmh.gradle
@@ -41,6 +41,10 @@ if (sparkVersions.contains("3.3")) {
   jmhProjects.add(project(":iceberg-spark:iceberg-spark-3.3_${scalaVersion}"))
 }
 
+if (sparkVersions.contains("3.4")) {
+  jmhProjects.add(project(":iceberg-spark:iceberg-spark-3.4_${scalaVersion}"))
+}
+
 jmhProjects.add(project(":iceberg-data"))
 
 configure(jmhProjects) {
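With the 3.4 module added to jmhProjects, benchmarks can target it directly. A sketch only — the jmh task and the -Pjmh* property names are assumptions not shown in this diff; 2.12 is the default Scala version from gradle.properties, and the benchmark name comes from the workflows above:

    # Hypothetical benchmark run against the Spark 3.4 module
    ./gradlew :iceberg-spark:iceberg-spark-3.4_2.12:jmh \
        -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark \
        -PjmhOutputPath=benchmark/result.txt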
12 changes: 12 additions & 0 deletions settings.gradle
@@ -161,6 +161,18 @@ if (sparkVersions.contains("3.3")) {
project(":iceberg-spark:spark-runtime-3.3_${scalaVersion}").name = "iceberg-spark-runtime-3.3_${scalaVersion}"
}

if (sparkVersions.contains("3.4")) {
include ":iceberg-spark:spark-3.4_${scalaVersion}"
include ":iceberg-spark:spark-extensions-3.4_${scalaVersion}"
include ":iceberg-spark:spark-runtime-3.4_${scalaVersion}"
project(":iceberg-spark:spark-3.4_${scalaVersion}").projectDir = file('spark/v3.4/spark')
project(":iceberg-spark:spark-3.4_${scalaVersion}").name = "iceberg-spark-3.4_${scalaVersion}"
project(":iceberg-spark:spark-extensions-3.4_${scalaVersion}").projectDir = file('spark/v3.4/spark-extensions')
project(":iceberg-spark:spark-extensions-3.4_${scalaVersion}").name = "iceberg-spark-extensions-3.4_${scalaVersion}"
project(":iceberg-spark:spark-runtime-3.4_${scalaVersion}").projectDir = file('spark/v3.4/spark-runtime')
project(":iceberg-spark:spark-runtime-3.4_${scalaVersion}").name = "iceberg-spark-runtime-3.4_${scalaVersion}"
}

// hive 3 depends on hive 2, so always add hive 2 if hive3 is enabled
if (hiveVersions.contains("2") || hiveVersions.contains("3")) {
include 'mr'
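The runtime module registered here is the artifact users put on a Spark classpath. Illustratively, assuming the usual org.apache.iceberg group ID and leaving the version as a placeholder:

    # Coordinates inferred from the module name above; <version> is a placeholder
    spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:<version>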
6 changes: 5 additions & 1 deletion spark/build.gradle
@@ -34,4 +34,8 @@ if (sparkVersions.contains("3.2")) {

if (sparkVersions.contains("3.3")) {
apply from: file("$projectDir/v3.3/build.gradle")
}
}

if (sparkVersions.contains("3.4")) {
apply from: file("$projectDir/v3.4/build.gradle")
}
@@ -212,16 +212,7 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset
         currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
       }
 
-      Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
-
-      if (snapshot == null) {
-        throw new IllegalStateException(
-            String.format(
-                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
-                currentOffset.snapshotId()));
-      }
-
-      if (!shouldProcess(snapshot)) {
+      if (!shouldProcess(table.snapshot(currentOffset.snapshotId()))) {
         LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
         continue;
       }
@@ -19,10 +19,8 @@
 package org.apache.iceberg.spark.source;
 
 import static org.apache.iceberg.expressions.Expressions.ref;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -325,46 +323,6 @@ public void testResumingStreamReadFromCheckpoint() throws Exception {
     }
   }
 
-  @Test
-  public void testFailReadingCheckpointInvalidSnapshot() throws IOException, TimeoutException {
-    File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
-    File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
-    File output = temp.newFolder();
-
-    DataStreamWriter querySource =
-        spark
-            .readStream()
-            .format("iceberg")
-            .load(tableName)
-            .writeStream()
-            .option("checkpointLocation", writerCheckpoint.toString())
-            .format("parquet")
-            .queryName("checkpoint_test")
-            .option("path", output.getPath());
-
-    List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one"));
-    List<SimpleRecord> secondSnapshotRecordList = Lists.newArrayList(new SimpleRecord(2, "two"));
-    StreamingQuery startQuery = querySource.start();
-
-    appendData(firstSnapshotRecordList);
-    table.refresh();
-    long firstSnapshotid = table.currentSnapshot().snapshotId();
-    startQuery.processAllAvailable();
-    startQuery.stop();
-
-    appendData(secondSnapshotRecordList);
-
-    table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit();
-
-    StreamingQuery restartedQuery = querySource.start();
-    assertThatThrownBy(restartedQuery::processAllAvailable)
-        .hasCauseInstanceOf(IllegalStateException.class)
-        .hasMessageContaining(
-            String.format(
-                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
-                firstSnapshotid));
-  }
-
   @Test
   public void testParquetOrcAvroDataInOneTable() throws Exception {
     List<SimpleRecord> parquetFileRecords =