Skip to content

Commit 44064cb

Browse files
authored
Interface changes for pagination (#1528)
* add missing apis * more tests, fixes * clean up drop * autolint * changes per review * revert iceberg messages to comply with oss tests * another revert * more iceberg catalog changes * autolint * dependency issues * more wiring * continuing rebase * remaining issues are related to task loading * re-add tests * debugging * fix failing tests * fix another test * changes per review * autolint * some fixes * stable * updates for new persistence * fix * continuing work * more reverts * continue reverts * more reverts * yank tests * autolint * test reverts * try to support limit without real page tokens * autolint * Stable * change comment * autolint * remove catalog config for now * changes per review * more tweaks * simplify types per review * Stable, about to refactor more * re-stable * polish * autolint * more changes per review * stable
1 parent f7222a0 commit 44064cb

File tree

32 files changed

+823
-199
lines changed

32 files changed

+823
-199
lines changed

extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.function.Predicate;
3838
import java.util.function.Supplier;
3939
import java.util.stream.Collectors;
40+
import java.util.stream.Stream;
4041
import org.apache.polaris.core.PolarisCallContext;
4142
import org.apache.polaris.core.context.RealmContext;
4243
import org.apache.polaris.core.entity.EntityNameLookupRecord;
@@ -52,6 +53,9 @@
5253
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
5354
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
5455
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
56+
import org.apache.polaris.core.persistence.pagination.HasPageSize;
57+
import org.apache.polaris.core.persistence.pagination.Page;
58+
import org.apache.polaris.core.persistence.pagination.PageToken;
5559
import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence;
5660
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
5761
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
@@ -419,29 +423,30 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
419423

420424
/** {@inheritDoc} */
421425
@Override
422-
public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
426+
public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn(
423427
@Nonnull PolarisCallContext callCtx,
424428
long catalogId,
425429
long parentId,
426-
@Nonnull PolarisEntityType entityType) {
430+
@Nonnull PolarisEntityType entityType,
431+
@Nonnull PageToken pageToken) {
427432
return this.listEntitiesInCurrentTxn(
428-
callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue());
433+
callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageToken);
429434
}
430435

431436
@Override
432-
public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
437+
public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn(
433438
@Nonnull PolarisCallContext callCtx,
434439
long catalogId,
435440
long parentId,
436441
@Nonnull PolarisEntityType entityType,
437-
@Nonnull Predicate<PolarisBaseEntity> entityFilter) {
442+
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
443+
@Nonnull PageToken pageToken) {
438444
// full range scan under the parent for that type
439445
return this.listEntitiesInCurrentTxn(
440446
callCtx,
441447
catalogId,
442448
parentId,
443449
entityType,
444-
Integer.MAX_VALUE,
445450
entityFilter,
446451
entity ->
447452
new EntityNameLookupRecord(
@@ -450,27 +455,33 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
450455
entity.getParentId(),
451456
entity.getName(),
452457
entity.getTypeCode(),
453-
entity.getSubTypeCode()));
458+
entity.getSubTypeCode()),
459+
pageToken);
454460
}
455461

456462
@Override
457-
public @Nonnull <T> List<T> listEntitiesInCurrentTxn(
463+
public @Nonnull <T> Page<T> listEntitiesInCurrentTxn(
458464
@Nonnull PolarisCallContext callCtx,
459465
long catalogId,
460466
long parentId,
461467
@Nonnull PolarisEntityType entityType,
462-
int limit,
463468
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
464-
@Nonnull Function<PolarisBaseEntity, T> transformer) {
469+
@Nonnull Function<PolarisBaseEntity, T> transformer,
470+
@Nonnull PageToken pageToken) {
465471
// full range scan under the parent for that type
466-
return this.store
467-
.lookupFullEntitiesActive(localSession.get(), catalogId, parentId, entityType)
468-
.stream()
469-
.map(ModelEntity::toEntity)
470-
.filter(entityFilter)
471-
.limit(limit)
472-
.map(transformer)
473-
.collect(Collectors.toList());
472+
Stream<PolarisBaseEntity> data =
473+
this.store
474+
.lookupFullEntitiesActive(
475+
localSession.get(), catalogId, parentId, entityType, pageToken)
476+
.stream()
477+
.map(ModelEntity::toEntity)
478+
.filter(entityFilter);
479+
480+
if (pageToken instanceof HasPageSize hasPageSize) {
481+
data = data.limit(hasPageSize.getPageSize());
482+
}
483+
484+
return Page.fromItems(data.map(transformer).collect(Collectors.toList()));
474485
}
475486

476487
/** {@inheritDoc} */

extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.polaris.core.entity.PolarisEntityType;
3636
import org.apache.polaris.core.entity.PolarisGrantRecord;
3737
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
38+
import org.apache.polaris.core.persistence.pagination.PageToken;
3839
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
3940
import org.apache.polaris.jpa.models.ModelEntity;
4041
import org.apache.polaris.jpa.models.ModelEntityActive;
@@ -282,7 +283,11 @@ long countActiveChildEntities(
282283
}
283284

284285
List<ModelEntity> lookupFullEntitiesActive(
285-
EntityManager session, long catalogId, long parentId, @Nonnull PolarisEntityType entityType) {
286+
EntityManager session,
287+
long catalogId,
288+
long parentId,
289+
@Nonnull PolarisEntityType entityType,
290+
@Nonnull PageToken pageToken) {
286291
diagnosticServices.check(session != null, "session_is_null");
287292
checkInitialized();
288293

extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@
4949
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
5050
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
5151
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
52+
import org.apache.polaris.core.persistence.pagination.HasPageSize;
53+
import org.apache.polaris.core.persistence.pagination.Page;
54+
import org.apache.polaris.core.persistence.pagination.PageToken;
5255
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
5356
import org.apache.polaris.core.policy.PolicyType;
5457
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
@@ -352,49 +355,51 @@ public List<PolarisChangeTrackingVersions> lookupEntityVersions(
352355

353356
@Nonnull
354357
@Override
355-
public List<EntityNameLookupRecord> listEntities(
358+
public Page<EntityNameLookupRecord> listEntities(
356359
@Nonnull PolarisCallContext callCtx,
357360
long catalogId,
358361
long parentId,
359-
@Nonnull PolarisEntityType entityType) {
362+
@Nonnull PolarisEntityType entityType,
363+
@Nonnull PageToken pageToken) {
360364
return listEntities(
361365
callCtx,
362366
catalogId,
363367
parentId,
364368
entityType,
365-
Integer.MAX_VALUE,
366369
entity -> true,
367-
EntityNameLookupRecord::new);
370+
EntityNameLookupRecord::new,
371+
pageToken);
368372
}
369373

370374
@Nonnull
371375
@Override
372-
public List<EntityNameLookupRecord> listEntities(
376+
public Page<EntityNameLookupRecord> listEntities(
373377
@Nonnull PolarisCallContext callCtx,
374378
long catalogId,
375379
long parentId,
376380
@Nonnull PolarisEntityType entityType,
377-
@Nonnull Predicate<PolarisBaseEntity> entityFilter) {
381+
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
382+
@Nonnull PageToken pageToken) {
378383
return listEntities(
379384
callCtx,
380385
catalogId,
381386
parentId,
382387
entityType,
383-
Integer.MAX_VALUE,
384388
entityFilter,
385-
EntityNameLookupRecord::new);
389+
EntityNameLookupRecord::new,
390+
pageToken);
386391
}
387392

388393
@Nonnull
389394
@Override
390-
public <T> List<T> listEntities(
395+
public <T> Page<T> listEntities(
391396
@Nonnull PolarisCallContext callCtx,
392397
long catalogId,
393398
long parentId,
394399
PolarisEntityType entityType,
395-
int limit,
396400
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
397-
@Nonnull Function<PolarisBaseEntity, T> transformer) {
401+
@Nonnull Function<PolarisBaseEntity, T> transformer,
402+
@Nonnull PageToken pageToken) {
398403
Map<String, Object> params =
399404
Map.of(
400405
"catalog_id",
@@ -415,15 +420,17 @@ public <T> List<T> listEntities(
415420
query,
416421
new ModelEntity(),
417422
stream -> {
418-
stream
419-
.map(ModelEntity::toEntity)
420-
.filter(entityFilter)
421-
.limit(limit)
422-
.forEach(results::add);
423+
var data = stream.map(ModelEntity::toEntity).filter(entityFilter);
424+
if (pageToken instanceof HasPageSize hasPageSize) {
425+
data = data.limit(hasPageSize.getPageSize());
426+
}
427+
data.forEach(results::add);
423428
});
424-
return results == null
425-
? Collections.emptyList()
426-
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
429+
List<T> resultsOrEmpty =
430+
results == null
431+
? Collections.emptyList()
432+
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
433+
return Page.fromItems(resultsOrEmpty);
427434
} catch (SQLException e) {
428435
throw new RuntimeException(
429436
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e);

polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,13 @@ public static void enforceFeatureEnabledOrThrow(
202202
.defaultValue(2)
203203
.buildFeatureConfiguration();
204204

205+
public static final PolarisConfiguration<Boolean> LIST_PAGINATION_ENABLED =
206+
PolarisConfiguration.<Boolean>builder()
207+
.key("LIST_PAGINATION_ENABLED")
208+
.description("If set to true, pagination for APIs like listTables is enabled.")
209+
.defaultValue(false)
210+
.buildFeatureConfiguration();
211+
205212
public static final FeatureConfiguration<Boolean> ENABLE_GENERIC_TABLES =
206213
PolarisConfiguration.<Boolean>builder()
207214
.key("ENABLE_GENERIC_TABLES")

polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@
6262
import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult;
6363
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
6464
import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
65+
import org.apache.polaris.core.persistence.pagination.Page;
66+
import org.apache.polaris.core.persistence.pagination.PageToken;
6567
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
6668
import org.apache.polaris.core.policy.PolicyEntity;
6769
import org.apache.polaris.core.policy.PolicyMappingUtil;
@@ -687,7 +689,8 @@ private void revokeGrantRecord(
687689
@Nonnull PolarisCallContext callCtx,
688690
@Nullable List<PolarisEntityCore> catalogPath,
689691
@Nonnull PolarisEntityType entityType,
690-
@Nonnull PolarisEntitySubType entitySubType) {
692+
@Nonnull PolarisEntitySubType entitySubType,
693+
@Nonnull PageToken pageToken) {
691694
// get meta store we should be using
692695
BasePersistence ms = callCtx.getMetaStore();
693696

@@ -699,15 +702,16 @@ private void revokeGrantRecord(
699702
catalogPath == null || catalogPath.size() == 0
700703
? 0l
701704
: catalogPath.get(catalogPath.size() - 1).getId();
702-
List<EntityNameLookupRecord> toreturnList =
703-
ms.listEntities(callCtx, catalogId, parentId, entityType);
705+
Page<EntityNameLookupRecord> resultPage =
706+
ms.listEntities(callCtx, catalogId, parentId, entityType, pageToken);
704707

705708
// prune the returned list with only entities matching the entity subtype
706709
if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) {
707-
toreturnList =
708-
toreturnList.stream()
709-
.filter(rec -> rec.getSubTypeCode() == entitySubType.getCode())
710-
.collect(Collectors.toList());
710+
resultPage =
711+
pageToken.buildNextPage(
712+
resultPage.items.stream()
713+
.filter(rec -> rec.getSubTypeCode() == entitySubType.getCode())
714+
.collect(Collectors.toList()));
711715
}
712716

713717
// TODO: Use post-validation to enforce consistent view against catalogPath. In the
@@ -717,7 +721,7 @@ private void revokeGrantRecord(
717721
// in-flight request (the cache-based resolution follows a different path entirely).
718722

719723
// done
720-
return new ListEntitiesResult(toreturnList);
724+
return ListEntitiesResult.fromPage(resultPage);
721725
}
722726

723727
/** {@inheritDoc} */
@@ -1176,13 +1180,14 @@ private void revokeGrantRecord(
11761180
// get the list of catalog roles, at most 2
11771181
List<PolarisBaseEntity> catalogRoles =
11781182
ms.listEntities(
1179-
callCtx,
1180-
catalogId,
1181-
catalogId,
1182-
PolarisEntityType.CATALOG_ROLE,
1183-
2,
1184-
entity -> true,
1185-
Function.identity());
1183+
callCtx,
1184+
catalogId,
1185+
catalogId,
1186+
PolarisEntityType.CATALOG_ROLE,
1187+
entity -> true,
1188+
Function.identity(),
1189+
PageToken.fromLimit(2))
1190+
.items;
11861191

11871192
// if we have 2, we cannot drop the catalog. If only one left, better be the admin role
11881193
if (catalogRoles.size() > 1) {
@@ -1488,17 +1493,16 @@ private void revokeGrantRecord(
14881493

14891494
@Override
14901495
public @Nonnull EntitiesResult loadTasks(
1491-
@Nonnull PolarisCallContext callCtx, String executorId, int limit) {
1496+
@Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken) {
14921497
BasePersistence ms = callCtx.getMetaStore();
14931498

14941499
// find all available tasks
1495-
List<PolarisBaseEntity> availableTasks =
1500+
Page<PolarisBaseEntity> availableTasks =
14961501
ms.listEntities(
14971502
callCtx,
14981503
PolarisEntityConstants.getRootEntityId(),
14991504
PolarisEntityConstants.getRootEntityId(),
15001505
PolarisEntityType.TASK,
1501-
limit,
15021506
entity -> {
15031507
PolarisObjectMapperUtil.TaskExecutionState taskState =
15041508
PolarisObjectMapperUtil.parseTaskState(entity);
@@ -1513,11 +1517,12 @@ private void revokeGrantRecord(
15131517
|| taskState.executor == null
15141518
|| callCtx.getClock().millis() - taskState.lastAttemptStartTime > taskAgeTimeout;
15151519
},
1516-
Function.identity());
1520+
Function.identity(),
1521+
pageToken);
15171522

15181523
List<PolarisBaseEntity> loadedTasks = new ArrayList<>();
15191524
final AtomicInteger failedLeaseCount = new AtomicInteger(0);
1520-
availableTasks.forEach(
1525+
availableTasks.items.forEach(
15211526
task -> {
15221527
PolarisBaseEntity updatedTask = new PolarisBaseEntity(task);
15231528
Map<String, String> properties =
@@ -1554,7 +1559,7 @@ private void revokeGrantRecord(
15541559
throw new RetryOnConcurrencyException(
15551560
"Failed to lease any of %s tasks due to concurrent leases", failedLeaseCount.get());
15561561
}
1557-
return new EntitiesResult(loadedTasks);
1562+
return EntitiesResult.fromPage(Page.fromItems(loadedTasks));
15581563
}
15591564

15601565
/** {@inheritDoc} */

0 commit comments

Comments
 (0)