feat: Added support to write Iceberg tables #5989

Open. Wants to merge 32 commits into base: main.

Changes from 8 commits (32 commits total).
8c81883 Initial commit (malhotrashivam, Aug 27, 2024)
758c1f3 Added type info map and modified instructions class hierarchy (malhotrashivam, Aug 28, 2024)
48cb8d8 Minor tweaks to the instructions class hierarchy (malhotrashivam, Aug 28, 2024)
244bc99 Merged writeTable and appendTable into addPartition (malhotrashivam, Aug 29, 2024)
09340c2 Split IcebergParquetWriteInstructions into WriteInstr and ParquetWrit… (malhotrashivam, Aug 30, 2024)
33b60e2 Merge branch 'main' into sm-ice-write (malhotrashivam, Sep 25, 2024)
c70b50e Resolving more conflicts (malhotrashivam, Sep 25, 2024)
cd278ab Merge branch 'main' into sm-ice-write (malhotrashivam, Sep 30, 2024)
689e8a1 Added unit tests and moved Iceberg tests to Junit5 (malhotrashivam, Sep 30, 2024)
d7f2c81 Preparing change for code review Part 1 (malhotrashivam, Oct 1, 2024)
131a552 Preparing for review Part 2 (malhotrashivam, Oct 1, 2024)
3021585 Added more unit tests (malhotrashivam, Oct 1, 2024)
9f82ba0 Review with Larry part 1 (malhotrashivam, Oct 3, 2024)
cbae64e Fix for failing job (malhotrashivam, Oct 3, 2024)
7de59b0 Review with Larry Part 2 (malhotrashivam, Oct 3, 2024)
83a0b14 Merge branch 'main' into sm-ice-write (malhotrashivam, Oct 3, 2024)
c83ddbd Review with Devin Part 1 (malhotrashivam, Oct 7, 2024)
f0f86cc Fix for failing jobs (malhotrashivam, Oct 7, 2024)
adb21e9 Review with Devin Part 2 (malhotrashivam, Oct 8, 2024)
744ce60 Review with Devin Part 3 (malhotrashivam, Oct 8, 2024)
a8252ce Review with Devin Part 4 (malhotrashivam, Oct 8, 2024)
38f55f0 Merge branch 'main' into sm-ice-write (malhotrashivam, Oct 14, 2024)
5a64faf Minor tweaks (malhotrashivam, Oct 14, 2024)
ba70f1a More tweaks (malhotrashivam, Oct 14, 2024)
96db353 Updated some comments (malhotrashivam, Oct 14, 2024)
6e2c233 Updated javadoc and added new tests (malhotrashivam, Oct 15, 2024)
de6eba0 Merge branch 'main' into sm-ice-write (malhotrashivam, Oct 17, 2024)
0ebeba2 Review with Ryan Part 1 (malhotrashivam, Oct 17, 2024)
31f46ba Review with Ryan Part 2 (malhotrashivam, Oct 18, 2024)
946def0 Fix for failing parquet reads (malhotrashivam, Oct 18, 2024)
bd8535c Added more tests for writeDataFile (malhotrashivam, Oct 18, 2024)
78bd605 Added tests for on write callback (malhotrashivam, Oct 18, 2024)
@@ -45,12 +45,8 @@ public static IcebergCatalogAdapter createS3Rest(
             @Nullable final String secretAccessKey,
             @Nullable final String endpointOverride) {
 
-        // Set up the properties map for the Iceberg catalog
+        // Set up the properties map for the Iceberg catalog, and configure it from Iceberg instructions.
         final Map<String, String> properties = new HashMap<>();
-
-        final Catalog catalog = new RESTCatalog();
-
-        // Configure the properties map from the Iceberg instructions.
         if (!Strings.isNullOrEmpty(accessKeyId) && !Strings.isNullOrEmpty(secretAccessKey)) {
             properties.put(S3FileIOProperties.ACCESS_KEY_ID, accessKeyId);
             properties.put(S3FileIOProperties.SECRET_ACCESS_KEY, secretAccessKey);
@@ -61,6 +57,8 @@ public static IcebergCatalogAdapter createS3Rest(
         if (!Strings.isNullOrEmpty(endpointOverride)) {
            properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
         }
+
+        final Catalog catalog = new RESTCatalog();
         return createAdapterCommon(name, catalogURI, warehouseLocation, catalog, properties);
     }
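For orientation, here is a minimal sketch of how this factory might be invoked. The argument order is inferred from the parameters visible in this hunk and the createAdapterCommon call, and every literal value below is a made-up example, not something taken from the PR:

    // Hypothetical usage sketch of the S3 REST catalog factory; all values are illustrative.
    final IcebergCatalogAdapter adapter = IcebergToolsS3.createS3Rest(
            "minio-catalog",            // catalog name
            "http://localhost:8181",    // REST catalog URI
            "s3a://warehouse/wh",       // warehouse location
            "us-east-1",                // region
            "admin",                    // access key ID
            "password",                 // secret access key
            "http://localhost:9000");   // S3 endpoint override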
@@ -4,18 +4,23 @@
 package io.deephaven.iceberg.util;
 
 import io.deephaven.extensions.s3.S3Instructions.Builder;
-import io.deephaven.extensions.s3.testlib.SingletonContainers;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
+import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 
 import java.util.Map;
 
+import static org.apache.iceberg.aws.AwsClientProperties.CLIENT_REGION;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;
+
 @Tag("testcontainers")
-public class IcebergLocalStackTest extends IcebergToolsTest {
+class IcebergLocalStackTest extends IcebergToolsTest {
 
     @BeforeAll
-    public static void initContainer() {
+    static void initContainer() {
         // ensure container is started so container startup time isn't associated with a specific test
         LocalStack.init();
     }
@@ -13,8 +13,13 @@
 
 import java.util.Map;
 
+import static org.apache.iceberg.aws.AwsClientProperties.CLIENT_REGION;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
+import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;
+
 @Tag("testcontainers")
-public class IcebergMinIOTest extends IcebergToolsTest {
+class IcebergMinIOTest extends IcebergToolsTest {
 
     @BeforeAll
     static void initContainer() {
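Both container-backed test classes now statically import Iceberg's S3 property keys. The usage site is elided from this diff, but presumably they populate a catalog properties map along these lines (the endpoint and credentials here are illustrative container-style defaults, not values from the PR):

    // Hypothetical sketch only; in the tests the real values come from the
    // LocalStack/MinIO containers rather than hard-coded literals.
    final Map<String, String> properties = Map.of(
            ACCESS_KEY_ID, "test",
            SECRET_ACCESS_KEY, "test",
            CLIENT_REGION, "us-east-1",
            ENDPOINT, "http://localhost:4566");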
@@ -7,7 +7,9 @@
 import org.junit.jupiter.api.Test;
 
 import java.util.Map;
+import java.util.Optional;
 
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 class IcebergParquetWriteInstructionsTest {
@@ -17,9 +19,9 @@ void defaults() {
         final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder().build();
         assertThat(instructions.tableDefinition().isEmpty()).isTrue();
         assertThat(instructions.dataInstructions().isEmpty()).isTrue();
-        assertThat(instructions.columnRenames().isEmpty()).isTrue();
+        assertThat(instructions.dhToIcebergColumnRenames().isEmpty()).isTrue();
         assertThat(instructions.createTableIfNotExist()).isFalse();
-        assertThat(instructions.verifySchema()).isFalse();
+        assertThat(instructions.verifySchema()).isEmpty();
         assertThat(instructions.compressionCodecName()).isEqualTo("SNAPPY");
         assertThat(instructions.maximumDictionaryKeys()).isEqualTo(1048576);
         assertThat(instructions.maximumDictionarySize()).isEqualTo(1048576);
@@ -40,11 +42,11 @@ void testSetVerifySchema() {
                 .verifySchema(true)
                 .build()
                 .verifySchema())
-                .isTrue();
+                .isEqualTo(Optional.of(true));
     }
 
     @Test
-    public void testSetCompressionCodecName() {
+    void testSetCompressionCodecName() {
         assertThat(IcebergParquetWriteInstructions.builder()
                 .compressionCodecName("GZIP")
                 .build()
@@ -62,7 +64,7 @@ void testSetMaximumDictionaryKeys() {
     }
 
     @Test
-    public void testSetMaximumDictionarySize() {
+    void testSetMaximumDictionarySize() {
         assertThat(IcebergParquetWriteInstructions.builder()
                 .maximumDictionarySize(100)
                 .build()
@@ -71,31 +73,34 @@ public void testSetMaximumDictionarySize() {
                 .isEqualTo(100);
     }
 
     @Test
-    public void testSetTargetPageSize() {
+    void testSetTargetPageSize() {
         assertThat(IcebergParquetWriteInstructions.builder()
-                .targetPageSize(1024 * 1024)
+                .targetPageSize(1 << 20)
                 .build()
                 .targetPageSize())
-                .isEqualTo(1024 * 1024);
+                .isEqualTo(1 << 20);
     }
 
     @Test
     void testMinMaximumDictionaryKeys() {
+
         try {
             IcebergParquetWriteInstructions.builder()
                     .maximumDictionaryKeys(-1)
                     .build();
+            failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
         } catch (IllegalArgumentException e) {
             assertThat(e).hasMessageContaining("maximumDictionaryKeys");
         }
     }
 
     @Test
-    public void testMinMaximumDictionarySize() {
+    void testMinMaximumDictionarySize() {
         try {
             IcebergParquetWriteInstructions.builder()
                     .maximumDictionarySize(-1)
                     .build();
+            failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
         } catch (IllegalArgumentException e) {
             assertThat(e).hasMessageContaining("maximumDictionarySize");
         }
@@ -107,26 +112,63 @@ void testMinTargetPageSize() {
             IcebergParquetWriteInstructions.builder()
                     .targetPageSize(1024)
                     .build();
+            failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
         } catch (IllegalArgumentException e) {
             assertThat(e).hasMessageContaining("targetPageSize");
         }
     }
 
+    @Test
+    void testSetToIcebergColumnRename() {
+        final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder()
+                .putDhToIcebergColumnRenames("dh1", "ice1")
+                .putDhToIcebergColumnRenames("dh2", "ice2")
+                .build();
+        assertThat(instructions.dhToIcebergColumnRenames().size()).isEqualTo(2);
+        assertThat(instructions.dhToIcebergColumnRenames().get("dh1")).isEqualTo("ice1");
+        assertThat(instructions.dhToIcebergColumnRenames().get("dh2")).isEqualTo("ice2");
+
+        final IcebergParquetWriteInstructions instructions2 = IcebergParquetWriteInstructions.builder()
+                .putAllDhToIcebergColumnRenames(Map.of(
+                        "dh1", "ice1",
+                        "dh2", "ice2",
+                        "dh3", "ice3"))
+                .build();
+        assertThat(instructions2.dhToIcebergColumnRenames().size()).isEqualTo(3);
+        assertThat(instructions2.dhToIcebergColumnRenames().get("dh1")).isEqualTo("ice1");
+        assertThat(instructions2.dhToIcebergColumnRenames().get("dh2")).isEqualTo("ice2");
+        assertThat(instructions2.dhToIcebergColumnRenames().get("dh3")).isEqualTo("ice3");
+    }
+
+    @Test
+    void testToIcebergColumnRenameUniqueness() {
+        try {
+            IcebergParquetWriteInstructions.builder()
+                    .putDhToIcebergColumnRenames("dh1", "ice1")
+                    .putDhToIcebergColumnRenames("dh2", "ice1")
+                    .build();
+            failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
+        } catch (final IllegalArgumentException e) {
+            assertThat(e).hasMessageContaining("Duplicate values in column renames");
+        }
+    }
+
     @Test
     void toParquetInstructionTest() {
-        final IcebergParquetWriteInstructions icebergInstructions = IcebergParquetWriteInstructions.builder()
+        final IcebergParquetWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder()
                 .compressionCodecName("GZIP")
                 .maximumDictionaryKeys(100)
                 .maximumDictionarySize(200)
-                .targetPageSize(1024 * 1024)
+                .targetPageSize(1 << 20)
                 .build();
         final Map<Integer, String> fieldIdToName = Map.of(2, "field2", 3, "field3");
-        final ParquetInstructions parquetInstructions = icebergInstructions.toParquetInstructions(
+        final ParquetInstructions parquetInstructions = writeInstructions.toParquetInstructions(
                 null, fieldIdToName);
 
         assertThat(parquetInstructions.getCompressionCodecName()).isEqualTo("GZIP");
         assertThat(parquetInstructions.getMaximumDictionaryKeys()).isEqualTo(100);
         assertThat(parquetInstructions.getMaximumDictionarySize()).isEqualTo(200);
-        assertThat(parquetInstructions.getTargetPageSize()).isEqualTo(1024 * 1024);
+        assertThat(parquetInstructions.getTargetPageSize()).isEqualTo(1 << 20);
         assertThat(parquetInstructions.getFieldId("field1")).isEmpty();
         assertThat(parquetInstructions.getFieldId("field2")).hasValue(2);
         assertThat(parquetInstructions.getFieldId("field3")).hasValue(3);
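Taken together, these tests pin down the builder's contract: verifySchema() now returns an Optional, so "unset" is distinguishable from an explicit false; targetPageSize has a lower bound (1024 is rejected); and the Deephaven-to-Iceberg rename map must not send two columns to the same Iceberg name. A condensed usage sketch, assuming only the methods exercised above:

    // Sketch based solely on the builder methods shown in this diff.
    final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder()
            .compressionCodecName("GZIP")               // default is "SNAPPY"
            .maximumDictionaryKeys(100)                 // negative values throw IllegalArgumentException
            .maximumDictionarySize(200)
            .targetPageSize(1 << 20)                    // values as small as 1024 are rejected
            .verifySchema(true)                         // left unset, verifySchema() is Optional.empty()
            .putDhToIcebergColumnRenames("dh1", "ice1") // values must be unique across the map
            .build();

    // Iceberg field IDs can then be threaded through to the Parquet layer; the meaning of
    // the first (null) argument is not shown in this diff.
    final ParquetInstructions parquetInstructions = instructions.toParquetInstructions(
            null, Map.of(2, "field2", 3, "field3"));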