Skip to content

Commit d96a16c

Browse files
authored
Allow BasePolarisTableOperations to skip refreshing metadata after a commit (#1456)
* initial commit
* fix another test
* changes per comments
* visibility
* changes per review
* autolint
* oops
1 parent 0c48970 commit d96a16c

File tree

4 files changed

+127
-6
lines changed

4 files changed

+127
-6
lines changed

polaris-core/src/main/java/org/apache/polaris/core/config/BehaviorChangeConfiguration.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,15 @@ protected BehaviorChangeConfiguration(
5959
.description("Whether or not to use soft values in the entity cache")
6060
.defaultValue(false)
6161
.buildBehaviorChangeConfiguration();
62+
63+
public static final BehaviorChangeConfiguration<Boolean>
64+
TABLE_OPERATIONS_MAKE_METADATA_CURRENT_ON_COMMIT =
65+
PolarisConfiguration.<Boolean>builder()
66+
.key("TABLE_OPERATIONS_MAKE_METADATA_CURRENT_ON_COMMIT")
67+
.description(
68+
"If true, BasePolarisTableOperations should mark the metadata that is passed into"
69+
+ " `commit` as current, and re-use it to skip a trip to object storage to re-construct"
70+
+ " the committed metadata again.")
71+
.defaultValue(true)
72+
.buildBehaviorChangeConfiguration();
6273
}

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@
137137
import org.junit.jupiter.params.ParameterizedTest;
138138
import org.junit.jupiter.params.provider.Arguments;
139139
import org.junit.jupiter.params.provider.MethodSource;
140+
import org.junit.jupiter.params.provider.ValueSource;
141+
import org.mockito.MockedStatic;
140142
import org.mockito.Mockito;
141143
import software.amazon.awssdk.core.exception.NonRetryableException;
142144
import software.amazon.awssdk.core.exception.RetryableException;
@@ -1698,8 +1700,8 @@ public void testFileIOWrapper() {
16981700

16991701
table.updateProperties().set("foo", "bar").commit();
17001702
Assertions.assertThat(measured.getInputBytes())
1701-
.as("A table was read and written")
1702-
.isGreaterThan(0);
1703+
.as("A table was read and written, but a trip to storage was made")
1704+
.isEqualTo(0);
17031705

17041706
Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should succeed").isTrue();
17051707
TaskEntity taskEntity =
@@ -1848,6 +1850,56 @@ public void testConcurrencyConflictUpdateTableDuringFinalTransaction() {
18481850
.hasMessageContaining("conflict_table");
18491851
}
18501852

1853+
@ParameterizedTest
1854+
@ValueSource(booleans = {false, true})
1855+
public void testTableOperationsDoesNotRefreshAfterCommit(boolean updateMetadataOnCommit) {
1856+
Assumptions.assumeTrue(
1857+
requiresNamespaceCreate(),
1858+
"Only applicable if namespaces must be created before adding children");
1859+
1860+
catalog.createNamespace(NS);
1861+
catalog.buildTable(TABLE, SCHEMA).create();
1862+
1863+
IcebergCatalog.BasePolarisTableOperations realOps =
1864+
(IcebergCatalog.BasePolarisTableOperations)
1865+
catalog.newTableOps(TABLE, updateMetadataOnCommit);
1866+
IcebergCatalog.BasePolarisTableOperations ops = Mockito.spy(realOps);
1867+
1868+
try (MockedStatic<TableMetadataParser> mocked =
1869+
Mockito.mockStatic(TableMetadataParser.class, Mockito.CALLS_REAL_METHODS)) {
1870+
TableMetadata base1 = ops.current();
1871+
mocked.verify(
1872+
() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1));
1873+
1874+
TableMetadata base2 = ops.refresh();
1875+
mocked.verify(
1876+
() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1));
1877+
1878+
Assertions.assertThat(base1.metadataFileLocation()).isEqualTo(base2.metadataFileLocation());
1879+
Assertions.assertThat(base1).isEqualTo(base2);
1880+
1881+
Schema newSchema =
1882+
new Schema(Types.NestedField.optional(100, "new_col", Types.LongType.get()));
1883+
TableMetadata newMetadata =
1884+
TableMetadata.buildFrom(base1).setCurrentSchema(newSchema, 100).build();
1885+
ops.commit(base2, newMetadata);
1886+
mocked.verify(
1887+
() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()), Mockito.times(1));
1888+
1889+
ops.current();
1890+
int expectedReads = updateMetadataOnCommit ? 1 : 2;
1891+
mocked.verify(
1892+
() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()),
1893+
Mockito.times(expectedReads));
1894+
ops.refresh();
1895+
mocked.verify(
1896+
() -> TableMetadataParser.read(Mockito.any(), Mockito.anyString()),
1897+
Mockito.times(expectedReads));
1898+
} finally {
1899+
catalog.dropTable(TABLE, true);
1900+
}
1901+
}
1902+
18511903
private static InMemoryFileIO getInMemoryIo(IcebergCatalog catalog) {
18521904
return (InMemoryFileIO) ((ExceptionMappingFileIO) catalog.getIo()).getInnerIo();
18531905
}

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static org.apache.iceberg.types.Types.NestedField.required;
2222
import static org.assertj.core.api.Assertions.assertThat;
23+
import static org.assertj.core.api.Assertions.assertThatCode;
2324
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2425

2526
import com.azure.core.exception.AzureException;
@@ -117,6 +118,30 @@ private static void requestCreateTable() {
117118
res.close();
118119
}
119120

121+
private static void requestDropTable() {
122+
Response res =
123+
services
124+
.restApi()
125+
.dropTable(
126+
catalog, "ns1", "t1", false, services.realmContext(), services.securityContext());
127+
res.close();
128+
}
129+
130+
private static void requestLoadTable() {
131+
Response res =
132+
services
133+
.restApi()
134+
.loadTable(
135+
catalog,
136+
"ns1",
137+
"t1",
138+
null,
139+
"ALL",
140+
services.realmContext(),
141+
services.securityContext());
142+
res.close();
143+
}
144+
120145
static Stream<RuntimeException> exceptions() {
121146
return Stream.of(
122147
new AzureException("Forbidden"),
@@ -135,7 +160,9 @@ void testLoadFileIOExceptionPropagation(RuntimeException ex) {
135160
@MethodSource("exceptions")
136161
void testNewInputFileExceptionPropagation(RuntimeException ex) {
137162
ioFactory.newInputFileExceptionSupplier = Optional.of(() -> ex);
138-
assertThatThrownBy(FileIOExceptionsTest::requestCreateTable).isSameAs(ex);
163+
assertThatCode(FileIOExceptionsTest::requestCreateTable).doesNotThrowAnyException();
164+
assertThatThrownBy(FileIOExceptionsTest::requestLoadTable).isSameAs(ex);
165+
assertThatCode(FileIOExceptionsTest::requestDropTable).doesNotThrowAnyException();
139166
}
140167

141168
@ParameterizedTest

service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -360,9 +360,22 @@ public ViewBuilder buildView(TableIdentifier identifier) {
360360
return new PolarisIcebergCatalogViewBuilder(identifier);
361361
}
362362

363+
@VisibleForTesting
364+
public TableOperations newTableOps(
365+
TableIdentifier tableIdentifier, boolean makeMetadataCurrentOnCommit) {
366+
return new BasePolarisTableOperations(
367+
catalogFileIO, tableIdentifier, makeMetadataCurrentOnCommit);
368+
}
369+
363370
@Override
364371
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
365-
return new BasePolarisTableOperations(catalogFileIO, tableIdentifier);
372+
boolean makeMetadataCurrentOnCommit =
373+
getCurrentPolarisContext()
374+
.getConfigurationStore()
375+
.getConfiguration(
376+
getCurrentPolarisContext(),
377+
BehaviorChangeConfiguration.TABLE_OPERATIONS_MAKE_METADATA_CURRENT_ON_COMMIT);
378+
return newTableOps(tableIdentifier, makeMetadataCurrentOnCommit);
366379
}
367380

368381
@Override
@@ -1207,17 +1220,24 @@ public ViewBuilder withLocation(String newLocation) {
12071220
* org.apache.iceberg.BaseMetastoreTableOperations}. CODE_COPIED_TO_POLARIS From Apache Iceberg
12081221
* Version: 1.8
12091222
*/
1210-
private class BasePolarisTableOperations extends PolarisOperationsBase<TableMetadata>
1223+
@VisibleForTesting
1224+
public class BasePolarisTableOperations extends PolarisOperationsBase<TableMetadata>
12111225
implements TableOperations {
12121226
private final TableIdentifier tableIdentifier;
12131227
private final String fullTableName;
1228+
private final boolean makeMetadataCurrentOnCommit;
1229+
12141230
private FileIO tableFileIO;
12151231

1216-
BasePolarisTableOperations(FileIO defaultFileIO, TableIdentifier tableIdentifier) {
1232+
BasePolarisTableOperations(
1233+
FileIO defaultFileIO,
1234+
TableIdentifier tableIdentifier,
1235+
boolean makeMetadataCurrentOnCommit) {
12171236
LOGGER.debug("new BasePolarisTableOperations for {}", tableIdentifier);
12181237
this.tableIdentifier = tableIdentifier;
12191238
this.fullTableName = fullTableName(catalogName, tableIdentifier);
12201239
this.tableFileIO = defaultFileIO;
1240+
this.makeMetadataCurrentOnCommit = makeMetadataCurrentOnCommit;
12211241
}
12221242

12231243
@Override
@@ -1476,6 +1496,17 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
14761496
+ "because it has been concurrently modified to %s",
14771497
tableIdentifier, oldLocation, newLocation, existingLocation);
14781498
}
1499+
1500+
// We diverge from `BaseMetastoreTableOperations` in the below code block
1501+
if (makeMetadataCurrentOnCommit) {
1502+
currentMetadata =
1503+
TableMetadata.buildFrom(metadata)
1504+
.withMetadataLocation(newLocation)
1505+
.discardChanges()
1506+
.build();
1507+
currentMetadataLocation = newLocation;
1508+
}
1509+
14791510
if (null == existingLocation) {
14801511
createTableLike(tableIdentifier, entity);
14811512
} else {

0 commit comments

Comments
 (0)