
Commit

Added unit tests
malhotrashivam committed Feb 12, 2025
1 parent bff92cb commit a832bea
Showing 4 changed files with 138 additions and 20 deletions.
S3WarehouseSqliteCatalogBase.java
@@ -3,22 +3,40 @@
//
package io.deephaven.iceberg.util;

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.base.IcebergUtils;
import io.deephaven.iceberg.junit5.SqliteCatalogBase;
import io.deephaven.iceberg.sqlite.SqliteHelper;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.Test;
import org.apache.iceberg.catalog.TableIdentifier;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
import static io.deephaven.engine.util.TableTools.doubleCol;
import static io.deephaven.engine.util.TableTools.intCol;
import static io.deephaven.extensions.s3.testlib.S3Helper.TIMEOUT_SECONDS;
import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri;
import static io.deephaven.iceberg.base.IcebergUtils.locationUri;
import static org.assertj.core.api.Assertions.assertThat;

abstract class S3WarehouseSqliteCatalogBase extends SqliteCatalogBase {

@@ -32,7 +50,18 @@ public final Object dataInstructions() {
}

@Override
-    protected IcebergCatalogAdapter catalogAdapter(TestInfo testInfo, Path rootDir, Map<String, String> properties)
+    protected IcebergCatalogAdapter catalogAdapter(
+            final TestInfo testInfo,
+            final Path rootDir,
+            final Map<String, String> properties)
            throws ExecutionException, InterruptedException, TimeoutException {
+        return catalogAdapterForScheme(testInfo, properties, "s3");
+    }
+
+    private IcebergCatalogAdapter catalogAdapterForScheme(
+            final TestInfo testInfo,
+            final Map<String, String> properties,
+            final String scheme)
+            throws ExecutionException, InterruptedException, TimeoutException {
final String methodName = testInfo.getTestMethod().orElseThrow().getName();
final String catalogName = methodName + "-catalog";
@@ -41,8 +70,95 @@ protected IcebergCatalogAdapter catalogAdapter(TestInfo testInfo, Path rootDir,
client.createBucket(CreateBucketRequest.builder().bucket(bucket).build())
.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://" + bucket + "/warehouse");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, scheme + "://" + bucket + "/warehouse");
properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
return IcebergToolsS3.createAdapter(catalogName, properties, Map.of(), s3Instructions());
}

+    @Test
+    void testIcebergTablesWithS3AScheme(TestInfo testInfo, @TempDir Path rootDir)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        testIcebergTablesWithCustomScheme("s3a", testInfo, rootDir);
+    }
+
+    @Test
+    void testIcebergTablesWithS3NScheme(TestInfo testInfo, @TempDir Path rootDir)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        testIcebergTablesWithCustomScheme("s3n", testInfo, rootDir);
+    }
+
+    private void testIcebergTablesWithCustomScheme(final String scheme, TestInfo testInfo, @TempDir Path rootDir)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        final Map<String, String> properties = new HashMap<>();
+        SqliteHelper.setJdbcCatalogProperties(properties, rootDir);
+        final IcebergCatalogAdapter catalogAdapter = catalogAdapterForScheme(testInfo, properties, scheme);
+
+        final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable");
+
+        final Table data = TableTools.newTable(
+                intCol("intCol", 2, 4, 6, 8, 10),
+                doubleCol("doubleCol", 2.5, 5.0, 7.5, 10.0, 12.5));
+
+        // Create a new Iceberg table
+        final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, data.getDefinition());
+        final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable();
+
+        // Verify that the table location has the right scheme
+        assertThat(locationUri(icebergTable).getScheme()).isEqualTo(scheme);
+
+        // Add data to the table
+        final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder()
+                .tableDefinition(data.getDefinition())
+                .build());
+        tableWriter.append(IcebergWriteInstructions.builder()
+                .addTables(data, data)
+                .build());
+
+        // Verify all data files have the right scheme
+        final List<DataFile> dataFiles = IcebergUtils.allDataFiles(icebergTable, icebergTable.currentSnapshot())
+                .collect(Collectors.toList());
+        assertThat(dataFiles).hasSize(2);
+        assertThat(dataFiles).allMatch(dataFile -> dataFileUri(icebergTable, dataFile).getScheme().equals(scheme));
+
+        // Verify the data is correct
+        Table fromIceberg = tableAdapter.table();
+        Table expected = TableTools.merge(data, data);
+        assertTableEquals(expected, fromIceberg);
+
+        // Create a new data file, but with the "s3" scheme
+        final DataFile existingDataFile = dataFiles.get(0);
+        final String existingDataFileLocation = existingDataFile.location();
+        assertThat(existingDataFileLocation).startsWith(scheme);
+        final String newLocation = existingDataFileLocation.replace(scheme + "://", "s3://");
+        final DataFile newDataFile = DataFiles.builder(icebergTable.spec())
+                .withPath(newLocation)
+                .withFormat(existingDataFile.format())
+                .withRecordCount(existingDataFile.recordCount())
+                .withFileSizeInBytes(existingDataFile.fileSizeInBytes())
+                .build();
+
+        // Append the new data file to the table
+        icebergTable.newAppend().appendFile(newDataFile).commit();
+
+        // Verify the new data file has the right scheme
+        final List<DataFile> newDataFiles = IcebergUtils.allDataFiles(icebergTable, icebergTable.currentSnapshot())
+                .collect(Collectors.toList());
+        int s3DataFiles = 0;
+        int nonS3DataFiles = 0;
+        for (final DataFile dataFile : newDataFiles) {
+            if (dataFileUri(icebergTable, dataFile).getScheme().equals(scheme)) {
+                nonS3DataFiles++;
+            } else {
+                assertThat(dataFileUri(icebergTable, dataFile).getScheme()).isEqualTo("s3");
+                s3DataFiles++;
+            }
+        }
+        assertThat(s3DataFiles).isEqualTo(1);
+        assertThat(nonS3DataFiles).isEqualTo(2);
+
+        // Verify the data is correct
+        fromIceberg = tableAdapter.table();
+        expected = TableTools.merge(expected, data);
+        assertTableEquals(expected, fromIceberg);
+    }
}
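A note on the test above: it relies on the fact that s3, s3a, and s3n URIs address the same underlying object store, which is why a data file appended with an s3:// path is still readable from a table whose warehouse was registered under s3a:// or s3n://. A minimal, self-contained sketch of that equivalence property (the class and method names are illustrative, not part of this commit):

import java.net.URI;

// Hypothetical helper: two URIs that differ only in their S3 scheme alias
// (s3, s3a, s3n) address the same underlying object.
public final class S3SchemeEquivalence {

    private static boolean isS3Alias(String scheme) {
        return "s3".equals(scheme) || "s3a".equals(scheme) || "s3n".equals(scheme);
    }

    static boolean sameObject(URI a, URI b) {
        return isS3Alias(a.getScheme()) && isS3Alias(b.getScheme())
                && a.getAuthority().equals(b.getAuthority())
                && a.getPath().equals(b.getPath());
    }

    public static void main(String[] args) {
        final URI s3a = URI.create("s3a://bucket/warehouse/MyNamespace/MyTable/data/f1.parquet");
        final URI s3 = URI.create("s3://bucket/warehouse/MyNamespace/MyTable/data/f1.parquet");
        System.out.println(sameObject(s3a, s3)); // prints "true"
    }
}

The test's counting loop asserts exactly this: after the extra append, two data files keep the custom scheme and one carries plain s3, yet all three are read back into the merged table.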
IcebergUtils.java
@@ -3,9 +3,11 @@
//
package io.deephaven.iceberg.base;

+import io.deephaven.base.FileUtils;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableDataException;
+import io.deephaven.iceberg.relative.RelativeFileIO;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestContent;
@@ -21,6 +23,7 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.jetbrains.annotations.NotNull;
@@ -29,6 +32,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
+import java.net.URI;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -127,6 +131,18 @@ public static <T> Stream<T> toStream(final org.apache.iceberg.io.CloseableIterable
});
}

+    // Resolve a possibly-relative Iceberg location to an absolute one when the
+    // table is backed by RelativeFileIO; otherwise pass the path through untouched.
+    private static String path(String path, FileIO io) {
+        return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
+    }
+
+    // The table's root location as a directory-style URI.
+    public static URI locationUri(Table table) {
+        return FileUtils.convertToURI(path(table.location(), table.io()), true);
+    }
+
+    // A single data file's location as a file-style URI.
+    public static URI dataFileUri(Table table, DataFile dataFile) {
+        return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
+    }

/**
* Convert an Iceberg data type to a Deephaven type.
*
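For context on the two methods added above: locationUri and dataFileUri are now public members of IcebergUtils so that both IcebergBaseLayout and the new scheme tests can share them. A small usage sketch, assuming a live org.apache.iceberg.Table and one of its DataFiles are at hand (everything here other than the IcebergUtils calls is illustrative):

import java.net.URI;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

import io.deephaven.iceberg.base.IcebergUtils;

final class UriInspection {
    // Prints the schemes of the table root and of one data file. For tables
    // backed by RelativeFileIO, both helpers resolve relative paths first.
    static void inspect(Table table, DataFile dataFile) {
        final URI tableUri = IcebergUtils.locationUri(table);
        final URI fileUri = IcebergUtils.dataFileUri(table, dataFile);
        System.out.println(tableUri.getScheme() + " -> " + fileUri.getScheme());
    }
}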
IcebergBaseLayout.java
@@ -3,14 +3,12 @@
//
package io.deephaven.iceberg.layout;

-import io.deephaven.base.FileUtils;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.iceberg.base.IcebergUtils;
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
-import io.deephaven.iceberg.relative.RelativeFileIO;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.parquet.table.ParquetInstructions;
@@ -20,7 +18,6 @@
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.io.FileIO;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@@ -33,6 +30,8 @@
import java.util.stream.Stream;

import static io.deephaven.iceberg.base.IcebergUtils.allManifestFiles;
+import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri;
+import static io.deephaven.iceberg.base.IcebergUtils.locationUri;

public abstract class IcebergBaseLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
/**
@@ -192,18 +191,6 @@ abstract IcebergTableLocationKey keyFromDataFile(
URI fileUri,
SeekableChannelsProvider channelsProvider);

-    private static String path(String path, FileIO io) {
-        return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
-    }
-
-    private static URI locationUri(Table table) {
-        return FileUtils.convertToURI(path(table.location(), table.io()), true);
-    }
-
-    private static URI dataFileUri(Table table, DataFile dataFile) {
-        return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
-    }

@Override
public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKey> locationKeyObserver) {
if (snapshot == null) {
SqliteCatalogBase.java
@@ -35,7 +35,6 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -88,7 +87,7 @@ void tearDown() throws Exception {
engineCleanup.tearDown();
}

-private TableParquetWriterOptions.Builder writerOptionsBuilder() {
+protected TableParquetWriterOptions.Builder writerOptionsBuilder() {
final TableParquetWriterOptions.Builder builder = TableParquetWriterOptions.builder();
final Object dataInstructions;
if ((dataInstructions = dataInstructions()) != null) {
@@ -633,7 +632,7 @@ void writeDataFilesBasicTest() {
verifyDataFiles(tableIdentifier, List.of(source, anotherSource, moreData));

{
-// Verify thaty we read the data files in the correct order
+// Verify that we read the data files in the correct order
final Table fromIceberg = tableAdapter.table();
assertTableEquals(TableTools.merge(moreData, source, anotherSource), fromIceberg);
}
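The visibility change above, private to protected, is what allows the abstract subclass in the first file of this commit, S3WarehouseSqliteCatalogBase, to build writer options for the new scheme tests. A sketch of that usage pattern with a hypothetical subclass name:

import io.deephaven.engine.table.Table;
import io.deephaven.iceberg.junit5.SqliteCatalogBase;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.iceberg.util.IcebergTableWriter;
import io.deephaven.iceberg.util.IcebergWriteInstructions;

// Hypothetical subclass; mirrors how S3WarehouseSqliteCatalogBase uses the
// now-protected writerOptionsBuilder() to append the same table twice.
abstract class MySchemeTests extends SqliteCatalogBase {
    void appendTwice(IcebergTableAdapter tableAdapter, Table data) {
        final IcebergTableWriter writer = tableAdapter.tableWriter(writerOptionsBuilder()
                .tableDefinition(data.getDefinition())
                .build());
        writer.append(IcebergWriteInstructions.builder()
                .addTables(data, data)
                .build());
    }
}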
