diff --git a/connectivity/service/src/main/resources/connectivity.conf b/connectivity/service/src/main/resources/connectivity.conf index 2a9acb5510..789f96e434 100644 --- a/connectivity/service/src/main/resources/connectivity.conf +++ b/connectivity/service/src/main/resources/connectivity.conf @@ -6,14 +6,26 @@ ditto { database = ${?MONGO_DB_DATABASE} read-journal { + should-create-additional-snapshot-aggregation-index-pid-id = false + should-create-additional-snapshot-aggregation-index-pid-id = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID} + + should-create-additional-snapshot-aggregation-index-pid = false + should-create-additional-snapshot-aggregation-index-pid = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID} + hint-name-filterPidsThatDoesntContainTagInNewestEntry = null hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY} hint-name-listLatestJournalEntries = null hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES} - hint-name-listNewestActiveSnapshotsByBatch = "_id_" - hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH} + hint-name-listNewestActiveSnapshotsByBatchPidId = null + hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID} + + hint-name-listNewestActiveSnapshotsByBatchPid = null + hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID} + + hint-name-listNewestActiveSnapshotsByBatchId = null + hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID} } } diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/DefaultMongoReadJournalConfig.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/DefaultMongoReadJournalConfig.java index 3e82760e02..85b0172a40 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/DefaultMongoReadJournalConfig.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/DefaultMongoReadJournalConfig.java @@ -32,17 +32,31 @@ public final class DefaultMongoReadJournalConfig implements MongoReadJournalConf private static final String CONFIG_PATH = "read-journal"; + private final boolean createAdditionalSnapshotAggregationIndexPidId; + private final boolean createAdditionalSnapshotAggregationIndexPid; @Nullable private final String hintNameFilterPidsThatDoesntContainTagInNewestEntry; @Nullable private final String hintNameListLatestJournalEntries; - @Nullable private final String listNewestActiveSnapshotsByBatch; + @Nullable private final String listNewestActiveSnapshotsByBatchPidId; + @Nullable private final String listNewestActiveSnapshotsByBatchPid; + @Nullable private final String listNewestActiveSnapshotsByBatchId; private DefaultMongoReadJournalConfig(final ScopedConfig config) { + createAdditionalSnapshotAggregationIndexPidId = config.getBoolean( + MongoReadJournalConfigValue.SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID.getConfigPath() + ); + createAdditionalSnapshotAggregationIndexPid = config.getBoolean( + MongoReadJournalConfigValue.SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID.getConfigPath() + ); hintNameFilterPidsThatDoesntContainTagInNewestEntry = getNullableString(config, MongoReadJournalConfigValue.HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY); hintNameListLatestJournalEntries = getNullableString(config, MongoReadJournalConfigValue.HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES); - listNewestActiveSnapshotsByBatch = getNullableString(config, - MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH); + listNewestActiveSnapshotsByBatchPidId = getNullableString(config, + MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID); + listNewestActiveSnapshotsByBatchPid = getNullableString(config, + MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID); + listNewestActiveSnapshotsByBatchId = getNullableString(config, + MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID); } /** @@ -59,7 +73,20 @@ public static DefaultMongoReadJournalConfig of(final Config config) { @Nullable private static String getNullableString(final Config config, final KnownConfigValue configValue) { - return config.getIsNull(configValue.getConfigPath()) ? null : config.getString(configValue.getConfigPath()); + return config.getIsNull(configValue.getConfigPath()) ? null : + Optional.of(config.getString(configValue.getConfigPath())) + .filter(s -> !s.equals("null")) + .orElse(null); + } + + @Override + public boolean shouldCreateAdditionalSnapshotAggregationIndexPidId() { + return createAdditionalSnapshotAggregationIndexPidId; + } + + @Override + public boolean shouldCreateAdditionalSnapshotAggregationIndexPid() { + return createAdditionalSnapshotAggregationIndexPid; } @Override @@ -73,8 +100,18 @@ public Optional<String> getIndexNameHintForListLatestJournalEntries() { } @Override - public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch() { - return Optional.ofNullable(listNewestActiveSnapshotsByBatch); + public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPidId() { + return Optional.ofNullable(listNewestActiveSnapshotsByBatchPidId); + } + + @Override + public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPid() { + return Optional.ofNullable(listNewestActiveSnapshotsByBatchPid); + } + + @Override + public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchId() { + return Optional.ofNullable(listNewestActiveSnapshotsByBatchId); } @Override @@ -86,25 +123,33 @@ public boolean equals(final Object o) { return false; } final DefaultMongoReadJournalConfig that = (DefaultMongoReadJournalConfig) o; - return Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry, - that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) && + return createAdditionalSnapshotAggregationIndexPidId == that.createAdditionalSnapshotAggregationIndexPidId && + createAdditionalSnapshotAggregationIndexPid == that.createAdditionalSnapshotAggregationIndexPid && + Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry, + that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) && Objects.equals(hintNameListLatestJournalEntries, that.hintNameListLatestJournalEntries) && - Objects.equals(listNewestActiveSnapshotsByBatch, that.listNewestActiveSnapshotsByBatch); + Objects.equals(listNewestActiveSnapshotsByBatchPidId, that.listNewestActiveSnapshotsByBatchPidId); } @Override public int hashCode() { - return Objects.hash(hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries, - listNewestActiveSnapshotsByBatch); + return Objects.hash(createAdditionalSnapshotAggregationIndexPidId, createAdditionalSnapshotAggregationIndexPid, + hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries, + listNewestActiveSnapshotsByBatchPidId, listNewestActiveSnapshotsByBatchPid, + listNewestActiveSnapshotsByBatchId); } @Override public String toString() { return getClass().getSimpleName() + " [" + - "hintNameFilterPidsThatDoesntContainTagInNewestEntry=" + + "createAdditionalSnapshotAggregationIndexPidId=" + createAdditionalSnapshotAggregationIndexPidId + + ", createAdditionalSnapshotAggregationIndexPid=" + createAdditionalSnapshotAggregationIndexPid + + ", hintNameFilterPidsThatDoesntContainTagInNewestEntry=" + hintNameFilterPidsThatDoesntContainTagInNewestEntry + ", hintNameListLatestJournalEntries=" + hintNameListLatestJournalEntries + - ", listNewestActiveSnapshotsByBatch=" + listNewestActiveSnapshotsByBatch + + ", listNewestActiveSnapshotsByBatchPidId=" + listNewestActiveSnapshotsByBatchPidId + + ", listNewestActiveSnapshotsByBatchPid=" + listNewestActiveSnapshotsByBatchPid + + ", listNewestActiveSnapshotsByBatchId=" + listNewestActiveSnapshotsByBatchId + "]"; } diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/MongoReadJournalConfig.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/MongoReadJournalConfig.java index e26bd4a64a..b522e7b48e 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/MongoReadJournalConfig.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/MongoReadJournalConfig.java @@ -25,6 +25,18 @@ @Immutable public interface MongoReadJournalConfig { + /** + * @return whether additional index for "pid" + "_id" should be created in order to speed up MongoReadJournal + * aggregation queries on the snapshot collection. + */ + boolean shouldCreateAdditionalSnapshotAggregationIndexPidId(); + + /** + * @return whether additional index for "pid" should be created in order to speed up MongoReadJournal + * aggregation queries on the snapshot collection. + */ + boolean shouldCreateAdditionalSnapshotAggregationIndexPid(); + /** * @return the optional hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}. */ @@ -36,10 +48,22 @@ public interface MongoReadJournalConfig { Optional<String> getIndexNameHintForListLatestJournalEntries(); /** - * @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}. + * @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} containing both + * "pid" and "_id" fields in first "$match". */ - Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch(); + Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPidId(); + /** + * @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing + * "pid" field in first "$match". + */ + Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPid(); + + /** + * @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing + * "_id" field in first "$match". + */ + Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchId(); /** * An enumeration of the known config path expressions and their associated default values for @@ -47,6 +71,18 @@ public interface MongoReadJournalConfig { */ enum MongoReadJournalConfigValue implements KnownConfigValue { + /** + * Whether additional index for "pid" + "_id" should be created in order to speed up MongoReadJournal + * aggregation queries on the snapshot collection. + */ + SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID("should-create-additional-snapshot-aggregation-index-pid-id", false), + + /** + * Whether additional index for "pid" should be created in order to speed up MongoReadJournal aggregation + * queries on the snapshot collection. + */ + SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID("should-create-additional-snapshot-aggregation-index-pid", false), + /** * Hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}. */ @@ -58,9 +94,19 @@ enum MongoReadJournalConfigValue implements KnownConfigValue { HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES("hint-name-listLatestJournalEntries", null), /** - * Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}. + * Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchPidId}. + */ + HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID("hint-name-listNewestActiveSnapshotsByBatchPidId", null), + + /** + * Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchPid}. + */ + HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID("hint-name-listNewestActiveSnapshotsByBatchPid", null), + + /** + * Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchId}. */ - HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH("hint-name-listNewestActiveSnapshotsByBatch", null); + HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID("hint-name-listNewestActiveSnapshotsByBatchId", null); private final String path; private final Object defaultValue; diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java index 15930fea0b..25664bc9c1 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.stream.Collectors; @@ -166,6 +167,12 @@ public final class MongoReadJournal implements CurrentEventsByPersistenceIdQuery private static final Index TAG_PID_INDEX = IndexFactory.newInstance("ditto_tag_pid", List.of(J_TAGS, J_PROCESSOR_ID), false, true); + private static final Index SNAPS_PID_ID_INDEX = + IndexFactory.newInstance("snaps_pid_id_index", List.of(S_PROCESSOR_ID, S_ID), false, false); + + private static final Index SNAPS_PID_INDEX = + IndexFactory.newInstance("snaps_pid_index", List.of(S_PROCESSOR_ID), false, false); + private final String journalCollection; private final String snapsCollection; private final DittoMongoClient mongoClient; @@ -201,7 +208,8 @@ public static MongoReadJournal newInstance(final ActorSystem system) { final Config config = system.settings().config(); final MongoDbConfig mongoDbConfig = DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config)); - return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(), system); + return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(), + system); } /** @@ -240,6 +248,32 @@ public CompletionStage<Done> ensureTagPidIndex() { return indexInitializer.createNonExistingIndices(journalCollection, List.of(TAG_PID_INDEX)); } + /** + * Ensure a compound index exists for snapshot cleanup aggregation matching on "pid" and "_id". + * + * @return a future that completes after index creation completes or fails when index creation fails. + */ + public CompletionStage<Done> ensureSnapshotCollectionPidIdIndex() { + if (readJournalConfig.shouldCreateAdditionalSnapshotAggregationIndexPidId()) { + return indexInitializer.createNonExistingIndices(snapsCollection, List.of(SNAPS_PID_ID_INDEX)); + } else { + return CompletableFuture.completedFuture(Done.getInstance()); + } + } + + /** + * Ensure a compound index exists for snapshot cleanup aggregation matching on "pid" . + * + * @return a future that completes after index creation completes or fails when index creation fails. + */ + public CompletionStage<Done> ensureSnapshotCollectionPidIndex() { + if (readJournalConfig.shouldCreateAdditionalSnapshotAggregationIndexPid()) { + return indexInitializer.createNonExistingIndices(snapsCollection, List.of(SNAPS_PID_INDEX)); + } else { + return CompletableFuture.completedFuture(Done.getInstance()); + } + } + /** * Retrieve all unique PIDs in journals. Does its best not to create long-living cursors on the database by reading * {@code batchSize} events per query. @@ -349,6 +383,7 @@ private Source<String, NotUsed> filterPidsThatDoesntContainTagInNewestEntry(fina )); final AggregatePublisher<Document> hintedAggregate = readJournalConfig.getIndexNameHintForFilterPidsThatDoesntContainTagInNewestEntry() + .filter(hint -> !hint.equals("null")) .map(aggregate::hintString) .orElse(aggregate); return Source.fromPublisher(hintedAggregate) @@ -928,7 +963,8 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch( final List<Bson> pipeline = new ArrayList<>(5); // match stage - pipeline.add(Aggregates.match(snapshotFilter.toMongoFilter())); + final Bson matchFilter = snapshotFilter.toMongoFilter(); + pipeline.add(Aggregates.match(matchFilter)); // sort stage pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.ascending(S_PROCESSOR_ID), Sorts.descending(S_SN)))); @@ -947,8 +983,8 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch( final String items = "i"; pipeline.add(Aggregates.group( new Document("_id", new BsonNull()), - Accumulators.max(maxPid, "$"+ S_ID), - Accumulators.push(items,"$$ROOT"))); + Accumulators.max(maxPid, "$" + S_ID), + Accumulators.push(items, "$$ROOT"))); // redact stage - "$$PRUNE"s documents with "__lifecycle" = DELETED if includeDeleted=false // if includeDeleted=true keeps them using "$$DESCEND" @@ -963,13 +999,15 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch( ))); final AggregatePublisher<Document> aggregate = snapshotStore.aggregate(pipeline); - final AggregatePublisher<Document> hintedAggregate = - readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatch() - .map(aggregate::hintString) - .orElse(aggregate); + final Optional<String> indexHint = calculateIndexHint(matchFilter); + final AggregatePublisher<Document> hintedAggregate = indexHint + .filter(hint -> !hint.equals("null")) + .map(aggregate::hintString) + .orElse(aggregate); return Source.fromPublisher( - hintedAggregate - .batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!) + hintedAggregate + .batchSize(batchSize) + // use batchSize also for the cursor batchSize (16 by default bc of backpressure!) ) .flatMapConcat(document -> { final String theMaxPid = document.getString(maxPid); @@ -983,6 +1021,23 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch( }); } + private Optional<String> calculateIndexHint(final Bson matchFilter) { + final String matchJson = matchFilter.toBsonDocument().toJson(); + final boolean matchContainsPid = matchJson.contains("\"pid\":"); + final boolean matchContainsId = matchJson.contains("\"_id\":"); + final Optional<String> indexHint; + if (matchContainsPid && matchContainsId) { + indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPidId(); + } else if (matchContainsPid) { + indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPid(); + } else if (matchContainsId) { + indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchId(); + } else { + indexHint = Optional.empty(); + } + return indexHint; + } + private static Source<List<String>, NotUsed> listJournalEntryTags(final MongoCollection<Document> journal, final String pid) { diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/Cleanup.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/Cleanup.java index b6604ce4a2..19f49a61d9 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/Cleanup.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/Cleanup.java @@ -21,12 +21,12 @@ import java.util.function.Supplier; import java.util.stream.LongStream; -import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; - import org.apache.pekko.NotUsed; import org.apache.pekko.japi.Pair; import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.javadsl.Source; +import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; +import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; /** * An Pekko stream to handle background cleanup regulated by insert times. @@ -42,6 +42,7 @@ final class Cleanup { private final boolean deleteFinalDeletedSnapshot; Cleanup(final MongoReadJournal readJournal, + final ThreadSafeDittoLoggingAdapter logger, final Materializer materializer, final Supplier<Pair<Integer, Integer>> responsibilitySupplier, final Duration historyRetentionDuration, @@ -56,14 +57,22 @@ final class Cleanup { this.readBatchSize = readBatchSize; this.deleteBatchSize = deleteBatchSize; this.deleteFinalDeletedSnapshot = deleteFinalDeletedSnapshot; + + readJournal.ensureSnapshotCollectionPidIdIndex() + .thenCompose(done -> readJournal.ensureSnapshotCollectionPidIndex()) + .exceptionally(e -> { + logger.error(e, "Failed to create index for read journal snapshot aggregation queries"); + return null; + }); } static Cleanup of(final CleanupConfig config, final MongoReadJournal readJournal, + final ThreadSafeDittoLoggingAdapter logger, final Materializer materializer, final Supplier<Pair<Integer, Integer>> responsibilitySupplier) { - return new Cleanup(readJournal, materializer, responsibilitySupplier, + return new Cleanup(readJournal, logger, materializer, responsibilitySupplier, config.getHistoryRetentionDuration(), config.getReadsPerQuery(), config.getWritesPerCredit(), @@ -76,7 +85,8 @@ Source<Source<CleanupResult, NotUsed>, NotUsed> getCleanupStream(final String lo } private Source<SnapshotRevision, NotUsed> getSnapshotRevisions(final String lowerBound) { - return readJournal.getNewestSnapshotsAbove(lowerBound, readBatchSize, true, historyRetentionDuration, materializer) + return readJournal.getNewestSnapshotsAbove(lowerBound, readBatchSize, true, historyRetentionDuration, + materializer) .map(document -> new SnapshotRevision(document.getString(S_ID), document.getLong(S_SN), "DELETED".equals(document.getString(LIFECYCLE)))) diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java index 9502a1c301..e65331d8d0 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java @@ -17,26 +17,6 @@ import javax.annotation.Nullable; -import org.eclipse.ditto.base.api.common.ModifyConfig; -import org.eclipse.ditto.base.api.common.RetrieveConfig; -import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.internal.utils.pekko.actors.ModifyConfigBehavior; -import org.eclipse.ditto.internal.utils.pekko.actors.RetrieveConfigBehavior; -import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; -import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; -import org.eclipse.ditto.internal.utils.health.RetrieveHealth; -import org.eclipse.ditto.internal.utils.health.RetrieveHealthResponse; -import org.eclipse.ditto.internal.utils.health.StatusDetailMessage; -import org.eclipse.ditto.internal.utils.health.StatusInfo; -import org.eclipse.ditto.internal.utils.metrics.DittoMetrics; -import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter; -import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; -import org.eclipse.ditto.json.JsonObject; -import org.eclipse.ditto.json.JsonValue; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - import org.apache.pekko.Done; import org.apache.pekko.actor.AbstractFSM; import org.apache.pekko.actor.ActorRef; @@ -53,6 +33,25 @@ import org.apache.pekko.stream.UniqueKillSwitch; import org.apache.pekko.stream.javadsl.Keep; import org.apache.pekko.stream.javadsl.Sink; +import org.eclipse.ditto.base.api.common.ModifyConfig; +import org.eclipse.ditto.base.api.common.RetrieveConfig; +import org.eclipse.ditto.base.model.headers.DittoHeaders; +import org.eclipse.ditto.internal.utils.health.RetrieveHealth; +import org.eclipse.ditto.internal.utils.health.RetrieveHealthResponse; +import org.eclipse.ditto.internal.utils.health.StatusDetailMessage; +import org.eclipse.ditto.internal.utils.health.StatusInfo; +import org.eclipse.ditto.internal.utils.metrics.DittoMetrics; +import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter; +import org.eclipse.ditto.internal.utils.pekko.actors.ModifyConfigBehavior; +import org.eclipse.ditto.internal.utils.pekko.actors.RetrieveConfigBehavior; +import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; +import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; +import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonValue; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; /** * Actor to control persistence cleanup. @@ -110,7 +109,7 @@ private PersistenceCleanupActor(final CleanupConfig config, this.mongoReadJournal = mongoReadJournal; responsibilitySupplier = ClusterResponsibilitySupplier.of(cluster, myRole); this.config = config; - cleanup = Cleanup.of(config, mongoReadJournal, materializer, responsibilitySupplier); + cleanup = Cleanup.of(config, mongoReadJournal, logger, materializer, responsibilitySupplier); credits = Credits.of(config); } @@ -324,7 +323,7 @@ public Config getConfig() { @Override public Config setConfig(final Config config) { this.config = this.config.setAll(config); - cleanup = Cleanup.of(this.config, mongoReadJournal, materializer, responsibilitySupplier); + cleanup = Cleanup.of(this.config, mongoReadJournal, logger, materializer, responsibilitySupplier); credits = Credits.of(this.config); getSelf().tell(Control.SHUTDOWN, ActorRef.noSender()); diff --git a/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CleanupTest.java b/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CleanupTest.java index 2fd7ef4711..d04b32e744 100644 --- a/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CleanupTest.java +++ b/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CleanupTest.java @@ -24,16 +24,10 @@ import java.time.Duration; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import org.bson.Document; -import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.mongodb.client.result.DeleteResult; - +import org.apache.pekko.Done; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.japi.Pair; import org.apache.pekko.stream.Materializer; @@ -41,6 +35,14 @@ import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.testkit.javadsl.TestKit; +import org.bson.Document; +import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; +import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.mongodb.client.result.DeleteResult; /** * Tests {@link Cleanup}. @@ -55,6 +57,8 @@ public final class CleanupTest { @Before public void init() { mongoReadJournal = mock(MongoReadJournal.class); + when(mongoReadJournal.ensureSnapshotCollectionPidIdIndex()) + .thenReturn(CompletableFuture.completedFuture(Done.getInstance())); materializer = SystemMaterializer.get(actorSystem).materializer(); } @@ -68,7 +72,8 @@ public void emptyStream() { when(mongoReadJournal.getNewestSnapshotsAbove(any(), anyInt(), eq(true), any(), any())) .thenReturn(Source.empty()); - final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1), + final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer, + () -> Pair.create(0, 1), Duration.ZERO, 1, 1, true); final var result = underTest.getCleanupStream("") .flatMapConcat(x -> x) @@ -96,7 +101,8 @@ public void deleteFinalDeletedSnapshot() { invocation.<Long>getArgument(1) * 1000L + invocation.<Long>getArgument(2) * 10L))) .when(mongoReadJournal).deleteSnapshots(any(), anyLong(), anyLong()); - final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1), + final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer, + () -> Pair.create(0, 1), Duration.ZERO, 1, 4, true); final var result = underTest.getCleanupStream("") @@ -130,7 +136,8 @@ public void excludeFinalDeletedSnapshot() { invocation.<Long>getArgument(1) * 1000L + invocation.<Long>getArgument(2) * 10L))) .when(mongoReadJournal).deleteSnapshots(any(), anyLong(), anyLong()); - final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1), + final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer, + () -> Pair.create(0, 1), Duration.ZERO, 1, 4, false); final var result = underTest.getCleanupStream("") @@ -172,7 +179,8 @@ public void ignorePidsNotResponsibleFor() { .when(mongoReadJournal).deleteSnapshots(any(), anyLong(), anyLong()); // WHEN: the instance is responsible for 1/3 of the 3 PIDs - final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(2, 3), + final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer, + () -> Pair.create(2, 3), Duration.ZERO, 1, 4, false); final var result = underTest.getCleanupStream("") diff --git a/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CreditsTest.java b/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CreditsTest.java index 47d1ab820d..c8e46e16c0 100644 --- a/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CreditsTest.java +++ b/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CreditsTest.java @@ -23,18 +23,12 @@ import java.time.Duration; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; -import org.bson.Document; -import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.mongodb.client.result.DeleteResult; - +import org.apache.pekko.Done; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.event.Logging; import org.apache.pekko.japi.Pair; @@ -48,6 +42,14 @@ import org.apache.pekko.stream.testkit.javadsl.TestSink; import org.apache.pekko.stream.testkit.javadsl.TestSource; import org.apache.pekko.testkit.javadsl.TestKit; +import org.bson.Document; +import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; +import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.mongodb.client.result.DeleteResult; /** * Tests {@link Credits}. @@ -110,6 +112,8 @@ public void noElementRequestedWithoutCredit() { @Test public void onePersistenceWriteAllowedPerCredit() { final var mongoReadJournal = mock(MongoReadJournal.class); + when(mongoReadJournal.ensureSnapshotCollectionPidIdIndex()) + .thenReturn(CompletableFuture.completedFuture(Done.getInstance())); final var opsCounter = new AtomicInteger(0); when(mongoReadJournal.getNewestSnapshotsAbove(any(), anyInt(), eq(true), any(), any())) @@ -134,7 +138,8 @@ public void onePersistenceWriteAllowedPerCredit() { // mock timer permits 1 batch of credit, after which no credit is given out final var mockTimerResult = new AtomicLong(0L); doAnswer(inv -> mockTimerResult.getAndSet(1001L)).when(mockTimer).getThenReset(); - final var cleanup = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1), + final var cleanup = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer, + () -> Pair.create(0, 1), Duration.ZERO, 1, 4, true); final var underTest = new Credits(getFastCreditConfig(4), mockTimer); diff --git a/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActorTest.java b/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActorTest.java index 5830a36586..5c21e8b5f3 100644 --- a/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActorTest.java +++ b/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActorTest.java @@ -19,12 +19,28 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; +import org.apache.pekko.Done; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.FSM; +import org.apache.pekko.actor.Props; +import org.apache.pekko.event.Logging; +import org.apache.pekko.japi.Pair; +import org.apache.pekko.stream.Attributes; +import org.apache.pekko.stream.KillSwitches; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.stream.testkit.javadsl.TestSource; +import org.apache.pekko.testkit.javadsl.TestKit; import org.eclipse.ditto.base.api.common.ModifyConfig; import org.eclipse.ditto.base.api.common.ModifyConfigResponse; import org.eclipse.ditto.base.api.common.RetrieveConfig; @@ -45,21 +61,6 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigRenderOptions; -import org.apache.pekko.Done; -import org.apache.pekko.NotUsed; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.FSM; -import org.apache.pekko.actor.Props; -import org.apache.pekko.event.Logging; -import org.apache.pekko.japi.Pair; -import org.apache.pekko.stream.Attributes; -import org.apache.pekko.stream.KillSwitches; -import org.apache.pekko.stream.javadsl.Keep; -import org.apache.pekko.stream.javadsl.Source; -import org.apache.pekko.stream.testkit.javadsl.TestSource; -import org.apache.pekko.testkit.javadsl.TestKit; - /** * Tests {@link PersistenceCleanupActor}. */ @@ -69,13 +70,18 @@ public final class PersistenceCleanupActorTest { ConfigFactory.load("test.conf")); private final AtomicReference<Source<Source<CleanupResult, NotUsed>, NotUsed>> sourceBox = new AtomicReference<>(Source.empty()); + + private MongoReadJournal mongoReadJournal; private Cleanup cleanup; private Credits credits; @Before public void init() { + mongoReadJournal = mock(MongoReadJournal.class); cleanup = mock(Cleanup.class); credits = mock(Credits.class); + when(mongoReadJournal.ensureSnapshotCollectionPidIdIndex()) + .thenReturn(CompletableFuture.completedFuture(Done.getInstance())); doAnswer(inv -> Source.empty()).when(cleanup).getCleanupStream(any()); doAnswer(inv -> sourceBox.get()).when(credits).regulate(any(), any()); } @@ -333,7 +339,7 @@ private void waitForResponse(final TestKit testKit, private Props testProps() { return Props.create(PersistenceCleanupActor.class, - () -> new PersistenceCleanupActor(cleanup, credits, mock(MongoReadJournal.class), + () -> new PersistenceCleanupActor(cleanup, credits, mongoReadJournal, () -> Pair.create(0, 1))); } diff --git a/policies/service/src/main/resources/policies.conf b/policies/service/src/main/resources/policies.conf index e28c8e7b60..275fe3f309 100755 --- a/policies/service/src/main/resources/policies.conf +++ b/policies/service/src/main/resources/policies.conf @@ -20,14 +20,26 @@ ditto { database = ${?MONGO_DB_DATABASE} read-journal { + should-create-additional-snapshot-aggregation-index-pid-id = false + should-create-additional-snapshot-aggregation-index-pid-id = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID} + + should-create-additional-snapshot-aggregation-index-pid = false + should-create-additional-snapshot-aggregation-index-pid = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID} + hint-name-filterPidsThatDoesntContainTagInNewestEntry = null hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY} hint-name-listLatestJournalEntries = null hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES} - hint-name-listNewestActiveSnapshotsByBatch = "_id_" - hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH} + hint-name-listNewestActiveSnapshotsByBatchPidId = null + hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID} + + hint-name-listNewestActiveSnapshotsByBatchPid = null + hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID} + + hint-name-listNewestActiveSnapshotsByBatchId = null + hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID} } } diff --git a/things/service/src/main/resources/things.conf b/things/service/src/main/resources/things.conf index 20a0d1d29a..71cac58bbb 100755 --- a/things/service/src/main/resources/things.conf +++ b/things/service/src/main/resources/things.conf @@ -26,14 +26,26 @@ ditto { database = ${?MONGO_DB_DATABASE} read-journal { + should-create-additional-snapshot-aggregation-index-pid-id = false + should-create-additional-snapshot-aggregation-index-pid-id = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID} + + should-create-additional-snapshot-aggregation-index-pid = false + should-create-additional-snapshot-aggregation-index-pid = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID} + hint-name-filterPidsThatDoesntContainTagInNewestEntry = null hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY} hint-name-listLatestJournalEntries = null hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES} - hint-name-listNewestActiveSnapshotsByBatch = "_id_" - hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH} + hint-name-listNewestActiveSnapshotsByBatchPidId = null + hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID} + + hint-name-listNewestActiveSnapshotsByBatchPid = null + hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID} + + hint-name-listNewestActiveSnapshotsByBatchId = null + hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID} } }