Skip to content

Commit b1142b5

Browse files
authored
JDBC: Handle schema evolution (#2714)
1 parent d2a607a commit b1142b5

File tree

15 files changed

+396
-54
lines changed

15 files changed

+396
-54
lines changed

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,18 @@ private void persistEntity(
174174
Connection connection,
175175
QueryAction queryAction)
176176
throws SQLException {
177-
ModelEntity modelEntity = ModelEntity.fromEntity(entity);
177+
ModelEntity modelEntity = ModelEntity.fromEntity(entity, schemaVersion);
178178
if (originalEntity == null) {
179179
try {
180180
List<Object> values =
181181
modelEntity.toMap(datasourceOperations.getDatabaseType()).values().stream().toList();
182182
queryAction.apply(
183183
connection,
184184
QueryGenerator.generateInsertQuery(
185-
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, values, realmId));
185+
ModelEntity.getAllColumnNames(schemaVersion),
186+
ModelEntity.TABLE_NAME,
187+
values,
188+
realmId));
186189
} catch (SQLException e) {
187190
if (datasourceOperations.isConstraintViolation(e)) {
188191
PolarisBaseEntity existingEntity =
@@ -222,7 +225,10 @@ private void persistEntity(
222225
queryAction.apply(
223226
connection,
224227
QueryGenerator.generateUpdateQuery(
225-
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, values, params));
228+
ModelEntity.getAllColumnNames(schemaVersion),
229+
ModelEntity.TABLE_NAME,
230+
values,
231+
params));
226232
if (rowsUpdated == 0) {
227233
throw new RetryOnConcurrencyException(
228234
"Entity '%s' id '%s' concurrently modified; expected version %s",
@@ -310,7 +316,7 @@ public void writeEvents(@Nonnull List<PolarisEvent> events) {
310316

311317
@Override
312318
public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
313-
ModelEntity modelEntity = ModelEntity.fromEntity(entity);
319+
ModelEntity modelEntity = ModelEntity.fromEntity(entity, schemaVersion);
314320
Map<String, Object> params =
315321
Map.of(
316322
"id",
@@ -322,7 +328,7 @@ public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBa
322328
try {
323329
datasourceOperations.executeUpdate(
324330
QueryGenerator.generateDeleteQuery(
325-
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
331+
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params));
326332
} catch (SQLException e) {
327333
throw new RuntimeException(
328334
String.format("Failed to delete entity due to %s", e.getMessage()), e);
@@ -370,7 +376,7 @@ public void deleteAll(@Nonnull PolarisCallContext callCtx) {
370376
datasourceOperations.execute(
371377
connection,
372378
QueryGenerator.generateDeleteQuery(
373-
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
379+
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params));
374380
datasourceOperations.execute(
375381
connection,
376382
QueryGenerator.generateDeleteQuery(
@@ -402,7 +408,7 @@ public PolarisBaseEntity lookupEntity(
402408
Map.of("catalog_id", catalogId, "id", entityId, "type_code", typeCode, "realm_id", realmId);
403409
return getPolarisBaseEntity(
404410
QueryGenerator.generateSelectQuery(
405-
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
411+
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params));
406412
}
407413

408414
@Override
@@ -426,13 +432,13 @@ public PolarisBaseEntity lookupEntityByName(
426432
realmId);
427433
return getPolarisBaseEntity(
428434
QueryGenerator.generateSelectQuery(
429-
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
435+
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params));
430436
}
431437

432438
@Nullable
433439
private PolarisBaseEntity getPolarisBaseEntity(QueryGenerator.PreparedQuery query) {
434440
try {
435-
var results = datasourceOperations.executeSelect(query, new ModelEntity());
441+
var results = datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion));
436442
if (results.isEmpty()) {
437443
return null;
438444
} else if (results.size() > 1) {
@@ -454,9 +460,10 @@ private PolarisBaseEntity getPolarisBaseEntity(QueryGenerator.PreparedQuery quer
454460
public List<PolarisBaseEntity> lookupEntities(
455461
@Nonnull PolarisCallContext callCtx, List<PolarisEntityId> entityIds) {
456462
if (entityIds == null || entityIds.isEmpty()) return new ArrayList<>();
457-
PreparedQuery query = QueryGenerator.generateSelectQueryWithEntityIds(realmId, entityIds);
463+
PreparedQuery query =
464+
QueryGenerator.generateSelectQueryWithEntityIds(realmId, schemaVersion, entityIds);
458465
try {
459-
return datasourceOperations.executeSelect(query, new ModelEntity());
466+
return datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion));
460467
} catch (SQLException e) {
461468
throw new RuntimeException(
462469
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e);
@@ -472,7 +479,7 @@ public List<PolarisChangeTrackingVersions> lookupEntityVersions(
472479
.collect(
473480
Collectors.toMap(
474481
entry -> new PolarisEntityId(entry.getCatalogId(), entry.getId()),
475-
ModelEntity::fromEntity));
482+
entry -> ModelEntity.fromEntity(entry, schemaVersion)));
476483
return entityIds.stream()
477484
.map(
478485
entityId -> {
@@ -575,11 +582,16 @@ public <T> Page<T> loadEntities(
575582
try {
576583
PreparedQuery query =
577584
buildEntityQuery(
578-
catalogId, parentId, entityType, entitySubType, pageToken, ModelEntity.ALL_COLUMNS);
585+
catalogId,
586+
parentId,
587+
entityType,
588+
entitySubType,
589+
pageToken,
590+
ModelEntity.getAllColumnNames(schemaVersion));
579591
AtomicReference<Page<T>> results = new AtomicReference<>();
580592
datasourceOperations.executeSelectOverStream(
581593
query,
582-
new ModelEntity(),
594+
new ModelEntity(schemaVersion),
583595
stream -> {
584596
var data = stream.filter(entityFilter);
585597
results.set(Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity));
@@ -600,7 +612,7 @@ public int lookupEntityGrantRecordsVersion(
600612
PolarisBaseEntity b =
601613
getPolarisBaseEntity(
602614
QueryGenerator.generateSelectQuery(
603-
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
615+
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params));
604616
return b == null ? 0 : b.getGrantRecordsVersion();
605617
}
606618

@@ -714,8 +726,8 @@ public boolean hasChildren(
714726
var results =
715727
datasourceOperations.executeSelect(
716728
QueryGenerator.generateSelectQuery(
717-
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params),
718-
new ModelEntity());
729+
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params),
730+
new ModelEntity(schemaVersion));
719731
return results != null && !results.isEmpty();
720732
} catch (SQLException e) {
721733
throw new RuntimeException(
@@ -759,9 +771,9 @@ Optional<Optional<String>> hasOverlappingSiblings(
759771

760772
PreparedQuery query =
761773
QueryGenerator.generateOverlapQuery(
762-
realmId, entity.getCatalogId(), entity.getBaseLocation());
774+
realmId, schemaVersion, entity.getCatalogId(), entity.getBaseLocation());
763775
try {
764-
var results = datasourceOperations.executeSelect(query, new ModelEntity());
776+
var results = datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion));
765777
if (!results.isEmpty()) {
766778
StorageLocation entityLocation = StorageLocation.of(entity.getBaseLocation());
767779
for (PolarisBaseEntity result : results) {

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,13 @@ public static PreparedQuery generateDeleteQueryForEntityGrantRecords(
113113
* Builds a SELECT query using a list of entity ID pairs (catalog_id, id).
114114
*
115115
* @param realmId Realm to filter by.
116+
* @param schemaVersion The schema version of entities table to query
116117
* @param entityIds List of PolarisEntityId pairs.
117118
* @return SELECT query to retrieve matching entities.
118119
* @throws IllegalArgumentException if entityIds is empty.
119120
*/
120121
public static PreparedQuery generateSelectQueryWithEntityIds(
121-
@Nonnull String realmId, @Nonnull List<PolarisEntityId> entityIds) {
122+
@Nonnull String realmId, int schemaVersion, @Nonnull List<PolarisEntityId> entityIds) {
122123
if (entityIds.isEmpty()) {
123124
throw new IllegalArgumentException("Empty entity ids");
124125
}
@@ -131,7 +132,9 @@ public static PreparedQuery generateSelectQueryWithEntityIds(
131132
params.add(realmId);
132133
String where = " WHERE (catalog_id, id) IN (" + placeholders + ") AND realm_id = ?";
133134
return new PreparedQuery(
134-
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where, null).sql(),
135+
generateSelectQuery(
136+
ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, where, null)
137+
.sql(),
135138
params);
136139
}
137140

@@ -260,13 +263,14 @@ static PreparedQuery generateVersionQuery() {
260263
* This should be combined with a check using `StorageLocation`.
261264
*
262265
* @param realmId A realm to search within
266+
* @param schemaVersion The schema version of entities table to query
263267
* @param catalogId A catalog entity to search within
264268
* @param baseLocation The base location to look for overlap with, with or without a scheme
265269
* @return The list of possibly overlapping entities that meet the criteria
266270
*/
267271
@VisibleForTesting
268272
public static PreparedQuery generateOverlapQuery(
269-
String realmId, long catalogId, String baseLocation) {
273+
String realmId, int schemaVersion, long catalogId, String baseLocation) {
270274
StorageLocation baseStorageLocation = StorageLocation.of(baseLocation);
271275
String locationWithoutScheme = baseStorageLocation.withoutScheme();
272276

@@ -297,7 +301,11 @@ public static PreparedQuery generateOverlapQuery(
297301

298302
QueryFragment where = new QueryFragment(clause, finalParams);
299303
PreparedQuery query =
300-
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where.sql(), null);
304+
generateSelectQuery(
305+
ModelEntity.getAllColumnNames(schemaVersion),
306+
ModelEntity.TABLE_NAME,
307+
where.sql(),
308+
null);
301309
return new PreparedQuery(query.sql(), where.parameters());
302310
}
303311

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/Converter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ public interface Converter<T> {
3535
T fromResultSet(ResultSet rs) throws SQLException;
3636

3737
/**
38-
* Convert a model into a Map with keys as snake case names, where as values as values of member
39-
* of model obj.
38+
* Convert a model into a Map with keys as snake case names, and values as values of member of
39+
* model obj.
4040
*/
4141
Map<String, Object> toMap(DatabaseType databaseType);
4242

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,25 @@ public class ModelEntity implements Converter<PolarisBaseEntity> {
3535

3636
public static final String ID_COLUMN = "id";
3737

38-
public static final List<String> ALL_COLUMNS =
38+
private static final List<String> ALL_COLUMNS =
39+
List.of(
40+
"id",
41+
"catalog_id",
42+
"parent_id",
43+
"type_code",
44+
"name",
45+
"entity_version",
46+
"sub_type_code",
47+
"create_timestamp",
48+
"drop_timestamp",
49+
"purge_timestamp",
50+
"to_purge_timestamp",
51+
"last_update_timestamp",
52+
"properties",
53+
"internal_properties",
54+
"grant_records_version");
55+
56+
private static final List<String> ALL_COLUMNS_V2 =
3957
List.of(
4058
"id",
4159
"catalog_id",
@@ -54,6 +72,14 @@ public class ModelEntity implements Converter<PolarisBaseEntity> {
5472
"grant_records_version",
5573
"location_without_scheme");
5674

75+
public static List<String> getAllColumnNames(int schemaVersion) {
76+
if (schemaVersion < 2) {
77+
return ALL_COLUMNS;
78+
} else {
79+
return ALL_COLUMNS_V2;
80+
}
81+
}
82+
5783
public static final List<String> ENTITY_LOOKUP_COLUMNS =
5884
List.of("id", "catalog_id", "parent_id", "type_code", "name", "sub_type_code");
5985

@@ -106,6 +132,16 @@ public class ModelEntity implements Converter<PolarisBaseEntity> {
106132
// location for the entity but without a scheme, when applicable
107133
private String locationWithoutScheme;
108134

135+
// schema version of the entity
136+
// NOTE: this field is not stored in the database, but is used to handle schema changes
137+
private int schemaVersion;
138+
139+
public ModelEntity(int schemaVersion) {
140+
this.schemaVersion = schemaVersion;
141+
}
142+
143+
public ModelEntity() {}
144+
109145
public long getId() {
110146
return id;
111147
}
@@ -170,12 +206,17 @@ public String getLocationWithoutScheme() {
170206
return locationWithoutScheme;
171207
}
172208

209+
public int getSchemaVersion() {
210+
return schemaVersion;
211+
}
212+
173213
public static Builder builder() {
174214
return new Builder();
175215
}
176216

177217
@Override
178218
public PolarisBaseEntity fromResultSet(ResultSet r) throws SQLException {
219+
179220
var modelEntity =
180221
ModelEntity.builder()
181222
.catalogId(r.getObject("catalog_id", Long.class))
@@ -195,7 +236,8 @@ public PolarisBaseEntity fromResultSet(ResultSet r) throws SQLException {
195236
// JSONB: use getString(), not getObject().
196237
.internalProperties(r.getString("internal_properties"))
197238
.grantRecordsVersion(r.getObject("grant_records_version", Integer.class))
198-
.locationWithoutScheme(r.getString("location_without_scheme"))
239+
.locationWithoutScheme(
240+
this.schemaVersion >= 2 ? r.getString("location_without_scheme") : null)
199241
.build();
200242

201243
return toEntity(modelEntity);
@@ -224,7 +266,9 @@ public Map<String, Object> toMap(DatabaseType databaseType) {
224266
map.put("internal_properties", this.getInternalProperties());
225267
}
226268
map.put("grant_records_version", this.getGrantRecordsVersion());
227-
map.put("location_without_scheme", this.getLocationWithoutScheme());
269+
if (this.getSchemaVersion() >= 2) {
270+
map.put("location_without_scheme", this.getLocationWithoutScheme());
271+
}
228272
return map;
229273
}
230274

@@ -315,12 +359,17 @@ public Builder locationWithoutScheme(String location) {
315359
return this;
316360
}
317361

362+
public Builder schemaVersion(int schemaVersion) {
363+
entity.schemaVersion = schemaVersion;
364+
return this;
365+
}
366+
318367
public ModelEntity build() {
319368
return entity;
320369
}
321370
}
322371

323-
public static ModelEntity fromEntity(PolarisBaseEntity entity) {
372+
public static ModelEntity fromEntity(PolarisBaseEntity entity, int schemaVersion) {
324373
var builder =
325374
ModelEntity.builder()
326375
.catalogId(entity.getCatalogId())
@@ -355,6 +404,8 @@ public static ModelEntity fromEntity(PolarisBaseEntity entity) {
355404
.withoutScheme());
356405
}
357406

407+
builder.schemaVersion(schemaVersion);
408+
358409
return builder.build();
359410
}
360411

persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,18 @@
3434
import org.h2.jdbcx.JdbcConnectionPool;
3535
import org.mockito.Mockito;
3636

37-
public class AtomicMetastoreManagerWithJdbcBasePersistenceImplTest
37+
public abstract class AtomicMetastoreManagerWithJdbcBasePersistenceImplTest
3838
extends BasePolarisMetaStoreManagerTest {
3939

40-
public static DataSource createH2DataSource() {
41-
return JdbcConnectionPool.create("jdbc:h2:file:./build/test_data/polaris/db", "sa", "");
40+
public DataSource createH2DataSource() {
41+
return JdbcConnectionPool.create(
42+
String.format("jdbc:h2:file:./build/test_data/polaris/db_%s", schemaVersion()), "sa", "");
4243
}
4344

45+
public abstract int schemaVersion();
46+
4447
@Override
4548
protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
46-
int schemaVersion = 2;
4749
PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl();
4850
DatasourceOperations datasourceOperations;
4951
try {
@@ -52,7 +54,8 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
5254
ClassLoader classLoader = DatasourceOperations.class.getClassLoader();
5355
InputStream scriptStream =
5456
classLoader.getResourceAsStream(
55-
String.format("%s/schema-v%s.sql", DatabaseType.H2.getDisplayName(), schemaVersion));
57+
String.format(
58+
"%s/schema-v%s.sql", DatabaseType.H2.getDisplayName(), schemaVersion()));
5659
datasourceOperations.executeScript(scriptStream);
5760
} catch (SQLException e) {
5861
throw new RuntimeException(
@@ -69,7 +72,7 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
6972
RANDOM_SECRETS,
7073
Mockito.mock(),
7174
realmContext.getRealmIdentifier(),
72-
schemaVersion);
75+
schemaVersion());
7376
AtomicOperationMetaStoreManager metaStoreManager =
7477
new AtomicOperationMetaStoreManager(clock, diagServices);
7578
PolarisCallContext callCtx = new PolarisCallContext(realmContext, basePersistence);

0 commit comments

Comments
 (0)