Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1931: fix that not all placeholders were supported in connection target filtering #1932

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingJsonPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.pekko.controlflow.AbstractGraphActor;
Expand All @@ -99,6 +102,7 @@
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.HeadersPlaceholder;
import org.eclipse.ditto.placeholders.PipelineElement;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.PlaceholderResolver;
Expand Down Expand Up @@ -138,9 +142,13 @@ public final class OutboundMappingProcessorActor

private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
private static final EntityIdPlaceholder ENTITY_ID_PLACEHOLDER = EntityIdPlaceholder.getInstance();
private static final ThingPlaceholder THING_PLACEHOLDER = ThingPlaceholder.getInstance();
private static final FeaturePlaceholder FEATURE_PLACEHOLDER = FeaturePlaceholder.getInstance();
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();
private static final ThingJsonPlaceholder THING_JSON_PLACEHOLDER = ThingJsonPlaceholder.getInstance();
private static final HeadersPlaceholder HEADERS_PLACEHOLDER = PlaceholderFactory.newHeadersPlaceholder();

private final ActorRef clientActor;
private final Connection connection;
Expand Down Expand Up @@ -787,22 +795,33 @@ private Collection<OutboundSignalWithSender> applyFilter(final OutboundSignalWit
final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath);
final PlaceholderResolver<EntityId> entityIdPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(ENTITY_ID_PLACEHOLDER,
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
final PlaceholderResolver<EntityId> thingPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(THING_PLACEHOLDER,
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
final PlaceholderResolver<Signal<?>> featurePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(FEATURE_PLACEHOLDER, signal);
final PlaceholderResolver<WithResource> resourcePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal);
final PlaceholderResolver<Object> timePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(TIME_PLACEHOLDER, new Object());
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final Criteria criteria = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver
).filterCriteria(filter.get(), dittoHeaders);
return outboundSignalWithExtra.getExtra()
.flatMap(extra -> ThingEventToThingConverter
.mergeThingWithExtraFields(signal, extraFields.get(), extra)
.filter(thing -> {
final PlaceholderResolver<Thing> thingPlaceholderResolver = PlaceholderFactory
final PlaceholderResolver<Thing> thingJsonPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(THING_JSON_PLACEHOLDER, thing);
return ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver,
resourcePlaceholderResolver, timePlaceholderResolver, thingPlaceholderResolver)
entityIdPlaceholderResolver, thingPlaceholderResolver,
featurePlaceholderResolver, resourcePlaceholderResolver,
timePlaceholderResolver, thingJsonPlaceholderResolver)
.test(thing);
})
.map(thing -> outboundSignalWithExtra))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.namespaces.NamespaceReader;
Expand All @@ -39,6 +40,9 @@
import org.eclipse.ditto.connectivity.model.signals.announcements.ConnectivityAnnouncement;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitorRegistry;
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
Expand Down Expand Up @@ -70,6 +74,9 @@ public final class SignalFilter {

private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
private static final EntityIdPlaceholder ENTITY_ID_PLACEHOLDER = EntityIdPlaceholder.getInstance();
private static final ThingPlaceholder THING_PLACEHOLDER = ThingPlaceholder.getInstance();
private static final FeaturePlaceholder FEATURE_PLACEHOLDER = FeaturePlaceholder.getInstance();
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();

Expand Down Expand Up @@ -165,24 +172,35 @@ private static boolean matchesFilterBeforeEnrichment(final FilteredTopic filtere
final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver =
PlaceholderFactory.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath);
final PlaceholderResolver<EntityId> entityIdPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(ENTITY_ID_PLACEHOLDER,
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
final PlaceholderResolver<EntityId> thingPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(THING_PLACEHOLDER,
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
final PlaceholderResolver<Signal<?>> featurePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(FEATURE_PLACEHOLDER, signal);
final PlaceholderResolver<WithResource> resourcePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal);
final PlaceholderResolver<Object> timePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(TIME_PLACEHOLDER, new Object());
final Criteria criteria = parseCriteria(filterOptional.get(), signal.getDittoHeaders(),
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
final Set<JsonPointer> extraFields = filteredTopic.getExtraFields()
.map(JsonFieldSelector::getPointers)
.orElse(Collections.emptySet());
if (signal instanceof ThingEvent) {
return ThingEventToThingConverter.thingEventToThing((ThingEvent<?>) signal)
.filter(thing -> Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, thing,
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver))
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver))
.isPresent();
} else {
final Thing emptyThing = Thing.newBuilder().build();
return Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, emptyThing,
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
}
} else {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@

import javax.annotation.concurrent.Immutable;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.LoggingAdapter;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelInvalidException;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.connectivity.model.ClientCertificateCredentials;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
Expand All @@ -52,6 +53,10 @@
import org.eclipse.ditto.connectivity.service.config.mapping.MapperLimitsConfig;
import org.eclipse.ditto.connectivity.service.messaging.internal.ssl.SSLContextCreator;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.TimePlaceholder;
Expand All @@ -60,9 +65,6 @@
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.LoggingAdapter;

/**
* Validate a connection according to its type.
*/
Expand All @@ -83,7 +85,8 @@ private ConnectionValidator(LoggingAdapter loggingAdapter,
.collect(Collectors.toMap(AbstractProtocolValidator::type, Function.identity()));
this.specMap = Collections.unmodifiableMap(theSpecMap);
queryFilterCriteriaFactory = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance(), TimePlaceholder.getInstance());
TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance(), TimePlaceholder.getInstance(),
EntityIdPlaceholder.getInstance(), ThingPlaceholder.getInstance(), FeaturePlaceholder.getInstance());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThing;
import org.eclipse.ditto.things.model.signals.events.FeatureDesiredPropertiesModified;
import org.eclipse.ditto.things.model.signals.events.FeaturePropertiesModified;
import org.eclipse.ditto.things.model.signals.events.ThingModified;
import org.eclipse.ditto.things.model.signals.events.ThingModifiedEvent;
import org.mockito.Mockito;
Expand Down Expand Up @@ -331,6 +332,8 @@ public static final class Things {
public static final class Feature {

public static final String FEATURE_ID = "Feature";
public static final FeatureProperties FEATURE_PROPERTIES = FeatureProperties.newBuilder()
.set("property", "test").build();
public static final FeatureProperties FEATURE_DESIRED_PROPERTIES = FeatureProperties.newBuilder()
.set("property", "test").build();

Expand Down Expand Up @@ -996,8 +999,14 @@ public static ThingModifiedEvent<?> thingModified(final Collection<Authorization
TestConstants.INSTANT, dittoHeaders, TestConstants.METADATA);
}

public static ThingModifiedEvent<?> featurePropertiesModified(Collection<AuthorizationSubject> readSubjects) {
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
return FeaturePropertiesModified.of(Things.THING_ID, Feature.FEATURE_ID,
Feature.FEATURE_PROPERTIES, 1, null, dittoHeaders, null);
}

public static ThingModifiedEvent<?> featureDesiredPropertiesModified(Collection<AuthorizationSubject> readSubjects) {
DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
return FeatureDesiredPropertiesModified.of(Things.THING_ID, Feature.FEATURE_ID,
Feature.FEATURE_DESIRED_PROPERTIES, 1, null, dittoHeaders, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,16 +313,36 @@ public void applySignalFilterForMessagesWithExtraFieldsAndRqlFilter() {
*/
@Test
public void applySignalFilterOnFeatureDesiredPropertiesModified() {
Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
final Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
.authorizationContext(newAuthContext(DittoAuthorizationContextType.UNSPECIFIED, AUTHORIZED))
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
.withFilter("like(resource:path,'/features/" + TestConstants.Feature.FEATURE_ID + "*')")
.build()).build();
Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
Signal<?> signal = TestConstants.featureDesiredPropertiesModified(Collections.singletonList(AUTHORIZED));
final Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
final SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
final Signal<?> signal = TestConstants.featureDesiredPropertiesModified(Collections.singletonList(AUTHORIZED));

final List<Target> filteredTargets = signalFilter.filter(signal);
Assertions.assertThat(filteredTargets).hasSize(1).contains(target);
}

/**
* Test that target filtering works using feature:id placeholder
*/
@Test
public void applySignalFilterWithFeatureIdPlaceholder() {
Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
.authorizationContext(newAuthContext(DittoAuthorizationContextType.UNSPECIFIED, AUTHORIZED))
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
.withFilter("eq(feature:id,'Feature')")
.build()
)
.build();
final Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
final SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
final Signal<?> signal = TestConstants.featurePropertiesModified(Collections.singletonList(AUTHORIZED));

List<Target> filteredTargets = signalFilter.filter(signal);
final List<Target> filteredTargets = signalFilter.filter(signal);
Assertions.assertThat(filteredTargets).hasSize(1).contains(target);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.http.javadsl.model.Uri;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelInvalidException;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
Expand Down Expand Up @@ -75,11 +79,6 @@
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.http.javadsl.model.Uri;
import org.apache.pekko.testkit.javadsl.TestKit;

/**
* Tests {@link ConnectionValidator}.
*/
Expand Down Expand Up @@ -459,6 +458,22 @@ private static Connection createConnection(final ConnectionId connectionId) {
.build();
}

@Test
public void acceptValidConnectionWithValidTargetFilterContainingPlaceholders() {
final List<Target> targetWithValidFilter = singletonList(
ConnectivityModelFactory.newTargetBuilder(TestConstants.Targets.TWIN_TARGET)
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS)
.withFilter("and(exists(feature:id),eq(thing:namespace,'org.eclipse.ditto'))")
.build())
.build());
final Connection connection = createConnection(CONNECTION_ID)
.toBuilder()
.setTargets(targetWithValidFilter)
.build();
final ConnectionValidator underTest = getConnectionValidator();
underTest.validate(connection, DittoHeaders.empty(), actorSystem);
}

@Test
public void acceptValidConnectionWithValidNumberPayloadMapping() {
final Connection connection = createConnection(CONNECTION_ID)
Expand Down
Loading
Loading