Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions runtime/service/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -129,22 +129,23 @@ dependencies {
testImplementation("io.quarkus:quarkus-junit5-mockito")
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("io.quarkus:quarkus-rest-client-jackson")
testImplementation("io.quarkus:quarkus-jdbc-h2")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some cleanup in this file which had a few duplicate dependencies.


testImplementation("io.rest-assured:rest-assured")

testImplementation(libs.localstack)
testImplementation("org.testcontainers:testcontainers")

testImplementation(project(":polaris-runtime-test-common"))
testImplementation(project(":polaris-container-spec-helper"))

testImplementation(libs.threeten.extra)
testImplementation(libs.hawkular.agent.prometheus.scraper)

testImplementation(project(":polaris-runtime-test-common"))

testImplementation("io.quarkus:quarkus-junit5")
testImplementation(libs.awaitility)

testImplementation(platform(libs.testcontainers.bom))
testImplementation("org.testcontainers:testcontainers")
testImplementation("org.testcontainers:postgresql")
testImplementation("org.postgresql:postgresql")

testFixturesImplementation(project(":polaris-core"))
testFixturesImplementation(project(":polaris-api-management-model"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ void shutdown() {

@Nullable
@Override
String getRequestId() {
protected String getRequestId() {
if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
}
return null;
}

@Override
void processEvent(PolarisEvent polarisEvent) {
protected void processEvent(PolarisEvent polarisEvent) {
String realmId = callContext.getRealmContext().getRealmIdentifier();

ConcurrentLinkedQueueWithApproximateSize<PolarisEvent> realmQueue =
Expand Down Expand Up @@ -192,7 +192,7 @@ void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) {
}

@Override
ContextSpecificInformation getContextSpecificInformation() {
protected ContextSpecificInformation getContextSpecificInformation() {
return new ContextSpecificInformation(
clock.millis(),
securityContext.getUserPrincipal() == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ public void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent ev
processEvent(polarisEvent);
}

protected record ContextSpecificInformation(long timestamp, @Nullable String principalName) {}
public record ContextSpecificInformation(long timestamp, @Nullable String principalName) {}

abstract ContextSpecificInformation getContextSpecificInformation();
protected abstract ContextSpecificInformation getContextSpecificInformation();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious as to why these changes were made?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because otherwise you cannot access this method from outside this package.

Generally speaking, since this class is meant to be subclassed, even by implementors outside Polaris, it's not recommended to use package-private members.


@Nullable
abstract String getRequestId();
protected abstract String getRequestId();

abstract void processEvent(PolarisEvent event);
protected abstract void processEvent(PolarisEvent event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.service.events.listeners.inmemory;

import static org.apache.polaris.service.logging.LoggingMDCFilter.REQUEST_ID_KEY;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.google.common.annotations.VisibleForTesting;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.SecurityContext;
import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.PolarisEvent;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.service.events.listeners.InMemoryBufferEventListenerConfiguration;
import org.apache.polaris.service.events.listeners.PolarisPersistenceEventListener;
import org.eclipse.microprofile.faulttolerance.Fallback;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@Identifier("persistence-in-memory")
public class InMemoryEventListener extends PolarisPersistenceEventListener {

private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEventListener.class);

@Inject CallContext callContext;
@Inject Clock clock;
@Inject MetaStoreManagerFactory metaStoreManagerFactory;
@Inject InMemoryBufferEventListenerConfiguration configuration;

@Context SecurityContext securityContext;
@Context ContainerRequestContext requestContext;

@VisibleForTesting
final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors =
Caffeine.newBuilder()
.expireAfterAccess(Duration.ofHours(1))
.evictionListener(
(String realmId, UnicastProcessor<?> processor, RemovalCause cause) ->
processor.onComplete())
.build(this::createProcessor);

@Override
protected void processEvent(PolarisEvent event) {
var realmId = callContext.getRealmContext().getRealmIdentifier();
processEvent(realmId, event);
}

protected void processEvent(String realmId, PolarisEvent event) {
var processor = Objects.requireNonNull(processors.get(realmId));
processor.onNext(event);
}

@Override
protected ContextSpecificInformation getContextSpecificInformation() {
var principal = securityContext.getUserPrincipal();
var principalName = principal == null ? null : principal.getName();
return new ContextSpecificInformation(clock.millis(), principalName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be this is not in the scope of the pr, but do we know why just principal and not the activated roles ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is on me, I forgot to put it in the original PR - I will enhance this in the future to include activated roles.

}

@Nullable
@Override
protected String getRequestId() {
return (String) requestContext.getProperty(REQUEST_ID_KEY);
}

@PreDestroy
public void shutdown() {
processors.asMap().values().forEach(UnicastProcessor::onComplete);
processors.invalidateAll(); // doesn't call the eviction listener
}

protected UnicastProcessor<PolarisEvent> createProcessor(String realmId) {
UnicastProcessor<PolarisEvent> processor = UnicastProcessor.create();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe that UnicastProcessor is thread-safe for multiple producers by default. However, if we pass in a ConcurrentLinkedQueue during create, we may be able to make it become thread-safe. Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good question! But in fact, it is safe for multiple producers as the onNext method is synchronized. No need to use a MPSC queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW: the tests are testing this. If you look at the sendAsync method, it fires events from multiple threads.

processor
.emitOn(Infrastructure.getDefaultWorkerPool())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question on this: is this creating a separate execution service? Is it possible for this to become a bottleneck in terms of how fast events can be ingested?

It it be better or worse to have the threads here managed by Quarkus instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a Quarkus platform, Infrastructure.getDefaultWorkerPool() always returns Quarkus default worker pool. So this method is not creating a separate execution service – which is why we don't need to care about shutting it down either.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to configure this workerPool ? lets say if i my one instance is handling a lot of realms ?

Copy link
Contributor Author

@adutra adutra Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's configurable through the quarkus.thread-pool.* options:

https://quarkus.io/guides/all-config#quarkus-core_quarkus-thread-pool-core-threads

The core thread pool is responsible for running all blocking tasks in Quarkus. I think this is appropriate for this use case. Note: this thread pool does NOT handle HTTP requests.

If this becomes problematic one day, the solution would be to declare a separate Executor CDI bean and inject it here.

.group()
.intoLists()
.of(configuration.maxBufferSize(), configuration.bufferTime())
.subscribe()
.with(events -> flush(realmId, events), error -> onProcessorError(realmId, error));
return processor;
}

@Retry(maxRetries = 5, delay = 1000, jitter = 100)
@Fallback(fallbackMethod = "onFlushError")
protected void flush(String realmId, List<PolarisEvent> events) {
RealmContext realmContext = () -> realmId;
var metaStoreManager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
var basePersistence = metaStoreManagerFactory.getOrCreateSession(realmContext);
var callContext = new PolarisCallContext(realmContext, basePersistence);
metaStoreManager.writeEvents(callContext, events);
}

@SuppressWarnings("unused")
protected void onFlushError(String realmId, List<PolarisEvent> events, Throwable error) {
LOGGER.error("Failed to persist {} events for realm '{}'", events.size(), realmId, error);
}

protected void onProcessorError(String realmId, Throwable error) {
LOGGER.error(
"Unexpected error while processing events for realm '{}'; some events may have been dropped",
realmId,
error);
Comment on lines +135 to +138
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if its just okay to drop events considering they are deemed to be Audit logs ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, since the beginning the consistency guarantee has been "at most once", so it's understood that some events may be lost.

processors.invalidate(realmId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.service.events.listeners.inmemory;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;

import com.google.common.collect.ImmutableMap;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.mutiny.subscription.BackPressureFailure;
import java.util.Map;
import org.apache.polaris.core.entity.PolarisEvent;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.mockito.Mockito;

@QuarkusTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestProfile(InMemoryEventListenerBufferSizeTest.Profile.class)
class InMemoryEventListenerBufferSizeTest extends InMemoryEventListenerTestBase {

public static class Profile implements QuarkusTestProfile {

@Override
public Map<String, String> getConfigOverrides() {
return ImmutableMap.<String, String>builder()
.putAll(BASE_CONFIG)
.put("polaris.event-listener.persistence-in-memory-buffer.buffer-time", "60s")
.put("polaris.event-listener.persistence-in-memory-buffer.max-buffer-size", "10")
.build();
}
}

@Test
void testFlushOnSize() {
sendAsync("test1", 10);
sendAsync("test2", 10);
assertRows("test1", 10);
assertRows("test2", 10);
}

@Test
void testFlushOnShutdown() {
producer.processEvent("test1", event());
producer.processEvent("test2", event());
producer.shutdown();
assertRows("test1", 1);
assertRows("test2", 1);
}

@Test
void testFlushFailureRecovery() {
var manager = Mockito.mock(PolarisMetaStoreManager.class);
doReturn(manager).when(metaStoreManagerFactory).getOrCreateMetaStoreManager(any());
RuntimeException error = new RuntimeException("error");
doThrow(error)
.doThrow(error) // first batch will give up after 2 attempts
.doThrow(error)
.doCallRealMethod() // second batch will succeed on the 2nd attempt
.when(manager)
.writeEvents(any(), any());
sendAsync("test1", 20);
assertRows("test1", 10);
}

@Test
void testProcessorFailureRecovery() {
producer.processEvent("test1", event());
UnicastProcessor<PolarisEvent> test1 = producer.processors.get("test1");
assertThat(test1).isNotNull();
// emulate backpressure error; will drop the event and invalidate the processor
test1.onError(new BackPressureFailure("error"));
// will create a new processor and recover
sendAsync("test1", 10);
assertRows("test1", 10);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.service.events.listeners.inmemory;

import com.google.common.collect.ImmutableMap;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@QuarkusTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestProfile(InMemoryEventListenerBufferTimeTest.Profile.class)
class InMemoryEventListenerBufferTimeTest extends InMemoryEventListenerTestBase {

public static class Profile implements QuarkusTestProfile {

@Override
public Map<String, String> getConfigOverrides() {
return ImmutableMap.<String, String>builder()
.putAll(BASE_CONFIG)
.put("polaris.event-listener.persistence-in-memory-buffer.buffer-time", "100ms")
.put("polaris.event-listener.persistence-in-memory-buffer.max-buffer-size", "1000")
.build();
}
}

@Test
void testFlushOnTimeout() {
sendAsync("test1", 5);
sendAsync("test2", 1);
assertRows("test1", 5);
assertRows("test2", 1);
}
}
Loading