Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1583 apply RQL based filtering when streaming "historical" thing events #1815

Merged
merged 2 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public final class SubscribeForPersistedEvents extends AbstractStreamingSubscrip
@Nullable private final Instant fromHistoricalTimestamp;
@Nullable private final Instant toHistoricalTimestamp;
@Nullable private final String prefix;
@Nullable private final String filter;

private SubscribeForPersistedEvents(final EntityId entityId,
final JsonPointer resourcePath,
Expand All @@ -69,6 +70,7 @@ private SubscribeForPersistedEvents(final EntityId entityId,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
@Nullable final String prefix,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

super(TYPE, entityId, resourcePath, dittoHeaders);
Expand All @@ -77,6 +79,7 @@ private SubscribeForPersistedEvents(final EntityId entityId,
this.fromHistoricalTimestamp = fromHistoricalTimestamp;
this.toHistoricalTimestamp = toHistoricalTimestamp;
this.prefix = prefix;
this.filter = filter != null ? filter.toString() : null;
}

/**
Expand All @@ -89,7 +92,9 @@ private SubscribeForPersistedEvents(final EntityId entityId,
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, long, long, CharSequence, DittoHeaders)}
*/
@Deprecated
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
final long fromHistoricalRevision,
Expand All @@ -103,6 +108,38 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
null,
null,
null,
null,
dittoHeaders);
}

/**
* Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code long} revisions.
*
* @param entityId the entityId that should be streamed.
* @param resourcePath the resource path for which to stream events.
* @param fromHistoricalRevision the revision to start the streaming from.
* @param toHistoricalRevision the revision to stop the streaming at.
* @param dittoHeaders the command headers of the request.
* @param filter the optional RQL filter to apply for persisted events before publishing to the stream
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 3.5.0
*/
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
final long fromHistoricalRevision,
final long toHistoricalRevision,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
resourcePath,
fromHistoricalRevision,
toHistoricalRevision,
null,
null,
null,
filter,
dittoHeaders);
}

Expand All @@ -116,7 +153,9 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, Instant, Instant, CharSequence, DittoHeaders)}
*/
@Deprecated
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Instant fromHistoricalTimestamp,
Expand All @@ -130,6 +169,72 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
null,
dittoHeaders);
}

/**
* Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code Instant} timestamps.
*
* @param entityId the entityId that should be streamed.
* @param resourcePath the resource path for which to stream events.
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
* @param dittoHeaders the command headers of the request.
* @param filter the optional RQL filter to apply for persisted events before publishing to the stream
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 3.5.0
*/
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
resourcePath,
0L,
Long.MAX_VALUE,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
filter,
dittoHeaders);
}

/**
* Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code Instant} timestamps.
*
* @param entityId the entityId that should be streamed.
* @param resourcePath the resource path for which to stream events.
* @param fromHistoricalRevision the revision to start the streaming from.
* @param toHistoricalRevision the revision to stop the streaming at.
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, Long, Long, Instant, Instant, CharSequence, DittoHeaders)}
*/
@Deprecated
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Long fromHistoricalRevision,
@Nullable final Long toHistoricalRevision,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
resourcePath,
null != fromHistoricalRevision ? fromHistoricalRevision : 0L,
null != toHistoricalRevision ? toHistoricalRevision : Long.MAX_VALUE,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
null,
dittoHeaders);
}

Expand All @@ -142,16 +247,19 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
* @param toHistoricalRevision the revision to stop the streaming at.
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
* @param filter the optional RQL filter to apply for persisted events before publishing to the stream
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 3.5.0
*/
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Long fromHistoricalRevision,
@Nullable final Long toHistoricalRevision,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
Expand All @@ -161,6 +269,7 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
filter,
dittoHeaders);
}

Expand All @@ -182,6 +291,7 @@ public static SubscribeForPersistedEvents fromJson(final JsonObject jsonObject,
jsonObject.getValue(JsonFields.JSON_FROM_HISTORICAL_TIMESTAMP).map(Instant::parse).orElse(null),
jsonObject.getValue(JsonFields.JSON_TO_HISTORICAL_TIMESTAMP).map(Instant::parse).orElse(null),
jsonObject.getValue(JsonFields.PREFIX).orElse(null),
jsonObject.getValue(JsonFields.FILTER).orElse(null),
dittoHeaders
);
}
Expand All @@ -195,7 +305,7 @@ public static SubscribeForPersistedEvents fromJson(final JsonObject jsonObject,
*/
public SubscribeForPersistedEvents setPrefix(@Nullable final String prefix) {
return new SubscribeForPersistedEvents(entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, getDittoHeaders());
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter, getDittoHeaders());
}

/**
Expand Down Expand Up @@ -244,6 +354,14 @@ public Optional<String> getPrefix() {
return Optional.ofNullable(prefix);
}

/**
* @return the optional RQL filter to apply for persisted events before publishing to the stream
* @since 3.5.0
*/
public Optional<String> getFilter() {
return Optional.ofNullable(filter);
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
final JsonSchemaVersion schemaVersion,
Expand All @@ -263,6 +381,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
jsonObjectBuilder.set(JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, toHistoricalTimestamp.toString(), predicate);
}
getPrefix().ifPresent(thePrefix -> jsonObjectBuilder.set(JsonFields.PREFIX, thePrefix));
getFilter().ifPresent(theFilter -> jsonObjectBuilder.set(JsonFields.FILTER, theFilter));
}

@Override
Expand All @@ -273,13 +392,13 @@ public String getTypePrefix() {
@Override
public SubscribeForPersistedEvents setDittoHeaders(final DittoHeaders dittoHeaders) {
return new SubscribeForPersistedEvents(entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, dittoHeaders);
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter, dittoHeaders);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
fromHistoricalTimestamp, toHistoricalTimestamp, prefix);
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter);
}

@Override
Expand All @@ -297,7 +416,8 @@ public boolean equals(@Nullable final Object obj) {
toHistoricalRevision == that.toHistoricalRevision &&
Objects.equals(fromHistoricalTimestamp, that.fromHistoricalTimestamp) &&
Objects.equals(toHistoricalTimestamp, that.toHistoricalTimestamp) &&
Objects.equals(prefix, that.prefix);
Objects.equals(prefix, that.prefix) &&
Objects.equals(filter, that.filter);
}

@Override
Expand All @@ -313,6 +433,7 @@ public String toString() {
+ ", fromHistoricalTimestamp=" + fromHistoricalTimestamp
+ ", toHistoricalTimestamp=" + toHistoricalTimestamp
+ ", prefix=" + prefix
+ ", filter=" + filter
+ "]";
}

Expand All @@ -339,6 +460,9 @@ private JsonFields() {

static final JsonFieldDefinition<String> PREFIX =
JsonFactory.newStringFieldDefinition("prefix", REGULAR, V_2);

public static final JsonFieldDefinition<String> FILTER =
JsonFactory.newStringFieldDefinition("filter", REGULAR, V_2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public final class SubscribeForPersistedEventsTest {
private static final long KNOWN_TO_REV = 42L;
private static final String KNOWN_FROM_TS = "2022-10-25T14:00:00Z";
private static final String KNOWN_TO_TS = "2022-10-25T15:00:00Z";
private static final String KNOWN_FILTER = "exists(thingId)";

private static final String JSON_ALL_FIELDS = JsonFactory.newObjectBuilder()
.set(Command.JsonFields.TYPE, SubscribeForPersistedEvents.TYPE)
Expand All @@ -53,6 +54,7 @@ public final class SubscribeForPersistedEventsTest {
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_REVISION, KNOWN_TO_REV)
.set(SubscribeForPersistedEvents.JsonFields.JSON_FROM_HISTORICAL_TIMESTAMP, KNOWN_FROM_TS)
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, KNOWN_TO_TS)
.set(SubscribeForPersistedEvents.JsonFields.FILTER, KNOWN_FILTER)
.build()
.toString();

Expand All @@ -63,6 +65,7 @@ public final class SubscribeForPersistedEventsTest {
.set(StreamingSubscriptionCommand.JsonFields.JSON_RESOURCE_PATH, KNOWN_RESOURCE_PATH)
.set(SubscribeForPersistedEvents.JsonFields.JSON_FROM_HISTORICAL_REVISION, KNOWN_FROM_REV)
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_REVISION, KNOWN_TO_REV)
.set(SubscribeForPersistedEvents.JsonFields.FILTER, KNOWN_FILTER)
.build().toString();

@Test
Expand All @@ -88,6 +91,7 @@ public void toJsonWithAllFieldsSet() {
KNOWN_TO_REV,
Instant.parse(KNOWN_FROM_TS),
Instant.parse(KNOWN_TO_TS),
KNOWN_FILTER,
DittoHeaders.empty()
);

Expand All @@ -102,6 +106,7 @@ public void toJsonWithOnlyRequiredFieldsSet() {
JsonPointer.of(KNOWN_RESOURCE_PATH),
KNOWN_FROM_REV,
KNOWN_TO_REV,
KNOWN_FILTER,
DittoHeaders.empty());
final String json = command.toJsonString();
assertThat(json).isEqualTo(JSON_MINIMAL);
Expand All @@ -116,6 +121,7 @@ public void fromJsonWithAllFieldsSet() {
KNOWN_TO_REV,
Instant.parse(KNOWN_FROM_TS),
Instant.parse(KNOWN_TO_TS),
KNOWN_FILTER,
DittoHeaders.empty()
);
assertThat(SubscribeForPersistedEvents.fromJson(JsonObject.of(JSON_ALL_FIELDS), DittoHeaders.empty()))
Expand All @@ -130,6 +136,7 @@ public void fromJsonWithOnlyRequiredFieldsSet() {
JsonPointer.of(KNOWN_RESOURCE_PATH),
KNOWN_FROM_REV,
KNOWN_TO_REV,
KNOWN_FILTER,
DittoHeaders.empty()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,22 @@
import javax.annotation.Nullable;
import javax.jms.JMSRuntimeException;

import org.apache.pekko.actor.ActorKilledException;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.OneForOneStrategy;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.ReceiveTimeout;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.apache.pekko.japi.pf.FI;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.base.service.config.supervision.LocalAskTimeoutConfig;
Expand All @@ -48,17 +60,6 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.pekko.actor.ActorKilledException;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.OneForOneStrategy;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.ReceiveTimeout;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.apache.pekko.japi.pf.FI;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;

/**
* Supervisor for {@link ConnectionPersistenceActor} which means it will create, start and watch it as child actor.
* <p>
Expand Down Expand Up @@ -158,6 +159,11 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));
}

@Override
protected boolean applyPersistedEventFilter(final Event<?> event, final SubscribeForPersistedEvents subscribe) {
return true;
}

@Override
protected boolean shouldBecomeTwinSignalProcessingAwaiting(final Signal<?> signal) {
return super.shouldBecomeTwinSignalProcessingAwaiting(signal) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
"type": "string",
"format": "date-time",
"description": "The timestamp to stop the streaming at."
},
"filter": {
"type": "string",
"description": "An RQL expression defining which events to filter for in the stream. Only supported for thing events."
}
}
}
Loading
Loading