Migrate delta to iceberg round 15 #49

Merged
merged 7 commits on Jan 25, 2023
Changes from all commits
@@ -84,6 +84,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
private String newIcebergTableLocation;
private String externalDataFilesTableLocation;
private String typeTestTableLocation;
private Dataset<Row> typeTestDataFrame;
private Dataset<Row> nestedDataFrame;

@Parameterized.Parameters(name = "Catalog Name {0} - Options {2}")
public static Object[][] parameters() {
@@ -143,15 +145,12 @@ public void before() throws IOException {
spark.sql(String.format("DROP TABLE IF EXISTS %s", typeTestIdentifier));

// generate the dataframe
Dataset<Row> nestedDataFrame = nestedDataFrame();
Dataset<Row> typeTestDataFrame = typeTestDataFrame();
nestedDataFrame = nestedDataFrame();
typeTestDataFrame = typeTestDataFrame();

// write to delta tables
writeDeltaTable(nestedDataFrame, partitionedIdentifier, partitionedLocation, "id");
writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null);
writeDeltaTable(
nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation, null);
writeDeltaTable(typeTestDataFrame, typeTestIdentifier, typeTestTableLocation, "stringCol");

// Delete a record from the table
spark.sql("DELETE FROM " + partitionedIdentifier + " WHERE id=3");
@@ -273,6 +272,8 @@ public void testSnapshotWithAdditionalProperties() {

@Test
public void testSnapshotTableWithExternalDataFiles() {
writeDeltaTable(
nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation, null);
// Add parquet files to default.external_data_files_table. The newly added parquet files
// are not at the same location as the table.
addExternalDatafiles(externalDataFilesTableLocation, unpartitionedLocation);
@@ -290,6 +291,7 @@ public void testSnapshotTableWithExternalDataFiles() {

@Test
public void testSnapshotSupportedTypes() {
writeDeltaTable(typeTestDataFrame, typeTestIdentifier, typeTestTableLocation, "stringCol");
String newTableIdentifier = destName(icebergCatalogName, snapshotTypeTestTableName);
SnapshotDeltaLakeTable.Result result =
DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
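The writeDeltaTable helper that these tests now call individually is defined elsewhere in the test class and is not part of this diff. As a rough sketch only, assuming the Spark Delta connector's DataFrameWriter path (the overwrite mode, the "path" option, and the saveAsTable call are assumptions here, not code from this PR), such a helper could look like this inside the test class:

import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical helper mirroring the writeDeltaTable(df, identifier, location, partitionColumn) calls above.
private static void writeDeltaTable(
    Dataset<Row> df, String identifier, String location, String partitionColumn) {
  DataFrameWriter<Row> writer =
      df.write().format("delta").mode("overwrite").option("path", location);
  if (partitionColumn != null) {
    writer = writer.partitionBy(partitionColumn); // partition the Delta table by the given column
  }
  writer.saveAsTable(identifier); // register the table under the given identifier
}

Deferring these writes to the tests that need them keeps before() cheaper for the tests that only touch the partitioned and unpartitioned tables.
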
@@ -61,7 +61,6 @@
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.slf4j.Logger;
@@ -90,6 +89,7 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
private TableIdentifier newTableIdentifier;
private String newTableLocation;
private HadoopFileIO deltaLakeFileIO;
private long deltaStartVersion;

/**
* Snapshot a delta lake table to be an iceberg table. The action will read the delta lake table's
@@ -139,6 +139,8 @@ public SnapshotDeltaLakeTable icebergCatalog(Catalog catalog) {
public SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf) {
this.deltaLog = DeltaLog.forTable(conf, deltaTableLocation);
this.deltaLakeFileIO = new HadoopFileIO(conf);
// get the earliest version available in the delta lake table
this.deltaStartVersion = deltaLog.getVersionAtOrAfterTimestamp(0L);
return this;
}
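
The deltaStartVersion field added here records the earliest version whose log file is still available in the Delta table; earlier log files may already have been removed by Delta's log cleanup, so replaying from version 0 is not always possible. The hunks below use it as the starting point for getChanges. A self-contained sketch of the Delta Standalone calls involved, assuming a table path passed on the command line; the printing is illustrative only, and unlike this sketch the action starts its iteration at deltaStartVersion itself and special-cases the first VersionLog:

import java.util.Iterator;

import io.delta.standalone.DeltaLog;
import io.delta.standalone.VersionLog;
import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.RemoveFile;
import org.apache.hadoop.conf.Configuration;

public class DeltaLogReplaySketch {
  public static void main(String[] args) {
    String deltaTableLocation = args[0]; // path to an existing Delta table (assumption)
    DeltaLog deltaLog = DeltaLog.forTable(new Configuration(), deltaTableLocation);

    // Earliest version whose log file still exists; versions before it were cleaned up.
    long startVersion = deltaLog.getVersionAtOrAfterTimestamp(0L);

    // The table state at that version comes from the snapshot's full file list,
    // since the individual AddFile actions of earlier versions are no longer available.
    int initialFileCount = deltaLog.getSnapshotForVersionAsOf(startVersion).getAllFiles().size();
    System.out.println("version " + startVersion + " contains " + initialFileCount + " data files");

    // Replay the later versions, looking only at data-changing actions.
    Iterator<VersionLog> changes = deltaLog.getChanges(startVersion + 1, false);
    while (changes.hasNext()) {
      VersionLog versionLog = changes.next();
      for (Action action : versionLog.getActions()) {
        if (action instanceof AddFile) {
          System.out.println("v" + versionLog.getVersion() + " adds " + ((AddFile) action).getPath());
        } else if (action instanceof RemoveFile) {
          System.out.println("v" + versionLog.getVersion() + " removes " + ((RemoveFile) action).getPath());
        }
      }
    }
  }
}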

@@ -150,6 +152,10 @@ public Result execute() {
Preconditions.checkArgument(
deltaLog != null && deltaLakeFileIO != null,
"Make sure to configure the action with a valid deltaLakeConfiguration");
Preconditions.checkArgument(
deltaLog.tableExists(),
"Delta lake table does not exist at the given location: %s",
deltaTableLocation);
io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update();
Schema schema = convertDeltaLakeSchema(updatedSnapshot.getMetadata().getSchema());
PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(schema);
@@ -169,8 +175,8 @@ public Result execute() {
.commit();
Iterator<VersionLog> versionLogIterator =
deltaLog.getChanges(
0, // retrieve actions starting from the initial version
false); // not throw exception when data loss detected
deltaStartVersion, false // do not throw an exception when data loss is detected
);
while (versionLogIterator.hasNext()) {
VersionLog versionLog = versionLogIterator.next();
commitDeltaVersionLogToIcebergTransaction(versionLog, icebergTransaction);
@@ -227,17 +233,25 @@ private PartitionSpec getPartitionSpecFromDeltaSnapshot(Schema schema) {
*/
private void commitDeltaVersionLogToIcebergTransaction(
VersionLog versionLog, Transaction transaction) {
List<Action> actions = versionLog.getActions();

// Create a map of Delta Lake Action (AddFile, RemoveFile, etc.) --> List<Action>
Map<String, List<Action>> deltaLakeActionMap =
actions.stream()
.filter(action -> action instanceof AddFile || action instanceof RemoveFile)
.collect(Collectors.groupingBy(a -> a.getClass().getSimpleName()));
List<Action> dataFileActions;
if (versionLog.getVersion() == deltaStartVersion) {
// The first version log is a special case, since it contains the initial table state.
// We need to get all dataFiles from the corresponding delta snapshot to construct the table.
dataFileActions =
deltaLog.getSnapshotForVersionAsOf(deltaStartVersion).getAllFiles().stream()
.map(addFile -> (Action) addFile)
.collect(Collectors.toList());
} else {
// Only need actions related to data change: AddFile and RemoveFile
dataFileActions =
versionLog.getActions().stream()
.filter(action -> action instanceof AddFile || action instanceof RemoveFile)
.collect(Collectors.toList());
}

List<DataFile> filesToAdd = Lists.newArrayList();
List<DataFile> filesToRemove = Lists.newArrayList();
for (Action action : Iterables.concat(deltaLakeActionMap.values())) {
for (Action action : dataFileActions) {
DataFile dataFile = buildDataFileFromAction(action, transaction.table());
if (action instanceof AddFile) {
filesToAdd.add(dataFile);
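The tail of commitDeltaVersionLogToIcebergTransaction is collapsed in this diff, so the following is a hedged sketch rather than the PR's elided code: per-version add and remove lists like filesToAdd and filesToRemove are typically turned into one operation on the in-progress Iceberg transaction through the standard Transaction API, roughly along these lines.

import java.util.List;

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Transaction;

// Sketch only: commit the data files added and removed by one Delta version to an Iceberg transaction.
class DeltaVersionCommitSketch {
  static void commitDataFiles(
      Transaction transaction, List<DataFile> filesToAdd, List<DataFile> filesToRemove) {
    if (!filesToAdd.isEmpty() && !filesToRemove.isEmpty()) {
      // A version that both adds and removes files maps naturally to an overwrite.
      OverwriteFiles overwrite = transaction.newOverwrite();
      filesToAdd.forEach(overwrite::addFile);
      filesToRemove.forEach(overwrite::deleteFile);
      overwrite.commit();
    } else if (!filesToAdd.isEmpty()) {
      AppendFiles append = transaction.newAppend();
      filesToAdd.forEach(append::appendFile);
      append.commit();
    } else if (!filesToRemove.isEmpty()) {
      DeleteFiles delete = transaction.newDelete();
      filesToRemove.forEach(delete::deleteFile);
      delete.commit();
    }
    // The enclosing transaction is committed once, after every Delta version has been replayed.
  }
}
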
@@ -87,6 +87,20 @@ public void testRequiredDeltaLakeConfiguration() {
.hasMessage("Make sure to configure the action with a valid deltaLakeConfiguration");
}

@Test
public void testDeltaTableNotExist() {
SnapshotDeltaLakeTable testAction =
new BaseSnapshotDeltaLakeTableAction(sourceTableLocation)
.as(TableIdentifier.of("test", "test"))
.deltaLakeConfiguration(testHadoopConf)
.icebergCatalog(testCatalog)
.tableLocation(newTableLocation);
Assertions.assertThatThrownBy(testAction::execute)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Delta lake table does not exist at the given location: %s", sourceTableLocation);
}

private static class TestCatalog extends BaseMetastoreCatalog {
TestCatalog() {}

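For reference, the builder chain exercised by the tests above gives the action's full call sequence. A minimal usage sketch mirroring that chain; the catalog, Hadoop configuration, and table locations are placeholders, and imports for the action classes are omitted because their package is not visible in this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

Configuration hadoopConf = new Configuration(); // must be able to read the source Delta table
Catalog icebergCatalog = new HadoopCatalog(hadoopConf, "/tmp/warehouse"); // placeholder catalog
String deltaTableLocation = "/tmp/source_delta_table"; // placeholder locations
String newTableLocation = "/tmp/new_iceberg_table";

SnapshotDeltaLakeTable.Result result =
    new BaseSnapshotDeltaLakeTableAction(deltaTableLocation)
        .as(TableIdentifier.of("db", "snapshot_table"))
        .deltaLakeConfiguration(hadoopConf) // also resolves deltaStartVersion from the Delta log
        .icebergCatalog(icebergCatalog)
        .tableLocation(newTableLocation)
        .execute(); // now fails fast if no Delta table exists at deltaTableLocation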