Skip to content

Commit 07edd30

Browse files
authored
Add properties from TableMetadata into Table entity internalProperties (#2735)
* Add properties from TableMetadata into Table entity internalProperties * Made table properties constants and pulled out static utility method
1 parent b04c7cd commit 07edd30

File tree

5 files changed

+217
-3
lines changed

5 files changed

+217
-3
lines changed

polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ public B addProperty(String key, String value) {
411411
return (B) this;
412412
}
413413

414-
public B setInternalProperties(Map<String, String> internalProperties) {
414+
public B setInternalProperties(@Nonnull Map<String, String> internalProperties) {
415415
this.internalProperties = new HashMap<>(internalProperties);
416416
return (B) this;
417417
}

polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020

2121
import com.fasterxml.jackson.annotation.JsonIgnore;
2222
import com.google.common.base.Preconditions;
23+
import jakarta.annotation.Nonnull;
2324
import jakarta.annotation.Nullable;
25+
import java.util.HashMap;
26+
import java.util.Map;
2427
import java.util.Optional;
2528
import org.apache.iceberg.catalog.Namespace;
2629
import org.apache.iceberg.catalog.TableIdentifier;
@@ -47,6 +50,40 @@ public class IcebergTableLikeEntity extends TableLikeEntity {
4750
public static final String LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY =
4851
"last-notification-timestamp";
4952

53+
/*
54+
* The following constants are copied from the TableMetadataParser in Iceberg
55+
* They represent the keys used in the table metadata JSON file.
56+
*/
57+
58+
public static final String FORMAT_VERSION = "format-version";
59+
public static final String TABLE_UUID = "table-uuid";
60+
public static final String LOCATION = "location";
61+
public static final String LAST_SEQUENCE_NUMBER = "last-sequence-number";
62+
public static final String LAST_UPDATED_MILLIS = "last-updated-ms";
63+
public static final String LAST_COLUMN_ID = "last-column-id";
64+
public static final String SCHEMA = "schema";
65+
public static final String SCHEMAS = "schemas";
66+
public static final String CURRENT_SCHEMA_ID = "current-schema-id";
67+
public static final String PARTITION_SPEC = "partition-spec";
68+
public static final String PARTITION_SPECS = "partition-specs";
69+
public static final String DEFAULT_SPEC_ID = "default-spec-id";
70+
public static final String LAST_PARTITION_ID = "last-partition-id";
71+
public static final String DEFAULT_SORT_ORDER_ID = "default-sort-order-id";
72+
public static final String SORT_ORDERS = "sort-orders";
73+
public static final String PROPERTIES = "properties";
74+
public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
75+
public static final String REFS = "refs";
76+
public static final String SNAPSHOTS = "snapshots";
77+
public static final String SNAPSHOT_ID = "snapshot-id";
78+
public static final String TIMESTAMP_MS = "timestamp-ms";
79+
public static final String SNAPSHOT_LOG = "snapshot-log";
80+
public static final String METADATA_FILE = "metadata-file";
81+
public static final String METADATA_LOG = "metadata-log";
82+
public static final String STATISTICS = "statistics";
83+
public static final String PARTITION_STATISTICS = "partition-statistics";
84+
public static final String ENCRYPTION_KEYS = "encryption-keys";
85+
public static final String NEXT_ROW_ID = "next-row-id";
86+
5087
public IcebergTableLikeEntity(PolarisBaseEntity sourceEntity) {
5188
super(sourceEntity);
5289
PolarisEntitySubType subType = getSubType();
@@ -83,6 +120,24 @@ public String getBaseLocation() {
83120
}
84121

85122
public static class Builder extends PolarisEntity.BaseBuilder<IcebergTableLikeEntity, Builder> {
123+
124+
public Builder(
125+
PolarisEntitySubType subType,
126+
TableIdentifier identifier,
127+
Map<String, String> properties,
128+
Map<String, String> internalProperties,
129+
String metadataLocation) {
130+
super();
131+
setType(PolarisEntityType.TABLE_LIKE);
132+
setSubType(subType);
133+
setProperties(properties);
134+
setInternalProperties(internalProperties);
135+
// order here matters. properties and internal properties must be set prior to the following
136+
// properties, which merely update the map, whereas the above calls replace the map entirely.
137+
setTableIdentifier(identifier);
138+
setMetadataLocation(metadataLocation);
139+
}
140+
86141
public Builder(
87142
PolarisEntitySubType subType, TableIdentifier identifier, String metadataLocation) {
88143
super();
@@ -121,6 +176,29 @@ public Builder setBaseLocation(String location) {
121176
return this;
122177
}
123178

179+
@Override
180+
public Builder setInternalProperties(@Nonnull Map<String, String> internalProperties) {
181+
// ensure we carry forward the parent namespace and metadata location if already set.
182+
// however, we allow for overriding them if explicitly specified in the provided map.
183+
Map<String, String> newInternalProperties = new HashMap<>();
184+
if (this.internalProperties.get(NamespaceEntity.PARENT_NAMESPACE_KEY) != null) {
185+
newInternalProperties.put(
186+
NamespaceEntity.PARENT_NAMESPACE_KEY,
187+
this.internalProperties.get(NamespaceEntity.PARENT_NAMESPACE_KEY));
188+
}
189+
if (this.internalProperties.get(METADATA_LOCATION_KEY) != null) {
190+
newInternalProperties.put(
191+
METADATA_LOCATION_KEY, this.internalProperties.get(METADATA_LOCATION_KEY));
192+
}
193+
if (this.internalProperties.get(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY) != null) {
194+
newInternalProperties.put(
195+
LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY,
196+
this.internalProperties.get(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY));
197+
}
198+
newInternalProperties.putAll(internalProperties);
199+
return super.setInternalProperties(newInternalProperties);
200+
}
201+
124202
public Builder setMetadataLocation(String location) {
125203
internalProperties.put(METADATA_LOCATION_KEY, location);
126204
return this;

polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/EntityWeigherTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public void testExactWeightCalculation() {
124124
"location",
125125
"{\"a\": \"b\"}",
126126
Optional.of("{\"c\": \"d\", \"e\": \"f\"}")));
127-
Assertions.assertThat(preciseWeight).isEqualTo(1090);
127+
Assertions.assertThat(preciseWeight).isEqualTo(1183); // :( this is hard-coded
128128
}
129129

130130
private static Map<String, String> getPropertiesMap(String properties) {

runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1493,14 +1493,19 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
14931493
"Generic table with same name already exists: %s", tableIdentifier);
14941494
}
14951495
}
1496+
Map<String, String> storedProperties = buildTableMetadataPropertiesMap(metadata);
14961497
IcebergTableLikeEntity entity =
14971498
IcebergTableLikeEntity.of(resolvedPath == null ? null : resolvedPath.getRawLeafEntity());
14981499
String existingLocation;
14991500
if (null == entity) {
15001501
existingLocation = null;
15011502
entity =
15021503
new IcebergTableLikeEntity.Builder(
1503-
PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, newLocation)
1504+
PolarisEntitySubType.ICEBERG_TABLE,
1505+
tableIdentifier,
1506+
Map.of(),
1507+
storedProperties,
1508+
newLocation)
15041509
.setCatalogId(getCatalogId())
15051510
.setBaseLocation(metadata.location())
15061511
.setId(
@@ -1510,6 +1515,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
15101515
existingLocation = entity.getMetadataLocation();
15111516
entity =
15121517
new IcebergTableLikeEntity.Builder(entity)
1518+
.setInternalProperties(storedProperties)
15131519
.setBaseLocation(metadata.location())
15141520
.setMetadataLocation(newLocation)
15151521
.build();
@@ -1639,6 +1645,41 @@ private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
16391645
}
16401646
}
16411647

1648+
private static Map<String, String> buildTableMetadataPropertiesMap(TableMetadata metadata) {
1649+
Map<String, String> storedProperties = new HashMap<>();
1650+
storedProperties.put(IcebergTableLikeEntity.LOCATION, metadata.location());
1651+
storedProperties.put(
1652+
IcebergTableLikeEntity.FORMAT_VERSION, String.valueOf(metadata.formatVersion()));
1653+
storedProperties.put(IcebergTableLikeEntity.TABLE_UUID, metadata.uuid());
1654+
storedProperties.put(
1655+
IcebergTableLikeEntity.CURRENT_SCHEMA_ID, String.valueOf(metadata.currentSchemaId()));
1656+
if (metadata.currentSnapshot() != null) {
1657+
storedProperties.put(
1658+
IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID,
1659+
String.valueOf(metadata.currentSnapshot().snapshotId()));
1660+
}
1661+
storedProperties.put(
1662+
IcebergTableLikeEntity.LAST_COLUMN_ID, String.valueOf(metadata.lastColumnId()));
1663+
storedProperties.put(IcebergTableLikeEntity.NEXT_ROW_ID, String.valueOf(metadata.nextRowId()));
1664+
storedProperties.put(
1665+
IcebergTableLikeEntity.LAST_SEQUENCE_NUMBER, String.valueOf(metadata.lastSequenceNumber()));
1666+
storedProperties.put(
1667+
IcebergTableLikeEntity.LAST_UPDATED_MILLIS, String.valueOf(metadata.lastUpdatedMillis()));
1668+
if (metadata.sortOrder() != null) {
1669+
storedProperties.put(
1670+
IcebergTableLikeEntity.DEFAULT_SORT_ORDER_ID,
1671+
String.valueOf(metadata.defaultSortOrderId()));
1672+
}
1673+
if (metadata.spec() != null) {
1674+
storedProperties.put(
1675+
IcebergTableLikeEntity.DEFAULT_SPEC_ID, String.valueOf(metadata.defaultSpecId()));
1676+
storedProperties.put(
1677+
IcebergTableLikeEntity.LAST_PARTITION_ID,
1678+
String.valueOf(metadata.lastAssignedPartitionId()));
1679+
}
1680+
return storedProperties;
1681+
}
1682+
16421683
/**
16431684
* An implementation of {@link ViewOperations} that integrates with {@link IcebergCatalog}. Much
16441685
* of this code was originally copied from {@link org.apache.iceberg.view.BaseViewOperations}.

runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.lang.reflect.Method;
4545
import java.time.Clock;
4646
import java.util.Arrays;
47+
import java.util.Comparator;
4748
import java.util.HashMap;
4849
import java.util.List;
4950
import java.util.Map;
@@ -62,6 +63,7 @@
6263
import org.apache.iceberg.FileMetadata;
6364
import org.apache.iceberg.FileScanTask;
6465
import org.apache.iceberg.MetadataUpdate;
66+
import org.apache.iceberg.NullOrder;
6567
import org.apache.iceberg.PartitionSpec;
6668
import org.apache.iceberg.RowDelta;
6769
import org.apache.iceberg.Schema;
@@ -103,12 +105,14 @@
103105
import org.apache.polaris.core.context.CallContext;
104106
import org.apache.polaris.core.context.RealmContext;
105107
import org.apache.polaris.core.entity.CatalogEntity;
108+
import org.apache.polaris.core.entity.NamespaceEntity;
106109
import org.apache.polaris.core.entity.PolarisBaseEntity;
107110
import org.apache.polaris.core.entity.PolarisEntity;
108111
import org.apache.polaris.core.entity.PolarisEntitySubType;
109112
import org.apache.polaris.core.entity.PolarisEntityType;
110113
import org.apache.polaris.core.entity.PrincipalEntity;
111114
import org.apache.polaris.core.entity.TaskEntity;
115+
import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
112116
import org.apache.polaris.core.exceptions.CommitConflictException;
113117
import org.apache.polaris.core.identity.provider.ServiceIdentityProvider;
114118
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
@@ -156,6 +160,7 @@
156160
import org.apache.polaris.service.types.TableUpdateNotification;
157161
import org.assertj.core.api.AbstractCollectionAssert;
158162
import org.assertj.core.api.Assertions;
163+
import org.assertj.core.api.InstanceOfAssertFactories;
159164
import org.assertj.core.api.ListAssert;
160165
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
161166
import org.assertj.core.configuration.PreferredAssumptionException;
@@ -2286,6 +2291,96 @@ public void testTableOperationsDoesNotRefreshAfterCommit(boolean updateMetadataO
22862291
}
22872292
}
22882293

2294+
@Test
2295+
public void testTableInternalPropertiesStoredOnCommit() {
2296+
Assumptions.assumeTrue(
2297+
requiresNamespaceCreate(),
2298+
"Only applicable if namespaces must be created before adding children");
2299+
2300+
catalog.createNamespace(NS);
2301+
catalog.buildTable(TABLE, SCHEMA).create();
2302+
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
2303+
Table afterAppend = catalog.loadTable(TABLE);
2304+
EntityResult schemaResult =
2305+
metaStoreManager.readEntityByName(
2306+
polarisContext,
2307+
List.of(catalogEntity),
2308+
PolarisEntityType.NAMESPACE,
2309+
PolarisEntitySubType.NULL_SUBTYPE,
2310+
NS.toString());
2311+
Assertions.assertThat(schemaResult).returns(true, EntityResult::isSuccess);
2312+
EntityResult tableResult =
2313+
metaStoreManager.readEntityByName(
2314+
polarisContext,
2315+
List.of(catalogEntity, schemaResult.getEntity()),
2316+
PolarisEntityType.TABLE_LIKE,
2317+
PolarisEntitySubType.ICEBERG_TABLE,
2318+
TABLE.name());
2319+
Assertions.assertThat(tableResult)
2320+
.returns(true, EntityResult::isSuccess)
2321+
.extracting(er -> PolarisEntity.of(er.getEntity()))
2322+
.extracting(PolarisEntity::getInternalPropertiesAsMap)
2323+
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
2324+
.containsEntry(NamespaceEntity.PARENT_NAMESPACE_KEY, NS.toString())
2325+
.containsEntry(
2326+
IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID,
2327+
String.valueOf(afterAppend.currentSnapshot().snapshotId()))
2328+
.containsEntry(IcebergTableLikeEntity.LOCATION, afterAppend.location())
2329+
.containsEntry(IcebergTableLikeEntity.TABLE_UUID, afterAppend.uuid().toString())
2330+
.containsEntry(
2331+
IcebergTableLikeEntity.CURRENT_SCHEMA_ID,
2332+
String.valueOf(afterAppend.schema().schemaId()))
2333+
.containsEntry(
2334+
IcebergTableLikeEntity.LAST_COLUMN_ID,
2335+
afterAppend.schema().columns().stream()
2336+
.max(Comparator.comparing(Types.NestedField::fieldId))
2337+
.map(Types.NestedField::fieldId)
2338+
.orElse(0)
2339+
.toString())
2340+
.containsEntry(
2341+
IcebergTableLikeEntity.LAST_SEQUENCE_NUMBER,
2342+
String.valueOf(afterAppend.currentSnapshot().sequenceNumber()));
2343+
2344+
catalog.loadTable(TABLE).refresh();
2345+
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_B).commit();
2346+
validatePropertiesUpdated(
2347+
schemaResult,
2348+
IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID,
2349+
tbl -> String.valueOf(tbl.currentSnapshot().snapshotId()));
2350+
2351+
catalog.loadTable(TABLE).refresh();
2352+
catalog.loadTable(TABLE).updateSchema().addColumn("new_col", Types.LongType.get()).commit();
2353+
validatePropertiesUpdated(
2354+
schemaResult,
2355+
IcebergTableLikeEntity.CURRENT_SCHEMA_ID,
2356+
tbl -> String.valueOf(tbl.schema().schemaId()));
2357+
2358+
catalog.loadTable(TABLE).refresh();
2359+
catalog.loadTable(TABLE).replaceSortOrder().desc("new_col", NullOrder.NULLS_FIRST).commit();
2360+
validatePropertiesUpdated(
2361+
schemaResult,
2362+
IcebergTableLikeEntity.DEFAULT_SORT_ORDER_ID,
2363+
table -> String.valueOf(table.sortOrder().orderId()));
2364+
}
2365+
2366+
private void validatePropertiesUpdated(
2367+
EntityResult schemaResult, String key, Function<Table, String> expectedValue) {
2368+
Table afterUpdate = catalog.loadTable(TABLE);
2369+
EntityResult tableResult =
2370+
metaStoreManager.readEntityByName(
2371+
polarisContext,
2372+
List.of(catalogEntity, schemaResult.getEntity()),
2373+
PolarisEntityType.TABLE_LIKE,
2374+
PolarisEntitySubType.ICEBERG_TABLE,
2375+
TABLE.name());
2376+
Assertions.assertThat(tableResult)
2377+
.returns(true, EntityResult::isSuccess)
2378+
.extracting(er -> PolarisEntity.of(er.getEntity()))
2379+
.extracting(PolarisEntity::getInternalPropertiesAsMap)
2380+
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
2381+
.containsEntry(key, expectedValue.apply(afterUpdate));
2382+
}
2383+
22892384
@Test
22902385
public void testEventsAreEmitted() {
22912386
IcebergCatalog catalog = catalog();

0 commit comments

Comments
 (0)