From 2f1d503e10971ff1a4aa1a675f731bb4b9a134d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Novotn=C3=BD?= Date: Sun, 26 May 2024 10:49:08 +0200 Subject: [PATCH] fix(#18): optimized access to currently opened sessions --- .../java/io/evitadb/core/SessionRegistry.java | 42 +++++++------------ .../event/session/SessionClosedEvent.java | 10 ++++- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/evita_engine/src/main/java/io/evitadb/core/SessionRegistry.java b/evita_engine/src/main/java/io/evitadb/core/SessionRegistry.java index 2f84bdb9e..02e5f90e4 100644 --- a/evita_engine/src/main/java/io/evitadb/core/SessionRegistry.java +++ b/evita_engine/src/main/java/io/evitadb/core/SessionRegistry.java @@ -61,6 +61,7 @@ import java.util.UUID; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Stream; @@ -88,6 +89,10 @@ final class SessionRegistry { * Keeps information about currently active sessions. */ private final Map activeSessions = new ConcurrentHashMap<>(64); + /** + * Keeps information about sessions sorted according to date of creation. + */ + private final ConcurrentLinkedQueue sessionsFifoQueue = new ConcurrentLinkedQueue<>(); /** * Keeps information about count of currently active sessions. Counter is used to safely control single session * limits in parallel execution. @@ -158,7 +163,9 @@ public EvitaInternalSessionContract addSession(boolean warmUp, @Nonnull Supplier new Class[]{EvitaInternalSessionContract.class, EvitaProxyFinalization.class}, new EvitaSessionProxy(newSession, this.tracingContext) ); - this.activeSessions.put(newSession.getId(), new EvitaSessionTuple(newSession, newSessionProxy)); + final EvitaSessionTuple sessionTuple = new EvitaSessionTuple(newSession, newSessionProxy); + this.activeSessions.put(newSession.getId(), sessionTuple); + this.sessionsFifoQueue.add(sessionTuple); this.catalogConsumedVersions.computeIfAbsent(catalogName, k -> new VersionConsumingSessions()) .registerSessionConsumingCatalogInVersion(catalogVersion); @@ -172,29 +179,11 @@ public void removeSession(@Nonnull EvitaSession session) { final EvitaSessionTuple activeSession = this.activeSessions.remove(session.getId()); if (activeSession != null) { this.activeSessionsCounter.decrementAndGet(); + Assert.isPremiseValid(this.sessionsFifoQueue.remove(activeSession), "Session not found in the queue."); - // this may be expensive if there are many active sessions, - // but we expect that there will be hundreds of them at most - // if this becomes a bottleneck, we will optimize it by using a specialized data structure - OffsetDateTime oldestTransactionTimestamp = OffsetDateTime.MIN; - OffsetDateTime oldestSessionTimestamp = OffsetDateTime.MIN; - for (EvitaSessionTuple sessionTuple : this.activeSessions.values()) { - //noinspection resource - final OffsetDateTime sessionTransactionTimestamp = sessionTuple.plainSession() - .getTransaction() - .map(Transaction::getCreated) - .orElse(OffsetDateTime.MIN); - if (oldestTransactionTimestamp.isBefore(sessionTransactionTimestamp)) { - oldestTransactionTimestamp = sessionTransactionTimestamp; - } - final OffsetDateTime sessionTimestamp = sessionTuple.proxySession().getCreated(); - if (oldestSessionTimestamp.isBefore(sessionTimestamp)) { - oldestSessionTimestamp = sessionTimestamp; - } - } - - final OffsetDateTime theOldestTransactionTimestamp = oldestTransactionTimestamp == OffsetDateTime.MIN ? - null : oldestTransactionTimestamp; + final OffsetDateTime theOldestTransactionTimestamp = ofNullable(this.sessionsFifoQueue.peek()) + .map(it -> it.plainSession().getCreated()) + .orElse(null); session.getTransaction().ifPresent(transaction -> { // emit event @@ -217,7 +206,7 @@ public void removeSession(@Nonnull EvitaSession session) { // emit event //noinspection CastToIncompatibleInterface,resource ((EvitaProxyFinalization)activeSession.proxySession()) - .finish(oldestSessionTimestamp); + .finish(theOldestTransactionTimestamp, this.activeSessionsCounter.get()); } } @@ -261,7 +250,7 @@ public EvitaSessionProxy(@Nonnull EvitaSession evitaSession, @Nonnull TracingCon public Object invoke(Object proxy, Method method, Object[] args) { if (method.getDeclaringClass().equals(EvitaProxyFinalization.class)) { sessionClosedEvent - .finish((OffsetDateTime) args[0]) + .finish((OffsetDateTime) args[0], (int) args[1]) .commit(); return null; } else { @@ -432,8 +421,9 @@ private interface EvitaProxyFinalization { /** * Method should be called when session proxy is terminated. * @param oldestSessionTimestamp the oldest active session timestamp + * @param activeSessions the number of still active sessions */ - void finish(@Null OffsetDateTime oldestSessionTimestamp); + void finish(@Null OffsetDateTime oldestSessionTimestamp, int activeSessions); } diff --git a/evita_engine/src/main/java/io/evitadb/core/metric/event/session/SessionClosedEvent.java b/evita_engine/src/main/java/io/evitadb/core/metric/event/session/SessionClosedEvent.java index b0682265d..28d1c28e2 100644 --- a/evita_engine/src/main/java/io/evitadb/core/metric/event/session/SessionClosedEvent.java +++ b/evita_engine/src/main/java/io/evitadb/core/metric/event/session/SessionClosedEvent.java @@ -58,6 +58,10 @@ public class SessionClosedEvent extends AbstractSessionEvent { @ExportMetric(metricType = MetricType.GAUGE) private long oldestSessionTimestampSeconds; + @Label("Number of still active sessions") + @ExportMetric(metricType = MetricType.GAUGE) + private long activeSessions; + public SessionClosedEvent(@Nonnull String catalogName) { super(catalogName); this.begin(); @@ -83,10 +87,12 @@ public void recordMutation() { */ @Nonnull public SessionClosedEvent finish( - @Nullable OffsetDateTime oldestSessionTimestampSeconds - ) { + @Nullable OffsetDateTime oldestSessionTimestampSeconds, + int activeSessions + ) { this.oldestSessionTimestampSeconds = oldestSessionTimestampSeconds == null ? 0 : oldestSessionTimestampSeconds.toEpochSecond(); + this.activeSessions = activeSessions; this.end(); return this; }