Skip to content

Commit

Permalink
Merge pull request #1749 from eclipse-ditto/feature/thing-json-placeh…
Browse files Browse the repository at this point in the history
…older

#1727 add new "thing-json" placeholder to be used in connections
  • Loading branch information
thjaeckle authored Sep 18, 2023
2 parents ea8055e + a630334 commit 6405ec8
Show file tree
Hide file tree
Showing 25 changed files with 1,210 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

import javax.annotation.Nullable;

import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.PayloadMapping;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;

/**
* Represent an outbound signal that is ready to be mapped with the given {@link org.eclipse.ditto.connectivity.model.PayloadMapping}.
Expand Down Expand Up @@ -65,6 +66,11 @@ public List<Target> getTargets() {
return delegate.getTargets();
}

@Override
public Optional<JsonObject> getExtra() {
return delegate.getExtra();
}

@Override
public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<JsonField> thePredicate) {
// the externalMessage is omitted as this should not be required to go over the wire
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.base.model.signals.Signal;

/**
* Represent an outbound signal that was mapped to an external message. It wraps the original signal, the mapped
Expand Down Expand Up @@ -60,6 +61,11 @@ public List<Target> getTargets() {
return delegate.getTargets();
}

@Override
public Optional<JsonObject> getExtra() {
return delegate.getExtra();
}

@Override
public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<JsonField> thePredicate) {
// the externalMessage is omitted as this should not be required to go over the wire
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.base.model.signals.Signal;

import akka.actor.ActorRef;

Expand Down Expand Up @@ -75,6 +75,11 @@ public List<Target> getTargets() {
return first().getTargets();
}

@Override
public Optional<JsonObject> getExtra() {
return first().getExtra();
}

@Override
public List<Mapped> getMappedOutboundSignals() {
return outboundSignals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
import org.eclipse.ditto.edge.service.placeholders.ThingJsonPlaceholder;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.akka.controlflow.AbstractGraphActor;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
Expand All @@ -96,6 +97,7 @@
import org.eclipse.ditto.rql.query.criteria.Criteria;
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.rql.query.things.ThingPredicateVisitor;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingFieldSelector;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
Expand Down Expand Up @@ -137,6 +139,7 @@ public final class OutboundMappingProcessorActor
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();
private static final ThingJsonPlaceholder THING_JSON_PLACEHOLDER = ThingJsonPlaceholder.getInstance();

private final ActorRef clientActor;
private final Connection connection;
Expand Down Expand Up @@ -791,8 +794,13 @@ private Collection<OutboundSignalWithSender> applyFilter(final OutboundSignalWit
return outboundSignalWithExtra.getExtra()
.flatMap(extra -> ThingEventToThingConverter
.mergeThingWithExtraFields(signal, extraFields.get(), extra)
.filter(ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver,
resourcePlaceholderResolver, timePlaceholderResolver))
.filter(thing -> {
final PlaceholderResolver<Thing> thingPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(THING_JSON_PLACEHOLDER, thing);
return ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver,
resourcePlaceholderResolver, timePlaceholderResolver, thingPlaceholderResolver)
.test(thing);
})
.map(thing -> outboundSignalWithExtra))
.map(Collections::singletonList)
.orElse(List.of());
Expand Down Expand Up @@ -938,7 +946,7 @@ private OutboundSignalWithSender setFailedEnrichment(final DittoRuntimeException
sender, Pair.apply(e, t), extra);
}

private OutboundSignalWithSender setExtra(final JsonObject extra) {
public OutboundSignalWithSender setExtra(final JsonObject extra) {
return new OutboundSignalWithSender(
OutboundSignalFactory.newOutboundSignal(delegate.getSource(), getTargets()),
sender, enrichmentFailure, extra
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import javax.annotation.Nullable;

Expand All @@ -26,15 +27,18 @@
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.Placeholder;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.PlaceholderResolver;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.things.model.signals.events.ThingEventToThingConverter;

/**
* Creator of expression resolvers for incoming and outgoing messages.
Expand All @@ -52,17 +56,25 @@ private Resolvers() {
*/
private static final List<ResolverCreator<?>> RESOLVER_CREATORS = Arrays.asList(
// For incoming messages, header mapping injects headers of external messages into Ditto headers.
ResolverCreator.of(PlaceholderFactory.newHeadersPlaceholder(), (e, s, t, a, c) -> e),
ResolverCreator.of(PlaceholderFactory.newHeadersPlaceholder(), (h, s, t, a, c, e) -> h),
ResolverCreator.of(ConnectivityPlaceholders.newEntityPlaceholder(),
(e, s, t, a, c) -> WithEntityId.getEntityIdOfType(EntityId.class, s).orElse(null)),
(h, s, t, a, c, e) -> WithEntityId.getEntityIdOfType(EntityId.class, s).orElse(null)),
ResolverCreator.of(ConnectivityPlaceholders.newThingPlaceholder(),
(e, s, t, a, c) -> WithEntityId.getEntityIdOfType(EntityId.class, s).orElse(null)),
ResolverCreator.of(ConnectivityPlaceholders.newFeaturePlaceholder(), (e, s, t, a, c) -> s),
ResolverCreator.of(ConnectivityPlaceholders.newTopicPathPlaceholder(), (e, s, t, a, c) -> t),
ResolverCreator.of(ConnectivityPlaceholders.newResourcePlaceholder(), (e, s, t, a, c) -> s),
ResolverCreator.of(ConnectivityPlaceholders.newTimePlaceholder(), (e, s, t, a, c) -> new Object()),
ResolverCreator.of(ConnectivityPlaceholders.newRequestPlaceholder(), (e, s, t, a, c) -> a),
ResolverCreator.of(ConnectivityPlaceholders.newConnectionIdPlaceholder(), (e, s, t, a, c) -> c)
(h, s, t, a, c, e) -> WithEntityId.getEntityIdOfType(EntityId.class, s).orElse(null)),
ResolverCreator.of(ConnectivityPlaceholders.newThingJsonPlaceholder(),
(h, s, t, a, c, e) -> ThingEventToThingConverter
.mergeThingWithExtraFields(s,
JsonFieldSelector.newInstance(""),
Optional.ofNullable(e).orElse(JsonObject.empty())
)
.orElse(null)
),
ResolverCreator.of(ConnectivityPlaceholders.newFeaturePlaceholder(), (h, s, t, a, c, e) -> s),
ResolverCreator.of(ConnectivityPlaceholders.newTopicPathPlaceholder(), (h, s, t, a, c, e) -> t),
ResolverCreator.of(ConnectivityPlaceholders.newResourcePlaceholder(), (h, s, t, a, c, e) -> s),
ResolverCreator.of(ConnectivityPlaceholders.newTimePlaceholder(), (h, s, t, a, c, e) -> new Object()),
ResolverCreator.of(ConnectivityPlaceholders.newRequestPlaceholder(), (h, s, t, a, c, e) -> a),
ResolverCreator.of(ConnectivityPlaceholders.newConnectionIdPlaceholder(), (h, s, t, a, c, e) -> c)
);

private static final List<Placeholder<?>> PLACEHOLDERS = Collections.unmodifiableList(
Expand Down Expand Up @@ -94,16 +106,21 @@ public static ExpressionResolver forOutbound(final OutboundSignal.Mapped mappedO
final Adaptable adaptable = mappedOutboundSignal.getAdaptable();
return PlaceholderFactory.newExpressionResolver(
RESOLVER_CREATORS.stream()
.map(creator -> creator.create(adaptable.getDittoHeaders(), signal,
.map(creator -> creator.create(adaptable.getDittoHeaders(),
signal,
externalMessage.getTopicPath().orElse(null),
signal.getDittoHeaders().getAuthorizationContext(),
sendingConnectionId))
sendingConnectionId,
mappedOutboundSignal.getExtra()
.or(() -> mappedOutboundSignal.getAdaptable().getPayload().getExtra())
.orElse(null)
))
.toArray(PlaceholderResolver[]::new)
);
}

/**
* Create an expression resolver for an signal.
* Create an expression resolver for a signal.
*
* @param signal the signal.
* @param connectionId the ID of the connection that handles the signal
Expand All @@ -114,10 +131,13 @@ public static ExpressionResolver forSignal(final Signal<?> signal,
final ConnectionId connectionId) {
return PlaceholderFactory.newExpressionResolver(
RESOLVER_CREATORS.stream()
.map(creator -> creator.create(signal.getDittoHeaders(), signal,
.map(creator -> creator.create(signal.getDittoHeaders(),
signal,
PROTOCOL_ADAPTER.toTopicPath(signal),
signal.getDittoHeaders().getAuthorizationContext(),
connectionId))
connectionId,
null
))
.toArray(PlaceholderResolver[]::new)
);
}
Expand All @@ -135,10 +155,13 @@ public static ExpressionResolver forExternalMessage(final ExternalMessage messag

return PlaceholderFactory.newExpressionResolver(
RESOLVER_CREATORS.stream()
.map(creator -> creator.create(makeCaseInsensitive(message.getHeaders()), null,
.map(creator -> creator.create(makeCaseInsensitive(message.getHeaders()),
null,
message.getTopicPath().orElse(null),
message.getAuthorizationContext().orElse(null),
receivingConnectionId))
receivingConnectionId,
null
))
.toArray(PlaceholderResolver[]::new)
);
}
Expand All @@ -151,16 +174,18 @@ public static ExpressionResolver forExternalMessage(final ExternalMessage messag
* @return the expression resolver.
* @since 1.2.0
*/
public static ExpressionResolver forOutboundSignal(final OutboundSignal.Mappable outboundSignal, final
ConnectionId sendingConnectionId) {
public static ExpressionResolver forOutboundSignal(final OutboundSignal.Mappable outboundSignal,
final ConnectionId sendingConnectionId) {

return PlaceholderFactory.newExpressionResolver(
RESOLVER_CREATORS.stream()
.map(creator -> creator.create(outboundSignal.getSource().getDittoHeaders(),
outboundSignal.getSource(),
PROTOCOL_ADAPTER.toTopicPath(outboundSignal.getSource()),
outboundSignal.getSource().getDittoHeaders().getAuthorizationContext(),
sendingConnectionId))
sendingConnectionId,
outboundSignal.getExtra().orElse(null)
))
.toArray(PlaceholderResolver[]::new)
);
}
Expand All @@ -180,9 +205,13 @@ public static ExpressionResolver forInbound(final ExternalMessage externalMessag
@Nullable final ConnectionId connectionId) {
return PlaceholderFactory.newExpressionResolver(
RESOLVER_CREATORS.stream()
.map(creator ->
creator.create(makeCaseInsensitive(externalMessage.getHeaders()), signal, topicPath,
authorizationContext, connectionId))
.map(creator -> creator.create(makeCaseInsensitive(externalMessage.getHeaders()),
signal,
topicPath,
authorizationContext,
connectionId,
null
))
.toArray(PlaceholderResolver[]::new)
);
}
Expand All @@ -200,8 +229,9 @@ private static DittoHeaders makeCaseInsensitive(final Map<String, String> extern
private interface ResolverDataExtractor<T> {

@Nullable
T extract(Map<String, String> inputHeaders, @Nullable Signal<?> signal, @Nullable TopicPath topicPath,
@Nullable AuthorizationContext authorizationContext, @Nullable final ConnectionId connectionId);
T extract(Map<String, String> headers, @Nullable Signal<?> signal, @Nullable TopicPath topicPath,
@Nullable AuthorizationContext authorizationContext, @Nullable ConnectionId connectionId,
@Nullable JsonObject extraFields);
}

/**
Expand All @@ -228,9 +258,10 @@ private static <T> ResolverCreator<T> of(final Placeholder<T> placeholder,

private PlaceholderResolver<T> create(final Map<String, String> inputHeaders, @Nullable final Signal<?> signal,
@Nullable final TopicPath topicPath, @Nullable final AuthorizationContext authorizationContext,
@Nullable ConnectionId connectionId) {
return PlaceholderFactory.newPlaceholderResolver(placeholder,
dataExtractor.extract(inputHeaders, signal, topicPath, authorizationContext, connectionId));
@Nullable final ConnectionId connectionId, @Nullable final JsonObject extraFields) {
return PlaceholderFactory.newPlaceholderResolver(placeholder, dataExtractor.extract(
inputHeaders, signal, topicPath, authorizationContext, connectionId, extraFields
));
}

private Placeholder<T> getPlaceholder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
import org.eclipse.ditto.edge.service.placeholders.PolicyPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.RequestPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingJsonPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
import org.eclipse.ditto.placeholders.TimePlaceholder;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
Expand All @@ -34,6 +35,13 @@ public static ThingPlaceholder newThingPlaceholder() {
return ThingPlaceholder.getInstance();
}

/**
* @return the singleton instance of {@link ThingJsonPlaceholder}
*/
public static ThingJsonPlaceholder newThingJsonPlaceholder() {
return ThingJsonPlaceholder.getInstance();
}

/**
* @return the singleton instance of {@link PolicyPlaceholder}
*/
Expand Down
Loading

0 comments on commit 6405ec8

Please sign in to comment.