From 17ad339dbe9e58d027cdc4ba332bb2d6b60f066e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20J=C3=A4ckle?= Date: Fri, 5 Jan 2024 13:03:18 +0100 Subject: [PATCH] Provide option to provide negative numbers to historical event streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * in order to get e.g. "last 10" persisted events Signed-off-by: Thomas Jäckle --- ...ubscribe-for-persisted-events-payload.json | 4 +- .../resources/pages/ditto/basic-history.md | 6 +- .../mongo/streaming/MongoReadJournal.java | 69 +++++++++++++------ 3 files changed, 53 insertions(+), 26 deletions(-) diff --git a/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json b/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json index b34288fff4..6f89337fa0 100644 --- a/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json +++ b/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json @@ -6,11 +6,11 @@ "properties": { "fromHistoricalRevision": { "type": "integer", - "description": "The revision to start the streaming from." + "description": "The revision to start the streaming from. May also be negative in order to specify to get the last n revisions relative to the 'toHistoricalRevision'." }, "toHistoricalRevision": { "type": "integer", - "description": "The revision to stop the streaming at." + "description": "The revision to stop the streaming at. May also be 0 or negative in order to specify to get either the latest (0) or the nth most recent revision." }, "fromHistoricalTimestamp": { "type": "string", diff --git a/documentation/src/main/resources/pages/ditto/basic-history.md b/documentation/src/main/resources/pages/ditto/basic-history.md index 1a6dfff801..d5b7ec5646 100644 --- a/documentation/src/main/resources/pages/ditto/basic-history.md +++ b/documentation/src/main/resources/pages/ditto/basic-history.md @@ -109,8 +109,10 @@ This API is however **only available for things** (not for policies). Use the following query parameters in order to specify the start/stop revision/timestamp. Either use the revision based parameters: -* `from-historical-revision`: specifies the revision number to start streaming historical modification events from -* `to-historical-revision`: optionally specifies the revision number to stop streaming at (if omitted, it streams events until the current state of the entity) +* `from-historical-revision`: Specifies the revision number to start streaming historical modification events from. + May also be negative in order to specify to get the last `n` revisions relative to the `to-historical-revision`. +* `to-historical-revision`: Optionally specifies the revision number to stop streaming at (if omitted, it streams events until the current state of the entity). + May also be 0 or negative in order to specify to get either the latest (`0`) or the `n`th most recent revision. Alternatively, use the timestamp based parameters: * `from-historical-timestamp`: specifies the timestamp to start streaming historical modification events from diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java index 35aa8ee5f5..bf055ab086 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java @@ -25,6 +25,26 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.pekko.Done; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.japi.Pair; +import org.apache.pekko.persistence.query.EventEnvelope; +import org.apache.pekko.persistence.query.Offset; +import org.apache.pekko.persistence.query.PersistenceQuery; +import org.apache.pekko.persistence.query.javadsl.CurrentEventsByPersistenceIdQuery; +import org.apache.pekko.persistence.query.javadsl.CurrentEventsByTagQuery; +import org.apache.pekko.persistence.query.javadsl.CurrentPersistenceIdsQuery; +import org.apache.pekko.persistence.query.javadsl.EventsByPersistenceIdQuery; +import org.apache.pekko.persistence.query.javadsl.EventsByTagQuery; +import org.apache.pekko.persistence.query.javadsl.PersistenceIdsQuery; +import org.apache.pekko.stream.Attributes; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.RestartSettings; +import org.apache.pekko.stream.SystemMaterializer; +import org.apache.pekko.stream.javadsl.RestartSource; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; import org.bson.BsonArray; import org.bson.BsonDocument; import org.bson.BsonInt32; @@ -54,27 +74,6 @@ import com.mongodb.reactivestreams.client.MongoCollection; import com.typesafe.config.Config; -import org.apache.pekko.Done; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.japi.Pair; -import org.apache.pekko.persistence.query.EventEnvelope; -import org.apache.pekko.persistence.query.Offset; -import org.apache.pekko.persistence.query.PersistenceQuery; -import org.apache.pekko.persistence.query.javadsl.CurrentEventsByPersistenceIdQuery; -import org.apache.pekko.persistence.query.javadsl.CurrentEventsByTagQuery; -import org.apache.pekko.persistence.query.javadsl.CurrentPersistenceIdsQuery; -import org.apache.pekko.persistence.query.javadsl.EventsByPersistenceIdQuery; -import org.apache.pekko.persistence.query.javadsl.EventsByTagQuery; -import org.apache.pekko.persistence.query.javadsl.PersistenceIdsQuery; -import org.apache.pekko.stream.Attributes; -import org.apache.pekko.stream.Materializer; -import org.apache.pekko.stream.RestartSettings; -import org.apache.pekko.stream.SystemMaterializer; -import org.apache.pekko.stream.javadsl.RestartSource; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; - import pekko.contrib.persistence.mongodb.JavaDslMongoReadJournal; import pekko.contrib.persistence.mongodb.JournallingFieldNames$; import pekko.contrib.persistence.mongodb.SnapshottingFieldNames$; @@ -547,6 +546,23 @@ public Source, NotUsed> getSmallestEventSeqNo(final String pid) { .orElse(Source.single(Optional.empty())); } + /** + * Find the latest/newest event sequence number of a PID. + * + * @param pid the PID to search for. + * @return source of the latest event sequence number, or an empty optional. + */ + public Source, NotUsed> getLatestEventSeqNo(final String pid) { + return getJournal() + .flatMapConcat(journal -> Source.fromPublisher( + journal.find(Filters.eq(J_PROCESSOR_ID, pid)) + .sort(Sorts.descending(J_TO)) + .limit(1) + )) + .map(document -> Optional.of(document.getLong(J_TO))) + .orElse(Source.single(Optional.empty())); + } + /** * Find the smallest snapshot sequence number of a PID. * @@ -603,7 +619,16 @@ public Source deleteSnapshots(final String pid, final lon public Source currentEventsByPersistenceId(final String persistenceId, final long fromSequenceNr, final long toSequenceNr) { - return pekkoReadJournal.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr); + if (fromSequenceNr <= 0 || toSequenceNr <= 0) { + return getLatestEventSeqNo(persistenceId).flatMapConcat(latestSnOpt -> { + final long effectiveTo = toSequenceNr <= 0 ? + latestSnOpt.map(latest -> latest + toSequenceNr).orElse(toSequenceNr) : toSequenceNr; + final long effectiveFrom = fromSequenceNr <= 0 ? effectiveTo + 1 + fromSequenceNr : fromSequenceNr; + return pekkoReadJournal.currentEventsByPersistenceId(persistenceId, effectiveFrom, effectiveTo); + }); + } else { + return pekkoReadJournal.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr); + } } @Override