Adapt for delta.logRetentionDuration (#49)
* remove a redundant map collector in commitDeltaVersionLogToIcebergTransaction

* get the earliest possible version rather than hard-coding 0

* add unit test to check if table exists

* refactor how actions are extracted from the VersionLog

* fix format issue

* move non-shared table write operations to the tests themselves, instead of in before()

* fix type
JonasJ-ap authored Jan 25, 2023
1 parent 1cd36b9 commit 4463f30
Showing 3 changed files with 46 additions and 16 deletions.
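The gist of the change: once delta.logRetentionDuration has expired the oldest Delta versions, version 0 of the log may no longer exist, so the action can no longer hard-code 0 as its starting point. A minimal sketch of the new starting-point logic, using only the delta-standalone calls that appear in the diff below (the Configuration and the table path are placeholders):

    import io.delta.standalone.DeltaLog;
    import io.delta.standalone.VersionLog;
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;

    // Ask the Delta log for the earliest version it still retains instead of assuming 0.
    Configuration conf = new Configuration();
    DeltaLog deltaLog = DeltaLog.forTable(conf, "/path/to/delta/table");
    long deltaStartVersion = deltaLog.getVersionAtOrAfterTimestamp(0L);
    // Replay the table history from that version onward; false = do not fail when data loss is detected.
    Iterator<VersionLog> changes = deltaLog.getChanges(deltaStartVersion, false);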
TestSnapshotDeltaLakeTable.java
@@ -84,6 +84,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
private String newIcebergTableLocation;
private String externalDataFilesTableLocation;
private String typeTestTableLocation;
+ private Dataset<Row> typeTestDataFrame;
+ private Dataset<Row> nestedDataFrame;

@Parameterized.Parameters(name = "Catalog Name {0} - Options {2}")
public static Object[][] parameters() {
@@ -143,15 +145,12 @@ public void before() throws IOException {
spark.sql(String.format("DROP TABLE IF EXISTS %s", typeTestIdentifier));

// generate the dataframe
- Dataset<Row> nestedDataFrame = nestedDataFrame();
- Dataset<Row> typeTestDataFrame = typeTestDataFrame();
+ nestedDataFrame = nestedDataFrame();
+ typeTestDataFrame = typeTestDataFrame();

// write to delta tables
writeDeltaTable(nestedDataFrame, partitionedIdentifier, partitionedLocation, "id");
writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null);
- writeDeltaTable(
-     nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation, null);
- writeDeltaTable(typeTestDataFrame, typeTestIdentifier, typeTestTableLocation, "stringCol");

// Delete a record from the table
spark.sql("DELETE FROM " + partitionedIdentifier + " WHERE id=3");
@@ -273,6 +272,8 @@ public void testSnapshotWithAdditionalProperties() {

@Test
public void testSnapshotTableWithExternalDataFiles() {
+ writeDeltaTable(
+     nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation, null);
// Add parquet files to default.external_data_files_table. The newly added parquet files
// are not at the same location as the table.
addExternalDatafiles(externalDataFilesTableLocation, unpartitionedLocation);
@@ -290,6 +291,7 @@ public void testSnapshotTableWithExternalDataFiles() {

@Test
public void testSnapshotSupportedTypes() {
+ writeDeltaTable(typeTestDataFrame, typeTestIdentifier, typeTestTableLocation, "stringCol");
String newTableIdentifier = destName(icebergCatalogName, snapshotTypeTestTableName);
SnapshotDeltaLakeTable.Result result =
DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
BaseSnapshotDeltaLakeTableAction.java
@@ -61,7 +61,6 @@
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
- import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.slf4j.Logger;
@@ -90,6 +89,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
private TableIdentifier newTableIdentifier;
private String newTableLocation;
private HadoopFileIO deltaLakeFileIO;
+ private long deltaStartVersion;

/**
* Snapshot a delta lake table to be an iceberg table. The action will read the delta lake table's
@@ -139,6 +139,8 @@ public SnapshotDeltaLakeTable icebergCatalog(Catalog catalog) {
public SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf) {
this.deltaLog = DeltaLog.forTable(conf, deltaTableLocation);
this.deltaLakeFileIO = new HadoopFileIO(conf);
+ // get the earliest version available in the delta lake table
+ this.deltaStartVersion = deltaLog.getVersionAtOrAfterTimestamp(0L);
return this;
}

@@ -150,6 +152,10 @@ public Result execute() {
Preconditions.checkArgument(
deltaLog != null && deltaLakeFileIO != null,
"Make sure to configure the action with a valid deltaLakeConfiguration");
+ Preconditions.checkArgument(
+     deltaLog.tableExists(),
+     "Delta lake table does not exist at the given location: %s",
+     deltaTableLocation);
io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
Schema schema = convertDeltaLakeSchema(updatedSnapshot.getMetadata().getSchema());
PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(schema);
@@ -169,8 +175,8 @@ public Result execute() {
.commit();
Iterator<VersionLog> versionLogIterator =
deltaLog.getChanges(
-     0, // retrieve actions starting from the initial version
-     false); // not throw exception when data loss detected
+     deltaStartVersion, false // not throw exception when data loss detected
+     );
while (versionLogIterator.hasNext()) {
VersionLog versionLog = versionLogIterator.next();
commitDeltaVersionLogToIcebergTransaction(versionLog, icebergTransaction);
@@ -227,17 +233,25 @@ private PartitionSpec getPartitionSpecFromDeltaSnapshot(Schema schema) {
*/
private void commitDeltaVersionLogToIcebergTransaction(
VersionLog versionLog, Transaction transaction) {
- List<Action> actions = versionLog.getActions();
-
- // Create a map of Delta Lake Action (AddFile, RemoveFile, etc.) --> List<Action>
- Map<String, List<Action>> deltaLakeActionMap =
-     actions.stream()
-         .filter(action -> action instanceof AddFile || action instanceof RemoveFile)
-         .collect(Collectors.groupingBy(a -> a.getClass().getSimpleName()));
+ List<Action> dataFileActions;
+ if (versionLog.getVersion() == deltaStartVersion) {
+   // The first version log is a special case, since it contains the initial table state.
+   // we need to get all dataFiles from the corresponding delta snapshot to construct the table.
+   dataFileActions =
+       deltaLog.getSnapshotForVersionAsOf(deltaStartVersion).getAllFiles().stream()
+           .map(addFile -> (Action) addFile)
+           .collect(Collectors.toList());
+ } else {
+   // Only need actions related to data change: AddFile and RemoveFile
+   dataFileActions =
+       versionLog.getActions().stream()
+           .filter(action -> action instanceof AddFile || action instanceof RemoveFile)
+           .collect(Collectors.toList());
+ }

List<DataFile> filesToAdd = Lists.newArrayList();
List<DataFile> filesToRemove = Lists.newArrayList();
- for (Action action : Iterables.concat(deltaLakeActionMap.values())) {
+ for (Action action : dataFileActions) {
DataFile dataFile = buildDataFileFromAction(action, transaction.table());
if (action instanceof AddFile) {
filesToAdd.add(dataFile);
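Why the first retained version gets the special treatment above: the Delta snapshot at that version is cumulative, so its full file list already reflects every AddFile and RemoveFile from the expired version logs that can no longer be read. Continuing the sketch after the commit message (deltaLog and deltaStartVersion as before; assumes the same Collectors, Action, and AddFile imports the class already uses):

    // Hypothetical state: retention expired versions 0..4, so deltaStartVersion resolved to 5.
    // Seed the Iceberg table from the version-5 snapshot rather than from its version log alone.
    List<Action> initialDataFileActions =
        deltaLog.getSnapshotForVersionAsOf(deltaStartVersion).getAllFiles().stream()
            .map(addFile -> (Action) addFile)
            .collect(Collectors.toList());
    // Versions after deltaStartVersion are still replayed log-by-log via getChanges(...).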
TestBaseSnapshotDeltaLakeTableAction.java
@@ -87,6 +87,20 @@ public void testRequiredDeltaLakeConfiguration() {
.hasMessage("Make sure to configure the action with a valid deltaLakeConfiguration");
}

+ @Test
+ public void testDeltaTableNotExist() {
+   SnapshotDeltaLakeTable testAction =
+       new BaseSnapshotDeltaLakeTableAction(sourceTableLocation)
+           .as(TableIdentifier.of("test", "test"))
+           .deltaLakeConfiguration(testHadoopConf)
+           .icebergCatalog(testCatalog)
+           .tableLocation(newTableLocation);
+   Assertions.assertThatThrownBy(testAction::execute)
+       .isInstanceOf(IllegalArgumentException.class)
+       .hasMessage(
+           "Delta lake table does not exist at the given location: %s", sourceTableLocation);
+ }

private static class TestCatalog extends BaseMetastoreCatalog {
TestCatalog() {}

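For context, the new testDeltaTableNotExist exercises the action the same way a caller would; a rough end-to-end sketch, with the source/target locations, Hadoop configuration, and catalog as placeholders set up elsewhere:

    SnapshotDeltaLakeTable.Result result =
        new BaseSnapshotDeltaLakeTableAction(sourceTableLocation)   // existing Delta table
            .as(TableIdentifier.of("db", "snapshot_table"))         // target Iceberg identifier
            .deltaLakeConfiguration(hadoopConf)                     // also resolves deltaStartVersion now
            .icebergCatalog(catalog)
            .tableLocation(newTableLocation)
            .execute();                                             // fails fast if no Delta table exists at sourceTableLocation

Since deltaLakeConfiguration(...) is what creates the DeltaLog and records deltaStartVersion, it has to be called before execute(), which is what the precondition in execute() enforces.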
