Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1727 add new "thing-json" placeholder to be used in connections #1749

Merged
merged 5 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

import javax.annotation.Nullable;

import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.PayloadMapping;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;

/**
* Represent an outbound signal that is ready to be mapped with the given {@link org.eclipse.ditto.connectivity.model.PayloadMapping}.
Expand Down Expand Up @@ -65,6 +66,11 @@ public List<Target> getTargets() {
return delegate.getTargets();
}

@Override
public Optional<JsonObject> getExtra() {
return delegate.getExtra();
}

@Override
public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<JsonField> thePredicate) {
// the externalMessage is omitted as this should not be required to go over the wire
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.base.model.signals.Signal;

/**
* Represent an outbound signal that was mapped to an external message. It wraps the original signal, the mapped
Expand Down Expand Up @@ -60,6 +61,11 @@ public List<Target> getTargets() {
return delegate.getTargets();
}

@Override
public Optional<JsonObject> getExtra() {
return delegate.getExtra();
}

@Override
public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<JsonField> thePredicate) {
// the externalMessage is omitted as this should not be required to go over the wire
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.base.model.signals.Signal;

import akka.actor.ActorRef;

Expand Down Expand Up @@ -75,6 +75,11 @@ public List<Target> getTargets() {
return first().getTargets();
}

@Override
public Optional<JsonObject> getExtra() {
return first().getExtra();
}

@Override
public List<Mapped> getMappedOutboundSignals() {
return outboundSignals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
import org.eclipse.ditto.edge.service.placeholders.ThingJsonPlaceholder;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.akka.controlflow.AbstractGraphActor;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
Expand All @@ -96,6 +97,7 @@
import org.eclipse.ditto.rql.query.criteria.Criteria;
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.rql.query.things.ThingPredicateVisitor;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingFieldSelector;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
Expand Down Expand Up @@ -137,6 +139,7 @@ public final class OutboundMappingProcessorActor
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();
private static final ThingJsonPlaceholder THING_JSON_PLACEHOLDER = ThingJsonPlaceholder.getInstance();

private final ActorRef clientActor;
private final Connection connection;
Expand Down Expand Up @@ -791,8 +794,13 @@ private Collection<OutboundSignalWithSender> applyFilter(final OutboundSignalWit
return outboundSignalWithExtra.getExtra()
.flatMap(extra -> ThingEventToThingConverter
.mergeThingWithExtraFields(signal, extraFields.get(), extra)
.filter(ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver,
resourcePlaceholderResolver, timePlaceholderResolver))
.filter(thing -> {
final PlaceholderResolver<Thing> thingPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(THING_JSON_PLACEHOLDER, thing);
return ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver,
resourcePlaceholderResolver, timePlaceholderResolver, thingPlaceholderResolver)
.test(thing);
})
.map(thing -> outboundSignalWithExtra))
.map(Collections::singletonList)
.orElse(List.of());
Expand Down Expand Up @@ -938,7 +946,7 @@ private OutboundSignalWithSender setFailedEnrichment(final DittoRuntimeException
sender, Pair.apply(e, t), extra);
}

private OutboundSignalWithSender setExtra(final JsonObject extra) {
public OutboundSignalWithSender setExtra(final JsonObject extra) {
return new OutboundSignalWithSender(
OutboundSignalFactory.newOutboundSignal(delegate.getSource(), getTargets()),
sender, enrichmentFailure, extra
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import javax.annotation.Nullable;

Expand All @@ -26,15 +27,18 @@
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.Placeholder;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.PlaceholderResolver;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.things.model.signals.events.ThingEventToThingConverter;

/**
* Creator of expression resolvers for incoming and outgoing messages.
Expand All @@ -52,17 +56,25 @@ private Resolvers() {
*/
private static final List<ResolverCreator<?>> RESOLVER_CREATORS = Arrays.asList(
// For incoming messages, header mapping injects headers of external messages into Ditto headers.
ResolverCreator.of(PlaceholderFactory.newHeadersPlaceholder(), (e, s, t, a, c) -> e),
ResolverCreator.of(PlaceholderFactory.newHeadersPlaceholder(), (h, s, t, a, c, e) -> h),
ResolverCreator.of(ConnectivityPlaceholders.newEntityPlaceholder(),
(e, s, t, a, c) -> WithEntityId.getEntityIdOfType(EntityId.class, s).orElse(null)),
(h, s, t, a, c, e) -> WithEntityId.getEntityIdOfType(EntityId.class, s).orElse(null)),
ResolverCreator.of(ConnectivityPlaceholders.newThingPlaceholder(),
(e, s, t, a, c) -> WithEntityId.getEntityIdOfType(EntityId.class, s).orElse(null)),
ResolverCreator.of(ConnectivityPlaceholders.newFeaturePlaceholder(), (e, s, t, a, c) -> s),
ResolverCreator.of(ConnectivityPlaceholders.newTopicPathPlaceholder(), (e, s, t, a, c) -> t),
ResolverCreator.of(ConnectivityPlaceholders.newResourcePlaceholder(), (e, s, t, a, c) -> s),
ResolverCreator.of(ConnectivityPlaceholders.newTimePlaceholder(), (e, s, t, a, c) -> new Object()),
ResolverCreator.of(ConnectivityPlaceholders.newRequestPlaceholder(), (e, s, t, a, c) -> a),
ResolverCreator.of(ConnectivityPlaceholders.newConnectionIdPlaceholder(), (e, s, t, a, c) -> c)
(h, s, t, a, c, e) -> WithEntityId.getEntityIdOfType(EntityId.class, s).orElse(null)),
ResolverCreator.of(ConnectivityPlaceholders.newThingJsonPlaceholder(),
(h, s, t, a, c, e) -> ThingEventToThingConverter
.mergeThingWithExtraFields(s,
JsonFieldSelector.newInstance(""),
Optional.ofNullable(e).orElse(JsonObject.empty())
)
.orElse(null)
),
ResolverCreator.of(ConnectivityPlaceholders.newFeaturePlaceholder(), (h, s, t, a, c, e) -> s),
ResolverCreator.of(ConnectivityPlaceholders.newTopicPathPlaceholder(), (h, s, t, a, c, e) -> t),
ResolverCreator.of(ConnectivityPlaceholders.newResourcePlaceholder(), (h, s, t, a, c, e) -> s),
ResolverCreator.of(ConnectivityPlaceholders.newTimePlaceholder(), (h, s, t, a, c, e) -> new Object()),
ResolverCreator.of(ConnectivityPlaceholders.newRequestPlaceholder(), (h, s, t, a, c, e) -> a),
ResolverCreator.of(ConnectivityPlaceholders.newConnectionIdPlaceholder(), (h, s, t, a, c, e) -> c)
);

private static final List<Placeholder<?>> PLACEHOLDERS = Collections.unmodifiableList(
Expand Down Expand Up @@ -94,16 +106,21 @@ public static ExpressionResolver forOutbound(final OutboundSignal.Mapped mappedO
final Adaptable adaptable = mappedOutboundSignal.getAdaptable();
return PlaceholderFactory.newExpressionResolver(
RESOLVER_CREATORS.stream()
.map(creator -> creator.create(adaptable.getDittoHeaders(), signal,
.map(creator -> creator.create(adaptable.getDittoHeaders(),
signal,
externalMessage.getTopicPath().orElse(null),
signal.getDittoHeaders().getAuthorizationContext(),
sendingConnectionId))
sendingConnectionId,
mappedOutboundSignal.getExtra()
.or(() -> mappedOutboundSignal.getAdaptable().getPayload().getExtra())
.orElse(null)
))
.toArray(PlaceholderResolver[]::new)
);
}

/**
* Create an expression resolver for an signal.
* Create an expression resolver for a signal.
*
* @param signal the signal.
* @param connectionId the ID of the connection that handles the signal
Expand All @@ -114,10 +131,13 @@ public static ExpressionResolver forSignal(final Signal<?> signal,
final ConnectionId connectionId) {
return PlaceholderFactory.newExpressionResolver(
RESOLVER_CREATORS.stream()
.map(creator -> creator.create(signal.getDittoHeaders(), signal,
.map(creator -> creator.create(signal.getDittoHeaders(),
signal,
PROTOCOL_ADAPTER.toTopicPath(signal),
signal.getDittoHeaders().getAuthorizationContext(),
connectionId))
connectionId,
null
))
.toArray(PlaceholderResolver[]::new)
);
}
Expand All @@ -135,10 +155,13 @@ public static ExpressionResolver forExternalMessage(final ExternalMessage messag

return PlaceholderFactory.newExpressionResolver(
RESOLVER_CREATORS.stream()
.map(creator -> creator.create(makeCaseInsensitive(message.getHeaders()), null,
.map(creator -> creator.create(makeCaseInsensitive(message.getHeaders()),
null,
message.getTopicPath().orElse(null),
message.getAuthorizationContext().orElse(null),
receivingConnectionId))
receivingConnectionId,
null
))
.toArray(PlaceholderResolver[]::new)
);
}
Expand All @@ -151,16 +174,18 @@ public static ExpressionResolver forExternalMessage(final ExternalMessage messag
* @return the expression resolver.
* @since 1.2.0
*/
public static ExpressionResolver forOutboundSignal(final OutboundSignal.Mappable outboundSignal, final
ConnectionId sendingConnectionId) {
public static ExpressionResolver forOutboundSignal(final OutboundSignal.Mappable outboundSignal,
final ConnectionId sendingConnectionId) {

return PlaceholderFactory.newExpressionResolver(
RESOLVER_CREATORS.stream()
.map(creator -> creator.create(outboundSignal.getSource().getDittoHeaders(),
outboundSignal.getSource(),
PROTOCOL_ADAPTER.toTopicPath(outboundSignal.getSource()),
outboundSignal.getSource().getDittoHeaders().getAuthorizationContext(),
sendingConnectionId))
sendingConnectionId,
outboundSignal.getExtra().orElse(null)
))
.toArray(PlaceholderResolver[]::new)
);
}
Expand All @@ -180,9 +205,13 @@ public static ExpressionResolver forInbound(final ExternalMessage externalMessag
@Nullable final ConnectionId connectionId) {
return PlaceholderFactory.newExpressionResolver(
RESOLVER_CREATORS.stream()
.map(creator ->
creator.create(makeCaseInsensitive(externalMessage.getHeaders()), signal, topicPath,
authorizationContext, connectionId))
.map(creator -> creator.create(makeCaseInsensitive(externalMessage.getHeaders()),
signal,
topicPath,
authorizationContext,
connectionId,
null
))
.toArray(PlaceholderResolver[]::new)
);
}
Expand All @@ -200,8 +229,9 @@ private static DittoHeaders makeCaseInsensitive(final Map<String, String> extern
private interface ResolverDataExtractor<T> {

@Nullable
T extract(Map<String, String> inputHeaders, @Nullable Signal<?> signal, @Nullable TopicPath topicPath,
@Nullable AuthorizationContext authorizationContext, @Nullable final ConnectionId connectionId);
T extract(Map<String, String> headers, @Nullable Signal<?> signal, @Nullable TopicPath topicPath,
@Nullable AuthorizationContext authorizationContext, @Nullable ConnectionId connectionId,
@Nullable JsonObject extraFields);
}

/**
Expand All @@ -228,9 +258,10 @@ private static <T> ResolverCreator<T> of(final Placeholder<T> placeholder,

private PlaceholderResolver<T> create(final Map<String, String> inputHeaders, @Nullable final Signal<?> signal,
@Nullable final TopicPath topicPath, @Nullable final AuthorizationContext authorizationContext,
@Nullable ConnectionId connectionId) {
return PlaceholderFactory.newPlaceholderResolver(placeholder,
dataExtractor.extract(inputHeaders, signal, topicPath, authorizationContext, connectionId));
@Nullable final ConnectionId connectionId, @Nullable final JsonObject extraFields) {
return PlaceholderFactory.newPlaceholderResolver(placeholder, dataExtractor.extract(
inputHeaders, signal, topicPath, authorizationContext, connectionId, extraFields
));
}

private Placeholder<T> getPlaceholder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
import org.eclipse.ditto.edge.service.placeholders.PolicyPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.RequestPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingJsonPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
import org.eclipse.ditto.placeholders.TimePlaceholder;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
Expand All @@ -34,6 +35,13 @@ public static ThingPlaceholder newThingPlaceholder() {
return ThingPlaceholder.getInstance();
}

/**
* @return the singleton instance of {@link ThingJsonPlaceholder}
*/
public static ThingJsonPlaceholder newThingJsonPlaceholder() {
return ThingJsonPlaceholder.getInstance();
}

/**
* @return the singleton instance of {@link PolicyPlaceholder}
*/
Expand Down
Loading
Loading