diff --git a/base/model/src/main/java/org/eclipse/ditto/base/model/headers/DittoHeaderDefinition.java b/base/model/src/main/java/org/eclipse/ditto/base/model/headers/DittoHeaderDefinition.java index 6313cf988d..882928eaab 100755 --- a/base/model/src/main/java/org/eclipse/ditto/base/model/headers/DittoHeaderDefinition.java +++ b/base/model/src/main/java/org/eclipse/ditto/base/model/headers/DittoHeaderDefinition.java @@ -527,6 +527,41 @@ public enum DittoHeaderDefinition implements HeaderDefinition { JsonObject.class, false, true, + HeaderValueValidators.getJsonObjectValidator()), + + /** + * Internal header containing the pre-defined configured {@code extraFields} as list of jsonPointers for the + * emitted thing event. + * + * @since 3.7.0 + */ + PRE_DEFINED_EXTRA_FIELDS("ditto-pre-defined-extra-fields", + JsonArray.class, + false, + false, + HeaderValueValidators.getJsonArrayValidator()), + + /** + * Internal header containing the pre-defined configured {@code extraFields} as keys and the allowed "read subjects" + * as array of stings - defining which "auth subjects" are allowed to read which pre-defined extra field. + * + * @since 3.7.0 + */ + PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT("ditto-pre-defined-extra-fields-read-grant", + JsonObject.class, + false, + false, + HeaderValueValidators.getJsonObjectValidator()), + + /** + * Internal header containing pre-defined {@code extraFields} as JSON object sent along for emitted thing event. + * + * @since 3.7.0 + */ + PRE_DEFINED_EXTRA_FIELDS_OBJECT("ditto-pre-defined-extra-fields-object", + JsonObject.class, + false, + false, HeaderValueValidators.getJsonObjectValidator()); /** diff --git a/base/model/src/test/java/org/eclipse/ditto/base/model/headers/ImmutableDittoHeadersTest.java b/base/model/src/test/java/org/eclipse/ditto/base/model/headers/ImmutableDittoHeadersTest.java index 5f8b115999..7a83049aae 100755 --- a/base/model/src/test/java/org/eclipse/ditto/base/model/headers/ImmutableDittoHeadersTest.java +++ b/base/model/src/test/java/org/eclipse/ditto/base/model/headers/ImmutableDittoHeadersTest.java @@ -137,6 +137,16 @@ public final class ImmutableDittoHeadersTest { .set(DittoHeaderDefinition.ORIGINATOR.getKey(), "foo:bar") .build(); + private static final JsonArray KNOWN_PRE_DEFINED_EXTRA_FIELDS = JsonArray.newBuilder() + .add("foo:bar:123") + .build(); + private static final JsonObject KNOWN_PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT = JsonObject.newBuilder() + .set("/definition", "known:subject") + .build(); + private static final JsonObject KNOWN_PRE_DEFINED_EXTRA_FIELDS_OBJECT = JsonObject.newBuilder() + .set("definition", "foo:bar:123") + .build(); + static { KNOWN_METADATA_HEADERS = MetadataHeaders.newInstance(); @@ -205,6 +215,12 @@ public void settingAllKnownHeadersWorksAsExpected() { .putHeader(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_REVISION)) .putHeader(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_TIMESTAMP)) .putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS.formatAsString()) + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS.formatAsString()) + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.formatAsString()) + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_OBJECT.formatAsString()) .build(); assertThat(underTest).isEqualTo(expectedHeaderMap); @@ -535,6 +551,11 @@ public void toJsonReturnsExpected() { .set(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), KNOWN_AT_HISTORICAL_REVISION) .set(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), KNOWN_AT_HISTORICAL_TIMESTAMP.toString()) .set(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS) + .set(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(), KNOWN_PRE_DEFINED_EXTRA_FIELDS) + .set(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT) + .set(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_OBJECT) .build(); final Map allKnownHeaders = createMapContainingAllKnownHeaders(); @@ -774,6 +795,12 @@ private static Map createMapContainingAllKnownHeaders() { result.put(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_REVISION)); result.put(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_TIMESTAMP)); result.put(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS.formatAsString()); + result.put(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS.formatAsString()); + result.put(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.formatAsString()); + result.put(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_OBJECT.formatAsString()); return result; } diff --git a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java index fc37e2586d..222b9bef00 100644 --- a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java +++ b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java @@ -14,14 +14,17 @@ import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.Nullable; @@ -38,8 +41,10 @@ import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger; import org.eclipse.ditto.internal.utils.tracing.DittoTracing; import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName; +import org.eclipse.ditto.json.JsonArray; import org.eclipse.ditto.json.JsonFactory; import org.eclipse.ditto.json.JsonFieldSelector; +import org.eclipse.ditto.json.JsonKey; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.json.JsonObjectBuilder; import org.eclipse.ditto.json.JsonPointer; @@ -160,6 +165,16 @@ public CompletionStage retrievePartialThing(final ThingId thingId, (concernedSignal instanceof ThingEvent) && !(ProtocolAdapter.isLiveSignal(concernedSignal)) ? List.of((ThingEvent) concernedSignal) : List.of(); + final DittoHeaders signalHeaders = Optional.ofNullable(concernedSignal) + .map(Signal::getDittoHeaders) + .orElseGet(DittoHeaders::empty); + if (jsonFieldSelector != null && + signalHeaders.containsKey(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey()) + ) { + return performPreDefinedExtraFieldsOptimization( + thingId, jsonFieldSelector, dittoHeaders, signalHeaders, thingEvents + ); + } // as second step only return what was originally requested as fields: final var cachingParameters = new CachingParameters(jsonFieldSelector, thingEvents, true, 0); @@ -199,6 +214,80 @@ public CompletionStage retrievePartialThing(final EntityId thingId, .thenApply(jsonObject -> applyJsonFieldSelector(jsonObject, jsonFieldSelector)); } + private CompletionStage performPreDefinedExtraFieldsOptimization(final ThingId thingId, + final JsonFieldSelector jsonFieldSelector, + final DittoHeaders dittoHeaders, + final DittoHeaders signalHeaders, + final List> thingEvents + ) { + final JsonArray configuredPredefinedExtraFields = + JsonArray.of(signalHeaders.get(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey())); + final Set allConfiguredPredefinedExtraFields = configuredPredefinedExtraFields.stream() + .filter(JsonValue::isString) + .map(JsonValue::asString) + .map(JsonPointer::of) + .collect(Collectors.toSet()); + + final JsonObject preDefinedExtraFields = + JsonObject.of(signalHeaders.get(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey())); + final CompletionStage filteredPreDefinedExtraFieldsReadGranted = + filterPreDefinedExtraReadGrantedObject(jsonFieldSelector, dittoHeaders, signalHeaders, + preDefinedExtraFields); + + final boolean allExtraFieldsPresent = + allConfiguredPredefinedExtraFields.containsAll(jsonFieldSelector.getPointers()); + if (allExtraFieldsPresent) { + return filteredPreDefinedExtraFieldsReadGranted; + } else { + // optimization to only fetch extra fields which were not pre-defined + final List missingFieldsPointers = new ArrayList<>(jsonFieldSelector.getPointers()); + missingFieldsPointers.removeAll(allConfiguredPredefinedExtraFields); + final JsonFieldSelector missingFieldsSelector = JsonFactory.newFieldSelector(missingFieldsPointers); + final var cachingParameters = + new CachingParameters(missingFieldsSelector, thingEvents, true, 0); + + return doRetrievePartialThing(thingId, dittoHeaders, null, cachingParameters) + .thenCompose(jsonObject -> filteredPreDefinedExtraFieldsReadGranted + .thenApply(preDefinedObject -> + preDefinedObject.toBuilder() + .setAll(applyJsonFieldSelector(jsonObject, missingFieldsSelector)) + .build() + ) + ); + } + } + + private static CompletionStage filterPreDefinedExtraReadGrantedObject( + final JsonFieldSelector jsonFieldSelector, + final DittoHeaders dittoHeaders, final DittoHeaders signalHeaders, final JsonObject preDefinedExtraFields) { + final JsonObject preDefinedExtraFieldsReadGrant = JsonObject.of( + signalHeaders.get(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey()) + ); + final JsonFieldSelector grantedReadJsonFieldSelector = filterAskedForFieldSelectorToGrantedFields( + jsonFieldSelector, preDefinedExtraFieldsReadGrant, + dittoHeaders.getAuthorizationContext().getAuthorizationSubjectIds() + ); + return CompletableFuture.completedStage(preDefinedExtraFields.get(grantedReadJsonFieldSelector)); + } + + private static JsonFieldSelector filterAskedForFieldSelectorToGrantedFields( + final JsonFieldSelector jsonFieldSelector, + final JsonObject preDefinedExtraFieldsReadGrant, + final List authorizationSubjectIds) + { + final List allowedPointers = StreamSupport.stream(jsonFieldSelector.spliterator(), false) + .filter(pointer -> preDefinedExtraFieldsReadGrant.getValue(JsonKey.of(pointer.toString())) + .filter(JsonValue::isArray) + .map(JsonValue::asArray) + .filter(readGrantArray -> readGrantArray.stream() + .filter(JsonValue::isString) + .map(JsonValue::asString) + .anyMatch(authorizationSubjectIds::contains) + ).isPresent() + ).toList(); + return JsonFactory.newFieldSelector(allowedPointers); + } + protected CompletionStage doRetrievePartialThing(final EntityId thingId, final DittoHeaders dittoHeaders, @Nullable final DittoHeaders dittoHeadersNotAddedToCacheKey, diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/EventConfig.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/EventConfig.java index 436fdcc228..27779603be 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/EventConfig.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/EventConfig.java @@ -36,7 +36,7 @@ public interface EventConfig { /** * An enumeration of the known config path expressions and their associated default values for - * {@code SnapshotConfig}. + * {@code EventConfig}. */ enum EventConfigValue implements KnownConfigValue { @@ -65,7 +65,6 @@ public Object getDefaultValue() { public String getConfigPath() { return path; } - - } + } } diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java index 3053b4156b..b03b6732a7 100755 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java @@ -585,8 +585,6 @@ private record PersistEventAsync< E extends EventsourcedEvent, S extends Jsonifiable.WithFieldSelectorAndPredicate>(E event, BiConsumer handler) {} - ; - /** * Persist an event, modify actor state by the event strategy, then invoke the handler. * diff --git a/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/config/EntityCreationConfig.java b/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/config/EntityCreationConfig.java index 5a3e7f4caf..834382fb09 100644 --- a/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/config/EntityCreationConfig.java +++ b/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/config/EntityCreationConfig.java @@ -21,7 +21,7 @@ import org.eclipse.ditto.internal.utils.config.KnownConfigValue; /** - * Provides configuration settings for Concierge entity creation behaviour. + * Provides configuration settings for entity creation behaviour. */ @Immutable public interface EntityCreationConfig { diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultPreDefinedExtraFieldsConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultPreDefinedExtraFieldsConfig.java new file mode 100644 index 0000000000..c1b6688f50 --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultPreDefinedExtraFieldsConfig.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.common.config; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.regex.Pattern; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +import org.eclipse.ditto.base.model.common.LikeHelper; +import org.eclipse.ditto.internal.utils.config.ConfigWithFallback; +import org.eclipse.ditto.json.JsonFieldSelector; + +import com.typesafe.config.Config; + +/** + * This class implements {@link PreDefinedExtraFieldsConfig}. + */ +@Immutable +public final class DefaultPreDefinedExtraFieldsConfig implements PreDefinedExtraFieldsConfig { + + private final List namespacePatterns; + @Nullable private final String rqlCondition; + private final JsonFieldSelector extraFields; + + private DefaultPreDefinedExtraFieldsConfig(final ConfigWithFallback config) { + this.namespacePatterns = compile(List.copyOf(config.getStringList( + PreDefinedExtraFieldsConfig.ConfigValues.NAMESPACES.getConfigPath()) + )); + this.rqlCondition = config.getStringOrNull(ConfigValues.CONDITION); + final List configuredExtraFields = config.getStringList(ConfigValues.EXTRA_FIELDS.getConfigPath()); + this.extraFields = JsonFieldSelector.newInstance( + configuredExtraFields.getFirst(), + configuredExtraFields.subList(1, configuredExtraFields.size()).toArray(CharSequence[]::new) + ); + } + + /** + * Returns an instance of {@code CreationRestrictionConfig} based on the settings of the specified Config. + * + * @param config is supposed to provide the settings of the restriction config. + * @return the instance. + * @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid. + */ + public static DefaultPreDefinedExtraFieldsConfig of(final Config config) { + return new DefaultPreDefinedExtraFieldsConfig(ConfigWithFallback.newInstance(config, + PreDefinedExtraFieldsConfig.ConfigValues.values())); + } + + private static List compile(final List patterns) { + return patterns.stream() + .map(LikeHelper::convertToRegexSyntax) + .filter(Objects::nonNull) + .map(Pattern::compile) + .toList(); + } + + @Override + public List getNamespace() { + return namespacePatterns; + } + + @Override + public Optional getCondition() { + return Optional.ofNullable(rqlCondition); + } + + @Override + public JsonFieldSelector getExtraFields() { + return extraFields; + } + + @Override + public boolean equals(final Object o) { + if (!(o instanceof final DefaultPreDefinedExtraFieldsConfig that)) { + return false; + } + return Objects.equals(namespacePatterns, that.namespacePatterns) && + Objects.equals(rqlCondition, that.rqlCondition) && + Objects.equals(extraFields, that.extraFields); + } + + @Override + public int hashCode() { + return Objects.hash(namespacePatterns, rqlCondition, extraFields); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "[" + + "namespacePatterns=" + namespacePatterns + + ", rqlCondition='" + rqlCondition + '\'' + + ", extraFields=" + extraFields + + "]"; + } +} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java index 649e08d1ca..f8b5ec39d3 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java @@ -23,9 +23,7 @@ import org.eclipse.ditto.internal.utils.config.ScopedConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultActivityCheckConfig; -import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultEventConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultSnapshotConfig; -import org.eclipse.ditto.internal.utils.persistence.mongo.config.EventConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig; import org.eclipse.ditto.internal.utils.persistentactors.cleanup.CleanupConfig; @@ -43,7 +41,7 @@ public final class DefaultThingConfig implements ThingConfig { private final SupervisorConfig supervisorConfig; private final ActivityCheckConfig activityCheckConfig; private final SnapshotConfig snapshotConfig; - private final EventConfig eventConfig; + private final ThingEventConfig eventConfig; private final CleanupConfig cleanupConfig; private DefaultThingConfig(final ScopedConfig scopedConfig) { @@ -51,7 +49,7 @@ private DefaultThingConfig(final ScopedConfig scopedConfig) { supervisorConfig = DefaultSupervisorConfig.of(scopedConfig); activityCheckConfig = DefaultActivityCheckConfig.of(scopedConfig); snapshotConfig = DefaultSnapshotConfig.of(scopedConfig); - eventConfig = DefaultEventConfig.of(scopedConfig); + eventConfig = DefaultThingEventConfig.of(scopedConfig); cleanupConfig = CleanupConfig.of(scopedConfig); } @@ -87,7 +85,7 @@ public CleanupConfig getCleanupConfig() { } @Override - public EventConfig getEventConfig() { + public ThingEventConfig getEventConfig() { return eventConfig; } diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingEventConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingEventConfig.java new file mode 100644 index 0000000000..0234970e7a --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingEventConfig.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.common.config; + +import java.util.List; +import java.util.Objects; + +import org.eclipse.ditto.internal.utils.config.ConfigWithFallback; +import org.eclipse.ditto.internal.utils.config.ScopedConfig; +import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultEventConfig; + +import com.typesafe.config.Config; + +/** + * Default implementation of {@code ThingEventConfig}. + */ +public final class DefaultThingEventConfig implements ThingEventConfig { + + private static final String CONFIG_PATH = "event"; + + private final DefaultEventConfig defaultEventConfigDelegated; + private final List preDefinedExtraFieldsConfigs; + + private DefaultThingEventConfig(final DefaultEventConfig delegate, final ScopedConfig config) { + this.defaultEventConfigDelegated = delegate; + preDefinedExtraFieldsConfigs = + config.getObjectList(ThingEventConfigValue.PRE_DEFINED_EXTRA_FIELDS.getConfigPath()) + .stream() + .map(configObj -> DefaultPreDefinedExtraFieldsConfig.of(configObj.toConfig())) + .map(PreDefinedExtraFieldsConfig.class::cast) + .toList(); + } + + /** + * Returns an instance of the default event journal config based on the settings of the specified Config. + * + * @param config is supposed to provide the settings of the event journal config at {@value #CONFIG_PATH}. + * @return instance + * @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid. + */ + public static DefaultThingEventConfig of(final Config config) { + return new DefaultThingEventConfig(DefaultEventConfig.of(config), + ConfigWithFallback.newInstance(config, CONFIG_PATH, ThingEventConfigValue.values())); + } + + @Override + public List getHistoricalHeadersToPersist() { + return defaultEventConfigDelegated.getHistoricalHeadersToPersist(); + } + + @Override + public List getPredefinedExtraFieldsConfigs() { + return preDefinedExtraFieldsConfigs; + } + + @Override + public boolean equals(final Object o) { + if (!(o instanceof final DefaultThingEventConfig that)) { + return false; + } + return Objects.equals(defaultEventConfigDelegated, that.defaultEventConfigDelegated) && + Objects.equals(preDefinedExtraFieldsConfigs, that.preDefinedExtraFieldsConfigs); + } + + @Override + public int hashCode() { + return Objects.hash(defaultEventConfigDelegated, preDefinedExtraFieldsConfigs); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "[" + + "defaultEventConfigDelegated=" + defaultEventConfigDelegated + + ", preDefinedExtraFieldsConfigs=" + preDefinedExtraFieldsConfigs + + "]"; + } +} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/PreDefinedExtraFieldsConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/PreDefinedExtraFieldsConfig.java new file mode 100644 index 0000000000..35040f95bd --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/PreDefinedExtraFieldsConfig.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.common.config; + +import java.util.List; +import java.util.Optional; +import java.util.regex.Pattern; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +import org.eclipse.ditto.internal.utils.config.KnownConfigValue; +import org.eclipse.ditto.json.JsonFieldSelector; + +/** + * Provides a configuration entry for Thing event pre-defined {@code extraFields} injection. + */ +@Immutable +public interface PreDefinedExtraFieldsConfig { + + /** + * The list of namespace {@link Pattern}s this entry applies to. + * An empty list would match any. The pattern must match the full string. + * + * @return the list of values + */ + List getNamespace(); + + /** + * The optional RQL condition which - when evaluating to {@code true} - will apply sending the {@code extraFields}. + * Extra fields will not be injected when the condition evaluates to {@code false}. + * + * @return the optional RQL condition under which circumstances to inject extra fields. + */ + Optional getCondition(); + + /** + * The extra fields in form of {@link JsonFieldSelector} to send along all events in the matching namespaces + * whenever the optional condition matches. + * + * @return the extra fields to send along for thing events. + */ + JsonFieldSelector getExtraFields(); + + /** + * An enumeration of the known config path expressions and their associated default values for + * {@code PreDefinedExtraFieldsConfig}. + */ + enum ConfigValues implements KnownConfigValue { + /** + * Matching namespaces, supports wildcards. + */ + NAMESPACES("namespaces", List.of()), + + /** + * Optional RQL condition. + */ + CONDITION("condition", null), + + /** + * Matching auth subjects. + */ + EXTRA_FIELDS("extra-fields", List.of()); + + private final String path; + private final Object defaultValue; + + ConfigValues(final String thePath, @Nullable final Object theDefaultValue) { + path = thePath; + defaultValue = theDefaultValue; + } + + @Override + public String getConfigPath() { + return path; + } + + @Override + public Object getDefaultValue() { + return defaultValue; + } + + } +} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java index d9dc993651..5146c7ade8 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java @@ -18,7 +18,6 @@ import org.eclipse.ditto.base.service.config.supervision.WithSupervisorConfig; import org.eclipse.ditto.internal.utils.config.KnownConfigValue; -import org.eclipse.ditto.internal.utils.persistence.mongo.config.EventConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.WithActivityCheckConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.WithSnapshotConfig; import org.eclipse.ditto.internal.utils.persistentactors.cleanup.WithCleanupConfig; @@ -35,7 +34,7 @@ public interface ThingConfig extends WithSupervisorConfig, WithActivityCheckConf * * @return the config. */ - EventConfig getEventConfig(); + ThingEventConfig getEventConfig(); /** * Get the timeout waiting for responses and acknowledgements during coordinated shutdown. diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingEventConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingEventConfig.java new file mode 100644 index 0000000000..737f7e5c3e --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingEventConfig.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.common.config; + +import java.util.List; + +import org.eclipse.ditto.internal.utils.config.KnownConfigValue; +import org.eclipse.ditto.internal.utils.persistence.mongo.config.EventConfig; + +/** + * Extends {@link EventConfig} by providing ThingEvent specific additional configuration. + */ +public interface ThingEventConfig extends EventConfig { + + /** + * Contains pre-defined (configured) {@code extraFields} to send along all thing (change) events and thing messages. + * + * @return the pre-defined {@code extraFields} to send along. + */ + List getPredefinedExtraFieldsConfigs(); + + /** + * An enumeration of the known config path expressions and their associated default values for + * {@code ThingEventConfig}. + */ + enum ThingEventConfigValue implements KnownConfigValue { + + /** + * The pre-defined (configured) {@code extraFields} to send along all events and messages. + */ + PRE_DEFINED_EXTRA_FIELDS("pre-defined-extra-fields", List.of()); + + private final String path; + private final Object defaultValue; + + ThingEventConfigValue(final String thePath, final Object theDefaultValue) { + path = thePath; + defaultValue = theDefaultValue; + } + + @Override + public Object getDefaultValue() { + return defaultValue; + } + + @Override + public String getConfigPath() { + return path; + } + } +} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java index 146e4f5c1e..56c03c8558 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java @@ -13,7 +13,14 @@ package org.eclipse.ditto.things.service.persistence.actors; import java.time.Instant; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Predicate; +import java.util.stream.StreamSupport; import javax.annotation.Nullable; @@ -22,7 +29,10 @@ import org.apache.pekko.japi.pf.ReceiveBuilder; import org.apache.pekko.persistence.RecoveryCompleted; import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel; +import org.eclipse.ditto.base.model.auth.AuthorizationSubject; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder; +import org.eclipse.ditto.base.model.exceptions.InvalidRqlExpressionException; +import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.LiveChannelTimeoutStrategy; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; @@ -39,7 +49,26 @@ import org.eclipse.ditto.internal.utils.pubsub.DistributedPub; import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor; import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; +import org.eclipse.ditto.json.JsonArray; +import org.eclipse.ditto.json.JsonCollectors; import org.eclipse.ditto.json.JsonFactory; +import org.eclipse.ditto.json.JsonField; +import org.eclipse.ditto.json.JsonFieldSelector; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonObjectBuilder; +import org.eclipse.ditto.json.JsonPointer; +import org.eclipse.ditto.json.JsonValue; +import org.eclipse.ditto.placeholders.HeadersPlaceholder; +import org.eclipse.ditto.placeholders.PlaceholderFactory; +import org.eclipse.ditto.placeholders.TimePlaceholder; +import org.eclipse.ditto.policies.api.Permission; +import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider; +import org.eclipse.ditto.policies.model.Permissions; +import org.eclipse.ditto.policies.model.PoliciesResourceType; +import org.eclipse.ditto.policies.model.PolicyId; +import org.eclipse.ditto.rql.parser.RqlPredicateParser; +import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory; +import org.eclipse.ditto.rql.query.things.ThingPredicateVisitor; import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThing; import org.eclipse.ditto.things.model.Thing; import org.eclipse.ditto.things.model.ThingBuilder; @@ -54,6 +83,7 @@ import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse; import org.eclipse.ditto.things.model.signals.events.ThingEvent; import org.eclipse.ditto.things.service.common.config.DittoThingsConfig; +import org.eclipse.ditto.things.service.common.config.PreDefinedExtraFieldsConfig; import org.eclipse.ditto.things.service.common.config.ThingConfig; import org.eclipse.ditto.things.service.persistence.actors.strategies.commands.ThingCommandStrategies; import org.eclipse.ditto.things.service.persistence.actors.strategies.events.ThingEventStrategies; @@ -79,18 +109,23 @@ public final class ThingPersistenceActor */ static final String SNAPSHOT_PLUGIN_ID = "pekko-contrib-mongodb-persistence-things-snapshots"; + private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance(); + private static final HeadersPlaceholder HEADERS_PLACEHOLDER = PlaceholderFactory.newHeadersPlaceholder(); + private static final AckExtractor> ACK_EXTRACTOR = AckExtractor.of(ThingEvent::getEntityId, ThingEvent::getDittoHeaders); private final ThingConfig thingConfig; private final DistributedPub> distributedPub; @Nullable private final ActorRef searchShardRegionProxy; + private final PolicyEnforcerProvider policyEnforcerProvider; @SuppressWarnings("unused") private ThingPersistenceActor(final ThingId thingId, final MongoReadJournal mongoReadJournal, final DistributedPub> distributedPub, - @Nullable final ActorRef searchShardRegionProxy) { + @Nullable final ActorRef searchShardRegionProxy, + final PolicyEnforcerProvider policyEnforcerProvider) { super(thingId, mongoReadJournal); final DittoThingsConfig thingsConfig = DittoThingsConfig.of( @@ -99,6 +134,7 @@ private ThingPersistenceActor(final ThingId thingId, thingConfig = thingsConfig.getThingConfig(); this.distributedPub = distributedPub; this.searchShardRegionProxy = searchShardRegionProxy; + this.policyEnforcerProvider = policyEnforcerProvider; } /** @@ -107,15 +143,19 @@ private ThingPersistenceActor(final ThingId thingId, * @param thingId the Thing ID this Actor manages. * @param mongoReadJournal the ReadJournal used for gaining access to historical values of the thing. * @param distributedPub the distributed-pub access to publish thing events. + * @param searchShardRegionProxy the proxy of the shard region of search updaters. + * @param policyEnforcerProvider a provider for the used Policy {@code Enforcer} which "guards" the + * ThingPersistenceActor for applying access control. * @return the Pekko configuration Props object */ public static Props props(final ThingId thingId, final MongoReadJournal mongoReadJournal, final DistributedPub> distributedPub, - @Nullable final ActorRef searchShardRegionProxy) { - + @Nullable final ActorRef searchShardRegionProxy, + final PolicyEnforcerProvider policyEnforcerProvider + ) { return Props.create(ThingPersistenceActor.class, thingId, mongoReadJournal, distributedPub, - searchShardRegionProxy); + searchShardRegionProxy, policyEnforcerProvider); } @Override @@ -244,12 +284,138 @@ protected void recoveryCompleted(final RecoveryCompleted event) { @Override protected void publishEvent(@Nullable final Thing previousEntity, final ThingEvent event) { - distributedPub.publishWithAcks(event, entityId, ACK_EXTRACTOR, getSelf()); - if (searchShardRegionProxy != null) { - searchShardRegionProxy.tell(event, getSelf()); + enrichEventWithPredefinedExtraFields( + Optional.ofNullable(previousEntity).flatMap(Thing::getPolicyId).orElse(null), event) + .whenComplete((modifiedEvent, ex) -> { + final ThingEvent eventToPublish; + if (ex != null) { + eventToPublish = event; + } else { + eventToPublish = modifiedEvent; + } + distributedPub.publishWithAcks(eventToPublish, entityId, ACK_EXTRACTOR, getSelf()); + if (searchShardRegionProxy != null) { + searchShardRegionProxy.tell(eventToPublish, getSelf()); + } + }); + } + + private CompletionStage> enrichEventWithPredefinedExtraFields(@Nullable final PolicyId policyId, + final ThingEvent event + ) { + final List predefinedExtraFieldsConfigs = thingConfig.getEventConfig() + .getPredefinedExtraFieldsConfigs(); + if (null != entity && !predefinedExtraFieldsConfigs.isEmpty()) { + final List matchingPreDefinedFieldsConfigs = + predefinedExtraFieldsConfigs.stream() + .filter(conf -> conf + .getNamespace().stream() + .anyMatch(pattern -> pattern.matcher(entityId.getNamespace()).matches()) + ) + .filter(applyPredefinedExtraFieldsCondition(event)) + .toList(); + final JsonFieldSelector combinedPredefinedExtraFields = matchingPreDefinedFieldsConfigs.stream() + .map(PreDefinedExtraFieldsConfig::getExtraFields) + .reduce(JsonFactory.newFieldSelector(List.of()), (a, b) -> { + final Set combinedPointerSet = new LinkedHashSet<>(a.getPointers()); + combinedPointerSet.addAll(b.getPointers()); + return JsonFactory.newFieldSelector(combinedPointerSet); + }); + return buildPredefinedExtraFieldsHeaderReadGrantObject(policyId, combinedPredefinedExtraFields) + .thenApply(predefinedExtraFieldsHeaderReadGrantObject -> + event.setDittoHeaders(event.getDittoHeaders() + .toBuilder() + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(), + buildPredefinedExtraFieldsHeaderList(combinedPredefinedExtraFields) + ) + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(), + predefinedExtraFieldsHeaderReadGrantObject + ) + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey(), + buildPredefinedExtraFieldsHeaderObject(entity, + combinedPredefinedExtraFields).toString() + ) + .build() + ) + ); + } else { + return CompletableFuture.completedStage(event); } } + private Predicate applyPredefinedExtraFieldsCondition(final ThingEvent event) { + return conf -> { + if (conf.getCondition().isEmpty()) { + return true; + } else { + final String rqlCondition = conf.getCondition().get(); + try { + final var criteria = QueryFilterCriteriaFactory + .modelBased(RqlPredicateParser.getInstance()) + .filterCriteria(rqlCondition, event.getDittoHeaders()); + + final var predicate = ThingPredicateVisitor.apply( + criteria, + PlaceholderFactory.newPlaceholderResolver(TIME_PLACEHOLDER, + new Object()), + PlaceholderFactory.newPlaceholderResolver(HEADERS_PLACEHOLDER, + event.getDittoHeaders()) + ); + return predicate.test(entity); + } catch (final InvalidRqlExpressionException e) { + log.warning(e, "Encountered invalid RQL condition <{}> for enriching " + + "predefined extra fields: <{}>", rqlCondition, e.getMessage()); + return true; + } + } + }; + } + + private static String buildPredefinedExtraFieldsHeaderList(final JsonFieldSelector preDefinedExtraFields) { + return StreamSupport.stream(preDefinedExtraFields.spliterator(), false) + .map(JsonPointer::toString) + .map(JsonValue::of) + .collect(JsonCollectors.valuesToArray()) + .toString(); + } + + private CompletionStage buildPredefinedExtraFieldsHeaderReadGrantObject(@Nullable final PolicyId policyId, + final JsonFieldSelector preDefinedExtraFields) + { + return policyEnforcerProvider.getPolicyEnforcer(policyId) + .thenApply(policyEnforcerOpt -> + policyEnforcerOpt.map(policyEnforcer -> + StreamSupport.stream(preDefinedExtraFields.spliterator(), false) + .map(pointer -> { + final JsonArray unrestrictedReadSubjects = policyEnforcer.getEnforcer() + .getSubjectsWithUnrestrictedPermission( + PoliciesResourceType.thingResource(pointer), + Permissions.newInstance(Permission.READ) + ) + .stream() + .map(AuthorizationSubject::getId) + .map(JsonValue::of) + .collect(JsonCollectors.valuesToArray()); + return JsonField.newInstance(pointer.toString(), unrestrictedReadSubjects); + }) + .collect(JsonCollectors.fieldsToObject()) + .toString() + ).orElse("{}") + ); + } + + private static JsonObject buildPredefinedExtraFieldsHeaderObject( + final Thing thing, + final JsonFieldSelector preDefinedExtraFields + ) { + final JsonObjectBuilder builder = JsonObject.newBuilder(); + final JsonObject thingJson = thing.toJson(); + preDefinedExtraFields.getPointers().forEach(pointer -> + thingJson.getValue(pointer).ifPresent(thingValue -> builder.set(pointer, thingValue)) + ); + return builder.build(); + } + @Override protected boolean shouldSendResponse(final DittoHeaders dittoHeaders) { return dittoHeaders.isResponseRequired() || diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorPropsFactory.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorPropsFactory.java index 95a982170c..66d11cff03 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorPropsFactory.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorPropsFactory.java @@ -14,14 +14,14 @@ import javax.annotation.Nullable; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.Props; import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; import org.eclipse.ditto.internal.utils.pubsub.DistributedPub; +import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider; import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.signals.events.ThingEvent; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.Props; - /** * Factory of thing-persistence-actor. */ @@ -35,8 +35,10 @@ public interface ThingPersistenceActorPropsFactory { * @param mongoReadJournal the ReadJournal used for gaining access to historical values of the thing. * @param distributedPub the distributed-pub access. * @param searchShardRegionProxy the proxy of the shard region of search updaters. + * @param policyEnforcerProvider a provider for the used Policy {@code Enforcer} which "guards" the + * ThingPersistenceActor for applying access control. * @return Props of the thing-persistence-actor. */ Props props(ThingId thingId, MongoReadJournal mongoReadJournal, DistributedPub> distributedPub, - @Nullable ActorRef searchShardRegionProxy); + @Nullable ActorRef searchShardRegionProxy, PolicyEnforcerProvider policyEnforcerProvider); } diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java index 7ee918714a..e2e24e0ed6 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java @@ -408,7 +408,7 @@ protected ThingId getEntityId() throws Exception { protected Props getPersistenceActorProps(final ThingId entityId) { assert thingPersistenceActorPropsFactory != null; return thingPersistenceActorPropsFactory.props(entityId, mongoReadJournal, distributedPubThingEventsForTwin, - searchShardRegionProxy); + searchShardRegionProxy, policyEnforcerProvider); } @Override diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/starter/DefaultThingPersistenceActorPropsFactory.java b/things/service/src/main/java/org/eclipse/ditto/things/service/starter/DefaultThingPersistenceActorPropsFactory.java index c025b424e6..e77d07f945 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/starter/DefaultThingPersistenceActorPropsFactory.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/starter/DefaultThingPersistenceActorPropsFactory.java @@ -17,17 +17,17 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.Props; import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; import org.eclipse.ditto.internal.utils.pubsub.DistributedPub; +import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider; import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.signals.events.ThingEvent; import org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActor; import org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActorPropsFactory; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.Props; - /** * Factory for creating Props of {@link org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActor}. */ @@ -53,9 +53,10 @@ static DefaultThingPersistenceActorPropsFactory of(final ActorSystem actorSystem @Override public Props props(final ThingId thingId, final MongoReadJournal mongoReadJournal, - final DistributedPub> distributedPub, - @Nullable final ActorRef searchShardRegionProxy) { + final DistributedPub> distributedPub, @Nullable final ActorRef searchShardRegionProxy, + final PolicyEnforcerProvider policyEnforcerProvider) { argumentNotEmpty(thingId); - return ThingPersistenceActor.props(thingId, mongoReadJournal, distributedPub, searchShardRegionProxy); + return ThingPersistenceActor.props(thingId, mongoReadJournal, distributedPub, searchShardRegionProxy, + policyEnforcerProvider); } } diff --git a/things/service/src/main/resources/things-dev.conf b/things/service/src/main/resources/things-dev.conf index 352a4695d4..d9361980b2 100755 --- a/things/service/src/main/resources/things-dev.conf +++ b/things/service/src/main/resources/things-dev.conf @@ -7,6 +7,31 @@ ditto { metrics.prometheus.port = 9011 things { + thing { + event { + pre-defined-extra-fields = [ + { + namespaces = [ + "*" + ] + condition = "exists(definition)" + extra-fields = [ + "definition" + ] + }, + { + namespaces = [ + "org.eclipse.ditto.lamps" + ] + extra-fields = [ + "attributes/manufacturer", + "attributes/serial" + ] + } + ] + } + } + wot { tm-model-validation { enabled = true diff --git a/things/service/src/main/resources/things.conf b/things/service/src/main/resources/things.conf index e1ff362256..0555e84596 100755 --- a/things/service/src/main/resources/things.conf +++ b/things/service/src/main/resources/things.conf @@ -103,6 +103,27 @@ ditto { #"user-agent" # the HTTP user-agent header ] historical-headers-to-persist = ${?THING_EVENT_HISTORICAL_HEADERS_TO_PERSIST} + + pre-defined-extra-fields = [ + # { + # namespaces = [ + # "*" + # ] + # condition = "exists(definition)" + # extra-fields = [ + # "definition" + # ] + # }, + # { + # namespaces = [ + # "org.eclipse.ditto.lamps" + # ] + # extra-fields = [ + # "attributes/manufacturer", + # "attributes/serial" + # ] + # } + ] } supervisor { diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/PersistenceActorTestBase.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/PersistenceActorTestBase.java index a22c5ac664..b7e5a6838b 100755 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/PersistenceActorTestBase.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/PersistenceActorTestBase.java @@ -21,6 +21,13 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import javax.annotation.Nullable; + +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.Props; +import org.apache.pekko.testkit.TestProbe; +import org.apache.pekko.testkit.javadsl.TestKit; import org.eclipse.ditto.base.model.auth.AuthorizationContext; import org.eclipse.ditto.base.model.auth.AuthorizationModelFactory; import org.eclipse.ditto.base.model.auth.AuthorizationSubject; @@ -62,12 +69,6 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.Props; -import org.apache.pekko.testkit.TestProbe; -import org.apache.pekko.testkit.javadsl.TestKit; - /** * Base test class for testing persistence actors of the things persistence. */ @@ -228,12 +229,13 @@ protected ActorRef createPersistenceActorFor(final ThingId thingId) { protected ActorRef createPersistenceActorWithPubSubFor(final ThingId thingId) { return actorSystem.actorOf(getPropsOfThingPersistenceActor(thingId, Mockito.mock(MongoReadJournal.class), - getDistributedPub())); + getDistributedPub(), null, policyEnforcerProvider)); } private Props getPropsOfThingPersistenceActor(final ThingId thingId, final MongoReadJournal mongoReadJournal, - final DistributedPub> pub) { - return ThingPersistenceActor.props(thingId, mongoReadJournal, pub, null); + final DistributedPub> pub, @Nullable final ActorRef searchShardRegionProxy, + final PolicyEnforcerProvider policyEnforcerProvider) { + return ThingPersistenceActor.props(thingId, mongoReadJournal, pub, searchShardRegionProxy, policyEnforcerProvider); } protected ActorRef createSupervisorActorFor(final ThingId thingId) { @@ -261,8 +263,7 @@ public > Object wrapForPublicationWithAcks(final S messa } }, liveSignalPub, - (thingId1, mongoReadJournal, pub, searchShardRegionProxy) -> getPropsOfThingPersistenceActor( - thingId1, mongoReadJournal, pub), + this::getPropsOfThingPersistenceActor, null, policyEnforcerProvider, Mockito.mock(MongoReadJournal.class)); diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java index 5d90eb62ed..b247b2c658 100755 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java @@ -317,7 +317,7 @@ public void tryToCreateThingWithDifferentThingId() { final CreateThing createThing = CreateThing.of(thing, null, dittoHeadersV2); final Props props = ThingPersistenceActor.props(thingIdOfActor, Mockito.mock(MongoReadJournal.class), - getDistributedPub(), null); + getDistributedPub(), null, policyEnforcerProvider); final TestActorRef underTest = TestActorRef.create(actorSystem, props); final ThingPersistenceActor thingPersistenceActor = underTest.underlyingActor(); final PartialFunction receiveCommand = thingPersistenceActor.receiveCommand(); @@ -2043,7 +2043,7 @@ public void unavailableExpectedAndPolicyIsDeletedIfPersistenceActorFails() { ThingId thingId = getIdOrThrow(thing); ActorRef underTest = createSupervisorActorWithCustomPersistenceActor(thingId, - (thingId1, mongoReadJournal, distributedPub, searchShardRegionProxy) -> FailingInCtorActor.props()); + (thingId1, mongoReadJournal, distributedPub, searchShardRegionProxy, policyEnforcerProvider) -> FailingInCtorActor.props()); CreateThing createThing = CreateThing.of(thing, null, dittoHeaders); underTest.tell(createThing, getRef()); @@ -2083,7 +2083,7 @@ public void policyShouldNotBeDeletedOnThingRetrieveAndActorFail() { ThingId thingId = getIdOrThrow(thing); ActorRef underTest = createSupervisorActorWithCustomPersistenceActor(thingId, - (thingId1, mongoReadJournal, distributedPub, searchShardRegionProxy) -> FailingInCtorActor.props()); + (thingId1, mongoReadJournal, distributedPub, searchShardRegionProxy, policyEnforcerProvider) -> FailingInCtorActor.props()); RetrieveThing retrieveThing = RetrieveThing.of(thingId, dittoHeaders); underTest.tell(retrieveThing, getRef()); diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceOperationsActorIT.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceOperationsActorIT.java index bed781abeb..f76d8ef1bc 100644 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceOperationsActorIT.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceOperationsActorIT.java @@ -12,6 +12,12 @@ */ package org.eclipse.ditto.things.service.persistence.actors; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.Props; import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.internal.utils.persistence.mongo.ops.eventsource.MongoEventSourceITAssertions; @@ -40,10 +46,6 @@ import com.typesafe.config.Config; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.Props; - /** * Tests {@link ThingPersistenceOperationsActor} against a local MongoDB. */ @@ -59,6 +61,8 @@ public final class ThingPersistenceOperationsActorIT extends MongoEventSourceITA @Before public void setup() { policyEnforcerProvider = Mockito.mock(PolicyEnforcerProvider.class); + Mockito.when(policyEnforcerProvider.getPolicyEnforcer(Mockito.any())) + .thenReturn(CompletableFuture.completedStage(Optional.empty())); } @Test @@ -150,12 +154,7 @@ public > Object wrapForPublicationWithAcks(final S messa } }, liveSignalPub, - (thingId, mongoReadJournal, distributedPub, searchShardRegionProxy) -> ThingPersistenceActor.props( - thingId, - mongoReadJournal, - distributedPub, - null - ), + ThingPersistenceActor::props, null, policyEnforcerProvider, Mockito.mock(MongoReadJournal.class));