Skip to content

Commit

Permalink
fix: For iceberg test failures with unclosed ResolvingFileIO instance (
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored Oct 18, 2024
1 parent 22c20cf commit 8f02178
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.deephaven.iceberg.TestCatalog.IcebergTestCatalog;
import io.deephaven.test.types.OutOfBandTest;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.After;
Expand Down Expand Up @@ -109,7 +108,7 @@ public abstract class IcebergToolsTest {
private final List<String> keys = new ArrayList<>();

private String warehousePath;
private Catalog resourceCatalog;
private IcebergTestCatalog resourceCatalog;

@Rule
public final EngineCleanup framework = new EngineCleanup();
Expand All @@ -134,6 +133,7 @@ public void setUp() throws ExecutionException, InterruptedException {

@After
public void tearDown() throws ExecutionException, InterruptedException {
resourceCatalog.close();
for (String key : keys) {
asyncClient.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()).get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,33 @@
//
package io.deephaven.iceberg.TestCatalog;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.io.ResolvingFileIO;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.util.*;

public class IcebergTestCatalog implements Catalog, SupportsNamespaces {
public class IcebergTestCatalog implements Catalog, SupportsNamespaces, AutoCloseable {
private final Map<Namespace, Map<TableIdentifier, Table>> namespaceTableMap;
private final Map<TableIdentifier, Table> tableMap;

private final ResolvingFileIO fileIO;

private IcebergTestCatalog(final String path, @NotNull final Map<String, String> properties) {
namespaceTableMap = new HashMap<>();
tableMap = new HashMap<>();
final Configuration hadoopConf = new Configuration();
fileIO = new ResolvingFileIO();
fileIO.setConf(hadoopConf);
fileIO.initialize(properties);

// Assume first level is namespace.
final File root = new File(path);
Expand All @@ -33,7 +41,7 @@ private IcebergTestCatalog(final String path, @NotNull final Map<String, String>
if (tableFile.isDirectory()) {
// Second level is table name.
final TableIdentifier tableId = TableIdentifier.of(namespace, tableFile.getName());
final Table table = IcebergTestTable.loadFromMetadata(tableFile.getAbsolutePath(), properties);
final Table table = IcebergTestTable.loadFromMetadata(tableFile.getAbsolutePath(), fileIO);

// Add it to the maps.
namespaceTableMap.get(namespace).put(tableId, table);
Expand Down Expand Up @@ -103,4 +111,9 @@ public boolean setProperties(Namespace namespace, Map<String, String> map) throw
public boolean removeProperties(Namespace namespace, Set<String> set) throws NoSuchNamespaceException {
return false;
}

@Override
public void close() {
fileIO.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
//
package io.deephaven.iceberg.TestCatalog;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.ResolvingFileIO;
import org.jetbrains.annotations.NotNull;

import java.io.File;
Expand All @@ -19,12 +17,12 @@

public class IcebergTestTable implements Table {
private final TableMetadata metadata;
private final Map<String, String> properties;
private final Configuration hadoopConf;
private final FileIO fileIO;

private IcebergTestTable(@NotNull final String path, @NotNull final Map<String, String> properties) {
this.properties = properties;
hadoopConf = new Configuration();
private IcebergTestTable(
@NotNull final String path,
@NotNull final FileIO fileIO) {
this.fileIO = fileIO;

final File metadataRoot = new File(path, "metadata");

Expand All @@ -50,8 +48,8 @@ private IcebergTestTable(@NotNull final String path, @NotNull final Map<String,

public static IcebergTestTable loadFromMetadata(
@NotNull final String path,
@NotNull final Map<String, String> properties) {
return new IcebergTestTable(path, properties);
@NotNull final FileIO fileIO) {
return new IcebergTestTable(path, fileIO);
}

@Override
Expand Down Expand Up @@ -220,10 +218,7 @@ public Transaction newTransaction() {

@Override
public FileIO io() {
final ResolvingFileIO io = new ResolvingFileIO();
io.setConf(hadoopConf);
io.initialize(properties);
return io;
return fileIO;
}

@Override
Expand Down

0 comments on commit 8f02178

Please sign in to comment.