Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ protected TransactionalPersistence createMetaStoreSession(
@Nullable RootCredentialsSet rootCredentialsSet,
@Nonnull PolarisDiagnostics diagnostics) {
return new PolarisEclipseLinkMetaStoreSessionImpl(
diagnostics,
store,
storageIntegrationProvider,
realmContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
import org.apache.polaris.core.entity.LocationBasedEntity;
Expand Down Expand Up @@ -106,12 +107,14 @@ public class PolarisEclipseLinkMetaStoreSessionImpl extends AbstractTransactiona
* @param persistenceUnitName Optional persistence-unit name in confFile. Default to 'polaris'.
*/
public PolarisEclipseLinkMetaStoreSessionImpl(
@Nonnull PolarisDiagnostics diagnostics,
@Nonnull PolarisEclipseLinkStore store,
@Nonnull PolarisStorageIntegrationProvider storageIntegrationProvider,
@Nonnull RealmContext realmContext,
@Nullable String confFile,
@Nullable String persistenceUnitName,
@Nonnull PrincipalSecretsGenerator secretsGenerator) {
super(diagnostics);
LOGGER.debug(
"Creating EclipseLink Meta Store Session for realm {}", realmContext.getRealmIdentifier());
emf = createEntityManagerFactory(realmContext, confFile, persistenceUnitName);
Expand Down Expand Up @@ -159,7 +162,7 @@ static void clearEntityManagerFactories() {
@Override
public <T> T runInTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Supplier<T> transactionCode) {
callCtx.getDiagServices().check(localSession.get() == null, "cannot nest transaction");
getDiagnostics().check(localSession.get() == null, "cannot nest transaction");

try (EntityManager session = emf.createEntityManager()) {
localSession.set(session);
Expand Down Expand Up @@ -206,7 +209,7 @@ public <T> T runInTransaction(
@Override
public void runActionInTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Runnable transactionCode) {
callCtx.getDiagServices().check(localSession.get() == null, "cannot nest transaction");
getDiagnostics().check(localSession.get() == null, "cannot nest transaction");

try (EntityManager session = emf.createEntityManager()) {
localSession.set(session);
Expand Down Expand Up @@ -560,8 +563,7 @@ public int lookupEntityGrantRecordsVersionInCurrentTxn(
this.store.lookupPrincipalSecrets(localSession.get(), clientId));

// should be found
callCtx
.getDiagServices()
getDiagnostics()
.checkNotNull(
principalSecrets,
"cannot_find_secrets",
Expand All @@ -570,8 +572,7 @@ public int lookupEntityGrantRecordsVersionInCurrentTxn(
principalId);

// ensure principal id is matching
callCtx
.getDiagServices()
getDiagnostics()
.check(
principalId == principalSecrets.getPrincipalId(),
"principal_id_mismatch",
Expand Down Expand Up @@ -601,8 +602,7 @@ public void deletePrincipalSecretsInCurrentTxn(
this.store.lookupPrincipalSecrets(localSession.get(), clientId);

// should be found
callCtx
.getDiagServices()
getDiagnostics()
.checkNotNull(
principalSecrets,
"cannot_find_secrets",
Expand All @@ -611,8 +611,7 @@ public void deletePrincipalSecretsInCurrentTxn(
principalId);

// ensure principal id is matching
callCtx
.getDiagServices()
getDiagnostics()
.check(
principalId == principalSecrets.getPrincipalId(),
"principal_id_mismatch",
Expand Down Expand Up @@ -642,7 +641,7 @@ PolarisStorageIntegration<T> createStorageIntegrationInCurrentTxn(
PolarisStorageIntegration<T> loadPolarisStorageIntegrationInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
PolarisStorageConfigurationInfo storageConfig =
BaseMetaStoreManager.extractStorageConfiguration(callCtx.getDiagServices(), entity);
BaseMetaStoreManager.extractStorageConfiguration(getDiagnostics(), entity);
return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
RealmContext realmContext = () -> "realm";
PolarisEclipseLinkMetaStoreSessionImpl session =
new PolarisEclipseLinkMetaStoreSessionImpl(
store, Mockito.mock(), realmContext, null, "polaris", RANDOM_SECRETS);
diagServices, store, Mockito.mock(), realmContext, null, "polaris", RANDOM_SECRETS);
TransactionalMetaStoreManagerImpl metaStoreManager =
new TransactionalMetaStoreManagerImpl(clock, diagServices);
PolarisCallContext callCtx = new PolarisCallContext(realmContext, session, diagServices);
Expand All @@ -104,7 +104,13 @@ void testCreateStoreSession(String confFile, boolean success) {
try {
var session =
new PolarisEclipseLinkMetaStoreSessionImpl(
store, Mockito.mock(), () -> "realm", confFile, "polaris", RANDOM_SECRETS);
diagServices,
store,
Mockito.mock(),
() -> "realm",
confFile,
"polaris",
RANDOM_SECRETS);
assertNotNull(session);
assertTrue(success);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
Expand All @@ -51,6 +52,17 @@
* the BasePersistence methods in terms of lower-level methods that subclasses must implement.
*/
public abstract class AbstractTransactionalPersistence implements TransactionalPersistence {

private final PolarisDiagnostics diagnostics;

protected AbstractTransactionalPersistence(PolarisDiagnostics diagnostics) {
this.diagnostics = diagnostics;
}

protected PolarisDiagnostics getDiagnostics() {
return diagnostics;
}

//
// New abstract methods specific to this slice-based transactional persistence that subclasses
// must implement to inherit implementations of lookup/write/delete
Expand Down Expand Up @@ -210,8 +222,7 @@ public void writeEntities(
@Nonnull List<PolarisBaseEntity> entities,
@Nullable List<PolarisBaseEntity> originalEntities) {
if (originalEntities != null) {
callCtx
.getDiagServices()
getDiagnostics()
.check(
entities.size() == originalEntities.size(),
"mismatched_entities_and_original_entities_size",
Expand Down Expand Up @@ -580,8 +591,7 @@ public void writeEntitiesInCurrentTxn(
@Nonnull List<PolarisBaseEntity> entities,
@Nullable List<PolarisBaseEntity> originalEntities) {
if (originalEntities != null) {
callCtx
.getDiagServices()
getDiagnostics()
.check(
entities.size() == originalEntities.size(),
"mismatched_entities_and_original_entities_size",
Expand Down Expand Up @@ -643,8 +653,7 @@ public PolarisBaseEntity lookupEntityByNameInCurrentTxn(
entityActiveRecord.getCatalogId(),
entityActiveRecord.getId(),
entityActiveRecord.getTypeCode());
callCtx
.getDiagServices()
getDiagnostics()
.checkNotNull(
entity, "unexpected_not_found_entity", "entityActiveRecord={}", entityActiveRecord);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
import org.apache.polaris.core.entity.LocationBasedEntity;
import org.apache.polaris.core.entity.PolarisBaseEntity;
Expand Down Expand Up @@ -62,11 +63,11 @@ public class TreeMapTransactionalPersistenceImpl extends AbstractTransactionalPe
private final PrincipalSecretsGenerator secretsGenerator;

public TreeMapTransactionalPersistenceImpl(
@Nonnull PolarisDiagnostics diagnostics,
@Nonnull TreeMapMetaStore store,
@Nonnull PolarisStorageIntegrationProvider storageIntegrationProvider,
@Nonnull PrincipalSecretsGenerator secretsGenerator) {

// init store
super(diagnostics);
this.store = store;
this.storageIntegrationProvider = storageIntegrationProvider;
this.secretsGenerator = secretsGenerator;
Expand All @@ -78,7 +79,7 @@ public <T> T runInTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Supplier<T> transactionCode) {

// run transaction on our underlying store
return store.runInTransaction(callCtx.getDiagServices(), transactionCode);
return store.runInTransaction(getDiagnostics(), transactionCode);
}

/** {@inheritDoc} */
Expand All @@ -87,15 +88,15 @@ public void runActionInTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Runnable transactionCode) {

// run transaction on our underlying store
store.runActionInTransaction(callCtx.getDiagServices(), transactionCode);
store.runActionInTransaction(getDiagnostics(), transactionCode);
}

/** {@inheritDoc} */
@Override
public <T> T runInReadTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Supplier<T> transactionCode) {
// run transaction on our underlying store
return store.runInReadTransaction(callCtx.getDiagServices(), transactionCode);
return store.runInReadTransaction(getDiagnostics(), transactionCode);
}

/** {@inheritDoc} */
Expand All @@ -104,7 +105,7 @@ public void runActionInReadTransaction(
@Nonnull PolarisCallContext callCtx, @Nonnull Runnable transactionCode) {

// run transaction on our underlying store
store.runActionInReadTransaction(callCtx.getDiagServices(), transactionCode);
store.runActionInReadTransaction(getDiagnostics(), transactionCode);
}

/**
Expand Down Expand Up @@ -462,8 +463,7 @@ public int lookupEntityGrantRecordsVersionInCurrentTxn(
PolarisPrincipalSecrets principalSecrets = this.store.getSlicePrincipalSecrets().read(clientId);

// should be found
callCtx
.getDiagServices()
getDiagnostics()
.checkNotNull(
principalSecrets,
"cannot_find_secrets",
Expand All @@ -472,8 +472,7 @@ public int lookupEntityGrantRecordsVersionInCurrentTxn(
principalId);

// ensure principal id is matching
callCtx
.getDiagServices()
getDiagnostics()
.check(
principalId == principalSecrets.getPrincipalId(),
"principal_id_mismatch",
Expand Down Expand Up @@ -502,8 +501,7 @@ public void deletePrincipalSecretsInCurrentTxn(
PolarisPrincipalSecrets principalSecrets = this.store.getSlicePrincipalSecrets().read(clientId);

// should be found
callCtx
.getDiagServices()
getDiagnostics()
.checkNotNull(
principalSecrets,
"cannot_find_secrets",
Expand All @@ -512,8 +510,7 @@ public void deletePrincipalSecretsInCurrentTxn(
principalId);

// ensure principal id is matching
callCtx
.getDiagServices()
getDiagnostics()
.check(
principalId == principalSecrets.getPrincipalId(),
"principal_id_mismatch",
Expand Down Expand Up @@ -543,7 +540,7 @@ PolarisStorageIntegration<T> createStorageIntegrationInCurrentTxn(
PolarisStorageIntegration<T> loadPolarisStorageIntegrationInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
PolarisStorageConfigurationInfo storageConfig =
BaseMetaStoreManager.extractStorageConfiguration(callCtx.getDiagServices(), entity);
BaseMetaStoreManager.extractStorageConfiguration(getDiagnostics(), entity);
return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ public class PolarisTreeMapAtomicOperationMetaStoreManagerTest
public PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl();
TreeMapMetaStore store = new TreeMapMetaStore(diagServices);
TreeMapTransactionalPersistenceImpl metaStore =
new TreeMapTransactionalPersistenceImpl(
diagServices, store, Mockito.mock(), RANDOM_SECRETS);
AtomicOperationMetaStoreManager metaStoreManager = new AtomicOperationMetaStoreManager(clock);
PolarisCallContext callCtx =
new PolarisCallContext(
() -> "testRealm",
new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS),
diagServices);
PolarisCallContext callCtx = new PolarisCallContext(() -> "testRealm", metaStore, diagServices);
return new PolarisTestMetaStoreManager(metaStoreManager, callCtx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@ public class PolarisTreeMapMetaStoreManagerTest extends BasePolarisMetaStoreMana
public PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl();
TreeMapMetaStore store = new TreeMapMetaStore(diagServices);
TreeMapTransactionalPersistenceImpl metaStore =
new TreeMapTransactionalPersistenceImpl(
diagServices, store, Mockito.mock(), RANDOM_SECRETS);
TransactionalMetaStoreManagerImpl metaStoreManager =
new TransactionalMetaStoreManagerImpl(clock, diagServices);
PolarisCallContext callCtx =
new PolarisCallContext(
() -> "testRealm",
new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS),
diagServices);
PolarisCallContext callCtx = new PolarisCallContext(() -> "testRealm", metaStore, diagServices);
return new PolarisTestMetaStoreManager(metaStoreManager, callCtx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ protected PolarisCallContext callCtx() {
if (callCtx == null) {
TreeMapMetaStore store = new TreeMapMetaStore(diagServices);
TreeMapTransactionalPersistenceImpl metaStore =
new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS);
new TreeMapTransactionalPersistenceImpl(
diagServices, store, Mockito.mock(), RANDOM_SECRETS);
callCtx = new PolarisCallContext(() -> "testRealm", metaStore, diagServices);
}
return callCtx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public InMemoryEntityCacheTest() {
diagServices = new PolarisDefaultDiagServiceImpl();
TreeMapMetaStore store = new TreeMapMetaStore(diagServices);
TransactionalPersistence metaStore =
new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS);
new TreeMapTransactionalPersistenceImpl(
diagServices, store, Mockito.mock(), RANDOM_SECRETS);
metaStoreManager = new TransactionalMetaStoreManagerImpl(Clock.systemUTC(), diagServices);
callCtx = new PolarisCallContext(() -> "testRealm", metaStore, diagServices);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public StorageCredentialCacheTest() {
TreeMapMetaStore store = new TreeMapMetaStore(diagServices);
// to interact with the metastore
TransactionalPersistence metaStore =
new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS);
new TreeMapTransactionalPersistenceImpl(
diagServices, store, Mockito.mock(), RANDOM_SECRETS);
callCtx = new PolarisCallContext(() -> "testRealm", metaStore, diagServices);
storageCredentialCacheConfig = () -> 10_000;
metaStoreManager = Mockito.mock(PolarisMetaStoreManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected TransactionalPersistence createMetaStoreSession(
@Nullable RootCredentialsSet rootCredentialsSet,
@Nonnull PolarisDiagnostics diagnostics) {
return new TreeMapTransactionalPersistenceImpl(
store, storageIntegration, secretsGenerator(realmContext, rootCredentialsSet));
diagnostics, store, storageIntegration, secretsGenerator(realmContext, rootCredentialsSet));
}

@Override
Expand Down