diff --git a/extensions/iceberg/s3/build.gradle b/extensions/iceberg/s3/build.gradle index dfb53c52388..9923ea10fad 100644 --- a/extensions/iceberg/s3/build.gradle +++ b/extensions/iceberg/s3/build.gradle @@ -29,7 +29,6 @@ dependencies { compileOnly libs.autoservice annotationProcessor libs.autoservice.compiler - testImplementation libs.junit4 testImplementation project(':engine-test-utils') testImplementation libs.testcontainers @@ -45,10 +44,20 @@ dependencies { testRuntimeOnly libs.slf4j.simple } -TestTools.addEngineOutOfBandTest(project) +test { + useJUnitPlatform { + excludeTags("testcontainers") + } +} + +tasks.register('testOutOfBand', Test) { + useJUnitPlatform { + includeTags("testcontainers") + } -testOutOfBand.dependsOn Docker.registryTask(project, 'localstack') -testOutOfBand.systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack') + dependsOn Docker.registryTask(project, 'localstack') + systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack') -testOutOfBand.dependsOn Docker.registryTask(project, 'minio') -testOutOfBand.systemProperty 'testcontainers.minio.image', Docker.localImageName('minio') \ No newline at end of file + dependsOn Docker.registryTask(project, 'minio') + systemProperty 'testcontainers.minio.image', Docker.localImageName('minio') +} diff --git a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java index e82c4a23f81..462bd6a4d9c 100644 --- a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java +++ b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java @@ -7,7 +7,9 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.aws.glue.GlueCatalog; +import org.apache.iceberg.aws.s3.S3FileIO; import org.apache.iceberg.aws.s3.S3FileIOProperties; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.rest.RESTCatalog; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -16,12 +18,10 @@ import java.util.Map; /** - * Tools for accessing tables in the Iceberg table format. + * Tools for accessing tables in the Iceberg table format from S3. */ @SuppressWarnings("unused") public class IcebergToolsS3 extends IcebergTools { - private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO"; - /** * Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a * value, the system defaults will be used. @@ -47,14 +47,6 @@ public static IcebergCatalogAdapter createS3Rest( // Set up the properties map for the Iceberg catalog final Map properties = new HashMap<>(); - - final RESTCatalog catalog = new RESTCatalog(); - - properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName()); - properties.put(CatalogProperties.URI, catalogURI); - properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation); - - // Configure the properties map from the Iceberg instructions. if (!Strings.isNullOrEmpty(accessKeyId) && !Strings.isNullOrEmpty(secretAccessKey)) { properties.put(S3FileIOProperties.ACCESS_KEY_ID, accessKeyId); properties.put(S3FileIOProperties.SECRET_ACCESS_KEY, secretAccessKey); @@ -66,9 +58,8 @@ public static IcebergCatalogAdapter createS3Rest( properties.put(S3FileIOProperties.ENDPOINT, endpointOverride); } - final String catalogName = name != null ? 
name : "IcebergCatalog-" + catalogURI; - catalog.initialize(catalogName, properties); - return IcebergCatalogAdapter.of(catalog); + final Catalog catalog = new RESTCatalog(); + return createAdapterCommon(name, catalogURI, warehouseLocation, catalog, properties); } /** @@ -90,15 +81,27 @@ public static IcebergCatalogAdapter createGlue( // Set up the properties map for the Iceberg catalog final Map properties = new HashMap<>(); - final GlueCatalog catalog = new GlueCatalog(); + final Catalog catalog = new GlueCatalog(); + return createAdapterCommon(name, catalogURI, warehouseLocation, catalog, properties); + } + private static IcebergCatalogAdapter createAdapterCommon( + @Nullable final String name, + @NotNull final String catalogURI, + @NotNull final String warehouseLocation, + @NotNull final Catalog catalog, + @NotNull final Map properties) { properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName()); properties.put(CatalogProperties.URI, catalogURI); properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation); + // Following is needed to write new manifest files when writing new data. + // Not setting this will result in using ResolvingFileIO. + properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName()); + final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI; catalog.initialize(catalogName, properties); - return new IcebergCatalogAdapter(catalog, properties); + return IcebergCatalogAdapter.of(catalog, properties); } } diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java index e1c2baa5c7d..374b1389a29 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java @@ -4,8 +4,9 @@ package io.deephaven.iceberg.util; import io.deephaven.extensions.s3.S3Instructions.Builder; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack; -import org.junit.BeforeClass; import software.amazon.awssdk.services.s3.S3AsyncClient; import java.util.Map; @@ -15,10 +16,11 @@ import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT; import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY; -public class IcebergLocalStackTest extends IcebergToolsTest { +@Tag("testcontainers") +class IcebergLocalStackTest extends IcebergToolsTest { - @BeforeClass - public static void initContainer() { + @BeforeAll + static void initContainer() { // ensure container is started so container startup time isn't associated with a specific test LocalStack.init(); } diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java index 9adb98610ca..6b24d68399a 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java @@ -7,7 +7,8 @@ import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO; import io.deephaven.stats.util.OSUtil; import org.junit.jupiter.api.Assumptions; -import org.junit.BeforeClass; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; import 
software.amazon.awssdk.services.s3.S3AsyncClient; import java.util.Map; @@ -17,10 +18,11 @@ import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT; import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY; -public class IcebergMinIOTest extends IcebergToolsTest { +@Tag("testcontainers") +class IcebergMinIOTest extends IcebergToolsTest { - @BeforeClass - public static void initContainer() { + @BeforeAll + static void initContainer() { // TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()"); // ensure container is started so container startup time isn't associated with a specific test diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergParquetWriteInstructionsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergParquetWriteInstructionsTest.java new file mode 100644 index 00000000000..d24a2691ccf --- /dev/null +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergParquetWriteInstructionsTest.java @@ -0,0 +1,176 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.parquet.table.ParquetInstructions; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +class IcebergParquetWriteInstructionsTest { + + @Test + void defaults() { + final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder().build(); + assertThat(instructions.tableDefinition().isEmpty()).isTrue(); + assertThat(instructions.dataInstructions().isEmpty()).isTrue(); + assertThat(instructions.dhToIcebergColumnRenames().isEmpty()).isTrue(); + assertThat(instructions.createTableIfNotExist()).isFalse(); + assertThat(instructions.verifySchema()).isEmpty(); + assertThat(instructions.compressionCodecName()).isEqualTo("SNAPPY"); + assertThat(instructions.maximumDictionaryKeys()).isEqualTo(1048576); + assertThat(instructions.maximumDictionarySize()).isEqualTo(1048576); + } + + @Test + void testSetCreateTableIfNotExist() { + assertThat(IcebergParquetWriteInstructions.builder() + .createTableIfNotExist(true) + .build() + .createTableIfNotExist()) + .isTrue(); + } + + @Test + void testSetVerifySchema() { + assertThat(IcebergParquetWriteInstructions.builder() + .verifySchema(true) + .build() + .verifySchema()) + .hasValue(true); + } + + @Test + void testSetCompressionCodecName() { + assertThat(IcebergParquetWriteInstructions.builder() + .compressionCodecName("GZIP") + .build() + .compressionCodecName()) + .isEqualTo("GZIP"); + } + + @Test + void testSetMaximumDictionaryKeys() { + assertThat(IcebergParquetWriteInstructions.builder() + .maximumDictionaryKeys(100) + .build() + .maximumDictionaryKeys()) + .isEqualTo(100); + } + + @Test + void testSetMaximumDictionarySize() { + assertThat(IcebergParquetWriteInstructions.builder() + .maximumDictionarySize(100) + .build() + .maximumDictionarySize()) + .isEqualTo(100); + } + + @Test + void testSetTargetPageSize() { + assertThat(IcebergParquetWriteInstructions.builder() + .targetPageSize(1 << 20) + .build() + .targetPageSize()) + .isEqualTo(1 << 20); + } + + @Test + void testMinMaximumDictionaryKeys() { + + try { + IcebergParquetWriteInstructions.builder() + .maximumDictionaryKeys(-1) + .build(); + 
failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("maximumDictionaryKeys"); + } + } + + @Test + void testMinMaximumDictionarySize() { + try { + IcebergParquetWriteInstructions.builder() + .maximumDictionarySize(-1) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("maximumDictionarySize"); + } + } + + @Test + void testMinTargetPageSize() { + try { + IcebergParquetWriteInstructions.builder() + .targetPageSize(1024) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("targetPageSize"); + } + } + + @Test + void testSetToIcebergColumnRename() { + final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder() + .putDhToIcebergColumnRenames("dh1", "ice1") + .putDhToIcebergColumnRenames("dh2", "ice2") + .build(); + assertThat(instructions.dhToIcebergColumnRenames().size()).isEqualTo(2); + assertThat(instructions.dhToIcebergColumnRenames().get("dh1")).isEqualTo("ice1"); + assertThat(instructions.dhToIcebergColumnRenames().get("dh2")).isEqualTo("ice2"); + + final IcebergParquetWriteInstructions instructions2 = IcebergParquetWriteInstructions.builder() + .putAllDhToIcebergColumnRenames(Map.of( + "dh1", "ice1", + "dh2", "ice2", + "dh3", "ice3")) + .build(); + assertThat(instructions2.dhToIcebergColumnRenames().size()).isEqualTo(3); + assertThat(instructions2.dhToIcebergColumnRenames().get("dh1")).isEqualTo("ice1"); + assertThat(instructions2.dhToIcebergColumnRenames().get("dh2")).isEqualTo("ice2"); + assertThat(instructions2.dhToIcebergColumnRenames().get("dh3")).isEqualTo("ice3"); + } + + @Test + void testToIcebergColumnRenameUniqueness() { + try { + IcebergParquetWriteInstructions.builder() + .putDhToIcebergColumnRenames("dh1", "ice1") + .putDhToIcebergColumnRenames("dh2", "ice1") + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (final IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Duplicate values in column renames"); + } + } + + @Test + void toParquetInstructionTest() { + final IcebergParquetWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder() + .compressionCodecName("GZIP") + .maximumDictionaryKeys(100) + .maximumDictionarySize(200) + .targetPageSize(1 << 20) + .build(); + final Map fieldIdToName = Map.of(2, "field2", 3, "field3"); + final ParquetInstructions parquetInstructions = writeInstructions.toParquetInstructions( + null, fieldIdToName); + + assertThat(parquetInstructions.getCompressionCodecName()).isEqualTo("GZIP"); + assertThat(parquetInstructions.getMaximumDictionaryKeys()).isEqualTo(100); + assertThat(parquetInstructions.getMaximumDictionarySize()).isEqualTo(200); + assertThat(parquetInstructions.getTargetPageSize()).isEqualTo(1 << 20); + assertThat(parquetInstructions.getFieldId("field1")).isEmpty(); + assertThat(parquetInstructions.getFieldId("field2")).hasValue(2); + assertThat(parquetInstructions.getFieldId("field3")).hasValue(3); + assertThat(parquetInstructions.onWriteCompleted()).isEmpty(); + } +} diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java index eb1640f07c2..529309d7395 100644 --- 
a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java @@ -12,16 +12,15 @@ import io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.extensions.s3.S3Instructions; import io.deephaven.iceberg.TestCatalog.IcebergTestCatalog; -import io.deephaven.test.types.OutOfBandTest; +import io.deephaven.iceberg.base.IcebergUtils; import org.apache.iceberg.Snapshot; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.experimental.categories.Category; -import org.junit.Test; +import org.apache.iceberg.types.Type; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; @@ -32,6 +31,7 @@ import java.io.File; import java.math.BigDecimal; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -46,8 +46,7 @@ import static io.deephaven.iceberg.util.IcebergCatalogAdapter.SNAPSHOT_DEFINITION; import static io.deephaven.iceberg.util.IcebergCatalogAdapter.TABLES_DEFINITION; -@Category(OutOfBandTest.class) -public abstract class IcebergToolsTest { +abstract class IcebergToolsTest { private static final TableDefinition SALES_SINGLE_DEFINITION = TableDefinition.of( ColumnDefinition.ofString("Region"), @@ -95,7 +94,7 @@ public abstract class IcebergToolsTest { ColumnDefinition.ofString("ColumnType"), ColumnDefinition.ofBoolean("IsPartitioning")); - IcebergInstructions instructions; + private IcebergReadInstructions instructions; public abstract S3AsyncClient s3AsyncClient(); @@ -111,11 +110,11 @@ public abstract class IcebergToolsTest { private String warehousePath; private Catalog resourceCatalog; - @Rule - public final EngineCleanup framework = new EngineCleanup(); + private final EngineCleanup engineCleanup = new EngineCleanup(); - @Before - public void setUp() throws ExecutionException, InterruptedException { + @BeforeEach + void setUp() throws Exception { + engineCleanup.setUp(); bucket = "warehouse"; asyncClient = s3AsyncClient(); asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get(); @@ -127,19 +126,20 @@ public void setUp() throws ExecutionException, InterruptedException { final S3Instructions s3Instructions = s3Instructions(S3Instructions.builder()).build(); - instructions = IcebergInstructions.builder() + instructions = IcebergReadInstructions.builder() .dataInstructions(s3Instructions) .build(); } - @After - public void tearDown() throws ExecutionException, InterruptedException { + @AfterEach + void tearDown() throws Exception { for (String key : keys) { asyncClient.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()).get(); } keys.clear(); asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(); asyncClient.close(); + engineCleanup.tearDown(); } private void uploadFiles(final File root, final String prefixToRemove) @@ -189,7 +189,7 @@ private void uploadSalesRenamed() throws ExecutionException, InterruptedExceptio } @Test - public void testListNamespaces() { + void testListNamespaces() { final 
IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); final Collection namespaces = adapter.listNamespaces(); @@ -206,7 +206,7 @@ public void testListNamespaces() { } @Test - public void testListTables() { + void testListTables() { final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); final Namespace ns = Namespace.of("sales"); @@ -230,7 +230,7 @@ public void testListTables() { } @Test - public void testListSnapshots() { + void testListSnapshots() { final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); final TLongArrayList snapshotIds = new TLongArrayList(); @@ -256,7 +256,7 @@ public void testListSnapshots() { } @Test - public void testOpenTableA() throws ExecutionException, InterruptedException, TimeoutException { + void testOpenTableA() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); @@ -299,7 +299,7 @@ public void testOpenTableB() throws ExecutionException, InterruptedException, Ti } @Test - public void testOpenTableC() throws ExecutionException, InterruptedException, TimeoutException { + void testOpenTableC() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesSingle(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); @@ -321,7 +321,7 @@ public void testOpenTableC() throws ExecutionException, InterruptedException, Ti } @Test - public void testOpenTableS3Only() throws ExecutionException, InterruptedException, TimeoutException { + void testOpenTableS3Only() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); @@ -336,10 +336,10 @@ public void testOpenTableS3Only() throws ExecutionException, InterruptedExceptio } @Test - public void testOpenTableDefinition() throws ExecutionException, InterruptedException, TimeoutException { + void testOpenTableDefinition() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(SALES_PARTITIONED_DEFINITION) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -356,7 +356,7 @@ public void testOpenTableDefinition() throws ExecutionException, InterruptedExce } @Test - public void testOpenTablePartitionTypeException() { + void testOpenTablePartitionTypeException() { final TableDefinition tableDef = TableDefinition.of( ColumnDefinition.ofLong("year").withPartitioning(), ColumnDefinition.ofInt("month").withPartitioning(), @@ -366,7 +366,7 @@ public void testOpenTablePartitionTypeException() { ColumnDefinition.ofLong("Unit_Price"), ColumnDefinition.ofTime("Order_Date")); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(tableDef) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -390,7 +390,7 @@ public void testOpenTablePartitionTypeException() { } @Test - public void testOpenTableDefinitionRename() throws ExecutionException, InterruptedException, TimeoutException { + void testOpenTableDefinitionRename() throws ExecutionException, InterruptedException, TimeoutException { 
uploadSalesPartitioned(); final TableDefinition renamed = TableDefinition.of( @@ -402,7 +402,7 @@ public void testOpenTableDefinitionRename() throws ExecutionException, Interrupt ColumnDefinition.ofDouble("UnitPrice"), ColumnDefinition.ofTime("OrderDate")); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(renamed) .dataInstructions(instructions.dataInstructions().get()) .putColumnRenames("Region", "RegionName") @@ -426,7 +426,7 @@ public void testOpenTableDefinitionRename() throws ExecutionException, Interrupt } @Test - public void testSkippedPartitioningColumn() throws ExecutionException, InterruptedException, TimeoutException { + void testSkippedPartitioningColumn() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); final TableDefinition tableDef = TableDefinition.of( @@ -438,7 +438,7 @@ public void testSkippedPartitioningColumn() throws ExecutionException, Interrupt ColumnDefinition.ofDouble("Unit_Price"), ColumnDefinition.ofTime("Order_Date")); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(tableDef) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -455,7 +455,7 @@ public void testSkippedPartitioningColumn() throws ExecutionException, Interrupt } @Test - public void testReorderedPartitioningColumn() throws ExecutionException, InterruptedException, TimeoutException { + void testReorderedPartitioningColumn() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); final TableDefinition tableDef = TableDefinition.of( @@ -467,7 +467,7 @@ public void testReorderedPartitioningColumn() throws ExecutionException, Interru ColumnDefinition.ofDouble("Unit_Price"), ColumnDefinition.ofTime("Order_Date")); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(tableDef) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -484,10 +484,10 @@ public void testReorderedPartitioningColumn() throws ExecutionException, Interru } @Test - public void testZeroPartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException { + void testZeroPartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(SALES_MULTI_DEFINITION) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -504,7 +504,7 @@ public void testZeroPartitioningColumns() throws ExecutionException, Interrupted } @Test - public void testIncorrectPartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException { + void testIncorrectPartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException { final TableDefinition tableDef = TableDefinition.of( ColumnDefinition.ofInt("month").withPartitioning(), ColumnDefinition.ofInt("year").withPartitioning(), @@ -514,7 +514,7 @@ public void testIncorrectPartitioningColumns() throws ExecutionException, Interr ColumnDefinition.ofDouble("Unit_Price"), ColumnDefinition.ofTime("Order_Date")); - 
final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(tableDef) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -539,7 +539,7 @@ public void testIncorrectPartitioningColumns() throws ExecutionException, Interr } @Test - public void testMissingPartitioningColumns() { + void testMissingPartitioningColumns() { final TableDefinition tableDef = TableDefinition.of( ColumnDefinition.ofInt("__year").withPartitioning(), // Incorrect name ColumnDefinition.ofInt("__month").withPartitioning(), // Incorrect name @@ -549,7 +549,7 @@ public void testMissingPartitioningColumns() { ColumnDefinition.ofLong("Unit_Price"), ColumnDefinition.ofTime("Order_Date")); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(tableDef) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -573,10 +573,10 @@ public void testMissingPartitioningColumns() { } @Test - public void testOpenTableColumnRename() throws ExecutionException, InterruptedException, TimeoutException { + void testOpenTableColumnRename() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .putColumnRenames("Region", "RegionName") .putColumnRenames("Item_Type", "ItemType") @@ -593,10 +593,10 @@ public void testOpenTableColumnRename() throws ExecutionException, InterruptedEx } @Test - public void testOpenTableColumnLegalization() throws ExecutionException, InterruptedException, TimeoutException { + void testOpenTableColumnLegalization() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesRenamed(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -612,11 +612,11 @@ public void testOpenTableColumnLegalization() throws ExecutionException, Interru } @Test - public void testOpenTableColumnLegalizationRename() + void testOpenTableColumnLegalizationRename() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesRenamed(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .putColumnRenames("Item&Type", "Item_Type") .putColumnRenames("Units/Sold", "Units_Sold") @@ -641,12 +641,12 @@ public void testOpenTableColumnLegalizationRename() } @Test - public void testOpenTableColumnLegalizationPartitionException() { + void testOpenTableColumnLegalizationPartitionException() { final TableDefinition tableDef = TableDefinition.of( ColumnDefinition.ofInt("Year").withPartitioning(), ColumnDefinition.ofInt("Month").withPartitioning()); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(tableDef) .putColumnRenames("Year", "Current Year") .putColumnRenames("Month", "Current Month") @@ -672,11 +672,11 @@ public 
void testOpenTableColumnLegalizationPartitionException() { } @Test - public void testOpenTableColumnRenamePartitioningColumns() + void testOpenTableColumnRenamePartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .putColumnRenames("VendorID", "vendor_id") .putColumnRenames("month", "__month") @@ -704,7 +704,7 @@ public void testOpenTableColumnRenamePartitioningColumns() } @Test - public void testOpenTableSnapshot() throws ExecutionException, InterruptedException, TimeoutException { + void testOpenTableSnapshot() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesMulti(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); @@ -736,7 +736,7 @@ public void testOpenTableSnapshot() throws ExecutionException, InterruptedExcept } @Test - public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedException, TimeoutException { + void testOpenTableSnapshotByID() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesMulti(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); @@ -783,7 +783,7 @@ public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedEx } @Test - public void testOpenAllTypesTable() throws ExecutionException, InterruptedException, TimeoutException { + void testOpenAllTypesTable() throws ExecutionException, InterruptedException, TimeoutException { uploadAllTypes(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); @@ -798,7 +798,7 @@ public void testOpenAllTypesTable() throws ExecutionException, InterruptedExcept } @Test - public void testTableDefinition() { + void testTableDefinition() { final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); final Namespace ns = Namespace.of("sales"); @@ -823,7 +823,7 @@ public void testTableDefinition() { } @Test - public void testTableDefinitionTable() { + void testTableDefinitionTable() { final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); final Namespace ns = Namespace.of("sales"); @@ -856,10 +856,10 @@ public void testTableDefinitionTable() { } @Test - public void testTableDefinitionWithInstructions() { + void testTableDefinitionWithInstructions() { final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); - IcebergInstructions localInstructions = IcebergInstructions.builder() + IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .putColumnRenames("Region", "Area") .putColumnRenames("Item_Type", "ItemType") @@ -886,7 +886,7 @@ public void testTableDefinitionWithInstructions() { ColumnDefinition.ofString("Item_Type"), ColumnDefinition.ofTime("Order_Date")); - localInstructions = IcebergInstructions.builder() + localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .tableDefinition(userTableDef) .build(); @@ -895,4 +895,23 @@ public void testTableDefinitionWithInstructions() { tableDef = adapter.getTableDefinition("sales.sales_multi", localInstructions); Assert.equals(tableDef, "tableDef", userTableDef); } + + @Test + void testConvertToIcebergTypeAndBack() { + final 
Class[] javaTypes = { + Boolean.class, double.class, float.class, int.class, long.class, String.class, Instant.class, + LocalDateTime.class, LocalDate.class, LocalTime.class, byte[].class + }; + + for (final Class javaType : javaTypes) { + // Java type -> Iceberg type + final Type icebergType = IcebergUtils.convertToIcebergType(javaType); + + // Iceberg type -> Deephaven type + final io.deephaven.qst.type.Type deephavenType = IcebergUtils.convertToDHType(icebergType); + + // Deephaven type == Java type + Assert.eq(javaType, javaType.getName(), deephavenType.clazz(), deephavenType.clazz().getName()); + } + } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java new file mode 100644 index 00000000000..2494dbf2c90 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java @@ -0,0 +1,327 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.base; + +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.iceberg.util.IcebergWriteInstructions; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public final class IcebergUtils { + + private static final Map, Type> DH_TO_ICEBERG_TYPE_MAP = new HashMap<>(); + + static { + DH_TO_ICEBERG_TYPE_MAP.put(Boolean.class, Types.BooleanType.get()); + DH_TO_ICEBERG_TYPE_MAP.put(double.class, Types.DoubleType.get()); + DH_TO_ICEBERG_TYPE_MAP.put(float.class, Types.FloatType.get()); + DH_TO_ICEBERG_TYPE_MAP.put(int.class, Types.IntegerType.get()); + DH_TO_ICEBERG_TYPE_MAP.put(long.class, Types.LongType.get()); + DH_TO_ICEBERG_TYPE_MAP.put(String.class, Types.StringType.get()); + DH_TO_ICEBERG_TYPE_MAP.put(Instant.class, Types.TimestampType.withZone()); + DH_TO_ICEBERG_TYPE_MAP.put(LocalDateTime.class, Types.TimestampType.withoutZone()); + DH_TO_ICEBERG_TYPE_MAP.put(LocalDate.class, Types.DateType.get()); + DH_TO_ICEBERG_TYPE_MAP.put(LocalTime.class, Types.TimeType.get()); + DH_TO_ICEBERG_TYPE_MAP.put(byte[].class, Types.BinaryType.get()); + // TODO Add support for writing big decimals and lists + } + + /** + * Get a stream of all {@link DataFile} objects from the given {@link Table} and {@link Snapshot}. + * + * @param table The {@link Table} to retrieve data files for. + * @param snapshot The {@link Snapshot} to retrieve data files from. + * + * @return A stream of {@link DataFile} objects. 
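+ * <p>Illustrative usage (a sketch, assuming {@code table} is a loaded Iceberg {@link Table} with a current
+ * snapshot; the returned stream holds open manifest readers, so it should be closed):
+ * <pre>{@code
+ * try (Stream<DataFile> dataFiles = IcebergUtils.allDataFiles(table, table.currentSnapshot())) {
+ *     dataFiles.forEach(dataFile -> System.out.println(dataFile.path()));
+ * }
+ * }</pre>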
+ */ + public static Stream allDataFiles(@NotNull final Table table, @NotNull final Snapshot snapshot) { + return allManifests(table, snapshot).stream() + .map(manifestFile -> ManifestFiles.read(manifestFile, table.io())) + .flatMap(IcebergUtils::toStream); + } + + /** + * Retrieves a {@link List} of manifest files from the given {@link Table} and {@link Snapshot}. + * + * @param table The {@link Table} to retrieve manifest files for. + * @param snapshot The {@link Snapshot} to retrieve manifest files from. + * + * @return A {@link List} of {@link ManifestFile} objects. + * @throws TableDataException if there is an error retrieving the manifest files. + */ + static List allManifests(@NotNull final Table table, @NotNull final Snapshot snapshot) { + try { + return snapshot.allManifests(table.io()); + } catch (final RuntimeException e) { + throw new TableDataException( + String.format("%s:%d - error retrieving manifest files", table, snapshot.snapshotId()), e); + } + } + + private static Stream toStream(final CloseableIterable iterable) { + return StreamSupport.stream(iterable.spliterator(), false).onClose(() -> { + try { + iterable.close(); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + /** + * Convert an Iceberg data type to a Deephaven type. + * + * @param icebergType The Iceberg data type to be converted. + * @return The converted Deephaven type. + */ + public static io.deephaven.qst.type.Type convertToDHType(@NotNull final Type icebergType) { + final Type.TypeID typeId = icebergType.typeId(); + switch (typeId) { + case BOOLEAN: + return io.deephaven.qst.type.Type.booleanType().boxedType(); + case DOUBLE: + return io.deephaven.qst.type.Type.doubleType(); + case FLOAT: + return io.deephaven.qst.type.Type.floatType(); + case INTEGER: + return io.deephaven.qst.type.Type.intType(); + case LONG: + return io.deephaven.qst.type.Type.longType(); + case STRING: + return io.deephaven.qst.type.Type.stringType(); + case TIMESTAMP: + final Types.TimestampType timestampType = (Types.TimestampType) icebergType; + return timestampType.shouldAdjustToUTC() + ? io.deephaven.qst.type.Type.find(Instant.class) + : io.deephaven.qst.type.Type.find(LocalDateTime.class); + case DATE: + return io.deephaven.qst.type.Type.find(LocalDate.class); + case TIME: + return io.deephaven.qst.type.Type.find(LocalTime.class); + case DECIMAL: + return io.deephaven.qst.type.Type.find(BigDecimal.class); + case FIXED: // Fall through + case BINARY: + return io.deephaven.qst.type.Type.find(byte[].class); + case UUID: // Fall through + case STRUCT: // Fall through + case LIST: // Fall through + case MAP: // Fall through + default: + throw new TableDataException("Unsupported iceberg column type " + typeId.name()); + } + } + + /** + * Convert a Deephaven type to an Iceberg type. + * + * @param columnType The Deephaven type to be converted. + * @return The converted Iceberg type. 
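+ * <p>For example (illustrative, based on the mapping above):
+ * <pre>{@code
+ * Type longType = IcebergUtils.convertToIcebergType(long.class);     // Types.LongType
+ * Type stringType = IcebergUtils.convertToIcebergType(String.class); // Types.StringType
+ * }</pre>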
+ */ + public static Type convertToIcebergType(final Class columnType) { + final Type icebergType = DH_TO_ICEBERG_TYPE_MAP.get(columnType); + if (icebergType != null) { + return icebergType; + } else { + throw new TableDataException("Unsupported deephaven column type " + columnType.getName()); + } + } + + public static class SpecAndSchema { + private final PartitionSpec partitionSpec; + private final Schema schema; + + private SpecAndSchema(final PartitionSpec partitionSpec, final Schema schema) { + this.partitionSpec = partitionSpec; + this.schema = schema; + } + + public PartitionSpec partitionSpec() { + return partitionSpec; + } + + public Schema schema() { + return schema; + } + } + + /** + * Create {@link PartitionSpec} and {@link Schema} from a {@link TableDefinition} using the provided instructions. + */ + public static SpecAndSchema createSpecAndSchema( + @NotNull final TableDefinition tableDefinition, + @NotNull final IcebergWriteInstructions instructions) { + final Collection partitioningColumnNames = new ArrayList<>(); + final List fields = new ArrayList<>(); + int fieldID = 1; // Iceberg field IDs start from 1 + + // Create the schema first and use it to build the partition spec + final Map dhToIcebergColumnRenames = instructions.dhToIcebergColumnRenames(); + for (final ColumnDefinition columnDefinition : tableDefinition.getColumns()) { + final String dhColumnName = columnDefinition.getName(); + final String icebergColName = dhToIcebergColumnRenames.getOrDefault(dhColumnName, dhColumnName); + final Type icebergType = convertToIcebergType(columnDefinition.getDataType()); + fields.add(Types.NestedField.optional(fieldID, icebergColName, icebergType)); + if (columnDefinition.isPartitioning()) { + partitioningColumnNames.add(icebergColName); + } + fieldID++; + } + final Schema schema = new Schema(fields); + + final PartitionSpec partitionSpec = createPartitionSpec(schema, partitioningColumnNames); + return new SpecAndSchema(partitionSpec, schema); + } + + private static PartitionSpec createPartitionSpec( + @NotNull final Schema schema, + @NotNull final Iterable partitionColumnNames) { + final PartitionSpec.Builder partitionSpecBuilder = PartitionSpec.builderFor(schema); + for (final String partitioningColumnName : partitionColumnNames) { + partitionSpecBuilder.identity(partitioningColumnName); + } + return partitionSpecBuilder.build(); + } + + /** + * Check if an existing iceberg table with provided schema is compatible for overwriting with a new table with given + * schema. + * + * @param icebergSchema The schema of the existing iceberg table. + * @param newSchema The schema of the new table. + * + * @throws IllegalArgumentException if the schemas are not compatible. + */ + public static void verifyOverwriteCompatibility( + final Schema icebergSchema, + final Schema newSchema) { + if (!icebergSchema.sameSchema(newSchema)) { + throw new IllegalArgumentException("Schema mismatch, iceberg table schema: " + icebergSchema + + ", schema derived from the table definition: " + newSchema); + } + } + + /** + * Check if an existing iceberg table with provided partition spec is compatible for overwriting with a new table + * with given partition spec. + * + * @param icebergPartitionSpec The partition spec of the existing iceberg table. + * @param newPartitionSpec The partition spec of the new table. + * + * @throws IllegalArgumentException if the partition spec are not compatible. 
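+ * <p>Illustrative usage (a sketch, assuming {@code icebergTable} is a loaded {@link Table} and
+ * {@code specAndSchema} was built via {@link #createSpecAndSchema}):
+ * <pre>{@code
+ * IcebergUtils.verifyOverwriteCompatibility(icebergTable.spec(), specAndSchema.partitionSpec());
+ * }</pre>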
+ */ + public static void verifyOverwriteCompatibility( + final PartitionSpec icebergPartitionSpec, + final PartitionSpec newPartitionSpec) { + if (!icebergPartitionSpec.compatibleWith(newPartitionSpec)) { + throw new IllegalArgumentException("Partition spec mismatch, iceberg table partition spec: " + + icebergPartitionSpec + ", partition spec derived from table definition: " + newPartitionSpec); + } + } + + /** + * Check if an existing iceberg table with provided schema is compatible for appending deephaven table with provided + * definition. + * + * @param icebergSchema The schema of the iceberg table. + * @param tableDefinition The table definition of the deephaven table. + * + * @throws IllegalArgumentException if the schemas are not compatible. + */ + public static void verifyAppendCompatibility( + @NotNull final Schema icebergSchema, + @NotNull final TableDefinition tableDefinition, + @NotNull final IcebergWriteInstructions instructions) { + // Check that all columns in the table definition are part of the Iceberg schema and have the same type + final Map dhToIcebergColumnRenames = instructions.dhToIcebergColumnRenames(); + for (final ColumnDefinition dhColumn : tableDefinition.getColumns()) { + final String dhColumnName = dhColumn.getName(); + final String icebergColName = dhToIcebergColumnRenames.getOrDefault(dhColumnName, dhColumnName); + final Types.NestedField icebergColumn = icebergSchema.findField(icebergColName); + if (icebergColumn == null) { + throw new IllegalArgumentException("Schema mismatch, column " + dhColumn.getName() + " from Deephaven " + + "table definition: " + tableDefinition + " is not found in Iceberg table schema: " + + icebergSchema); + } + if (!icebergColumn.type().equals(convertToIcebergType(dhColumn.getDataType()))) { + throw new IllegalArgumentException("Schema mismatch, column " + dhColumn.getName() + " from Deephaven " + + "table definition: " + tableDefinition + " has type " + dhColumn.getDataType() + + " which does not match the type " + icebergColumn.type() + " in Iceberg table schema: " + + icebergSchema); + } + } + + // Check that all required columns in the Iceberg schema are part of the table definition + final Map icebergToDhColumnRenames = instructions.icebergToDhColumnRenames(); + for (final Types.NestedField icebergColumn : icebergSchema.columns()) { + if (icebergColumn.isOptional()) { + continue; + } + final String icebergColumnName = icebergColumn.name(); + final String dhColName = icebergToDhColumnRenames.getOrDefault(icebergColumnName, icebergColumnName); + if (tableDefinition.getColumn(dhColName) == null) { + throw new IllegalArgumentException("Schema mismatch, required column " + icebergColumnName + + " from Iceberg table schema: " + icebergSchema + " is not found in Deephaven table " + + "definition: " + tableDefinition); + } + } + } + + /** + * Check if an existing iceberg table with provided partition spec is compatible for appending deephaven table with + * provided definition. + * + * @param partitionSpec The partition spec of the iceberg table. + * @param tableDefinition The table definition of the deephaven table. + * + * @throws IllegalArgumentException if the partition spec are not compatible. 
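+ * <p>Illustrative usage (a sketch, assuming {@code icebergTable} is a loaded {@link Table},
+ * {@code tableDefinition} describes the Deephaven table being appended, and {@code writeInstructions} is the
+ * caller's {@link IcebergWriteInstructions}):
+ * <pre>{@code
+ * IcebergUtils.verifyAppendCompatibility(icebergTable.spec(), tableDefinition, writeInstructions);
+ * }</pre>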
+ */ + public static void verifyAppendCompatibility( + @NotNull final PartitionSpec partitionSpec, + @NotNull final TableDefinition tableDefinition, + @NotNull final IcebergWriteInstructions instructions) { + final Map icebergToDhColumnRenames = instructions.icebergToDhColumnRenames(); + final Set icebergPartitionColumns = partitionSpec.fields().stream() + .map(PartitionField::name) + .map(icebergColumnName -> icebergToDhColumnRenames.getOrDefault(icebergColumnName, icebergColumnName)) + .collect(Collectors.toSet()); + final Set dhPartitioningColumns = tableDefinition.getColumns().stream() + .filter(ColumnDefinition::isPartitioning) + .map(ColumnDefinition::getName) + .collect(Collectors.toSet()); + if (!icebergPartitionColumns.equals(dhPartitioningColumns)) { + throw new IllegalArgumentException("Partitioning column mismatch, iceberg table partition spec: " + + partitionSpec + ", deephaven table definition: " + tableDefinition); + } + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java index c2a3bfc7a67..8d6bda501ce 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java @@ -10,7 +10,7 @@ import io.deephaven.iceberg.location.IcebergTableLocationKey; import io.deephaven.iceberg.location.IcebergTableParquetLocationKey; import io.deephaven.iceberg.relative.RelativeFileIO; -import io.deephaven.iceberg.util.IcebergInstructions; +import io.deephaven.iceberg.util.IcebergReadInstructions; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.iceberg.internal.DataInstructionsProviderLoader; import org.apache.iceberg.*; @@ -20,9 +20,12 @@ import java.net.URI; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Consumer; +import java.util.stream.Stream; + +import static io.deephaven.iceberg.base.IcebergUtils.allDataFiles; public abstract class IcebergBaseLayout implements TableLocationKeyFinder { /** @@ -48,7 +51,7 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder locationKeyObserver) { - try { - // Retrieve the manifest files from the snapshot - final List manifestFiles = snapshot.allManifests(fileIO); - for (final ManifestFile manifestFile : manifestFiles) { - // Currently only can process manifest files with DATA content type. 
- if (manifestFile.content() != ManifestContent.DATA) { - throw new TableDataException( - String.format("%s:%d - only DATA manifest files are currently supported, encountered %s", - table, snapshot.snapshotId(), manifestFile.content())); - } - try (final ManifestReader reader = ManifestFiles.read(manifestFile, fileIO)) { - for (DataFile df : reader) { + try (final Stream dataFiles = allDataFiles(table, snapshot)) { + dataFiles + .map(df -> { final URI fileUri = dataFileUri(df); - final IcebergTableLocationKey locationKey = - cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri)); - if (locationKey != null) { - locationKeyObserver.accept(locationKey); - } - } - } - } - } catch (final Exception e) { + return cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, uri)); + }) + .filter(Objects::nonNull) + .forEach(locationKeyObserver); + } catch (final RuntimeException e) { throw new TableDataException( String.format("%s:%d - error finding Iceberg locations", table, snapshot.snapshotId()), e); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java index fd407d7702e..064f30d8065 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java @@ -6,7 +6,7 @@ import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; import io.deephaven.iceberg.location.IcebergTableLocationKey; -import io.deephaven.iceberg.util.IcebergInstructions; +import io.deephaven.iceberg.util.IcebergReadInstructions; import io.deephaven.iceberg.internal.DataInstructionsProviderLoader; import org.apache.iceberg.*; import org.apache.iceberg.io.FileIO; @@ -31,7 +31,7 @@ public IcebergFlatLayout( @NotNull final Table table, @NotNull final Snapshot tableSnapshot, @NotNull final FileIO fileIO, - @NotNull final IcebergInstructions instructions, + @NotNull final IcebergReadInstructions instructions, @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) { super(tableDef, table, tableSnapshot, fileIO, instructions, dataInstructionsProvider); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java index f1a3cc9a5ea..0614dbf7e5d 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java @@ -8,7 +8,7 @@ import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; import io.deephaven.iceberg.location.IcebergTableLocationKey; -import io.deephaven.iceberg.util.IcebergInstructions; +import io.deephaven.iceberg.util.IcebergReadInstructions; import io.deephaven.iceberg.internal.DataInstructionsProviderLoader; import io.deephaven.util.type.TypeUtils; import org.apache.commons.lang3.mutable.MutableInt; @@ -53,7 +53,7 @@ public IcebergKeyValuePartitionedLayout( @NotNull final org.apache.iceberg.Snapshot tableSnapshot, @NotNull final FileIO fileIO, @NotNull final PartitionSpec partitionSpec, - @NotNull final IcebergInstructions instructions, + @NotNull final IcebergReadInstructions instructions, @NotNull 
final DataInstructionsProviderLoader dataInstructionsProvider) { super(tableDef, table, tableSnapshot, fileIO, instructions, dataInstructionsProvider); diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergAppend.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergAppend.java new file mode 100644 index 00000000000..07e079f816f --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergAppend.java @@ -0,0 +1,51 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.annotations.BuildableStyle; +import io.deephaven.engine.table.Table; +import org.immutables.value.Value; + +import java.util.List; + +@Value.Immutable +@BuildableStyle +public abstract class IcebergAppend { + /** + * The identifier string for the Iceberg table to append to + */ + public abstract String tableIdentifier(); + + /** + * The Deephaven tables to append. All tables should have the same definition, else a table definition should be + * provided in the {@link #instructions()}. + */ + public abstract List dhTables(); + + /** + * The instructions for customizations while writing, defaults to {@link IcebergParquetWriteInstructions#DEFAULT}. + */ + @Value.Default + public IcebergWriteInstructions instructions() { + return IcebergParquetWriteInstructions.DEFAULT; + } + + public static Builder builder() { + return ImmutableIcebergAppend.builder(); + } + + public interface Builder { + Builder tableIdentifier(String tableIdentifier); + + Builder addDhTables(Table element); + + Builder addDhTables(Table... elements); + + Builder addAllDhTables(Iterable elements); + + Builder instructions(IcebergWriteInstructions instructions); + + IcebergAppend build(); + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergBaseInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergBaseInstructions.java new file mode 100644 index 00000000000..172cee8da53 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergBaseInstructions.java @@ -0,0 +1,28 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.engine.table.TableDefinition; + +import java.util.Optional; + +public interface IcebergBaseInstructions { + + /** + * The {@link TableDefinition} to use when reading/writing Iceberg data files. + */ + Optional tableDefinition(); + + /** + * The data instructions to use for reading/writing the Iceberg data files (might be S3Instructions or other cloud + * provider-specific instructions). 
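+ * <p>For example (an illustrative sketch, assuming {@code s3Instructions} is a configured
+ * {@code io.deephaven.extensions.s3.S3Instructions} instance):
+ * <pre>{@code
+ * IcebergReadInstructions instructions = IcebergReadInstructions.builder()
+ *         .dataInstructions(s3Instructions)
+ *         .build();
+ * }</pre>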
+ */ + Optional dataInstructions(); + + interface Builder { + INSTRUCTIONS_BUILDER tableDefinition(TableDefinition tableDefinition); + + INSTRUCTIONS_BUILDER dataInstructions(Object s3Instructions); + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java index 736e1a3b7a3..d790764603c 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java @@ -24,16 +24,29 @@ import io.deephaven.iceberg.layout.IcebergKeyValuePartitionedLayout; import io.deephaven.iceberg.location.IcebergTableLocationFactory; import io.deephaven.iceberg.location.IcebergTableLocationKey; +import io.deephaven.parquet.table.ParquetInstructions; +import io.deephaven.parquet.table.ParquetTools; import io.deephaven.time.DateTimeUtils; import io.deephaven.util.annotations.VisibleForTesting; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.OverwriteFiles; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdatePartitionSpec; +import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.ResourcePaths; import org.apache.iceberg.types.Type; @@ -41,10 +54,18 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.net.URI; import java.time.Instant; -import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static io.deephaven.iceberg.base.IcebergUtils.SpecAndSchema; +import static io.deephaven.iceberg.base.IcebergUtils.convertToDHType; +import static io.deephaven.iceberg.base.IcebergUtils.allDataFiles; +import static io.deephaven.iceberg.base.IcebergUtils.createSpecAndSchema; +import static io.deephaven.iceberg.base.IcebergUtils.verifyAppendCompatibility; +import static io.deephaven.iceberg.base.IcebergUtils.verifyOverwriteCompatibility; public class IcebergCatalogAdapter { @@ -67,6 +88,8 @@ public class IcebergCatalogAdapter { ColumnDefinition.fromGenericType("Summary", Map.class), ColumnDefinition.fromGenericType("SnapshotObject", Snapshot.class)); + private static final String DEFAULT_GENERATED_FILE_FORMAT = "parquet"; + private final Catalog catalog; private final DataInstructionsProviderLoader dataInstructionsProvider; @@ -155,7 +178,7 @@ private static TableDefinition fromSchema( continue; } final Type type = field.type(); - final io.deephaven.qst.type.Type qstType = convertPrimitiveType(type); + final io.deephaven.qst.type.Type qstType = convertToDHType(type); final ColumnDefinition column; if (partitionNames.contains(name)) { column = ColumnDefinition.of(name, qstType).withPartitioning(); @@ -192,50 +215,6 @@ private static 
TableDefinition fromSchema( return tableDef; } - /** - * Convert an Iceberg data type to a Deephaven type. - * - * @param icebergType The Iceberg data type to be converted. - * @return The converted Deephaven type. - */ - static io.deephaven.qst.type.Type convertPrimitiveType(@NotNull final Type icebergType) { - final Type.TypeID typeId = icebergType.typeId(); - switch (typeId) { - case BOOLEAN: - return io.deephaven.qst.type.Type.booleanType().boxedType(); - case DOUBLE: - return io.deephaven.qst.type.Type.doubleType(); - case FLOAT: - return io.deephaven.qst.type.Type.floatType(); - case INTEGER: - return io.deephaven.qst.type.Type.intType(); - case LONG: - return io.deephaven.qst.type.Type.longType(); - case STRING: - return io.deephaven.qst.type.Type.stringType(); - case TIMESTAMP: - final Types.TimestampType timestampType = (Types.TimestampType) icebergType; - return timestampType.shouldAdjustToUTC() - ? io.deephaven.qst.type.Type.find(Instant.class) - : io.deephaven.qst.type.Type.find(LocalDateTime.class); - case DATE: - return io.deephaven.qst.type.Type.find(java.time.LocalDate.class); - case TIME: - return io.deephaven.qst.type.Type.find(java.time.LocalTime.class); - case DECIMAL: - return io.deephaven.qst.type.Type.find(java.math.BigDecimal.class); - case FIXED: // Fall through - case BINARY: - return io.deephaven.qst.type.Type.find(byte[].class); - case UUID: // Fall through - case STRUCT: // Fall through - case LIST: // Fall through - case MAP: // Fall through - default: - throw new TableDataException("Unsupported iceberg column type " + typeId.name()); - } - } - /** * List all {@link Namespace namespaces} in the catalog. This method is only supported if the catalog implements * {@link SupportsNamespaces} for namespace discovery. See {@link SupportsNamespaces#listNamespaces(Namespace)}. @@ -461,7 +440,7 @@ private Snapshot getSnapshot(@NotNull final TableIdentifier tableIdentifier, fin private Map getRenameColumnMap( @NotNull final org.apache.iceberg.Table table, @NotNull final Schema schema, - @NotNull final IcebergInstructions instructions) { + @NotNull final IcebergReadInstructions instructions) { final Set takenNames = new HashSet<>(); @@ -506,7 +485,7 @@ private Map getRenameColumnMap( */ public TableDefinition getTableDefinition( @NotNull final String tableIdentifier, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { final TableIdentifier tableId = TableIdentifier.parse(tableIdentifier); // Load the table from the catalog. return getTableDefinition(tableId, instructions); @@ -522,7 +501,7 @@ public TableDefinition getTableDefinition( */ public TableDefinition getTableDefinition( @NotNull final TableIdentifier tableIdentifier, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { // Load the table from the catalog. 
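With the read/write split, the definition lookups below take IcebergReadInstructions rather than the old IcebergInstructions; a brief sketch of the call shape, with a placeholder identifier and rename pair:

    // Resolve the Deephaven definition of an Iceberg table, renaming the Iceberg
    // column "ts" to the Deephaven column "Timestamp" on the read path.
    final IcebergReadInstructions readInstructions = IcebergReadInstructions.builder()
            .putColumnRenames("ts", "Timestamp")
            .build();
    final TableDefinition definition =
            catalogAdapter.getTableDefinition("MyNamespace.MyTable", readInstructions);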
return getTableDefinitionInternal(tableIdentifier, null, instructions); } @@ -539,7 +518,7 @@ public TableDefinition getTableDefinition( public TableDefinition getTableDefinition( @NotNull final String tableIdentifier, final long snapshotId, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { final TableIdentifier tableId = TableIdentifier.parse(tableIdentifier); // Find the snapshot with the given snapshot id @@ -564,7 +543,7 @@ public TableDefinition getTableDefinition( public TableDefinition getTableDefinition( @NotNull final TableIdentifier tableIdentifier, @Nullable final Snapshot tableSnapshot, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { // Load the table from the catalog. return getTableDefinitionInternal(tableIdentifier, tableSnapshot, instructions); } @@ -579,7 +558,7 @@ public TableDefinition getTableDefinition( */ public Table getTableDefinitionTable( @NotNull final String tableIdentifier, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { final TableIdentifier tableId = TableIdentifier.parse(tableIdentifier); return getTableDefinitionTable(tableId, instructions); } @@ -594,7 +573,7 @@ public Table getTableDefinitionTable( */ public Table getTableDefinitionTable( @NotNull final TableIdentifier tableIdentifier, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { final TableDefinition definition = getTableDefinition(tableIdentifier, instructions); return TableTools.metaTable(definition); } @@ -611,7 +590,7 @@ public Table getTableDefinitionTable( public Table getTableDefinitionTable( @NotNull final String tableIdentifier, final long snapshotId, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { final TableIdentifier tableId = TableIdentifier.parse(tableIdentifier); // Find the snapshot with the given snapshot id @@ -635,7 +614,7 @@ public Table getTableDefinitionTable( public Table getTableDefinitionTable( @NotNull final TableIdentifier tableIdentifier, @Nullable final Snapshot tableSnapshot, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { final TableDefinition definition = getTableDefinition(tableIdentifier, tableSnapshot, instructions); return TableTools.metaTable(definition); } @@ -646,7 +625,7 @@ public Table getTableDefinitionTable( private TableDefinition getTableDefinitionInternal( @NotNull final TableIdentifier tableIdentifier, @Nullable final Snapshot tableSnapshot, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { final org.apache.iceberg.Table table = catalog.loadTable(tableIdentifier); if (table == null) { throw new IllegalArgumentException("Table not found: " + tableIdentifier); @@ -655,7 +634,8 @@ private TableDefinition getTableDefinitionInternal( final Snapshot snapshot = tableSnapshot != null ? tableSnapshot : table.currentSnapshot(); final Schema schema = snapshot != null ? table.schemas().get(snapshot.schemaId()) : table.schema(); - final IcebergInstructions userInstructions = instructions == null ? IcebergInstructions.DEFAULT : instructions; + final IcebergReadInstructions userInstructions = + instructions == null ? 
IcebergReadInstructions.DEFAULT : instructions; return fromSchema(schema, table.spec(), @@ -670,10 +650,9 @@ private TableDefinition getTableDefinitionInternal( * @param instructions The instructions for customizations while reading * @return The loaded table */ - @SuppressWarnings("unused") public Table readTable( @NotNull final TableIdentifier tableIdentifier, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { return readTableInternal(tableIdentifier, null, instructions); } @@ -684,10 +663,9 @@ public Table readTable( * @param instructions The instructions for customizations while reading * @return The loaded table */ - @SuppressWarnings("unused") public Table readTable( @NotNull final String tableIdentifier, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { return readTable(TableIdentifier.parse(tableIdentifier), instructions); } @@ -714,7 +692,6 @@ private Snapshot getTableSnapshot(@NotNull TableIdentifier tableIdentifier, long * @param tableSnapshotId The snapshot id to load * @return The loaded table */ - @SuppressWarnings("unused") public Table readTable(@NotNull final TableIdentifier tableIdentifier, final long tableSnapshotId) { // Find the snapshot with the given snapshot id final Snapshot tableSnapshot = getTableSnapshot(tableIdentifier, tableSnapshotId); @@ -747,11 +724,10 @@ public Table readTable(@NotNull final String tableIdentifier, final long tableSn * @param instructions The instructions for customizations while reading * @return The loaded table */ - @SuppressWarnings("unused") public Table readTable( @NotNull final TableIdentifier tableIdentifier, final long tableSnapshotId, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { // Find the snapshot with the given snapshot id final Snapshot tableSnapshot = getSnapshot(tableIdentifier, tableSnapshotId); if (tableSnapshot == null) { @@ -768,11 +744,10 @@ public Table readTable( * @param instructions The instructions for customizations while reading * @return The loaded table */ - @SuppressWarnings("unused") public Table readTable( @NotNull final String tableIdentifier, final long tableSnapshotId, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { return readTable(TableIdentifier.parse(tableIdentifier), tableSnapshotId, instructions); } @@ -784,33 +759,34 @@ public Table readTable( * @param instructions The instructions for customizations while reading * @return The loaded table */ - @SuppressWarnings("unused") public Table readTable( @NotNull final TableIdentifier tableIdentifier, @NotNull final Snapshot tableSnapshot, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { return readTableInternal(tableIdentifier, tableSnapshot, instructions); } private Table readTableInternal( @NotNull final TableIdentifier tableIdentifier, @Nullable final Snapshot tableSnapshot, - @Nullable final IcebergInstructions instructions) { + @Nullable final IcebergReadInstructions instructions) { // Load the table from the catalog. final org.apache.iceberg.Table table = catalog.loadTable(tableIdentifier); if (table == null) { throw new IllegalArgumentException("Table not found: " + tableIdentifier); } + final Schema schema = tableSnapshot == null ? 
table.schema() : table.schemas().get(tableSnapshot.schemaId()); + // Do we want the latest or a specific snapshot? final Snapshot snapshot = tableSnapshot != null ? tableSnapshot : table.currentSnapshot(); - final Schema schema = snapshot == null ? table.schema() : table.schemas().get(snapshot.schemaId()); // Load the partitioning schema. final org.apache.iceberg.PartitionSpec partitionSpec = table.spec(); // Get default instructions if none are provided - final IcebergInstructions userInstructions = instructions == null ? IcebergInstructions.DEFAULT : instructions; + final IcebergReadInstructions userInstructions = + instructions == null ? IcebergReadInstructions.DEFAULT : instructions; // Get the user supplied table definition. final TableDefinition userTableDef = userInstructions.tableDefinition().orElse(null); @@ -839,7 +815,7 @@ private Table readTableInternal( refreshService = null; updateSourceRegistrar = null; - description = "Read static iceberg table with " + keyFinder; + description = "Read static Iceberg table with " + keyFinder; final AbstractTableLocationProvider locationProvider = new PollingTableLocationProvider<>( StandaloneTableKey.getInstance(), @@ -860,8 +836,328 @@ private Table readTableInternal( /** * Returns the underlying Iceberg {@link Catalog catalog} used by this adapter. */ - @SuppressWarnings("unused") public Catalog catalog() { return catalog; } + + /** + * Append the provided Deephaven table as a new partition to the existing Iceberg table in a single snapshot. This + * will not change the schema of the existing table. + * + * @param icebergAppend The {@link IcebergAppend} object containing the data/instructions for writing + */ + public void append(@NotNull final IcebergAppend icebergAppend) { + writeImpl(TableIdentifier.parse(icebergAppend.tableIdentifier()), + icebergAppend.dhTables(), + icebergAppend.instructions(), + false, + true); + } + + /** + * Overwrite the existing Iceberg table with the provided Deephaven tables in a single snapshot. This will overwrite + * the schema of the existing table to match the provided Deephaven table if they do not match. + *
<p>
+ * Overwriting a table while racing with other writers can lead to failure/undefined results. + * + * @param icebergOverwrite The {@link IcebergOverwrite} object containing the data/instructions for writing + */ + public void overwrite(@NotNull final IcebergOverwrite icebergOverwrite) { + writeImpl(TableIdentifier.parse(icebergOverwrite.tableIdentifier()), + icebergOverwrite.dhTables(), + icebergOverwrite.instructions(), + true, + true); + } + + /** + * Writes data from Deephaven tables to an Iceberg table without creating a new snapshot. This method returns a list + * of data files that were written. Users can use this list to create a transaction/snapshot if needed. + * + * @param icebergWriteDataFiles The {@link IcebergWriteDataFiles} object containing the data/instructions for + * writing + */ + public List writeDataFiles(@NotNull final IcebergWriteDataFiles icebergWriteDataFiles) { + return writeImpl(TableIdentifier.parse(icebergWriteDataFiles.tableIdentifier()), + icebergWriteDataFiles.dhTables(), + icebergWriteDataFiles.instructions(), + false, + false); + } + + /** + * Appends or overwrites data in an Iceberg table with the provided Deephaven tables. + * + * @param tableIdentifier The identifier for the Iceberg table to append to or overwrite + * @param dhTables The Deephaven tables to write + * @param instructions The instructions for customizations while writing, or null to use default instructions + * @param overwrite If true, the existing data in the Iceberg table will be overwritten; if false, the data will be + * appended + * @param addSnapshot If true, a new snapshot will be created in the Iceberg table with the written data + * + * @return A list of DataFile objects representing the written data files. + */ + private List writeImpl( + @NotNull final TableIdentifier tableIdentifier, + @NotNull List
<Table>
dhTables, + @NotNull final IcebergWriteInstructions instructions, + final boolean overwrite, + final boolean addSnapshot) { + if (overwrite && !addSnapshot) { + throw new IllegalArgumentException("Cannot overwrite an Iceberg table without adding a snapshot"); + } + if (dhTables.isEmpty()) { + if (!overwrite) { + // Nothing to append + return Collections.emptyList(); + } + // Overwrite with an empty table + dhTables = List.of(TableTools.emptyTable(0)); + } + + IcebergParquetWriteInstructions writeInstructions = verifyWriteInstructions(instructions); + + // Don't verify schema by default if overwriting + final boolean verifySchema = writeInstructions.verifySchema().orElse(!overwrite); + + final TableDefinition useDefinition; + if (writeInstructions.tableDefinition().isPresent()) { + useDefinition = writeInstructions.tableDefinition().get(); + } else { + // Verify that all tables have the same definition + final TableDefinition firstDefinition = dhTables.get(0).getDefinition(); + final int numTables = dhTables.size(); + for (int idx = 1; idx < numTables; idx++) { + if (!firstDefinition.equals(dhTables.get(idx).getDefinition())) { + throw new IllegalArgumentException( + "All Deephaven tables must have the same definition, else table definition should be " + + "provided when writing multiple tables with different definitions"); + } + } + useDefinition = firstDefinition; + writeInstructions = writeInstructions.withTableDefinition(useDefinition); + } + + // Try loading the table from the catalog, or create if required + final org.apache.iceberg.Table icebergTable; + final SpecAndSchema newSpecAndSchema; + final boolean newNamespaceCreated; + final boolean newTableCreated; + if (catalog.tableExists(tableIdentifier)) { + icebergTable = catalog.loadTable(tableIdentifier); + newSpecAndSchema = createSpecAndSchema(useDefinition, writeInstructions); + newNamespaceCreated = false; + newTableCreated = false; + if (verifySchema) { + try { + if (overwrite) { + verifyOverwriteCompatibility(icebergTable.schema(), newSpecAndSchema.schema()); + verifyOverwriteCompatibility(icebergTable.spec(), newSpecAndSchema.partitionSpec()); + } else { + verifyAppendCompatibility(icebergTable.schema(), useDefinition, writeInstructions); + verifyAppendCompatibility(icebergTable.spec(), useDefinition, writeInstructions); + } + } catch (final IllegalArgumentException e) { + throw new IllegalArgumentException("Schema verification failed. Please provide a compatible " + + "schema or disable verification in the Iceberg instructions. 
See the linked exception " + + "for more details.", e); + } + } + } else if (writeInstructions.createTableIfNotExist()) { + newNamespaceCreated = createNamespaceIfNotExists(tableIdentifier.namespace()); + newSpecAndSchema = createSpecAndSchema(useDefinition, writeInstructions); + icebergTable = createNewIcebergTable(tableIdentifier, newSpecAndSchema, writeInstructions); + newTableCreated = true; + } else { + throw new IllegalArgumentException( + "Table does not exist: " + tableIdentifier + ", update the instructions " + + "to create the table if it does not exist and try again."); + } + + try { + final List parquetFileInfo = + writeParquet(icebergTable, dhTables, writeInstructions); + final List appendFiles = dataFilesFromParquet(parquetFileInfo); + if (addSnapshot) { + commit(icebergTable, newSpecAndSchema, appendFiles, overwrite, verifySchema); + } + return appendFiles; + } catch (final Throwable throwable) { + if (newTableCreated) { + // Delete it to avoid leaving a partial table in the catalog + try { + catalog.dropTable(tableIdentifier, true); + } catch (final RuntimeException dropException) { + throwable.addSuppressed(dropException); + } + } + if (newNamespaceCreated) { + // Delete it to avoid leaving a partial namespace in the catalog + try { + dropNamespaceIfExists(tableIdentifier.namespace()); + } catch (final RuntimeException dropException) { + throwable.addSuppressed(dropException); + } + } + throw throwable; + } + } + + private static IcebergParquetWriteInstructions verifyWriteInstructions( + @NotNull final IcebergWriteInstructions instructions) { + // We ony support writing to Parquet files + if (!(instructions instanceof IcebergParquetWriteInstructions)) { + throw new IllegalArgumentException("Unsupported instructions of class " + instructions.getClass() + " for" + + " writing Iceberg table, expected: " + IcebergParquetWriteInstructions.class); + } + return (IcebergParquetWriteInstructions) instructions; + } + + private boolean createNamespaceIfNotExists(@NotNull final Namespace namespace) { + if (catalog instanceof SupportsNamespaces) { + final SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; + try { + nsCatalog.createNamespace(namespace); + return true; + } catch (final AlreadyExistsException | UnsupportedOperationException e) { + return false; + } + } + return false; + } + + private boolean dropNamespaceIfExists(@NotNull final Namespace namespace) { + if (catalog instanceof SupportsNamespaces) { + final SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; + try { + return nsCatalog.dropNamespace(namespace); + } catch (final NamespaceNotEmptyException e) { + return false; + } + } + return false; + } + + private org.apache.iceberg.Table createNewIcebergTable( + @NotNull final TableIdentifier tableIdentifier, + @NotNull final SpecAndSchema specAndSchema, + @NotNull final IcebergParquetWriteInstructions writeInstructions) { + return catalog.createTable(tableIdentifier, specAndSchema.schema(), specAndSchema.partitionSpec(), + Map.of(TableProperties.DEFAULT_FILE_FORMAT, DEFAULT_GENERATED_FILE_FORMAT)); + } + + private static class CompletedParquetWrite { + private final URI destination; + private final long numRows; + private final long numBytes; + + private CompletedParquetWrite(final URI destination, final long numRows, final long numBytes) { + this.destination = destination; + this.numRows = numRows; + this.numBytes = numBytes; + } + } + + @NotNull + private static List writeParquet( + @NotNull final org.apache.iceberg.Table icebergTable, + @NotNull final 
Collection<Table>
dhTables, + @NotNull final IcebergParquetWriteInstructions writeInstructions) { + // Build the parquet instructions + final List parquetFilesWritten = new ArrayList<>(dhTables.size()); + final ParquetInstructions.OnWriteCompleted onWriteCompleted = + (destination, numRows, numBytes) -> parquetFilesWritten + .add(new CompletedParquetWrite(destination, numRows, numBytes)); + final ParquetInstructions parquetInstructions = writeInstructions.toParquetInstructions( + onWriteCompleted, icebergTable.schema().idToName()); + + // Write the data to parquet files + int count = 0; + for (final Table dhTable : dhTables) { + if (dhTable.numColumns() == 0) { + // Skip writing empty tables with no columns + continue; + } + final String filename = String.format( + "00000-%d-%s.parquet", + count++, + UUID.randomUUID()); + final String newDataLocation = icebergTable.locationProvider().newDataLocation(filename); + ParquetTools.writeTable(dhTable, newDataLocation, parquetInstructions); + } + return parquetFilesWritten; + } + + /** + * Commit the changes to the Iceberg table by creating a snapshot. + */ + private static void commit( + @NotNull final org.apache.iceberg.Table icebergTable, + @NotNull final SpecAndSchema newSpecAndSchema, + @NotNull final Iterable appendFiles, + final boolean overwrite, + final boolean schemaVerified) { + final Transaction icebergTransaction = icebergTable.newTransaction(); + final Snapshot currentSnapshot = icebergTable.currentSnapshot(); + // For a null current snapshot, we are creating a new table. So we can just append instead of overwriting. + if (overwrite && currentSnapshot != null) { + // Fail if the table gets changed concurrently + final OverwriteFiles overwriteFiles = icebergTransaction.newOverwrite() + .validateFromSnapshot(currentSnapshot.snapshotId()) + .validateNoConflictingDeletes() + .validateNoConflictingData(); + + // Delete all the existing data files in the table + try (final Stream dataFiles = allDataFiles(icebergTable, currentSnapshot)) { + dataFiles.forEach(overwriteFiles::deleteFile); + } + appendFiles.forEach(overwriteFiles::addFile); + overwriteFiles.commit(); + + // Update the spec and schema of the existing table. + // If we have already verified the schema, we don't need to update it. + if (!schemaVerified) { + if (!icebergTable.schema().sameSchema(newSpecAndSchema.schema())) { + final UpdateSchema updateSchema = icebergTransaction.updateSchema().allowIncompatibleChanges(); + icebergTable.schema().columns().stream() + .map(Types.NestedField::name) + .forEach(updateSchema::deleteColumn); + newSpecAndSchema.schema().columns() + .forEach(column -> updateSchema.addColumn(column.name(), column.type())); + updateSchema.commit(); + } + if (!icebergTable.spec().compatibleWith(newSpecAndSchema.partitionSpec())) { + final UpdatePartitionSpec updateSpec = icebergTransaction.updateSpec(); + icebergTable.spec().fields().forEach(field -> updateSpec.removeField(field.name())); + newSpecAndSchema.partitionSpec().fields().forEach(field -> updateSpec.addField(field.name())); + updateSpec.commit(); + } + } + } else { + // Append the new data files to the table + final AppendFiles append = icebergTransaction.newAppend(); + appendFiles.forEach(append::appendFile); + append.commit(); + } + + // Commit the transaction, creating new snapshot for append/overwrite. + // Note that no new snapshot will be created for the schema change. + icebergTransaction.commitTransaction(); + } + + /** + * Generate a list of {@link DataFile} objects from a list of parquet files written. 
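Because writeDataFiles returns DataFile objects without committing a snapshot, the caller attaches them through the Iceberg API; a sketch of that flow, mirroring the writeDataFilesBasicTest added below (identifiers are placeholders and the target table is assumed to already exist):

    // Write parquet data files for two Deephaven tables without creating a
    // snapshot, then commit them in a single Iceberg append.
    final List<DataFile> dataFiles = catalogAdapter.writeDataFiles(IcebergWriteDataFiles.builder()
            .tableIdentifier("MyNamespace.MyTable")
            .addDhTables(source, anotherSource)
            .build());
    final org.apache.iceberg.Table icebergTable =
            catalogAdapter.catalog().loadTable(TableIdentifier.parse("MyNamespace.MyTable"));
    final AppendFiles append = icebergTable.newAppend();
    dataFiles.forEach(append::appendFile);
    append.commit();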
+ */ + private static List dataFilesFromParquet( + @NotNull final Collection parquetFilesWritten) { + // TODO This assumes no partition data is written, is that okay? + return parquetFilesWritten.stream() + .map(parquetFileWritten -> DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(parquetFileWritten.destination.toString()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(parquetFileWritten.numRows) + .withFileSizeInBytes(parquetFileWritten.numBytes) + .build()) + .collect(Collectors.toList()); + } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java deleted file mode 100644 index b595b4cfd14..00000000000 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java +++ /dev/null @@ -1,63 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.iceberg.util; - -import io.deephaven.annotations.BuildableStyle; -import io.deephaven.engine.table.TableDefinition; -import org.immutables.value.Value.Immutable; - -import java.util.Map; -import java.util.Optional; - -/** - * This class provides instructions intended for reading Iceberg catalogs and tables. The default values documented in - * this class may change in the future. As such, callers may wish to explicitly set the values. - */ -@Immutable -@BuildableStyle -public abstract class IcebergInstructions { - /** - * The default {@link IcebergInstructions} to use when reading Iceberg data files. Providing this will use system - * defaults for cloud provider-specific parameters - */ - @SuppressWarnings("unused") - public static final IcebergInstructions DEFAULT = builder().build(); - - public static Builder builder() { - return ImmutableIcebergInstructions.builder(); - } - - /** - * The {@link TableDefinition} to use when reading Iceberg data files. - */ - public abstract Optional tableDefinition(); - - /** - * The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud - * provider-specific instructions). - */ - public abstract Optional dataInstructions(); - - /** - * A {@link Map map} of rename instructions from Iceberg to Deephaven column names to use when reading the Iceberg - * data files. 
- */ - public abstract Map columnRenames(); - - public interface Builder { - @SuppressWarnings("unused") - Builder tableDefinition(TableDefinition tableDefinition); - - @SuppressWarnings("unused") - Builder dataInstructions(Object s3Instructions); - - @SuppressWarnings("unused") - Builder putColumnRenames(String key, String value); - - @SuppressWarnings("unused") - Builder putAllColumnRenames(Map entries); - - IcebergInstructions build(); - } -} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergOverwrite.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergOverwrite.java new file mode 100644 index 00000000000..4af3224dfe3 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergOverwrite.java @@ -0,0 +1,51 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.annotations.BuildableStyle; +import io.deephaven.engine.table.Table; +import org.immutables.value.Value; + +import java.util.List; + +@Value.Immutable +@BuildableStyle +public abstract class IcebergOverwrite { + /** + * The identifier string for the Iceberg table to overwrite + */ + public abstract String tableIdentifier(); + + /** + * The Deephaven tables to overwrite with. All tables should have the same definition, else a table definition + * should be provided in the {@link #instructions()}. An empty list will overwrite with an empty table. + */ + public abstract List
dhTables(); + + /** + * The instructions for customizations while writing, defaults to {@link IcebergParquetWriteInstructions#DEFAULT}. + */ + @Value.Default + public IcebergWriteInstructions instructions() { + return IcebergParquetWriteInstructions.DEFAULT; + } + + public static Builder builder() { + return ImmutableIcebergOverwrite.builder(); + } + + public interface Builder { + Builder tableIdentifier(String tableIdentifier); + + Builder addDhTables(Table element); + + Builder addDhTables(Table... elements); + + Builder addAllDhTables(Iterable elements); + + Builder instructions(IcebergWriteInstructions instructions); + + IcebergOverwrite build(); + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergParquetWriteInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergParquetWriteInstructions.java new file mode 100644 index 00000000000..c1277105043 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergParquetWriteInstructions.java @@ -0,0 +1,135 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.annotations.CopyableStyle; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.parquet.table.ParquetInstructions; +import org.immutables.value.Value.Default; +import org.immutables.value.Value.Immutable; +import org.immutables.value.Value.Check; +import org.jetbrains.annotations.NotNull; + +import java.util.Map; + +import static io.deephaven.parquet.table.ParquetInstructions.MIN_TARGET_PAGE_SIZE; + +/** + * This class provides instructions intended for writing Iceberg tables as Parquet data files. The default values + * documented in this class may change in the future. As such, callers may wish to explicitly set the values. + */ +@Immutable +@CopyableStyle +public abstract class IcebergParquetWriteInstructions extends IcebergWriteInstructions { + /** + * The default {@link IcebergParquetWriteInstructions} to use when reading/writing Iceberg tables as Parquet data + * files. + */ + @SuppressWarnings("unused") + public static final IcebergParquetWriteInstructions DEFAULT = builder().build(); + + public static Builder builder() { + return ImmutableIcebergParquetWriteInstructions.builder(); + } + + /** + * The name of the compression codec to use when writing Parquet files; defaults to + * {@value ParquetInstructions#DEFAULT_COMPRESSION_CODEC_NAME}. + */ + @Default + public String compressionCodecName() { + return ParquetInstructions.DEFAULT_COMPRESSION_CODEC_NAME; + } + + /** + * The maximum number of unique keys the parquet file writer should add to a dictionary page before switching to + * non-dictionary encoding; defaults to {@value ParquetInstructions#DEFAULT_MAXIMUM_DICTIONARY_KEYS}; never + * evaluated for non-String columns. + */ + @Default + public int maximumDictionaryKeys() { + return ParquetInstructions.DEFAULT_MAXIMUM_DICTIONARY_KEYS; + } + + /** + * The maximum number of bytes the parquet file writer should add to a dictionary before switching to non-dictionary + * encoding; defaults to {@value ParquetInstructions#DEFAULT_MAXIMUM_DICTIONARY_SIZE}; never evaluated for + * non-String columns. 
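An illustrative configuration of the parquet-specific options alongside the shared write flags; the values shown are arbitrary examples, not recommendations:

    // All knobs are optional; unset values fall back to the documented defaults.
    final IcebergParquetWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder()
            .createTableIfNotExist(true)
            .verifySchema(true)
            .compressionCodecName("LZ4")
            .maximumDictionaryKeys(1 << 20)
            .maximumDictionarySize(1 << 20)
            .targetPageSize(1 << 20)
            .build();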
+ */ + @Default + public int maximumDictionarySize() { + return ParquetInstructions.DEFAULT_MAXIMUM_DICTIONARY_SIZE; + } + + /** + * The target page size for writing the parquet files; defaults to + * {@value ParquetInstructions#DEFAULT_TARGET_PAGE_SIZE}, should be greater than or equal to + * {@value ParquetInstructions#MIN_TARGET_PAGE_SIZE}. + */ + @Default + public int targetPageSize() { + return ParquetInstructions.DEFAULT_TARGET_PAGE_SIZE; + } + + abstract IcebergParquetWriteInstructions withTableDefinition(@NotNull final TableDefinition tableDefinition); + + /** + * Convert this {@link IcebergParquetWriteInstructions} to a {@link ParquetInstructions}. + * + * @param onWriteCompleted The callback to be invoked after writing the parquet file. + * @param fieldIdToName Mapping of field id to field name, to be populated inside the parquet file's schema + */ + ParquetInstructions toParquetInstructions( + @NotNull final ParquetInstructions.OnWriteCompleted onWriteCompleted, + @NotNull final Map fieldIdToName) { + final ParquetInstructions.Builder builder = new ParquetInstructions.Builder(); + + tableDefinition().ifPresent(builder::setTableDefinition); + dataInstructions().ifPresent(builder::setSpecialInstructions); + + // Add parquet writing specific instructions. + builder.addFieldIdMapping(fieldIdToName); + builder.setCompressionCodecName(compressionCodecName()); + builder.setMaximumDictionaryKeys(maximumDictionaryKeys()); + builder.setMaximumDictionarySize(maximumDictionarySize()); + builder.setTargetPageSize(targetPageSize()); + builder.setOnWriteCompleted(onWriteCompleted); + + return builder.build(); + } + + public interface Builder extends IcebergWriteInstructions.Builder { + Builder compressionCodecName(String compressionCodecName); + + Builder maximumDictionaryKeys(int maximumDictionaryKeys); + + Builder maximumDictionarySize(int maximumDictionarySize); + + Builder targetPageSize(int targetPageSize); + + IcebergParquetWriteInstructions build(); + } + + @Check + final void boundsCheckMaxDictionaryKeys() { + if (maximumDictionaryKeys() < 0) { + throw new IllegalArgumentException("maximumDictionaryKeys(=" + maximumDictionaryKeys() + ") must be >= 0"); + } + } + + @Check + final void boundsCheckMaxDictionarySize() { + if (maximumDictionarySize() < 0) { + throw new IllegalArgumentException("maximumDictionarySize(=" + maximumDictionarySize() + ") must be >= 0"); + } + } + + @Check + final void boundsCheckTargetPageSize() { + if (targetPageSize() < MIN_TARGET_PAGE_SIZE) { + throw new IllegalArgumentException( + "targetPageSize(=" + targetPageSize() + ") must be >= " + MIN_TARGET_PAGE_SIZE); + } + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java new file mode 100644 index 00000000000..0bd1405a69d --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java @@ -0,0 +1,42 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.annotations.BuildableStyle; +import org.immutables.value.Value.Immutable; + +import java.util.Map; + +/** + * This class provides instructions intended for reading Iceberg catalogs and tables. The default values documented in + * this class may change in the future. As such, callers may wish to explicitly set the values. 
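On the read side, the renamed instructions class plugs into readTable the same way, optionally pinning an explicit definition; a sketch with a placeholder identifier and columns:

    // Read an Iceberg table through the adapter using an explicit Deephaven definition.
    final Table fromIceberg = catalogAdapter.readTable(
            "MyNamespace.MyTable",
            IcebergReadInstructions.builder()
                    .tableDefinition(TableDefinition.of(
                            ColumnDefinition.ofInt("intCol"),
                            ColumnDefinition.ofDouble("doubleCol")))
                    .build());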
+ */ +@Immutable +@BuildableStyle +public abstract class IcebergReadInstructions implements IcebergBaseInstructions { + /** + * The default {@link IcebergReadInstructions} to use when reading Iceberg data files. Providing this will use + * system defaults for cloud provider-specific parameters. + */ + public static final IcebergReadInstructions DEFAULT = builder().build(); + + public static Builder builder() { + return ImmutableIcebergReadInstructions.builder(); + } + + /** + * A {@link Map map} of rename instructions from Iceberg to Deephaven column names to use when reading the Iceberg + * data files. + */ + public abstract Map columnRenames(); + + public interface Builder extends IcebergBaseInstructions.Builder { + Builder putColumnRenames(String key, String value); + + @SuppressWarnings("unused") + Builder putAllColumnRenames(Map entries); + + IcebergReadInstructions build(); + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergWriteDataFiles.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergWriteDataFiles.java new file mode 100644 index 00000000000..494c8f2073f --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergWriteDataFiles.java @@ -0,0 +1,51 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.annotations.BuildableStyle; +import io.deephaven.engine.table.Table; +import org.immutables.value.Value; + +import java.util.List; + +@Value.Immutable +@BuildableStyle +public abstract class IcebergWriteDataFiles { + /** + * The identifier string for the Iceberg table to write to. + */ + public abstract String tableIdentifier(); + + /** + * The Deephaven tables to be written. All tables should have the same definition, else a table definition should be + * provided in the {@link #instructions()}. + */ + public abstract List
dhTables(); + + /** + * The instructions for customizations while writing, defaults to {@link IcebergParquetWriteInstructions#DEFAULT}. + */ + @Value.Default + public IcebergWriteInstructions instructions() { + return IcebergParquetWriteInstructions.DEFAULT; + } + + public static Builder builder() { + return ImmutableIcebergWriteDataFiles.builder(); + } + + public interface Builder { + Builder tableIdentifier(String tableIdentifier); + + Builder addDhTables(Table element); + + Builder addDhTables(Table... elements); + + Builder addAllDhTables(Iterable elements); + + Builder instructions(IcebergWriteInstructions instructions); + + IcebergWriteDataFiles build(); + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergWriteInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergWriteInstructions.java new file mode 100644 index 00000000000..a863030281f --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergWriteInstructions.java @@ -0,0 +1,82 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.util.annotations.InternalUseOnly; +import org.immutables.value.Value; +import org.immutables.value.Value.Default; + +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * This class provides instructions intended for writing Iceberg tables. The default values documented in this class may + * change in the future. As such, callers may wish to explicitly set the values. + */ +public abstract class IcebergWriteInstructions implements IcebergBaseInstructions { + /** + * While writing to an iceberg table, whether to create the iceberg table if it does not exist, defaults to + * {@code false}. + */ + @Default + public boolean createTableIfNotExist() { + return false; + } + + // @formatter:off + /** + * Specifies whether to verify that the partition spec and schema of the table being written are consistent with the + * Iceberg table. + * + *
<p>
+ * Verification behavior differs based on the operation type:
+ * <ul>
+ * <li>Appending Data or Writing Data Files: Verification is enabled by default. It ensures that:
+ * <ul>
+ * <li>All columns from the Deephaven table are present in the Iceberg table and have compatible types.</li>
+ * <li>All required columns in the Iceberg table are present in the Deephaven table.</li>
+ * <li>The set of partitioning columns in both the Iceberg and Deephaven tables are identical.</li>
+ * </ul></li>
+ * <li>Overwriting Data: Verification is disabled by default. When enabled, it ensures that the schema and
+ * partition spec of the table being written are identical to those of the Iceberg table.</li>
+ * </ul>
+ */ + public abstract Optional verifySchema(); + // @formatter:on + + /** + * A one-to-one {@link Map map} from Deephaven to Iceberg column names to use when writing deephaven tables to + * Iceberg tables. + */ + // TODO Please suggest better name for this method, on the read side its just called columnRenames + public abstract Map dhToIcebergColumnRenames(); + + /** + * The inverse map of {@link #dhToIcebergColumnRenames()}. + */ + @InternalUseOnly + @Value.Lazy + public Map icebergToDhColumnRenames() { + return dhToIcebergColumnRenames().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); + } + + public interface Builder extends IcebergBaseInstructions.Builder { + INSTRUCTIONS_BUILDER createTableIfNotExist(boolean createTableIfNotExist); + + INSTRUCTIONS_BUILDER verifySchema(boolean verifySchema); + + INSTRUCTIONS_BUILDER putDhToIcebergColumnRenames(String key, String value); + + INSTRUCTIONS_BUILDER putAllDhToIcebergColumnRenames(Map entries); + } + + @Value.Check + final void checkColumnRenamesUnique() { + if (dhToIcebergColumnRenames().size() != dhToIcebergColumnRenames().values().stream().distinct().count()) { + throw new IllegalArgumentException("Duplicate values in column renames, values must be unique"); + } + } +} diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/CatalogAdapterTest.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/CatalogAdapterTest.java index d08e1735db0..bc0e7da12ce 100644 --- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/CatalogAdapterTest.java +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/CatalogAdapterTest.java @@ -3,19 +3,38 @@ // package io.deephaven.iceberg; +import io.deephaven.UncheckedDeephavenException; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.select.FormulaEvaluationException; +import io.deephaven.engine.util.TableTools; +import io.deephaven.iceberg.base.IcebergUtils; import io.deephaven.iceberg.junit5.CatalogAdapterBase; +import io.deephaven.iceberg.util.IcebergAppend; +import io.deephaven.iceberg.util.IcebergOverwrite; +import io.deephaven.iceberg.util.IcebergParquetWriteInstructions; +import io.deephaven.iceberg.util.IcebergWriteDataFiles; +import io.deephaven.iceberg.util.IcebergWriteInstructions; +import io.deephaven.parquet.table.ParquetTools; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static org.assertj.core.api.Assertions.assertThat; -public class CatalogAdapterTest extends CatalogAdapterBase { +class CatalogAdapterTest extends CatalogAdapterBase { @Test void empty() { assertThat(catalogAdapter.listNamespaces()).isEmpty(); @@ -46,4 +65,576 @@ void createEmptyTable() { // Note: this is failing w/ NPE, assumes that Snapshot is non-null. 
// assertThat(table.isEmpty()).isTrue(); } + + @Test + void appendTableBasicTest() { + final Table source = TableTools.emptyTable(10) + .update("intCol = (int) 2 * i + 10", + "doubleCol = (double) 2.5 * i + 10"); + final String tableIdentifier = "MyNamespace.MyTable"; + try { + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(source) + .build()); + } catch (RuntimeException e) { + assertThat(e.getMessage()).contains("Table does not exist"); + } + final IcebergWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder() + .createTableIfNotExist(true) + .build(); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(source) + .instructions(writeInstructions) + .build()); + Table fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + assertTableEquals(source, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append")); + + // Append more data with different compression codec + final Table moreData = TableTools.emptyTable(5) + .update("intCol = (int) 3 * i + 20", + "doubleCol = (double) 3.5 * i + 20"); + final IcebergWriteInstructions writeInstructionsLZ4 = IcebergParquetWriteInstructions.builder() + .compressionCodecName("LZ4") + .build(); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(moreData) + .instructions(writeInstructionsLZ4) + .build()); + fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + final Table expected = TableTools.merge(moreData, source); + assertTableEquals(expected, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "append")); + + // Append an empty table + final Table emptyTable = TableTools.emptyTable(0) + .update("intCol = (int) 4 * i + 30", + "doubleCol = (double) 4.5 * i + 30"); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(emptyTable) + .instructions(writeInstructions) + .build()); + fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + assertTableEquals(expected, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "append", "append")); + + // Append multiple tables in a single call with different compression codec + final Table someMoreData = TableTools.emptyTable(3) + .update("intCol = (int) 5 * i + 40", + "doubleCol = (double) 5.5 * i + 40"); + final IcebergWriteInstructions writeInstructionsGZIP = IcebergParquetWriteInstructions.builder() + .compressionCodecName("GZIP") + .build(); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(someMoreData, moreData, emptyTable) + .instructions(writeInstructionsGZIP) + .build()); + fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + final Table expected2 = TableTools.merge(someMoreData, moreData, expected); + assertTableEquals(expected2, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "append", "append", "append")); + } + + @Test + void overwriteTablesBasicTest() { + final Table source = TableTools.emptyTable(10) + .update("intCol = (int) 2 * i + 10", + "doubleCol = (double) 2.5 * i + 10"); + final String tableIdentifier = "MyNamespace.MyTable"; + try { + catalogAdapter.overwrite(IcebergOverwrite.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(source) + .build()); + } catch (RuntimeException e) { + assertThat(e.getMessage()).contains("Table does not exist"); + } + final IcebergWriteInstructions writeInstructions = 
IcebergParquetWriteInstructions.builder() + .createTableIfNotExist(true) + .build(); + catalogAdapter.overwrite(IcebergOverwrite.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(source) + .instructions(writeInstructions) + .build()); + Table fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + assertTableEquals(source, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append")); + + // Overwrite with more data + final Table moreData = TableTools.emptyTable(5) + .update("intCol = (int) 3 * i + 20", + "doubleCol = (double) 3.5 * i + 20"); + catalogAdapter.overwrite(IcebergOverwrite.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(moreData) + .instructions(writeInstructions) + .build()); + fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + assertTableEquals(moreData, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "overwrite")); + + // Overwrite with an empty table + final Table emptyTable = TableTools.emptyTable(0) + .update("intCol = (int) 4 * i + 30", + "doubleCol = (double) 4.5 * i + 30"); + catalogAdapter.overwrite(IcebergOverwrite.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(emptyTable) + .instructions(writeInstructions) + .build()); + fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + assertTableEquals(emptyTable, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "overwrite", "overwrite")); + + // Overwrite with multiple tables in a single call + final Table someMoreData = TableTools.emptyTable(3) + .update("intCol = (int) 5 * i + 40", + "doubleCol = (double) 5.5 * i + 40"); + catalogAdapter.overwrite(IcebergOverwrite.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(someMoreData, moreData, emptyTable) + .instructions(writeInstructions) + .build()); + fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + final Table expected2 = TableTools.merge(someMoreData, moreData); + assertTableEquals(expected2, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "overwrite", "overwrite", "overwrite")); + } + + private void verifySnapshots(final String tableIdentifier, final List expectedOperations) { + final Iterable snapshots = + catalogAdapter.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).snapshots(); + assertThat(snapshots).hasSize(expectedOperations.size()); + final Iterator snapshotIter = snapshots.iterator(); + for (final String expectedOperation : expectedOperations) { + assertThat(snapshotIter.next().operation()).isEqualTo(expectedOperation); + } + } + + @Test + void overwriteWithDifferentDefinition() { + final Table source = TableTools.emptyTable(10) + .update("intCol = (int) 2 * i + 10", + "doubleCol = (double) 2.5 * i + 10"); + final String tableIdentifier = "MyNamespace.MyTable"; + final IcebergWriteInstructions writeInstructionsWithSchemaMatching = IcebergParquetWriteInstructions.builder() + .createTableIfNotExist(true) + .verifySchema(true) + .build(); + + { + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(source) + .instructions(writeInstructionsWithSchemaMatching) + .build()); + final Table fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + assertTableEquals(source, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append")); + } + + final Table differentSource = TableTools.emptyTable(10) + .update("intCol = (int) 2 * i + 10"); + try { + catalogAdapter.overwrite(IcebergOverwrite.builder() + .tableIdentifier(tableIdentifier) + 
.addDhTables(differentSource) + .instructions(writeInstructionsWithSchemaMatching) + .build()); + } catch (RuntimeException e) { + assertThat(e.getMessage()).contains("Schema verification failed"); + } + + // By default, schema verification should be disabled for overwriting + final IcebergWriteInstructions writeInstructionsWithoutSchemaMatching = + IcebergParquetWriteInstructions.builder().build(); + { + catalogAdapter.overwrite(IcebergOverwrite.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(differentSource) + .instructions(writeInstructionsWithoutSchemaMatching) + .build()); + final Table fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + assertTableEquals(differentSource, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "overwrite")); + } + + // Append more data to this table with the updated schema + { + final Table moreData = TableTools.emptyTable(5) + .update("intCol = (int) 3 * i + 20"); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(moreData) + .instructions(writeInstructionsWithoutSchemaMatching) + .build()); + final Table fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + final Table expected = TableTools.merge(moreData, differentSource); + assertTableEquals(expected, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "overwrite", "append")); + } + + // Overwrite with an empty list + { + catalogAdapter.overwrite(IcebergOverwrite.builder() + .tableIdentifier(tableIdentifier) + .instructions(writeInstructionsWithoutSchemaMatching) + .build()); + final Table fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + assertTableEquals(TableTools.emptyTable(0), fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "overwrite", "append", "overwrite")); + } + } + + @Test + void appendWithDifferentDefinition() { + final Table source = TableTools.emptyTable(10) + .update("intCol = (int) 2 * i + 10", + "doubleCol = (double) 2.5 * i + 10"); + final String tableIdentifier = "MyNamespace.MyTable"; + + // By default, schema verification should be enabled for appending + final IcebergWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder() + .createTableIfNotExist(true) + .build(); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(source) + .instructions(writeInstructions) + .build()); + Table fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + assertTableEquals(source, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append")); + + final Table differentSource = TableTools.emptyTable(10) + .update("shortCol = (short) 2 * i + 10"); + try { + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(differentSource) + .instructions(writeInstructions) + .build()); + } catch (RuntimeException e) { + assertThat(e.getMessage()).contains("Schema verification failed"); + } + + // Append a table with just the int column, should be compatible with the existing schema + final Table compatibleSource = TableTools.emptyTable(10) + .update("intCol = (int) 5 * i + 10"); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(compatibleSource) + .instructions(writeInstructions) + .build()); + fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + final Table expected = TableTools.merge(compatibleSource.update("doubleCol = NULL_DOUBLE"), source); + assertTableEquals(expected, 
fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "append")); + + // Append more data + final Table moreData = TableTools.emptyTable(5) + .update("intCol = (int) 3 * i + 20", + "doubleCol = (double) 3.5 * i + 20"); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(moreData) + .instructions(writeInstructions) + .build()); + fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + final Table expected2 = TableTools.merge(moreData, expected); + assertTableEquals(expected2, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "append", "append")); + + // Append an empty list + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .instructions(writeInstructions) + .build()); + fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + assertTableEquals(expected2, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "append", "append")); + } + + @Test + void appendMultipleTablesWithDefinitionTest() { + final Table source = TableTools.emptyTable(10) + .update("intCol = (int) 2 * i + 10", + "doubleCol = (double) 2.5 * i + 10"); + final IcebergWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder() + .createTableIfNotExist(true) + .verifySchema(true) + .build(); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier("MyNamespace.MyTable") + .addDhTables(source) + .instructions(writeInstructions) + .build()); + Table fromIceberg = catalogAdapter.readTable("MyNamespace.MyTable", null); + assertTableEquals(source, fromIceberg); + + final Table appendTable1 = TableTools.emptyTable(5) + .update("intCol = (int) 3 * i + 20", + "doubleCol = (double) 3.5 * i + 20", + "shortCol = (short) 3 * i + 20"); + final Table appendTable2 = TableTools.emptyTable(5) + .update("charCol = (char) 65 + i % 26", + "intCol = (int) 4 * i + 30", + "doubleCol = (double) 4.5 * i + 30"); + + try { + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier("MyNamespace.MyTable") + .addDhTables(appendTable1, appendTable2) + .instructions(writeInstructions) + .build()); + } catch (RuntimeException e) { + assertThat(e.getMessage()).contains("All Deephaven tables must have the same definition"); + } + + // Set a table definition that is compatible with all tables + final TableDefinition writeDefinition = TableDefinition.of( + ColumnDefinition.ofInt("intCol"), + ColumnDefinition.ofDouble("doubleCol")); + final IcebergWriteInstructions writeInstructionsWithDefinition = IcebergParquetWriteInstructions.builder() + .createTableIfNotExist(true) + .verifySchema(true) + .tableDefinition(writeDefinition) + .build(); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier("MyNamespace.MyTable") + .addDhTables(appendTable1, appendTable2) + .instructions(writeInstructionsWithDefinition) + .build()); + fromIceberg = catalogAdapter.readTable("MyNamespace.MyTable", null); + final Table expected = TableTools.merge( + appendTable1.dropColumns("shortCol"), + appendTable2.dropColumns("charCol"), + source); + assertTableEquals(expected, fromIceberg); + } + + @Test + void appendToCatalogTableWithAllDataTypesTest() { + final Schema schema = new Schema( + Types.NestedField.required(1, "booleanCol", Types.BooleanType.get()), + Types.NestedField.required(2, "doubleCol", Types.DoubleType.get()), + Types.NestedField.required(3, "floatCol", Types.FloatType.get()), + Types.NestedField.required(4, "intCol", Types.IntegerType.get()), + Types.NestedField.required(5, "longCol", 
Types.LongType.get()), + Types.NestedField.required(6, "stringCol", Types.StringType.get()), + Types.NestedField.required(7, "instantCol", Types.TimestampType.withZone()), + Types.NestedField.required(8, "localDateTimeCol", Types.TimestampType.withoutZone()), + Types.NestedField.required(9, "localDateCol", Types.DateType.get()), + Types.NestedField.required(10, "localTimeCol", Types.TimeType.get()), + Types.NestedField.required(11, "binaryCol", Types.BinaryType.get())); + final Namespace myNamespace = Namespace.of("MyNamespace"); + final TableIdentifier myTableId = TableIdentifier.of(myNamespace, "MyTableWithAllDataTypes"); + catalogAdapter.catalog().createTable(myTableId, schema); + + final Table source = TableTools.emptyTable(10) + .update( + "booleanCol = i % 2 == 0", + "doubleCol = (double) 2.5 * i + 10", + "floatCol = (float) (2.5 * i + 10)", + "intCol = 2 * i + 10", + "longCol = (long) (2 * i + 10)", + "stringCol = String.valueOf(2 * i + 10)", + "instantCol = java.time.Instant.now()", + "localDateTimeCol = java.time.LocalDateTime.now()", + "localDateCol = java.time.LocalDate.now()", + "localTimeCol = java.time.LocalTime.now()", + "binaryCol = new byte[] {(byte) i}"); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(myTableId.toString()) + .addDhTables(source) + .build()); + final Table fromIceberg = catalogAdapter.readTable(myTableId, null); + assertTableEquals(source, fromIceberg); + } + + @Test + void testFailureInWrite() { + // Try creating a new iceberg table with bad data + final Table badSource = TableTools.emptyTable(5) + .updateView( + "stringCol = ii % 2 == 0 ? Long.toString(ii) : null", + "intCol = (int) stringCol.charAt(0)"); + final IcebergWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder() + .createTableIfNotExist(true) + .verifySchema(true) + .build(); + final Namespace myNamespace = Namespace.of("MyNamespace"); + final TableIdentifier myTableId = TableIdentifier.of(myNamespace, "MyTable"); + final String tableIdString = myTableId.toString(); + + try { + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdString) + .addDhTables(badSource) + .instructions(writeInstructions) + .build()); + Assertions.fail("Exception expected for invalid formula in table"); + } catch (UncheckedDeephavenException e) { + assertThat(e.getCause() instanceof FormulaEvaluationException).isTrue(); + } + assertThat(catalogAdapter.listNamespaces()).isEmpty(); + + // Now create a table with good data with same schema and append a bad source to it + final Table goodSource = TableTools.emptyTable(5) + .update("stringCol = Long.toString(ii)", + "intCol = (int) i"); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdString) + .addDhTables(goodSource) + .instructions(writeInstructions) + .build()); + Table fromIceberg = catalogAdapter.readTable(tableIdString, null); + assertTableEquals(goodSource, fromIceberg); + + try { + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdString) + .addDhTables(badSource) + .instructions(writeInstructions) + .build()); + Assertions.fail("Exception expected for invalid formula in table"); + } catch (UncheckedDeephavenException e) { + assertThat(e.getCause() instanceof FormulaEvaluationException).isTrue(); + } + + // Make sure existing good data is not deleted + assertThat(catalogAdapter.listNamespaces()).contains(myNamespace); + assertThat(catalogAdapter.listTables(myNamespace)).containsExactly(myTableId); + fromIceberg = catalogAdapter.readTable(myTableId, 
null); + assertTableEquals(goodSource, fromIceberg); + } + + @Test + void testColumnRenameWhileWriting() { + final Table source = TableTools.emptyTable(10) + .update("intCol = (int) 2 * i + 10", + "doubleCol = (double) 2.5 * i + 10"); + IcebergWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder() + .createTableIfNotExist(true) + .verifySchema(true) + .build(); + + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier("MyNamespace.MyTable") + .addDhTables(source) + .instructions(writeInstructions) + .build()); + // TODO: This is failing because we don't map columns based on the column ID when reading. Uncomment when this + // is fixed. + // final Table fromIceberg = catalogAdapter.readTable("MyNamespace.MyTable", null); + // assertTableEquals(source, fromIceberg); + + final TableIdentifier tableIdentifier = TableIdentifier.of("MyNamespace", "MyTable"); + verifyDataFiles(tableIdentifier, List.of(source)); + + // TODO Verify that the column ID is set correctly after #6156 is merged + + // Now append more data to it + final Table moreData = TableTools.emptyTable(5) + .update("newIntCol = (int) 3 * i + 20", + "newDoubleCol = (double) 3.5 * i + 20"); + writeInstructions = IcebergParquetWriteInstructions.builder() + .verifySchema(true) + .putDhToIcebergColumnRenames("newIntCol", "intCol") + .putDhToIcebergColumnRenames("newDoubleCol", "doubleCol") + .build(); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier("MyNamespace.MyTable") + .addDhTables(moreData) + .instructions(writeInstructions) + .build()); + + // Verify the data files in the table. Note that we are assuming an order here. + verifyDataFiles(tableIdentifier, List.of(moreData, source)); + + // TODO Verify that the column ID is set correctly after #6156 is merged + } + + /** + * Verify that the data files in the table match the Deephaven tables in the given sequence. + */ + private void verifyDataFiles( + final TableIdentifier tableIdentifier, + final List
dhTables) { + final org.apache.iceberg.Table table = catalogAdapter.catalog().loadTable(tableIdentifier); + final List dataFileList = IcebergUtils.allDataFiles(table, table.currentSnapshot()) + .collect(Collectors.toList()); + assertThat(dataFileList).hasSize(dhTables.size()); + + // Check that each Deephaven table matches the corresponding data file in sequence + for (int i = 0; i < dhTables.size(); i++) { + final Table dhTable = dhTables.get(i); + final DataFile dataFile = dataFileList.get(i); + final String parquetFilePath = dataFile.path().toString(); + final Table fromParquet = ParquetTools.readTable(parquetFilePath); + assertTableEquals(dhTable, fromParquet); + } + } + + @Test + void writeDataFilesBasicTest() { + final Table source = TableTools.emptyTable(10) + .update("intCol = (int) 2 * i + 10", + "doubleCol = (double) 2.5 * i + 10"); + final Table anotherSource = TableTools.emptyTable(5) + .update("intCol = (int) 3 * i + 20", + "doubleCol = (double) 3.5 * i + 20"); + final String tableIdentifier = "MyNamespace.MyTable"; + try { + catalogAdapter.writeDataFiles(IcebergWriteDataFiles.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(source) + .build()); + } catch (RuntimeException e) { + assertThat(e.getMessage()).contains("Table does not exist"); + } + final IcebergWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder() + .createTableIfNotExist(true) + .build(); + final List dataFilesWritten = catalogAdapter.writeDataFiles(IcebergWriteDataFiles.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(source, anotherSource) + .instructions(writeInstructions) + .build()); + verifySnapshots(tableIdentifier, List.of()); + assertThat(dataFilesWritten).hasSize(2); + + // Append some data to the table + final Table moreData = TableTools.emptyTable(5) + .update("intCol = (int) 3 * i + 20", + "doubleCol = (double) 3.5 * i + 20"); + catalogAdapter.append(IcebergAppend.builder() + .tableIdentifier(tableIdentifier) + .addDhTables(moreData) + .instructions(writeInstructions) + .build()); + { + final Table fromIceberg = catalogAdapter.readTable(tableIdentifier, null); + assertTableEquals(moreData, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append")); + verifyDataFiles(TableIdentifier.parse(tableIdentifier), List.of(moreData)); + } + + // Now commit those data files to the table + final org.apache.iceberg.Table icebergTable = + catalogAdapter.catalog().loadTable(TableIdentifier.parse(tableIdentifier)); + final AppendFiles append = icebergTable.newAppend(); + dataFilesWritten.forEach(append::appendFile); + append.commit(); + + // Verify that the data files are now in the table + verifySnapshots(tableIdentifier, List.of("append", "append")); + verifyDataFiles(TableIdentifier.parse(tableIdentifier), List.of(source, anotherSource, moreData)); + } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java index 8cf51a65e7e..05be3271f15 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java @@ -72,6 +72,13 @@ public RowGroupWriter addRowGroup(final long size) { return rowGroupWriter; } + /** + * Get the number of bytes written to the parquet file so far. 
+ */ + public long getCount() { + return countingOutput.getCount(); + } + @Override public void close() throws IOException { serializeOffsetIndexes(); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/MappedSchema.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/MappedSchema.java index d818443bb58..3eedfa05f15 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/MappedSchema.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/MappedSchema.java @@ -26,21 +26,14 @@ static MappedSchema create( final TableDefinition definition, final RowSet rowSet, final Map> columnSourceMap, - final ParquetInstructions instructions, - final ColumnDefinition... extraColumns) { + final ParquetInstructions instructions) { final MessageTypeBuilder builder = Types.buildMessage(); for (final ColumnDefinition columnDefinition : definition.getColumns()) { - TypeInfos.TypeInfo typeInfo = + final TypeInfos.TypeInfo typeInfo = getTypeInfo(computedCache, columnDefinition, rowSet, columnSourceMap, instructions); - Type schemaType = typeInfo.createSchemaType(columnDefinition, instructions); + final Type schemaType = typeInfo.createSchemaType(columnDefinition, instructions); builder.addField(schemaType); } - - for (final ColumnDefinition extraColumn : extraColumns) { - builder.addField(getTypeInfo(computedCache, extraColumn, rowSet, columnSourceMap, instructions) - .createSchemaType(extraColumn, instructions)); - } - final MessageType schema = builder.named("root"); return new MappedSchema(definition, schema); } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java index a34d6ad2c20..fb1f49f4ab3 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java @@ -3,6 +3,7 @@ // package io.deephaven.parquet.table; +import gnu.trove.map.hash.TIntObjectHashMap; import io.deephaven.base.verify.Require; import io.deephaven.configuration.Configuration; import io.deephaven.engine.table.TableDefinition; @@ -11,16 +12,18 @@ import io.deephaven.hash.KeyedObjectKey; import io.deephaven.parquet.base.ParquetUtils; import io.deephaven.util.annotations.VisibleForTesting; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; @@ -33,7 +36,15 @@ */ public abstract class ParquetInstructions implements ColumnToCodecMappings { - private static volatile String defaultCompressionCodecName = CompressionCodecName.SNAPPY.toString(); + public static final String DEFAULT_COMPRESSION_CODEC_NAME = "SNAPPY"; + public static final int DEFAULT_MAXIMUM_DICTIONARY_KEYS = 1 << 20; + public static final int DEFAULT_MAXIMUM_DICTIONARY_SIZE = 1 << 20; + public static final int MIN_TARGET_PAGE_SIZE = 1 << 11; // 2KB + private static final int minTargetPageSize = Configuration.getInstance().getIntegerWithDefault( + "Parquet.minTargetPageSize", MIN_TARGET_PAGE_SIZE); 
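// Illustrative sketch (hypothetical method name, values, and column name): the mutable global
// defaults and their static setters removed from this file are replaced by per-write settings
// configured on the Builder. Not part of the patch; shown only to make the migration concrete.
private static ParquetInstructions exampleOfPerWriteDefaults() {
    return new ParquetInstructions.Builder()
            .setCompressionCodecName("SNAPPY") // instead of setDefaultCompressionCodecName(...)
            .setMaximumDictionaryKeys(1 << 20) // instead of setDefaultMaximumDictionaryKeys(...)
            .setMaximumDictionarySize(1 << 20) // instead of setDefaultMaximumDictionarySize(...)
            .setTargetPageSize(1 << 16) // instead of setDefaultTargetPageSize(...)
            .addFieldId("intCol", 1) // new in this change: attach an Iceberg field ID to a (hypothetical) column
            .build();
}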
+ public static final int DEFAULT_TARGET_PAGE_SIZE = 1 << 16; // 64KB + private static final int defaultTargetPageSize = Configuration.getInstance().getIntegerWithDefault( + "Parquet.defaultTargetPageSize", DEFAULT_TARGET_PAGE_SIZE); /** * Throws an exception if {@link ParquetInstructions#getTableDefinition()} is empty. @@ -41,96 +52,15 @@ public abstract class ParquetInstructions implements ColumnToCodecMappings { * @param parquetInstructions the parquet instructions * @throws IllegalArgumentException if there is not a table definition */ - public static TableDefinition ensureDefinition(ParquetInstructions parquetInstructions) { + static TableDefinition ensureDefinition(final ParquetInstructions parquetInstructions) { return parquetInstructions.getTableDefinition() .orElseThrow(() -> new IllegalArgumentException("Table definition must be provided")); } - /** - * Set the default for {@link #getCompressionCodecName()}. - * - * @deprecated Use {@link Builder#setCompressionCodecName(String)} instead. - * @param name The new default - */ - @Deprecated - public static void setDefaultCompressionCodecName(final String name) { - defaultCompressionCodecName = name; - } - - /** - * @return The default for {@link #getCompressionCodecName()} - */ - public static String getDefaultCompressionCodecName() { - return defaultCompressionCodecName; - } - - private static volatile int defaultMaximumDictionaryKeys = 1 << 20; - - /** - * Set the default for {@link #getMaximumDictionaryKeys()}. - * - * @param maximumDictionaryKeys The new default - * @see Builder#setMaximumDictionaryKeys(int) - */ - public static void setDefaultMaximumDictionaryKeys(final int maximumDictionaryKeys) { - defaultMaximumDictionaryKeys = Require.geqZero(maximumDictionaryKeys, "maximumDictionaryKeys"); - } - - /** - * @return The default for {@link #getMaximumDictionaryKeys()} - */ - public static int getDefaultMaximumDictionaryKeys() { - return defaultMaximumDictionaryKeys; - } - - private static volatile int defaultMaximumDictionarySize = 1 << 20; - - /** - * Set the default for {@link #getMaximumDictionarySize()}. - * - * @param maximumDictionarySize The new default - * @see Builder#setMaximumDictionarySize(int) - */ - public static void setDefaultMaximumDictionarySize(final int maximumDictionarySize) { - defaultMaximumDictionarySize = Require.geqZero(maximumDictionarySize, "maximumDictionarySize"); - } - - /** - * @return The default for {@link #getMaximumDictionarySize()} - */ - public static int getDefaultMaximumDictionarySize() { - return defaultMaximumDictionarySize; - } - - public static final int MIN_TARGET_PAGE_SIZE = - Configuration.getInstance().getIntegerWithDefault("Parquet.minTargetPageSize", 1 << 11); // 2KB - private static final int DEFAULT_TARGET_PAGE_SIZE = - Configuration.getInstance().getIntegerWithDefault("Parquet.defaultTargetPageSize", 1 << 16); // 64KB - private static volatile int defaultTargetPageSize = DEFAULT_TARGET_PAGE_SIZE; - private static final boolean DEFAULT_IS_REFRESHING = false; - /** - * Set the default target page size (in bytes) used to section rows of data into pages during column writing. This - * number should be no smaller than {@link #MIN_TARGET_PAGE_SIZE}. - * - * @param newDefaultSizeBytes the new default target page size. 
- */ - public static void setDefaultTargetPageSize(final int newDefaultSizeBytes) { - if (newDefaultSizeBytes < MIN_TARGET_PAGE_SIZE) { - throw new IllegalArgumentException( - "Default target page size should be larger than " + MIN_TARGET_PAGE_SIZE + " bytes"); - } - defaultTargetPageSize = newDefaultSizeBytes; - } - - /** - * Get the current default target page size in bytes. - * - * @return the current default target page size in bytes. - */ - public static int getDefaultTargetPageSize() { - return defaultTargetPageSize; + public interface OnWriteCompleted { + void onWriteCompleted(URI destination, long numRows, long numBytes); } public enum ParquetFileLayout { @@ -168,7 +98,7 @@ public enum ParquetFileLayout { static final String FILE_INDEX_TOKEN = "{i}"; private static final String DEFAULT_BASE_NAME_FOR_PARTITIONED_PARQUET_DATA = UUID_TOKEN; - public ParquetInstructions() {} + private ParquetInstructions() {} public final String getColumnNameFromParquetColumnNameOrDefault(final String parquetColumnName) { final String mapped = getColumnNameFromParquetColumnName(parquetColumnName); @@ -191,6 +121,8 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par */ public abstract boolean useDictionary(String columnName); + public abstract OptionalInt getFieldId(final String columnName); + public abstract Object getSpecialInstructions(); public abstract String getCompressionCodecName(); @@ -262,6 +194,12 @@ public abstract ParquetInstructions withTableDefinitionAndLayout(final TableDefi */ public abstract String baseNameForPartitionedParquetData(); + /** + * @return A callback to be executed when on completing each parquet data file write (excluding the index and + * metadata files). + */ + public abstract Optional onWriteCompleted(); + @VisibleForTesting public static boolean sameColumnNamesAndCodecMappings(final ParquetInstructions i1, final ParquetInstructions i2) { if (i1 == EMPTY) { @@ -305,6 +243,11 @@ public boolean useDictionary(final String columnName) { return false; } + @Override + public OptionalInt getFieldId(final String columnName) { + return OptionalInt.empty(); + } + @Override @Nullable public Object getSpecialInstructions() { @@ -313,17 +256,17 @@ public Object getSpecialInstructions() { @Override public String getCompressionCodecName() { - return defaultCompressionCodecName; + return DEFAULT_COMPRESSION_CODEC_NAME; } @Override public int getMaximumDictionaryKeys() { - return defaultMaximumDictionaryKeys; + return DEFAULT_MAXIMUM_DICTIONARY_KEYS; } @Override public int getMaximumDictionarySize() { - return defaultMaximumDictionarySize; + return DEFAULT_MAXIMUM_DICTIONARY_SIZE; } @Override @@ -383,7 +326,7 @@ public ParquetInstructions withTableDefinitionAndLayout( return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(), - useLayout, useDefinition, null); + useLayout, useDefinition, null, null); } @Override @@ -391,7 +334,12 @@ ParquetInstructions withIndexColumns(final Collection> indexColumns return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(), - null, null, indexColumns); + null, null, indexColumns, null); + } + + @Override + public Optional 
onWriteCompleted() { + return Optional.empty(); } }; @@ -401,6 +349,7 @@ private static class ColumnInstructions { private String codecName; private String codecArgs; private boolean useDictionary; + private int fieldId = Integer.MIN_VALUE; public ColumnInstructions(final String columnName) { this.columnName = columnName; @@ -444,6 +393,14 @@ public boolean useDictionary() { public void useDictionary(final boolean useDictionary) { this.useDictionary = useDictionary; } + + public OptionalInt getFieldId() { + return fieldId == Integer.MIN_VALUE ? OptionalInt.empty() : OptionalInt.of(fieldId); + } + + public void setFieldId(final int fieldId) { + this.fieldId = fieldId; + } } private static final class ReadOnly extends ParquetInstructions { @@ -466,6 +423,7 @@ private static final class ReadOnly extends ParquetInstructions { private final ParquetFileLayout fileLayout; private final TableDefinition tableDefinition; private final Collection> indexColumns; + private final OnWriteCompleted onWriteCompleted; private ReadOnly( final KeyedObjectHashMap columnNameToInstructions, @@ -481,7 +439,8 @@ private ReadOnly( final String baseNameForPartitionedParquetData, final ParquetFileLayout fileLayout, final TableDefinition tableDefinition, - final Collection> indexColumns) { + final Collection> indexColumns, + final OnWriteCompleted onWriteCompleted) { this.columnNameToInstructions = columnNameToInstructions; this.parquetColumnNameToInstructions = parquetColumnNameToColumnName; this.compressionCodecName = compressionCodecName; @@ -499,10 +458,11 @@ private ReadOnly( : indexColumns.stream() .map(List::copyOf) .collect(Collectors.toUnmodifiableList()); + this.onWriteCompleted = onWriteCompleted; } - private String getOrDefault(final String columnName, final String defaultValue, - final Function fun) { + private VALUE_TYPE getOrDefault(final String columnName, final VALUE_TYPE defaultValue, + final Function fun) { if (columnNameToInstructions == null) { return defaultValue; } @@ -557,6 +517,11 @@ public boolean useDictionary(final String columnName) { return getOrDefault(columnName, false, ColumnInstructions::useDictionary); } + @Override + public OptionalInt getFieldId(final String columnName) { + return getOrDefault(columnName, OptionalInt.empty(), ColumnInstructions::getFieldId); + } + @Override public String getCompressionCodecName() { return compressionCodecName; @@ -636,7 +601,7 @@ public ParquetInstructions withTableDefinitionAndLayout( getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(), useLayout, useDefinition, - indexColumns); + indexColumns, onWriteCompleted); } @Override @@ -645,7 +610,12 @@ ParquetInstructions withIndexColumns(final Collection> useIndexColu getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(), fileLayout, - tableDefinition, useIndexColumns); + tableDefinition, useIndexColumns, onWriteCompleted); + } + + @Override + public Optional onWriteCompleted() { + return Optional.ofNullable(onWriteCompleted); } KeyedObjectHashMap copyColumnNameToInstructions() { @@ -692,9 +662,9 @@ public static class Builder { // We only store entries in parquetColumnNameToInstructions when the parquetColumnName is // different than the columnName (ie, the 
column name mapping is not the default mapping) private KeyedObjectHashMap parquetColumnNameToInstructions; - private String compressionCodecName = defaultCompressionCodecName; - private int maximumDictionaryKeys = defaultMaximumDictionaryKeys; - private int maximumDictionarySize = defaultMaximumDictionarySize; + private String compressionCodecName = DEFAULT_COMPRESSION_CODEC_NAME; + private int maximumDictionaryKeys = DEFAULT_MAXIMUM_DICTIONARY_KEYS; + private int maximumDictionarySize = DEFAULT_MAXIMUM_DICTIONARY_SIZE; private boolean isLegacyParquet; private int targetPageSize = defaultTargetPageSize; private boolean isRefreshing = DEFAULT_IS_REFRESHING; @@ -704,6 +674,8 @@ public static class Builder { private ParquetFileLayout fileLayout; private TableDefinition tableDefinition; private Collection> indexColumns; + private OnWriteCompleted onWriteCompleted; + private TIntObjectHashMap usedFieldIdToColumn; /** * For each additional field added, make sure to update the copy constructor builder @@ -731,6 +703,7 @@ public Builder(final ParquetInstructions parquetInstructions) { fileLayout = readOnlyParquetInstructions.getFileLayout().orElse(null); tableDefinition = readOnlyParquetInstructions.getTableDefinition().orElse(null); indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null); + onWriteCompleted = readOnlyParquetInstructions.onWriteCompleted().orElse(null); } private void newColumnNameToInstructionsMap() { @@ -833,6 +806,53 @@ public Builder useDictionary(final String columnName, final boolean useDictionar return this; } + /** + * Add a mapping from field ID to column name. This field ID will be populated inside the parquet schema when + * writing the parquet file. + */ + public Builder addFieldId(final String columnName, final int fieldId) { + if (usedFieldIdToColumn == null) { + usedFieldIdToColumn = new TIntObjectHashMap<>(); + usedFieldIdToColumn.put(fieldId, columnName); + getColumnInstructions(columnName).setFieldId(fieldId); + } else { + addFieldIdHelper(fieldId, columnName); + } + return this; + } + + /** + * Populate mapping from field ID to column name using the provided map. These field IDs will be populated + * inside the parquet schema when writing the parquet file. 
+ * + * @param fieldIdToColumn A map from field ID to column name + */ + public Builder addFieldIdMapping(final Map fieldIdToColumn) { + if (usedFieldIdToColumn == null) { + usedFieldIdToColumn = new TIntObjectHashMap<>(fieldIdToColumn.size()); + for (final Map.Entry entry : fieldIdToColumn.entrySet()) { + final int fieldId = entry.getKey(); + final String column = entry.getValue(); + usedFieldIdToColumn.put(fieldId, column); + getColumnInstructions(column).setFieldId(fieldId); + } + } else { + for (final Map.Entry entry : fieldIdToColumn.entrySet()) { + addFieldIdHelper(entry.getKey(), entry.getValue()); + } + } + return this; + } + + private void addFieldIdHelper(final int fieldId, final String column) { + if (usedFieldIdToColumn.containsKey(fieldId)) { + throw new IllegalArgumentException("Field ID " + fieldId + " is already assigned to column " + + usedFieldIdToColumn.get(fieldId) + " and cannot be assigned to column " + column); + } + usedFieldIdToColumn.put(fieldId, column); + getColumnInstructions(column).setFieldId(fieldId); + } + private ColumnInstructions getColumnInstructions(final String columnName) { final ColumnInstructions ci; if (columnNameToInstructions == null) { @@ -880,8 +900,8 @@ public Builder setIsLegacyParquet(final boolean isLegacyParquet) { } public Builder setTargetPageSize(final int targetPageSize) { - if (targetPageSize < MIN_TARGET_PAGE_SIZE) { - throw new IllegalArgumentException("Target page size should be >= " + MIN_TARGET_PAGE_SIZE); + if (targetPageSize < minTargetPageSize) { + throw new IllegalArgumentException("Target page size should be >= " + minTargetPageSize); } this.targetPageSize = targetPageSize; return this; @@ -998,6 +1018,15 @@ public Builder addAllIndexColumns(final Iterable> indexColumns) { return this; } + /** + * Adds a callback to be executed when on completing each parquet data file write (excluding the index and + * metadata files). + */ + public Builder setOnWriteCompleted(final OnWriteCompleted onWriteCompleted) { + this.onWriteCompleted = onWriteCompleted; + return this; + } + public ParquetInstructions build() { final KeyedObjectHashMap columnNameToInstructionsOut = columnNameToInstructions; columnNameToInstructions = null; @@ -1007,7 +1036,7 @@ public ParquetInstructions build() { return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName, maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing, specialInstructions, generateMetadataFiles, baseNameForPartitionedParquetData, fileLayout, - tableDefinition, indexColumns); + tableDefinition, indexColumns, onWriteCompleted); } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java index ebb1d17571d..32f749fb825 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java @@ -3,6 +3,7 @@ // package io.deephaven.parquet.table; +import com.google.common.io.CountingOutputStream; import io.deephaven.api.SortColumn; import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.rowset.RowSet; @@ -108,10 +109,11 @@ static class IndexWritingInfo { * impacting both schema and written data, we store results in computedCache to avoid having to calculate * twice. 
An example is the necessary precision and scale for a BigDecimal column written as a decimal * logical type. + * @return The number of bytes written for the table (excluding indexes) * * @throws IOException For file writing related errors */ - static void write( + static long write( @NotNull final Table t, @NotNull final TableDefinition definition, @NotNull final ParquetInstructions writeInstructions, @@ -173,8 +175,8 @@ static void write( if (!sortedColumns.isEmpty()) { tableInfoBuilder.addSortingColumns(SortColumnInfo.of(sortedColumns.get(0))); } - write(t, definition, writeInstructions, dest, destOutputStream, incomingMeta, - tableInfoBuilder, metadataFileWriter, computedCache); + return write(t, definition, writeInstructions, dest, destOutputStream, incomingMeta, tableInfoBuilder, + metadataFileWriter, computedCache); } /** @@ -191,9 +193,11 @@ static void write( * @param metadataFileWriter The writer for the {@value ParquetUtils#METADATA_FILE_NAME} and * {@value ParquetUtils#COMMON_METADATA_FILE_NAME} files * @param computedCache Per column cache tags + * @return The number of bytes written + * * @throws IOException For file writing related errors */ - private static void write( + private static long write( @NotNull final Table table, @NotNull final TableDefinition definition, @NotNull final ParquetInstructions writeInstructions, @@ -207,13 +211,18 @@ private static void write( final Table t = pretransformTable(table, definition); final TrackingRowSet tableRowSet = t.getRowSet(); final Map> columnSourceMap = t.getColumnSourceMap(); - try (final ParquetFileWriter parquetFileWriter = getParquetFileWriter(computedCache, definition, - tableRowSet, columnSourceMap, dest, destOutputStream, writeInstructions, tableMeta, - tableInfoBuilder, metadataFileWriter)) { + final long numBytesWritten; + { + final ParquetFileWriter parquetFileWriter = getParquetFileWriter(computedCache, definition, + tableRowSet, columnSourceMap, dest, destOutputStream, writeInstructions, tableMeta, + tableInfoBuilder, metadataFileWriter); // Given the transformation, do not use the original table's "definition" for writing write(t, writeInstructions, parquetFileWriter, computedCache); + parquetFileWriter.close(); + numBytesWritten = parquetFileWriter.getCount(); } destOutputStream.done(); + return numBytesWritten; } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index 19dbb52aabf..70ce1217245 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -595,12 +595,15 @@ private static void writeTablesImpl( // Write the tables without any index info for (int tableIdx = 0; tableIdx < sources.length; tableIdx++) { final Table source = sources[tableIdx]; + final URI tableDestination = destinations[tableIdx]; final CompletableOutputStream outputStream = channelsProvider.getOutputStream( - destinations[tableIdx], PARQUET_OUTPUT_BUFFER_SIZE); + tableDestination, PARQUET_OUTPUT_BUFFER_SIZE); outputStreams.add(outputStream); - ParquetTableWriter.write(source, definition, writeInstructions, destinations[tableIdx], - outputStream, Collections.emptyMap(), (List) null, - metadataFileWriter, computedCache); + final long numBytes = ParquetTableWriter.write(source, definition, writeInstructions, + tableDestination, outputStream, Collections.emptyMap(), + (List) null, 
metadataFileWriter, computedCache); + writeInstructions.onWriteCompleted().ifPresent(callback -> callback.onWriteCompleted( + tableDestination, source.size(), numBytes)); } } else { // Shared parquet column names across all tables @@ -621,9 +624,12 @@ private static void writeTablesImpl( for (final ParquetTableWriter.IndexWritingInfo info : indexInfoList) { outputStreams.add(info.destOutputStream); } - final Table sourceTable = sources[tableIdx]; - ParquetTableWriter.write(sourceTable, definition, writeInstructions, destinations[tableIdx], - outputStream, Collections.emptyMap(), indexInfoList, metadataFileWriter, computedCache); + final Table source = sources[tableIdx]; + final long numBytes = ParquetTableWriter.write(source, definition, writeInstructions, + tableDestination, outputStream, Collections.emptyMap(), indexInfoList, + metadataFileWriter, computedCache); + writeInstructions.onWriteCompleted().ifPresent(callback -> callback.onWriteCompleted( + tableDestination, source.size(), numBytes)); } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java index b95dfe98412..536e22e1786 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java @@ -13,6 +13,7 @@ import io.deephaven.util.codec.SerializableCodec; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; @@ -474,10 +475,15 @@ default Type createSchemaType( builder = getBuilder(isRequired(columnDefinition), false, dataType); isRepeating = false; } + if (!isRepeating) { + instructions.getFieldId(columnDefinition.getName()).ifPresent(builder::id); return builder.named(parquetColumnName); } - return Types.buildGroup(Type.Repetition.OPTIONAL).addField( + // For repeated fields (like lists), we need to wrap the field in a group + final Types.GroupBuilder groupBuilder = Types.buildGroup(Type.Repetition.OPTIONAL); + instructions.getFieldId(columnDefinition.getName()).ifPresent(groupBuilder::id); + return groupBuilder.addField( Types.buildGroup(Type.Repetition.REPEATED).addField( builder.named("item")).named(parquetColumnName)) .as(LogicalTypeAnnotation.listType()).named(parquetColumnName); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 00ef39474a1..09edbfcf3bb 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -2623,6 +2623,54 @@ public void testReadingParquetFilesWithDifferentPageSizes() { assertTableEquals(expected, fromDisk); } + private static class CompletedParquetWrite { + private final URI destination; + private final long numRows; + private final long numBytes; + + private CompletedParquetWrite(final URI destination, final long numRows, final long numBytes) { + this.destination = destination; + this.numRows = numRows; + this.numBytes = numBytes; + } + } + + @Test + public void testOnWriteCallback() { + // Write 
a few tables to disk and check the sizes and number of rows in the files + final Table table1 = TableTools.emptyTable(100_000).update( + "someIntColumn = i * 200", + "someLongColumn = ii * 500"); + final File dest1 = new File(rootFile, "table1.parquet"); + final Table table2 = TableTools.emptyTable(2000).update( + "someIntColumn = i", + "someLongColumn = ii"); + final File dest2 = new File(rootFile, "table2.parquet"); + + final List parquetFilesWritten = new ArrayList<>(); + final ParquetInstructions.OnWriteCompleted onWriteCompleted = + (destination, numRows, numBytes) -> parquetFilesWritten + .add(new CompletedParquetWrite(destination, numRows, numBytes)); + final ParquetInstructions writeInstructions = new ParquetInstructions.Builder() + .setOnWriteCompleted(onWriteCompleted) + .build(); + ParquetTools.writeTables(new Table[] {table1, table2}, + new String[] {dest1.getPath(), dest2.getPath()}, writeInstructions); + + assertEquals(2, parquetFilesWritten.size()); + // Check the destination URIs + assertEquals(dest1.toURI(), parquetFilesWritten.get(0).destination); + assertEquals(dest2.toURI(), parquetFilesWritten.get(1).destination); + + // Check the number of rows + assertEquals(100_000, parquetFilesWritten.get(0).numRows); + assertEquals(2000, parquetFilesWritten.get(1).numRows); + + // Check the size of the files + assertEquals(dest1.length(), parquetFilesWritten.get(0).numBytes); + assertEquals(dest2.length(), parquetFilesWritten.get(1).numBytes); + } + // Following is used for testing both writing APIs for parquet tables private interface TestParquetTableWriter { void writeTable(final Table table, final File destFile); diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index 0492bc1fab1..9b488c6d51b 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -13,7 +13,7 @@ from deephaven.jcompat import j_hashmap -_JIcebergInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergInstructions") +_JIcebergReadInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergReadInstructions") _JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter") _JIcebergTools = jpy.get_type("io.deephaven.iceberg.util.IcebergTools") @@ -28,13 +28,13 @@ _JSnapshot = jpy.get_type("org.apache.iceberg.Snapshot") -class IcebergInstructions(JObjectWrapper): +class IcebergReadInstructions(JObjectWrapper): """ This class specifies the instructions for reading an Iceberg table into Deephaven. These include column rename instructions and table definitions, as well as special data instructions for loading data files from the cloud. """ - j_object_type = _JIcebergInstructions + j_object_type = _JIcebergReadInstructions def __init__(self, table_definition: Optional[TableDefinitionLike] = None, @@ -133,14 +133,14 @@ def snapshots(self, table_identifier: str) -> Table: return self.j_object.listSnapshotsAsTable(table_identifier) - def read_table(self, table_identifier: str, instructions: Optional[IcebergInstructions] = None, snapshot_id: Optional[int] = None) -> Table: + def read_table(self, table_identifier: str, instructions: Optional[IcebergReadInstructions] = None, snapshot_id: Optional[int] = None) -> Table: """ Reads the table from the catalog using the provided instructions. Optionally, a snapshot id can be provided to read a specific snapshot of the table. Args: table_identifier (str): the table to read. 
- instructions (Optional[IcebergInstructions]): the instructions for reading the table. These instructions + instructions (Optional[IcebergReadInstructions]): the instructions for reading the table. These instructions can include column renames, table definition, and specific data instructions for reading the data files from the provider. If omitted, the table will be read with default instructions. snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected. @@ -152,7 +152,7 @@ def read_table(self, table_identifier: str, instructions: Optional[IcebergInstru if instructions is not None: instructions_object = instructions.j_object else: - instructions_object = _JIcebergInstructions.DEFAULT + instructions_object = _JIcebergReadInstructions.DEFAULT if snapshot_id is not None: return Table(self.j_object.readTable(table_identifier, snapshot_id, instructions_object)) diff --git a/py/server/tests/test_iceberg.py b/py/server/tests/test_iceberg.py index 8934299b74d..13331dffa5b 100644 --- a/py/server/tests/test_iceberg.py +++ b/py/server/tests/test_iceberg.py @@ -23,14 +23,14 @@ def tearDown(self): super().tearDown() def test_instruction_create_empty(self): - iceberg_instructions = iceberg.IcebergInstructions() + iceberg_instructions = iceberg.IcebergReadInstructions() def test_instruction_create_with_s3_instructions(self): s3_instructions = s3.S3Instructions(region_name="us-east-1", access_key_id="some_access_key_id", secret_access_key="som_secret_access_key" ) - iceberg_instructions = iceberg.IcebergInstructions(data_instructions=s3_instructions) + iceberg_instructions = iceberg.IcebergReadInstructions(data_instructions=s3_instructions) def test_instruction_create_with_col_renames(self): renames = { @@ -38,7 +38,7 @@ def test_instruction_create_with_col_renames(self): "old_name_b": "new_name_b", "old_name_c": "new_name_c" } - iceberg_instructions = iceberg.IcebergInstructions(column_renames=renames) + iceberg_instructions = iceberg.IcebergReadInstructions(column_renames=renames) col_rename_dict = j_map_to_dict(iceberg_instructions.j_object.columnRenames()) self.assertTrue(col_rename_dict["old_name_a"] == "new_name_a") @@ -52,7 +52,7 @@ def test_instruction_create_with_table_definition_dict(self): "z": dtypes.double, } - iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def) + iceberg_instructions = iceberg.IcebergReadInstructions(table_definition=table_def) col_names = j_list_to_list(iceberg_instructions.j_object.tableDefinition().get().getColumnNames()) self.assertTrue(col_names[0] == "x") self.assertTrue(col_names[1] == "y") @@ -66,7 +66,7 @@ def test_instruction_create_with_table_definition_list(self): col_def("z", dtypes.double), ] - iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def) + iceberg_instructions = iceberg.IcebergReadInstructions(table_definition=table_def) col_names = j_list_to_list(iceberg_instructions.j_object.tableDefinition().get().getColumnNames()) self.assertTrue(col_names[0] == "Partition") self.assertTrue(col_names[1] == "x")
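The new ParquetInstructions.OnWriteCompleted hook exercised by testOnWriteCallback above can also be used directly. Below is a minimal sketch, assuming the classes touched in this patch (Table, TableTools, ParquetTools, ParquetInstructions); the class name, destination paths, and column formulas are hypothetical. The callback receives the destination URI, the row count, and the byte count taken from the new ParquetFileWriter#getCount(), and it fires once per data file written (not for index or metadata files).

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public class OnWriteCompletedSketch {
    public static void main(String[] args) {
        // Two small in-memory tables; the formulas are illustrative.
        final Table first = TableTools.emptyTable(1_000).update("intCol = i", "longCol = ii");
        final Table second = TableTools.emptyTable(500).update("intCol = 2 * i", "longCol = 3 * ii");

        // Record (destination, rows, bytes) for every data file as its write completes.
        final ParquetInstructions instructions = new ParquetInstructions.Builder()
                .setOnWriteCompleted((destination, numRows, numBytes) ->
                        System.out.println(destination + ": " + numRows + " rows, " + numBytes + " bytes"))
                .build();

        // Hypothetical destinations; same writeTables overload used by the new test above.
        ParquetTools.writeTables(
                new Table[] {first, second},
                new String[] {"/tmp/first.parquet", "/tmp/second.parquet"},
                instructions);
    }
}

One design note grounded in the diff: the byte count reported here is the size of the data file alone, since ParquetTableWriter now returns ParquetFileWriter#getCount() after closing the writer and before the index and metadata files are produced.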