diff --git a/connectivity/service/src/main/resources/connectivity.conf b/connectivity/service/src/main/resources/connectivity.conf index 2a9acb5510..9da500eea2 100644 --- a/connectivity/service/src/main/resources/connectivity.conf +++ b/connectivity/service/src/main/resources/connectivity.conf @@ -6,14 +6,23 @@ ditto { database = ${?MONGO_DB_DATABASE} read-journal { + should-create-additional-snapshot-aggregation-indexes = true + should-create-additional-snapshot-aggregation-indexes = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEXES} + hint-name-filterPidsThatDoesntContainTagInNewestEntry = null hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY} hint-name-listLatestJournalEntries = null hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES} - hint-name-listNewestActiveSnapshotsByBatch = "_id_" - hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH} + hint-name-listNewestActiveSnapshotsByBatchPidId = null + hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID} + + hint-name-listNewestActiveSnapshotsByBatchPid = null + hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID} + + hint-name-listNewestActiveSnapshotsByBatchId = null + hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID} } } diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/DefaultMongoReadJournalConfig.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/DefaultMongoReadJournalConfig.java index 3e82760e02..b2d9998bdc 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/DefaultMongoReadJournalConfig.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/DefaultMongoReadJournalConfig.java @@ -32,17 +32,27 @@ public final class DefaultMongoReadJournalConfig implements MongoReadJournalConf private static final String CONFIG_PATH = "read-journal"; + private final boolean createAdditionalSnapshotAggregationIndexes; @Nullable private final String hintNameFilterPidsThatDoesntContainTagInNewestEntry; @Nullable private final String hintNameListLatestJournalEntries; - @Nullable private final String listNewestActiveSnapshotsByBatch; + @Nullable private final String listNewestActiveSnapshotsByBatchPidId; + @Nullable private final String listNewestActiveSnapshotsByBatchPid; + @Nullable private final String listNewestActiveSnapshotsByBatchId; private DefaultMongoReadJournalConfig(final ScopedConfig config) { + createAdditionalSnapshotAggregationIndexes = config.getBoolean( + MongoReadJournalConfigValue.SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEXES.getConfigPath() + ); hintNameFilterPidsThatDoesntContainTagInNewestEntry = getNullableString(config, MongoReadJournalConfigValue.HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY); hintNameListLatestJournalEntries = getNullableString(config, MongoReadJournalConfigValue.HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES); - listNewestActiveSnapshotsByBatch = getNullableString(config, - MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH); + listNewestActiveSnapshotsByBatchPidId = getNullableString(config, + MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID); + listNewestActiveSnapshotsByBatchPid = getNullableString(config, + MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID); + listNewestActiveSnapshotsByBatchId = getNullableString(config, + MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID); } /** @@ -59,7 +69,15 @@ public static DefaultMongoReadJournalConfig of(final Config config) { @Nullable private static String getNullableString(final Config config, final KnownConfigValue configValue) { - return config.getIsNull(configValue.getConfigPath()) ? null : config.getString(configValue.getConfigPath()); + return config.getIsNull(configValue.getConfigPath()) ? null : + Optional.of(config.getString(configValue.getConfigPath())) + .filter(s -> !s.equals("null")) + .orElse(null); + } + + @Override + public boolean shouldCreateAdditionalSnapshotAggregationIndexes() { + return createAdditionalSnapshotAggregationIndexes; } @Override @@ -73,8 +91,18 @@ public Optional getIndexNameHintForListLatestJournalEntries() { } @Override - public Optional getIndexNameHintForListNewestActiveSnapshotsByBatch() { - return Optional.ofNullable(listNewestActiveSnapshotsByBatch); + public Optional getIndexNameHintForListNewestActiveSnapshotsByBatchPidId() { + return Optional.ofNullable(listNewestActiveSnapshotsByBatchPidId); + } + + @Override + public Optional getIndexNameHintForListNewestActiveSnapshotsByBatchPid() { + return Optional.ofNullable(listNewestActiveSnapshotsByBatchPid); + } + + @Override + public Optional getIndexNameHintForListNewestActiveSnapshotsByBatchId() { + return Optional.ofNullable(listNewestActiveSnapshotsByBatchId); } @Override @@ -86,25 +114,31 @@ public boolean equals(final Object o) { return false; } final DefaultMongoReadJournalConfig that = (DefaultMongoReadJournalConfig) o; - return Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry, - that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) && + return createAdditionalSnapshotAggregationIndexes == that.createAdditionalSnapshotAggregationIndexes && + Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry, + that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) && Objects.equals(hintNameListLatestJournalEntries, that.hintNameListLatestJournalEntries) && - Objects.equals(listNewestActiveSnapshotsByBatch, that.listNewestActiveSnapshotsByBatch); + Objects.equals(listNewestActiveSnapshotsByBatchPidId, that.listNewestActiveSnapshotsByBatchPidId); } @Override public int hashCode() { - return Objects.hash(hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries, - listNewestActiveSnapshotsByBatch); + return Objects.hash(createAdditionalSnapshotAggregationIndexes, + hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries, + listNewestActiveSnapshotsByBatchPidId, listNewestActiveSnapshotsByBatchPid, + listNewestActiveSnapshotsByBatchId); } @Override public String toString() { return getClass().getSimpleName() + " [" + - "hintNameFilterPidsThatDoesntContainTagInNewestEntry=" + + "createAdditionalSnapshotAggregationIndexes=" + createAdditionalSnapshotAggregationIndexes + + ", hintNameFilterPidsThatDoesntContainTagInNewestEntry=" + hintNameFilterPidsThatDoesntContainTagInNewestEntry + ", hintNameListLatestJournalEntries=" + hintNameListLatestJournalEntries + - ", listNewestActiveSnapshotsByBatch=" + listNewestActiveSnapshotsByBatch + + ", listNewestActiveSnapshotsByBatchPidId=" + listNewestActiveSnapshotsByBatchPidId + + ", listNewestActiveSnapshotsByBatchPid=" + listNewestActiveSnapshotsByBatchPid + + ", listNewestActiveSnapshotsByBatchId=" + listNewestActiveSnapshotsByBatchId + "]"; } diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/MongoReadJournalConfig.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/MongoReadJournalConfig.java index e26bd4a64a..b429a7c7a5 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/MongoReadJournalConfig.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/MongoReadJournalConfig.java @@ -25,6 +25,12 @@ @Immutable public interface MongoReadJournalConfig { + /** + * @return whether additional indexes should be created in order to speed up MongoReadJournal aggregation queries + * on the snapshot collection. + */ + boolean shouldCreateAdditionalSnapshotAggregationIndexes(); + /** * @return the optional hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}. */ @@ -36,10 +42,22 @@ public interface MongoReadJournalConfig { Optional getIndexNameHintForListLatestJournalEntries(); /** - * @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}. + * @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} containing both + * "pid" and "_id" fields in first "$match". */ - Optional getIndexNameHintForListNewestActiveSnapshotsByBatch(); + Optional getIndexNameHintForListNewestActiveSnapshotsByBatchPidId(); + /** + * @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing + * "pid" field in first "$match". + */ + Optional getIndexNameHintForListNewestActiveSnapshotsByBatchPid(); + + /** + * @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing + * "_id" field in first "$match". + */ + Optional getIndexNameHintForListNewestActiveSnapshotsByBatchId(); /** * An enumeration of the known config path expressions and their associated default values for @@ -47,6 +65,12 @@ public interface MongoReadJournalConfig { */ enum MongoReadJournalConfigValue implements KnownConfigValue { + /** + * Whether additional indexes should be created in order to speed up MongoReadJournal aggregation queries + * on the snapshot collection. + */ + SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEXES("should-create-additional-snapshot-aggregation-indexes", true), + /** * Hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}. */ @@ -58,9 +82,19 @@ enum MongoReadJournalConfigValue implements KnownConfigValue { HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES("hint-name-listLatestJournalEntries", null), /** - * Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}. + * Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchPidId}. + */ + HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID("hint-name-listNewestActiveSnapshotsByBatchPidId", null), + + /** + * Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchPid}. + */ + HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID("hint-name-listNewestActiveSnapshotsByBatchPid", null), + + /** + * Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchId}. */ - HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH("hint-name-listNewestActiveSnapshotsByBatch", null); + HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID("hint-name-listNewestActiveSnapshotsByBatchId", null); private final String path; private final Object defaultValue; diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java index 15930fea0b..a9fd8ea1d5 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/streaming/MongoReadJournal.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.stream.Collectors; @@ -166,6 +167,12 @@ public final class MongoReadJournal implements CurrentEventsByPersistenceIdQuery private static final Index TAG_PID_INDEX = IndexFactory.newInstance("ditto_tag_pid", List.of(J_TAGS, J_PROCESSOR_ID), false, true); + private static final Index SNAPS_PID_ID_INDEX = + IndexFactory.newInstance("snaps_pid_id_index", List.of(S_PROCESSOR_ID, S_ID), false, false); + + private static final Index SNAPS_PID_INDEX = + IndexFactory.newInstance("snaps_pid_index", List.of(S_PROCESSOR_ID), false, false); + private final String journalCollection; private final String snapsCollection; private final DittoMongoClient mongoClient; @@ -201,7 +208,8 @@ public static MongoReadJournal newInstance(final ActorSystem system) { final Config config = system.settings().config(); final MongoDbConfig mongoDbConfig = DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config)); - return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(), system); + return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(), + system); } /** @@ -240,6 +248,20 @@ public CompletionStage ensureTagPidIndex() { return indexInitializer.createNonExistingIndices(journalCollection, List.of(TAG_PID_INDEX)); } + /** + * Ensure a compound index exists for snapshot cleanup aggregation matching on "pid" and "_id". + * + * @return a future that completes after index creation completes or fails when index creation fails. + */ + public CompletionStage ensureSnapshotCollectionPidIdIndex() { + if (readJournalConfig.shouldCreateAdditionalSnapshotAggregationIndexes()) { + return indexInitializer.createNonExistingIndices(snapsCollection, + List.of(SNAPS_PID_ID_INDEX, SNAPS_PID_INDEX)); + } else { + return CompletableFuture.completedFuture(Done.getInstance()); + } + } + /** * Retrieve all unique PIDs in journals. Does its best not to create long-living cursors on the database by reading * {@code batchSize} events per query. @@ -349,6 +371,7 @@ private Source filterPidsThatDoesntContainTagInNewestEntry(fina )); final AggregatePublisher hintedAggregate = readJournalConfig.getIndexNameHintForFilterPidsThatDoesntContainTagInNewestEntry() + .filter(hint -> !hint.equals("null")) .map(aggregate::hintString) .orElse(aggregate); return Source.fromPublisher(hintedAggregate) @@ -928,7 +951,8 @@ private Source listNewestActiveSnapshotsByBatch( final List pipeline = new ArrayList<>(5); // match stage - pipeline.add(Aggregates.match(snapshotFilter.toMongoFilter())); + final Bson matchFilter = snapshotFilter.toMongoFilter(); + pipeline.add(Aggregates.match(matchFilter)); // sort stage pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.ascending(S_PROCESSOR_ID), Sorts.descending(S_SN)))); @@ -947,8 +971,8 @@ private Source listNewestActiveSnapshotsByBatch( final String items = "i"; pipeline.add(Aggregates.group( new Document("_id", new BsonNull()), - Accumulators.max(maxPid, "$"+ S_ID), - Accumulators.push(items,"$$ROOT"))); + Accumulators.max(maxPid, "$" + S_ID), + Accumulators.push(items, "$$ROOT"))); // redact stage - "$$PRUNE"s documents with "__lifecycle" = DELETED if includeDeleted=false // if includeDeleted=true keeps them using "$$DESCEND" @@ -963,13 +987,15 @@ private Source listNewestActiveSnapshotsByBatch( ))); final AggregatePublisher aggregate = snapshotStore.aggregate(pipeline); - final AggregatePublisher hintedAggregate = - readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatch() - .map(aggregate::hintString) - .orElse(aggregate); + final Optional indexHint = calculateIndexHint(matchFilter); + final AggregatePublisher hintedAggregate = indexHint + .filter(hint -> !hint.equals("null")) + .map(aggregate::hintString) + .orElse(aggregate); return Source.fromPublisher( - hintedAggregate - .batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!) + hintedAggregate + .batchSize(batchSize) + // use batchSize also for the cursor batchSize (16 by default bc of backpressure!) ) .flatMapConcat(document -> { final String theMaxPid = document.getString(maxPid); @@ -983,6 +1009,23 @@ private Source listNewestActiveSnapshotsByBatch( }); } + private Optional calculateIndexHint(final Bson matchFilter) { + final String matchJson = matchFilter.toBsonDocument().toJson(); + final boolean matchContainsPid = matchJson.contains("\"pid\":"); + final boolean matchContainsId = matchJson.contains("\"_id\":"); + final Optional indexHint; + if (matchContainsPid && matchContainsId) { + indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPidId(); + } else if (matchContainsPid) { + indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPid(); + } else if (matchContainsId) { + indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchId(); + } else { + indexHint = Optional.empty(); + } + return indexHint; + } + private static Source, NotUsed> listJournalEntryTags(final MongoCollection journal, final String pid) { diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/Cleanup.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/Cleanup.java index b6604ce4a2..59e00fba21 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/Cleanup.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/Cleanup.java @@ -21,12 +21,12 @@ import java.util.function.Supplier; import java.util.stream.LongStream; -import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; - import org.apache.pekko.NotUsed; import org.apache.pekko.japi.Pair; import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.javadsl.Source; +import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; +import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; /** * An Pekko stream to handle background cleanup regulated by insert times. @@ -42,6 +42,7 @@ final class Cleanup { private final boolean deleteFinalDeletedSnapshot; Cleanup(final MongoReadJournal readJournal, + final ThreadSafeDittoLoggingAdapter logger, final Materializer materializer, final Supplier> responsibilitySupplier, final Duration historyRetentionDuration, @@ -56,14 +57,20 @@ final class Cleanup { this.readBatchSize = readBatchSize; this.deleteBatchSize = deleteBatchSize; this.deleteFinalDeletedSnapshot = deleteFinalDeletedSnapshot; + + readJournal.ensureSnapshotCollectionPidIdIndex().exceptionally(e -> { + logger.error(e, "Failed to create index for read journal snapshot aggregation queries"); + return null; + }); } static Cleanup of(final CleanupConfig config, final MongoReadJournal readJournal, + final ThreadSafeDittoLoggingAdapter logger, final Materializer materializer, final Supplier> responsibilitySupplier) { - return new Cleanup(readJournal, materializer, responsibilitySupplier, + return new Cleanup(readJournal, logger, materializer, responsibilitySupplier, config.getHistoryRetentionDuration(), config.getReadsPerQuery(), config.getWritesPerCredit(), diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java index 9502a1c301..e65331d8d0 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java @@ -17,26 +17,6 @@ import javax.annotation.Nullable; -import org.eclipse.ditto.base.api.common.ModifyConfig; -import org.eclipse.ditto.base.api.common.RetrieveConfig; -import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.internal.utils.pekko.actors.ModifyConfigBehavior; -import org.eclipse.ditto.internal.utils.pekko.actors.RetrieveConfigBehavior; -import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; -import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; -import org.eclipse.ditto.internal.utils.health.RetrieveHealth; -import org.eclipse.ditto.internal.utils.health.RetrieveHealthResponse; -import org.eclipse.ditto.internal.utils.health.StatusDetailMessage; -import org.eclipse.ditto.internal.utils.health.StatusInfo; -import org.eclipse.ditto.internal.utils.metrics.DittoMetrics; -import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter; -import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; -import org.eclipse.ditto.json.JsonObject; -import org.eclipse.ditto.json.JsonValue; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - import org.apache.pekko.Done; import org.apache.pekko.actor.AbstractFSM; import org.apache.pekko.actor.ActorRef; @@ -53,6 +33,25 @@ import org.apache.pekko.stream.UniqueKillSwitch; import org.apache.pekko.stream.javadsl.Keep; import org.apache.pekko.stream.javadsl.Sink; +import org.eclipse.ditto.base.api.common.ModifyConfig; +import org.eclipse.ditto.base.api.common.RetrieveConfig; +import org.eclipse.ditto.base.model.headers.DittoHeaders; +import org.eclipse.ditto.internal.utils.health.RetrieveHealth; +import org.eclipse.ditto.internal.utils.health.RetrieveHealthResponse; +import org.eclipse.ditto.internal.utils.health.StatusDetailMessage; +import org.eclipse.ditto.internal.utils.health.StatusInfo; +import org.eclipse.ditto.internal.utils.metrics.DittoMetrics; +import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter; +import org.eclipse.ditto.internal.utils.pekko.actors.ModifyConfigBehavior; +import org.eclipse.ditto.internal.utils.pekko.actors.RetrieveConfigBehavior; +import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; +import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; +import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonValue; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; /** * Actor to control persistence cleanup. @@ -110,7 +109,7 @@ private PersistenceCleanupActor(final CleanupConfig config, this.mongoReadJournal = mongoReadJournal; responsibilitySupplier = ClusterResponsibilitySupplier.of(cluster, myRole); this.config = config; - cleanup = Cleanup.of(config, mongoReadJournal, materializer, responsibilitySupplier); + cleanup = Cleanup.of(config, mongoReadJournal, logger, materializer, responsibilitySupplier); credits = Credits.of(config); } @@ -324,7 +323,7 @@ public Config getConfig() { @Override public Config setConfig(final Config config) { this.config = this.config.setAll(config); - cleanup = Cleanup.of(this.config, mongoReadJournal, materializer, responsibilitySupplier); + cleanup = Cleanup.of(this.config, mongoReadJournal, logger, materializer, responsibilitySupplier); credits = Credits.of(this.config); getSelf().tell(Control.SHUTDOWN, ActorRef.noSender()); diff --git a/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CleanupTest.java b/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CleanupTest.java index 2fd7ef4711..6de1e6bff5 100644 --- a/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CleanupTest.java +++ b/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CleanupTest.java @@ -26,14 +26,6 @@ import java.util.Optional; import java.util.stream.Collectors; -import org.bson.Document; -import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.mongodb.client.result.DeleteResult; - import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.japi.Pair; import org.apache.pekko.stream.Materializer; @@ -41,6 +33,14 @@ import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.testkit.javadsl.TestKit; +import org.bson.Document; +import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; +import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.mongodb.client.result.DeleteResult; /** * Tests {@link Cleanup}. @@ -68,7 +68,8 @@ public void emptyStream() { when(mongoReadJournal.getNewestSnapshotsAbove(any(), anyInt(), eq(true), any(), any())) .thenReturn(Source.empty()); - final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1), + final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer, + () -> Pair.create(0, 1), Duration.ZERO, 1, 1, true); final var result = underTest.getCleanupStream("") .flatMapConcat(x -> x) @@ -96,7 +97,8 @@ public void deleteFinalDeletedSnapshot() { invocation.getArgument(1) * 1000L + invocation.getArgument(2) * 10L))) .when(mongoReadJournal).deleteSnapshots(any(), anyLong(), anyLong()); - final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1), + final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer, + () -> Pair.create(0, 1), Duration.ZERO, 1, 4, true); final var result = underTest.getCleanupStream("") @@ -130,7 +132,8 @@ public void excludeFinalDeletedSnapshot() { invocation.getArgument(1) * 1000L + invocation.getArgument(2) * 10L))) .when(mongoReadJournal).deleteSnapshots(any(), anyLong(), anyLong()); - final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1), + final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer, + () -> Pair.create(0, 1), Duration.ZERO, 1, 4, false); final var result = underTest.getCleanupStream("") @@ -172,7 +175,8 @@ public void ignorePidsNotResponsibleFor() { .when(mongoReadJournal).deleteSnapshots(any(), anyLong(), anyLong()); // WHEN: the instance is responsible for 1/3 of the 3 PIDs - final var underTest = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(2, 3), + final var underTest = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer, + () -> Pair.create(2, 3), Duration.ZERO, 1, 4, false); final var result = underTest.getCleanupStream("") diff --git a/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CreditsTest.java b/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CreditsTest.java index 47d1ab820d..69f15521ed 100644 --- a/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CreditsTest.java +++ b/internal/utils/persistent-actors/src/test/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/CreditsTest.java @@ -27,14 +27,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; -import org.bson.Document; -import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.mongodb.client.result.DeleteResult; - import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.event.Logging; import org.apache.pekko.japi.Pair; @@ -48,6 +40,14 @@ import org.apache.pekko.stream.testkit.javadsl.TestSink; import org.apache.pekko.stream.testkit.javadsl.TestSource; import org.apache.pekko.testkit.javadsl.TestKit; +import org.bson.Document; +import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; +import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.mongodb.client.result.DeleteResult; /** * Tests {@link Credits}. @@ -134,7 +134,8 @@ public void onePersistenceWriteAllowedPerCredit() { // mock timer permits 1 batch of credit, after which no credit is given out final var mockTimerResult = new AtomicLong(0L); doAnswer(inv -> mockTimerResult.getAndSet(1001L)).when(mockTimer).getThenReset(); - final var cleanup = new Cleanup(mongoReadJournal, materializer, () -> Pair.create(0, 1), + final var cleanup = new Cleanup(mongoReadJournal, mock(ThreadSafeDittoLoggingAdapter.class), materializer, + () -> Pair.create(0, 1), Duration.ZERO, 1, 4, true); final var underTest = new Credits(getFastCreditConfig(4), mockTimer); diff --git a/policies/service/src/main/resources/policies.conf b/policies/service/src/main/resources/policies.conf index e28c8e7b60..5be4ed26ad 100755 --- a/policies/service/src/main/resources/policies.conf +++ b/policies/service/src/main/resources/policies.conf @@ -20,14 +20,23 @@ ditto { database = ${?MONGO_DB_DATABASE} read-journal { + should-create-additional-snapshot-aggregation-indexes = true + should-create-additional-snapshot-aggregation-indexes = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEXES} + hint-name-filterPidsThatDoesntContainTagInNewestEntry = null hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY} hint-name-listLatestJournalEntries = null hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES} - hint-name-listNewestActiveSnapshotsByBatch = "_id_" - hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH} + hint-name-listNewestActiveSnapshotsByBatchPidId = null + hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID} + + hint-name-listNewestActiveSnapshotsByBatchPid = null + hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID} + + hint-name-listNewestActiveSnapshotsByBatchId = null + hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID} } } diff --git a/things/service/src/main/resources/things.conf b/things/service/src/main/resources/things.conf index 20a0d1d29a..f258327a28 100755 --- a/things/service/src/main/resources/things.conf +++ b/things/service/src/main/resources/things.conf @@ -26,14 +26,23 @@ ditto { database = ${?MONGO_DB_DATABASE} read-journal { + should-create-additional-snapshot-aggregation-indexes = true + should-create-additional-snapshot-aggregation-indexes = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEXES} + hint-name-filterPidsThatDoesntContainTagInNewestEntry = null hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY} hint-name-listLatestJournalEntries = null hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES} - hint-name-listNewestActiveSnapshotsByBatch = "_id_" - hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH} + hint-name-listNewestActiveSnapshotsByBatchPidId = null + hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID} + + hint-name-listNewestActiveSnapshotsByBatchPid = null + hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID} + + hint-name-listNewestActiveSnapshotsByBatchId = null + hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID} } }