Delta: Support Snapshot Delta Lake Table to Iceberg Table (#6449)
Co-authored-by: ericlgoodman <erigood@amazon.com>
JonasJ-ap and ericlgoodman authored Feb 7, 2023
1 parent 10df74a commit eeb055a
Showing 15 changed files with 1,936 additions and 0 deletions.
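At its core, the commit adds a fluent SnapshotDeltaLakeTable action in the new iceberg-delta-lake module. Below is a minimal sketch of a call site, assuming the action exposes Iceberg's usual Action.execute() entry point; the Delta table location and target identifier are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.delta.DeltaLakeToIcebergMigrationActionsProvider;
import org.apache.iceberg.delta.SnapshotDeltaLakeTable;

public class DeltaSnapshotSketch {
  // Snapshot an existing Delta Lake table into a new Iceberg table.
  // The location and identifier are hypothetical; execute() is assumed to be
  // the standard Iceberg Action entry point returning the action's Result.
  public static SnapshotDeltaLakeTable.Result snapshot(
      Catalog icebergCatalog, Configuration hadoopConf) {
    return DeltaLakeToIcebergMigrationActionsProvider.defaultActions()
        .snapshotDeltaLakeTable("s3://bucket/path/to/delta-table")
        .as(TableIdentifier.parse("db.migrated_table"))
        .deltaLakeConfiguration(hadoopConf)
        .icebergCatalog(icebergCatalog)
        .execute();
  }
}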
115 changes: 115 additions & 0 deletions .github/workflows/delta-conversion-ci.yml
@@ -0,0 +1,115 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: "Delta Conversion CI"
on:
  push:
    branches:
    - 'master'
    - '0.**'
    tags:
    - 'apache-iceberg-**'
  pull_request:
    paths-ignore:
    - '.github/ISSUE_TEMPLATE/iceberg_bug_report.yml'
    - '.github/workflows/python-ci.yml'
    - '.github/workflows/flink-ci.yml'
    - '.github/workflows/hive-ci.yml'
    - '.gitignore'
    - '.asf.yml'
    - 'dev/**'
    - 'mr/**'
    - 'hive3/**'
    - 'hive3-orc-bundle/**'
    - 'hive-runtime/**'
    - 'flink/**'
    - 'pig/**'
    - 'python/**'
    - 'python_legacy/**'
    - 'docs/**'
    - 'open-api/**'
    - 'format/**'
    - '.gitattributes'
    - 'README.md'
    - 'CONTRIBUTING.md'
    - 'LICENSE'
    - 'NOTICE'

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: ${{ github.event_name == 'pull_request' }}

jobs:
  delta-conversion-scala-2-12-tests:
    runs-on: ubuntu-20.04
    strategy:
      matrix:
        jvm: [8, 11]
    env:
      SPARK_LOCAL_IP: localhost
    steps:
    - uses: actions/checkout@v3
    - uses: actions/setup-java@v3
      with:
        distribution: zulu
        java-version: ${{ matrix.jvm }}
    - uses: actions/cache@v3
      with:
        path: |
          ~/.gradle/caches
          ~/.gradle/wrapper
        key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
        restore-keys: ${{ runner.os }}-gradle-
    - run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
    - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
    - uses: actions/upload-artifact@v3
      if: failure()
      with:
        name: test logs
        path: |
          **/build/testlogs
  delta-conversion-scala-2-13-tests:
    runs-on: ubuntu-20.04
    strategy:
      matrix:
        jvm: [8, 11]
    env:
      SPARK_LOCAL_IP: localhost
    steps:
    - uses: actions/checkout@v3
    - uses: actions/setup-java@v3
      with:
        distribution: zulu
        java-version: ${{ matrix.jvm }}
    - uses: actions/cache@v3
      with:
        path: |
          ~/.gradle/caches
          ~/.gradle/wrapper
        key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
        restore-keys: ${{ runner.os }}-gradle-
    - run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
    - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
    - uses: actions/upload-artifact@v3
      if: failure()
      with:
        name: test logs
        path: |
          **/build/testlogs
74 changes: 74 additions & 0 deletions build.gradle
@@ -48,6 +48,10 @@ plugins {
  id 'nebula.dependency-recommender' version '11.0.0'
}

String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
String sparkVersionsString = System.getProperty("sparkVersions") != null ? System.getProperty("sparkVersions") : System.getProperty("defaultSparkVersions")
List<String> sparkVersions = sparkVersionsString != null && !sparkVersionsString.isEmpty() ? sparkVersionsString.split(",") : []

try {
  // apply these plugins in a try-catch block so that we can handle cases without a .git directory
  apply plugin: 'com.palantir.git-version'
@@ -447,6 +451,76 @@ project(':iceberg-aws') {
  }
}

project(':iceberg-delta-lake') {
  // Use an integration test suite, since we can take advantage of Spark 3.3 to read the data
  // files of Delta Lake tables and to write tests involving SQL queries.
  configurations {
    integrationImplementation.extendsFrom testImplementation
    integrationRuntime.extendsFrom testRuntimeOnly
  }

  dependencies {
    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
    api project(':iceberg-api')
    implementation project(':iceberg-common')
    implementation project(':iceberg-core')
    implementation project(':iceberg-parquet')
    implementation "com.fasterxml.jackson.core:jackson-databind"

    compileOnly "io.delta:delta-standalone_${scalaVersion}"

    compileOnly("org.apache.hadoop:hadoop-common") {
      exclude group: 'org.apache.avro', module: 'avro'
      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
      exclude group: 'javax.servlet', module: 'servlet-api'
      exclude group: 'com.google.code.gson', module: 'gson'
    }

    // The newest version of delta-core uses Spark 3.3.*. Since it is only used for tests, we do
    // not need to include older versions of delta-core.
    if (sparkVersions.contains("3.3")) {
      integrationImplementation "io.delta:delta-core_${scalaVersion}"
      integrationImplementation project(path: ":iceberg-spark:iceberg-spark-3.3_${scalaVersion}")
      integrationImplementation("org.apache.hadoop:hadoop-minicluster") {
        exclude group: 'org.apache.avro', module: 'avro'
        // to make sure netty libs only come from project(':iceberg-arrow')
        exclude group: 'io.netty', module: 'netty-buffer'
        exclude group: 'io.netty', module: 'netty-common'
      }
      integrationImplementation project(path: ':iceberg-hive-metastore')
      integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
      integrationImplementation("org.apache.spark:spark-hive_${scalaVersion}:3.3.1") {
        exclude group: 'org.apache.avro', module: 'avro'
        exclude group: 'org.apache.arrow'
        exclude group: 'org.apache.parquet'
        // to make sure netty libs only come from project(':iceberg-arrow')
        exclude group: 'io.netty', module: 'netty-buffer'
        exclude group: 'io.netty', module: 'netty-common'
        exclude group: 'org.roaringbitmap'
      }
    }
  }

  // The newest version of delta-core uses Spark 3.3.*, so the integration tests are only built
  // when iceberg-spark-3.3 is available.
  if (sparkVersions.contains("3.3")) {
    sourceSets {
      integration {
        java.srcDir "$projectDir/src/integration/java"
        resources.srcDir "$projectDir/src/integration/resources"
        compileClasspath += main.output + test.output
        runtimeClasspath += main.output + test.output
      }
    }

    task integrationTest(type: Test) {
      testClassesDirs = sourceSets.integration.output.classesDirs
      classpath = sourceSets.integration.runtimeClasspath
    }
    check.dependsOn integrationTest
  }
}

project(':iceberg-gcp') {
  dependencies {
    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
63 changes: 63 additions & 0 deletions delta-lake/src/integration/java/org/apache/iceberg/delta/DeltaLakeToIcebergMigrationSparkIntegration.java
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.delta;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;

/** An example class that shows how to use the Delta Lake migration actions in a Spark context. */
class DeltaLakeToIcebergMigrationSparkIntegration {

  private DeltaLakeToIcebergMigrationSparkIntegration() {}

  /**
   * Example of how to use a {@link SparkSession}, a table identifier, and a delta table location
   * to construct an action for snapshotting the delta table to an iceberg table.
   *
   * @param spark a SparkSession with iceberg catalog configured.
   * @param newTableIdentifier a two-part or three-part identifier; if it has two parts, the
   *     default spark catalog will be used
   * @param deltaTableLocation the location of the delta table
   * @return an instance of snapshot delta lake table action.
   */
  static SnapshotDeltaLakeTable snapshotDeltaLakeTable(
      SparkSession spark, String newTableIdentifier, String deltaTableLocation) {
    Preconditions.checkArgument(
        spark != null, "The SparkSession cannot be null, please provide a valid SparkSession");
    Preconditions.checkArgument(
        newTableIdentifier != null,
        "The table identifier cannot be null, please provide a valid table identifier for the new iceberg table");
    Preconditions.checkArgument(
        deltaTableLocation != null,
        "The delta lake table location cannot be null, please provide a valid location of the delta lake table to be snapshotted");

    String ctx = "delta lake snapshot target";
    CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog();
    Spark3Util.CatalogAndIdentifier catalogAndIdent =
        Spark3Util.catalogAndIdentifier(ctx, spark, newTableIdentifier, defaultCatalog);
    return DeltaLakeToIcebergMigrationActionsProvider.defaultActions()
        .snapshotDeltaLakeTable(deltaTableLocation)
        .as(TableIdentifier.parse(catalogAndIdent.identifier().toString()))
        .deltaLakeConfiguration(spark.sessionState().newHadoopConf())
        .icebergCatalog(Spark3Util.loadIcebergCatalog(spark, catalogAndIdent.catalog().name()));
  }
}
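For context, here is a hedged sketch of how this helper might be driven end to end. The driver class below is hypothetical, lives in the same package because the helper is package-private, and the catalog settings, identifier, and paths are illustrative only:

package org.apache.iceberg.delta;

import org.apache.spark.sql.SparkSession;

public class SnapshotExampleDriver {
  public static void main(String[] args) {
    // Illustrative local session with a hypothetical Hadoop-backed Iceberg catalog.
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.my_catalog.type", "hadoop")
        .config("spark.sql.catalog.my_catalog.warehouse", "file:/tmp/iceberg-warehouse")
        .getOrCreate();

    // "my_catalog.db.migrated" and the Delta path are hypothetical values;
    // execute() is assumed to return the action's Result, which is assumed
    // to report the number of imported data files.
    SnapshotDeltaLakeTable.Result result =
        DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
                spark, "my_catalog.db.migrated", "file:/tmp/delta/events")
            .execute();
    System.out.println("Imported data files: " + result.snapshotDataFilesCount());
  }
}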
73 changes: 73 additions & 0 deletions delta-lake/src/integration/java/org/apache/iceberg/delta/SparkDeltaLakeSnapshotTestBase.java
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.delta;

import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;

@SuppressWarnings("VisibilityModifier")
public abstract class SparkDeltaLakeSnapshotTestBase {
  protected static TestHiveMetastore metastore = null;
  protected static HiveConf hiveConf = null;
  protected static SparkSession spark = null;

  @BeforeClass
  public static void startMetastoreAndSpark() {
    SparkDeltaLakeSnapshotTestBase.metastore = new TestHiveMetastore();
    metastore.start();
    SparkDeltaLakeSnapshotTestBase.hiveConf = metastore.hiveConf();

    SparkDeltaLakeSnapshotTestBase.spark =
        SparkSession.builder()
            .master("local[2]")
            .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
            .config(
                "spark.hadoop." + HiveConf.ConfVars.METASTOREURIS.varname,
                hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))
            .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .enableHiveSupport()
            .getOrCreate();
  }

  @AfterClass
  public static void stopMetastoreAndSpark() throws Exception {
    if (metastore != null) {
      metastore.stop();
      SparkDeltaLakeSnapshotTestBase.metastore = null;
    }

    if (spark != null) {
      spark.stop();
      SparkDeltaLakeSnapshotTestBase.spark = null;
    }
  }

  public SparkDeltaLakeSnapshotTestBase(
      String catalogName, String implementation, Map<String, String> config) {
    spark.conf().set("spark.sql.catalog." + catalogName, implementation);
    config.forEach(
        (key, value) -> spark.conf().set("spark.sql.catalog." + catalogName + "." + key, value));
  }
}
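For illustration only, a hypothetical concrete test showing how a subclass would feed the constructor; the catalog name and properties below are invented for the sketch:

package org.apache.iceberg.delta;

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// Hypothetical test subclass: wires a Hive-backed Iceberg catalog named
// "testhive" into the shared SparkSession created by the base class.
public class ExampleDeltaSnapshotTest extends SparkDeltaLakeSnapshotTestBase {
  public ExampleDeltaSnapshotTest() {
    super(
        "testhive",
        "org.apache.iceberg.spark.SparkCatalog",
        ImmutableMap.of(
            "type", "hive",
            "default-namespace", "default"));
  }
}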