Skip to content

Commit c86cd43

Browse files
committed
Introduce alternate in-memory buffering event listener
1 parent e02bb71 commit c86cd43

File tree

5 files changed

+326
-12
lines changed

5 files changed

+326
-12
lines changed

runtime/service/build.gradle.kts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,22 +128,23 @@ dependencies {
128128
testImplementation("io.quarkus:quarkus-junit5-mockito")
129129
testImplementation("io.quarkus:quarkus-rest-client")
130130
testImplementation("io.quarkus:quarkus-rest-client-jackson")
131+
testImplementation("io.quarkus:quarkus-jdbc-h2")
132+
131133
testImplementation("io.rest-assured:rest-assured")
134+
132135
testImplementation(libs.localstack)
133-
testImplementation("org.testcontainers:testcontainers")
136+
137+
testImplementation(project(":polaris-runtime-test-common"))
134138
testImplementation(project(":polaris-container-spec-helper"))
135139

136140
testImplementation(libs.threeten.extra)
137141
testImplementation(libs.hawkular.agent.prometheus.scraper)
138142

139-
testImplementation(project(":polaris-runtime-test-common"))
140-
141-
testImplementation("io.quarkus:quarkus-junit5")
142143
testImplementation(libs.awaitility)
144+
143145
testImplementation(platform(libs.testcontainers.bom))
144146
testImplementation("org.testcontainers:testcontainers")
145147
testImplementation("org.testcontainers:postgresql")
146-
testImplementation("org.postgresql:postgresql")
147148

148149
testFixturesImplementation(project(":polaris-core"))
149150
testFixturesImplementation(project(":polaris-api-management-model"))

runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ void shutdown() {
127127

128128
@Nullable
129129
@Override
130-
String getRequestId() {
130+
protected String getRequestId() {
131131
if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
132132
return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
133133
}
134134
return null;
135135
}
136136

137137
@Override
138-
void processEvent(PolarisEvent polarisEvent) {
138+
protected void processEvent(PolarisEvent polarisEvent) {
139139
String realmId = callContext.getRealmContext().getRealmIdentifier();
140140

141141
ConcurrentLinkedQueueWithApproximateSize<PolarisEvent> realmQueue =
@@ -192,7 +192,7 @@ void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) {
192192
}
193193

194194
@Override
195-
ContextSpecificInformation getContextSpecificInformation() {
195+
protected ContextSpecificInformation getContextSpecificInformation() {
196196
return new ContextSpecificInformation(
197197
clock.millis(),
198198
securityContext.getUserPrincipal() == null

runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,12 @@ public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) {
112112
processEvent(polarisEvent);
113113
}
114114

115-
protected record ContextSpecificInformation(long timestamp, @Nullable String principalName) {}
115+
public record ContextSpecificInformation(long timestamp, @Nullable String principalName) {}
116116

117-
abstract ContextSpecificInformation getContextSpecificInformation();
117+
protected abstract ContextSpecificInformation getContextSpecificInformation();
118118

119119
@Nullable
120-
abstract String getRequestId();
120+
protected abstract String getRequestId();
121121

122-
abstract void processEvent(PolarisEvent event);
122+
protected abstract void processEvent(PolarisEvent event);
123123
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.service.events.listeners.inmemory;
21+
22+
import static org.apache.polaris.service.logging.LoggingMDCFilter.REQUEST_ID_KEY;
23+
24+
import com.github.benmanes.caffeine.cache.Caffeine;
25+
import com.github.benmanes.caffeine.cache.LoadingCache;
26+
import com.github.benmanes.caffeine.cache.RemovalCause;
27+
import io.smallrye.common.annotation.Identifier;
28+
import io.smallrye.mutiny.infrastructure.Infrastructure;
29+
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
30+
import jakarta.annotation.Nullable;
31+
import jakarta.annotation.PreDestroy;
32+
import jakarta.enterprise.context.ApplicationScoped;
33+
import jakarta.inject.Inject;
34+
import jakarta.ws.rs.container.ContainerRequestContext;
35+
import jakarta.ws.rs.core.Context;
36+
import jakarta.ws.rs.core.SecurityContext;
37+
import java.time.Clock;
38+
import java.time.Duration;
39+
import java.util.List;
40+
import java.util.Objects;
41+
import org.apache.polaris.core.PolarisCallContext;
42+
import org.apache.polaris.core.context.CallContext;
43+
import org.apache.polaris.core.context.RealmContext;
44+
import org.apache.polaris.core.entity.PolarisEvent;
45+
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
46+
import org.apache.polaris.service.events.listeners.InMemoryBufferEventListenerConfiguration;
47+
import org.apache.polaris.service.events.listeners.PolarisPersistenceEventListener;
48+
import org.eclipse.microprofile.faulttolerance.Fallback;
49+
import org.eclipse.microprofile.faulttolerance.Retry;
50+
import org.slf4j.Logger;
51+
import org.slf4j.LoggerFactory;
52+
53+
@ApplicationScoped
54+
@Identifier("persistence-in-memory")
55+
public class InMemoryEventListener extends PolarisPersistenceEventListener {
56+
57+
private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEventListener.class);
58+
59+
@Inject CallContext callContext;
60+
@Inject Clock clock;
61+
@Inject MetaStoreManagerFactory metaStoreManagerFactory;
62+
@Inject InMemoryBufferEventListenerConfiguration configuration;
63+
64+
@Context SecurityContext securityContext;
65+
@Context ContainerRequestContext requestContext;
66+
67+
private final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors =
68+
Caffeine.newBuilder()
69+
.expireAfterAccess(Duration.ofHours(1))
70+
.evictionListener(
71+
(String realmId, UnicastProcessor<?> processor, RemovalCause cause) ->
72+
processor.onComplete())
73+
.build(this::createProcessor);
74+
75+
@Override
76+
protected void processEvent(PolarisEvent event) {
77+
var realmId = callContext.getRealmContext().getRealmIdentifier();
78+
processEvent(realmId, event);
79+
}
80+
81+
protected void processEvent(String realmId, PolarisEvent event) {
82+
var processor = Objects.requireNonNull(processors.get(realmId));
83+
processor.onNext(event);
84+
}
85+
86+
@Override
87+
protected ContextSpecificInformation getContextSpecificInformation() {
88+
var principal = securityContext.getUserPrincipal();
89+
var principalName = principal == null ? null : principal.getName();
90+
return new ContextSpecificInformation(clock.millis(), principalName);
91+
}
92+
93+
@Nullable
94+
@Override
95+
protected String getRequestId() {
96+
return (String) requestContext.getProperty(REQUEST_ID_KEY);
97+
}
98+
99+
@PreDestroy
100+
public void shutdown() {
101+
processors.asMap().values().forEach(UnicastProcessor::onComplete);
102+
processors.invalidateAll(); // doesn't call the eviction listener
103+
}
104+
105+
protected UnicastProcessor<PolarisEvent> createProcessor(String realmId) {
106+
UnicastProcessor<PolarisEvent> processor = UnicastProcessor.create();
107+
processor
108+
.emitOn(Infrastructure.getDefaultWorkerPool())
109+
.group()
110+
.intoLists()
111+
.of(configuration.maxBufferSize(), configuration.bufferTime())
112+
.subscribe()
113+
.with(events -> flush(realmId, events));
114+
return processor;
115+
}
116+
117+
@Retry(maxRetries = 5, delay = 1000, jitter = 100)
118+
@Fallback(fallbackMethod = "flushFailed")
119+
protected void flush(String realmId, List<PolarisEvent> events) {
120+
RealmContext realmContext = () -> realmId;
121+
var metaStoreManager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
122+
var basePersistence = metaStoreManagerFactory.getOrCreateSession(realmContext);
123+
var callContext = new PolarisCallContext(realmContext, basePersistence);
124+
metaStoreManager.writeEvents(callContext, events);
125+
}
126+
127+
@SuppressWarnings("unused")
128+
protected void flushFailed(String realmId, List<PolarisEvent> events, Throwable error) {
129+
LOGGER.error("Failed to persist {} events for realm '{}'", events.size(), realmId, error);
130+
}
131+
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.service.events.listeners.inmemory;
21+
22+
import static org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG;
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
import static org.awaitility.Awaitility.await;
25+
import static org.mockito.ArgumentMatchers.any;
26+
import static org.mockito.Mockito.doReturn;
27+
import static org.mockito.Mockito.doThrow;
28+
import static org.mockito.Mockito.reset;
29+
30+
import com.google.common.collect.ImmutableMap;
31+
import io.netty.channel.EventLoopGroup;
32+
import io.quarkus.netty.MainEventLoopGroup;
33+
import io.quarkus.test.junit.QuarkusTest;
34+
import io.quarkus.test.junit.QuarkusTestProfile;
35+
import io.quarkus.test.junit.TestProfile;
36+
import io.quarkus.test.junit.mockito.InjectSpy;
37+
import io.smallrye.common.annotation.Identifier;
38+
import jakarta.enterprise.inject.Instance;
39+
import jakarta.inject.Inject;
40+
import java.sql.Connection;
41+
import java.sql.ResultSet;
42+
import java.sql.Statement;
43+
import java.time.Duration;
44+
import java.util.Map;
45+
import java.util.UUID;
46+
import javax.sql.DataSource;
47+
import org.apache.polaris.core.entity.PolarisEvent;
48+
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
49+
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
50+
import org.junit.jupiter.api.AfterEach;
51+
import org.junit.jupiter.api.Test;
52+
import org.junit.jupiter.api.TestInstance;
53+
import org.mockito.Mockito;
54+
55+
@QuarkusTest
56+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
57+
@TestProfile(InMemoryEventProducerTest.Profile.class)
58+
class InMemoryEventProducerTest {
59+
60+
public static class Profile implements QuarkusTestProfile {
61+
62+
@Override
63+
public Map<String, String> getConfigOverrides() {
64+
return ImmutableMap.<String, String>builder()
65+
.put("polaris.realm-context.realms", "test1,test2")
66+
.put("polaris.persistence.type", "relational-jdbc")
67+
.put("polaris.persistence.auto-bootstrap-types", "relational-jdbc")
68+
.put("quarkus.datasource.db-kind", "h2")
69+
.put(
70+
"quarkus.datasource.jdbc.url",
71+
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE")
72+
.put("polaris.event-listener.type", "persistence-in-memory")
73+
.put("polaris.event-listener.persistence-in-memory-buffer.buffer-time", "5s")
74+
.put("polaris.event-listener.persistence-in-memory-buffer.max-buffer-size", "10")
75+
.put(
76+
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.max-retries",
77+
"1")
78+
.put(
79+
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.delay",
80+
"10")
81+
.build();
82+
}
83+
}
84+
85+
// A delay shorter than the full 5s buffer timeout
86+
private static final Duration SHORT_DELAY = Duration.ofSeconds(4);
87+
88+
// A delay longer than the full 5s buffer timeout
89+
public static final Duration LONG_DELAY = Duration.ofSeconds(8);
90+
91+
@Inject
92+
@Identifier("persistence-in-memory")
93+
InMemoryEventListener producer;
94+
95+
@InjectSpy
96+
@Identifier("relational-jdbc")
97+
@SuppressWarnings("CdiInjectionPointsInspection")
98+
MetaStoreManagerFactory metaStoreManagerFactory;
99+
100+
@Inject
101+
@MainEventLoopGroup
102+
@SuppressWarnings("CdiInjectionPointsInspection")
103+
EventLoopGroup eventLoopGroup;
104+
105+
@Inject Instance<DataSource> dataSource;
106+
107+
@Test
108+
void testFlushOnSize() {
109+
sendAsync("test1", 10);
110+
sendAsync("test2", 10);
111+
assertRows("test1", 10, SHORT_DELAY);
112+
assertRows("test2", 10, SHORT_DELAY);
113+
}
114+
115+
@Test
116+
void testFlushOnTimeout() {
117+
sendAsync("test1", 5);
118+
sendAsync("test2", 5);
119+
assertRows("test1", 5, LONG_DELAY);
120+
assertRows("test2", 5, LONG_DELAY);
121+
}
122+
123+
@Test
124+
void testFlushOnShutdown() {
125+
producer.processEvent("test1", event());
126+
producer.processEvent("test2", event());
127+
producer.shutdown();
128+
assertRows("test1", 1, SHORT_DELAY);
129+
assertRows("test2", 1, SHORT_DELAY);
130+
}
131+
132+
@Test
133+
void testFailureRecovery() {
134+
var manager = Mockito.mock(PolarisMetaStoreManager.class);
135+
doReturn(manager).when(metaStoreManagerFactory).getOrCreateMetaStoreManager(any());
136+
RuntimeException error = new RuntimeException("error");
137+
doThrow(error)
138+
.doThrow(error) // first batch will give up after 2 attempts
139+
.doThrow(error)
140+
.doCallRealMethod() // second batch will succeed on the 2nd attempt
141+
.when(manager)
142+
.writeEvents(any(), any());
143+
sendAsync("test1", 20);
144+
assertRows("test1", 10, SHORT_DELAY);
145+
}
146+
147+
@AfterEach
148+
void clearEvents() throws Exception {
149+
reset(metaStoreManagerFactory);
150+
producer.shutdown();
151+
try (Connection connection = dataSource.get().getConnection();
152+
Statement statement = connection.createStatement()) {
153+
statement.execute("DELETE FROM polaris_schema.events");
154+
}
155+
}
156+
157+
private void sendAsync(String realmId, int n) {
158+
for (int i = 0; i < n; i++) {
159+
eventLoopGroup.next().execute(() -> producer.processEvent(realmId, event()));
160+
}
161+
}
162+
163+
private void assertRows(String realmId, int expected, Duration timeout) {
164+
String query = "SELECT COUNT(*) FROM polaris_schema.events WHERE realm_id = '" + realmId + "'";
165+
await()
166+
.atMost(timeout)
167+
.untilAsserted(
168+
() -> {
169+
try (Connection connection = dataSource.get().getConnection();
170+
Statement statement = connection.createStatement();
171+
ResultSet rs = statement.executeQuery(query)) {
172+
rs.next();
173+
assertThat(rs.getInt(1)).isEqualTo(expected);
174+
}
175+
});
176+
}
177+
178+
private static PolarisEvent event() {
179+
String id = UUID.randomUUID().toString();
180+
return new PolarisEvent("test", id, null, "test", 0, null, CATALOG, "test");
181+
}
182+
}

0 commit comments

Comments
 (0)