diff --git a/engine/table/src/main/java/io/deephaven/engine/util/TableTools.java b/engine/table/src/main/java/io/deephaven/engine/util/TableTools.java index 62a873325e2..924137c6947 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/TableTools.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/TableTools.java @@ -15,6 +15,7 @@ import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.InMemoryTable; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; +import io.deephaven.engine.table.impl.sources.NullValueColumnSource; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.time.DateTimeUtils; import io.deephaven.engine.table.impl.QueryTable; @@ -731,7 +732,7 @@ public static Table newTable(long size, Map<String, ColumnSource<?>> columns) { public static Table newTable(TableDefinition definition) { Map<String, ColumnSource<?>> columns = new LinkedHashMap<>(); for (ColumnDefinition<?> columnDefinition : definition.getColumns()) { - columns.put(columnDefinition.getName(), ArrayBackedColumnSource.getMemoryColumnSource(0, + columns.put(columnDefinition.getName(), NullValueColumnSource.getInstance( columnDefinition.getDataType(), columnDefinition.getComponentType())); } return new QueryTable(definition, RowSetFactory.empty().toTracking(), columns) { diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java index da373ef3a37..44a17942cdf 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java @@ -9,36 +9,92 @@ import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.locations.TableDataException; -import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.s3.S3Instructions; import io.deephaven.iceberg.TestCatalog.IcebergTestCatalog; import io.deephaven.iceberg.TestCatalog.IcebergTestFileIO; -import io.deephaven.time.DateTimeUtils; import org.apache.iceberg.Snapshot; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.model.*; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.io.File; -import java.time.Instant; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import static 
io.deephaven.iceberg.util.IcebergCatalogAdapter.NAMESPACE_DEFINITION; +import static io.deephaven.iceberg.util.IcebergCatalogAdapter.SNAPSHOT_DEFINITION; +import static io.deephaven.iceberg.util.IcebergCatalogAdapter.TABLES_DEFINITION; + public abstract class IcebergToolsTest { + + private static final TableDefinition SALES_SINGLE_DEFINITION = TableDefinition.of( + ColumnDefinition.ofString("Region"), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofInt("Units_Sold"), + ColumnDefinition.ofDouble("Unit_Price"), + ColumnDefinition.ofTime("Order_Date")); + + private static final TableDefinition SALES_RENAMED_DEFINITION = TableDefinition.of( + ColumnDefinition.ofString("Region_Name"), + ColumnDefinition.ofString("ItemType"), + ColumnDefinition.ofInt("UnitsSold"), + ColumnDefinition.ofDouble("Unit_Price"), + ColumnDefinition.ofTime("Order_Date")); + + private static final TableDefinition SALES_MULTI_DEFINITION = SALES_SINGLE_DEFINITION; + + private static final TableDefinition SALES_PARTITIONED_DEFINITION = TableDefinition.of( + ColumnDefinition.ofString("Region"), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofInt("Units_Sold"), + ColumnDefinition.ofDouble("Unit_Price"), + ColumnDefinition.ofTime("Order_Date"), + ColumnDefinition.ofInt("year").withPartitioning(), + ColumnDefinition.ofInt("month").withPartitioning()); + + private static final TableDefinition ALL_TYPES_DEF = TableDefinition.of( + ColumnDefinition.ofBoolean("booleanField"), + ColumnDefinition.ofInt("integerField"), + ColumnDefinition.ofLong("longField"), + ColumnDefinition.ofFloat("floatField"), + ColumnDefinition.ofDouble("doubleField"), + ColumnDefinition.ofString("stringField"), + ColumnDefinition.fromGenericType("dateField", LocalDate.class), + ColumnDefinition.fromGenericType("timeField", LocalTime.class), + ColumnDefinition.fromGenericType("timestampField", LocalDateTime.class), + ColumnDefinition.fromGenericType("decimalField", BigDecimal.class), + ColumnDefinition.fromGenericType("fixedField", byte[].class), + ColumnDefinition.fromGenericType("binaryField", byte[].class), + ColumnDefinition.ofTime("instantField")); + + private static final TableDefinition META_DEF = TableDefinition.of( + ColumnDefinition.ofString("Name"), + ColumnDefinition.ofString("DataType"), + ColumnDefinition.ofString("ColumnType"), + ColumnDefinition.ofBoolean("IsPartitioning")); + IcebergInstructions instructions; public abstract S3AsyncClient s3AsyncClient(); @@ -94,6 +150,31 @@ private void uploadParquetFiles(final File root, final String prefixToRemove) } } + private void uploadSalesPartitioned() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), + warehousePath); + } + + private void uploadAllTypes() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sample/all_types").getPath()), + warehousePath); + } + + private void uploadSalesSingle() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_single").getPath()), + warehousePath); + } + + private void uploadSalesMulti() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_multi").getPath()), + warehousePath); + } + + private void 
uploadSalesRenamed() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_renamed").getPath()), + warehousePath); + } + @AfterEach public void tearDown() throws ExecutionException, InterruptedException { for (String key : keys) { @@ -118,9 +199,7 @@ public void testListNamespaces() { final Table table = adapter.listNamespacesAsTable(); Assert.eq(table.size(), "table.size()", 2, "2 namespace in the catalog"); - Assert.eqTrue(table.getColumnSource("Namespace").getType().equals(String.class), "namespace column type"); - Assert.eqTrue(table.getColumnSource("NamespaceObject").getType().equals(Namespace.class), - "namespace_object column type"); + Assert.equals(table.getDefinition(), "table.getDefinition()", NAMESPACE_DEFINITION); } @Test @@ -139,14 +218,12 @@ public void testListTables() { Table table = adapter.listTablesAsTable(ns); Assert.eq(table.size(), "table.size()", 4, "4 tables in the namespace"); - Assert.eqTrue(table.getColumnSource("Namespace").getType().equals(String.class), "namespace column type"); - Assert.eqTrue(table.getColumnSource("TableName").getType().equals(String.class), "table_name column type"); - Assert.eqTrue(table.getColumnSource("TableIdentifierObject").getType().equals(TableIdentifier.class), - "table_identifier_object column type"); + Assert.equals(table.getDefinition(), "table.getDefinition()", TABLES_DEFINITION); // Test the string versions of the methods table = adapter.listTablesAsTable("sales"); Assert.eq(table.size(), "table.size()", 4, "4 tables in the namespace"); + Assert.equals(table.getDefinition(), "table.getDefinition()", TABLES_DEFINITION); } @Test @@ -167,22 +244,17 @@ public void testListSnapshots() { Table table = adapter.listSnapshotsAsTable(tableIdentifier); Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi"); - Assert.eqTrue(table.getColumnSource("Id").getType().equals(long.class), "id column type"); - Assert.eqTrue(table.getColumnSource("Timestamp").getType().equals(Instant.class), "timestamp column type"); - Assert.eqTrue(table.getColumnSource("Operation").getType().equals(String.class), "operation column type"); - Assert.eqTrue(table.getColumnSource("Summary").getType().equals(Map.class), "summary column type"); - Assert.eqTrue(table.getColumnSource("SnapshotObject").getType().equals(Snapshot.class), - "snapshot_object column type"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SNAPSHOT_DEFINITION); // Test the string versions of the methods table = adapter.listSnapshotsAsTable("sales.sales_multi"); Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SNAPSHOT_DEFINITION); } @Test public void testOpenTableA() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), - warehousePath); + uploadSalesPartitioned(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); @@ -193,18 +265,19 @@ public void testOpenTableA() throws ExecutionException, InterruptedException, Ti // Verify we retrieved all the rows. 
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SALES_PARTITIONED_DEFINITION); // Test the string versions of the methods table = adapter.readTable("sales.sales_partitioned", instructions); // Verify we retrieved all the rows. Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SALES_PARTITIONED_DEFINITION); } @Test public void testOpenTableB() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_multi").getPath()), - warehousePath); + uploadSalesMulti(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); @@ -214,18 +287,19 @@ public void testOpenTableB() throws ExecutionException, InterruptedException, Ti io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SALES_MULTI_DEFINITION); // Test the string versions of the methods table = adapter.readTable("sales.sales_multi", instructions); // Verify we retrieved all the rows. Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SALES_MULTI_DEFINITION); } @Test public void testOpenTableC() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_single").getPath()), - warehousePath); + uploadSalesSingle(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); @@ -236,18 +310,19 @@ public void testOpenTableC() throws ExecutionException, InterruptedException, Ti // Verify we retrieved all the rows. Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SALES_SINGLE_DEFINITION); // Test the string versions of the methods table = adapter.readTable("sales.sales_single", instructions); // Verify we retrieved all the rows. Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SALES_SINGLE_DEFINITION); } @Test public void testOpenTableS3Only() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), - warehousePath); + uploadSalesPartitioned(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); @@ -257,24 +332,15 @@ public void testOpenTableS3Only() throws ExecutionException, InterruptedExceptio // Verify we retrieved all the rows. 
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SALES_PARTITIONED_DEFINITION); } @Test public void testOpenTableDefinition() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), - warehousePath); - - final TableDefinition tableDef = TableDefinition.of( - ColumnDefinition.ofInt("year").withPartitioning(), - ColumnDefinition.ofInt("month").withPartitioning(), - ColumnDefinition.ofString("Region"), - ColumnDefinition.ofString("Item_Type"), - ColumnDefinition.ofInt("Units_Sold"), - ColumnDefinition.ofDouble("Unit_Price"), - ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + uploadSalesPartitioned(); final IcebergInstructions localInstructions = IcebergInstructions.builder() - .tableDefinition(tableDef) + .tableDefinition(SALES_PARTITIONED_DEFINITION) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -287,6 +353,7 @@ public void testOpenTableDefinition() throws ExecutionException, InterruptedExce // Verify we retrieved all the rows. Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SALES_PARTITIONED_DEFINITION); } @Test @@ -298,7 +365,7 @@ public void testOpenTablePartitionTypeException() { ColumnDefinition.ofString("Item_Type"), ColumnDefinition.ofDouble("Units_Sold"), ColumnDefinition.ofLong("Unit_Price"), - ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + ColumnDefinition.ofTime("Order_Date")); final IcebergInstructions localInstructions = IcebergInstructions.builder() .tableDefinition(tableDef) @@ -310,31 +377,35 @@ public void testOpenTablePartitionTypeException() { final Namespace ns = Namespace.of("sales"); final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); - try { - final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); - TableTools.showWithRowSet(table, 100, DateTimeUtils.timeZone(), System.out); - Assert.statementNeverExecuted("Expected an exception for missing columns"); - } catch (final TableDefinition.IncompatibleTableDefinitionException e) { - Assert.eqTrue(e.getMessage().startsWith("Table definition incompatibilities"), "Exception message"); + + for (Runnable runnable : Arrays.asList( + () -> adapter.readTable(tableId, localInstructions), + () -> adapter.getTableDefinition(tableId, localInstructions), + () -> adapter.getTableDefinitionTable(tableId, localInstructions))) { + try { + runnable.run(); + Assert.statementNeverExecuted("Expected an exception for missing columns"); + } catch (final TableDefinition.IncompatibleTableDefinitionException e) { + Assert.eqTrue(e.getMessage().startsWith("Table definition incompatibilities"), "Exception message"); + } } } @Test public void testOpenTableDefinitionRename() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), - warehousePath); + uploadSalesPartitioned(); - final TableDefinition tableDef = TableDefinition.of( + final TableDefinition renamed = TableDefinition.of( ColumnDefinition.ofInt("__year").withPartitioning(), ColumnDefinition.ofInt("__month").withPartitioning(), ColumnDefinition.ofString("RegionName"), ColumnDefinition.ofString("ItemType"), 
ColumnDefinition.ofInt("UnitsSold"), ColumnDefinition.ofDouble("UnitPrice"), - ColumnDefinition.fromGenericType("OrderDate", Instant.class)); + ColumnDefinition.ofTime("OrderDate")); final IcebergInstructions localInstructions = IcebergInstructions.builder() - .tableDefinition(tableDef) + .tableDefinition(renamed) .dataInstructions(instructions.dataInstructions().get()) .putColumnRenames("Region", "RegionName") .putColumnRenames("Item_Type", "ItemType") @@ -354,12 +425,12 @@ public void testOpenTableDefinitionRename() throws ExecutionException, Interrupt // Verify we retrieved all the rows. Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", renamed); } @Test public void testSkippedPartitioningColumn() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), - warehousePath); + uploadSalesPartitioned(); final TableDefinition tableDef = TableDefinition.of( ColumnDefinition.ofInt("year").withPartitioning(), @@ -368,7 +439,7 @@ public void testSkippedPartitioningColumn() throws ExecutionException, Interrupt ColumnDefinition.ofString("Item_Type"), ColumnDefinition.ofInt("Units_Sold"), ColumnDefinition.ofDouble("Unit_Price"), - ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + ColumnDefinition.ofTime("Order_Date")); final IcebergInstructions localInstructions = IcebergInstructions.builder() .tableDefinition(tableDef) @@ -384,12 +455,12 @@ public void testSkippedPartitioningColumn() throws ExecutionException, Interrupt // Verify we retrieved all the rows. Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", tableDef); } @Test public void testReorderedPartitioningColumn() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), - warehousePath); + uploadSalesPartitioned(); final TableDefinition tableDef = TableDefinition.of( ColumnDefinition.ofInt("month").withPartitioning(), @@ -398,7 +469,7 @@ public void testReorderedPartitioningColumn() throws ExecutionException, Interru ColumnDefinition.ofString("Item_Type"), ColumnDefinition.ofInt("Units_Sold"), ColumnDefinition.ofDouble("Unit_Price"), - ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + ColumnDefinition.ofTime("Order_Date")); final IcebergInstructions localInstructions = IcebergInstructions.builder() .tableDefinition(tableDef) @@ -414,22 +485,15 @@ public void testReorderedPartitioningColumn() throws ExecutionException, Interru // Verify we retrieved all the rows. 
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", tableDef); } @Test public void testZeroPartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), - warehousePath); - - final TableDefinition tableDef = TableDefinition.of( - ColumnDefinition.ofString("Region"), - ColumnDefinition.ofString("Item_Type"), - ColumnDefinition.ofInt("Units_Sold"), - ColumnDefinition.ofDouble("Unit_Price"), - ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + uploadSalesPartitioned(); final IcebergInstructions localInstructions = IcebergInstructions.builder() - .tableDefinition(tableDef) + .tableDefinition(SALES_MULTI_DEFINITION) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -442,6 +506,7 @@ public void testZeroPartitioningColumns() throws ExecutionException, Interrupted // Verify we retrieved all the rows. Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SALES_MULTI_DEFINITION); } @Test @@ -453,7 +518,7 @@ public void testIncorrectPartitioningColumns() throws ExecutionException, Interr ColumnDefinition.ofString("Item_Type"), ColumnDefinition.ofInt("Units_Sold"), ColumnDefinition.ofDouble("Unit_Price"), - ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + ColumnDefinition.ofTime("Order_Date")); final IcebergInstructions localInstructions = IcebergInstructions.builder() .tableDefinition(tableDef) @@ -466,11 +531,17 @@ public void testIncorrectPartitioningColumns() throws ExecutionException, Interr final Namespace ns = Namespace.of("sales"); final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); - try { - final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); - Assert.statementNeverExecuted("Expected an exception for missing columns"); - } catch (final TableDataException e) { - Assert.eqTrue(e.getMessage().startsWith("The following columns are not partitioned"), "Exception message"); + for (Runnable runnable : Arrays.asList( + () -> adapter.readTable(tableId, localInstructions), + () -> adapter.getTableDefinition(tableId, localInstructions), + () -> adapter.getTableDefinitionTable(tableId, localInstructions))) { + try { + runnable.run(); + Assert.statementNeverExecuted("Expected an exception for missing columns"); + } catch (final TableDataException e) { + Assert.eqTrue(e.getMessage().startsWith("The following columns are not partitioned"), + "Exception message"); + } } } @@ -483,7 +554,7 @@ public void testMissingPartitioningColumns() { ColumnDefinition.ofString("Item_Type"), ColumnDefinition.ofDouble("Units_Sold"), ColumnDefinition.ofLong("Unit_Price"), - ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + ColumnDefinition.ofTime("Order_Date")); final IcebergInstructions localInstructions = IcebergInstructions.builder() .tableDefinition(tableDef) @@ -495,18 +566,23 @@ public void testMissingPartitioningColumns() { final Namespace ns = Namespace.of("sales"); final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); - try { - final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); - Assert.statementNeverExecuted("Expected an exception for missing columns"); - } catch (final 
TableDefinition.IncompatibleTableDefinitionException e) { - Assert.eqTrue(e.getMessage().startsWith("Table definition incompatibilities"), "Exception message"); + + for (Runnable runnable : Arrays.asList( + () -> adapter.readTable(tableId, localInstructions), + () -> adapter.getTableDefinition(tableId, localInstructions), + () -> adapter.getTableDefinitionTable(tableId, localInstructions))) { + try { + runnable.run(); + Assert.statementNeverExecuted("Expected an exception for missing columns"); + } catch (final TableDefinition.IncompatibleTableDefinitionException e) { + Assert.eqTrue(e.getMessage().startsWith("Table definition incompatibilities"), "Exception message"); + } } } @Test public void testOpenTableColumnRename() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), - warehousePath); + uploadSalesPartitioned(); final IcebergInstructions localInstructions = IcebergInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) @@ -527,8 +603,7 @@ public void testOpenTableColumnRename() throws ExecutionException, InterruptedEx @Test public void testOpenTableColumnLegalization() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_renamed").getPath()), - warehousePath); + uploadSalesRenamed(); final IcebergInstructions localInstructions = IcebergInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) @@ -543,19 +618,13 @@ public void testOpenTableColumnLegalization() throws ExecutionException, Interru // Verify we retrieved all the rows. Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); - - Assert.eqTrue(table.getDefinition().getColumn("Region_Name") != null, "'Region Name' renamed"); - Assert.eqTrue(table.getDefinition().getColumn("ItemType") != null, "'Item&Type' renamed"); - Assert.eqTrue(table.getDefinition().getColumn("UnitsSold") != null, "'Units/Sold' renamed"); - Assert.eqTrue(table.getDefinition().getColumn("Unit_Price") != null, "'Unit Pricee' renamed"); - Assert.eqTrue(table.getDefinition().getColumn("Order_Date") != null, "'Order Date' renamed"); + Assert.equals(table.getDefinition(), "table.getDefinition()", SALES_RENAMED_DEFINITION); } @Test public void testOpenTableColumnLegalizationRename() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_renamed").getPath()), - warehousePath); + uploadSalesRenamed(); final IcebergInstructions localInstructions = IcebergInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) @@ -570,19 +639,20 @@ public void testOpenTableColumnLegalizationRename() final TableIdentifier tableId = TableIdentifier.of(ns, "sales_renamed"); final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); + final TableDefinition expected = TableDefinition.of( + ColumnDefinition.ofString("Region_Name"), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofInt("Units_Sold"), + ColumnDefinition.ofDouble("Unit_Price"), + ColumnDefinition.ofTime("Order_Date")); + // Verify we retrieved all the rows. 
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); - - Assert.eqTrue(table.getDefinition().getColumn("Region_Name") != null, "'Region Name' renamed"); - Assert.eqTrue(table.getDefinition().getColumn("Item_Type") != null, "'Item&Type' renamed"); - Assert.eqTrue(table.getDefinition().getColumn("Units_Sold") != null, "'Units/Sold' renamed"); - Assert.eqTrue(table.getDefinition().getColumn("Unit_Price") != null, "'Unit Pricee' renamed"); - Assert.eqTrue(table.getDefinition().getColumn("Order_Date") != null, "'Order Date' renamed"); + Assert.equals(table.getDefinition(), "table.getDefinition()", expected); } @Test - public void testOpenTableColumnLegalizationPartitionException() - throws ExecutionException, InterruptedException, TimeoutException { + public void testOpenTableColumnLegalizationPartitionException() { final TableDefinition tableDef = TableDefinition.of( ColumnDefinition.ofInt("Year").withPartitioning(), ColumnDefinition.ofInt("Month").withPartitioning()); @@ -599,19 +669,24 @@ public void testOpenTableColumnLegalizationPartitionException() final Namespace ns = Namespace.of("sales"); final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); - try { - final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); - Assert.statementNeverExecuted("Expected an exception for missing columns"); - } catch (final TableDataException e) { - Assert.eqTrue(e.getMessage().contains("invalid column name provided"), "Exception message"); + + for (Runnable runnable : Arrays.asList( + () -> adapter.readTable(tableId, localInstructions), + () -> adapter.getTableDefinition(tableId, localInstructions), + () -> adapter.getTableDefinitionTable(tableId, localInstructions))) { + try { + runnable.run(); + Assert.statementNeverExecuted("Expected an exception for missing columns"); + } catch (final TableDataException e) { + Assert.eqTrue(e.getMessage().contains("invalid column name provided"), "Exception message"); + } } } @Test public void testOpenTableColumnRenamePartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), - warehousePath); + uploadSalesPartitioned(); final IcebergInstructions localInstructions = IcebergInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) @@ -627,14 +702,23 @@ public void testOpenTableColumnRenamePartitioningColumns() final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); + final TableDefinition expected = TableDefinition.of( + ColumnDefinition.ofString("Region"), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofInt("Units_Sold"), + ColumnDefinition.ofDouble("Unit_Price"), + ColumnDefinition.ofTime("Order_Date"), + ColumnDefinition.ofInt("__year").withPartitioning(), + ColumnDefinition.ofInt("__month").withPartitioning()); + // Verify we retrieved all the rows. 
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", expected); } @Test public void testOpenTableSnapshot() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_multi").getPath()), - warehousePath); + uploadSalesMulti(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); @@ -646,24 +730,27 @@ public void testOpenTableSnapshot() throws ExecutionException, InterruptedExcept final io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0).snapshotId(), instructions); Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table"); + Assert.equals(table0.getDefinition(), "table0.getDefinition()", SALES_MULTI_DEFINITION); final io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1).snapshotId(), instructions); Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table"); + Assert.equals(table1.getDefinition(), "table1.getDefinition()", SALES_MULTI_DEFINITION); final io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2).snapshotId(), instructions); Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table"); + Assert.equals(table2.getDefinition(), "table2.getDefinition()", SALES_MULTI_DEFINITION); final io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3).snapshotId(), instructions); Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table3.getDefinition(), "table3.getDefinition()", SALES_MULTI_DEFINITION); } @Test public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_multi").getPath()), - warehousePath); + uploadSalesMulti(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); @@ -674,45 +761,151 @@ public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedEx // Verify we retrieved all the rows. io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions); Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table"); + Assert.equals(table0.getDefinition(), "table0.getDefinition()", SALES_MULTI_DEFINITION); io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions); Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table"); + Assert.equals(table1.getDefinition(), "table1.getDefinition()", SALES_MULTI_DEFINITION); io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions); Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table"); + Assert.equals(table2.getDefinition(), "table2.getDefinition()", SALES_MULTI_DEFINITION); io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions); Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table3.getDefinition(), "table3.getDefinition()", SALES_MULTI_DEFINITION); // Test the string versions of the methods // Verify we retrieved all the rows. 
table0 = adapter.readTable("sales.sales_multi", snapshots.get(0).snapshotId(), instructions); Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table"); + Assert.equals(table0.getDefinition(), "table0.getDefinition()", SALES_MULTI_DEFINITION); table1 = adapter.readTable(tableId, snapshots.get(1).snapshotId(), instructions); Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table"); + Assert.equals(table1.getDefinition(), "table1.getDefinition()", SALES_MULTI_DEFINITION); table2 = adapter.readTable(tableId, snapshots.get(2).snapshotId(), instructions); Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table"); + Assert.equals(table2.getDefinition(), "table2.getDefinition()", SALES_MULTI_DEFINITION); table3 = adapter.readTable(tableId, snapshots.get(3).snapshotId(), instructions); Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table"); + Assert.equals(table3.getDefinition(), "table3.getDefinition()", SALES_MULTI_DEFINITION); } @Test public void testOpenAllTypesTable() throws ExecutionException, InterruptedException, TimeoutException { - uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sample/all_types").getPath()), - warehousePath); + uploadAllTypes(); final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); final Namespace ns = Namespace.of("sample"); final TableIdentifier tableId = TableIdentifier.of(ns, "all_types"); - final List<Snapshot> snapshots = adapter.listSnapshots(tableId); // Verify we retrieved all the rows. final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); Assert.eq(table.size(), "table.size()", 10, "10 rows in the table"); + Assert.equals(table.getDefinition(), "table.getDefinition()", ALL_TYPES_DEF); + } + + @Test + public void testTableDefinition() { + final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi"); + final List<Snapshot> snapshots = adapter.listSnapshots(tableId); + + // Use string and current snapshot + TableDefinition tableDef = adapter.getTableDefinition("sales.sales_multi", null); + Assert.equals(tableDef, "tableDef", SALES_MULTI_DEFINITION); + + // Use TableIdentifier and current snapshot + tableDef = adapter.getTableDefinition(tableId, null); + Assert.equals(tableDef, "tableDef", SALES_MULTI_DEFINITION); + + // Use string and long snapshot ID + tableDef = adapter.getTableDefinition("sales.sales_multi", snapshots.get(0).snapshotId(), null); + Assert.equals(tableDef, "tableDef", SALES_MULTI_DEFINITION); + + // Use TableIdentifier and Snapshot + tableDef = adapter.getTableDefinition(tableId, snapshots.get(0), null); + Assert.equals(tableDef, "tableDef", SALES_MULTI_DEFINITION); + } + + @Test + public void testTableDefinitionTable() { + final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi"); + final List<Snapshot> snapshots = adapter.listSnapshots(tableId); + + // Use string and current snapshot + Table tableDefTable = adapter.getTableDefinitionTable("sales.sales_multi", null); + + Assert.eq(tableDefTable.size(), "tableDefTable.size()", 5, "5 rows in the table"); + Assert.equals(tableDefTable.getDefinition(), "tableDefTable.getDefinition()", META_DEF); + + // Use TableIdentifier and current snapshot + 
tableDefTable = adapter.getTableDefinitionTable(tableId, null); + + Assert.eq(tableDefTable.size(), "tableDefTable.size()", 5, "5 rows in the table"); + Assert.equals(tableDefTable.getDefinition(), "tableDefTable.getDefinition()", META_DEF); + + // Use string and long snapshot ID + tableDefTable = adapter.getTableDefinitionTable("sales.sales_multi", snapshots.get(0).snapshotId(), null); + + Assert.eq(tableDefTable.size(), "tableDefTable.size()", 5, "5 rows in the table"); + Assert.equals(tableDefTable.getDefinition(), "tableDefTable.getDefinition()", META_DEF); + + // Use TableIdentifier and Snapshot + tableDefTable = adapter.getTableDefinitionTable(tableId, snapshots.get(0), null); + + Assert.eq(tableDefTable.size(), "tableDefTable.size()", 5, "5 rows in the table"); + Assert.equals(tableDefTable.getDefinition(), "tableDefTable.getDefinition()", META_DEF); + } + + @Test + public void testTableDefinitionWithInstructions() { + final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + IcebergInstructions localInstructions = IcebergInstructions.builder() + .dataInstructions(instructions.dataInstructions().get()) + .putColumnRenames("Region", "Area") + .putColumnRenames("Item_Type", "ItemType") + .putColumnRenames("Units_Sold", "UnitsSold") + .putColumnRenames("Unit_Price", "UnitPrice") + .putColumnRenames("Order_Date", "OrderDate") + .build(); + + final TableDefinition renamed = TableDefinition.of( + ColumnDefinition.ofString("Area"), + ColumnDefinition.ofString("ItemType"), + ColumnDefinition.ofInt("UnitsSold"), + ColumnDefinition.ofDouble("UnitPrice"), + ColumnDefinition.ofTime("OrderDate")); + + // Use string and current snapshot + TableDefinition tableDef = adapter.getTableDefinition("sales.sales_multi", localInstructions); + Assert.equals(tableDef, "tableDef", renamed); + + ///////////////////////////////////////////////////// + + final TableDefinition userTableDef = TableDefinition.of( + ColumnDefinition.ofString("Region"), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofTime("Order_Date")); + + localInstructions = IcebergInstructions.builder() + .dataInstructions(instructions.dataInstructions().get()) + .tableDefinition(userTableDef) + .build(); + + // Use string and current snapshot + tableDef = adapter.getTableDefinition("sales.sales_multi", localInstructions); + Assert.equals(tableDef, "tableDef", userTableDef); } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java index c3ad3cd4970..486bcf18655 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java @@ -17,11 +17,13 @@ import io.deephaven.engine.table.impl.sources.InMemoryColumnSource; import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl; import io.deephaven.engine.updategraph.UpdateSourceRegistrar; +import io.deephaven.engine.util.TableTools; import io.deephaven.iceberg.layout.IcebergFlatLayout; import io.deephaven.iceberg.layout.IcebergKeyValuePartitionedLayout; import io.deephaven.iceberg.location.IcebergTableLocationFactory; import io.deephaven.iceberg.location.IcebergTableLocationKey; import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.annotations.VisibleForTesting; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; 
import org.apache.iceberg.Schema; @@ -42,6 +44,26 @@ import java.util.stream.Collectors; public class IcebergCatalogAdapter { + + @VisibleForTesting + static final TableDefinition NAMESPACE_DEFINITION = TableDefinition.of( + ColumnDefinition.ofString("Namespace"), + ColumnDefinition.fromGenericType("NamespaceObject", Namespace.class)); + + @VisibleForTesting + static final TableDefinition TABLES_DEFINITION = TableDefinition.of( + ColumnDefinition.ofString("Namespace"), + ColumnDefinition.ofString("TableName"), + ColumnDefinition.fromGenericType("TableIdentifierObject", TableIdentifier.class)); + + @VisibleForTesting + static final TableDefinition SNAPSHOT_DEFINITION = TableDefinition.of( + ColumnDefinition.ofLong("Id"), + ColumnDefinition.ofTime("Timestamp"), + ColumnDefinition.ofString("Operation"), + ColumnDefinition.fromGenericType("Summary", Map.class), + ColumnDefinition.fromGenericType("SnapshotObject", Snapshot.class)); + private final Catalog catalog; private final FileIO fileIO; @@ -61,18 +83,18 @@ public class IcebergCatalogAdapter { * * @param schema The schema of the table. * @param partitionSpec The partition specification of the table. - * @param tableDefinition The table definition. + * @param userTableDef The table definition. * @param columnRename The map for renaming columns. * @return The generated TableDefinition. */ private static TableDefinition fromSchema( @NotNull final Schema schema, @NotNull final PartitionSpec partitionSpec, - @Nullable final TableDefinition tableDefinition, + @Nullable final TableDefinition userTableDef, @NotNull final Map<String, String> columnRename) { - final Set<String> columnNames = tableDefinition != null - ? tableDefinition.getColumnNameSet() + final Set<String> columnNames = userTableDef != null + ? userTableDef.getColumnNameSet() : null; final Set<String> partitionNames = @@ -100,7 +122,31 @@ private static TableDefinition fromSchema( columns.add(column); } - return TableDefinition.of(columns); + final TableDefinition icebergTableDef = TableDefinition.of(columns); + if (userTableDef == null) { + return icebergTableDef; + } + + // If the user supplied a table definition, make sure it's fully compatible. + final TableDefinition tableDef = icebergTableDef.checkCompatibility(userTableDef); + + // Ensure that the user has not marked non-partitioned columns as partitioned. + final Set<String> userPartitionColumns = userTableDef.getPartitioningColumns().stream() + .map(ColumnDefinition::getName) + .collect(Collectors.toSet()); + final Set<String> partitionColumns = tableDef.getPartitioningColumns().stream() + .map(ColumnDefinition::getName) + .collect(Collectors.toSet()); + + // The working partitioning column set must be a super-set of the user-supplied set. 
+ if (!partitionColumns.containsAll(userPartitionColumns)) { + final Set<String> invalidColumns = new HashSet<>(userPartitionColumns); + invalidColumns.removeAll(partitionColumns); + + throw new TableDataException("The following columns are not partitioned in the Iceberg table: " + + invalidColumns); + } + return tableDef; } /** @@ -214,7 +260,7 @@ public Table listNamespacesAsTable(@NotNull final Namespace namespace) { } // Create and return the table - return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap); + return new QueryTable(NAMESPACE_DEFINITION, RowSetFactory.flat(size).toTracking(), columnSourceMap); } /** @@ -273,7 +319,7 @@ public Table listTablesAsTable(@NotNull final Namespace namespace) { } // Create and return the table - return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap); + return new QueryTable(TABLES_DEFINITION, RowSetFactory.flat(size).toTracking(), columnSourceMap); } public Table listTablesAsTable(@NotNull final String... namespace) { @@ -338,7 +384,7 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier } // Create and return the table - return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap); + return new QueryTable(SNAPSHOT_DEFINITION, RowSetFactory.flat(size).toTracking(), columnSourceMap); } /** @@ -352,6 +398,228 @@ public Table listSnapshotsAsTable(@NotNull final String tableIdentifier) { return listSnapshotsAsTable(TableIdentifier.parse(tableIdentifier)); } + /** + * Get a specific {@link Snapshot snapshot} of a given Iceberg table (or null if it does not exist). + * + * @param tableIdentifier The identifier of the table from which to gather snapshots + * @param snapshotId The id of the snapshot to retrieve + * @return The snapshot with the given id, or null if it does not exist + */ + private Snapshot getSnapshot(@NotNull final TableIdentifier tableIdentifier, final long snapshotId) { + return listSnapshots(tableIdentifier).stream() + .filter(snapshot -> snapshot.snapshotId() == snapshotId) + .findFirst() + .orElse(null); + } + + /** + * Get a legalized column rename map from a table schema and user instructions. + */ + private Map<String, String> getRenameColumnMap( + @NotNull final org.apache.iceberg.Table table, + @NotNull final Schema schema, + @NotNull final IcebergInstructions instructions) { + + final Set<String> takenNames = new HashSet<>(); + + // Map all the column names in the schema to their legalized names. + final Map<String, String> legalizedColumnRenames = new HashMap<>(); + + // Validate user-supplied names meet legalization requirements + for (final Map.Entry<String, String> entry : instructions.columnRenames().entrySet()) { + final String destinationName = entry.getValue(); + if (!NameValidator.isValidColumnName(destinationName)) { + throw new TableDataException( + String.format("%s - invalid column name provided (%s)", table, destinationName)); + } + // Add these renames to the legalized list. + legalizedColumnRenames.put(entry.getKey(), destinationName); + takenNames.add(destinationName); + } + + for (final Types.NestedField field : schema.columns()) { + final String name = field.name(); + // Do we already have a valid rename for this column from the user or a partitioned column? 
+ if (!legalizedColumnRenames.containsKey(name)) { + final String legalizedName = + NameValidator.legalizeColumnName(name, s -> s.replace(" ", "_"), takenNames); + if (!legalizedName.equals(name)) { + legalizedColumnRenames.put(name, legalizedName); + takenNames.add(legalizedName); + } + } + } + + return legalizedColumnRenames; + } + + /** + * Return {@link TableDefinition table definition} for a given Iceberg table, with optional instructions for + * customizations while reading. + * + * @param tableIdentifier The identifier of the table to load + * @param instructions The instructions for customizations while reading + * @return The table definition + */ + public TableDefinition getTableDefinition( + @NotNull final String tableIdentifier, + @Nullable final IcebergInstructions instructions) { + final TableIdentifier tableId = TableIdentifier.parse(tableIdentifier); + // Load the table from the catalog. + return getTableDefinition(tableId, instructions); + } + + /** + * Return {@link TableDefinition table definition} for a given Iceberg table, with optional instructions for + * customizations while reading. + * + * @param tableIdentifier The identifier of the table to load + * @param instructions The instructions for customizations while reading + * @return The table definition + */ + public TableDefinition getTableDefinition( + @NotNull final TableIdentifier tableIdentifier, + @Nullable final IcebergInstructions instructions) { + // Load the table from the catalog. + return getTableDefinitionInternal(tableIdentifier, null, instructions); + } + + /** + * Return {@link TableDefinition table definition} for a given Iceberg table and snapshot id, with optional + * instructions for customizations while reading. + * + * @param tableIdentifier The identifier of the table to load + * @param snapshotId The identifier of the snapshot to load + * @param instructions The instructions for customizations while reading + * @return The table definition + */ + public TableDefinition getTableDefinition( + @NotNull final String tableIdentifier, + final long snapshotId, + @Nullable final IcebergInstructions instructions) { + final TableIdentifier tableId = TableIdentifier.parse(tableIdentifier); + + // Find the snapshot with the given snapshot id + final Snapshot tableSnapshot = getSnapshot(tableId, snapshotId); + if (tableSnapshot == null) { + throw new IllegalArgumentException("Snapshot with id " + snapshotId + " not found"); + } + + // Load the table from the catalog. + return getTableDefinition(tableId, tableSnapshot, instructions); + } + + /** + * Return {@link TableDefinition table definition} for a given Iceberg table and snapshot id, with optional + * instructions for customizations while reading. + * + * @param tableIdentifier The identifier of the table to load + * @param tableSnapshot The snapshot to load + * @param instructions The instructions for customizations while reading + * @return The table definition + */ + public TableDefinition getTableDefinition( + @NotNull final TableIdentifier tableIdentifier, + @Nullable final Snapshot tableSnapshot, + @Nullable final IcebergInstructions instructions) { + // Load the table from the catalog. + return getTableDefinitionInternal(tableIdentifier, tableSnapshot, instructions); + } + + /** + * Return {@link Table table} containing the {@link TableDefinition definition} of a given Iceberg table, with + * optional instructions for customizations while reading. 
+ * + * @param tableIdentifier The identifier of the table to load + * @param instructions The instructions for customizations while reading + * @return The table definition as a Deephaven table + */ + public Table getTableDefinitionTable( + @NotNull final String tableIdentifier, + @Nullable final IcebergInstructions instructions) { + final TableIdentifier tableId = TableIdentifier.parse(tableIdentifier); + return getTableDefinitionTable(tableId, instructions); + } + + /** + * Return {@link Table table} containing the {@link TableDefinition definition} of a given Iceberg table, with + * optional instructions for customizations while reading. + * + * @param tableIdentifier The identifier of the table to load + * @param instructions The instructions for customizations while reading + * @return The table definition as a Deephaven table + */ + public Table getTableDefinitionTable( + @NotNull final TableIdentifier tableIdentifier, + @Nullable final IcebergInstructions instructions) { + final TableDefinition definition = getTableDefinition(tableIdentifier, instructions); + return TableTools.metaTable(definition); + } + + /** + * Return {@link Table table} containing the {@link TableDefinition definition} of a given Iceberg table and + * snapshot id, with optional instructions for customizations while reading. + * + * @param tableIdentifier The identifier of the table to load + * @param snapshotId The identifier of the snapshot to load + * @param instructions The instructions for customizations while reading + * @return The table definition as a Deephaven table + */ + public Table getTableDefinitionTable( + @NotNull final String tableIdentifier, + final long snapshotId, + @Nullable final IcebergInstructions instructions) { + final TableIdentifier tableId = TableIdentifier.parse(tableIdentifier); + + // Find the snapshot with the given snapshot id + final Snapshot tableSnapshot = getSnapshot(tableId, snapshotId); + if (tableSnapshot == null) { + throw new IllegalArgumentException("Snapshot with id " + snapshotId + " not found"); + } + + return getTableDefinitionTable(tableId, tableSnapshot, instructions); + } + + /** + * Return {@link Table table} containing the {@link TableDefinition definition} of a given Iceberg table and + * snapshot id, with optional instructions for customizations while reading. + * + * @param tableIdentifier The identifier of the table to load + * @param tableSnapshot The snapshot to load + * @param instructions The instructions for customizations while reading + * @return The table definition as a Deephaven table + */ + public Table getTableDefinitionTable( + @NotNull final TableIdentifier tableIdentifier, + @Nullable final Snapshot tableSnapshot, + @Nullable final IcebergInstructions instructions) { + final TableDefinition definition = getTableDefinition(tableIdentifier, tableSnapshot, instructions); + return TableTools.metaTable(definition); + } + + /** + * Internal method to create a {@link TableDefinition} from the table schema, snapshot and user instructions. + */ + private TableDefinition getTableDefinitionInternal( + @NotNull final TableIdentifier tableIdentifier, + @Nullable final Snapshot tableSnapshot, + @Nullable final IcebergInstructions instructions) { + final org.apache.iceberg.Table table = catalog.loadTable(tableIdentifier); + if (table == null) { + throw new IllegalArgumentException("Table not found: " + tableIdentifier); + } + + final Snapshot snapshot = tableSnapshot != null ? 
tableSnapshot : table.currentSnapshot(); + final Schema schema = snapshot != null ? table.schemas().get(snapshot.schemaId()) : table.schema(); + + final IcebergInstructions userInstructions = instructions == null ? IcebergInstructions.DEFAULT : instructions; + + return fromSchema(schema, + table.spec(), + userInstructions.tableDefinition().orElse(null), + getRenameColumnMap(table, schema, userInstructions)); + } + /** * Read the latest static snapshot of an Iceberg table from the Iceberg catalog. * @@ -393,13 +661,11 @@ public Table readTable( @NotNull final TableIdentifier tableIdentifier, final long tableSnapshotId, @Nullable final IcebergInstructions instructions) { - // Find the snapshot with the given snapshot id - final Snapshot tableSnapshot = listSnapshots(tableIdentifier).stream() - .filter(snapshot -> snapshot.snapshotId() == tableSnapshotId) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("Snapshot with id " + tableSnapshotId + " not found")); - + final Snapshot tableSnapshot = getSnapshot(tableIdentifier, tableSnapshotId); + if (tableSnapshot == null) { + throw new IllegalArgumentException("Snapshot with id " + tableSnapshotId + " not found"); + } return readTableInternal(tableIdentifier, tableSnapshot, instructions); } @@ -439,9 +705,11 @@ private Table readTableInternal( @NotNull final TableIdentifier tableIdentifier, @Nullable final Snapshot tableSnapshot, @Nullable final IcebergInstructions instructions) { - // Load the table from the catalog. final org.apache.iceberg.Table table = catalog.loadTable(tableIdentifier); + if (table == null) { + throw new IllegalArgumentException("Table not found: " + tableIdentifier); + } // Do we want the latest or a specific snapshot? final Snapshot snapshot = tableSnapshot != null ? tableSnapshot : table.currentSnapshot(); @@ -456,66 +724,12 @@ private Table readTableInternal( // Get the user supplied table definition. final TableDefinition userTableDef = userInstructions.tableDefinition().orElse(null); - final Set<String> takenNames = new HashSet<>(); - // Map all the column names in the schema to their legalized names. - final Map<String, String> legalizedColumnRenames = new HashMap<>(); - - // Validate user-supplied names meet legalization requirements - for (final Map.Entry<String, String> entry : userInstructions.columnRenames().entrySet()) { - final String destinationName = entry.getValue(); - if (!NameValidator.isValidColumnName(destinationName)) { - throw new TableDataException( - String.format("%s:%d - invalid column name provided (%s)", table, snapshot.snapshotId(), - destinationName)); - } - // Add these renames to the legalized list. - legalizedColumnRenames.put(entry.getKey(), destinationName); - takenNames.add(destinationName); - } - - for (final Types.NestedField field : schema.columns()) { - final String name = field.name(); - // Do we already have a valid rename for this column from the user or a partitioned column? - if (!legalizedColumnRenames.containsKey(name)) { - final String legalizedName = - NameValidator.legalizeColumnName(name, s -> s.replace(" ", "_"), takenNames); - if (!legalizedName.equals(name)) { - legalizedColumnRenames.put(name, legalizedName); - takenNames.add(legalizedName); - } - } - } + final Map<String, String> legalizedColumnRenames = getRenameColumnMap(table, schema, userInstructions); // Get the table definition from the schema (potentially limited by the user supplied table definition and // applying column renames). 
- final TableDefinition icebergTableDef = fromSchema(schema, partitionSpec, userTableDef, legalizedColumnRenames); - - // If the user supplied a table definition, make sure it's fully compatible. - final TableDefinition tableDef; - if (userTableDef != null) { - tableDef = icebergTableDef.checkCompatibility(userTableDef); - - // Ensure that the user has not marked non-partitioned columns as partitioned. - final Set<String> userPartitionColumns = userTableDef.getPartitioningColumns().stream() - .map(ColumnDefinition::getName) - .collect(Collectors.toSet()); - final Set<String> partitionColumns = tableDef.getPartitioningColumns().stream() - .map(ColumnDefinition::getName) - .collect(Collectors.toSet()); - - // The working partitioning column set must be a super-set of the user-supplied set. - if (!partitionColumns.containsAll(userPartitionColumns)) { - final Set<String> invalidColumns = new HashSet<>(userPartitionColumns); - invalidColumns.removeAll(partitionColumns); - - throw new TableDataException("The following columns are not partitioned in the Iceberg table: " + - invalidColumns); - } - } else { - // Use the snapshot schema as the table definition. - tableDef = icebergTableDef; - } + final TableDefinition tableDef = fromSchema(schema, partitionSpec, userTableDef, legalizedColumnRenames); final String description; final TableLocationKeyFinder<IcebergTableLocationKey> keyFinder;
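
Below is an illustrative usage sketch of the APIs this diff introduces; it is not part of the change itself. IcebergTools.createAdapter, getTableDefinition, getTableDefinitionTable, and TableTools.newTable(TableDefinition) are taken from the diff above, while the createCatalog() and createFileIO() helpers are hypothetical stand-ins for whatever Catalog and FileIO wiring a deployment provides (the tests above use IcebergTestCatalog and IcebergTestFileIO).

// Minimal sketch of the new table-definition accessors; createCatalog()/createFileIO() are hypothetical.
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.util.TableTools;
import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import io.deephaven.iceberg.util.IcebergTools;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.io.FileIO;

public class TableDefinitionSketch {
    public static void main(final String[] args) {
        final Catalog catalog = createCatalog(); // hypothetical: a configured Iceberg Catalog
        final FileIO fileIO = createFileIO(); // hypothetical: a FileIO matching the catalog
        final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(catalog, fileIO);

        // Resolve the Deephaven definition of an Iceberg table without reading any data.
        // Passing null instructions falls back to IcebergInstructions.DEFAULT.
        final TableDefinition definition = adapter.getTableDefinition("sales.sales_multi", null);

        // The same definition rendered as a Deephaven table via TableTools.metaTable:
        // one row per column, with Name, DataType, ColumnType, and IsPartitioning.
        final Table definitionTable = adapter.getTableDefinitionTable("sales.sales_multi", null);
        TableTools.show(definitionTable);

        // With the TableTools change above, an empty table built from a definition is
        // backed by NullValueColumnSource instances rather than allocated array-backed sources.
        final Table empty = TableTools.newTable(definition);
        TableTools.show(empty);
    }

    private static Catalog createCatalog() {
        // Hypothetical helper; return a real org.apache.iceberg.catalog.Catalog here.
        throw new UnsupportedOperationException("supply a Catalog");
    }

    private static FileIO createFileIO() {
        // Hypothetical helper; return a real org.apache.iceberg.io.FileIO here.
        throw new UnsupportedOperationException("supply a FileIO");
    }
}

Centralizing the schema-to-definition logic in getTableDefinitionInternal is what lets readTable and the new definition accessors share the same compatibility, partitioning, and rename checks, which is why the tests exercise all three entry points against the same expected exceptions.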