Delta: Support Snapshot Delta Lake Table to Iceberg Table #6449

Merged
57 commits merged on Feb 7, 2023
Changes from 53 commits
Commits
73e38e5
Cannot pass tests for unknown reason
JonasJ-ap Dec 18, 2022
5544f45
fix test config issue and formatting
JonasJ-ap Dec 18, 2022
b8b6119
transfer icebergCatalog to parent class
JonasJ-ap Dec 18, 2022
274560c
fix formatting
JonasJ-ap Dec 18, 2022
39e3541
implement direct schema transformation and stop using spark context t…
JonasJ-ap Dec 19, 2022
92f962c
refactor and simplify the implementation
JonasJ-ap Dec 19, 2022
033c997
no need to make spark utils public
JonasJ-ap Dec 19, 2022
681a32f
simplify the implementation in Spark Action
JonasJ-ap Dec 19, 2022
77bdb27
add support for scala 2.13
JonasJ-ap Dec 19, 2022
3dd540a
add format check for data files
JonasJ-ap Dec 19, 2022
27ece93
fix naming issue
JonasJ-ap Dec 19, 2022
a9faabf
make delta type visitor abstract
JonasJ-ap Dec 21, 2022
3982711
fix typo and nit problems
JonasJ-ap Dec 21, 2022
9a7c443
migrate from iceberg-core to delta-lake
JonasJ-ap Dec 21, 2022
173534e
move get Metrics for File to iceberg-delta-lake
JonasJ-ap Dec 21, 2022
bdd1ccf
fix comment
JonasJ-ap Dec 21, 2022
85abac2
fix wrong import
JonasJ-ap Dec 21, 2022
32e1af8
Migrate delta to iceberg round 1 (#29)
JonasJ-ap Dec 24, 2022
ac1141d
Migrate delta to iceberg util refactor (#30)
JonasJ-ap Dec 25, 2022
8e9b3fc
Migrate delta to iceberg refactor 1.5 (#31)
JonasJ-ap Dec 28, 2022
12b60ca
Merge branch 'master' into migrate_delta_to_iceberg
JonasJ-ap Dec 28, 2022
8a8adef
use transaction, refactor structure, add optional newTableLocation, a…
JonasJ-ap Dec 29, 2022
6fbf740
fix the potential path error due to ambiguous return value of AddFile…
JonasJ-ap Dec 29, 2022
69671b9
refactor getFullPath with unit tests, use newCreateTableTransaction, …
JonasJ-ap Dec 30, 2022
e3138a6
allow user to specify a custom location for migrated table, fix load …
JonasJ-ap Jan 3, 2023
2e8dfd0
Fix nit problems and optimize some implementation (#38)
JonasJ-ap Jan 6, 2023
f4589e8
optimize the constructor to be more clean
JonasJ-ap Jan 6, 2023
59c96cb
move everything to iceberg-delta-lake, build demo integration test (#35)
JonasJ-ap Jan 7, 2023
afd783b
optimize api structure, refactor the integration test, add more tests…
JonasJ-ap Jan 9, 2023
5b95925
refactor the interfaces, add new tests to integration tests, add new …
JonasJ-ap Jan 10, 2023
f43c325
fix error messages and add default implementation for actionProvider …
JonasJ-ap Jan 10, 2023
b2a8bfe
refactor the default implementation and javadoc (#43)
JonasJ-ap Jan 10, 2023
450a08c
fix error when migrating table with nested fields, add CI, upgrade te…
JonasJ-ap Jan 12, 2023
300d39b
remove unused helper class in test
JonasJ-ap Jan 12, 2023
a285c4a
add null check for stopMetastoreAndSpark, remove unnecessary try-catch
JonasJ-ap Jan 12, 2023
5760a83
use assertions.assertThatThrownBy to test precondition checks
JonasJ-ap Jan 12, 2023
e41c787
use assertThat to fix assert.True in TestDeltaLakeTypeToType
JonasJ-ap Jan 12, 2023
7a16809
use AssertionJ in TestSnapshotDeltaLakeTable
JonasJ-ap Jan 13, 2023
7072612
fix format and nit issue
JonasJ-ap Jan 13, 2023
c2293c9
remove unnecessary fields and class and let integrationTest collected…
JonasJ-ap Jan 13, 2023
f38d7b1
remove unnecessary try catch
JonasJ-ap Jan 13, 2023
10163f8
fix wrong modifier of a private method
JonasJ-ap Jan 13, 2023
99dbba8
Merge remote-tracking branch 'origin/master' into migrate_delta_to_ic…
JonasJ-ap Jan 15, 2023
a7c3de1
simplify the test base (#46)
JonasJ-ap Jan 17, 2023
6c4ab2c
save getLength Call for AddFile and when RemoveFile contains size (#47)
JonasJ-ap Jan 20, 2023
dadd76a
add null check for table.currentSnapshot() when querying the total nu…
JonasJ-ap Jan 21, 2023
1cd36b9
Refactor iceberg-delta's integration test(#48)
JonasJ-ap Jan 24, 2023
4463f30
Adapt for delta.logRetentionDuration (#49)
JonasJ-ap Jan 25, 2023
d3ccc86
fix comment and format issue
JonasJ-ap Jan 26, 2023
1affcb3
remove support for avro, orc since it can allow use to get rid of a d…
JonasJ-ap Feb 6, 2023
098a3a2
using resolvingFileIO instead
JonasJ-ap Feb 6, 2023
f0d1536
Merge remote-tracking branch 'origin/master' into migrate_delta_to_ic…
JonasJ-ap Feb 6, 2023
a98461a
rollback to hadoopFileIO
JonasJ-ap Feb 7, 2023
fe6da17
add test for array of structs
JonasJ-ap Feb 7, 2023
8e9a3e2
use Do not Support instead of cannot determine, remove support for av…
JonasJ-ap Feb 7, 2023
24405e0
nit fix
JonasJ-ap Feb 7, 2023
c5a6186
error message nit fix
JonasJ-ap Feb 7, 2023
115 changes: 115 additions & 0 deletions .github/workflows/delta-conversion-ci.yml
@@ -0,0 +1,115 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: "Delta Conversion CI"
on:
push:
branches:
- 'master'
- '0.**'
tags:
- 'apache-iceberg-**'
pull_request:
paths-ignore:
- '.github/ISSUE_TEMPLATE/iceberg_bug_report.yml'
- '.github/workflows/python-ci.yml'
- '.github/workflows/flink-ci.yml'
- '.github/workflows/hive-ci.yml'
- '.gitignore'
- '.asf.yml'
- 'dev/**'
- 'mr/**'
- 'hive3/**'
- 'hive3-orc-bundle/**'
- 'hive-runtime/**'
- 'flink/**'
- 'pig/**'
- 'python/**'
- 'python_legacy/**'
- 'docs/**'
- 'open-api/**'
- 'format/**'
- '.gitattributes'
- 'README.md'
- 'CONTRIBUTING.md'
- 'LICENSE'
- 'NOTICE'

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: ${{ github.event_name == 'pull_request' }}

jobs:
delta-conversion-scala-2-12-tests:
runs-on: ubuntu-20.04
strategy:
matrix:
jvm: [8, 11]
env:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v3
- uses: actions/setup-java@v3
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
- uses: actions/cache@v3
with:
path: |
~/.gradle/caches
~/.gradle/wrapper
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
- uses: actions/upload-artifact@v3
if: failure()
with:
name: test logs
path: |
**/build/testlogs

delta-conversion-scala-2-13-tests:
runs-on: ubuntu-20.04
strategy:
matrix:
jvm: [ 8, 11 ]
env:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v3
- uses: actions/setup-java@v3
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
- uses: actions/cache@v3
with:
path: |
~/.gradle/caches
~/.gradle/wrapper
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
- uses: actions/upload-artifact@v3
if: failure()
with:
name: test logs
path: |
**/build/testlogs
74 changes: 74 additions & 0 deletions build.gradle
@@ -48,6 +48,10 @@ plugins {
id 'nebula.dependency-recommender' version '11.0.0'
}

String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
String sparkVersionsString = System.getProperty("sparkVersions") != null ? System.getProperty("sparkVersions") : System.getProperty("defaultSparkVersions")
List<String> sparkVersions = sparkVersionsString != null && !sparkVersionsString.isEmpty() ? sparkVersionsString.split(",") : []

try {
// apply these plugins in a try-catch block so that we can handle cases without .git directory
apply plugin: 'com.palantir.git-version'
@@ -447,6 +451,76 @@ project(':iceberg-aws') {
}
}

project(':iceberg-delta-lake') {
// use integration tests since we can take advantage of Spark 3.3 to read data files of Delta Lake tables
// and create tests involving SQL queries.
configurations {
integrationImplementation.extendsFrom testImplementation
integrationRuntime.extendsFrom testRuntimeOnly
}

dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
implementation project(':iceberg-common')
implementation project(':iceberg-core')
implementation project(':iceberg-parquet')
implementation "com.fasterxml.jackson.core:jackson-databind"

compileOnly "io.delta:delta-standalone_${scalaVersion}"

compileOnly("org.apache.hadoop:hadoop-common") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'javax.servlet', module: 'servlet-api'
exclude group: 'com.google.code.gson', module: 'gson'
}

// The newest version of delta-core uses Spark 3.3.*. Since it is only used for tests, we do
// not need to include older versions of delta-core.
if (sparkVersions.contains("3.3")) {
integrationImplementation "io.delta:delta-core_${scalaVersion}"
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-3.3_${scalaVersion}")
integrationImplementation("org.apache.hadoop:hadoop-minicluster") {
exclude group: 'org.apache.avro', module: 'avro'
// to make sure netty libs only come from project(':iceberg-arrow')
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'io.netty', module: 'netty-common'
}
integrationImplementation project(path: ':iceberg-hive-metastore')
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation("org.apache.spark:spark-hive_${scalaVersion}:3.3.1") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
exclude group: 'org.apache.parquet'
// to make sure netty libs only come from project(':iceberg-arrow')
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'io.netty', module: 'netty-common'
exclude group: 'org.roaringbitmap'
}
}
}

// The newest version of delta-core uses Spark 3.3.*. The integration test should only be built
// if iceberg-spark-3.3 is available
if (sparkVersions.contains("3.3")) {
sourceSets {
integration {
java.srcDir "$projectDir/src/integration/java"
resources.srcDir "$projectDir/src/integration/resources"
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
}
}

task integrationTest(type: Test) {
testClassesDirs = sourceSets.integration.output.classesDirs
classpath = sourceSets.integration.runtimeClasspath
}
check.dependsOn integrationTest
}
}
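
With this wiring the integration suite runs as part of check; assuming Gradle's standard task mapping, it can also be invoked on its own, e.g. ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.12 :iceberg-delta-lake:integrationTest, mirroring the CI invocations above.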

project(':iceberg-gcp') {
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
63 changes: 63 additions & 0 deletions delta-lake/src/integration/java/org/apache/iceberg/delta/DeltaLakeToIcebergMigrationSparkIntegration.java
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.delta;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;

/** An example class showing how to use the Delta Lake migration actions in a Spark context. */
class DeltaLakeToIcebergMigrationSparkIntegration {

private DeltaLakeToIcebergMigrationSparkIntegration() {}

/**
* Example of how to use a {@link SparkSession}, a table identifier and a delta table location to
* construct an action for snapshotting the delta table to an iceberg table.
*
* @param spark a SparkSession with iceberg catalog configured.
* @param newTableIdentifier either a 2-part or a 3-part identifier; if it is 2-part, the
* default spark catalog will be used
* @param deltaTableLocation the location of the delta table
* @return an instance of snapshot delta lake table action.
*/
static SnapshotDeltaLakeTable snapshotDeltaLakeTable(
SparkSession spark, String newTableIdentifier, String deltaTableLocation) {
Preconditions.checkArgument(
spark != null, "The SparkSession cannot be null, please provide a valid SparkSession");
Preconditions.checkArgument(
newTableIdentifier != null,
"The table identifier cannot be null, please provide a valid table identifier for the new iceberg table");
Preconditions.checkArgument(
deltaTableLocation != null,
"The delta lake table location cannot be null, please provide a valid location of the delta lake table to be snapshot");

String ctx = "delta lake snapshot target";
CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog();
Spark3Util.CatalogAndIdentifier catalogAndIdent =
Spark3Util.catalogAndIdentifier(ctx, spark, newTableIdentifier, defaultCatalog);
return DeltaLakeToIcebergMigrationActionsProvider.defaultActions()
.snapshotDeltaLakeTable(deltaTableLocation)
.as(TableIdentifier.parse(catalogAndIdent.identifier().toString()))
.deltaLakeConfiguration(spark.sessionState().newHadoopConf())
.icebergCatalog(Spark3Util.loadIcebergCatalog(spark, catalogAndIdent.catalog().name()));
}
}
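
For reference, a minimal sketch of how a caller might drive this helper end to end. It assumes an Iceberg-aware SparkSession and that the returned action follows the usual Iceberg action contract of a terminal execute() call; the catalog name, identifier, and paths are illustrative, not part of this PR.

// Hypothetical usage sketch: the catalog name, table identifier, and locations
// are made up for illustration; execute() is assumed per the Iceberg action pattern.
SparkSession spark =
    SparkSession.builder()
        .master("local[2]")
        .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.my_catalog.type", "hadoop")
        .config("spark.sql.catalog.my_catalog.warehouse", "/tmp/iceberg-warehouse")
        .getOrCreate();

SnapshotDeltaLakeTable.Result result =
    DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
            spark, "my_catalog.db.events_iceberg", "/tmp/delta/events")
        .execute();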
73 changes: 73 additions & 0 deletions delta-lake/src/integration/java/org/apache/iceberg/delta/SparkDeltaLakeSnapshotTestBase.java
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.delta;

import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;

@SuppressWarnings("VisibilityModifier")
public abstract class SparkDeltaLakeSnapshotTestBase {
protected static TestHiveMetastore metastore = null;
protected static HiveConf hiveConf = null;
protected static SparkSession spark = null;

@BeforeClass
public static void startMetastoreAndSpark() {
SparkDeltaLakeSnapshotTestBase.metastore = new TestHiveMetastore();
metastore.start();
SparkDeltaLakeSnapshotTestBase.hiveConf = metastore.hiveConf();

SparkDeltaLakeSnapshotTestBase.spark =
SparkSession.builder()
.master("local[2]")
.config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
.config(
"spark.hadoop." + HiveConf.ConfVars.METASTOREURIS.varname,
hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.enableHiveSupport()
.getOrCreate();
}

@AfterClass
public static void stopMetastoreAndSpark() throws Exception {
JonasJ-ap marked this conversation as resolved.
Show resolved Hide resolved
if (metastore != null) {
metastore.stop();
SparkDeltaLakeSnapshotTestBase.metastore = null;
}
if (spark != null) {
spark.stop();
SparkDeltaLakeSnapshotTestBase.spark = null;
}
}

public SparkDeltaLakeSnapshotTestBase(
String catalogName, String implementation, Map<String, String> config) {

spark.conf().set("spark.sql.catalog." + catalogName, implementation);
config.forEach(
(key, value) -> spark.conf().set("spark.sql.catalog." + catalogName + "." + key, value));
}
}
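
To show how this base class is meant to be consumed, a hypothetical concrete test class follows; the catalog name and properties are illustrative, with the Hive catalog type mirroring the metastore the base class starts.

// Hypothetical subclass: registers an Iceberg SparkCatalog backed by the Hive
// metastore started in the base class; names and config values are illustrative.
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Test;

public class TestExampleDeltaLakeSnapshot extends SparkDeltaLakeSnapshotTestBase {

  public TestExampleDeltaLakeSnapshot() {
    super(
        "hive_catalog",
        "org.apache.iceberg.spark.SparkCatalog",
        ImmutableMap.of("type", "hive"));
  }

  @Test
  public void testSnapshotIsRunnable() {
    // A real test would create a Delta table via spark.sql(...) and then
    // snapshot it with the action shown in the integration example above.
  }
}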