feat: Add CDC sync checkpointing based on time or records #21727

Merged
merged 42 commits into from
Mar 13, 2023
cb055f2
This commit adds new functionality that generates checkpoints when do…
Jan 23, 2023
2ac1c24
Reformat code
Jan 23, 2023
70b7869
Reformat code
Jan 23, 2023
7c818f6
Reformat code
Jan 23, 2023
02290ab
Reformat code
Jan 23, 2023
adc4b70
Second attempt with ugly if statement
Jan 25, 2023
13760a2
Add `isRecordBehindOffset` function to make sure is safe to send the …
Jan 27, 2023
fd4dd98
Code formatting
Jan 27, 2023
2c168b2
Add additional check if the record is part of the snapshot load to sk…
Jan 30, 2023
d50bc6a
Merge branch 'master' into sergio/feat/cdc-checkpointing
sergio-ropero Jan 30, 2023
1ad7aac
Remove comments
Jan 30, 2023
61aa88e
Fix imports
Jan 30, 2023
7c32841
Fix format
Jan 30, 2023
3978770
Add check if the iterator has extra elements so we don't send state m…
Jan 31, 2023
89c0769
Merge branch 'master' into sergio/feat/cdc-checkpointing
sergio-ropero Jan 31, 2023
7ab60c5
Add a new check to avoid sending multiple state messages with same of…
Feb 1, 2023
f2cf9b9
Modify MSSQL and MySQL implementations
Feb 1, 2023
40521d8
Adds better control on Maps and include a test for time checkpoint.
Feb 1, 2023
3854ceb
Formatting
Feb 1, 2023
9d4203c
Improve code documentation and use default for CdcStateHandler new fu…
Feb 2, 2023
e85faa1
Merge branch 'master' into sergio/feat/cdc-checkpointing
sergio-ropero Feb 2, 2023
a51d111
Merge branch 'master' into sergio/feat/cdc-checkpointing
sergio-ropero Feb 3, 2023
e0fa1de
Sort out missing `final` and types from comments
Feb 6, 2023
48e986c
Minor improve in checkpoint validation
Feb 6, 2023
eabf7b9
Merge branch 'master' into sergio/feat/cdc-checkpointing
sergio-ropero Feb 6, 2023
87cbf44
Merge branch 'master' into sergio/feat/cdc-checkpointing
sergio-ropero Feb 6, 2023
58166a7
format files
Feb 6, 2023
c719870
Merge branch 'master' into sergio/feat/cdc-checkpointing
sergio-ropero Feb 9, 2023
ea59dbf
It's 2023!
Feb 9, 2023
25afcbb
Import issues
Feb 9, 2023
e89df5e
Merge branch 'master' into sergio/feat/cdc-checkpointing
Feb 10, 2023
6688c8a
Merge branch 'master' into sergio/feat/cdc-checkpointing
sergio-ropero Feb 15, 2023
6822dc8
Merge remote-tracking branch 'origin/sergio/feat/cdc-checkpointing' i…
Feb 15, 2023
08b9c90
Merge branch 'master' into sergio/feat/cdc-checkpointing
Feb 15, 2023
a226f76
Merge branch 'master' into sergio/feat/cdc-checkpointing
sergio-ropero Mar 1, 2023
a50b49e
Merge branch 'master' into sergio/feat/cdc-checkpointing
Mar 10, 2023
7dcd2f8
Changes after merging master
Mar 10, 2023
78cf575
Upgrade Debezium version in MySQL
Mar 10, 2023
e8c49a4
Bump Postgres and Alloydb
Mar 10, 2023
d4ca237
auto-bump connector version
octavia-squidington-iii Mar 13, 2023
c8044ae
Manually bumping version
Mar 13, 2023
328f26a
Merge remote-tracking branch 'origin/sergio/feat/cdc-checkpointing' i…
Mar 13, 2023
@@ -45,7 +45,7 @@
- name: AlloyDB for PostgreSQL
sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerRepository: airbyte/source-alloydb
dockerImageTag: 2.0.1
dockerImageTag: 2.0.2
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
icon: alloydb.svg
sourceType: database
@@ -1501,7 +1501,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 2.0.0
dockerImageTag: 2.0.2
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -370,7 +370,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-alloydb:2.0.1"
- dockerImage: "airbyte/source-alloydb:2.0.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
@@ -11709,7 +11709,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:2.0.0"
- dockerImage: "airbyte/source-postgres:2.0.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
@@ -9,24 +9,17 @@
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;
import io.airbyte.integrations.debezium.internals.DebeziumRecordIterator;
import io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher;
import io.airbyte.integrations.debezium.internals.*;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import io.debezium.engine.ChangeEvent;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -116,30 +109,21 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Confi
publisher::close,
firstRecordWaitTime);

// convert to airbyte message.
final AutoCloseableIterator<AirbyteMessage> messageIterator = AutoCloseableIterators
.transform(
eventIterator,
(event) -> DebeziumEventUtils.toAirbyteMessage(event, cdcMetadataInjector, emittedAt));

// our goal is to get the state at the time this supplier is called (i.e. after all message records
// have been produced)
final Supplier<AirbyteMessage> stateMessageSupplier = () -> {
final Map<String, String> offset = offsetManager.read();
final String dbHistory = trackSchemaHistory ? schemaHistoryManager
.orElseThrow(() -> new RuntimeException("Schema History Tracking is true but manager is not initialised")).read() : null;

return cdcStateHandler.saveState(offset, dbHistory);
};

// wrap the supplier in an iterator so that we can concat it to the message iterator.
final Iterator<AirbyteMessage> stateMessageIterator = MoreIterators.singletonIteratorFromSupplier(stateMessageSupplier);

// this structure guarantees that the debezium engine will be closed, before we attempt to emit the
// state file. we want this so that we have a guarantee that the debezium offset file (which we use
// to produce the state file) is up-to-date.

return AutoCloseableIterators.concatWithEagerClose(messageIterator, AutoCloseableIterators.fromIterator(stateMessageIterator));
final Duration syncCheckpointSeconds =
config.get("sync_checkpoint_seconds") != null ? Duration.ofSeconds(config.get("sync_checkpoint_seconds").asLong())
: DebeziumStateDecoratingIterator.SYNC_CHECKPOINT_SECONDS;
final Long syncCheckpointRecords = config.get("sync_checkpoint_records") != null ? config.get("sync_checkpoint_records").asLong()
: DebeziumStateDecoratingIterator.SYNC_CHECKPOINT_RECORDS;
return AutoCloseableIterators.fromIterator(new DebeziumStateDecoratingIterator(
eventIterator,
cdcStateHandler,
cdcMetadataInjector,
emittedAt,
offsetManager,
trackSchemaHistory,
schemaHistoryManager.orElse(null),
syncCheckpointSeconds,
syncCheckpointRecords));
}

private Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager(final CdcSavedInfoFetcher cdcSavedInfoFetcher) {
Expand Down
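Review note: the threshold resolution in `getIncrementalIterators` (read `sync_checkpoint_seconds` and `sync_checkpoint_records` from the config, otherwise fall back to the defaults) can be sketched in isolation. This is a minimal sketch and not the connector's code: `CheckpointSettings` and `fromConfig` are hypothetical names, and a plain `Map<String, Long>` stands in for the JSON config object so the example has no dependencies. The 15-minute and 10,000-record defaults mirror the constants declared in `DebeziumStateDecoratingIterator`.

```java
import java.time.Duration;
import java.util.Map;

/** Hypothetical helper: resolves checkpoint thresholds from an optional config. */
final class CheckpointSettings {

  // Defaults mirroring SYNC_CHECKPOINT_SECONDS and SYNC_CHECKPOINT_RECORDS in the PR.
  static final Duration DEFAULT_INTERVAL = Duration.ofMinutes(15);
  static final long DEFAULT_RECORDS = 10_000L;

  final Duration interval;
  final long records;

  private CheckpointSettings(final Duration interval, final long records) {
    this.interval = interval;
    this.records = records;
  }

  /** A Map stands in for the connector config here (assumption for illustration). */
  static CheckpointSettings fromConfig(final Map<String, Long> config) {
    final Long seconds = config.get("sync_checkpoint_seconds");
    final Long records = config.get("sync_checkpoint_records");
    return new CheckpointSettings(
        seconds != null ? Duration.ofSeconds(seconds) : DEFAULT_INTERVAL,
        records != null ? records : DEFAULT_RECORDS);
  }
}
```

Resolving both values in one place keeps the fallback logic out of the iterator construction and makes the defaults easy to test on their own.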
@@ -5,16 +5,51 @@
package io.airbyte.integrations.debezium;

import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.debezium.engine.ChangeEvent;
import java.util.Map;

/**
* This interface is used to allow connectors to save the offset and schema history in the manner
* which suits them
* which suits them. Also, it adds some utils to verify CDC event status.
*/
public interface CdcStateHandler {

AirbyteMessage saveState(Map<String, String> offset, String dbHistory);

AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams();

/**
* This function indicates if the event is part of the snapshot or not.
*
* @param event Event from the CDC load
* @return Returns `true` when the DB event is part of the snapshot load. Otherwise, returns `false`
*/
default boolean isSnapshotEvent(final ChangeEvent<String, String> event) {
return false;
}

/**
* This function checks if the event we are processing in the loop is already behind the offset so
* the process can safely save the state.
*
* @param offset DB CDC offset
* @param event Event from the CDC load
* @return Returns `true` when the record is behind the offset. Otherwise, it returns `false`
*/
default boolean isRecordBehindOffset(final Map<String, String> offset, final ChangeEvent<String, String> event) {
return false;
}

/**
* This function compares two offsets to make sure both are not pointing to the same position. The
* main purpose is to avoid sending the same offset multiple times.
*
* @param offsetA Offset to compare
* @param offsetB Offset to compare
* @return Returns `true` if both offsets are at the same position. Otherwise, it returns `false`
*/
default boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
return true;
}

}
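Review note: as the defaults are written, `isSameOffset` returns `true` and `isRecordBehindOffset` returns `false`, so a handler that overrides neither would appear never to stage an intermediate checkpoint. The sketch below shows the kind of comparison a connector-specific override might perform. Everything in it is an assumption for illustration: `LsnOffsets`, the `lsn` key, and the decimal-string encoding are hypothetical, and real Debezium offsets are connector-specific. It also assumes one plausible reading of "behind the offset", namely that the record's position is strictly past the stored offset, so every record up to that offset has already been emitted.

```java
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical offset helpers; the "lsn" key and its encoding are assumptions. */
final class LsnOffsets {

  static final String LSN_KEY = "lsn";

  /** Extracts the numeric position from an offset map, if present. */
  static OptionalLong position(final Map<String, String> offset) {
    final String raw = offset == null ? null : offset.get(LSN_KEY);
    return raw == null ? OptionalLong.empty() : OptionalLong.of(Long.parseLong(raw));
  }

  /** True only when both offsets carry a position and the positions match. */
  static boolean isSameOffset(final Map<String, String> a, final Map<String, String> b) {
    final OptionalLong pa = position(a);
    final OptionalLong pb = position(b);
    return pa.isPresent() && pb.isPresent() && pa.getAsLong() == pb.getAsLong();
  }

  /** True when the record position is strictly past the stored offset (assumed semantics). */
  static boolean isPastOffset(final Map<String, String> offset, final long recordPosition) {
    final OptionalLong p = position(offset);
    return p.isPresent() && recordPosition > p.getAsLong();
  }
}
```

Treating a missing position as "not the same" and "not past" errs on the side of not emitting a checkpoint when the offset cannot be interpreted.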
@@ -0,0 +1,198 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import com.google.common.collect.AbstractIterator;
import io.airbyte.integrations.debezium.CdcMetadataInjector;
import io.airbyte.integrations.debezium.CdcStateHandler;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.debezium.engine.ChangeEvent;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class encapsulates CDC change events and adds the required functionality to create
* checkpoints for CDC replications. That way, if the process fails in the middle of a long sync, it
* will be able to recover from any acknowledged checkpoint in the next syncs.
*/
public class DebeziumStateDecoratingIterator extends AbstractIterator<AirbyteMessage> implements Iterator<AirbyteMessage> {

public static final Duration SYNC_CHECKPOINT_SECONDS = Duration.ofMinutes(15);
public static final Integer SYNC_CHECKPOINT_RECORDS = 10_000;

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumStateDecoratingIterator.class);

private final Iterator<ChangeEvent<String, String>> changeEventIterator;
private final CdcStateHandler cdcStateHandler;
private final AirbyteFileOffsetBackingStore offsetManager;
private final boolean trackSchemaHistory;
private final AirbyteSchemaHistoryStorage schemaHistoryManager;
private final CdcMetadataInjector cdcMetadataInjector;
private final Instant emittedAt;

private boolean isSyncFinished = false;

/**
* These parameters control when a checkpoint message has to be sent in a CDC integration. We can
* emit a checkpoint when either of the following two conditions is met.
* <p/>
* 1. The number of records read since the last checkpoint ({@code recordsLastSync}) reaches the
* threshold {@code syncCheckpointRecords} (default {@code SYNC_CHECKPOINT_RECORDS}).
* <p/>
* 2. The time elapsed since the last checkpoint ({@code dateTimeLastSync}) exceeds the
* {@code Duration} {@code syncCheckpointDuration} (default {@code SYNC_CHECKPOINT_SECONDS}).
* <p/>
*/
private final Duration syncCheckpointDuration;
private final Long syncCheckpointRecords;
private OffsetDateTime dateTimeLastSync;
private Long recordsLastSync;
private boolean sendCheckpointMessage = false;

/**
* `checkpointOffsetToSend` is used as temporary storage for the offset that we want to send as a
* message. As Debezium is reading records faster than we process them, if we try to send the
* `offsetManager.read()` offset, it is possible that the state is behind the record we are currently
* propagating. To avoid that, we store the offset as soon as we reach the checkpoint threshold
* (time or records) and we wait to send it until we are sure that the record we are processing is
* behind the offset to be sent.
*/
private final HashMap<String, String> checkpointOffsetToSend = new HashMap<>();

/**
* `previousCheckpointOffset` is used to make sure we don't send duplicated states with the same
* offset. It is possible that the offset Debezium reports doesn't move for a period of time, and if
* we just rely on `offsetManager.read()`, there is a chance of sending duplicate states, causing
* unneeded network and processing usage.
*/
private final HashMap<String, String> previousCheckpointOffset;

/**
* @param changeEventIterator Base iterator that we want to enrich with checkpoint messages
* @param cdcStateHandler Handler to save the offset and schema history
* @param cdcMetadataInjector Injector passed to {@code DebeziumEventUtils.toAirbyteMessage} when
* converting events
* @param emittedAt Timestamp passed to {@code DebeziumEventUtils.toAirbyteMessage} when converting
* events
* @param offsetManager Handler to read and write debezium offset file
* @param trackSchemaHistory Set true if the schema needs to be tracked
* @param schemaHistoryManager Handler to write schema. Needs to be initialized if
* trackSchemaHistory is set to true
* @param checkpointDuration Duration object with time between checkpoints
* @param checkpointRecords Number of records between checkpoints
*/
public DebeziumStateDecoratingIterator(final Iterator<ChangeEvent<String, String>> changeEventIterator,
final CdcStateHandler cdcStateHandler,
final CdcMetadataInjector cdcMetadataInjector,
final Instant emittedAt,
final AirbyteFileOffsetBackingStore offsetManager,
final boolean trackSchemaHistory,
final AirbyteSchemaHistoryStorage schemaHistoryManager,
final Duration checkpointDuration,
final Long checkpointRecords) {
this.changeEventIterator = changeEventIterator;
this.cdcStateHandler = cdcStateHandler;
this.cdcMetadataInjector = cdcMetadataInjector;
this.emittedAt = emittedAt;
this.offsetManager = offsetManager;
this.trackSchemaHistory = trackSchemaHistory;
this.schemaHistoryManager = schemaHistoryManager;

this.syncCheckpointDuration = checkpointDuration;
this.syncCheckpointRecords = checkpointRecords;
// Copy instead of casting so this does not depend on read() returning a HashMap.
this.previousCheckpointOffset = new HashMap<>(offsetManager.read());
resetCheckpointValues();
}

/**
* Computes the next record retrieved from the Source stream. Emits state messages as checkpoints
* based on the number of records or the time elapsed.
*
* <p>
* If this method throws an exception, it will propagate outward to the {@code hasNext} or
* {@code next} invocation that invoked this method. Any further attempts to use the iterator will
* result in an {@link IllegalStateException}.
* </p>
*
* @return {@link AirbyteMessage} containing either a CDC record or a state checkpoint.
*/
@Override
protected AirbyteMessage computeNext() {
if (isSyncFinished) {
return endOfData();
}

if (sendCheckpointMessage) {
final AirbyteMessage stateMessage = createStateMessage(checkpointOffsetToSend);
previousCheckpointOffset.clear();
previousCheckpointOffset.putAll(checkpointOffsetToSend);
resetCheckpointValues();
return stateMessage;
}

if (changeEventIterator.hasNext()) {
try {
final ChangeEvent<String, String> event = changeEventIterator.next();
recordsLastSync++;

if (checkpointOffsetToSend.size() == 0 &&
(recordsLastSync >= syncCheckpointRecords ||
Duration.between(dateTimeLastSync, OffsetDateTime.now()).compareTo(syncCheckpointDuration) > 0)) {
// Using a temporary variable to avoid reading the offset twice, once in the condition and again
// in the assignment
final Map<String, String> temporalOffset = offsetManager.read();
if (!cdcStateHandler.isSameOffset(previousCheckpointOffset, temporalOffset)) {
checkpointOffsetToSend.putAll(temporalOffset);
}
}

if (checkpointOffsetToSend.size() == 1
&& changeEventIterator.hasNext()
&& !cdcStateHandler.isSnapshotEvent(event)
&& cdcStateHandler.isRecordBehindOffset(checkpointOffsetToSend, event)) {
sendCheckpointMessage = true;
}

return DebeziumEventUtils.toAirbyteMessage(event, cdcMetadataInjector, emittedAt);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

isSyncFinished = true;
return createStateMessage(offsetManager.read());
}

/**
* Initialize or reset the checkpoint variables.
*/
private void resetCheckpointValues() {
sendCheckpointMessage = false;
checkpointOffsetToSend.clear();
recordsLastSync = 0L;
dateTimeLastSync = OffsetDateTime.now();
}

/**
* Creates {@link AirbyteStateMessage} while updating CDC data, used to checkpoint the state of the
* process.
*
* @return {@link AirbyteStateMessage} which includes offset and schema history if used.
*/
private AirbyteMessage createStateMessage(final Map<String, String> offset) {
if (trackSchemaHistory && schemaHistoryManager == null) {
throw new RuntimeException("Schema History Tracking is true but manager is not initialised");
}
if (offsetManager == null) {
throw new RuntimeException("Offset can not be null");
}

return cdcStateHandler.saveState(offset, schemaHistoryManager != null ? schemaHistoryManager.read() : null);
}

}
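Review note: the control flow of `computeNext` (pass records through, interleave a state message once a threshold is reached, and always finish with a final state) can be illustrated with a stripped-down iterator. This is a minimal sketch under stated assumptions and not the class from this PR: `CheckpointingIterator`, `STATE`, and `drain` are hypothetical names, records are plain strings, only the record-count threshold is modelled, and the offset checks (`isSameOffset`, `isSnapshotEvent`, `isRecordBehindOffset`) are omitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical simplified iterator: emits a STATE marker every N records and at the end. */
final class CheckpointingIterator implements Iterator<String> {

  static final String STATE = "STATE";

  private final Iterator<String> records;
  private final long recordsPerCheckpoint;
  private long sinceLastCheckpoint = 0;
  private boolean checkpointPending = false;
  private boolean finished = false;

  CheckpointingIterator(final Iterator<String> records, final long recordsPerCheckpoint) {
    this.records = records;
    this.recordsPerCheckpoint = recordsPerCheckpoint;
  }

  @Override
  public boolean hasNext() {
    return !finished;
  }

  @Override
  public String next() {
    if (finished) {
      throw new NoSuchElementException();
    }
    if (checkpointPending) {
      // Emit the staged checkpoint and reset the counter.
      checkpointPending = false;
      sinceLastCheckpoint = 0;
      return STATE;
    }
    if (records.hasNext()) {
      final String record = records.next();
      sinceLastCheckpoint++;
      // Skip the intermediate checkpoint when no records remain: the final state covers it.
      if (sinceLastCheckpoint >= recordsPerCheckpoint && records.hasNext()) {
        checkpointPending = true;
      }
      return record;
    }
    // Source exhausted: emit the final state exactly once.
    finished = true;
    return STATE;
  }

  /** Collects everything the iterator produces into a list. */
  static List<String> drain(final Iterator<String> it) {
    final List<String> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(it.next());
    }
    return out;
  }
}
```

With five records and a threshold of two, the sketch yields two records, a state, two records, a state, the last record, and the final state. The `records.hasNext()` guard plays the same role as the `changeEventIterator.hasNext()` check in the PR: it avoids a checkpoint immediately followed by an identical final state.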
@@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.1
LABEL io.airbyte.version=2.0.2
LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-alloydb/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.1
LABEL io.airbyte.version=2.0.2
LABEL io.airbyte.name=airbyte/source-alloydb