Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
import org.apache.polaris.core.persistence.transactional.TransactionalPersistence;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.core.secrets.UserSecretsManagerFactory;
Expand Down Expand Up @@ -221,50 +220,12 @@ public Map<String, String> getConfigOverrides() {
private PolarisAdminService adminService;
private PolarisEntityManager entityManager;
private FileIOFactory fileIOFactory;
private InMemoryFileIO fileIO;
private PolarisEntity catalogEntity;
private SecurityContext securityContext;
private TestPolarisEventListener testPolarisEventListener;
private ReservedProperties reservedProperties;

/**
* A subclass of IcebergCatalog that adds FileIO management capabilities. This allows the file IO
* logic to be encapsulated in a dedicated class.
*/
public static class IcebergFileIOCatalog extends IcebergCatalog {

public IcebergFileIOCatalog(
PolarisEntityManager entityManager,
PolarisMetaStoreManager metaStoreManager,
CallContext callContext,
PolarisResolutionManifestCatalogView resolvedEntityView,
SecurityContext securityContext,
TaskExecutor taskExecutor,
FileIOFactory fileIOFactory,
PolarisEventListener polarisEventListener) {
super(
entityManager,
metaStoreManager,
callContext,
resolvedEntityView,
securityContext,
taskExecutor,
fileIOFactory,
polarisEventListener);
}

@Override
public synchronized FileIO getIo() {
if (catalogFileIO == null) {
catalogFileIO = loadFileIO(ioImplClassName, tableDefaultProperties);
if (closeableGroup != null) {
closeableGroup.addCloseable(catalogFileIO);
}
}

return catalogFileIO;
}
}

@BeforeAll
public static void setUpMocks() {
PolarisStorageIntegrationProviderImpl mock =
Expand Down Expand Up @@ -410,7 +371,7 @@ protected IcebergCatalog initCatalog(
callContext, entityManager, securityContext, CATALOG_NAME);
TaskExecutor taskExecutor = Mockito.mock();
IcebergCatalog icebergCatalog =
new IcebergFileIOCatalog(
new IcebergCatalog(
entityManager,
metaStoreManager,
callContext,
Expand All @@ -419,6 +380,8 @@ protected IcebergCatalog initCatalog(
taskExecutor,
fileIOFactory,
polarisEventListener);
fileIO = new InMemoryFileIO();
icebergCatalog.setCatalogFileIo(fileIO);
ImmutableMap.Builder<String, String> propertiesBuilder =
ImmutableMap.<String, String>builder()
.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")
Expand Down Expand Up @@ -630,7 +593,6 @@ public void testValidateNotificationWhenTableAndNamespacesDontExist() {

// Now also check that despite creating the metadata file, the validation call still doesn't
// create any namespaces or tables.
InMemoryFileIO fileIO = getInMemoryIo(catalog);
fileIO.addFile(
tableMetadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
Expand Down Expand Up @@ -766,8 +728,6 @@ public void testUpdateNotificationWhenTableAndNamespacesDontExist() {
update.setTimestamp(230950845L);
request.setPayload(update);

InMemoryFileIO fileIO = getInMemoryIo(catalog);

fileIO.addFile(
tableMetadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
Expand Down Expand Up @@ -810,8 +770,6 @@ public void testUpdateNotificationCreateTableInDisallowedLocation() {
update.setTimestamp(230950845L);
request.setPayload(update);

InMemoryFileIO fileIO = getInMemoryIo(catalog);

fileIO.addFile(
tableMetadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
Expand Down Expand Up @@ -859,7 +817,7 @@ public void testCreateNotificationCreateTableInExternalLocation() {
.addPartitionSpec(PartitionSpec.unpartitioned())
.addSortOrder(SortOrder.unsorted())
.build();
TableMetadataParser.write(tableMetadata, catalog.getIo().newOutputFile(tableMetadataLocation));
TableMetadataParser.write(tableMetadata, fileIO.newOutputFile(tableMetadataLocation));

Namespace namespace = Namespace.of("parent", "child1");
TableIdentifier table = TableIdentifier.of(namespace, "my_table");
Expand Down Expand Up @@ -919,7 +877,7 @@ public void testCreateNotificationCreateTableOutsideOfMetadataLocation() {
.addPartitionSpec(PartitionSpec.unpartitioned())
.addSortOrder(SortOrder.unsorted())
.build();
TableMetadataParser.write(tableMetadata, catalog.getIo().newOutputFile(tableMetadataLocation));
TableMetadataParser.write(tableMetadata, fileIO.newOutputFile(tableMetadataLocation));

Namespace namespace = Namespace.of("parent", "child1");
TableIdentifier table = TableIdentifier.of(namespace, "my_table");
Expand Down Expand Up @@ -968,7 +926,6 @@ public void testUpdateNotificationCreateTableInExternalLocation() {
FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(), "true")
.build());
IcebergCatalog catalog = catalog();
InMemoryFileIO fileIO = getInMemoryIo(catalog);

fileIO.addFile(
tableMetadataLocation,
Expand Down Expand Up @@ -999,7 +956,7 @@ public void testUpdateNotificationCreateTableInExternalLocation() {
.addPartitionSpec(PartitionSpec.unpartitioned())
.addSortOrder(SortOrder.unsorted())
.build();
TableMetadataParser.write(tableMetadata, catalog.getIo().newOutputFile(maliciousMetadataFile));
TableMetadataParser.write(tableMetadata, fileIO.newOutputFile(maliciousMetadataFile));

NotificationRequest updateRequest = new NotificationRequest();
updateRequest.setNotificationType(NotificationType.UPDATE);
Expand Down Expand Up @@ -1042,7 +999,7 @@ public void testUpdateNotificationCreateTableWithLocalFilePrefix() {
callContext, entityManager, securityContext, catalogWithoutStorage);
TaskExecutor taskExecutor = Mockito.mock();
IcebergCatalog catalog =
new IcebergFileIOCatalog(
new IcebergCatalog(
entityManager,
metaStoreManager,
callContext,
Expand All @@ -1068,8 +1025,6 @@ public void testUpdateNotificationCreateTableWithLocalFilePrefix() {
update.setTimestamp(230950845L);
request.setPayload(update);

InMemoryFileIO fileIO = getInMemoryIo(catalog);

fileIO.addFile(
metadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(metadataLocation)).getBytes(UTF_8));
Expand Down Expand Up @@ -1108,8 +1063,9 @@ public void testUpdateNotificationCreateTableWithHttpPrefix() {
new PolarisPassthroughResolutionView(
callContext, entityManager, securityContext, catalogName);
TaskExecutor taskExecutor = Mockito.mock();
InMemoryFileIO localFileIO = new InMemoryFileIO();
IcebergCatalog catalog =
new IcebergFileIOCatalog(
new IcebergCatalog(
entityManager,
metaStoreManager,
callContext,
Expand All @@ -1126,8 +1082,6 @@ public void testUpdateNotificationCreateTableWithHttpPrefix() {
Namespace namespace = Namespace.of("parent", "child1");
TableIdentifier table = TableIdentifier.of(namespace, "table");

InMemoryFileIO fileIO = getInMemoryIo(catalog);

// The location of the metadata JSON file specified in the create will be forbidden.
final String metadataLocation = "http://maliciousdomain.com/metadata.json";
NotificationRequest request = new NotificationRequest();
Expand Down Expand Up @@ -1204,8 +1158,6 @@ public void testUpdateNotificationWhenNamespacesExist() {
update.setTimestamp(230950845L);
request.setPayload(update);

InMemoryFileIO fileIO = getInMemoryIo(catalog);

fileIO.addFile(
tableMetadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
Expand Down Expand Up @@ -1256,8 +1208,6 @@ public void testUpdateNotificationWhenTableExists() {
update.setTimestamp(230950845L);
request.setPayload(update);

InMemoryFileIO fileIO = getInMemoryIo(catalog);

fileIO.addFile(
tableMetadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
Expand Down Expand Up @@ -1309,8 +1259,6 @@ public void testUpdateNotificationWhenTableExistsInDisallowedLocation() {
update.setTimestamp(230950845L);
request.setPayload(update);

InMemoryFileIO fileIO = getInMemoryIo(catalog);

fileIO.addFile(
tableMetadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
Expand Down Expand Up @@ -1347,8 +1295,6 @@ public void testUpdateNotificationRejectOutOfOrderTimestamp() {
update.setTimestamp(timestamp);
request.setPayload(update);

InMemoryFileIO fileIO = getInMemoryIo(catalog);

fileIO.addFile(
tableMetadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
Expand Down Expand Up @@ -1420,8 +1366,6 @@ public void testUpdateNotificationWhenTableExistsFileSpecifiesDisallowedLocation
update.setTimestamp(230950845L);
request.setPayload(update);

InMemoryFileIO fileIO = getInMemoryIo(catalog);

// Though the metadata JSON file itself is in an allowed location, make it internally specify
// a forbidden table location.
TableMetadata forbiddenMetadata =
Expand Down Expand Up @@ -1498,8 +1442,6 @@ public void testDropNotificationWhenNamespacesExist() {
update.setTimestamp(230950845L);
request.setPayload(update);

InMemoryFileIO fileIO = getInMemoryIo(catalog);

fileIO.addFile(
tableMetadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
Expand Down Expand Up @@ -1550,8 +1492,6 @@ public void testDropNotificationWhenTableExists() {
update.setTimestamp(230950845L);
request.setPayload(update);

InMemoryFileIO fileIO = getInMemoryIo(catalog);

fileIO.addFile(
tableMetadataLocation,
TableMetadataParser.toJson(createSampleTableMetadata(tableLocation)).getBytes(UTF_8));
Expand Down Expand Up @@ -2069,8 +2009,4 @@ public void testEventsAreEmitted() {
Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld);
Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew);
}

private static InMemoryFileIO getInMemoryIo(IcebergCatalog catalog) {
return (InMemoryFileIO) ((ExceptionMappingFileIO) catalog.getIo()).getInnerIo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
private final SecurityContext securityContext;
private final PolarisEventListener polarisEventListener;

protected String ioImplClassName;
protected FileIO catalogFileIO;
protected CloseableGroup closeableGroup;
protected Map<String, String> tableDefaultProperties;
private String ioImplClassName;
private FileIO catalogFileIO;
private CloseableGroup closeableGroup;
private Map<String, String> tableDefaultProperties;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could remove this empty line.

private final String catalogName;
private long catalogId = -1;
Expand Down Expand Up @@ -218,8 +218,8 @@ public String name() {
}

@VisibleForTesting
public FileIO getIo() {
return catalogFileIO;
public void setCatalogFileIo(FileIO fileIO) {
catalogFileIO = fileIO;
}

@Override
Expand Down Expand Up @@ -339,7 +339,8 @@ public ViewBuilder buildView(TableIdentifier identifier) {
@VisibleForTesting
public TableOperations newTableOps(
TableIdentifier tableIdentifier, boolean makeMetadataCurrentOnCommit) {
return new BasePolarisTableOperations(getIo(), tableIdentifier, makeMetadataCurrentOnCommit);
return new BasePolarisTableOperations(
catalogFileIO, tableIdentifier, makeMetadataCurrentOnCommit);
}

@Override
Expand Down Expand Up @@ -844,7 +845,7 @@ private Page<TableIdentifier> listViews(Namespace namespace, PageToken pageToken
@VisibleForTesting
@Override
protected ViewOperations newViewOps(TableIdentifier identifier) {
return new BasePolarisViewOperations(getIo(), identifier);
return new BasePolarisViewOperations(catalogFileIO, identifier);
}

@Override
Expand Down
Loading