Core persistence refactor phase 3 (diffbase on phase 2) - Add AtomicOperationMetaStoreManager which passes all tests (#1139)

* Initial sketch of how shared code can be lifted from PolarisMetaStoreManagerImpl into BaseMetaStoreManager

* Make all TransactionalPersistence methods explicit in terms of whether they are
one-shot atomic or if they are intended to be run as part of a caller-managed
transaction, in which case the action will not be flushed/durable until the
transaction is committed.

Delegate all the BasePersistence methods to subclass variations of *InCurrentTxn
so that the TransactionalPersistence implementations also happen to fully
implement the real semantics of BasePersistence.

* Invert the method naming semantics; "doFooAtomically" is now the set of "new" methods defined in `BasePersistence`
while the original "doFoo" will mean that it expects to be run in a current transaction. Removed all the InCurrentTxn
suffixes.

* Copy/paste PolarisMetaStoreManagerImpl.java to a new AtomicOperationMetaStoreManager
as a diffbase.

* Add AtomicOperationMetaStoreManager class which starts as just a full copy/paste
of PolarisMetaStoreManagerImpl, but remove all its usage of runInTransaction,
instead using only the "doFooAtomically" BasePersistence methods.

Implement the intended compare-and-swap semantics in writeEntity/writeEntities
in AbstractTransactionalPersistence so that the TreeMap and EclipseLink impls
can both actually successfully be used in "single-durable-operation" mode
as plain BasePersistence implementations (e.g. without using their
runInTransaction capabilities at all).

* Update all concurrency TODO comments with a more thorough explanation of the implications
of failure modes in the absence of additional bulk methods.

Reorder the internals of dropEntity to drop the entity first, instead of grants,
so that the drop attempt can't put the entity into a partially-broken but still
existing state; instead, server failures just might cause garbage grant records
which are safe/expected but need to be cleaned up in a background task somewhere.

Fix the atomicity of name-collision-checks in createCatalog and createPrincipal.

With these changes, the behavior of AtomicOperationMetaStoreManager should be
fully "safe"/correct, with only the following suboptimal scenarios:

- If the server fails during CreateCatalog, it's possible for a
partially-initialized catalog (e.g. lacking catalog_admin and grants) to exist;
this should just be deleted and the creation should be retried in such a case.

- If grants on parents of an entity (e.g. namespaces, catalogs) are changed
in the middle of an operation on the child entity, it's possible for
happens-before semantics to be violated up to the duration of a single
in-flight request if grants are not cached, and up to the duration of
cache expiration in the worst case if the server crashes after updating
grants but before updating grantRecordVersions on affected entities.

- If a parent entity (e.g. namespace, catalog) is deleted in the middle
of creating or updating a child entity, the create/update may still
return "success" despite the parent being deleted, which effectively
deletes the child entity as well.

- If a new entity is being created under an empty namespace/catalog at
the same time the parent namespace/catalog is being deleted, despite
the semantic of preventing deletion of non-empty containers, the
creation may succeed *and* the deletion of the parent may succeed,
effectively deleting the child entity as well; this can only happen
within the concurrency window of a single request (e.g. happening
within milliseconds of each other).

- In general, in the face of server failures or concurrency collisions,
grantRecords may be stale up to the lifetime of EntityCache elements.

- In general, whether or not there are server failures or concurrency
collisions, the grantsOnSecurable and grantsToGrantee of a given
entity may not be evaluated at the same effective SNAPSHOT_READ
point in time, but this shouldn't be a problem because the securable
side of grants can already be modified independently from the
grantee side of grants.

* Step 1 of inverting the semantics again so that we only define "InCurrentTxn" versions of methods
in TransactionalPersistence.

The fact that this unit compiles before we've renamed doFooAtomically back to doFoo helps
prove that we didn't leave anything accidentally behind to call the atomic versions inside
a transaction, because as of this moment there is no unqualified method named doFoo anymore;
only either doFooInCurrentTxn or doFooAtomically. Committing this as a separate unit to
have the intermediate proof-of-correctness.

* Step 2 of inverting the method names; remove the "Atomically" suffix from all method
names; now all the unqualified methods in BasePersistence are implicitly supposed
to behave "atomically". All the methods that expect to be run inside a transaction
that's managed by the outer caller will have the "InCurrentTxn" suffix.

* Update javadoc for writeEntities per suggestion from offline discussion

* From offline discussion, make PolarisCallContext just return a BasePersistence
instead of a TransactionalPersistence. Just use blind casts for now in
PolarisMetaStoreManagerImpl; refactoring can be done later to better define
how the Persistence impl is injected in a more typesafe way for compatible
PolarisMetaStoreManager impls.

* Restore application[-it].properties to default in-memory impl for now instead
of using the atomic one.
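
The compare-and-swap semantics described above for writeEntity/writeEntities can be pictured with a minimal sketch. It assumes a hypothetical in-memory store and a simplified entity that carries a version field; `CasWriteSketch`, its nested `Entity`, and the method signatures are illustrative assumptions, not the actual AbstractTransactionalPersistence API.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a toy store, not the real Polaris persistence classes. */
final class CasWriteSketch {

  /** Hypothetical simplified entity; the version is bumped on every update. */
  static final class Entity {
    final long id;
    final String name;
    final int version;

    Entity(long id, String name, int version) {
      this.id = id;
      this.name = name;
      this.version = version;
    }
  }

  private final Map<Long, Entity> rows = new HashMap<>();

  /**
   * Single durable operation: store {@code updated} only if the current row still matches
   * {@code expected} (null means "must not exist yet"). Returns false if another writer
   * created or changed the row first, so the caller can re-read and retry.
   */
  synchronized boolean writeEntity(Entity updated, Entity expected) {
    Entity current = rows.get(updated.id);
    boolean unchanged =
        (expected == null)
            ? current == null
            : current != null && current.version == expected.version;
    if (!unchanged) {
      return false;
    }
    rows.put(updated.id, updated);
    return true;
  }

  synchronized Entity lookup(long id) {
    return rows.get(id);
  }

  public static void main(String[] args) {
    CasWriteSketch store = new CasWriteSketch();
    Entity v1 = new Entity(1L, "ns1", 1);
    Entity v2 = new Entity(1L, "ns1-renamed", 2);
    System.out.println(store.writeEntity(v1, null)); // create succeeds: true
    System.out.println(store.writeEntity(v2, v1)); // update based on v1 succeeds: true
    System.out.println(store.writeEntity(new Entity(1L, "stale", 2), v1)); // stale base: false
  }
}
```

In this toy model no caller-managed transaction is needed: each call either wins or reports a conflict, which is the "single-durable-operation" mode the commit message refers to.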
dennishuo authored Mar 11, 2025
1 parent 9483de5 commit b6ebbaf
Showing 18 changed files with 3,228 additions and 446 deletions.
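
The dropEntity reordering described in the commit message (drop the entity first, then its grants) can be sketched with a toy model. Everything here is an assumption made for illustration: `DropOrderSketch`, the in-memory collections, the `crashAfterEntityDrop` flag that simulates a server failure, and the `cleanUpOrphanedGrants` background task are hypothetical and are not taken from AtomicOperationMetaStoreManager.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only: models the ordering of the two steps, not the real implementation. */
final class DropOrderSketch {
  private final Set<Long> entities = new HashSet<>();
  private final Map<Long, List<String>> grantsBySecurable = new HashMap<>();

  void createEntityWithGrant(long id, String grant) {
    entities.add(id);
    grantsBySecurable.computeIfAbsent(id, k -> new ArrayList<>()).add(grant);
  }

  /**
   * Step 1 removes the entity itself; step 2 removes its grants. A failure between the
   * steps leaves orphaned grant records but never a half-broken entity that still exists.
   */
  boolean dropEntity(long id, boolean crashAfterEntityDrop) {
    if (!entities.remove(id)) {
      return false; // already gone, nothing to do
    }
    if (crashAfterEntityDrop) {
      throw new IllegalStateException("simulated server failure");
    }
    grantsBySecurable.remove(id);
    return true;
  }

  /** Hypothetical background task: delete grants whose securable no longer exists. */
  int cleanUpOrphanedGrants() {
    int before = grantsBySecurable.size();
    grantsBySecurable.keySet().removeIf(id -> !entities.contains(id));
    return before - grantsBySecurable.size();
  }

  boolean entityExists(long id) {
    return entities.contains(id);
  }

  boolean hasGrants(long id) {
    return grantsBySecurable.containsKey(id);
  }

  public static void main(String[] args) {
    DropOrderSketch store = new DropOrderSketch();
    store.createEntityWithGrant(7L, "TABLE_READ_DATA");
    try {
      store.dropEntity(7L, true);
    } catch (IllegalStateException e) {
      // The entity is gone; only garbage grant records remain.
    }
    System.out.println(store.entityExists(7L)); // false
    System.out.println(store.cleanUpOrphanedGrants()); // 1
  }
}
```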
@@ -247,57 +247,58 @@ public void runActionInReadTransaction(
* @return new unique entity identifier
*/
@Override
-public long generateNewId(@Nonnull PolarisCallContext callCtx) {
+public long generateNewIdInCurrentTxn(@Nonnull PolarisCallContext callCtx) {
// This function can be called within a transaction or out of transaction.
// If called out of transaction, create a new transaction, otherwise run in current transaction
return localSession.get() != null
? this.store.getNextSequence(localSession.get())
-: runInReadTransaction(callCtx, () -> generateNewId(callCtx));
+: runInReadTransaction(callCtx, () -> generateNewIdInCurrentTxn(callCtx));
}

/** {@inheritDoc} */
@Override
-public void writeToEntities(
+public void writeToEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
this.store.writeToEntities(localSession.get(), entity);
}

/** {@inheritDoc} */
@Override
-public <T extends PolarisStorageConfigurationInfo> void persistStorageIntegrationIfNeeded(
-@Nonnull PolarisCallContext callContext,
-@Nonnull PolarisBaseEntity entity,
-@Nullable PolarisStorageIntegration<T> storageIntegration) {
+public <T extends PolarisStorageConfigurationInfo>
+void persistStorageIntegrationIfNeededInCurrentTxn(
+@Nonnull PolarisCallContext callContext,
+@Nonnull PolarisBaseEntity entity,
+@Nullable PolarisStorageIntegration<T> storageIntegration) {
// not implemented for eclipselink store
}

/** {@inheritDoc} */
@Override
-public void writeToEntitiesActive(
+public void writeToEntitiesActiveInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
// write it
this.store.writeToEntitiesActive(localSession.get(), entity);
}

/** {@inheritDoc} */
@Override
-public void writeToEntitiesChangeTracking(
+public void writeToEntitiesChangeTrackingInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
// write it
this.store.writeToEntitiesChangeTracking(localSession.get(), entity);
}

/** {@inheritDoc} */
@Override
-public void writeToGrantRecords(
+public void writeToGrantRecordsInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec) {
// write it
this.store.writeToGrantRecords(localSession.get(), grantRec);
}

/** {@inheritDoc} */
@Override
-public void deleteFromEntities(
+public void deleteFromEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisEntityCore entity) {

// delete it
@@ -306,7 +307,7 @@ public void deleteFromEntities(

/** {@inheritDoc} */
@Override
-public void deleteFromEntitiesActive(
+public void deleteFromEntitiesActiveInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisEntityCore entity) {
// delete it
this.store.deleteFromEntitiesActive(localSession.get(), new PolarisEntitiesActiveKey(entity));
@@ -319,22 +320,22 @@ public void deleteFromEntitiesActive(
* @param entity entity record to delete
*/
@Override
-public void deleteFromEntitiesChangeTracking(
+public void deleteFromEntitiesChangeTrackingInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisEntityCore entity) {
// delete it
this.store.deleteFromEntitiesChangeTracking(localSession.get(), entity);
}

/** {@inheritDoc} */
@Override
-public void deleteFromGrantRecords(
+public void deleteFromGrantRecordsInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec) {
this.store.deleteFromGrantRecords(localSession.get(), grantRec);
}

/** {@inheritDoc} */
@Override
-public void deleteAllEntityGrantRecords(
+public void deleteAllEntityGrantRecordsInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
@Nonnull PolarisEntityCore entity,
@Nonnull List<PolarisGrantRecord> grantsOnGrantee,
@@ -344,19 +345,19 @@ public void deleteAllEntityGrantRecords(

/** {@inheritDoc} */
@Override
-public void deleteAll(@Nonnull PolarisCallContext callCtx) {
+public void deleteAllInCurrentTxn(@Nonnull PolarisCallContext callCtx) {
this.store.deleteAll(localSession.get());
}

/** {@inheritDoc} */
@Override
-public @Nullable PolarisBaseEntity lookupEntity(
+public @Nullable PolarisBaseEntity lookupEntityInCurrentTxn(
@Nonnull PolarisCallContext callCtx, long catalogId, long entityId, int typeCode) {
return ModelEntity.toEntity(this.store.lookupEntity(localSession.get(), catalogId, entityId));
}

@Override
-public @Nonnull List<PolarisBaseEntity> lookupEntities(
+public @Nonnull List<PolarisBaseEntity> lookupEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx, List<PolarisEntityId> entityIds) {
return this.store.lookupEntities(localSession.get(), entityIds).stream()
.map(ModelEntity::toEntity)
@@ -365,7 +366,7 @@ public void deleteAll(@Nonnull PolarisCallContext callCtx) {

/** {@inheritDoc} */
@Override
-public @Nonnull List<PolarisChangeTrackingVersions> lookupEntityVersions(
+public @Nonnull List<PolarisChangeTrackingVersions> lookupEntityVersionsInCurrentTxn(
@Nonnull PolarisCallContext callCtx, List<PolarisEntityId> entityIds) {
Map<PolarisEntityId, ModelEntity> idToEntityMap =
this.store.lookupEntities(localSession.get(), entityIds).stream()
@@ -388,7 +389,7 @@ public void deleteAll(@Nonnull PolarisCallContext callCtx) {
/** {@inheritDoc} */
@Override
@Nullable
-public EntityNameLookupRecord lookupEntityActive(
+public EntityNameLookupRecord lookupEntityActiveInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisEntitiesActiveKey entityActiveKey) {
// lookup the active entity slice
return ModelEntityActive.toEntityActive(
@@ -398,34 +399,35 @@ public EntityNameLookupRecord lookupEntityActive(
/** {@inheritDoc} */
@Override
@Nonnull
-public List<EntityNameLookupRecord> lookupEntityActiveBatch(
+public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
@Nonnull List<PolarisEntitiesActiveKey> entityActiveKeys) {
// now build a list to quickly verify that nothing has changed
return entityActiveKeys.stream()
-.map(entityActiveKey -> this.lookupEntityActive(callCtx, entityActiveKey))
+.map(entityActiveKey -> this.lookupEntityActiveInCurrentTxn(callCtx, entityActiveKey))
.collect(Collectors.toList());
}

/** {@inheritDoc} */
@Override
-public @Nonnull List<EntityNameLookupRecord> listEntities(
+public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType) {
-return listEntities(callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue());
+return this.listEntitiesInCurrentTxn(
+callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue());
}

@Override
-public @Nonnull List<EntityNameLookupRecord> listEntities(
+public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
@Nonnull Predicate<PolarisBaseEntity> entityFilter) {
// full range scan under the parent for that type
-return listEntities(
+return this.listEntitiesInCurrentTxn(
callCtx,
catalogId,
parentId,
@@ -443,7 +445,7 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatch(
}

@Override
-public @Nonnull <T> List<T> listEntities(
+public @Nonnull <T> List<T> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@@ -464,7 +466,7 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatch(

/** {@inheritDoc} */
@Override
-public boolean hasChildren(
+public boolean hasChildrenInCurrentTxn(
@Nonnull PolarisCallContext callContext,
@Nullable PolarisEntityType entityType,
long catalogId,
@@ -476,7 +478,7 @@ public boolean hasChildren(

/** {@inheritDoc} */
@Override
-public int lookupEntityGrantRecordsVersion(
+public int lookupEntityGrantRecordsVersionInCurrentTxn(
@Nonnull PolarisCallContext callCtx, long catalogId, long entityId) {
ModelEntityChangeTracking entity =
this.store.lookupEntityChangeTracking(localSession.get(), catalogId, entityId);
@@ -487,7 +489,7 @@ public int lookupEntityGrantRecordsVersion(

/** {@inheritDoc} */
@Override
-public @Nullable PolarisGrantRecord lookupGrantRecord(
+public @Nullable PolarisGrantRecord lookupGrantRecordInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long securableCatalogId,
long securableId,
@@ -507,7 +509,7 @@ public int lookupEntityGrantRecordsVersion(

/** {@inheritDoc} */
@Override
-public @Nonnull List<PolarisGrantRecord> loadAllGrantRecordsOnSecurable(
+public @Nonnull List<PolarisGrantRecord> loadAllGrantRecordsOnSecurableInCurrentTxn(
@Nonnull PolarisCallContext callCtx, long securableCatalogId, long securableId) {
// now fetch all grants for this securable
return this.store
@@ -519,7 +521,7 @@ public int lookupEntityGrantRecordsVersion(

/** {@inheritDoc} */
@Override
-public @Nonnull List<PolarisGrantRecord> loadAllGrantRecordsOnGrantee(
+public @Nonnull List<PolarisGrantRecord> loadAllGrantRecordsOnGranteeInCurrentTxn(
@Nonnull PolarisCallContext callCtx, long granteeCatalogId, long granteeId) {
// now fetch all grants assigned to this grantee
return this.store
@@ -531,15 +533,15 @@ public int lookupEntityGrantRecordsVersion(

/** {@inheritDoc} */
@Override
-public @Nullable PolarisPrincipalSecrets loadPrincipalSecrets(
+public @Nullable PolarisPrincipalSecrets loadPrincipalSecretsInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull String clientId) {
return ModelPrincipalSecrets.toPrincipalSecrets(
this.store.lookupPrincipalSecrets(localSession.get(), clientId));
}

/** {@inheritDoc} */
@Override
-public @Nonnull PolarisPrincipalSecrets generateNewPrincipalSecrets(
+public @Nonnull PolarisPrincipalSecrets generateNewPrincipalSecretsInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull String principalName, long principalId) {
// ensure principal client id is unique
PolarisPrincipalSecrets principalSecrets;
@@ -563,7 +565,7 @@ public int lookupEntityGrantRecordsVersion(

/** {@inheritDoc} */
@Override
-public @Nonnull PolarisPrincipalSecrets rotatePrincipalSecrets(
+public @Nonnull PolarisPrincipalSecrets rotatePrincipalSecretsInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
@Nonnull String clientId,
long principalId,
@@ -610,7 +612,7 @@ public int lookupEntityGrantRecordsVersion(

/** {@inheritDoc} */
@Override
-public void deletePrincipalSecrets(
+public void deletePrincipalSecretsInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull String clientId, long principalId) {
// load the existing secrets
ModelPrincipalSecrets principalSecrets =
@@ -643,7 +645,7 @@ public void deletePrincipalSecrets(
/** {@inheritDoc} */
@Override
public @Nullable <T extends PolarisStorageConfigurationInfo>
-PolarisStorageIntegration<T> createStorageIntegration(
+PolarisStorageIntegration<T> createStorageIntegrationInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long entityId,
@@ -655,7 +657,7 @@ PolarisStorageIntegration<T> createStorageIntegration(
/** {@inheritDoc} */
@Override
public @Nullable <T extends PolarisStorageConfigurationInfo>
-PolarisStorageIntegration<T> loadPolarisStorageIntegration(
+PolarisStorageIntegration<T> loadPolarisStorageIntegrationInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
PolarisStorageConfigurationInfo storageConfig =
BaseMetaStoreManager.extractStorageConfiguration(callCtx, entity);
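
The renames above follow the convention from the commit message: unqualified BasePersistence methods behave atomically, while `InCurrentTxn` variants expect a caller-managed transaction. The sketch below shows one way the atomic methods can delegate to the `InCurrentTxn` variants. It is a simplified model under stated assumptions: the nested type names mirror the real ones only for readability, and the two-method interface, the `Supplier`-based `runInTransaction`, and `committedTxns` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Illustrative only: simplified stand-ins, not the real Polaris interfaces. */
final class NamingConventionSketch {

  /** Unqualified names: each call is a one-shot atomic operation. */
  interface BasePersistence {
    String lookupEntity(long id);

    void writeEntity(long id, String name);
  }

  /** InCurrentTxn names: only valid inside a transaction managed by the caller. */
  abstract static class AbstractTransactionalPersistence implements BasePersistence {
    abstract <T> T runInTransaction(Supplier<T> action);

    abstract String lookupEntityInCurrentTxn(long id);

    abstract void writeEntityInCurrentTxn(long id, String name);

    @Override
    public String lookupEntity(long id) {
      return runInTransaction(() -> lookupEntityInCurrentTxn(id));
    }

    @Override
    public void writeEntity(long id, String name) {
      runInTransaction(
          () -> {
            writeEntityInCurrentTxn(id, name);
            return null;
          });
    }
  }

  /** Toy backend: the "transaction" is just a synchronized section with a commit counter. */
  static final class InMemoryPersistence extends AbstractTransactionalPersistence {
    private final Map<Long, String> rows = new HashMap<>();
    int committedTxns = 0;

    @Override
    synchronized <T> T runInTransaction(Supplier<T> action) {
      T result = action.get();
      committedTxns++;
      return result;
    }

    @Override
    String lookupEntityInCurrentTxn(long id) {
      return rows.get(id);
    }

    @Override
    void writeEntityInCurrentTxn(long id, String name) {
      rows.put(id, name);
    }
  }

  public static void main(String[] args) {
    InMemoryPersistence store = new InMemoryPersistence();
    store.writeEntity(1L, "catalog1"); // one-shot atomic write
    System.out.println(store.lookupEntity(1L)); // prints catalog1
  }
}
```

A caller that needs several steps in one transaction would call `runInTransaction` itself and use only the `InCurrentTxn` methods inside it; a caller that holds just a `BasePersistence` never sees those methods.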
@@ -21,7 +21,7 @@
import jakarta.annotation.Nonnull;
import java.time.Clock;
import java.time.ZoneId;
-import org.apache.polaris.core.persistence.transactional.TransactionalPersistence;
+import org.apache.polaris.core.persistence.BasePersistence;

/**
* The Call context is allocated each time a new REST request is processed. It contains instances of
@@ -30,7 +30,7 @@
public class PolarisCallContext {

// meta store which is used to persist Polaris entity metadata
-private final TransactionalPersistence metaStore;
+private final BasePersistence metaStore;

// diag services
private final PolarisDiagnostics diagServices;
@@ -40,7 +40,7 @@ public class PolarisCallContext {
private final Clock clock;

public PolarisCallContext(
-@Nonnull TransactionalPersistence metaStore,
+@Nonnull BasePersistence metaStore,
@Nonnull PolarisDiagnostics diagServices,
@Nonnull PolarisConfigurationStore configurationStore,
@Nonnull Clock clock) {
@@ -51,7 +51,7 @@ public PolarisCallContext(
}

public PolarisCallContext(
-@Nonnull TransactionalPersistence metaStore, @Nonnull PolarisDiagnostics diagServices) {
+@Nonnull BasePersistence metaStore, @Nonnull PolarisDiagnostics diagServices) {
this.metaStore = metaStore;
this.diagServices = diagServices;
this.configurationStore = new PolarisConfigurationStore() {};
@@ -66,7 +66,7 @@ public static PolarisCallContext copyOf(PolarisCallContext original) {
original.getClock());
}

-public TransactionalPersistence getMetaStore() {
+public BasePersistence getMetaStore() {
return metaStore;
}
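
With getMetaStore() now returning the base type, a transactional manager has to downcast, which the commit message calls a "blind cast". The sketch below illustrates the consequence under simplified assumptions: the nested interfaces, `CallContext`, and the helper `transactionalStoreOf` are hypothetical stand-ins and not the code in PolarisMetaStoreManagerImpl.

```java
/** Illustrative only: shows what an unchecked downcast of the meta store implies. */
final class BlindCastSketch {

  interface BasePersistence {
    String lookupEntity(long id);
  }

  interface TransactionalPersistence extends BasePersistence {
    String lookupEntityInCurrentTxn(long id);
  }

  /** Hypothetical call context that only promises the base type. */
  static final class CallContext {
    private final BasePersistence metaStore;

    CallContext(BasePersistence metaStore) {
      this.metaStore = metaStore;
    }

    BasePersistence getMetaStore() {
      return metaStore;
    }
  }

  /** Blind cast: fails at runtime, not compile time, if the store is not transactional. */
  static TransactionalPersistence transactionalStoreOf(CallContext ctx) {
    return (TransactionalPersistence) ctx.getMetaStore();
  }

  public static void main(String[] args) {
    BasePersistence atomicOnly = id -> "entity-" + id;
    try {
      transactionalStoreOf(new CallContext(atomicOnly));
    } catch (ClassCastException e) {
      System.out.println("incompatible persistence implementation");
    }
  }
}
```

The commit message defers a more typesafe injection of the persistence implementation to a later refactoring; until then, pairing a manager with an incompatible store surfaces only as a runtime ClassCastException.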
