Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Source MongoDB Internal POC: CDC State Handling #29763

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
f3f0ead
Add state management and handling for CDC integration
jdpgrailsdev Aug 23, 2023
3ab0fe2
Formatting
jdpgrailsdev Aug 23, 2023
4b5552f
Fix typo
jdpgrailsdev Aug 23, 2023
2f2fe53
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 23, 2023
f67aa27
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 23, 2023
9317ccf
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 23, 2023
9517f34
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 24, 2023
fca3fbc
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 24, 2023
b08b556
Automated Commit - Format and Process Resources Changes
jdpgrailsdev Aug 24, 2023
cbf5f09
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 24, 2023
af7feba
Merge branch 'jonathan/cdc-metadata-injector' into jonathan/cdc-state…
jdpgrailsdev Aug 25, 2023
a8b2455
Merge branch 'jonathan/cdc-metadata-injector' into jonathan/cdc-state…
jdpgrailsdev Aug 25, 2023
291f6ff
PR feedback
jdpgrailsdev Aug 25, 2023
31726f3
Store offset map in CDC state
jdpgrailsdev Aug 25, 2023
ee781c1
Formatting
jdpgrailsdev Aug 25, 2023
28e4dc0
Merge branch 'jonathan/cdc-metadata-injector' into jonathan/cdc-state…
jdpgrailsdev Aug 25, 2023
971af01
disable checkpointing + saveStateAfterCompletionOfSnapshotOfNewStreams
subodh1810 Aug 25, 2023
9a13b5f
Merge branch 'jonathan/cdc-metadata-injector' into jonathan/cdc-state…
jdpgrailsdev Aug 28, 2023
8743788
Automated Commit - Formatting Changes
jdpgrailsdev Aug 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ void testTimestampExtractionFromEvent() throws IOException {
final BsonTimestamp expectedTimestamp = new BsonTimestamp(timestampSec, 2);
final String changeEventJson = MoreResources.readResource("mongodb/change_event.json");
final JsonNode changeEvent = Jsons.deserialize(changeEventJson);

final BsonTimestamp timestamp = MongoDbResumeTokenHelper.extractTimestampFromEvent(changeEvent);
assertNotNull(timestamp);
assertEquals(expectedTimestamp, timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,30 @@
import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.ID_FIELD;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.connection.ClusterType;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager;
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStreamState;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
Expand All @@ -50,9 +43,6 @@ public class MongoDbSource extends BaseConnector implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class);

/** Helper class for holding a collection-name and stream state together */
private record CollectionNameState(Optional<String> name, Optional<MongodbStreamState> state) {}

public static void main(final String[] args) throws Exception {
final Source source = new MongoDbSource();
LOGGER.info("starting source: {}", MongoDbSource.class);
Expand Down Expand Up @@ -106,15 +96,14 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
final JsonNode state) {
final var databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText();
final var emittedAt = Instant.now();

final var states = convertState(state);
final var stateManager = MongoDbStateManager.createStateManager(state);
final MongoClient mongoClient = MongoConnectionUtils.createMongoClient(config);

try {
final var database = mongoClient.getDatabase(databaseName);
// TODO treat INCREMENTAL and FULL_REFRESH differently?
return AutoCloseableIterators.appendOnClose(AutoCloseableIterators.concatWithEagerClose(
convertCatalogToIterators(catalog, states, database, emittedAt),
convertCatalogToIterators(catalog, stateManager, database, emittedAt),
AirbyteTraceMessageUtility::emitStreamStatusTrace),
mongoClient::close);
} catch (final Exception e) {
Expand All @@ -123,41 +112,12 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
}
}

/**
* Converts the JsonNode into a map of mongodb collection names to stream states.
*/
@VisibleForTesting
protected Map<String, MongodbStreamState> convertState(final JsonNode state) {
// I'm unsure if the JsonNode data is going to be a singular AirbyteStateMessage or an array of
// AirbyteStateMessages.
// So this currently handles both cases, converting the singular message into a list of messages,
// leaving the list of messages
// as a list of messages, or returning an empty list.
final List<AirbyteStateMessage> states = Jsons.tryObject(state, AirbyteStateMessage.class)
.map(List::of)
.orElseGet(() -> Jsons.tryObject(state, AirbyteStateMessage[].class)
.map(Arrays::asList)
.orElse(List.of()));

// TODO add namespace support?
return states.stream()
.filter(s -> s.getType() == AirbyteStateType.STREAM)
.map(s -> new CollectionNameState(
Optional.ofNullable(s.getStream().getStreamDescriptor()).map(StreamDescriptor::getName),
Jsons.tryObject(s.getStream().getStreamState(), MongodbStreamState.class)))
// only keep states that could be parsed
.filter(p -> p.name.isPresent() && p.state.isPresent())
.collect(Collectors.toMap(
p -> p.name.orElseThrow(),
p -> p.state.orElseThrow()));
}

/**
* Converts the streams in the catalog into a list of AutoCloseableIterators.
*/
private List<AutoCloseableIterator<AirbyteMessage>> convertCatalogToIterators(
final ConfiguredAirbyteCatalog catalog,
final Map<String, MongodbStreamState> states,
final MongoDbStateManager stateManager,
final MongoDatabase database,
final Instant emittedAt) {
return catalog.getStreams()
Expand All @@ -175,12 +135,8 @@ private List<AutoCloseableIterator<AirbyteMessage>> convertCatalogToIterators(
final var fields = Projections.fields(Projections.include(CatalogHelpers.getTopLevelFieldNames(airbyteStream).stream().toList()));

// find the existing state, if there is one, for this steam
final Optional<MongodbStreamState> existingState = states.entrySet().stream()
// look only for states that match this stream's name
// TODO add namespace support
.filter(state -> state.getKey().equals(airbyteStream.getStream().getName()))
.map(Entry::getValue)
.findFirst();
final Optional<MongoDbStreamState> existingState =
stateManager.getStreamState(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());

// The filter determines the starting point of this iterator based on the state of this collection.
// If a state exists, it will use that state to create a query akin to
Expand All @@ -198,7 +154,7 @@ private List<AutoCloseableIterator<AirbyteMessage>> convertCatalogToIterators(
.sort(Sorts.ascending(ID_FIELD))
.cursor();

final var stateIterator = new MongoDbStateIterator(cursor, airbyteStream, existingState, emittedAt, CHECKPOINT_INTERVAL);
final var stateIterator = new MongoDbStateIterator(cursor, stateManager, airbyteStream, emittedAt, CHECKPOINT_INTERVAL);
return AutoCloseableIterators.fromIterator(stateIterator, cursor::close, null);
})
.toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,17 @@

import com.mongodb.MongoException;
import com.mongodb.client.MongoCursor;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.mongodb.MongoUtils;
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager;
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStreamState;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,6 +32,7 @@ class MongoDbStateIterator implements Iterator<AirbyteMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStateIterator.class);

private final MongoCursor<Document> iter;
private final MongoDbStateManager stateManager;
private final ConfiguredAirbyteStream stream;
private final List<String> fields;
private final Instant emittedAt;
Expand All @@ -60,23 +57,25 @@ class MongoDbStateIterator implements Iterator<AirbyteMessage> {
/**
* Constructor.
*
* @param iter MongoCursor that iterates over Mongo documents
* @param iter {@link MongoCursor} that iterates over Mongo documents
* @param stateManager {@link MongoDbStateManager} that manages global and per-stream state
* @param stream the stream that this iterator represents
* @param state the initial state of this stream
* @param emittedAt when this iterator was started
* @param checkpointInterval how often a state message should be emitted.
*/
MongoDbStateIterator(final MongoCursor<Document> iter,
final MongoDbStateManager stateManager,
final ConfiguredAirbyteStream stream,
Optional<MongodbStreamState> state,
final Instant emittedAt,
final int checkpointInterval) {
this.iter = iter;
this.stateManager = stateManager;
this.stream = stream;
this.checkpointInterval = checkpointInterval;
this.emittedAt = emittedAt;
fields = CatalogHelpers.getTopLevelFieldNames(stream).stream().toList();
lastId = state.map(MongodbStreamState::id).orElse(null);
this.fields = CatalogHelpers.getTopLevelFieldNames(stream).stream().toList();
this.lastId =
stateManager.getStreamState(stream.getStream().getName(), stream.getStream().getNamespace()).map(MongoDbStreamState::id).orElse(null);
}

@Override
Expand All @@ -103,20 +102,15 @@ public AirbyteMessage next() {
if ((count > 0 && count % checkpointInterval == 0) || finalStateNext) {
count = 0;

final var streamState = new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor()
.withName(stream.getStream().getName())
.withNamespace(stream.getStream().getNamespace()));
if (lastId != null) {
// TODO add type support in here once more than ObjectId fields are supported
streamState.withStreamState(Jsons.jsonNode(new MongodbStreamState(lastId)));
stateManager.updateStreamState(stream.getStream().getName(),
stream.getStream().getNamespace(), new MongoDbStreamState(lastId));
}

final var stateMessage = new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(streamState);

return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
return new AirbyteMessage()
.withType(Type.STATE)
.withState(stateManager.toState());
}

count++;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mongodb.internal.cdc;

import com.fasterxml.jackson.databind.JsonNode;

/**
* Represents the global CDC state that is used by Debezium as an offset.
*
* @param state The Debezium offset state as a {@link JsonNode}.
*/
public record MongoDbCdcState(JsonNode state) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mongodb.internal.cdc;

import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.debezium.CdcStateHandler;
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of the {@link CdcStateHandler} that handles saving the CDC offset as Airbyte state
* for MongoDB.
*/
public class MongoDbCdcStateHandler implements CdcStateHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbCdcStateHandler.class);

private final MongoDbStateManager stateManager;

public MongoDbCdcStateHandler(MongoDbStateManager stateManager) {
this.stateManager = stateManager;
}

@Override
public AirbyteMessage saveState(final Map<String, String> offset, String dbHistory) {
final MongoDbCdcState cdcState = new MongoDbCdcState(Jsons.jsonNode(offset));

LOGGER.info("Saving Debezium state {}...", cdcState);
stateManager.updateCdcState(cdcState);

final AirbyteStateMessage stateMessage = stateManager.toState();
return new AirbyteMessage().withType(AirbyteMessage.Type.STATE).withState(stateMessage);
}

@Override
public AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams() {
throw new RuntimeException("Debezium is not used to carry out the snapshot of tables.");
}

@Override
public boolean isCdcCheckpointEnabled() {
return false;
}

}
Loading