Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable configuring even more flexibly indexes to hint for in Cleanup aggregations #1961

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,26 @@ ditto {
database = ${?MONGO_DB_DATABASE}

read-journal {
should-create-additional-snapshot-aggregation-index-pid-id = false
should-create-additional-snapshot-aggregation-index-pid-id = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID}

should-create-additional-snapshot-aggregation-index-pid = false
should-create-additional-snapshot-aggregation-index-pid = ${?MONGODB_READ_JOURNAL_SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID}

hint-name-filterPidsThatDoesntContainTagInNewestEntry = null
hint-name-filterPidsThatDoesntContainTagInNewestEntry = ${?MONGODB_READ_JOURNAL_HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY}

hint-name-listLatestJournalEntries = null
hint-name-listLatestJournalEntries = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES}

hint-name-listNewestActiveSnapshotsByBatch = "_id_"
hint-name-listNewestActiveSnapshotsByBatch = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH}
hint-name-listNewestActiveSnapshotsByBatchPidId = null
hint-name-listNewestActiveSnapshotsByBatchPidId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID}

hint-name-listNewestActiveSnapshotsByBatchPid = null
hint-name-listNewestActiveSnapshotsByBatchPid = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID}

hint-name-listNewestActiveSnapshotsByBatchId = null
hint-name-listNewestActiveSnapshotsByBatchId = ${?MONGODB_READ_JOURNAL_HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,31 @@ public final class DefaultMongoReadJournalConfig implements MongoReadJournalConf

private static final String CONFIG_PATH = "read-journal";

private final boolean createAdditionalSnapshotAggregationIndexPidId;
private final boolean createAdditionalSnapshotAggregationIndexPid;
@Nullable private final String hintNameFilterPidsThatDoesntContainTagInNewestEntry;
@Nullable private final String hintNameListLatestJournalEntries;
@Nullable private final String listNewestActiveSnapshotsByBatch;
@Nullable private final String listNewestActiveSnapshotsByBatchPidId;
@Nullable private final String listNewestActiveSnapshotsByBatchPid;
@Nullable private final String listNewestActiveSnapshotsByBatchId;

private DefaultMongoReadJournalConfig(final ScopedConfig config) {
createAdditionalSnapshotAggregationIndexPidId = config.getBoolean(
MongoReadJournalConfigValue.SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID.getConfigPath()
);
createAdditionalSnapshotAggregationIndexPid = config.getBoolean(
MongoReadJournalConfigValue.SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID.getConfigPath()
);
hintNameFilterPidsThatDoesntContainTagInNewestEntry = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_FILTER_PIDS_THAT_DOESNT_CONTAIN_TAG_IN_NEWEST_ENTRY);
hintNameListLatestJournalEntries = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES);
listNewestActiveSnapshotsByBatch = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH);
listNewestActiveSnapshotsByBatchPidId = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID);
listNewestActiveSnapshotsByBatchPid = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID);
listNewestActiveSnapshotsByBatchId = getNullableString(config,
MongoReadJournalConfigValue.HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID);
}

/**
Expand All @@ -59,7 +73,20 @@ public static DefaultMongoReadJournalConfig of(final Config config) {

@Nullable
private static String getNullableString(final Config config, final KnownConfigValue configValue) {
return config.getIsNull(configValue.getConfigPath()) ? null : config.getString(configValue.getConfigPath());
return config.getIsNull(configValue.getConfigPath()) ? null :
Optional.of(config.getString(configValue.getConfigPath()))
.filter(s -> !s.equals("null"))
.orElse(null);
}

@Override
public boolean shouldCreateAdditionalSnapshotAggregationIndexPidId() {
return createAdditionalSnapshotAggregationIndexPidId;
}

@Override
public boolean shouldCreateAdditionalSnapshotAggregationIndexPid() {
return createAdditionalSnapshotAggregationIndexPid;
}

@Override
Expand All @@ -73,8 +100,18 @@ public Optional<String> getIndexNameHintForListLatestJournalEntries() {
}

@Override
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch() {
return Optional.ofNullable(listNewestActiveSnapshotsByBatch);
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPidId() {
return Optional.ofNullable(listNewestActiveSnapshotsByBatchPidId);
}

@Override
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPid() {
return Optional.ofNullable(listNewestActiveSnapshotsByBatchPid);
}

@Override
public Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchId() {
return Optional.ofNullable(listNewestActiveSnapshotsByBatchId);
}

@Override
Expand All @@ -86,25 +123,33 @@ public boolean equals(final Object o) {
return false;
}
final DefaultMongoReadJournalConfig that = (DefaultMongoReadJournalConfig) o;
return Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry,
that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) &&
return createAdditionalSnapshotAggregationIndexPidId == that.createAdditionalSnapshotAggregationIndexPidId &&
createAdditionalSnapshotAggregationIndexPid == that.createAdditionalSnapshotAggregationIndexPid &&
Objects.equals(hintNameFilterPidsThatDoesntContainTagInNewestEntry,
that.hintNameFilterPidsThatDoesntContainTagInNewestEntry) &&
Objects.equals(hintNameListLatestJournalEntries, that.hintNameListLatestJournalEntries) &&
Objects.equals(listNewestActiveSnapshotsByBatch, that.listNewestActiveSnapshotsByBatch);
Objects.equals(listNewestActiveSnapshotsByBatchPidId, that.listNewestActiveSnapshotsByBatchPidId);
}

@Override
public int hashCode() {
return Objects.hash(hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries,
listNewestActiveSnapshotsByBatch);
return Objects.hash(createAdditionalSnapshotAggregationIndexPidId, createAdditionalSnapshotAggregationIndexPid,
hintNameFilterPidsThatDoesntContainTagInNewestEntry, hintNameListLatestJournalEntries,
listNewestActiveSnapshotsByBatchPidId, listNewestActiveSnapshotsByBatchPid,
listNewestActiveSnapshotsByBatchId);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"hintNameFilterPidsThatDoesntContainTagInNewestEntry=" +
"createAdditionalSnapshotAggregationIndexPidId=" + createAdditionalSnapshotAggregationIndexPidId +
", createAdditionalSnapshotAggregationIndexPid=" + createAdditionalSnapshotAggregationIndexPid +
", hintNameFilterPidsThatDoesntContainTagInNewestEntry=" +
hintNameFilterPidsThatDoesntContainTagInNewestEntry +
", hintNameListLatestJournalEntries=" + hintNameListLatestJournalEntries +
", listNewestActiveSnapshotsByBatch=" + listNewestActiveSnapshotsByBatch +
", listNewestActiveSnapshotsByBatchPidId=" + listNewestActiveSnapshotsByBatchPidId +
", listNewestActiveSnapshotsByBatchPid=" + listNewestActiveSnapshotsByBatchPid +
", listNewestActiveSnapshotsByBatchId=" + listNewestActiveSnapshotsByBatchId +
"]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@
@Immutable
public interface MongoReadJournalConfig {

/**
* @return whether additional index for "pid" + "_id" should be created in order to speed up MongoReadJournal
* aggregation queries on the snapshot collection.
*/
boolean shouldCreateAdditionalSnapshotAggregationIndexPidId();

/**
* @return whether additional index for "pid" should be created in order to speed up MongoReadJournal
* aggregation queries on the snapshot collection.
*/
boolean shouldCreateAdditionalSnapshotAggregationIndexPid();

/**
* @return the optional hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}.
*/
Expand All @@ -36,17 +48,41 @@ public interface MongoReadJournalConfig {
Optional<String> getIndexNameHintForListLatestJournalEntries();

/**
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}.
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} containing both
* "pid" and "_id" fields in first "$match".
*/
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatch();
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPidId();

/**
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing
* "pid" field in first "$match".
*/
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchPid();

/**
* @return the optional hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch} only containing
* "_id" field in first "$match".
*/
Optional<String> getIndexNameHintForListNewestActiveSnapshotsByBatchId();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code MongoReadJournalConfig}.
*/
enum MongoReadJournalConfigValue implements KnownConfigValue {

/**
* Whether additional index for "pid" + "_id" should be created in order to speed up MongoReadJournal
* aggregation queries on the snapshot collection.
*/
SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID_ID("should-create-additional-snapshot-aggregation-index-pid-id", false),

/**
* Whether additional index for "pid" should be created in order to speed up MongoReadJournal aggregation
* queries on the snapshot collection.
*/
SHOULD_CREATE_ADDITIONAL_SNAPSHOT_AGGREGATION_INDEX_PID("should-create-additional-snapshot-aggregation-index-pid", false),

/**
* Hint name for aggregation done in {@code filterPidsThatDoesntContainTagInNewestEntry}.
*/
Expand All @@ -58,9 +94,19 @@ enum MongoReadJournalConfigValue implements KnownConfigValue {
HINT_NAME_LIST_LATEST_JOURNAL_ENTRIES("hint-name-listLatestJournalEntries", null),

/**
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatch}.
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchPidId}.
*/
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID_ID("hint-name-listNewestActiveSnapshotsByBatchPidId", null),

/**
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchPid}.
*/
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_PID("hint-name-listNewestActiveSnapshotsByBatchPid", null),

/**
* Hint name for aggregation done in {@code listNewestActiveSnapshotsByBatchId}.
*/
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH("hint-name-listNewestActiveSnapshotsByBatch", null);
HINT_NAME_LIST_NEWEST_ACTIVE_SNAPSHOT_BY_BATCH_ID("hint-name-listNewestActiveSnapshotsByBatchId", null);

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -166,6 +167,12 @@ public final class MongoReadJournal implements CurrentEventsByPersistenceIdQuery
private static final Index TAG_PID_INDEX =
IndexFactory.newInstance("ditto_tag_pid", List.of(J_TAGS, J_PROCESSOR_ID), false, true);

private static final Index SNAPS_PID_ID_INDEX =
IndexFactory.newInstance("snaps_pid_id_index", List.of(S_PROCESSOR_ID, S_ID), false, false);

private static final Index SNAPS_PID_INDEX =
IndexFactory.newInstance("snaps_pid_index", List.of(S_PROCESSOR_ID), false, false);

private final String journalCollection;
private final String snapsCollection;
private final DittoMongoClient mongoClient;
Expand Down Expand Up @@ -201,7 +208,8 @@ public static MongoReadJournal newInstance(final ActorSystem system) {
final Config config = system.settings().config();
final MongoDbConfig mongoDbConfig =
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(), system);
return newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), mongoDbConfig.getReadJournalConfig(),
system);
}

/**
Expand Down Expand Up @@ -240,6 +248,32 @@ public CompletionStage<Done> ensureTagPidIndex() {
return indexInitializer.createNonExistingIndices(journalCollection, List.of(TAG_PID_INDEX));
}

/**
* Ensure a compound index exists for snapshot cleanup aggregation matching on "pid" and "_id".
*
* @return a future that completes after index creation completes or fails when index creation fails.
*/
public CompletionStage<Done> ensureSnapshotCollectionPidIdIndex() {
if (readJournalConfig.shouldCreateAdditionalSnapshotAggregationIndexPidId()) {
return indexInitializer.createNonExistingIndices(snapsCollection, List.of(SNAPS_PID_ID_INDEX));
} else {
return CompletableFuture.completedFuture(Done.getInstance());
}
}

/**
* Ensure a compound index exists for snapshot cleanup aggregation matching on "pid" .
*
* @return a future that completes after index creation completes or fails when index creation fails.
*/
public CompletionStage<Done> ensureSnapshotCollectionPidIndex() {
if (readJournalConfig.shouldCreateAdditionalSnapshotAggregationIndexPid()) {
return indexInitializer.createNonExistingIndices(snapsCollection, List.of(SNAPS_PID_INDEX));
} else {
return CompletableFuture.completedFuture(Done.getInstance());
}
}

/**
* Retrieve all unique PIDs in journals. Does its best not to create long-living cursors on the database by reading
* {@code batchSize} events per query.
Expand Down Expand Up @@ -349,6 +383,7 @@ private Source<String, NotUsed> filterPidsThatDoesntContainTagInNewestEntry(fina
));
final AggregatePublisher<Document> hintedAggregate =
readJournalConfig.getIndexNameHintForFilterPidsThatDoesntContainTagInNewestEntry()
.filter(hint -> !hint.equals("null"))
.map(aggregate::hintString)
.orElse(aggregate);
return Source.fromPublisher(hintedAggregate)
Expand Down Expand Up @@ -928,7 +963,8 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(

final List<Bson> pipeline = new ArrayList<>(5);
// match stage
pipeline.add(Aggregates.match(snapshotFilter.toMongoFilter()));
final Bson matchFilter = snapshotFilter.toMongoFilter();
pipeline.add(Aggregates.match(matchFilter));

// sort stage
pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.ascending(S_PROCESSOR_ID), Sorts.descending(S_SN))));
Expand All @@ -947,8 +983,8 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
final String items = "i";
pipeline.add(Aggregates.group(
new Document("_id", new BsonNull()),
Accumulators.max(maxPid, "$"+ S_ID),
Accumulators.push(items,"$$ROOT")));
Accumulators.max(maxPid, "$" + S_ID),
Accumulators.push(items, "$$ROOT")));

// redact stage - "$$PRUNE"s documents with "__lifecycle" = DELETED if includeDeleted=false
// if includeDeleted=true keeps them using "$$DESCEND"
Expand All @@ -963,13 +999,15 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
)));

final AggregatePublisher<Document> aggregate = snapshotStore.aggregate(pipeline);
final AggregatePublisher<Document> hintedAggregate =
readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatch()
.map(aggregate::hintString)
.orElse(aggregate);
final Optional<String> indexHint = calculateIndexHint(matchFilter);
final AggregatePublisher<Document> hintedAggregate = indexHint
.filter(hint -> !hint.equals("null"))
.map(aggregate::hintString)
.orElse(aggregate);
return Source.fromPublisher(
hintedAggregate
.batchSize(batchSize) // use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
hintedAggregate
.batchSize(batchSize)
// use batchSize also for the cursor batchSize (16 by default bc of backpressure!)
)
.flatMapConcat(document -> {
final String theMaxPid = document.getString(maxPid);
Expand All @@ -983,6 +1021,23 @@ private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
});
}

private Optional<String> calculateIndexHint(final Bson matchFilter) {
final String matchJson = matchFilter.toBsonDocument().toJson();
final boolean matchContainsPid = matchJson.contains("\"pid\":");
final boolean matchContainsId = matchJson.contains("\"_id\":");
final Optional<String> indexHint;
if (matchContainsPid && matchContainsId) {
indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPidId();
} else if (matchContainsPid) {
indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchPid();
} else if (matchContainsId) {
indexHint = readJournalConfig.getIndexNameHintForListNewestActiveSnapshotsByBatchId();
} else {
indexHint = Optional.empty();
}
return indexHint;
}

private static Source<List<String>, NotUsed> listJournalEntryTags(final MongoCollection<Document> journal,
final String pid) {

Expand Down
Loading
Loading