Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide option to provide negative numbers to historical event streaming #1866

Merged
merged 1 commit into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
"properties": {
"fromHistoricalRevision": {
"type": "integer",
"description": "The revision to start the streaming from."
"description": "The revision to start the streaming from. May also be negative in order to specify to get the last n revisions relative to the 'toHistoricalRevision'."
},
"toHistoricalRevision": {
"type": "integer",
"description": "The revision to stop the streaming at."
"description": "The revision to stop the streaming at. May also be 0 or negative in order to specify to get either the latest (0) or the nth most recent revision."
},
"fromHistoricalTimestamp": {
"type": "string",
Expand Down
6 changes: 4 additions & 2 deletions documentation/src/main/resources/pages/ditto/basic-history.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ This API is however **only available for things** (not for policies).
Use the following query parameters in order to specify the start/stop revision/timestamp.

Either use the revision based parameters:
* `from-historical-revision`: specifies the revision number to start streaming historical modification events from
* `to-historical-revision`: optionally specifies the revision number to stop streaming at (if omitted, it streams events until the current state of the entity)
* `from-historical-revision`: Specifies the revision number to start streaming historical modification events from.
May also be negative in order to specify to get the last `n` revisions relative to the `to-historical-revision`.
* `to-historical-revision`: Optionally specifies the revision number to stop streaming at (if omitted, it streams events until the current state of the entity).
May also be 0 or negative in order to specify to get either the latest (`0`) or the `n`th most recent revision.

Alternatively, use the timestamp based parameters:
* `from-historical-timestamp`: specifies the timestamp to start streaming historical modification events from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.persistence.query.EventEnvelope;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.persistence.query.PersistenceQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentEventsByPersistenceIdQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentEventsByTagQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentPersistenceIdsQuery;
import org.apache.pekko.persistence.query.javadsl.EventsByPersistenceIdQuery;
import org.apache.pekko.persistence.query.javadsl.EventsByTagQuery;
import org.apache.pekko.persistence.query.javadsl.PersistenceIdsQuery;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.RestartSettings;
import org.apache.pekko.stream.SystemMaterializer;
import org.apache.pekko.stream.javadsl.RestartSource;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
Expand Down Expand Up @@ -54,27 +74,6 @@
import com.mongodb.reactivestreams.client.MongoCollection;
import com.typesafe.config.Config;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.persistence.query.EventEnvelope;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.persistence.query.PersistenceQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentEventsByPersistenceIdQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentEventsByTagQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentPersistenceIdsQuery;
import org.apache.pekko.persistence.query.javadsl.EventsByPersistenceIdQuery;
import org.apache.pekko.persistence.query.javadsl.EventsByTagQuery;
import org.apache.pekko.persistence.query.javadsl.PersistenceIdsQuery;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.RestartSettings;
import org.apache.pekko.stream.SystemMaterializer;
import org.apache.pekko.stream.javadsl.RestartSource;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

import pekko.contrib.persistence.mongodb.JavaDslMongoReadJournal;
import pekko.contrib.persistence.mongodb.JournallingFieldNames$;
import pekko.contrib.persistence.mongodb.SnapshottingFieldNames$;
Expand Down Expand Up @@ -547,6 +546,23 @@ public Source<Optional<Long>, NotUsed> getSmallestEventSeqNo(final String pid) {
.orElse(Source.single(Optional.empty()));
}

/**
* Find the latest/newest event sequence number of a PID.
*
* @param pid the PID to search for.
* @return source of the latest event sequence number, or an empty optional.
*/
public Source<Optional<Long>, NotUsed> getLatestEventSeqNo(final String pid) {
return getJournal()
.flatMapConcat(journal -> Source.fromPublisher(
journal.find(Filters.eq(J_PROCESSOR_ID, pid))
.sort(Sorts.descending(J_TO))
.limit(1)
))
.map(document -> Optional.of(document.getLong(J_TO)))
.orElse(Source.single(Optional.empty()));
}

/**
* Find the smallest snapshot sequence number of a PID.
*
Expand Down Expand Up @@ -603,7 +619,16 @@ public Source<DeleteResult, NotUsed> deleteSnapshots(final String pid, final lon
public Source<EventEnvelope, NotUsed> currentEventsByPersistenceId(final String persistenceId,
final long fromSequenceNr,
final long toSequenceNr) {
return pekkoReadJournal.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr);
if (fromSequenceNr <= 0 || toSequenceNr <= 0) {
return getLatestEventSeqNo(persistenceId).flatMapConcat(latestSnOpt -> {
final long effectiveTo = toSequenceNr <= 0 ?
latestSnOpt.map(latest -> latest + toSequenceNr).orElse(toSequenceNr) : toSequenceNr;
final long effectiveFrom = fromSequenceNr <= 0 ? effectiveTo + 1 + fromSequenceNr : fromSequenceNr;
return pekkoReadJournal.currentEventsByPersistenceId(persistenceId, effectiveFrom, effectiveTo);
});
} else {
return pekkoReadJournal.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr);
}
}

@Override
Expand Down
Loading