From a832bea8c5513c867cd50dc7cf98a272840b9ec3 Mon Sep 17 00:00:00 2001
From: Shivam Malhotra
Date: Wed, 12 Feb 2025 20:07:53 +0100
Subject: [PATCH] Added unit tests

Adds unit tests that create and read Iceberg tables whose warehouse
location uses the s3a or s3n scheme. To support this, the
locationUri/dataFileUri helpers move from IcebergBaseLayout to
IcebergUtils and become public, and writerOptionsBuilder() in
SqliteCatalogBase becomes protected so subclasses can use it.
---
 .../util/S3WarehouseSqliteCatalogBase.java    | 120 +++++++++++++++++-
 .../deephaven/iceberg/base/IcebergUtils.java  |  16 +++
 .../iceberg/layout/IcebergBaseLayout.java     |  17 +--
 .../iceberg/junit5/SqliteCatalogBase.java     |   5 +-
 4 files changed, 138 insertions(+), 20 deletions(-)

diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/S3WarehouseSqliteCatalogBase.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/S3WarehouseSqliteCatalogBase.java
index 821a52a0bce..94408eb0b12 100644
--- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/S3WarehouseSqliteCatalogBase.java
+++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/S3WarehouseSqliteCatalogBase.java
@@ -3,22 +3,40 @@
 //
 package io.deephaven.iceberg.util;
 
+import io.deephaven.engine.table.Table;
+import io.deephaven.engine.util.TableTools;
 import io.deephaven.extensions.s3.S3Instructions;
+import io.deephaven.iceberg.base.IcebergUtils;
 import io.deephaven.iceberg.junit5.SqliteCatalogBase;
+import io.deephaven.iceberg.sqlite.SqliteHelper;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.aws.s3.S3FileIO;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.Test;
+import org.apache.iceberg.catalog.TableIdentifier;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
 
 import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
+import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
+import static io.deephaven.engine.util.TableTools.doubleCol;
+import static io.deephaven.engine.util.TableTools.intCol;
 import static io.deephaven.extensions.s3.testlib.S3Helper.TIMEOUT_SECONDS;
+import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri;
+import static io.deephaven.iceberg.base.IcebergUtils.locationUri;
+import static org.assertj.core.api.Assertions.assertThat;
 
 abstract class S3WarehouseSqliteCatalogBase extends SqliteCatalogBase {
 
@@ -32,7 +50,18 @@ public final Object dataInstructions() {
     }
 
     @Override
-    protected IcebergCatalogAdapter catalogAdapter(TestInfo testInfo, Path rootDir, Map<String, String> properties)
+    protected IcebergCatalogAdapter catalogAdapter(
+            final TestInfo testInfo,
+            final Path rootDir,
+            final Map<String, String> properties)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        return catalogAdapterForScheme(testInfo, properties, "s3");
+    }
+
+    private IcebergCatalogAdapter catalogAdapterForScheme(
+            final TestInfo testInfo,
+            final Map<String, String> properties,
+            final String scheme)
             throws ExecutionException, InterruptedException, TimeoutException {
         final String methodName = testInfo.getTestMethod().orElseThrow().getName();
         final String catalogName = methodName + "-catalog";
@@ -41,8 +70,95 @@ protected IcebergCatalogAdapter catalogAdapter(TestInfo testInfo, Path rootDir,
             client.createBucket(CreateBucketRequest.builder().bucket(bucket).build())
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
         }
-        properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://" + bucket + "/warehouse");
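+        // Build the warehouse location with the caller-supplied scheme (s3, s3a, or s3n).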
"s3://" + bucket + "/warehouse"); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, scheme + "://" + bucket + "/warehouse"); properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName()); return IcebergToolsS3.createAdapter(catalogName, properties, Map.of(), s3Instructions()); } + + @Test + void testIcebergTablesWithS3AScheme(TestInfo testInfo, @TempDir Path rootDir) + throws ExecutionException, InterruptedException, TimeoutException { + testIcebergTablesWithCustomScheme("s3a", testInfo, rootDir); + } + + @Test + void testIcebergTablesWithS3NScheme(TestInfo testInfo, @TempDir Path rootDir) + throws ExecutionException, InterruptedException, TimeoutException { + testIcebergTablesWithCustomScheme("s3n", testInfo, rootDir); + } + + private void testIcebergTablesWithCustomScheme(final String scheme, TestInfo testInfo, @TempDir Path rootDir) + throws ExecutionException, InterruptedException, TimeoutException { + final Map properties = new HashMap<>(); + SqliteHelper.setJdbcCatalogProperties(properties, rootDir); + final IcebergCatalogAdapter catalogAdapter = catalogAdapterForScheme(testInfo, properties, scheme); + + final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); + + final Table data = TableTools.newTable( + intCol("intCol", 2, 4, 6, 8, 10), + doubleCol("doubleCol", 2.5, 5.0, 7.5, 10.0, 12.5)); + + // Create a new iceberg table + final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, data.getDefinition()); + final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable(); + + // Verify that the table location has the right scheme + assertThat(locationUri(icebergTable).getScheme()).isEqualTo(scheme); + + // Add data to the table + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(data.getDefinition()) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(data, data) + .build()); + + // Verify all data files have the right scheme + final List dataFiles = IcebergUtils.allDataFiles(icebergTable, icebergTable.currentSnapshot()) + .collect(Collectors.toList()); + assertThat(dataFiles).hasSize(2); + assertThat(dataFiles).allMatch(dataFile -> dataFileUri(icebergTable, dataFile).getScheme().equals(scheme)); + + // Verify the data is correct + Table fromIceberg = tableAdapter.table(); + Table expected = TableTools.merge(data, data); + assertTableEquals(expected, fromIceberg); + + // Create a new data file but with s3 scheme + final DataFile existingDataFile = dataFiles.get(0); + final String existingDataFileLocation = existingDataFile.location(); + assertThat(existingDataFileLocation).startsWith(scheme); + final String newLocation = existingDataFileLocation.replace(scheme + "://", "s3://"); + final DataFile newDataFile = DataFiles.builder(icebergTable.spec()) + .withPath(newLocation) + .withFormat(existingDataFile.format()) + .withRecordCount(existingDataFile.recordCount()) + .withFileSizeInBytes(existingDataFile.fileSizeInBytes()) + .build(); + + // Append the new data files to the table + icebergTable.newAppend().appendFile(newDataFile).commit(); + + // Verify the new data file has the right scheme + final List newDataFiles = IcebergUtils.allDataFiles(icebergTable, icebergTable.currentSnapshot()) + .collect(Collectors.toList()); + int s3DataFiles = 0; + int nonS3DataFiles = 0; + for (final DataFile dataFile : newDataFiles) { + if (dataFileUri(icebergTable, dataFile).getScheme().equals(scheme)) { + nonS3DataFiles++; + } else { + 
+        int s3DataFiles = 0;
+        int nonS3DataFiles = 0;
+        for (final DataFile dataFile : newDataFiles) {
+            if (dataFileUri(icebergTable, dataFile).getScheme().equals(scheme)) {
+                nonS3DataFiles++;
+            } else {
+                assertThat(dataFileUri(icebergTable, dataFile).getScheme()).isEqualTo("s3");
+                s3DataFiles++;
+            }
+        }
+        assertThat(s3DataFiles).isEqualTo(1);
+        assertThat(nonS3DataFiles).isEqualTo(2);
+
+        // Verify the data is correct
+        fromIceberg = tableAdapter.table();
+        expected = TableTools.merge(expected, data);
+        assertTableEquals(expected, fromIceberg);
+    }
 }
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java
index 06ad3bbbfe2..fadd1a7062d 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java
@@ -3,9 +3,11 @@
 //
 package io.deephaven.iceberg.base;
 
+import io.deephaven.base.FileUtils;
 import io.deephaven.engine.table.ColumnDefinition;
 import io.deephaven.engine.table.TableDefinition;
 import io.deephaven.engine.table.impl.locations.TableDataException;
+import io.deephaven.iceberg.relative.RelativeFileIO;
 import io.deephaven.iceberg.util.IcebergReadInstructions;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestContent;
@@ -21,6 +23,7 @@
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.jetbrains.annotations.NotNull;
@@ -29,6 +32,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.math.BigDecimal;
+import java.net.URI;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -127,6 +131,18 @@ public static <T> Stream<T> toStream(final org.apache.iceberg.io.CloseableIterab
         });
     }
 
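+    // Resolve a possibly-relative Iceberg location to an absolute path before converting it to a URI.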
+    private static String path(String path, FileIO io) {
+        return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
+    }
+
+    public static URI locationUri(Table table) {
+        return FileUtils.convertToURI(path(table.location(), table.io()), true);
+    }
+
+    public static URI dataFileUri(Table table, DataFile dataFile) {
+        return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
+    }
+
     /**
      * Convert an Iceberg data type to a Deephaven type.
      *
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java
index b5a08b6756a..b1e6ce12c1f 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java
@@ -3,14 +3,12 @@
 //
 package io.deephaven.iceberg.layout;
 
-import io.deephaven.base.FileUtils;
 import io.deephaven.engine.table.TableDefinition;
 import io.deephaven.engine.table.impl.locations.TableDataException;
 import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
 import io.deephaven.iceberg.base.IcebergUtils;
 import io.deephaven.iceberg.location.IcebergTableLocationKey;
 import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
-import io.deephaven.iceberg.relative.RelativeFileIO;
 import io.deephaven.iceberg.util.IcebergReadInstructions;
 import io.deephaven.iceberg.util.IcebergTableAdapter;
 import io.deephaven.parquet.table.ParquetInstructions;
@@ -20,7 +18,6 @@
 import org.apache.iceberg.*;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.io.FileIO;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -33,6 +30,8 @@
 import java.util.stream.Stream;
 
 import static io.deephaven.iceberg.base.IcebergUtils.allManifestFiles;
+import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri;
+import static io.deephaven.iceberg.base.IcebergUtils.locationUri;
 
 public abstract class IcebergBaseLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
     /**
@@ -192,18 +191,6 @@ abstract IcebergTableLocationKey keyFromDataFile(
             URI fileUri,
             SeekableChannelsProvider channelsProvider);
 
-    private static String path(String path, FileIO io) {
-        return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
-    }
-
-    private static URI locationUri(Table table) {
-        return FileUtils.convertToURI(path(table.location(), table.io()), true);
-    }
-
-    private static URI dataFileUri(Table table, DataFile dataFile) {
-        return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
-    }
-
     @Override
     public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKey> locationKeyObserver) {
         if (snapshot == null) {
diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java
index 62c61d7aa5b..b1b1062ae05 100644
--- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java
+++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java
@@ -35,7 +35,6 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -88,7 +87,7 @@ void tearDown() throws Exception {
         engineCleanup.tearDown();
     }
 
-    private TableParquetWriterOptions.Builder writerOptionsBuilder() {
+    protected TableParquetWriterOptions.Builder writerOptionsBuilder() {
         final TableParquetWriterOptions.Builder builder = TableParquetWriterOptions.builder();
         final Object dataInstructions;
         if ((dataInstructions = dataInstructions()) != null) {
@@ -633,7 +632,7 @@ void writeDataFilesBasicTest() {
         verifyDataFiles(tableIdentifier, List.of(source, anotherSource, moreData));
 
         {
-            // Verify thaty we read the data files in the correct order
+            // Verify that we read the data files in the correct order
             final Table fromIceberg = tableAdapter.table();
             assertTableEquals(TableTools.merge(moreData, source, anotherSource), fromIceberg);
         }