Skip to content

Commit

Permalink
enable configuring even more flexibly indexes to hint for in Cleanu…
Browse files Browse the repository at this point in the history
…p aggregations

* also create indexes speeding up the aggregation on snapshot collection

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Jun 11, 2024
1 parent 48fbb99 commit 7753e04
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 95 deletions.
13 changes: 11 additions & 2 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,23 @@ ditto {
database = ${?MONGO_DB_DATABASE}

read-journal {
should-create-additional-snapshot-aggregation-indexes = true
should-create-additional-snapshot-aggregation-indexes = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEXES}

hint-name-filterPidsThatDoesntContainTagInNewestEntry = null
hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY}

hint-name-listLatestJournalEntries = null
hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES}

hint-name-listNewestActiveSnapshotsByBatch = "_id_"
hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH}
hint-name-listNewestActiveSnapshotsByBatchPidId = null
hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID}

hint-name-listNewestActiveSnapshotsByBatchPid = null
hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID}

hint-name-listNewestActiveSnapshotsByBatchId = null
hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,27 @@ public final class DefaultMongoReadJournalConfig implements MongoReadJournalConf

private static final String CONFIG_PATH = "read-journal";

private final boolean createAdditionalSnapshotAggregationIndexes;
@Nullable private final String hintNameFilterPidsThatDoesntContainTagInNewestEntry;
@Nullable private final String hintNameListLatestJournalEntries;
@Nullable private final String listNewestActiveSnapshotsByBatch;
@Nullable private final String listNewestActiveSnapshotsByBatchPidId;
@Nullable private final String listNewestActiveSnapshotsByBatchPid;
@Nullable private final String listNewestActiveSnapshotsByBatchId;

private DefaultMongoReadJournalConfig(final ScopedConfig config) {
createAdditionalSnapshotAggregationIndexes = config.getBoolean(
MongoReadJournalConfigValue.SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEXES.getConfigPath()
);
hintNameFilterPidsThatDoesntContainTagInNewestEntry = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY);
hintNameListLatestJournalEntries = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES);
listNewestActiveSnapshotsByBatch = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH);
listNewestActiveSnapshotsByBatchPidId = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID);
listNewestActiveSnapshotsByBatchPid = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID);
listNewestActiveSnapshotsByBatchId = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID);
}

/**
Expand All @@ -59,7 +69,15 @@ public static DefaultMongoReadJournalConfig of(final Config config) {

@Nullable
private static String getNullableString(final Config config, final KnownConfigValue configValue) {
return config.getIsNull(configValue.getConfigPath()) ? null : config.getString(configValue.getConfigPath());
return config.getIsNull(configValue.getConfigPath()) ? null :
Optional.of(config.getString(configValue.getConfigPath()))
.filter(s -> !s.equals("null"))
.orElse(null);
}

/**
 * Returns the boolean that the constructor read from the
 * {@code should-create-additional-snapshot-aggregation-indexes} config path.
 *
 * @return {@code true} if the additional snapshot aggregation indexes should be created.
 */
@Override
public boolean shouldCreateAdditionalSnapshotAggregationIndexes() {
return createAdditionalSnapshotAggregationIndexes;
}

@Override
Expand All @@ -73,8 +91,18 @@ public Optional<String> getIndexNameHintForListLatestJournalEntries() {
}

@Override
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch() {
return Optional.ofNullable(listNewestActiveSnapshotsByBatch);
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPidId() {
return Optional.ofNullable(listNewestActiveSnapshotsByBatchPidId);
}

/**
 * Returns the index hint name that the constructor read for
 * {@code HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID}.
 *
 * @return the stored hint name, or an empty Optional if the stored value is {@code null}.
 */
@Override
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPid() {
return Optional.ofNullable(listNewestActiveSnapshotsByBatchPid);
}

/**
 * Returns the index hint name that the constructor read for
 * {@code HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID}.
 *
 * @return the stored hint name, or an empty Optional if the stored value is {@code null}.
 */
@Override
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchId() {
return Optional.ofNullable(listNewestActiveSnapshotsByBatchId);
}

@Override
Expand All @@ -86,25 +114,31 @@ public boolean equals(final Object o) {
return false;
}
final DefaultMongoReadJournalConfig that = (DefaultMongoReadJournalConfig) o;
return Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry,
that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) &&
return createAdditionalSnapshotAggregationIndexes == that.createAdditionalSnapshotAggregationIndexes &&
Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry,
that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) &&
Objects.equals(hintNameListLatestJournalEntries, that.hintNameListLatestJournalEntries) &&
Objects.equals(listNewestActiveSnapshotsByBatch, that.listNewestActiveSnapshotsByBatch);
Objects.equals(listNewestActiveSnapshotsByBatchPidId, that.listNewestActiveSnapshotsByBatchPidId);
}

@Override
public int hashCode() {
return Objects.hash(hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries,
listNewestActiveSnapshotsByBatch);
return Objects.hash(createAdditionalSnapshotAggregationIndexes,
hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries,
listNewestActiveSnapshotsByBatchPidId, listNewestActiveSnapshotsByBatchPid,
listNewestActiveSnapshotsByBatchId);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"hintNameFilterPidsThatDoesntContainTagInNewestEntry=" +
"createAdditionalSnapshotAggregationIndexes=" + createAdditionalSnapshotAggregationIndexes +
", hintNameFilterPidsThatDoesntContainTagInNewestEntry=" +
hintNameFilterPidsThatDoesntContainTagInNewestEntry +
", hintNameListLatestJournalEntries=" + hintNameListLatestJournalEntries +
", listNewestActiveSnapshotsByBatch=" + listNewestActiveSnapshotsByBatch +
", listNewestActiveSnapshotsByBatchPidId=" + listNewestActiveSnapshotsByBatchPidId +
", listNewestActiveSnapshotsByBatchPid=" + listNewestActiveSnapshotsByBatchPid +
", listNewestActiveSnapshotsByBatchId=" + listNewestActiveSnapshotsByBatchId +
"]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
@Immutable
public interface MongoReadJournalConfig {

/**
* @return whether additional indexes should be created in order to speed up MongoReadJournal aggregation queries
* on the snapshot collection.
*/
boolean shouldCreateAdditionalSnapshotAggregationIndexes();

/**
* @return the optional hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}.
*/
Expand All @@ -36,17 +42,35 @@ public interface MongoReadJournalConfig {
Optional<String> getIndexNameHintForListLatestJournalEntries();

/**
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}.
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} containing both
* "pid" and "_id" fields in first "$match".
*/
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch();
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPidId();

/**
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing
* "pid" field in first "$match".
*/
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPid();

/**
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing
* "_id" field in first "$match".
*/
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchId();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code MongoReadJournalConfig}.
*/
enum MongoReadJournalConfigValue implements KnownConfigValue {

/**
* Whether additional indexes should be created in order to speed up MongoReadJournal aggregation queries
* on the snapshot collection.
*/
SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEXES("should-create-additional-snapshot-aggregation-indexes", true),

/**
* Hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}.
*/
Expand All @@ -58,9 +82,19 @@ enum MongoReadJournalConfigValue implements KnownConfigValue {
HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES("hint-name-listLatestJournalEntries", null),

/**
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}.
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} when the first "$match" contains
* both the "pid" and "_id" fields.
*/
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID("hint-name-listNewestActiveSnapshotsByBatchPidId", null),

/**
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} when the first "$match" only
* contains the "pid" field.
*/
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID("hint-name-listNewestActiveSnapshotsByBatchPid", null),

/**
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} when the first "$match" only
* contains the "_id" field.
*/
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH("hint-name-listNewestActiveSnapshotsByBatch", null);
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID("hint-name-listNewestActiveSnapshotsByBatchId", null);

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -166,6 +167,12 @@ public final class MongoReadJournal implements CurrentEventsByPersistenceIdQuery
private static final Index TAG_PID_INDEX =
IndexFactory.newInstance("ditto_tag_pid", List.of(J_TAGS, J_PROCESSOR_ID), false, true);

private static final Index SNAPS_PID_ID_INDEX =
IndexFactory.newInstance("snaps_pid_id_index", List.of(S_PROCESSOR_ID, S_ID), false, false);

private static final Index SNAPS_PID_INDEX =
IndexFactory.newInstance("snaps_pid_index", List.of(S_PROCESSOR_ID), false, false);

private final String journalCollection;
private final String snapsCollection;
private final DittoMongoClient mongoClient;
Expand Down Expand Up @@ -201,7 +208,8 @@ public static MongoReadJournal newInstance(final ActorSystem system) {
final Config config = system.settings().config();
final MongoDbConfig mongoDbConfig =
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(), system);
return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(),
system);
}

/**
Expand Down Expand Up @@ -240,6 +248,20 @@ public CompletionStage<Done> ensureTagPidIndex() {
return indexInitializer.createNonExistingIndices(journalCollection, List.of(TAG_PID_INDEX));
}

/**
 * Ensures that the additional indexes {@code SNAPS_PID_ID_INDEX} and {@code SNAPS_PID_INDEX} exist on the
 * snapshot collection. Index creation is only requested if
 * {@code shouldCreateAdditionalSnapshotAggregationIndexes()} of the read journal config returns {@code true};
 * otherwise nothing is created and an already completed future is returned.
 *
 * @return a future that completes after index creation completes or fails when index creation fails.
 */
public CompletionStage<Done> ensureSnapshotCollectionPidIdIndex() {
    // Guard: index creation was switched off via configuration, so there is nothing to do.
    if (!readJournalConfig.shouldCreateAdditionalSnapshotAggregationIndexes()) {
        return CompletableFuture.completedFuture(Done.getInstance());
    }
    return indexInitializer.createNonExistingIndices(
            snapsCollection,
            List.of(SNAPS_PID_ID_INDEX, SNAPS_PID_INDEX)
    );
}

/**
* Retrieve all unique PIDs in journals. Does its best not to create long-living cursors on the database by reading
* {@code batchSize} events per query.
Expand Down Expand Up @@ -349,6 +371,7 @@ private Source<String, NotUsed> filterPidsThatDoesntContainTagInNewestEntry(fina
));
final AggregatePublisher<Document> hintedAggregate =
readJournalConfig.getIndexNameHintForFilterPidsThatDoesntContainTagInNewestEntry()
.filter(hint -> !hint.equals("null"))
.map(aggregate::hintString)
.orElse(aggregate);
return Source.fromPublisher(hintedAggregate)
Expand Down Expand Up @@ -928,7 +951,8 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(

final List<Bson> pipeline = new ArrayList<>(5);
// match stage
pipeline.add(Aggregates.match(snapshotFilter.toMongoFilter()));
final Bson matchFilter = snapshotFilter.toMongoFilter();
pipeline.add(Aggregates.match(matchFilter));

// sort stage
pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.ascending(S_PROCESSOR_ID), Sorts.descending(S_SN))));
Expand All @@ -947,8 +971,8 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
final String items = "i";
pipeline.add(Aggregates.group(
new Document("_id", new BsonNull()),
Accumulators.max(maxPid, "$"+ S_ID),
Accumulators.push(items,"$$ROOT")));
Accumulators.max(maxPid, "$" + S_ID),
Accumulators.push(items, "$$ROOT")));

// redact stage - "$$PRUNE"s documents with "__lifecycle" = DELETED if includeDeleted=false
// if includeDeleted=true keeps them using "$$DESCEND"
Expand All @@ -963,13 +987,15 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
)));

final AggregatePublisher<Document> aggregate = snapshotStore.aggregate(pipeline);
final AggregatePublisher<Document> hintedAggregate =
readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatch()
.map(aggregate::hintString)
.orElse(aggregate);
final Optional<String> indexHint = calculateIndexHint(matchFilter);
final AggregatePublisher<Document> hintedAggregate = indexHint
.filter(hint -> !hint.equals("null"))
.map(aggregate::hintString)
.orElse(aggregate);
return Source.fromPublisher(
hintedAggregate
.batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
hintedAggregate
.batchSize(batchSize)
// use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
)
.flatMapConcat(document -> {
final String theMaxPid = document.getString(maxPid);
Expand All @@ -983,6 +1009,23 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
});
}

/**
 * Picks the configured index hint name matching the fields used in the given match filter.
 * The filter is rendered to JSON and searched for the keys {@code "pid"} and {@code "_id"}:
 * <ul>
 * <li>both keys present: the "PidId" hint of the read journal config is used,</li>
 * <li>only {@code "pid"} present: the "Pid" hint is used,</li>
 * <li>only {@code "_id"} present: the "Id" hint is used,</li>
 * <li>neither present: no hint is used.</li>
 * </ul>
 *
 * @param matchFilter the filter of the first "$match" stage of the aggregation.
 * @return the optional index hint name to apply to the aggregation.
 */
private Optional<String> calculateIndexHint(final Bson matchFilter) {
    final String filterAsJson = matchFilter.toBsonDocument().toJson();
    final boolean hasPid = filterAsJson.contains("\"pid\":");
    final boolean hasId = filterAsJson.contains("\"_id\":");

    if (hasPid) {
        return hasId
                ? readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPidId()
                : readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPid();
    }
    if (hasId) {
        return readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchId();
    }
    return Optional.empty();
}

private static Source<List<String>, NotUsed> listJournalEntryTags(final MongoCollection<Document> journal,
final String pid) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import java.util.function.Supplier;
import java.util.stream.LongStream;

import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;

import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Source;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;

/**
* A Pekko stream to handle background cleanup regulated by insert times.
Expand All @@ -42,6 +42,7 @@ final class Cleanup {
private final boolean deleteFinalDeletedSnapshot;

Cleanup(final MongoReadJournal readJournal,
final ThreadSafeDittoLoggingAdapter logger,
final Materializer materializer,
final Supplier<Pair<Integer, Integer>> responsibilitySupplier,
final Duration historyRetentionDuration,
Expand All @@ -56,14 +57,20 @@ final class Cleanup {
this.readBatchSize = readBatchSize;
this.deleteBatchSize = deleteBatchSize;
this.deleteFinalDeletedSnapshot = deleteFinalDeletedSnapshot;

readJournal.ensureSnapshotCollectionPidIdIndex().exceptionally(e -> {
logger.error(e, "Failed to create index for read journal snapshot aggregation queries");
return null;
});
}

static Cleanup of(final CleanupConfig config,
final MongoReadJournal readJournal,
final ThreadSafeDittoLoggingAdapter logger,
final Materializer materializer,
final Supplier<Pair<Integer, Integer>> responsibilitySupplier) {

return new Cleanup(readJournal, materializer, responsibilitySupplier,
return new Cleanup(readJournal, logger, materializer, responsibilitySupplier,
config.getHistoryRetentionDuration(),
config.getReadsPerQuery(),
config.getWritesPerCredit(),
Expand Down
Loading

0 comments on commit 7753e04

Please sign in to comment.