diff --git a/base/model/src/main/java/org/eclipse/ditto/base/model/signals/commands/streaming/SubscribeForPersistedEvents.java b/base/model/src/main/java/org/eclipse/ditto/base/model/signals/commands/streaming/SubscribeForPersistedEvents.java index aeca3a7738..d41b6f7d57 100755 --- a/base/model/src/main/java/org/eclipse/ditto/base/model/signals/commands/streaming/SubscribeForPersistedEvents.java +++ b/base/model/src/main/java/org/eclipse/ditto/base/model/signals/commands/streaming/SubscribeForPersistedEvents.java @@ -61,6 +61,7 @@ public final class SubscribeForPersistedEvents extends AbstractStreamingSubscrip @Nullable private final Instant fromHistoricalTimestamp; @Nullable private final Instant toHistoricalTimestamp; @Nullable private final String prefix; + @Nullable private final String filter; private SubscribeForPersistedEvents(final EntityId entityId, final JsonPointer resourcePath, @@ -69,6 +70,7 @@ private SubscribeForPersistedEvents(final EntityId entityId, @Nullable final Instant fromHistoricalTimestamp, @Nullable final Instant toHistoricalTimestamp, @Nullable final String prefix, + @Nullable final CharSequence filter, final DittoHeaders dittoHeaders) { super(TYPE, entityId, resourcePath, dittoHeaders); @@ -77,6 +79,7 @@ private SubscribeForPersistedEvents(final EntityId entityId, this.fromHistoricalTimestamp = fromHistoricalTimestamp; this.toHistoricalTimestamp = toHistoricalTimestamp; this.prefix = prefix; + this.filter = filter != null ? filter.toString() : null; } /** @@ -89,7 +92,9 @@ private SubscribeForPersistedEvents(final EntityId entityId, * @param dittoHeaders the command headers of the request. * @return the command. * @throws NullPointerException if any non-nullable argument is {@code null}. + * @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, long, long, CharSequence, DittoHeaders)} */ + @Deprecated public static SubscribeForPersistedEvents of(final EntityId entityId, final JsonPointer resourcePath, final long fromHistoricalRevision, @@ -103,6 +108,38 @@ public static SubscribeForPersistedEvents of(final EntityId entityId, null, null, null, + null, + dittoHeaders); + } + + /** + * Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code long} revisions. + * + * @param entityId the entityId that should be streamed. + * @param resourcePath the resource path for which to stream events. + * @param fromHistoricalRevision the revision to start the streaming from. + * @param toHistoricalRevision the revision to stop the streaming at. + * @param dittoHeaders the command headers of the request. + * @param filter the optional RQL filter to apply for persisted events before publishing to the stream + * @return the command. + * @throws NullPointerException if any non-nullable argument is {@code null}. + * @since 3.5.0 + */ + public static SubscribeForPersistedEvents of(final EntityId entityId, + final JsonPointer resourcePath, + final long fromHistoricalRevision, + final long toHistoricalRevision, + @Nullable final CharSequence filter, + final DittoHeaders dittoHeaders) { + + return new SubscribeForPersistedEvents(entityId, + resourcePath, + fromHistoricalRevision, + toHistoricalRevision, + null, + null, + null, + filter, dittoHeaders); } @@ -116,7 +153,9 @@ public static SubscribeForPersistedEvents of(final EntityId entityId, * @param dittoHeaders the command headers of the request. * @return the command. * @throws NullPointerException if any non-nullable argument is {@code null}. + * @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, Instant, Instant, CharSequence, DittoHeaders)} */ + @Deprecated public static SubscribeForPersistedEvents of(final EntityId entityId, final JsonPointer resourcePath, @Nullable final Instant fromHistoricalTimestamp, @@ -130,6 +169,72 @@ public static SubscribeForPersistedEvents of(final EntityId entityId, fromHistoricalTimestamp, toHistoricalTimestamp, null, + null, + dittoHeaders); + } + + /** + * Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code Instant} timestamps. + * + * @param entityId the entityId that should be streamed. + * @param resourcePath the resource path for which to stream events. + * @param fromHistoricalTimestamp the timestamp to start the streaming from. + * @param toHistoricalTimestamp the timestamp to stop the streaming at. + * @param dittoHeaders the command headers of the request. + * @param filter the optional RQL filter to apply for persisted events before publishing to the stream + * @return the command. + * @throws NullPointerException if any non-nullable argument is {@code null}. + * @since 3.5.0 + */ + public static SubscribeForPersistedEvents of(final EntityId entityId, + final JsonPointer resourcePath, + @Nullable final Instant fromHistoricalTimestamp, + @Nullable final Instant toHistoricalTimestamp, + @Nullable final CharSequence filter, + final DittoHeaders dittoHeaders) { + + return new SubscribeForPersistedEvents(entityId, + resourcePath, + 0L, + Long.MAX_VALUE, + fromHistoricalTimestamp, + toHistoricalTimestamp, + null, + filter, + dittoHeaders); + } + + /** + * Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code Instant} timestamps. + * + * @param entityId the entityId that should be streamed. + * @param resourcePath the resource path for which to stream events. + * @param fromHistoricalRevision the revision to start the streaming from. + * @param toHistoricalRevision the revision to stop the streaming at. + * @param fromHistoricalTimestamp the timestamp to start the streaming from. + * @param toHistoricalTimestamp the timestamp to stop the streaming at. + * @param dittoHeaders the command headers of the request. + * @return the command. + * @throws NullPointerException if any non-nullable argument is {@code null}. + * @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, Long, Long, Instant, Instant, CharSequence, DittoHeaders)} + */ + @Deprecated + public static SubscribeForPersistedEvents of(final EntityId entityId, + final JsonPointer resourcePath, + @Nullable final Long fromHistoricalRevision, + @Nullable final Long toHistoricalRevision, + @Nullable final Instant fromHistoricalTimestamp, + @Nullable final Instant toHistoricalTimestamp, + final DittoHeaders dittoHeaders) { + + return new SubscribeForPersistedEvents(entityId, + resourcePath, + null != fromHistoricalRevision ? fromHistoricalRevision : 0L, + null != toHistoricalRevision ? toHistoricalRevision : Long.MAX_VALUE, + fromHistoricalTimestamp, + toHistoricalTimestamp, + null, + null, dittoHeaders); } @@ -142,9 +247,11 @@ public static SubscribeForPersistedEvents of(final EntityId entityId, * @param toHistoricalRevision the revision to stop the streaming at. * @param fromHistoricalTimestamp the timestamp to start the streaming from. * @param toHistoricalTimestamp the timestamp to stop the streaming at. + * @param filter the optional RQL filter to apply for persisted events before publishing to the stream * @param dittoHeaders the command headers of the request. * @return the command. * @throws NullPointerException if any non-nullable argument is {@code null}. + * @since 3.5.0 */ public static SubscribeForPersistedEvents of(final EntityId entityId, final JsonPointer resourcePath, @@ -152,6 +259,7 @@ public static SubscribeForPersistedEvents of(final EntityId entityId, @Nullable final Long toHistoricalRevision, @Nullable final Instant fromHistoricalTimestamp, @Nullable final Instant toHistoricalTimestamp, + @Nullable final CharSequence filter, final DittoHeaders dittoHeaders) { return new SubscribeForPersistedEvents(entityId, @@ -161,6 +269,7 @@ public static SubscribeForPersistedEvents of(final EntityId entityId, fromHistoricalTimestamp, toHistoricalTimestamp, null, + filter, dittoHeaders); } @@ -182,6 +291,7 @@ public static SubscribeForPersistedEvents fromJson(final JsonObject jsonObject, jsonObject.getValue(JsonFields.JSON_FROM_HISTORICAL_TIMESTAMP).map(Instant::parse).orElse(null), jsonObject.getValue(JsonFields.JSON_TO_HISTORICAL_TIMESTAMP).map(Instant::parse).orElse(null), jsonObject.getValue(JsonFields.PREFIX).orElse(null), + jsonObject.getValue(JsonFields.FILTER).orElse(null), dittoHeaders ); } @@ -195,7 +305,7 @@ public static SubscribeForPersistedEvents fromJson(final JsonObject jsonObject, */ public SubscribeForPersistedEvents setPrefix(@Nullable final String prefix) { return new SubscribeForPersistedEvents(entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision, - fromHistoricalTimestamp, toHistoricalTimestamp, prefix, getDittoHeaders()); + fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter, getDittoHeaders()); } /** @@ -244,6 +354,14 @@ public Optional getPrefix() { return Optional.ofNullable(prefix); } + /** + * @return the optional RQL filter to apply for persisted events before publishing to the stream + * @since 3.5.0 + */ + public Optional getFilter() { + return Optional.ofNullable(filter); + } + @Override protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion, @@ -263,6 +381,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, jsonObjectBuilder.set(JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, toHistoricalTimestamp.toString(), predicate); } getPrefix().ifPresent(thePrefix -> jsonObjectBuilder.set(JsonFields.PREFIX, thePrefix)); + getFilter().ifPresent(theFilter -> jsonObjectBuilder.set(JsonFields.FILTER, theFilter)); } @Override @@ -273,13 +392,13 @@ public String getTypePrefix() { @Override public SubscribeForPersistedEvents setDittoHeaders(final DittoHeaders dittoHeaders) { return new SubscribeForPersistedEvents(entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision, - fromHistoricalTimestamp, toHistoricalTimestamp, prefix, dittoHeaders); + fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter, dittoHeaders); } @Override public int hashCode() { return Objects.hash(super.hashCode(), entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision, - fromHistoricalTimestamp, toHistoricalTimestamp, prefix); + fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter); } @Override @@ -297,7 +416,8 @@ public boolean equals(@Nullable final Object obj) { toHistoricalRevision == that.toHistoricalRevision && Objects.equals(fromHistoricalTimestamp, that.fromHistoricalTimestamp) && Objects.equals(toHistoricalTimestamp, that.toHistoricalTimestamp) && - Objects.equals(prefix, that.prefix); + Objects.equals(prefix, that.prefix) && + Objects.equals(filter, that.filter); } @Override @@ -313,6 +433,7 @@ public String toString() { + ", fromHistoricalTimestamp=" + fromHistoricalTimestamp + ", toHistoricalTimestamp=" + toHistoricalTimestamp + ", prefix=" + prefix + + ", filter=" + filter + "]"; } @@ -339,6 +460,9 @@ private JsonFields() { static final JsonFieldDefinition PREFIX = JsonFactory.newStringFieldDefinition("prefix", REGULAR, V_2); + + public static final JsonFieldDefinition FILTER = + JsonFactory.newStringFieldDefinition("filter", REGULAR, V_2); } } diff --git a/base/model/src/test/java/org/eclipse/ditto/base/model/signals/commands/streaming/SubscribeForPersistedEventsTest.java b/base/model/src/test/java/org/eclipse/ditto/base/model/signals/commands/streaming/SubscribeForPersistedEventsTest.java index c78c8dab60..9d04d800bc 100755 --- a/base/model/src/test/java/org/eclipse/ditto/base/model/signals/commands/streaming/SubscribeForPersistedEventsTest.java +++ b/base/model/src/test/java/org/eclipse/ditto/base/model/signals/commands/streaming/SubscribeForPersistedEventsTest.java @@ -43,6 +43,7 @@ public final class SubscribeForPersistedEventsTest { private static final long KNOWN_TO_REV = 42L; private static final String KNOWN_FROM_TS = "2022-10-25T14:00:00Z"; private static final String KNOWN_TO_TS = "2022-10-25T15:00:00Z"; + private static final String KNOWN_FILTER = "exists(thingId)"; private static final String JSON_ALL_FIELDS = JsonFactory.newObjectBuilder() .set(Command.JsonFields.TYPE, SubscribeForPersistedEvents.TYPE) @@ -53,6 +54,7 @@ public final class SubscribeForPersistedEventsTest { .set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_REVISION, KNOWN_TO_REV) .set(SubscribeForPersistedEvents.JsonFields.JSON_FROM_HISTORICAL_TIMESTAMP, KNOWN_FROM_TS) .set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, KNOWN_TO_TS) + .set(SubscribeForPersistedEvents.JsonFields.FILTER, KNOWN_FILTER) .build() .toString(); @@ -63,6 +65,7 @@ public final class SubscribeForPersistedEventsTest { .set(StreamingSubscriptionCommand.JsonFields.JSON_RESOURCE_PATH, KNOWN_RESOURCE_PATH) .set(SubscribeForPersistedEvents.JsonFields.JSON_FROM_HISTORICAL_REVISION, KNOWN_FROM_REV) .set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_REVISION, KNOWN_TO_REV) + .set(SubscribeForPersistedEvents.JsonFields.FILTER, KNOWN_FILTER) .build().toString(); @Test @@ -88,6 +91,7 @@ public void toJsonWithAllFieldsSet() { KNOWN_TO_REV, Instant.parse(KNOWN_FROM_TS), Instant.parse(KNOWN_TO_TS), + KNOWN_FILTER, DittoHeaders.empty() ); @@ -102,6 +106,7 @@ public void toJsonWithOnlyRequiredFieldsSet() { JsonPointer.of(KNOWN_RESOURCE_PATH), KNOWN_FROM_REV, KNOWN_TO_REV, + KNOWN_FILTER, DittoHeaders.empty()); final String json = command.toJsonString(); assertThat(json).isEqualTo(JSON_MINIMAL); @@ -116,6 +121,7 @@ public void fromJsonWithAllFieldsSet() { KNOWN_TO_REV, Instant.parse(KNOWN_FROM_TS), Instant.parse(KNOWN_TO_TS), + KNOWN_FILTER, DittoHeaders.empty() ); assertThat(SubscribeForPersistedEvents.fromJson(JsonObject.of(JSON_ALL_FIELDS), DittoHeaders.empty())) @@ -130,6 +136,7 @@ public void fromJsonWithOnlyRequiredFieldsSet() { JsonPointer.of(KNOWN_RESOURCE_PATH), KNOWN_FROM_REV, KNOWN_TO_REV, + KNOWN_FILTER, DittoHeaders.empty())); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionSupervisorActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionSupervisorActor.java index 0584bf7153..df70c423ba 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionSupervisorActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionSupervisorActor.java @@ -25,10 +25,22 @@ import javax.annotation.Nullable; import javax.jms.JMSRuntimeException; +import org.apache.pekko.actor.ActorKilledException; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.OneForOneStrategy; +import org.apache.pekko.actor.Props; +import org.apache.pekko.actor.ReceiveTimeout; +import org.apache.pekko.actor.SupervisorStrategy; +import org.apache.pekko.japi.pf.DeciderBuilder; +import org.apache.pekko.japi.pf.FI; +import org.apache.pekko.japi.pf.ReceiveBuilder; +import org.apache.pekko.pattern.Patterns; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents; +import org.eclipse.ditto.base.model.signals.events.Event; import org.eclipse.ditto.base.service.actors.ShutdownBehaviour; import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig; import org.eclipse.ditto.base.service.config.supervision.LocalAskTimeoutConfig; @@ -48,17 +60,6 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import org.apache.pekko.actor.ActorKilledException; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.OneForOneStrategy; -import org.apache.pekko.actor.Props; -import org.apache.pekko.actor.ReceiveTimeout; -import org.apache.pekko.actor.SupervisorStrategy; -import org.apache.pekko.japi.pf.DeciderBuilder; -import org.apache.pekko.japi.pf.FI; -import org.apache.pekko.japi.pf.ReceiveBuilder; -import org.apache.pekko.pattern.Patterns; - /** * Supervisor for {@link ConnectionPersistenceActor} which means it will create, start and watch it as child actor. *

@@ -158,6 +159,11 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha .orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior)); } + @Override + protected boolean applyPersistedEventFilter(final Event event, final SubscribeForPersistedEvents subscribe) { + return true; + } + @Override protected boolean shouldBecomeTwinSignalProcessingAwaiting(final Signal signal) { return super.shouldBecomeTwinSignalProcessingAwaiting(signal) && diff --git a/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json b/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json index bf525456c2..b34288fff4 100644 --- a/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json +++ b/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json @@ -21,6 +21,10 @@ "type": "string", "format": "date-time", "description": "The timestamp to stop the streaming at." + }, + "filter": { + "type": "string", + "description": "An RQL expression defining which events to filter for in the stream. Only supported for thing events." } } } diff --git a/documentation/src/main/resources/pages/ditto/basic-history.md b/documentation/src/main/resources/pages/ditto/basic-history.md index fdf9c908c7..1a6dfff801 100644 --- a/documentation/src/main/resources/pages/ditto/basic-history.md +++ b/documentation/src/main/resources/pages/ditto/basic-history.md @@ -138,6 +138,27 @@ curl --http2 -u ditto:ditto -H 'Accept:text/event-stream' -N \ http://localhost:8080/api/2/things/org.eclipse.ditto:thing-2?from-historical-revision=0&fields=thingId,attributes,features,_revision,_modified,_context ``` +#### Filtering streamed historical events for things via SEE + +When streaming historical events for [things](basic-thing.html), an optional `filter` in form of an +[RQL](basic-rql.html) may be declared in order to only receive thing events matching the defined query. + +This can e.g. be useful to only stream events in which a certain feature or a certain property/attribute was included. + +In addition to the parameters selecting from/to revision or timestamp, the following parameter can be defined: +* `filter`: specifies the [RQL](basic-rql.html) filter which events to return in the stream must match + +Examples: +```bash +# stream complete history starting from earliest available revision of a thing, but only those where a feature "bamboo" was modified: +curl --http2 -u ditto:ditto -H 'Accept:text/event-stream' -N \ + http://localhost:8080/api/2/things/org.eclipse.ditto:thing-2?from-historical-revision=0&fields=thingId,attributes,features,_revision,_modified&filter=exists(features/bamboo) + +# stream specific history range of a thing based on timestamps, filtering for temperature values of a sensor being greater than 50: +curl --http2 -u ditto:ditto -H 'Accept:text/event-stream' -N \ + http://localhost:8080/api/2/things/org.eclipse.ditto:thing-2?from-historical-timestamp=2022-10-24T11:44:36Z&to-historical-timestamp=2022-10-24T11:44:37Z&fields=thingId,attributes,features,_revision,_modified&filter=gt(features/temperature/properties/value,50) +``` + ### Streaming historical events via Ditto Protocol Please inspect the [protocol specification of DittoProtocol messages for streaming persisted events](protocol-specification-streaming-subscription.html) @@ -219,6 +240,24 @@ It will do so either until all existing events were sent, in that case a `comple Or it will stop after the `demand` was fulfilled, waiting for the requester to claim more demand with a new `request` message. +#### Filtering streamed historical events for things via Ditto Protocol + +The `filter` for streaming historical thing events may also be specified via Ditto Protocol. + +Example protocol message for subscribing for the persisted events of a thing with a `filter`: +```json +{ + "topic": "org.eclipse.ditto/thing-2/things/twin/streaming/subscribeForPersistedEvents", + "path": "/", + "headers": {}, + "value": { + "fromHistoricalRevision": 1, + "toHistoricalRevision": 10, + "filter": "exists(features/bamboo)" + } +} +``` + ## Configuring historical headers to persist diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java index d11705d0e0..f2efc90cca 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java @@ -393,6 +393,7 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage eventTs.isAfter(instant)) ).orElse(true) ) + .filter(event -> applyPersistedEventFilter(event, subscribeForPersistedEvents)) .takeWhile(event -> toHistoricalTimestamp.flatMap(instant -> event.getTimestamp().map(eventTs -> eventTs.isBefore(instant)) @@ -298,6 +299,15 @@ private void handleStreamPersistedEvents(final SubscribeForPersistedEvents subsc }); } + /** + * Applies filtering on the passed {@code event} subscribed to via the passed {@code subscribe} message. + * + * @param event the event which should be checked for being filtered out + * @param subscribe the subscribe message containing an RQL "filter" for applying filtering + * @return whether the event passes the filter + */ + protected abstract boolean applyPersistedEventFilter(Event event, SubscribeForPersistedEvents subscribe); + private Event mapJournalEntryToEvent(final SubscribeForPersistedEvents enforcedSubscribeForPersistedEvents, final EventEnvelope eventEnvelope) { diff --git a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicySupervisorActor.java b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicySupervisorActor.java index d01fbf3495..d123a60c2a 100755 --- a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicySupervisorActor.java +++ b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicySupervisorActor.java @@ -17,7 +17,12 @@ import javax.annotation.Nullable; +import org.apache.pekko.actor.ActorKilledException; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.Props; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder; +import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents; +import org.eclipse.ditto.base.model.signals.events.Event; import org.eclipse.ditto.base.service.actors.ShutdownBehaviour; import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig; import org.eclipse.ditto.base.service.config.supervision.LocalAskTimeoutConfig; @@ -37,10 +42,6 @@ import org.eclipse.ditto.policies.service.enforcement.PolicyCommandEnforcement; import org.eclipse.ditto.policies.service.persistence.actors.announcements.PolicyAnnouncementManager; -import org.apache.pekko.actor.ActorKilledException; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.Props; - /** * Supervisor for {@link org.eclipse.ditto.policies.service.persistence.actors.PolicyPersistenceActor} which means * it will create, start and watch it as child actor. @@ -142,6 +143,11 @@ protected ShutdownBehaviour getShutdownBehaviour(final PolicyId entityId) { return ShutdownBehaviour.fromId(entityId, pubSubMediator, getSelf()); } + @Override + protected boolean applyPersistedEventFilter(final Event event, final SubscribeForPersistedEvents subscribe) { + return true; + } + @Override protected DittoRuntimeExceptionBuilder getUnavailableExceptionBuilder(@Nullable final PolicyId entityId) { final PolicyId policyId = entityId != null ? entityId : PolicyId.of("UNKNOWN:ID"); diff --git a/policies/service/src/test/java/org/eclipse/ditto/policies/service/enforcement/PolicyCommandEnforcementTest.java b/policies/service/src/test/java/org/eclipse/ditto/policies/service/enforcement/PolicyCommandEnforcementTest.java index 9882f6cc15..47ef39cfb2 100644 --- a/policies/service/src/test/java/org/eclipse/ditto/policies/service/enforcement/PolicyCommandEnforcementTest.java +++ b/policies/service/src/test/java/org/eclipse/ditto/policies/service/enforcement/PolicyCommandEnforcementTest.java @@ -44,6 +44,8 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.entitytag.EntityTagMatchers; import org.eclipse.ditto.base.model.json.FieldType; +import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents; +import org.eclipse.ditto.base.model.signals.events.Event; import org.eclipse.ditto.base.service.actors.ShutdownBehaviour; import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig; import org.eclipse.ditto.base.service.config.supervision.LocalAskTimeoutConfig; @@ -826,6 +828,11 @@ protected ShutdownBehaviour getShutdownBehaviour(final PolicyId entityId) { return ShutdownBehaviour.fromId(entityId, pubSubMediator, getSelf()); } + @Override + protected boolean applyPersistedEventFilter(final Event event, final SubscribeForPersistedEvents subscribe) { + return true; + } + @Override protected DittoRuntimeExceptionBuilder getUnavailableExceptionBuilder(@Nullable final PolicyId entityId) { final PolicyId policyId = entityId != null ? entityId : PolicyId.of("UNKNOWN:ID"); diff --git a/protocol/src/main/java/org/eclipse/ditto/protocol/mapper/StreamingSubscriptionCommandSignalMapper.java b/protocol/src/main/java/org/eclipse/ditto/protocol/mapper/StreamingSubscriptionCommandSignalMapper.java index 49e1e1b6e5..fddd390b77 100644 --- a/protocol/src/main/java/org/eclipse/ditto/protocol/mapper/StreamingSubscriptionCommandSignalMapper.java +++ b/protocol/src/main/java/org/eclipse/ditto/protocol/mapper/StreamingSubscriptionCommandSignalMapper.java @@ -72,6 +72,9 @@ void enhancePayloadBuilder(final T command, final PayloadBuilder payloadBuilder) subscribeCommand.getToHistoricalTimestamp().ifPresent(toTs -> payloadContentBuilder.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, toTs.toString())); + subscribeCommand.getFilter().ifPresent(filter -> + payloadContentBuilder.set(SubscribeForPersistedEvents.JsonFields.FILTER, filter) + ); } else if (command instanceof CancelStreamingSubscription) { final CancelStreamingSubscription cancelCommand = (CancelStreamingSubscription) command; payloadContentBuilder diff --git a/protocol/src/main/java/org/eclipse/ditto/protocol/mappingstrategies/StreamingSubscriptionCommandMappingStrategies.java b/protocol/src/main/java/org/eclipse/ditto/protocol/mappingstrategies/StreamingSubscriptionCommandMappingStrategies.java index fb8881e7eb..a752372cfc 100644 --- a/protocol/src/main/java/org/eclipse/ditto/protocol/mappingstrategies/StreamingSubscriptionCommandMappingStrategies.java +++ b/protocol/src/main/java/org/eclipse/ditto/protocol/mappingstrategies/StreamingSubscriptionCommandMappingStrategies.java @@ -54,6 +54,7 @@ private static Map>> i toHistoricalRevision(adaptable), fromHistoricalTimestamp(adaptable), toHistoricalTimestamp(adaptable), + filterFrom(adaptable), dittoHeadersFrom(adaptable))); mappingStrategies.put(CancelStreamingSubscription.TYPE, adaptable -> CancelStreamingSubscription.of(entityIdFrom(adaptable), @@ -100,4 +101,9 @@ private static long demandFrom(final Adaptable adaptable) { return getFromValue(adaptable, RequestFromStreamingSubscription.JsonFields.DEMAND).orElse(0L); } + @Nullable + private static String filterFrom(final Adaptable adaptable) { + return getFromValue(adaptable, SubscribeForPersistedEvents.JsonFields.FILTER).orElse(null); + } + } diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java index 290e244ffe..7ee918714a 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java @@ -47,6 +47,7 @@ import org.eclipse.ditto.base.model.signals.Signal; import org.eclipse.ditto.base.model.signals.commands.Command; import org.eclipse.ditto.base.model.signals.commands.CommandResponse; +import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents; import org.eclipse.ditto.base.model.signals.events.Event; import org.eclipse.ditto.base.service.actors.ShutdownBehaviour; import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig; @@ -65,7 +66,12 @@ import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider; import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig; import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicy; +import org.eclipse.ditto.rql.parser.RqlPredicateParser; +import org.eclipse.ditto.rql.query.criteria.Criteria; +import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory; +import org.eclipse.ditto.rql.query.things.ThingPredicateVisitor; import org.eclipse.ditto.things.api.ThingsMessagingConstants; +import org.eclipse.ditto.things.model.Thing; import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.signals.commands.ThingCommandResponse; import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingUnavailableException; @@ -74,6 +80,7 @@ import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse; import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand; import org.eclipse.ditto.things.model.signals.events.ThingEvent; +import org.eclipse.ditto.things.model.signals.events.ThingEventToThingConverter; import org.eclipse.ditto.things.service.common.config.DittoThingsConfig; import org.eclipse.ditto.things.service.enforcement.ThingEnforcement; import org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor; @@ -325,7 +332,8 @@ protected CompletionStage modifyTargetActorCommandResponse(final Signal< pair.response() instanceof RetrieveThingResponse retrieveThingResponse) { return inlinePolicyEnrichment.enrichPolicy(retrieveThing, retrieveThingResponse) .map(Object.class::cast); - } else if (RollbackCreatedPolicy.shouldRollbackBasedOnTargetActorResponse(pair.command(), pair.response())) { + } else if (RollbackCreatedPolicy.shouldRollbackBasedOnTargetActorResponse(pair.command(), + pair.response())) { final CompletableFuture responseF = new CompletableFuture<>(); getSelf().tell(RollbackCreatedPolicy.of(pair.command(), pair.response(), responseF), getSelf()); return Source.completionStage(responseF); @@ -338,12 +346,13 @@ protected CompletionStage modifyTargetActorCommandResponse(final Signal< } @Override - protected CompletableFuture handleTargetActorAndEnforcerException(final Signal signal, final Throwable throwable) { + protected CompletableFuture handleTargetActorAndEnforcerException(final Signal signal, + final Throwable throwable) { if (RollbackCreatedPolicy.shouldRollbackBasedOnException(signal, throwable)) { final Throwable cause = throwable.getCause(); log.withCorrelationId(signal) .info("Target actor exception received: <{}>, cause: <{}>. " + - "Sending RollbackCreatedPolicy msg to self, potentially rolling back a created policy.", + "Sending RollbackCreatedPolicy msg to self, potentially rolling back a created policy.", throwable.getClass().getSimpleName(), cause != null ? cause.getClass().getSimpleName() : "-"); final CompletableFuture responseFuture = new CompletableFuture<>(); getSelf().tell(RollbackCreatedPolicy.of(signal, throwable, responseFuture), getSelf()); @@ -461,13 +470,43 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha return ReceiveBuilder.create() .matchEquals(Control.SHUTDOWN_TIMEOUT, this::shutdownActor) .match(ThingPolicyCreated.class, msg -> { - log.withCorrelationId(msg.dittoHeaders()).info("ThingPolicyCreated msg received: <{}>", msg.policyId()); + log.withCorrelationId(msg.dittoHeaders()) + .info("ThingPolicyCreated msg received: <{}>", msg.policyId()); this.policyCreatedEvent = msg; }).match(RollbackCreatedPolicy.class, this::handleRollbackCreatedPolicy) .build() .orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior)); } + @Override + protected boolean applyPersistedEventFilter(final Event event, final SubscribeForPersistedEvents subscribe) { + + if (subscribe.getFilter().isEmpty()) { + return true; + } else if (event instanceof ThingEvent thingEvent) { + try { + final Criteria criteria = subscribe.getFilter() + .map(f -> parseCriteria(f, subscribe.getDittoHeaders())) + .orElse(null); + final Thing thing = ThingEventToThingConverter.thingEventToThing(thingEvent) + .orElseGet(() -> Thing.newBuilder().build()); + return ThingPredicateVisitor.apply(criteria).test(thing); + } catch (final DittoRuntimeException e) { + log.info("Got 'DittoRuntimeException' when parsing 'filter' during " + + "'SubscribeForPersistedEvents' processing: {}: <{}>", e.getClass().getSimpleName(), + e.getMessage()); + return false; + } + } else { + return false; + } + } + + private static Criteria parseCriteria(final String filter, final DittoHeaders dittoHeaders) { + final var queryFilterCriteriaFactory = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance()); + return queryFilterCriteriaFactory.filterCriteria(filter, dittoHeaders); + } + private void shutdownActor(final Control shutdown) { log.warning("Shutdown timeout <{}> reached; aborting <{}> ops and stopping myself", shutdownTimeout, getOpCounter()); @@ -483,15 +522,18 @@ private enum Control { /** * Used from the {@link org.eclipse.ditto.things.service.persistence.actors.ThingSupervisorActor} to signal itself * to delete an already created policy because of a failure in creating a thing + * * @param initialCommand the initial command that triggered the creation of a thing and policy * @param response the response from the thing persistence actor * @param responseFuture a future that when completed with the response from the thing persistence actor the response * will be sent to the initial sender. */ - private record RollbackCreatedPolicy(Signal initialCommand, Object response, CompletableFuture responseFuture) { + private record RollbackCreatedPolicy(Signal initialCommand, Object response, + CompletableFuture responseFuture) { /** * Initialises an instance of {@link org.eclipse.ditto.things.service.persistence.actors.ThingSupervisorActor.RollbackCreatedPolicy} + * * @param initialCommand the initial initialCommand that triggered the creation of a thing and policy * @param response the response from the thing persistence actor * @param responseFuture a future that when completed with the response from the thing persistence actor the response @@ -506,25 +548,30 @@ public static RollbackCreatedPolicy of(final Signal initialCommand, final Obj /** * Evaluates if a failure in the creation of a thing should lead to deleting of that thing's policy. * Should be used only to evaluate exceptions from the target actor not the enforcement actor. + * * @param command the initial command. * @param response the response from the {@link org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActor}. * @return if the thing's policy is to be deleted. */ - private static boolean shouldRollbackBasedOnTargetActorResponse(final Signal command, @Nullable final Object response) { + private static boolean shouldRollbackBasedOnTargetActorResponse(final Signal command, + @Nullable final Object response) { return command instanceof CreateThing && response instanceof DittoRuntimeException; } /** * Evaluates if a failure in the creation of a thing should lead to deleting of that thing's policy. + * * @param signal the initial signal. * @param throwable the throwable received from the Persistence Actor * @return if the thing's policy is to be deleted. */ - private static boolean shouldRollbackBasedOnException(final Signal signal, @Nullable final Throwable throwable) { - return signal instanceof CreateThing && ((throwable instanceof CompletionException ce1 && ce1.getCause() instanceof ThingUnavailableException) - || throwable instanceof AskTimeoutException - || (throwable instanceof CompletionException ce && ce.getCause() instanceof AskTimeoutException) - ); + private static boolean shouldRollbackBasedOnException(final Signal signal, + @Nullable final Throwable throwable) { + return signal instanceof CreateThing && ((throwable instanceof CompletionException ce1 && + ce1.getCause() instanceof ThingUnavailableException) + || throwable instanceof AskTimeoutException + || (throwable instanceof CompletionException ce && ce.getCause() instanceof AskTimeoutException) + ); } /**