Skip to content

Commit

Permalink
Checkpointing source-mssql (airbytehq#34182)
Browse files Browse the repository at this point in the history
Co-authored-by: nguyenaiden <duy@airbyte.io>
Co-authored-by: Xiaohan Song <xiaohan@airbyte.io>
Co-authored-by: Marius Posta <marius@airbyte.io>
  • Loading branch information
4 people authored and jatinyadav-cc committed Feb 21, 2024
1 parent acb425b commit aaacfe6
Show file tree
Hide file tree
Showing 45 changed files with 2,742 additions and 253 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.18.0 | 2024-02-08 | [\#33606](https://github.com/airbytehq/airbyte/pull/33606) | Add updated Initial and Incremental Stream State definitions for DB Sources. |
| 0.17.1 | 2024-02-08 | [\#35027](https://github.com/airbytehq/airbyte/pull/35027) | Make state handling thread safe in async destination framework. |
| 0.17.0 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Enable configuring async destination batch size. |
| 0.16.6 | 2024-02-07 | [\#34892](https://github.com/airbytehq/airbyte/pull/34892) | Improved testcontainers logging and support for unshared containers. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public final class JdbcConstants {
public static final String JDBC_COLUMN_TABLE_NAME = "TABLE_NAME";
public static final String JDBC_COLUMN_COLUMN_NAME = "COLUMN_NAME";
public static final String JDBC_COLUMN_DATA_TYPE = "DATA_TYPE";
public static final String JDBC_COLUMN_TYPE = "TYPE";

public static final String JDBC_COLUMN_TYPE_NAME = "TYPE_NAME";
public static final String JDBC_COLUMN_SIZE = "COLUMN_SIZE";
public static final String JDBC_INDEX_NAME = "INDEX_NAME";
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.17.1
version=0.18.0
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ dependencies {

testImplementation libs.testcontainers.jdbc
testImplementation libs.testcontainers.mysql
testImplementation libs.testcontainers.mssqlserver
testImplementation libs.testcontainers.postgresql
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'

Expand Down Expand Up @@ -141,6 +142,7 @@ dependencies {
// Mark as compile only to avoid leaking transitively to connectors
testFixturesCompileOnly libs.testcontainers.jdbc
testFixturesCompileOnly libs.testcontainers.postgresql
testFixturesCompileOnly libs.testcontainers.mssqlserver
testFixturesCompileOnly libs.testcontainers.cockroachdb
testFixturesImplementation libs.testcontainers.cockroachdb
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ default boolean isEventAheadOffset(final Map<String, String> offset, final Chang
* @return Returns `true` if both offsets are at the same position. Otherwise, it returns `false`
*/
default boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
return true;
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public class DebeziumStateDecoratingIterator<T> extends AbstractIterator<Airbyte
private final Duration syncCheckpointDuration;
private final Long syncCheckpointRecords;
private OffsetDateTime dateTimeLastSync;
private Long recordsLastSync;
private long recordsLastSync;
private long recordsAllSyncs;
private boolean sendCheckpointMessage = false;

/**
Expand All @@ -69,7 +70,7 @@ public class DebeziumStateDecoratingIterator<T> extends AbstractIterator<Airbyte
* we just rely on `offsetManager.read()`, there is a chance of sending duplicate states, generating
* an unneeded usage of networking and processing.
*/
private final HashMap<String, String> previousCheckpointOffset;
private final HashMap<String, String> initialOffset, previousCheckpointOffset;

private final DebeziumEventConverter eventConverter;

Expand Down Expand Up @@ -103,6 +104,7 @@ public DebeziumStateDecoratingIterator(final Iterator<ChangeEventWithMetadata> c
this.syncCheckpointDuration = checkpointDuration;
this.syncCheckpointRecords = checkpointRecords;
this.previousCheckpointOffset = (HashMap<String, String>) offsetManager.read();
this.initialOffset = new HashMap<>(this.previousCheckpointOffset);
resetCheckpointValues();
}

Expand Down Expand Up @@ -137,7 +139,7 @@ protected AirbyteMessage computeNext() {
final ChangeEventWithMetadata event = changeEventIterator.next();

if (cdcStateHandler.isCdcCheckpointEnabled()) {
if (checkpointOffsetToSend.size() == 0 &&
if (checkpointOffsetToSend.isEmpty() &&
(recordsLastSync >= syncCheckpointRecords ||
Duration.between(dateTimeLastSync, OffsetDateTime.now()).compareTo(syncCheckpointDuration) > 0)) {
// Using a temporary variable to avoid reading the offset twice, one in the condition and another in
Expand All @@ -160,11 +162,23 @@ protected AirbyteMessage computeNext() {
}
}
recordsLastSync++;
recordsAllSyncs++;
return eventConverter.toAirbyteMessage(event);
}

isSyncFinished = true;
return createStateMessage(offsetManager.read(), recordsLastSync);
final var syncFinishedOffset = (HashMap<String, String>) offsetManager.read();
if (recordsAllSyncs == 0L && targetPosition.isSameOffset(initialOffset, syncFinishedOffset)) {
// Edge case where no progress has been made: wrap up the
// sync by returning the initial offset instead of the
// current offset. We do this because we found that
// for some databases, heartbeats will cause Debezium to
// overwrite the offset file with a state which doesn't
// include all necessary data such as snapshot completion.
// This is the case for MS SQL Server, at least.
return createStateMessage(initialOffset, 0);
}
return createStateMessage(syncFinishedOffset, recordsLastSync);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -23,6 +25,8 @@ public class RelationalDbQueryUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(RelationalDbQueryUtils.class);

/**
 * Value holder pairing a table's size with its average row length.
 *
 * @param tableSize size of the table; boxed, so {@code null} is representable (unit is not visible
 *        here - presumably bytes, TODO confirm against the code that populates it)
 * @param avgRowLength average length of a row; boxed, so {@code null} is representable (presumably
 *        bytes per row, TODO confirm)
 */
public record TableSizeInfo(Long tableSize, Long avgRowLength) {}

public static String getIdentifierWithQuoting(final String identifier, final String quoteString) {
// double-quoted values within a database name or column name should be wrapped with extra
// quoteString
Expand Down Expand Up @@ -79,4 +83,17 @@ public static <Database extends SqlDatabase> AutoCloseableIterator<JsonNode> que
}, airbyteStreamNameNamespacePair);
}

/**
 * Logs, at INFO level, which streams are about to be synced with the given sync type.
 * <p>
 * For an empty list a single "no streams" line is written. Otherwise two lines are written: the
 * number of streams, followed by their {@code namespace.name} identifiers.
 *
 * @param streams streams selected for the sync type
 * @param syncType human-readable label of the sync type, interpolated into the log lines
 */
public static void logStreamSyncStatus(final List<ConfiguredAirbyteStream> streams, final String syncType) {
  // Guard clause: nothing to enumerate, so report that and stop.
  if (streams.isEmpty()) {
    LOGGER.info("No Streams will be synced via {}.", syncType);
    return;
  }

  final int streamCount = streams.size();
  LOGGER.info("Streams to be synced via {} : {}", syncType, streamCount);

  // The name list is rendered only after the count line has been logged.
  final String renderedStreams = prettyPrintConfiguredAirbyteStreamList(streams);
  LOGGER.info("Streams: {}", renderedStreams);
}

/**
 * Renders the given streams as a comma-separated list of {@code namespace.name} entries.
 *
 * @param streamList streams to render
 * @return the entries joined with {@code ", "}; an empty string when the list is empty
 */
public static String prettyPrintConfiguredAirbyteStreamList(final List<ConfiguredAirbyteStream> streamList) {
  final StringJoiner rendered = new StringJoiner(", ");
  for (final ConfiguredAirbyteStream configuredStream : streamList) {
    // Each entry is "<namespace>.<name>" of the wrapped stream.
    rendered.add("%s.%s".formatted(configuredStream.getStream().getNamespace(), configuredStream.getStream().getName()));
  }
  return rendered.toString();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.source.relationaldb;

import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Static helpers that split the incremental streams of a configured catalog into those that still
 * need a snapshot and those that can be read with a cursor, plus a small name/namespace pair
 * converter.
 */
public class RelationalDbReadUtil {

  /**
   * Picks the incremental streams of the catalog that are not among the already synced ones.
   *
   * @param catalog configured catalog whose streams are examined
   * @param alreadySyncedStreams name/namespace pairs of streams that count as already synced
   * @return copies (made with {@code Jsons.clone}) of every {@link SyncMode#INCREMENTAL} stream
   *         whose name/namespace pair is in the catalog but not in {@code alreadySyncedStreams}
   */
  public static List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog,
                                                                        final Set<AirbyteStreamNameNamespacePair> alreadySyncedStreams) {
    final Set<AirbyteStreamNameNamespacePair> catalogPairs = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);
    // Materialise the difference view into its own set before filtering.
    final Set<AirbyteStreamNameNamespacePair> pendingPairs = new HashSet<>(Sets.difference(catalogPairs, alreadySyncedStreams));

    return catalog.getStreams().stream()
        .filter(configured -> configured.getSyncMode() == SyncMode.INCREMENTAL
            && pendingPairs.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(configured.getStream())))
        .map(Jsons::clone)
        .collect(Collectors.toList());
  }

  /**
   * Picks the incremental streams of the catalog that are not part of the initial load.
   *
   * @param catalog configured catalog whose streams are examined
   * @param streamsForInitialLoad streams that are handled by the initial load
   * @return copies (made with {@code Jsons.clone}) of every {@link SyncMode#INCREMENTAL} stream
   *         whose name/namespace pair does not belong to any stream in {@code streamsForInitialLoad}
   */
  public static List<ConfiguredAirbyteStream> identifyStreamsForCursorBased(final ConfiguredAirbyteCatalog catalog,
                                                                            final List<ConfiguredAirbyteStream> streamsForInitialLoad) {
    // Index the initial-load streams by name/namespace pair for the membership test below.
    final Set<AirbyteStreamNameNamespacePair> initialLoadPairs = new HashSet<>();
    for (final ConfiguredAirbyteStream initialLoadStream : streamsForInitialLoad) {
      initialLoadPairs.add(AirbyteStreamNameNamespacePair.fromAirbyteStream(initialLoadStream.getStream()));
    }

    return catalog.getStreams().stream()
        .filter(configured -> configured.getSyncMode() == SyncMode.INCREMENTAL
            && !initialLoadPairs.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(configured.getStream())))
        .map(Jsons::clone)
        .collect(Collectors.toList());
  }

  /**
   * Re-wraps a name/namespace pair of the {@code io.airbyte.protocol.models} type as the
   * {@code io.airbyte.protocol.models.v0} type, carrying over name and namespace unchanged.
   * <p>
   * NOTE(review): despite the "FromV0" suffix, the argument is the non-v0 type and the result is
   * the v0 type.
   *
   * @param v1NameNamespacePair pair of the non-v0 protocol model type
   * @return a new v0 pair with the same name and namespace
   */
  public static AirbyteStreamNameNamespacePair convertNameNamespacePairFromV0(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair v1NameNamespacePair) {
    final String streamName = v1NameNamespacePair.getName();
    final String streamNamespace = v1NameNamespacePair.getNamespace();
    return new AirbyteStreamNameNamespacePair(streamName, streamNamespace);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
"$schema": http://json-schema.org/draft-07/schema#
title: DbSource Models
type: object
description: DbSource Models
properties:
state_type:
"$ref": "#/definitions/StateType"
ordered_column_state:
"$ref": "#/definitions/OrderedColumnLoadStatus"
cursor_based_state:
"$ref": "#/definitions/CursorBasedStatus"
definitions:
StateType:
description: Enum to define the sync mode of stream state.
type: string
enum:
- cursor_based
- ordered_column
- cdc
CursorBasedStatus:
type: object
extends:
type: object
existingJavaType: "io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState"
properties:
state_type:
"$ref": "#/definitions/StateType"
version:
description: Version of state.
type: integer
OrderedColumnLoadStatus:
type: object
properties:
version:
description: Version of state.
type: integer
state_type:
"$ref": "#/definitions/StateType"
ordered_col:
description: ordered column name
type: string
ordered_col_val:
description: ordered column high watermark
type: string
incremental_state:
description: State to switch to after completion of the ordered column initial sync
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
Original file line number Diff line number Diff line change
Expand Up @@ -578,8 +578,8 @@ protected void testReadOneTableIncrementallyTwice() throws Exception {

protected void executeStatementReadIncrementallyTwice() {
testdb
.with("INSERT INTO %s(id, name, updated_at) VALUES (4, 'riker', '2006-10-19')", getFullyQualifiedTableName(TABLE_NAME))
.with("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')", getFullyQualifiedTableName(TABLE_NAME));
.with("INSERT INTO %s (id, name, updated_at) VALUES (4, 'riker', '2006-10-19')", getFullyQualifiedTableName(TABLE_NAME))
.with("INSERT INTO %s (id, name, updated_at) VALUES (5, 'data', '2006-10-19')", getFullyQualifiedTableName(TABLE_NAME));
}

protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(final String namespace) {
Expand Down
8 changes: 6 additions & 2 deletions airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.16.0'
cdkVersionRequired = '0.18.0'
features = ['db-sources']
useLocalCdk = false
}
Expand All @@ -15,7 +15,11 @@ configurations.all {
}
}


java {
compileJava {
options.compilerArgs += "-Xlint:-try,-rawtypes"
}
}

application {
mainClass = 'io.airbyte.integrations.source.mssql.MssqlSource'
Expand Down
6 changes: 5 additions & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 3.6.1
dockerImageTag: 3.7.0
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand All @@ -18,8 +18,12 @@ data:
name: Microsoft SQL Server (MSSQL)
registries:
cloud:
# CI pipeline is broken for mssql
dockerImageTag: 3.6.1
enabled: true
oss:
# CI pipeline is broken for mssql
dockerImageTag: 3.6.1
enabled: true
releaseStage: alpha
supportLevel: community
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@

package io.airbyte.integrations.source.mssql;

import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_DELETED_AT;
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_DEFAULT_CURSOR;
import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_EVENT_SERIAL_NO;
import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_LSN;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.integrations.debezium.CdcMetadataInjector;
import io.airbyte.integrations.source.mssql.cdc.MssqlDebeziumStateUtil.MssqlDebeziumStateAttributes;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;

public class MssqlCdcConnectorMetadataInjector implements CdcMetadataInjector<Long> {
public class MssqlCdcConnectorMetadataInjector implements CdcMetadataInjector<MssqlDebeziumStateAttributes> {

private final long emittedAtConverted;

Expand Down Expand Up @@ -44,6 +47,17 @@ public void addMetaData(final ObjectNode event, final JsonNode source) {
event.put(CDC_DEFAULT_CURSOR, getCdcDefaultCursor());
}

/**
 * Writes the CDC metadata fields onto a record that was fetched outside of Debezium.
 * <p>
 * The fields are written in a fixed order: updated-at, event serial number, LSN, deleted-at and
 * default cursor. The deleted-at field is stored as a JSON null and the event serial number is
 * always {@code 1}.
 *
 * @param record record node that receives the metadata fields; mutated in place
 * @param transactionTimestamp value stored as the updated-at field
 * @param debeziumStateAttributes state attributes whose {@code lsn()} is stored, via
 *        {@code toString()}, as the LSN field
 */
@Override
public void addMetaDataToRowsFetchedOutsideDebezium(final ObjectNode record,
                                                    final String transactionTimestamp,
                                                    final MssqlDebeziumStateAttributes debeziumStateAttributes) {
  // Fluent chain: each call returns the same node, so the insertion order is unchanged.
  record
      .put(CDC_UPDATED_AT, transactionTimestamp)
      .put(CDC_EVENT_SERIAL_NO, 1)
      .put(CDC_LSN, debeziumStateAttributes.lsn().toString())
      .putNull(CDC_DELETED_AT)
      .put(CDC_DEFAULT_CURSOR, getCdcDefaultCursor());
}

@Override
public String namespace(final JsonNode source) {
return source.get("schema").asText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static boolean isCdc(final JsonNode config) {
return false;
}

static Properties getDebeziumProperties(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog, final boolean isSnapshot) {
public static Properties getDebeziumProperties(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog, final boolean isSnapshot) {
final JsonNode config = database.getSourceConfig();
final JsonNode dbConfig = database.getDatabaseConfig();

Expand Down Expand Up @@ -94,8 +94,6 @@ static Properties getDebeziumProperties(final JdbcDatabase database, final Confi
? HEARTBEAT_INTERVAL_IN_TESTS
: HEARTBEAT_INTERVAL;
props.setProperty("heartbeat.interval.ms", Long.toString(heartbeatInterval.toMillis()));
// TODO: enable heartbeats in MS SQL Server.
props.setProperty("heartbeat.interval.ms", "0");

if (config.has("ssl_method")) {
final JsonNode sslConfig = config.get("ssl_method");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class MssqlCdcSavedInfoFetcher implements CdcSavedInfoFetcher {
private final JsonNode savedSchemaHistory;
private final boolean isSavedSchemaHistoryCompressed;

protected MssqlCdcSavedInfoFetcher(final CdcState savedState) {
public MssqlCdcSavedInfoFetcher(final CdcState savedState) {
final boolean savedStatePresent = savedState != null && savedState.getState() != null;
this.savedOffset = savedStatePresent ? savedState.getState().get(MSSQL_CDC_OFFSET) : null;
this.savedSchemaHistory = savedStatePresent ? savedState.getState().get(MSSQL_DB_HISTORY) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public MssqlCdcStateHandler(final StateManager stateManager) {
this.stateManager = stateManager;
}

/**
 * Reports that CDC checkpointing is enabled for this state handler.
 *
 * @return always {@code true}
 */
@Override
public boolean isCdcCheckpointEnabled() {
return true;
}

@Override
public AirbyteMessage saveState(final Map<String, String> offset, final SchemaHistory<String> dbHistory) {
final Map<String, Object> state = new HashMap<>();
Expand Down
Loading

0 comments on commit aaacfe6

Please sign in to comment.