diff --git a/.github/workflows/delta-conversion-ci.yml b/.github/workflows/delta-conversion-ci.yml new file mode 100644 index 000000000000..5302f048f773 --- /dev/null +++ b/.github/workflows/delta-conversion-ci.yml @@ -0,0 +1,115 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +name: "Delta Conversion CI" +on: + push: + branches: + - 'master' + - '0.**' + tags: + - 'apache-iceberg-**' + pull_request: + paths-ignore: + - '.github/ISSUE_TEMPLATE/iceberg_bug_report.yml' + - '.github/workflows/python-ci.yml' + - '.github/workflows/flink-ci.yml' + - '.github/workflows/hive-ci.yml' + - '.gitignore' + - '.asf.yml' + - 'dev/**' + - 'mr/**' + - 'hive3/**' + - 'hive3-orc-bundle/**' + - 'hive-runtime/**' + - 'flink/**' + - 'pig/**' + - 'python/**' + - 'python_legacy/**' + - 'docs/**' + - 'open-api/**' + - 'format/**' + - '.gitattributes' + - 'README.md' + - 'CONTRIBUTING.md' + - 'LICENSE' + - 'NOTICE' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.event_name == 'pull_request' }} + +jobs: + delta-conversion-scala-2-12-tests: + runs-on: ubuntu-20.04 + strategy: + matrix: + jvm: [8, 11] + env: + SPARK_LOCAL_IP: localhost + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + distribution: zulu + java-version: ${{ matrix.jvm }} + - uses: actions/cache@v3 + with: + path: | + ~/.gradle/caches + ~/.gradle/wrapper + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }} + restore-keys: ${{ runner.os }}-gradle- + - run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts + - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc + - uses: actions/upload-artifact@v3 + if: failure() + with: + name: test logs + path: | + **/build/testlogs + + delta-conversion-scala-2-13-tests: + runs-on: ubuntu-20.04 + strategy: + matrix: + jvm: [ 8, 11 ] + env: + SPARK_LOCAL_IP: localhost + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + distribution: zulu + java-version: ${{ matrix.jvm }} + - uses: actions/cache@v3 + with: + path: | + ~/.gradle/caches + ~/.gradle/wrapper + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }} + restore-keys: ${{ runner.os }}-gradle- + - run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts + - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc + - uses: actions/upload-artifact@v3 + if: failure() + with: + name: test logs + path: | 
+ **/build/testlogs diff --git a/build.gradle b/build.gradle index 3e55beb0a6ed..a4edb72b71c6 100644 --- a/build.gradle +++ b/build.gradle @@ -48,6 +48,10 @@ plugins { id 'nebula.dependency-recommender' version '11.0.0' } +String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") +String sparkVersionsString = System.getProperty("sparkVersions") != null ? System.getProperty("sparkVersions") : System.getProperty("defaultSparkVersions") +List sparkVersions = sparkVersionsString != null && !sparkVersionsString.isEmpty() ? sparkVersionsString.split(",") : [] + try { // apply these plugins in a try-catch block so that we can handle cases without .git directory apply plugin: 'com.palantir.git-version' @@ -447,6 +451,76 @@ project(':iceberg-aws') { } } +project(':iceberg-delta-lake') { + // use integration test since we can take advantages of spark 3.3 to read datafiles of delta lake table + // and create some tests involving sql query. + configurations { + integrationImplementation.extendsFrom testImplementation + integrationRuntime.extendsFrom testRuntimeOnly + } + + dependencies { + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + api project(':iceberg-api') + implementation project(':iceberg-common') + implementation project(':iceberg-core') + implementation project(':iceberg-parquet') + implementation "com.fasterxml.jackson.core:jackson-databind" + + compileOnly "io.delta:delta-standalone_${scalaVersion}" + + compileOnly("org.apache.hadoop:hadoop-common") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'javax.servlet', module: 'servlet-api' + exclude group: 'com.google.code.gson', module: 'gson' + } + + // The newest version of delta-core uses Spark 3.3.*. Since its only for test, we do + // not need to include older version of delta-core + if (sparkVersions.contains("3.3")) { + integrationImplementation "io.delta:delta-core_${scalaVersion}" + integrationImplementation project(path: ":iceberg-spark:iceberg-spark-3.3_${scalaVersion}") + integrationImplementation("org.apache.hadoop:hadoop-minicluster") { + exclude group: 'org.apache.avro', module: 'avro' + // to make sure netty libs only come from project(':iceberg-arrow') + exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'io.netty', module: 'netty-common' + } + integrationImplementation project(path: ':iceberg-hive-metastore') + integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') + integrationImplementation("org.apache.spark:spark-hive_${scalaVersion}:3.3.1") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.apache.arrow' + exclude group: 'org.apache.parquet' + // to make sure netty libs only come from project(':iceberg-arrow') + exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'io.netty', module: 'netty-common' + exclude group: 'org.roaringbitmap' + } + } + } + + // The newest version of delta-core uses Spark 3.3.*. 
The integration test should only be built + // if iceberg-spark-3.3 is available + if (sparkVersions.contains("3.3")) { + sourceSets { + integration { + java.srcDir "$projectDir/src/integration/java" + resources.srcDir "$projectDir/src/integration/resources" + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + } + } + + task integrationTest(type: Test) { + testClassesDirs = sourceSets.integration.output.classesDirs + classpath = sourceSets.integration.runtimeClasspath + } + check.dependsOn integrationTest + } +} + project(':iceberg-gcp') { dependencies { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') diff --git a/delta-lake/src/integration/java/org/apache/iceberg/delta/DeltaLakeToIcebergMigrationSparkIntegration.java b/delta-lake/src/integration/java/org/apache/iceberg/delta/DeltaLakeToIcebergMigrationSparkIntegration.java new file mode 100644 index 000000000000..5a3ad24a0e6b --- /dev/null +++ b/delta-lake/src/integration/java/org/apache/iceberg/delta/DeltaLakeToIcebergMigrationSparkIntegration.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.delta; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; + +/** An example class shows how to use the delta lake migration actions in SparkContext. */ +class DeltaLakeToIcebergMigrationSparkIntegration { + + private DeltaLakeToIcebergMigrationSparkIntegration() {} + + /** + * Example of how to use a {@link SparkSession}, a table identifier and a delta table location to + * construct an action for snapshotting the delta table to an iceberg table. + * + * @param spark a SparkSession with iceberg catalog configured. + * @param newTableIdentifier can be both 2 parts and 3 parts identifier, if it is 2 parts, the + * default spark catalog will be used + * @param deltaTableLocation the location of the delta table + * @return an instance of snapshot delta lake table action. 
+ */ + static SnapshotDeltaLakeTable snapshotDeltaLakeTable( + SparkSession spark, String newTableIdentifier, String deltaTableLocation) { + Preconditions.checkArgument( + spark != null, "The SparkSession cannot be null, please provide a valid SparkSession"); + Preconditions.checkArgument( + newTableIdentifier != null, + "The table identifier cannot be null, please provide a valid table identifier for the new iceberg table"); + Preconditions.checkArgument( + deltaTableLocation != null, + "The delta lake table location cannot be null, please provide a valid location of the delta lake table to be snapshot"); + + String ctx = "delta lake snapshot target"; + CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog(); + Spark3Util.CatalogAndIdentifier catalogAndIdent = + Spark3Util.catalogAndIdentifier(ctx, spark, newTableIdentifier, defaultCatalog); + return DeltaLakeToIcebergMigrationActionsProvider.defaultActions() + .snapshotDeltaLakeTable(deltaTableLocation) + .as(TableIdentifier.parse(catalogAndIdent.identifier().toString())) + .deltaLakeConfiguration(spark.sessionState().newHadoopConf()) + .icebergCatalog(Spark3Util.loadIcebergCatalog(spark, catalogAndIdent.catalog().name())); + } +} diff --git a/delta-lake/src/integration/java/org/apache/iceberg/delta/SparkDeltaLakeSnapshotTestBase.java b/delta-lake/src/integration/java/org/apache/iceberg/delta/SparkDeltaLakeSnapshotTestBase.java new file mode 100644 index 000000000000..2c100e2fbe95 --- /dev/null +++ b/delta-lake/src/integration/java/org/apache/iceberg/delta/SparkDeltaLakeSnapshotTestBase.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.delta; + +import java.util.Map; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.hive.TestHiveMetastore; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +@SuppressWarnings("VisibilityModifier") +public abstract class SparkDeltaLakeSnapshotTestBase { + protected static TestHiveMetastore metastore = null; + protected static HiveConf hiveConf = null; + protected static SparkSession spark = null; + + @BeforeClass + public static void startMetastoreAndSpark() { + SparkDeltaLakeSnapshotTestBase.metastore = new TestHiveMetastore(); + metastore.start(); + SparkDeltaLakeSnapshotTestBase.hiveConf = metastore.hiveConf(); + + SparkDeltaLakeSnapshotTestBase.spark = + SparkSession.builder() + .master("local[2]") + .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") + .config( + "spark.hadoop." 
+ HiveConf.ConfVars.METASTOREURIS.varname, + hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)) + .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .enableHiveSupport() + .getOrCreate(); + } + + @AfterClass + public static void stopMetastoreAndSpark() throws Exception { + if (metastore != null) { + metastore.stop(); + SparkDeltaLakeSnapshotTestBase.metastore = null; + } + if (spark != null) { + spark.stop(); + SparkDeltaLakeSnapshotTestBase.spark = null; + } + } + + public SparkDeltaLakeSnapshotTestBase( + String catalogName, String implementation, Map config) { + + spark.conf().set("spark.sql.catalog." + catalogName, implementation); + config.forEach( + (key, value) -> spark.conf().set("spark.sql.catalog." + catalogName + "." + key, value)); + } +} diff --git a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java new file mode 100644 index 000000000000..c79129e0af5e --- /dev/null +++ b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.delta; + +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +import io.delta.standalone.DeltaLog; +import io.delta.standalone.Operation; +import io.delta.standalone.OptimisticTransaction; +import io.delta.standalone.actions.AddFile; +import io.delta.standalone.actions.RemoveFile; +import io.delta.standalone.exceptions.DeltaConcurrentModificationException; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.net.URLCodec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.delta.catalog.DeltaCatalog; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase { + private static final String SNAPSHOT_SOURCE_PROP = "snapshot_source"; + private static final String DELTA_SOURCE_VALUE = "delta"; + private static final String ORIGINAL_LOCATION_PROP = "original_location"; + private static final String NAMESPACE = "delta_conversion_test"; + private static final String defaultSparkCatalog = "spark_catalog"; + private static final String icebergCatalogName = "iceberg_hive"; + private String partitionedIdentifier; + private String unpartitionedIdentifier; + private String externalDataFilesIdentifier; + private String typeTestIdentifier; + private final String partitionedTableName = "partitioned_table"; + private final String unpartitionedTableName = "unpartitioned_table"; + private final String externalDataFilesTableName = "external_data_files_table"; + private final String typeTestTableName = "type_test_table"; + private final String snapshotPartitionedTableName = "iceberg_partitioned_table"; + private final String snapshotUnpartitionedTableName = "iceberg_unpartitioned_table"; + private final String snapshotExternalDataFilesTableName = "iceberg_external_data_files_table"; + private final String snapshotNewTableLocationTableName = "iceberg_new_table_location_table"; + private final String snapshotAdditionalPropertiesTableName = + "iceberg_additional_properties_table"; + private final String snapshotTypeTestTableName = "iceberg_type_test_table"; + private String partitionedLocation; + private String unpartitionedLocation; + private String newIcebergTableLocation; + private String externalDataFilesTableLocation; + private String typeTestTableLocation; + private Dataset typeTestDataFrame; + private Dataset nestedDataFrame; + + @Parameterized.Parameters(name = "Catalog Name {0} - Options {2}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] { + icebergCatalogName, + 
SparkCatalog.class.getName(), + ImmutableMap.of( + "type", + "hive", + "default-namespace", + "default", + "parquet-enabled", + "true", + "cache-enabled", + "false" // Spark will delete tables using v1, leaving the cache out of sync + ) + } + }; + } + + @Rule public TemporaryFolder temp1 = new TemporaryFolder(); + @Rule public TemporaryFolder temp2 = new TemporaryFolder(); + @Rule public TemporaryFolder temp3 = new TemporaryFolder(); + @Rule public TemporaryFolder temp4 = new TemporaryFolder(); + @Rule public TemporaryFolder temp5 = new TemporaryFolder(); + + public TestSnapshotDeltaLakeTable( + String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + spark.conf().set("spark.sql.catalog." + defaultSparkCatalog, DeltaCatalog.class.getName()); + } + + @Before + public void before() throws IOException { + File partitionedFolder = temp1.newFolder(); + File unpartitionedFolder = temp2.newFolder(); + File newIcebergTableFolder = temp3.newFolder(); + File externalDataFilesTableFolder = temp4.newFolder(); + File typeTestTableFolder = temp5.newFolder(); + partitionedLocation = partitionedFolder.toURI().toString(); + unpartitionedLocation = unpartitionedFolder.toURI().toString(); + newIcebergTableLocation = newIcebergTableFolder.toURI().toString(); + externalDataFilesTableLocation = externalDataFilesTableFolder.toURI().toString(); + typeTestTableLocation = typeTestTableFolder.toURI().toString(); + + spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", NAMESPACE)); + + partitionedIdentifier = destName(defaultSparkCatalog, partitionedTableName); + unpartitionedIdentifier = destName(defaultSparkCatalog, unpartitionedTableName); + externalDataFilesIdentifier = destName(defaultSparkCatalog, externalDataFilesTableName); + typeTestIdentifier = destName(defaultSparkCatalog, typeTestTableName); + + spark.sql(String.format("DROP TABLE IF EXISTS %s", partitionedIdentifier)); + spark.sql(String.format("DROP TABLE IF EXISTS %s", unpartitionedIdentifier)); + spark.sql(String.format("DROP TABLE IF EXISTS %s", externalDataFilesIdentifier)); + spark.sql(String.format("DROP TABLE IF EXISTS %s", typeTestIdentifier)); + + // generate the dataframe + nestedDataFrame = nestedDataFrame(); + typeTestDataFrame = typeTestDataFrame(); + + // write to delta tables + writeDeltaTable(nestedDataFrame, partitionedIdentifier, partitionedLocation, "id"); + writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null); + + // Delete a record from the table + spark.sql("DELETE FROM " + partitionedIdentifier + " WHERE id=3"); + spark.sql("DELETE FROM " + unpartitionedIdentifier + " WHERE id=3"); + + // Update a record + spark.sql("UPDATE " + partitionedIdentifier + " SET id=3 WHERE id=1"); + spark.sql("UPDATE " + unpartitionedIdentifier + " SET id=3 WHERE id=1"); + } + + @After + public void after() { + // Drop delta lake tables. + spark.sql( + String.format( + "DROP TABLE IF EXISTS %s", destName(defaultSparkCatalog, partitionedTableName))); + spark.sql( + String.format( + "DROP TABLE IF EXISTS %s", destName(defaultSparkCatalog, unpartitionedTableName))); + spark.sql( + String.format( + "DROP TABLE IF EXISTS %s", destName(defaultSparkCatalog, externalDataFilesTableName))); + spark.sql( + String.format("DROP TABLE IF EXISTS %s", destName(defaultSparkCatalog, typeTestTableName))); + + // Drop iceberg tables. 
+ spark.sql( + String.format( + "DROP TABLE IF EXISTS %s", destName(icebergCatalogName, snapshotPartitionedTableName))); + spark.sql( + String.format( + "DROP TABLE IF EXISTS %s", + destName(icebergCatalogName, snapshotUnpartitionedTableName))); + spark.sql( + String.format( + "DROP TABLE IF EXISTS %s", + destName(icebergCatalogName, snapshotExternalDataFilesTableName))); + spark.sql( + String.format( + "DROP TABLE IF EXISTS %s", + destName(icebergCatalogName, snapshotNewTableLocationTableName))); + spark.sql( + String.format( + "DROP TABLE IF EXISTS %s", + destName(icebergCatalogName, snapshotAdditionalPropertiesTableName))); + spark.sql( + String.format( + "DROP TABLE IF EXISTS %s", destName(icebergCatalogName, snapshotTypeTestTableName))); + + spark.sql(String.format("DROP DATABASE IF EXISTS %s", NAMESPACE)); + } + + @Test + public void testBasicSnapshotPartitioned() { + String newTableIdentifier = destName(icebergCatalogName, snapshotPartitionedTableName); + SnapshotDeltaLakeTable.Result result = + DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( + spark, newTableIdentifier, partitionedLocation) + .execute(); + + checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result); + checkIcebergTableLocation(newTableIdentifier, partitionedLocation); + } + + @Test + public void testBasicSnapshotUnpartitioned() { + String newTableIdentifier = destName(icebergCatalogName, snapshotUnpartitionedTableName); + SnapshotDeltaLakeTable.Result result = + DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( + spark, newTableIdentifier, unpartitionedLocation) + .execute(); + + checkSnapshotIntegrity( + unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result); + checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation); + } + + @Test + public void testSnapshotWithNewLocation() { + String newTableIdentifier = destName(icebergCatalogName, snapshotNewTableLocationTableName); + SnapshotDeltaLakeTable.Result result = + DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( + spark, newTableIdentifier, partitionedLocation) + .tableLocation(newIcebergTableLocation) + .execute(); + + checkSnapshotIntegrity(partitionedLocation, partitionedIdentifier, newTableIdentifier, result); + checkIcebergTableLocation(newTableIdentifier, newIcebergTableLocation); + } + + @Test + public void testSnapshotWithAdditionalProperties() { + // add some properties to the original delta table + spark.sql( + "ALTER TABLE " + + unpartitionedIdentifier + + " SET TBLPROPERTIES ('foo'='bar', 'test0'='test0')"); + String newTableIdentifier = destName(icebergCatalogName, snapshotAdditionalPropertiesTableName); + SnapshotDeltaLakeTable.Result result = + DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( + spark, newTableIdentifier, unpartitionedLocation) + .tableProperty("test1", "test1") + .tableProperties( + ImmutableMap.of( + "test2", "test2", "test3", "test3", "test4", + "test4")) // add additional iceberg table properties + .execute(); + + checkSnapshotIntegrity( + unpartitionedLocation, unpartitionedIdentifier, newTableIdentifier, result); + checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation); + checkIcebergTableProperties( + newTableIdentifier, + ImmutableMap.of( + "foo", "bar", "test0", "test0", "test1", "test1", "test2", "test2", "test3", "test3", + "test4", "test4"), + unpartitionedLocation); + } + + @Test + public void testSnapshotTableWithExternalDataFiles() { + writeDeltaTable( + 
nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation, null); + // Add parquet files to default.external_data_files_table. The newly added parquet files + // are not at the same location as the table. + addExternalDatafiles(externalDataFilesTableLocation, unpartitionedLocation); + + String newTableIdentifier = destName(icebergCatalogName, snapshotExternalDataFilesTableName); + SnapshotDeltaLakeTable.Result result = + DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( + spark, newTableIdentifier, externalDataFilesTableLocation) + .execute(); + checkSnapshotIntegrity( + externalDataFilesTableLocation, externalDataFilesIdentifier, newTableIdentifier, result); + checkIcebergTableLocation(newTableIdentifier, externalDataFilesTableLocation); + checkDataFilePathsIntegrity(newTableIdentifier, externalDataFilesTableLocation); + } + + @Test + public void testSnapshotSupportedTypes() { + writeDeltaTable(typeTestDataFrame, typeTestIdentifier, typeTestTableLocation, "stringCol"); + String newTableIdentifier = destName(icebergCatalogName, snapshotTypeTestTableName); + SnapshotDeltaLakeTable.Result result = + DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable( + spark, newTableIdentifier, typeTestTableLocation) + .tableProperty(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false") + .execute(); + checkSnapshotIntegrity(typeTestTableLocation, typeTestIdentifier, newTableIdentifier, result); + checkIcebergTableLocation(newTableIdentifier, typeTestTableLocation); + checkIcebergTableProperties( + newTableIdentifier, + ImmutableMap.of(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false"), + typeTestTableLocation); + } + + private void checkSnapshotIntegrity( + String deltaTableLocation, + String deltaTableIdentifier, + String icebergTableIdentifier, + SnapshotDeltaLakeTable.Result snapshotReport) { + DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation); + + List deltaTableContents = + spark.sql("SELECT * FROM " + deltaTableIdentifier).collectAsList(); + List icebergTableContents = + spark.sql("SELECT * FROM " + icebergTableIdentifier).collectAsList(); + + Assertions.assertThat(deltaTableContents).hasSize(icebergTableContents.size()); + Assertions.assertThat(deltaLog.update().getAllFiles()) + .hasSize((int) snapshotReport.snapshotDataFilesCount()); + Assertions.assertThat(icebergTableContents).containsAll(deltaTableContents); + Assertions.assertThat(deltaTableContents).containsAll(icebergTableContents); + } + + private void checkIcebergTableLocation(String icebergTableIdentifier, String expectedLocation) { + Table icebergTable = getIcebergTable(icebergTableIdentifier); + Assertions.assertThat(icebergTable.location()).isEqualTo(expectedLocation); + } + + private void checkIcebergTableProperties( + String icebergTableIdentifier, + Map expectedAdditionalProperties, + String deltaTableLocation) { + Table icebergTable = getIcebergTable(icebergTableIdentifier); + ImmutableMap.Builder expectedPropertiesBuilder = ImmutableMap.builder(); + // The snapshot action will put some fixed properties to the table + expectedPropertiesBuilder.put(SNAPSHOT_SOURCE_PROP, DELTA_SOURCE_VALUE); + expectedPropertiesBuilder.putAll(expectedAdditionalProperties); + ImmutableMap expectedProperties = expectedPropertiesBuilder.build(); + + Assertions.assertThat(icebergTable.properties().entrySet()) + .containsAll(expectedProperties.entrySet()); + Assertions.assertThat(icebergTable.properties()) + .containsEntry(ORIGINAL_LOCATION_PROP, 
deltaTableLocation); + } + + private void checkDataFilePathsIntegrity( + String icebergTableIdentifier, String deltaTableLocation) { + Table icebergTable = getIcebergTable(icebergTableIdentifier); + DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation); + // checkSnapshotIntegrity already checks the number of data files in the snapshot iceberg table + // equals that in the original delta lake table + List deltaTableDataFilePaths = + deltaLog.update().getAllFiles().stream() + .map(f -> getFullFilePath(f.getPath(), deltaLog.getPath().toString())) + .collect(Collectors.toList()); + icebergTable + .currentSnapshot() + .addedDataFiles(icebergTable.io()) + .forEach( + dataFile -> { + Assertions.assertThat(URI.create(dataFile.path().toString()).isAbsolute()).isTrue(); + Assertions.assertThat(deltaTableDataFilePaths).contains(dataFile.path().toString()); + }); + } + + private Table getIcebergTable(String icebergTableIdentifier) { + CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog(); + Spark3Util.CatalogAndIdentifier catalogAndIdent = + Spark3Util.catalogAndIdentifier( + "test catalog", spark, icebergTableIdentifier, defaultCatalog); + return Spark3Util.loadIcebergCatalog(spark, catalogAndIdent.catalog().name()) + .loadTable(TableIdentifier.parse(catalogAndIdent.identifier().toString())); + } + + private String destName(String catalogName, String dest) { + if (catalogName.equals(defaultSparkCatalog)) { + return NAMESPACE + "." + catalogName + "_" + dest; + } + return catalogName + "." + NAMESPACE + "." + catalogName + "_" + dest; + } + + /** + * Add parquet files manually to a delta lake table to mock the situation that some data files are + * not in the same location as the delta lake table. The case that {@link AddFile#getPath()} or + * {@link RemoveFile#getPath()} returns absolute path. + * + *
<p>
The known issue makes it + * necessary to manually rebuild the AddFile to avoid deserialization error when committing the + * transaction. + */ + private void addExternalDatafiles( + String targetDeltaTableLocation, String sourceDeltaTableLocation) { + DeltaLog targetLog = + DeltaLog.forTable(spark.sessionState().newHadoopConf(), targetDeltaTableLocation); + OptimisticTransaction transaction = targetLog.startTransaction(); + DeltaLog sourceLog = + DeltaLog.forTable(spark.sessionState().newHadoopConf(), sourceDeltaTableLocation); + List newFiles = + sourceLog.update().getAllFiles().stream() + .map( + f -> + AddFile.builder( + getFullFilePath(f.getPath(), sourceLog.getPath().toString()), + f.getPartitionValues(), + f.getSize(), + System.currentTimeMillis(), + true) + .build()) + .collect(Collectors.toList()); + try { + transaction.commit(newFiles, new Operation(Operation.Name.UPDATE), "Delta-Lake/2.2.0"); + } catch (DeltaConcurrentModificationException e) { + throw new RuntimeException(e); + } + } + + private static String getFullFilePath(String path, String tableRoot) { + URI dataFileUri = URI.create(path); + try { + String decodedPath = new URLCodec().decode(path); + if (dataFileUri.isAbsolute()) { + return decodedPath; + } else { + return tableRoot + File.separator + decodedPath; + } + } catch (DecoderException e) { + throw new IllegalArgumentException(String.format("Cannot decode path %s", path), e); + } + } + + private Dataset typeTestDataFrame() { + return spark + .range(0, 5, 1, 5) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), 1)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(timestampCol AS STRING)")) + .withColumn("booleanCol", expr("longCol > 5")) + .withColumn("binaryCol", expr("CAST(longCol AS BINARY)")) + .withColumn("byteCol", expr("CAST(longCol AS BYTE)")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))")) + .withColumn("shortCol", expr("CAST(longCol AS SHORT)")) + .withColumn("mapCol", expr("MAP(longCol, decimalCol)")) + .withColumn("arrayCol", expr("ARRAY(longCol)")) + .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)")); + } + + private Dataset nestedDataFrame() { + return spark + .range(0, 5, 1, 5) + .withColumn("longCol", expr("id")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))")) + .withColumn("magic_number", expr("rand(5) * 100")) + .withColumn("dateCol", date_add(current_date(), 1)) + .withColumn("dateString", expr("CAST(dateCol AS STRING)")) + .withColumn("random1", expr("CAST(rand(5) * 100 as LONG)")) + .withColumn("random2", expr("CAST(rand(51) * 100 as LONG)")) + .withColumn("random3", expr("CAST(rand(511) * 100 as LONG)")) + .withColumn("random4", expr("CAST(rand(15) * 100 as LONG)")) + .withColumn("random5", expr("CAST(rand(115) * 100 as LONG)")) + .withColumn("innerStruct1", expr("STRUCT(random1, random2)")) + .withColumn("innerStruct2", expr("STRUCT(random3, random4)")) + .withColumn("structCol1", expr("STRUCT(innerStruct1, innerStruct2)")) + .withColumn( + "innerStruct3", + expr("STRUCT(SHA1(CAST(random5 AS BINARY)), SHA1(CAST(random1 AS BINARY)))")) + .withColumn( + "structCol2", + expr( + "STRUCT(innerStruct3, STRUCT(SHA1(CAST(random2 AS BINARY)), SHA1(CAST(random3 AS BINARY))))")) + .withColumn("arrayCol", expr("ARRAY(random1, random2, random3, 
random4, random5)")) + .withColumn("arrayStructCol", expr("ARRAY(innerStruct1, innerStruct1, innerStruct1)")) + .withColumn("mapCol1", expr("MAP(structCol1, structCol2)")) + .withColumn("mapCol2", expr("MAP(longCol, dateString)")) + .withColumn("mapCol3", expr("MAP(dateCol, arrayCol)")) + .withColumn("structCol3", expr("STRUCT(structCol2, mapCol3, arrayCol)")); + } + + private void writeDeltaTable( + Dataset df, String identifier, String path, String partitionColumn) { + if (partitionColumn != null) { + df.write() + .format("delta") + .mode(SaveMode.Append) + .option("path", path) + .partitionBy(partitionColumn) + .saveAsTable(identifier); + } else { + df.write().format("delta").mode(SaveMode.Append).option("path", path).saveAsTable(identifier); + } + } +} diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java new file mode 100644 index 000000000000..3ab561de9a58 --- /dev/null +++ b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.delta; + +import io.delta.standalone.DeltaLog; +import io.delta.standalone.VersionLog; +import io.delta.standalone.actions.Action; +import io.delta.standalone.actions.AddFile; +import io.delta.standalone.actions.RemoveFile; +import java.io.File; +import java.net.URI; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.net.URLCodec; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.parquet.ParquetUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Takes a Delta Lake table's location and attempts to create an Iceberg table snapshot in an + * optional user-specified location (default to the Delta Lake table's location) with a different + * identifier. + */ +class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable { + + private static final Logger LOG = LoggerFactory.getLogger(BaseSnapshotDeltaLakeTableAction.class); + + private static final String SNAPSHOT_SOURCE_PROP = "snapshot_source"; + private static final String DELTA_SOURCE_VALUE = "delta"; + private static final String ORIGINAL_LOCATION_PROP = "original_location"; + private static final String PARQUET_SUFFIX = ".parquet"; + private final ImmutableMap.Builder additionalPropertiesBuilder = + ImmutableMap.builder(); + private DeltaLog deltaLog; + private Catalog icebergCatalog; + private final String deltaTableLocation; + private TableIdentifier newTableIdentifier; + private String newTableLocation; + private HadoopFileIO deltaLakeFileIO; + private long deltaStartVersion; + + /** + * Snapshot a delta lake table to be an iceberg table. The action will read the delta lake table's + * log through the table's path, create a new iceberg table using the given icebergCatalog and + * newTableIdentifier, and commit all changes in one iceberg transaction. + * + *
<p>
The new table will only be created if the snapshot is successful. + * + * @param deltaTableLocation the delta lake table's path + */ + BaseSnapshotDeltaLakeTableAction(String deltaTableLocation) { + this.deltaTableLocation = deltaTableLocation; + this.newTableLocation = deltaTableLocation; + } + + @Override + public SnapshotDeltaLakeTable tableProperties(Map properties) { + additionalPropertiesBuilder.putAll(properties); + return this; + } + + @Override + public SnapshotDeltaLakeTable tableProperty(String name, String value) { + additionalPropertiesBuilder.put(name, value); + return this; + } + + @Override + public SnapshotDeltaLakeTable tableLocation(String location) { + this.newTableLocation = location; + return this; + } + + @Override + public SnapshotDeltaLakeTable as(TableIdentifier identifier) { + this.newTableIdentifier = identifier; + return this; + } + + @Override + public SnapshotDeltaLakeTable icebergCatalog(Catalog catalog) { + this.icebergCatalog = catalog; + return this; + } + + @Override + public SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf) { + this.deltaLog = DeltaLog.forTable(conf, deltaTableLocation); + this.deltaLakeFileIO = new HadoopFileIO(conf); + // get the earliest version available in the delta lake table + this.deltaStartVersion = deltaLog.getVersionAtOrAfterTimestamp(0L); + return this; + } + + @Override + public Result execute() { + Preconditions.checkArgument( + icebergCatalog != null && newTableIdentifier != null, + "Iceberg catalog and identifier cannot be null. Make sure to configure the action with a valid Iceberg catalog and identifier."); + Preconditions.checkArgument( + deltaLog != null && deltaLakeFileIO != null, + "Make sure to configure the action with a valid deltaLakeConfiguration"); + Preconditions.checkArgument( + deltaLog.tableExists(), + "Delta lake table does not exist at the given location: %s", + deltaTableLocation); + io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update(); + Schema schema = convertDeltaLakeSchema(updatedSnapshot.getMetadata().getSchema()); + PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(schema); + Transaction icebergTransaction = + icebergCatalog.newCreateTableTransaction( + newTableIdentifier, + schema, + partitionSpec, + newTableLocation, + destTableProperties(updatedSnapshot, deltaTableLocation)); + icebergTransaction + .table() + .updateProperties() + .set( + TableProperties.DEFAULT_NAME_MAPPING, + NameMappingParser.toJson(MappingUtil.create(icebergTransaction.table().schema()))) + .commit(); + + Iterator versionLogIterator = + deltaLog.getChanges( + deltaStartVersion, false // not throw exception when data loss detected + ); + while (versionLogIterator.hasNext()) { + VersionLog versionLog = versionLogIterator.next(); + commitDeltaVersionLogToIcebergTransaction(versionLog, icebergTransaction); + } + + Snapshot icebergSnapshot = icebergTransaction.table().currentSnapshot(); + long totalDataFiles = + icebergSnapshot != null + ? 
Long.parseLong(icebergSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)) + : 0; + + icebergTransaction.commitTransaction(); + LOG.info( + "Successfully created Iceberg table {} from delta lake table at {}, total data file count: {}", + newTableIdentifier, + deltaTableLocation, + totalDataFiles); + return new BaseSnapshotDeltaLakeTableActionResult(totalDataFiles); + } + + private Schema convertDeltaLakeSchema(io.delta.standalone.types.StructType deltaSchema) { + Type converted = + DeltaLakeDataTypeVisitor.visit(deltaSchema, new DeltaLakeTypeToType(deltaSchema)); + return new Schema(converted.asNestedType().asStructType().fields()); + } + + private PartitionSpec getPartitionSpecFromDeltaSnapshot(Schema schema) { + List partitionNames = deltaLog.snapshot().getMetadata().getPartitionColumns(); + if (partitionNames.isEmpty()) { + return PartitionSpec.unpartitioned(); + } + + PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); + for (String partitionName : partitionNames) { + builder.identity(partitionName); + } + return builder.build(); + } + + /** + * Iterate through the {@code VersionLog} to determine the update type and commit the update to + * the given {@code Transaction}. + * + *

<p>There are 3 cases: + * + *

<p>1. AppendFiles - when there are only AddFile instances (an INSERT on the table) + * + *

<p>2. DeleteFiles - when there are only RemoveFile instances (a DELETE where all the records of + * file(s) were removed) + * + *
<p>
3. OverwriteFiles - when there are a mix of AddFile and RemoveFile (a DELETE/UPDATE) + * + * @param versionLog the delta log version to commit to iceberg table transaction + * @param transaction the iceberg table transaction to commit to + */ + private void commitDeltaVersionLogToIcebergTransaction( + VersionLog versionLog, Transaction transaction) { + List dataFileActions; + if (versionLog.getVersion() == deltaStartVersion) { + // The first version is a special case, since it represents the initial table state. + // Need to get all dataFiles from the corresponding delta snapshot to construct the table. + dataFileActions = + deltaLog.getSnapshotForVersionAsOf(deltaStartVersion).getAllFiles().stream() + .map(addFile -> (Action) addFile) + .collect(Collectors.toList()); + } else { + // Only need actions related to data change: AddFile and RemoveFile + dataFileActions = + versionLog.getActions().stream() + .filter(action -> action instanceof AddFile || action instanceof RemoveFile) + .collect(Collectors.toList()); + } + + List filesToAdd = Lists.newArrayList(); + List filesToRemove = Lists.newArrayList(); + for (Action action : dataFileActions) { + DataFile dataFile = buildDataFileFromAction(action, transaction.table()); + if (action instanceof AddFile) { + filesToAdd.add(dataFile); + } else if (action instanceof RemoveFile) { + filesToRemove.add(dataFile); + } else { + throw new ValidationException( + "The action %s's is unsupported", action.getClass().getSimpleName()); + } + } + + if (filesToAdd.size() > 0 && filesToRemove.size() > 0) { + // OverwriteFiles case + OverwriteFiles overwriteFiles = transaction.newOverwrite(); + filesToAdd.forEach(overwriteFiles::addFile); + filesToRemove.forEach(overwriteFiles::deleteFile); + overwriteFiles.commit(); + } else if (filesToAdd.size() > 0) { + // AppendFiles case + AppendFiles appendFiles = transaction.newAppend(); + filesToAdd.forEach(appendFiles::appendFile); + appendFiles.commit(); + } else if (filesToRemove.size() > 0) { + // DeleteFiles case + DeleteFiles deleteFiles = transaction.newDelete(); + filesToRemove.forEach(deleteFiles::deleteFile); + deleteFiles.commit(); + } + } + + private DataFile buildDataFileFromAction(Action action, Table table) { + PartitionSpec spec = table.spec(); + String path; + long fileSize; + Long nullableFileSize; + Map partitionValues; + + if (action instanceof AddFile) { + AddFile addFile = (AddFile) action; + path = addFile.getPath(); + nullableFileSize = addFile.getSize(); + partitionValues = addFile.getPartitionValues(); + } else if (action instanceof RemoveFile) { + RemoveFile removeFile = (RemoveFile) action; + path = removeFile.getPath(); + nullableFileSize = removeFile.getSize().orElse(null); + partitionValues = removeFile.getPartitionValues(); + } else { + throw new ValidationException( + "Unexpected action type for Delta Lake: %s", action.getClass().getSimpleName()); + } + + String fullFilePath = getFullFilePath(path, deltaLog.getPath().toString()); + // For unpartitioned table, the partitionValues should be an empty map rather than null + Preconditions.checkArgument( + partitionValues != null, + String.format("File %s does not specify a partitionValues", fullFilePath)); + + FileFormat format = determineFileFormatFromPath(fullFilePath); + InputFile file = deltaLakeFileIO.newInputFile(fullFilePath); + + // If the file size is not specified, the size should be read from the file + if (nullableFileSize != null) { + fileSize = nullableFileSize; + } else { + fileSize = file.getLength(); + } + + // get metrics 
from the file + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); + NameMapping nameMapping = + nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null; + Metrics metrics = getMetricsForFile(file, format, metricsConfig, nameMapping); + + String partition = + spec.fields().stream() + .map(PartitionField::name) + .map(name -> String.format("%s=%s", name, partitionValues.get(name))) + .collect(Collectors.joining("/")); + + return DataFiles.builder(spec) + .withPath(fullFilePath) + .withFormat(format) + .withFileSizeInBytes(fileSize) + .withMetrics(metrics) + .withPartitionPath(partition) + .build(); + } + + private FileFormat determineFileFormatFromPath(String path) { + if (path.endsWith(PARQUET_SUFFIX)) { + return FileFormat.PARQUET; + } else { + throw new ValidationException("Do not support file format in path %s", path); + } + } + + private Metrics getMetricsForFile( + InputFile file, FileFormat format, MetricsConfig metricsSpec, NameMapping mapping) { + if (format == FileFormat.PARQUET) { + return ParquetUtil.fileMetrics(file, metricsSpec, mapping); + } + throw new ValidationException("Cannot get metrics from file format: %s", format); + } + + private Map destTableProperties( + io.delta.standalone.Snapshot deltaSnapshot, String originalLocation) { + additionalPropertiesBuilder.putAll(deltaSnapshot.getMetadata().getConfiguration()); + additionalPropertiesBuilder.putAll( + ImmutableMap.of( + SNAPSHOT_SOURCE_PROP, DELTA_SOURCE_VALUE, ORIGINAL_LOCATION_PROP, originalLocation)); + + return additionalPropertiesBuilder.build(); + } + + /** + * Get the full file path, the input {@code String} path can be either a relative path or an + * absolute path of a data file in delta table + * + * @param path the return value of {@link AddFile#getPath()} or {@link RemoveFile#getPath()} + * (either absolute or relative) + * @param tableRoot the root path of the delta table + */ + private static String getFullFilePath(String path, String tableRoot) { + URI dataFileUri = URI.create(path); + try { + String decodedPath = new URLCodec().decode(path); + if (dataFileUri.isAbsolute()) { + return decodedPath; + } else { + return tableRoot + File.separator + decodedPath; + } + } catch (DecoderException e) { + throw new IllegalArgumentException(String.format("Cannot decode path %s", path), e); + } + } +} diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableActionResult.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableActionResult.java new file mode 100644 index 000000000000..53c9b0d7fe8b --- /dev/null +++ b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableActionResult.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.delta; + +class BaseSnapshotDeltaLakeTableActionResult implements SnapshotDeltaLakeTable.Result { + + private final long snapshotDataFilesCount; + + BaseSnapshotDeltaLakeTableActionResult(long snapshotDataFilesCount) { + this.snapshotDataFilesCount = snapshotDataFilesCount; + } + + @Override + public long snapshotDataFilesCount() { + return snapshotDataFilesCount; + } +} diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeDataTypeVisitor.java b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeDataTypeVisitor.java new file mode 100644 index 000000000000..8af654a97b38 --- /dev/null +++ b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeDataTypeVisitor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.delta; + +import io.delta.standalone.types.ArrayType; +import io.delta.standalone.types.DataType; +import io.delta.standalone.types.MapType; +import io.delta.standalone.types.StructField; +import io.delta.standalone.types.StructType; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +abstract class DeltaLakeDataTypeVisitor { + public static T visit(DataType type, DeltaLakeDataTypeVisitor visitor) { + if (type instanceof StructType) { + StructField[] fields = ((StructType) type).getFields(); + List fieldResults = Lists.newArrayListWithExpectedSize(fields.length); + + for (StructField field : fields) { + fieldResults.add(visitor.field(field, visit(field.getDataType(), visitor))); + } + + return visitor.struct((StructType) type, fieldResults); + + } else if (type instanceof MapType) { + return visitor.map( + (MapType) type, + visit(((MapType) type).getKeyType(), visitor), + visit(((MapType) type).getValueType(), visitor)); + + } else if (type instanceof ArrayType) { + return visitor.array((ArrayType) type, visit(((ArrayType) type).getElementType(), visitor)); + + } else { + return visitor.atomic(type); + } + } + + public abstract T struct(StructType struct, List fieldResults); + + public abstract T field(StructField field, T typeResult); + + public abstract T array(ArrayType array, T elementResult); + + public abstract T map(MapType map, T keyResult, T valueResult); + + public abstract T atomic(DataType atomic); +} diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeToIcebergMigrationActionsProvider.java b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeToIcebergMigrationActionsProvider.java new file mode 100644 index 000000000000..8699eb3b5d26 --- /dev/null +++ 
b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeToIcebergMigrationActionsProvider.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.delta; + +/** + * An API that provide actions for migration from a delta lake table to an iceberg table. Query + * engines can use {@code defaultActions()} to access default action implementations, or implement + * this provider to supply a different implementation if necessary. + */ +public interface DeltaLakeToIcebergMigrationActionsProvider { + + /** + * Initiates an action to snapshot an existing Delta Lake table to an Iceberg table. + * + * @param sourceTableLocation the location of the Delta Lake table + * @return a {@link SnapshotDeltaLakeTable} action + */ + default SnapshotDeltaLakeTable snapshotDeltaLakeTable(String sourceTableLocation) { + return new BaseSnapshotDeltaLakeTableAction(sourceTableLocation); + } + + /** + * Get the default implementation of {@link DeltaLakeToIcebergMigrationActionsProvider} + * + * @return an instance with access to all default actions + */ + static DeltaLakeToIcebergMigrationActionsProvider defaultActions() { + return DefaultDeltaLakeToIcebergMigrationActions.defaultMigrationActions(); + } + + class DefaultDeltaLakeToIcebergMigrationActions + implements DeltaLakeToIcebergMigrationActionsProvider { + private static final DefaultDeltaLakeToIcebergMigrationActions defaultMigrationActions = + new DefaultDeltaLakeToIcebergMigrationActions(); + + private DefaultDeltaLakeToIcebergMigrationActions() {} + + static DefaultDeltaLakeToIcebergMigrationActions defaultMigrationActions() { + return defaultMigrationActions; + } + } +} diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeTypeToType.java b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeTypeToType.java new file mode 100644 index 000000000000..752fb11ba04c --- /dev/null +++ b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeTypeToType.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.delta;
+
+import io.delta.standalone.types.ArrayType;
+import io.delta.standalone.types.BinaryType;
+import io.delta.standalone.types.BooleanType;
+import io.delta.standalone.types.ByteType;
+import io.delta.standalone.types.DataType;
+import io.delta.standalone.types.DateType;
+import io.delta.standalone.types.DecimalType;
+import io.delta.standalone.types.DoubleType;
+import io.delta.standalone.types.FloatType;
+import io.delta.standalone.types.IntegerType;
+import io.delta.standalone.types.LongType;
+import io.delta.standalone.types.MapType;
+import io.delta.standalone.types.ShortType;
+import io.delta.standalone.types.StringType;
+import io.delta.standalone.types.StructField;
+import io.delta.standalone.types.StructType;
+import io.delta.standalone.types.TimestampType;
+import java.util.List;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class DeltaLakeTypeToType extends DeltaLakeDataTypeVisitor<Type> {
+  private final StructType root;
+  private int nextId = 0;
+
+  DeltaLakeTypeToType() {
+    this.root = null;
+  }
+
+  DeltaLakeTypeToType(StructType root) {
+    this.root = root;
+    this.nextId = root.getFields().length;
+  }
+
+  private int getNextId() {
+    int next = nextId;
+    nextId += 1;
+    return next;
+  }
+
+  @Override
+  @SuppressWarnings("ReferenceEquality")
+  public Type struct(StructType struct, List<Type> types) {
+    StructField[] fields = struct.getFields();
+    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(fields.length);
+    boolean isRoot = root == struct;
+    for (int i = 0; i < fields.length; i += 1) {
+      StructField field = fields[i];
+      Type type = types.get(i);
+
+      int id;
+      if (isRoot) {
+        // for new conversions, use ordinals for ids in the root struct
+        id = i;
+      } else {
+        id = getNextId();
+      }
+
+      String doc =
+          field.getMetadata().contains("comment")
+              ?
field.getMetadata().get("comment").toString() + : null; + + if (field.isNullable()) { + newFields.add(Types.NestedField.optional(id, field.getName(), type, doc)); + } else { + newFields.add(Types.NestedField.required(id, field.getName(), type, doc)); + } + } + + return Types.StructType.of(newFields); + } + + @Override + public Type field(StructField field, Type typeResult) { + return typeResult; + } + + @Override + public Type array(ArrayType array, Type elementType) { + if (array.containsNull()) { + return Types.ListType.ofOptional(getNextId(), elementType); + } else { + return Types.ListType.ofRequired(getNextId(), elementType); + } + } + + @Override + public Type map(MapType map, Type keyType, Type valueType) { + if (map.valueContainsNull()) { + return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType); + } else { + return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType); + } + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + @Override + public Type atomic(DataType atomic) { + if (atomic instanceof BooleanType) { + return Types.BooleanType.get(); + + } else if (atomic instanceof IntegerType + || atomic instanceof ShortType + || atomic instanceof ByteType) { + return Types.IntegerType.get(); + + } else if (atomic instanceof LongType) { + return Types.LongType.get(); + + } else if (atomic instanceof FloatType) { + return Types.FloatType.get(); + + } else if (atomic instanceof DoubleType) { + return Types.DoubleType.get(); + + } else if (atomic instanceof StringType) { + return Types.StringType.get(); + + } else if (atomic instanceof DateType) { + return Types.DateType.get(); + + } else if (atomic instanceof TimestampType) { + return Types.TimestampType.withZone(); + + } else if (atomic instanceof DecimalType) { + return Types.DecimalType.of( + ((DecimalType) atomic).getPrecision(), ((DecimalType) atomic).getScale()); + } else if (atomic instanceof BinaryType) { + return Types.BinaryType.get(); + } + + throw new ValidationException("Not a supported type: %s", atomic.getCatalogString()); + } +} diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/SnapshotDeltaLakeTable.java b/delta-lake/src/main/java/org/apache/iceberg/delta/SnapshotDeltaLakeTable.java new file mode 100644 index 000000000000..6f4bd726a58b --- /dev/null +++ b/delta-lake/src/main/java/org/apache/iceberg/delta/SnapshotDeltaLakeTable.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.delta; + +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.actions.Action; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; + +/** Snapshot an existing Delta Lake table to Iceberg in place. 
*/
+public interface SnapshotDeltaLakeTable
+    extends Action<SnapshotDeltaLakeTable, SnapshotDeltaLakeTable.Result> {
+
+  /**
+   * Sets table properties in the newly created Iceberg table. Any property with the same key
+   * name will be overwritten.
+   *
+   * @param properties a map of properties to set
+   * @return this for method chaining
+   */
+  SnapshotDeltaLakeTable tableProperties(Map<String, String> properties);
+
+  /**
+   * Sets a table property in the newly created Iceberg table. Any property with the same key
+   * will be overwritten.
+   *
+   * @param name a table property name
+   * @param value a table property value
+   * @return this for method chaining
+   */
+  SnapshotDeltaLakeTable tableProperty(String name, String value);
+
+  /**
+   * Sets the location of the newly created Iceberg table. The default location is the same as
+   * the Delta Lake table's location.
+   *
+   * @param location a path to the new table location
+   * @return this for method chaining
+   */
+  SnapshotDeltaLakeTable tableLocation(String location);
+
+  /**
+   * Sets the identifier of the newly created Iceberg table. This must be set before the action
+   * is executed.
+   *
+   * @param identifier a table identifier (namespace, name)
+   * @return this for method chaining
+   */
+  SnapshotDeltaLakeTable as(TableIdentifier identifier);
+
+  /**
+   * Sets the catalog of the newly created Iceberg table. This must be set before the action is
+   * executed.
+   *
+   * @param catalog a catalog
+   * @return this for method chaining
+   */
+  SnapshotDeltaLakeTable icebergCatalog(Catalog catalog);
+
+  /**
+   * Sets the Hadoop configuration used to access the Delta Lake table's logs and data files.
+   * This must be set before the action is executed.
+   *
+   * @param conf a Hadoop configuration
+   * @return this for method chaining
+   */
+  SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf);
+
+  /** The action result that contains a summary of the execution. */
+  interface Result {
+
+    /** Returns the number of migrated data files. */
+    long snapshotDataFilesCount();
+  }
+}
diff --git a/delta-lake/src/test/java/org/apache/iceberg/delta/TestBaseSnapshotDeltaLakeTableAction.java b/delta-lake/src/test/java/org/apache/iceberg/delta/TestBaseSnapshotDeltaLakeTableAction.java
new file mode 100644
index 000000000000..41261d4ab975
--- /dev/null
+++ b/delta-lake/src/test/java/org/apache/iceberg/delta/TestBaseSnapshotDeltaLakeTableAction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg.delta; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestBaseSnapshotDeltaLakeTableAction { + @Rule public TemporaryFolder temp1 = new TemporaryFolder(); + @Rule public TemporaryFolder temp2 = new TemporaryFolder(); + private String sourceTableLocation; + private final Configuration testHadoopConf = new Configuration(); + private String newTableLocation; + private final Catalog testCatalog = new TestCatalog(); + + @Before + public void before() throws IOException { + File sourceFolder = temp1.newFolder(); + File destFolder = temp2.newFolder(); + sourceTableLocation = sourceFolder.toURI().toString(); + newTableLocation = destFolder.toURI().toString(); + } + + @Test + public void testRequiredTableIdentifier() { + SnapshotDeltaLakeTable testAction = + new BaseSnapshotDeltaLakeTableAction(sourceTableLocation) + .icebergCatalog(testCatalog) + .deltaLakeConfiguration(testHadoopConf) + .tableLocation(newTableLocation); + Assertions.assertThatThrownBy(testAction::execute) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Iceberg catalog and identifier cannot be null. Make sure to configure the action with a valid Iceberg catalog and identifier."); + } + + @Test + public void testRequiredIcebergCatalog() { + SnapshotDeltaLakeTable testAction = + new BaseSnapshotDeltaLakeTableAction(sourceTableLocation) + .as(TableIdentifier.of("test", "test")) + .deltaLakeConfiguration(testHadoopConf) + .tableLocation(newTableLocation); + Assertions.assertThatThrownBy(testAction::execute) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Iceberg catalog and identifier cannot be null. 
Make sure to configure the action with a valid Iceberg catalog and identifier.");
+  }
+
+  @Test
+  public void testRequiredDeltaLakeConfiguration() {
+    SnapshotDeltaLakeTable testAction =
+        new BaseSnapshotDeltaLakeTableAction(sourceTableLocation)
+            .as(TableIdentifier.of("test", "test"))
+            .icebergCatalog(testCatalog)
+            .tableLocation(newTableLocation);
+    Assertions.assertThatThrownBy(testAction::execute)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Make sure to configure the action with a valid deltaLakeConfiguration");
+  }
+
+  @Test
+  public void testDeltaTableNotExist() {
+    SnapshotDeltaLakeTable testAction =
+        new BaseSnapshotDeltaLakeTableAction(sourceTableLocation)
+            .as(TableIdentifier.of("test", "test"))
+            .deltaLakeConfiguration(testHadoopConf)
+            .icebergCatalog(testCatalog)
+            .tableLocation(newTableLocation);
+    Assertions.assertThatThrownBy(testAction::execute)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Delta lake table does not exist at the given location: %s", sourceTableLocation);
+  }
+
+  private static class TestCatalog extends BaseMetastoreCatalog {
+    TestCatalog() {}
+
+    @Override
+    protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    public List<TableIdentifier> listTables(Namespace namespace) {
+      return null;
+    }
+
+    @Override
+    public boolean dropTable(TableIdentifier identifier, boolean purge) {
+      return false;
+    }
+
+    @Override
+    public void renameTable(TableIdentifier from, TableIdentifier to) {}
+  }
+}
diff --git a/delta-lake/src/test/java/org/apache/iceberg/delta/TestDeltaLakeTypeToType.java b/delta-lake/src/test/java/org/apache/iceberg/delta/TestDeltaLakeTypeToType.java
new file mode 100644
index 000000000000..29a5c63c3d22
--- /dev/null
+++ b/delta-lake/src/test/java/org/apache/iceberg/delta/TestDeltaLakeTypeToType.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg.delta; + +import io.delta.standalone.types.ArrayType; +import io.delta.standalone.types.BinaryType; +import io.delta.standalone.types.BooleanType; +import io.delta.standalone.types.DoubleType; +import io.delta.standalone.types.LongType; +import io.delta.standalone.types.MapType; +import io.delta.standalone.types.NullType; +import io.delta.standalone.types.StringType; +import io.delta.standalone.types.StructType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Test; + +public class TestDeltaLakeTypeToType { + private static final String optionalBooleanType = "testNullableBoolType"; + private static final String requiredBinaryType = "testRequiredBinaryType"; + private static final String doubleArrayType = "testNullableArrayType"; + private static final String structArrayType = "testStructArrayType"; + private static final String innerAtomicSchema = "testInnerAtomicSchema"; + private static final String stringLongMapType = "testStringLongMap"; + private static final String nullType = "testNullType"; + private StructType deltaAtomicSchema; + private StructType deltaNestedSchema; + private StructType deltaShallowNullTypeSchema; + private StructType deltaNullTypeSchema; + + @Before + public void constructDeltaLakeSchema() { + deltaAtomicSchema = + new StructType() + .add(optionalBooleanType, new BooleanType()) + .add(requiredBinaryType, new BinaryType(), false); + deltaNestedSchema = + new StructType() + .add(innerAtomicSchema, deltaAtomicSchema) + .add(doubleArrayType, new ArrayType(new DoubleType(), true), false) + .add(structArrayType, new ArrayType(deltaAtomicSchema, true), false) + .add(stringLongMapType, new MapType(new StringType(), new LongType(), false), false); + deltaNullTypeSchema = + new StructType() + .add(innerAtomicSchema, deltaAtomicSchema) + .add(doubleArrayType, new ArrayType(new DoubleType(), true), false) + .add(stringLongMapType, new MapType(new NullType(), new LongType(), false), false); + deltaShallowNullTypeSchema = new StructType().add(nullType, new NullType(), false); + } + + @Test + public void testAtomicTypeConversion() { + Type converted = + DeltaLakeDataTypeVisitor.visit( + deltaAtomicSchema, new DeltaLakeTypeToType(deltaAtomicSchema)); + Schema convertedSchema = new Schema(converted.asNestedType().asStructType().fields()); + + Assertions.assertThat(convertedSchema.findType(optionalBooleanType)) + .isInstanceOf(Types.BooleanType.class); + Assertions.assertThat(convertedSchema.findField(optionalBooleanType).isOptional()).isTrue(); + Assertions.assertThat(convertedSchema.findType(requiredBinaryType)) + .isInstanceOf(Types.BinaryType.class); + Assertions.assertThat(convertedSchema.findField(requiredBinaryType).isRequired()).isTrue(); + } + + @Test + public void testNestedTypeConversion() { + Type converted = + DeltaLakeDataTypeVisitor.visit( + deltaNestedSchema, new DeltaLakeTypeToType(deltaNestedSchema)); + Schema convertedSchema = new Schema(converted.asNestedType().asStructType().fields()); + + Assertions.assertThat(convertedSchema.findType(innerAtomicSchema)) + .isInstanceOf(Types.StructType.class); + Assertions.assertThat(convertedSchema.findField(innerAtomicSchema).isOptional()).isTrue(); + Assertions.assertThat( + convertedSchema + .findType(innerAtomicSchema) + .asStructType() + .fieldType(optionalBooleanType)) + 
.isInstanceOf(Types.BooleanType.class); + Assertions.assertThat( + convertedSchema + .findType(innerAtomicSchema) + .asStructType() + .fieldType(requiredBinaryType)) + .isInstanceOf(Types.BinaryType.class); + Assertions.assertThat( + convertedSchema + .findType(innerAtomicSchema) + .asStructType() + .field(requiredBinaryType) + .isRequired()) + .isTrue(); + Assertions.assertThat(convertedSchema.findType(stringLongMapType)) + .isInstanceOf(Types.MapType.class); + Assertions.assertThat(convertedSchema.findType(stringLongMapType).asMapType().keyType()) + .isInstanceOf(Types.StringType.class); + Assertions.assertThat(convertedSchema.findType(stringLongMapType).asMapType().valueType()) + .isInstanceOf(Types.LongType.class); + Assertions.assertThat(convertedSchema.findType(doubleArrayType)) + .isInstanceOf(Types.ListType.class); + Assertions.assertThat(convertedSchema.findField(doubleArrayType).isRequired()).isTrue(); + Assertions.assertThat( + convertedSchema.findType(doubleArrayType).asListType().isElementOptional()) + .isTrue(); + Assertions.assertThat(convertedSchema.findType(structArrayType)) + .isInstanceOf(Types.ListType.class); + Assertions.assertThat(convertedSchema.findField(structArrayType).isRequired()).isTrue(); + Assertions.assertThat( + convertedSchema.findType(structArrayType).asListType().isElementOptional()) + .isTrue(); + Assertions.assertThat(convertedSchema.findType(structArrayType).asListType().elementType()) + .isInstanceOf(Types.StructType.class); + Assertions.assertThat( + convertedSchema + .findType(structArrayType) + .asListType() + .elementType() + .asStructType() + .fieldType(optionalBooleanType)) + .isInstanceOf(Types.BooleanType.class); + Assertions.assertThat( + convertedSchema + .findType(structArrayType) + .asListType() + .elementType() + .asStructType() + .field(optionalBooleanType) + .isOptional()) + .isTrue(); + Assertions.assertThat( + convertedSchema + .findType(structArrayType) + .asListType() + .elementType() + .asStructType() + .fieldType(requiredBinaryType)) + .isInstanceOf(Types.BinaryType.class); + Assertions.assertThat( + convertedSchema + .findType(structArrayType) + .asListType() + .elementType() + .asStructType() + .field(requiredBinaryType) + .isRequired()) + .isTrue(); + } + + @Test + public void testNullTypeConversion() { + Assertions.assertThatThrownBy( + () -> + DeltaLakeDataTypeVisitor.visit( + deltaNullTypeSchema, new DeltaLakeTypeToType(deltaNullTypeSchema))) + .isInstanceOf(ValidationException.class) + .hasMessage(String.format("Not a supported type: %s", new NullType().getCatalogString())); + Assertions.assertThatThrownBy( + () -> + DeltaLakeDataTypeVisitor.visit( + deltaShallowNullTypeSchema, + new DeltaLakeTypeToType(deltaShallowNullTypeSchema))) + .isInstanceOf(ValidationException.class) + .hasMessage(String.format("Not a supported type: %s", new NullType().getCatalogString())); + } +} diff --git a/settings.gradle b/settings.gradle index c5ac07e080c2..ea528cf4427e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -35,6 +35,7 @@ include 'nessie' include 'gcp' include 'dell' include 'snowflake' +include 'delta-lake' project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' @@ -53,6 +54,7 @@ project(':nessie').name = 'iceberg-nessie' project(':gcp').name = 'iceberg-gcp' project(':dell').name = 'iceberg-dell' project(':snowflake').name = 'iceberg-snowflake' +project(':delta-lake').name = 'iceberg-delta-lake' if (null != System.getProperty("allVersions")) { System.setProperty("flinkVersions", 
System.getProperty("knownFlinkVersions")) diff --git a/versions.props b/versions.props index 4e4d30dbc284..24d7045f83f9 100644 --- a/versions.props +++ b/versions.props @@ -29,6 +29,7 @@ org.scala-lang.modules:scala-collection-compat_2.13 = 2.6.0 com.emc.ecs:object-client-bundle = 3.3.2 org.immutables:value = 2.9.2 net.snowflake:snowflake-jdbc = 3.13.22 +io.delta:delta-standalone_* = 0.6.0 # test deps org.junit.vintage:junit-vintage-engine = 5.8.2 @@ -46,3 +47,4 @@ org.mock-server:mockserver-client-java = 5.13.2 com.esotericsoftware:kryo = 4.0.2 org.eclipse.jetty:* = 9.4.43.v20210629 org.testcontainers:* = 1.17.5 +io.delta:delta-core_* = 2.2.0