
feat: Added support to write iceberg tables #5989

Open. malhotrashivam wants to merge 32 commits into main from sm-ice-write.

Changes from 12 commits.

Commits (32):
8c81883
Initial commit
malhotrashivam Aug 27, 2024
758c1f3
Added type info map and modified instructions class hierarchy
malhotrashivam Aug 28, 2024
48cb8d8
Minor tweaks to the instructions class hierarchy
malhotrashivam Aug 28, 2024
244bc99
Merged writeTable and appendTable into addPartition
malhotrashivam Aug 29, 2024
09340c2
Split IcebergParquetWriteInstructions into WriteInstr and ParquetWrit…
malhotrashivam Aug 30, 2024
33b60e2
Merge branch 'main' into sm-ice-write
malhotrashivam Sep 25, 2024
c70b50e
Resolving more conflicts
malhotrashivam Sep 25, 2024
cd278ab
Merge branch 'main' into sm-ice-write
malhotrashivam Sep 30, 2024
689e8a1
Added unit tests and moved Iceberg tests to Junit5
malhotrashivam Sep 30, 2024
d7f2c81
Preparing change for code review Part 1
malhotrashivam Oct 1, 2024
131a552
Preparing for review Part 2
malhotrashivam Oct 1, 2024
3021585
Added more unit tests
malhotrashivam Oct 1, 2024
9f82ba0
Review with Larry part 1
malhotrashivam Oct 3, 2024
cbae64e
Fix for failing job
malhotrashivam Oct 3, 2024
7de59b0
Review with Larry Part 2
malhotrashivam Oct 3, 2024
83a0b14
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 3, 2024
c83ddbd
Review with Devin Part 1
malhotrashivam Oct 7, 2024
f0f86cc
Fix for failing jobs
malhotrashivam Oct 7, 2024
adb21e9
Review with Devin Part 2
malhotrashivam Oct 8, 2024
744ce60
Review with Devin Part 3
malhotrashivam Oct 8, 2024
a8252ce
Review with Devin Part 4
malhotrashivam Oct 8, 2024
38f55f0
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 14, 2024
5a64faf
Minor tweaks
malhotrashivam Oct 14, 2024
ba70f1a
More tweaks
malhotrashivam Oct 14, 2024
96db353
Updated some comments
malhotrashivam Oct 14, 2024
6e2c233
Updated javadoc and added new tests
malhotrashivam Oct 15, 2024
de6eba0
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 17, 2024
0ebeba2
Review with Ryan Part 1
malhotrashivam Oct 17, 2024
31f46ba
Review with Ryan Part 2
malhotrashivam Oct 18, 2024
946def0
Fix for failing parquet reads
malhotrashivam Oct 18, 2024
bd8535c
Added more tests for writeDataFile
malhotrashivam Oct 18, 2024
78bd605
Added tests for on write callback
malhotrashivam Oct 18, 2024
extensions/iceberg/s3/build.gradle (21 changes: 15 additions & 6 deletions)

@@ -29,7 +29,6 @@ dependencies {
     compileOnly libs.autoservice
     annotationProcessor libs.autoservice.compiler

-    testImplementation libs.junit4
     testImplementation project(':engine-test-utils')

     testImplementation libs.testcontainers
@@ -45,10 +44,20 @@ dependencies {
     testRuntimeOnly libs.slf4j.simple
 }

-TestTools.addEngineOutOfBandTest(project)
+test {
+    useJUnitPlatform {
+        excludeTags("testcontainers")
+    }
+}
+
+tasks.register('testOutOfBand', Test) {
+    useJUnitPlatform {
+        includeTags("testcontainers")
+    }

-testOutOfBand.dependsOn Docker.registryTask(project, 'localstack')
-testOutOfBand.systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')
+    dependsOn Docker.registryTask(project, 'localstack')
+    systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')

-testOutOfBand.dependsOn Docker.registryTask(project, 'minio')
-testOutOfBand.systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
+    dependsOn Docker.registryTask(project, 'minio')
+    systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
+}
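With this change, the Docker-backed tests are gated behind the JUnit 5 "testcontainers" tag: the default test task excludes them, while the new testOutOfBand task runs only them and first builds the local localstack and minio images. Assuming the Gradle project path implied by the file location (an assumption, not stated in the diff), they would be run with something like: ./gradlew :extensions:iceberg:s3:testOutOfBand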
IcebergToolsS3.java

@@ -7,7 +7,9 @@
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.aws.AwsClientProperties;
 import org.apache.iceberg.aws.glue.GlueCatalog;
+import org.apache.iceberg.aws.s3.S3FileIO;
 import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.rest.RESTCatalog;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -16,12 +18,10 @@
 import java.util.Map;

 /**
- * Tools for accessing tables in the Iceberg table format.
+ * Tools for accessing tables in the Iceberg table format from S3.
  */
 @SuppressWarnings("unused")
 public class IcebergToolsS3 extends IcebergTools {
-    private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";
-
     /**
      * Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
      * value, the system defaults will be used.
@@ -48,11 +48,7 @@ public static IcebergCatalogAdapter createS3Rest(
         // Set up the properties map for the Iceberg catalog
         final Map<String, String> properties = new HashMap<>();

-        final RESTCatalog catalog = new RESTCatalog();
-
-        properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
-        properties.put(CatalogProperties.URI, catalogURI);
-        properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+        final Catalog catalog = new RESTCatalog();

         // Configure the properties map from the Iceberg instructions.
         if (!Strings.isNullOrEmpty(accessKeyId) && !Strings.isNullOrEmpty(secretAccessKey)) {
@@ -65,11 +61,7 @@ public static IcebergCatalogAdapter createS3Rest(
         if (!Strings.isNullOrEmpty(endpointOverride)) {
             properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
         }
-
-        final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
-        catalog.initialize(catalogName, properties);
-
-        return new IcebergCatalogAdapter(catalog, properties);
+        return createAdapterCommon(name, catalogURI, warehouseLocation, catalog, properties);
     }

     /**
@@ -91,12 +83,24 @@ public static IcebergCatalogAdapter createGlue(
         // Set up the properties map for the Iceberg catalog
         final Map<String, String> properties = new HashMap<>();

-        final GlueCatalog catalog = new GlueCatalog();
+        final Catalog catalog = new GlueCatalog();
+        return createAdapterCommon(name, catalogURI, warehouseLocation, catalog, properties);
+    }
+
+    private static IcebergCatalogAdapter createAdapterCommon(
+            @Nullable final String name,
+            @NotNull final String catalogURI,
+            @NotNull final String warehouseLocation,
+            @NotNull final Catalog catalog,
+            @NotNull final Map<String, String> properties) {
+        properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
+        properties.put(CatalogProperties.URI, catalogURI);
+        properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+
+        // Following is needed to write new manifest files when writing new data.
+        // Not setting this will result in using ResolvingFileIO.
+        properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
Review thread on the FILE_IO_IMPL line:

Contributor: Why is it a problem to use ResolvingFileIO? You will need to provide HadoopConf info.

malhotrashivam (Contributor, Author), Oct 2, 2024: From what I understood, ResolvingFileIO would add an additional step of resolving which FileIO implementation to use. Based on the file name, we are sure here that the data is in S3; that's why I went with S3FileIO directly. Does that sound reasonable?

Contributor: I understand your point; we clearly know that this should resolve to S3FileIO. In all other scenarios, though, we've trusted the Iceberg API to resolve correctly, and I'd be happier to stick with that. I don't feel strongly about this, however.

Member: In this context, I'm happy to have S3FileIO specified. In general, though, I think we are leaning away from providing these "pre-configured" entrypoints for the user, and prefer that they go through the generic catalog creation, in which case I would argue we might want to deprecate IcebergToolsS3.

(See the sketch after this diff for what each option looks like.)

+
+        final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
+        catalog.initialize(catalogName, properties);
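To make the trade-off in the thread above concrete, here is a minimal standalone sketch (not code from this PR) that builds the same property map two ways using plain Iceberg APIs. The catalog URI, warehouse location, and the class name FileIoChoiceSketch are placeholders for illustration.

import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.rest.RESTCatalog;

// Hypothetical illustration class; not part of the PR.
public class FileIoChoiceSketch {
    public static void main(String[] args) {
        final Map<String, String> properties = new HashMap<>();
        properties.put(CatalogProperties.URI, "http://localhost:8181");         // placeholder URI
        properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://bucket/wh"); // placeholder warehouse

        // Option taken by this PR: pin the FileIO implementation so manifest
        // and data files are written through S3FileIO directly.
        properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());

        // Alternative raised in review: omit FILE_IO_IMPL and let Iceberg fall
        // back to ResolvingFileIO, which picks a FileIO per file path but may
        // then need Hadoop configuration to be available.

        final Catalog catalog = new RESTCatalog();
        catalog.initialize("IcebergCatalog-example", properties);
    }
}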
IcebergLocalStackTest.java

@@ -5,14 +5,16 @@

 import io.deephaven.extensions.s3.S3Instructions.Builder;
 import io.deephaven.extensions.s3.testlib.SingletonContainers;
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
 import software.amazon.awssdk.services.s3.S3AsyncClient;

 import java.util.Map;

+@Tag("testcontainers")
 public class IcebergLocalStackTest extends IcebergToolsTest {

-    @BeforeClass
+    @BeforeAll
     public static void initContainer() {
         // ensure container is started so container startup time isn't associated with a specific test
         SingletonContainers.LocalStack.init();
IcebergMinIOTest.java

@@ -7,15 +7,17 @@

 import io.deephaven.extensions.s3.testlib.SingletonContainers;
 import io.deephaven.stats.util.OSUtil;
 import org.junit.jupiter.api.Assumptions;
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
 import software.amazon.awssdk.services.s3.S3AsyncClient;

 import java.util.Map;

+@Tag("testcontainers")
 public class IcebergMinIOTest extends IcebergToolsTest {

-    @BeforeClass
-    public static void initContainer() {
+    @BeforeAll
+    static void initContainer() {
         // TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X
         Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()");
         // ensure container is started so container startup time isn't associated with a specific test
IcebergParquetWriteInstructionsTest.java (new file)

@@ -0,0 +1,135 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.util;

import io.deephaven.parquet.table.ParquetInstructions;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

class IcebergParquetWriteInstructionsTest {

@Test
void defaults() {
final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder().build();
assertThat(instructions.tableDefinition().isEmpty()).isTrue();
assertThat(instructions.dataInstructions().isEmpty()).isTrue();
assertThat(instructions.columnRenames().isEmpty()).isTrue();
assertThat(instructions.createTableIfNotExist()).isFalse();
assertThat(instructions.verifySchema()).isFalse();
assertThat(instructions.compressionCodecName()).isEqualTo("SNAPPY");
assertThat(instructions.maximumDictionaryKeys()).isEqualTo(1048576);
assertThat(instructions.maximumDictionarySize()).isEqualTo(1048576);
}

@Test
void testSetCreateTableIfNotExist() {
assertThat(IcebergParquetWriteInstructions.builder()
.createTableIfNotExist(true)
.build()
.createTableIfNotExist())
.isTrue();
}

@Test
void testSetVerifySchema() {
assertThat(IcebergParquetWriteInstructions.builder()
.verifySchema(true)
.build()
.verifySchema())
.isTrue();
}

@Test
public void testSetCompressionCodecName() {
assertThat(IcebergParquetWriteInstructions.builder()
.compressionCodecName("GZIP")
.build()
.compressionCodecName())
.isEqualTo("GZIP");
}

@Test
void testSetMaximumDictionaryKeys() {
assertThat(IcebergParquetWriteInstructions.builder()
.maximumDictionaryKeys(100)
.build()
.maximumDictionaryKeys())
.isEqualTo(100);
}

@Test
public void testSetMaximumDictionarySize() {
assertThat(IcebergParquetWriteInstructions.builder()
.maximumDictionarySize(100)
.build()
.maximumDictionarySize())
.isEqualTo(100);
}

@Test
public void testSetTargetPageSize() {
assertThat(IcebergParquetWriteInstructions.builder()
.targetPageSize(1024 * 1024)
.build()
.targetPageSize())
.isEqualTo(1024 * 1024);
}

@Test
void testMinMaximumDictionaryKeys() {
try {
IcebergParquetWriteInstructions.builder()
.maximumDictionaryKeys(-1)
.build();
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("maximumDictionaryKeys");
}
}

@Test
public void testMinMaximumDictionarySize() {
try {
IcebergParquetWriteInstructions.builder()
.maximumDictionarySize(-1)
.build();
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("maximumDictionarySize");
}
}

@Test
void testMinTargetPageSize() {
try {
IcebergParquetWriteInstructions.builder()
.targetPageSize(1024)
.build();
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("targetPageSize");
}
}

@Test
void toParquetInstructionTest() {
final IcebergParquetWriteInstructions icebergInstructions = IcebergParquetWriteInstructions.builder()
.compressionCodecName("GZIP")
.maximumDictionaryKeys(100)
.maximumDictionarySize(200)
.targetPageSize(1024 * 1024)
.build();
final Map<Integer, String> fieldIdToName = Map.of(2, "field2", 3, "field3");
final ParquetInstructions parquetInstructions = icebergInstructions.toParquetInstructions(
null, fieldIdToName);
assertThat(parquetInstructions.getCompressionCodecName()).isEqualTo("GZIP");
assertThat(parquetInstructions.getMaximumDictionaryKeys()).isEqualTo(100);
assertThat(parquetInstructions.getMaximumDictionarySize()).isEqualTo(200);
assertThat(parquetInstructions.getTargetPageSize()).isEqualTo(1024 * 1024);
assertThat(parquetInstructions.getFieldId("field1")).isEmpty();
assertThat(parquetInstructions.getFieldId("field2")).hasValue(2);
assertThat(parquetInstructions.getFieldId("field3")).hasValue(3);
assertThat(parquetInstructions.onWriteCompleted()).isEmpty();
}
}