Skip to content

Commit

Permalink
Merge pull request #2033 from beyonnex-io/bugfix/1915-at-historical-t…
Browse files Browse the repository at this point in the history
…imestamp

#1915 fix retrieving the historical value of a thing with `at-historical-timestamp`
  • Loading branch information
thjaeckle authored Sep 27, 2024
2 parents 5f02056 + 3ffe395 commit fa6537d
Showing 1 changed file with 46 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected void onEntityModified() {
* Publish an event.
*
* @param previousEntity the previous state of the entity before the event was applied.
* @param event the event which was applied.
* @param event the event which was applied.
*/
protected abstract void publishEvent(@Nullable S previousEntity, E event);

Expand Down Expand Up @@ -373,18 +373,17 @@ private void handleHistoricalRetrieveCommand(final C command) {
final EventStrategy<E, S> eventStrategy = getEventStrategy();
final ActorRef sender = getSender();
final ActorRef self = getSelf();
final long atHistoricalRevision = Optional
final Optional<Long> atHistoricalRevision = Optional
.ofNullable(command.getDittoHeaders().get(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey()))
.map(Long::parseLong)
.orElseGet(this::lastSequenceNr);
if (atHistoricalRevision > lastSequenceNr()) {
.map(Long::parseLong);
if (atHistoricalRevision.isPresent() && atHistoricalRevision.get() > lastSequenceNr()) {
getSender().tell(
newHistoryNotAccessibleExceptionBuilder(atHistoricalRevision)
newHistoryNotAccessibleExceptionBuilder(atHistoricalRevision.get())
.dittoHeaders(command.getDittoHeaders())
.build(),
getSelf()
);
} else if (atHistoricalRevision == lastSequenceNr()) {
} else if (atHistoricalRevision.isPresent() && atHistoricalRevision.get() == lastSequenceNr()) {
// for current revision, don't make the effort to load snapshot, etc., but return from memory like a normal
// "RetrieveThing" command does:
handleByCommandStrategy(command);
Expand All @@ -404,7 +403,7 @@ private void handleHistoricalRetrieveCommand(final C command) {
}

loadSnapshot(persistenceId(), SnapshotSelectionCriteria.create(
atHistoricalRevision,
atHistoricalRevision.orElseGet(this::lastSequenceNr),
atHistoricalTimestamp.equals(Instant.EPOCH) ? Long.MAX_VALUE : atHistoricalTimestamp.toEpochMilli(),
0L,
0L
Expand All @@ -421,7 +420,7 @@ private void handleHistoricalRetrieveCommand(final C command) {
eventStrategy,
sender,
self,
atHistoricalRevision,
atHistoricalRevision.orElseGet(this::lastSequenceNr),
atHistoricalTimestamp,
cancellableSnapshotLoadTimeout,
loadSnapshotResult
Expand Down Expand Up @@ -455,6 +454,7 @@ private void historicalRetrieveHandleLoadSnapshotResult(final C command,
.map(snapshotAdapter::fromSnapshotStore);
final boolean snapshotIsPresent = snapshotEntity.isDefined();

// when "getLatestSnapshotSequenceNumber() == 0", then no snapshot was stored yet to the snapshot store (e.g. for new things)
if (snapshotIsPresent || getLatestSnapshotSequenceNumber() == 0) {
final long snapshotEntityRevision = snapshotIsPresent ?
loadSnapshotResult.snapshot().get().metadata().sequenceNr() : 0L;
Expand All @@ -473,7 +473,8 @@ private void historicalRetrieveHandleLoadSnapshotResult(final C command,
)
.map(AbstractPersistenceActor::mapJournalEntryToEvent)
.map(journalEntryEvent -> new EntityWithEvent(
eventStrategy.handle((E) journalEntryEvent, entityFromSnapshot, journalEntryEvent.getRevision()),
eventStrategy.handle((E) journalEntryEvent, entityFromSnapshot,
journalEntryEvent.getRevision()),
(E) journalEntryEvent
))
.takeWhile(entityWithEvent -> {
Expand All @@ -487,18 +488,34 @@ private void historicalRetrieveHandleLoadSnapshotResult(final C command,
.isPresent();
}
})
.reduce((ewe1, ewe2) -> new EntityWithEvent(
eventStrategy.handle(ewe2.event, ewe1.entity, ewe2.revision),
ewe2.event
))
.runWith(Sink.foreach(entityWithEvent ->
.fold(new EntityWithEvent(null, null), (ewe1, ewe2) -> {
if (ewe2.event != null && ewe2.revision != null) {
return new EntityWithEvent(
eventStrategy.handle(ewe2.event, ewe1.entity, ewe2.revision),
ewe2.event
);
} else {
return ewe1;
}
})
.runWith(Sink.foreach(entityWithEvent -> {
if (entityWithEvent.event != null && entityWithEvent.revision != null) {
commandStrategy.apply(getStrategyContext(),
entityWithEvent.entity,
entityWithEvent.revision,
command
).accept(new HistoricalResultListener(sender,
entityWithEvent.event.getDittoHeaders()))
),
entityWithEvent.event.getDittoHeaders()));
} else {
if (!atHistoricalTimestamp.equals(Instant.EPOCH)) {
sender.tell(newHistoryNotAccessibleExceptionBuilder(atHistoricalTimestamp).build(),
self);
} else {
sender.tell(newHistoryNotAccessibleExceptionBuilder(atHistoricalRevision).build(),
self);
}
}
}),
getContext().getSystem());
} else {
if (!atHistoricalTimestamp.equals(Instant.EPOCH)) {
Expand Down Expand Up @@ -545,7 +562,7 @@ protected void becomeDeletedHandler() {
/**
* Persist an event, modify actor state by the event strategy, then invoke the handler.
*
* @param event the event to persist and apply.
* @param event the event to persist and apply.
* @param handler what happens afterwards.
*/
protected void persistAndApplyEvent(final E event, final BiConsumer<E, S> handler) {
Expand All @@ -564,7 +581,9 @@ protected void persistAndApplyEvent(final E event, final BiConsumer<E, S> handle

private record PersistEventAsync<
E extends EventsourcedEvent<? extends E>,
S extends Jsonifiable.WithFieldSelectorAndPredicate<JsonField>>(E event, BiConsumer<E, S> handler) {};
S extends Jsonifiable.WithFieldSelectorAndPredicate<JsonField>>(E event, BiConsumer<E, S> handler) {}

;

/**
* Persist an event, modify actor state by the event strategy, then invoke the handler.
Expand Down Expand Up @@ -692,7 +711,8 @@ private ReceiveBuilder handleByDeletedStrategyReceiveBuilder() {
.match(deletedStrategy.getMatchingClass(), deletedStrategy::isDefined,
// get the current deletedStrategy during "matching time" to allow implementing classes
// to update the strategy during runtime
command -> handleByStrategy(command, entity, (CommandStrategy<C, S, K, E>) getDeletedStrategy()));
command -> handleByStrategy(command, entity,
(CommandStrategy<C, S, K, E>) getDeletedStrategy()));
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -811,7 +831,7 @@ public void onError(final DittoRuntimeException error, final Command<?> errorCau
/**
* Send a reply and increment access counter.
*
* @param sender recipient of the message.
* @param sender recipient of the message.
* @param message the message.
*/
protected void notifySender(final ActorRef sender, final WithDittoHeaders message) {
Expand Down Expand Up @@ -1068,13 +1088,14 @@ public String toString() {

@Immutable
private final class EntityWithEvent {

@Nullable private final S entity;
private final long revision;
private final E event;
@Nullable private final Long revision;
@Nullable private final E event;

private EntityWithEvent(@Nullable final S entity, final E event) {
private EntityWithEvent(@Nullable final S entity, @Nullable final E event) {
this.entity = entity;
this.revision = event.getRevision();
this.revision = event != null ? event.getRevision() : null;
this.event = event;
}

Expand Down

0 comments on commit fa6537d

Please sign in to comment.