Skip to content

Commit

Permalink
Merge branch 'master' into alex/replace_last_records_from_paginators
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda authored Mar 26, 2024
2 parents 7b514e0 + 7b981cc commit 2e976b1
Show file tree
Hide file tree
Showing 26 changed files with 390 additions and 347 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/connectors_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
- '*'
- 'airbyte-ci/**/*'
- 'airbyte-integrations/connectors/**/*'
- 'airbyte-cdk/**/*'
- 'airbyte-cdk/java/**/*'
- 'buildSrc/**/*'
# The Connector CI Tests is a status check emitted by airbyte-ci
# We make it pass once we have determined that there are no changes to the connectors
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/python_cdk_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
- name: Install Dependencies
id: install_dependencies
working-directory: airbyte-cdk/python
run: poetry install
run: poetry install --all-extras
- name: Build CDK Package
working-directory: airbyte-cdk/python
run: poetry run poe build
Expand Down
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.27.4 | 2024-03-25 | [\#36333](https://github.com/airbytehq/airbyte/pull/36333) | Sunset DebeziumSourceDecoratingIterator. |
| 0.26.1 | 2024-03-19 | [\#35599](https://github.com/airbytehq/airbyte/pull/35599) | Sunset SourceDecoratingIterator. |
| 0.26.0 | 2024-03-19 | [\#36263](https://github.com/airbytehq/airbyte/pull/36263) | Improve conversion of debezium Date type for some edge case in mssql. |
| 0.25.0 | 2024-03-18 | [\#36203](https://github.com/airbytehq/airbyte/pull/36203) | Wiring of Transformer to StagingConsumerFactory and JdbcBufferedConsumerFactory; import changes for Kotlin conversion; State message logs to debug |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.27.3
version=0.27.4
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.debezium.internals.*;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand Down Expand Up @@ -125,16 +127,18 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Debez
final Long syncCheckpointRecords = config.has(SYNC_CHECKPOINT_RECORDS_PROPERTY)
? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: SYNC_CHECKPOINT_RECORDS;
return AutoCloseableIterators.fromIterator(new DebeziumStateDecoratingIterator<>(
eventIterator,
cdcStateHandler,

DebeziumMessageProducer messageProducer = new DebeziumMessageProducer(cdcStateHandler,
targetPosition,
eventConverter,
offsetManager,
trackSchemaHistory,
schemaHistoryManager.orElse(null),
syncCheckpointDuration,
syncCheckpointRecords));
schemaHistoryManager);

// Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is not used
// at all thus we will pass in null.
SourceStateIterator iterator =
new SourceStateIterator<>(eventIterator, null, messageProducer, new StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration));
return AutoCloseableIterators.fromIterator(iterator);
}

public static boolean isAnyStreamIncrementalSyncMode(final ConfiguredAirbyteCatalog catalog) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.debezium.internals;

import io.airbyte.cdk.integrations.debezium.CdcStateHandler;
import io.airbyte.cdk.integrations.debezium.CdcTargetPosition;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SourceStateMessageProducer} for Debezium change events.
 *
 * <p>
 * Each incoming {@link ChangeEventWithMetadata} is converted to an {@link AirbyteMessage} through
 * the supplied {@link DebeziumEventConverter}. While doing so, the producer tracks the offset read
 * from the {@link AirbyteFileOffsetBackingStore} to decide when a checkpoint state message may be
 * emitted. State messages are built through {@link CdcStateHandler#saveState}.
 *
 * <p>
 * The instance keeps mutable bookkeeping (pending offset, previous offset, emit flag) and performs
 * no synchronization of its own.
 *
 * @param <T> type parameter forwarded to {@link CdcTargetPosition}
 */
public class DebeziumMessageProducer<T> implements SourceStateMessageProducer<ChangeEventWithMetadata> {

  private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumMessageProducer.class);

  private final CdcStateHandler cdcStateHandler;

  /**
   * `checkpointOffsetToSend` is used as temporary storage for the offset that we want to send as
   * message. As Debezium is reading records faster than we process them, if we try to send the
   * `offsetManager.read()` offset, it is possible that the state is behind the record we are
   * currently propagating. To avoid that, we store the offset as soon as we reach the checkpoint
   * threshold (time or records) and we wait to send it until we are sure that the record we are
   * processing is behind the offset to be sent.
   */
  private final Map<String, String> checkpointOffsetToSend = new HashMap<>();

  /**
   * Copy of the offset read from the offset manager when this producer was constructed. It is never
   * modified afterwards and is used by {@link #createFinalStateMessage} to detect a sync that made
   * no progress.
   */
  private final Map<String, String> initialOffset;

  /**
   * `previousCheckpointOffset` is used to make sure we don't send duplicated states with the same
   * offset. It is possible that the offset Debezium reports doesn't move for a period of time, and
   * if we just rely on `offsetManager.read()`, there is a chance to send duplicate states,
   * generating unneeded usage of networking and processing.
   */
  private final Map<String, String> previousCheckpointOffset;

  private final AirbyteFileOffsetBackingStore offsetManager;
  private final CdcTargetPosition<T> targetPosition;
  private final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager;
  private final DebeziumEventConverter eventConverter;

  /** Set once the pending checkpoint offset is safe to emit; reset after each checkpoint. */
  private boolean shouldEmitStateMessage = false;

  /**
   * Creates a producer and snapshots the current offset as both the initial offset and the previous
   * checkpoint offset.
   *
   * @param cdcStateHandler handler used to turn an offset (and schema history) into a state message
   * @param targetPosition used to compare offsets and to compare an event against an offset; the
   *        parameter is intentionally left as a raw type so existing call sites keep compiling
   * @param eventConverter converts change events into Airbyte messages
   * @param offsetManager source of the current Debezium offset; must not be null
   * @param schemaHistoryManager optional schema history storage whose content is attached to state
   * @throws IllegalArgumentException if {@code offsetManager} is null
   */
  public DebeziumMessageProducer(
                                 final CdcStateHandler cdcStateHandler,
                                 final CdcTargetPosition targetPosition,
                                 final DebeziumEventConverter eventConverter,
                                 final AirbyteFileOffsetBackingStore offsetManager,
                                 final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
    // Validate before touching any state so a bad argument never yields a half-built instance.
    if (offsetManager == null) {
      throw new IllegalArgumentException("Offset manager cannot be null");
    }
    this.cdcStateHandler = cdcStateHandler;
    this.targetPosition = targetPosition;
    this.eventConverter = eventConverter;
    this.offsetManager = offsetManager;
    this.schemaHistoryManager = schemaHistoryManager;
    // Copy rather than downcast: read() is only known to return a Map, and this map is mutated
    // later in generateStateMessageAtCheckpoint.
    this.previousCheckpointOffset = new HashMap<>(offsetManager.read());
    this.initialOffset = new HashMap<>(this.previousCheckpointOffset);
  }

  /**
   * Builds the checkpoint state message from the pending offset, then records that offset as the
   * previous checkpoint, clears the pending offset and resets the emit flag.
   *
   * @param stream unused by this implementation
   * @return state message built from the pending checkpoint offset
   */
  @Override
  public AirbyteStateMessage generateStateMessageAtCheckpoint(ConfiguredAirbyteStream stream) {
    LOGGER.info("Sending CDC checkpoint state message.");
    final AirbyteStateMessage stateMessage = createStateMessage(checkpointOffsetToSend);
    previousCheckpointOffset.clear();
    previousCheckpointOffset.putAll(checkpointOffsetToSend);
    checkpointOffsetToSend.clear();
    shouldEmitStateMessage = false;
    return stateMessage;
  }

  /**
   * Converts a change event into an Airbyte message and updates the checkpoint bookkeeping.
   *
   * <p>
   * When no offset is pending, the current offset is read and kept as the pending checkpoint offset
   * if it differs from the previous checkpoint. A pending offset becomes emittable once a
   * non-snapshot event is reported by the target position as being ahead of it.
   *
   * @param stream unused by this implementation
   * @param message change event to convert
   * @return the Airbyte message produced by the event converter
   */
  @Override
  public AirbyteMessage processRecordMessage(ConfiguredAirbyteStream stream, ChangeEventWithMetadata message) {

    if (checkpointOffsetToSend.isEmpty()) {
      try {
        final Map<String, String> currentOffset = offsetManager.read();
        if (!targetPosition.isSameOffset(previousCheckpointOffset, currentOffset)) {
          checkpointOffsetToSend.putAll(currentOffset);
        }
      } catch (final ConnectException e) {
        // Best effort: a failed read only postpones the checkpoint to a later record.
        LOGGER.warn("Offset file is being written by Debezium. Skipping CDC checkpoint in this loop.");
      }
    }

    // NOTE(review): only a pending offset with exactly one entry is considered here; presumably the
    // Debezium offset map always holds a single key -- confirm.
    if (checkpointOffsetToSend.size() == 1 && !message.isSnapshotEvent()) {
      if (targetPosition.isEventAheadOffset(checkpointOffsetToSend, message)) {
        shouldEmitStateMessage = true;
      } else {
        LOGGER.info("Encountered records with the same event offset.");
      }
    }

    return eventConverter.toAirbyteMessage(message);
  }

  /**
   * Builds the state message emitted at the end of the sync.
   *
   * @param stream unused by this implementation
   * @return state built from the initial offset when the target position reports no change since
   *         construction, otherwise state built from the offset read at this point
   */
  @Override
  public AirbyteStateMessage createFinalStateMessage(ConfiguredAirbyteStream stream) {

    final Map<String, String> syncFinishedOffset = offsetManager.read();
    if (targetPosition.isSameOffset(initialOffset, syncFinishedOffset)) {
      // Edge case where no progress has been made: wrap up the
      // sync by returning the initial offset instead of the
      // current offset. We do this because we found that
      // for some databases, heartbeats will cause Debezium to
      // overwrite the offset file with a state which doesn't
      // include all necessary data such as snapshot completion.
      // This is the case for MS SQL Server, at least.
      return createStateMessage(initialOffset);
    }
    return createStateMessage(syncFinishedOffset);
  }

  /**
   * Reports whether a checkpoint state message should be emitted now.
   *
   * @param stream unused by this implementation
   * @return the flag set by {@link #processRecordMessage} and cleared by
   *         {@link #generateStateMessageAtCheckpoint}
   */
  @Override
  public boolean shouldEmitStateMessage(ConfiguredAirbyteStream stream) {
    return shouldEmitStateMessage;
  }

  /**
   * Creates {@link AirbyteStateMessage} while updating CDC data, used to checkpoint the state of the
   * process.
   *
   * @param offset offset to persist in the state
   * @return {@link AirbyteStateMessage} which includes offset and schema history if used.
   */
  private AirbyteStateMessage createStateMessage(final Map<String, String> offset) {
    return cdcStateHandler.saveState(offset, schemaHistoryManager.map(AirbyteSchemaHistoryStorage::read).orElse(null)).getState();
  }

}
Loading

0 comments on commit 2e976b1

Please sign in to comment.