Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1915 fix retrieving the historical value of a thing with at-historical-timestamp #2033

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected void onEntityModified() {
* Publish an event.
*
* @param previousEntity the previous state of the entity before the event was applied.
* @param event the event which was applied.
* @param event the event which was applied.
*/
protected abstract void publishEvent(@Nullable S previousEntity, E event);

Expand Down Expand Up @@ -373,18 +373,17 @@ private void handleHistoricalRetrieveCommand(final C command) {
final EventStrategy<E, S> eventStrategy = getEventStrategy();
final ActorRef sender = getSender();
final ActorRef self = getSelf();
final long atHistoricalRevision = Optional
final Optional<Long> atHistoricalRevision = Optional
.ofNullable(command.getDittoHeaders().get(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey()))
.map(Long::parseLong)
.orElseGet(this::lastSequenceNr);
if (atHistoricalRevision > lastSequenceNr()) {
.map(Long::parseLong);
if (atHistoricalRevision.isPresent() && atHistoricalRevision.get() > lastSequenceNr()) {
getSender().tell(
newHistoryNotAccessibleExceptionBuilder(atHistoricalRevision)
newHistoryNotAccessibleExceptionBuilder(atHistoricalRevision.get())
.dittoHeaders(command.getDittoHeaders())
.build(),
getSelf()
);
} else if (atHistoricalRevision == lastSequenceNr()) {
} else if (atHistoricalRevision.isPresent() && atHistoricalRevision.get() == lastSequenceNr()) {
// for current revision, don't make the effort to load snapshot, etc., but return from memory like a normal
// "RetrieveThing" command does:
handleByCommandStrategy(command);
Expand All @@ -404,7 +403,7 @@ private void handleHistoricalRetrieveCommand(final C command) {
}

loadSnapshot(persistenceId(), SnapshotSelectionCriteria.create(
atHistoricalRevision,
atHistoricalRevision.orElseGet(this::lastSequenceNr),
atHistoricalTimestamp.equals(Instant.EPOCH) ? Long.MAX_VALUE : atHistoricalTimestamp.toEpochMilli(),
0L,
0L
Expand All @@ -421,7 +420,7 @@ private void handleHistoricalRetrieveCommand(final C command) {
eventStrategy,
sender,
self,
atHistoricalRevision,
atHistoricalRevision.orElseGet(this::lastSequenceNr),
atHistoricalTimestamp,
cancellableSnapshotLoadTimeout,
loadSnapshotResult
Expand Down Expand Up @@ -455,6 +454,7 @@ private void historicalRetrieveHandleLoadSnapshotResult(final C command,
.map(snapshotAdapter::fromSnapshotStore);
final boolean snapshotIsPresent = snapshotEntity.isDefined();

// when "getLatestSnapshotSequenceNumber() == 0", then no snapshot was stored yet to the snapshot store (e.g. for new things)
if (snapshotIsPresent || getLatestSnapshotSequenceNumber() == 0) {
final long snapshotEntityRevision = snapshotIsPresent ?
loadSnapshotResult.snapshot().get().metadata().sequenceNr() : 0L;
Expand All @@ -473,7 +473,8 @@ private void historicalRetrieveHandleLoadSnapshotResult(final C command,
)
.map(AbstractPersistenceActor::mapJournalEntryToEvent)
.map(journalEntryEvent -> new EntityWithEvent(
eventStrategy.handle((E) journalEntryEvent, entityFromSnapshot, journalEntryEvent.getRevision()),
eventStrategy.handle((E) journalEntryEvent, entityFromSnapshot,
journalEntryEvent.getRevision()),
(E) journalEntryEvent
))
.takeWhile(entityWithEvent -> {
Expand All @@ -487,18 +488,34 @@ private void historicalRetrieveHandleLoadSnapshotResult(final C command,
.isPresent();
}
})
.reduce((ewe1, ewe2) -> new EntityWithEvent(
eventStrategy.handle(ewe2.event, ewe1.entity, ewe2.revision),
ewe2.event
))
.runWith(Sink.foreach(entityWithEvent ->
.fold(new EntityWithEvent(null, null), (ewe1, ewe2) -> {
if (ewe2.event != null && ewe2.revision != null) {
return new EntityWithEvent(
eventStrategy.handle(ewe2.event, ewe1.entity, ewe2.revision),
ewe2.event
);
} else {
return ewe1;
}
})
.runWith(Sink.foreach(entityWithEvent -> {
if (entityWithEvent.event != null && entityWithEvent.revision != null) {
commandStrategy.apply(getStrategyContext(),
entityWithEvent.entity,
entityWithEvent.revision,
command
).accept(new HistoricalResultListener(sender,
entityWithEvent.event.getDittoHeaders()))
),
entityWithEvent.event.getDittoHeaders()));
} else {
if (!atHistoricalTimestamp.equals(Instant.EPOCH)) {
sender.tell(newHistoryNotAccessibleExceptionBuilder(atHistoricalTimestamp).build(),
self);
} else {
sender.tell(newHistoryNotAccessibleExceptionBuilder(atHistoricalRevision).build(),
self);
}
}
}),
getContext().getSystem());
} else {
if (!atHistoricalTimestamp.equals(Instant.EPOCH)) {
Expand Down Expand Up @@ -545,7 +562,7 @@ protected void becomeDeletedHandler() {
/**
* Persist an event, modify actor state by the event strategy, then invoke the handler.
*
* @param event the event to persist and apply.
* @param event the event to persist and apply.
* @param handler what happens afterwards.
*/
protected void persistAndApplyEvent(final E event, final BiConsumer<E, S> handler) {
Expand All @@ -564,7 +581,9 @@ protected void persistAndApplyEvent(final E event, final BiConsumer<E, S> handle

private record PersistEventAsync<
E extends EventsourcedEvent<? extends E>,
S extends Jsonifiable.WithFieldSelectorAndPredicate<JsonField>>(E event, BiConsumer<E, S> handler) {};
S extends Jsonifiable.WithFieldSelectorAndPredicate<JsonField>>(E event, BiConsumer<E, S> handler) {}

;

/**
* Persist an event, modify actor state by the event strategy, then invoke the handler.
Expand Down Expand Up @@ -692,7 +711,8 @@ private ReceiveBuilder handleByDeletedStrategyReceiveBuilder() {
.match(deletedStrategy.getMatchingClass(), deletedStrategy::isDefined,
// get the current deletedStrategy during "matching time" to allow implementing classes
// to update the strategy during runtime
command -> handleByStrategy(command, entity, (CommandStrategy<C, S, K, E>) getDeletedStrategy()));
command -> handleByStrategy(command, entity,
(CommandStrategy<C, S, K, E>) getDeletedStrategy()));
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -811,7 +831,7 @@ public void onError(final DittoRuntimeException error, final Command<?> errorCau
/**
* Send a reply and increment access counter.
*
* @param sender recipient of the message.
* @param sender recipient of the message.
* @param message the message.
*/
protected void notifySender(final ActorRef sender, final WithDittoHeaders message) {
Expand Down Expand Up @@ -1068,13 +1088,14 @@ public String toString() {

@Immutable
private final class EntityWithEvent {

@Nullable private final S entity;
private final long revision;
private final E event;
@Nullable private final Long revision;
@Nullable private final E event;

private EntityWithEvent(@Nullable final S entity, final E event) {
private EntityWithEvent(@Nullable final S entity, @Nullable final E event) {
this.entity = entity;
this.revision = event.getRevision();
this.revision = event != null ? event.getRevision() : null;
this.event = event;
}

Expand Down
Loading