Skip to content

Commit

Permalink
DB sources cdc : upgrade Debezium version to 2.1.2 (#23112)
Browse files Browse the repository at this point in the history
* WIP

* remove wal2json

* revert test change

* working version for MySQL

* cleanup

* mssql changes

* rename module

* format

* undo unwanted change

* disable backward compatibility spec test

* fix acceptance-test-config.yml

* fix acceptance-test-config.yml

* update doc link

* fix mssql

* review comments

* master merge

* review comments

* disable few mssql tests

* temp commit for mssql tests experiment

* another temp commit

* another temp commit

* revert temp commits

* update test

* ssl support added for mssql cdc

* improve tests

* fix strict encrypt test

* make sql-server state backward compatible

* missed this one

* add error properties

* upgrade version and docs

* remove wal2json reference from docs

* auto-bump connector version

* add extra safety validation

* increase wait time

* auto-bump connector version

* update spec

---------

Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 7, 2023
1 parent 3843eca commit f6ec876
Show file tree
Hide file tree
Showing 95 changed files with 867 additions and 655 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
- name: AlloyDB for PostgreSQL
sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerRepository: airbyte/source-alloydb
dockerImageTag: 1.0.51
dockerImageTag: 2.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
icon: alloydb.svg
sourceType: database
Expand Down Expand Up @@ -1126,7 +1126,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.4.29
dockerImageTag: 1.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down Expand Up @@ -1182,7 +1182,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 1.0.21
dockerImageTag: 2.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down Expand Up @@ -1468,7 +1468,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.51
dockerImageTag: 2.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
20 changes: 6 additions & 14 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-alloydb:1.0.51"
- dockerImage: "airbyte/source-alloydb:2.0.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -608,13 +608,9 @@
type: "string"
title: "Plugin"
description: "A logical decoding plugin installed on the PostgreSQL\
\ server. The `pgoutput` plugin is used by default. The `wal2json`\
\ plugin is deprecated and will soon be removed so it's not recommended\
\ to use. Read more about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#step-2-select-a-replication-plugin\"\
>selecting replication plugins</a>."
\ server."
enum:
- "pgoutput"
- "wal2json"
default: "pgoutput"
order: 2
replication_slot:
Expand Down Expand Up @@ -8343,7 +8339,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:0.4.29"
- dockerImage: "airbyte/source-mssql:1.0.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/mssql"
connectionSpecification:
Expand Down Expand Up @@ -9218,7 +9214,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:1.0.21"
- dockerImage: "airbyte/source-mysql:2.0.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql"
connectionSpecification:
Expand Down Expand Up @@ -11695,7 +11691,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.51"
- dockerImage: "airbyte/source-postgres:2.0.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -11933,13 +11929,9 @@
type: "string"
title: "Plugin"
description: "A logical decoding plugin installed on the PostgreSQL\
\ server. The `pgoutput` plugin is used by default. The `wal2json`\
\ plugin is deprecated and will soon be removed so it's not recommended\
\ to use. Read more about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#step-2-select-a-replication-plugin\"\
>selecting replication plugins</a>."
\ server."
enum:
- "pgoutput"
- "wal2json"
default: "pgoutput"
order: 2
replication_slot:
Expand Down
Binary file not shown.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ dependencies {
implementation libs.airbyte.protocol
implementation project(':airbyte-db:db-lib')

implementation 'io.debezium:debezium-api:1.9.6.Final'
implementation 'io.debezium:debezium-embedded:1.9.6.Final'
implementation 'io.debezium:debezium-connector-sqlserver:1.9.6.Final'
implementation 'io.debezium:debezium-connector-mysql:1.9.6.Final'
implementation files('debezium-connector-postgres-1.9.6.Final.jar')
implementation 'io.debezium:debezium-api:2.1.2.Final'
implementation 'io.debezium:debezium-embedded:2.1.2.Final'
implementation 'io.debezium:debezium-connector-sqlserver:2.1.2.Final'
implementation 'io.debezium:debezium-connector-mysql:2.1.2.Final'
implementation 'io.debezium:debezium-connector-postgres:2.1.2.Final'
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'

testFixturesImplementation project(':airbyte-db:db-lib')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;
Expand Down Expand Up @@ -96,10 +97,12 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Confi
final CdcStateHandler cdcStateHandler,
final CdcMetadataInjector cdcMetadataInjector,
final Properties connectorProperties,
final Instant emittedAt) {
final Instant emittedAt,
final boolean addDbNameToState) {
LOGGER.info("Using CDC: {}", true);
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcSavedInfoFetcher.getSavedOffset());
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcSavedInfoFetcher.getSavedOffset(),
addDbNameToState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty());
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager = schemaHistoryManager(cdcSavedInfoFetcher);
final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties, config, catalog, offsetManager,
schemaHistoryManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.engine.ChangeEvent;

/**
* This interface is used to define the target position at the beginning of the sync so that once we
Expand All @@ -24,16 +23,6 @@ public interface CdcTargetPosition {
*/
boolean reachedTargetPosition(JsonNode valueAsJson);

/**
* Returns a position value (lsn) from a heartbeat event.
*
* @param heartbeatEvent a heartbeat change event
* @return the lsn value in a heartbeat change event or null
*/
default Long getHeartbeatPosition(final ChangeEvent<String, String> heartbeatEvent) {
throw new UnsupportedOperationException();
}

/**
* Checks if a specified lsn has reached the target lsn.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.connect.errors.ConnectException;
Expand All @@ -36,11 +39,14 @@
public class AirbyteFileOffsetBackingStore {

private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteFileOffsetBackingStore.class);

private static final BiFunction<String, String, String> SQL_SERVER_STATE_MUTATION = (key, databaseName) -> key.substring(0, key.length() - 2)
+ ",\"database\":\"" + databaseName + "\"" + key.substring(key.length() - 2);
private final Path offsetFilePath;
private final Optional<String> dbName;

public AirbyteFileOffsetBackingStore(final Path offsetFilePath) {
public AirbyteFileOffsetBackingStore(final Path offsetFilePath, final Optional<String> dbName) {
this.offsetFilePath = offsetFilePath;
this.dbName = dbName;
}

public Path getOffsetFilePath() {
Expand All @@ -59,14 +65,37 @@ public Map<String, String> read() {
public void persist(final JsonNode cdcState) {
final Map<String, String> mapAsString =
cdcState != null ? Jsons.object(cdcState, Map.class) : Collections.emptyMap();
final Map<ByteBuffer, ByteBuffer> mappedAsStrings = mapAsString.entrySet().stream().collect(Collectors.toMap(

final Map<String, String> updatedMap = updateStateForDebezium2_1(mapAsString);

final Map<ByteBuffer, ByteBuffer> mappedAsStrings = updatedMap.entrySet().stream().collect(Collectors.toMap(
e -> stringToByteBuffer(e.getKey()),
e -> stringToByteBuffer(e.getValue())));

FileUtils.deleteQuietly(offsetFilePath.toFile());
save(mappedAsStrings);
}

/**
 * Rewrites a saved offset map so that its key has the shape this class treats as Debezium 2.1
 * compatible, i.e. a key that starts with '[' and ends with ']'.
 *
 * <p>
 * Only the first entry of {@code mapAsString} is inspected. If its key already has that shape, the
 * input map is returned untouched. Otherwise the text between the first '[' and the last ']'
 * (both inclusive) becomes the new key and, when {@code dbName} is present,
 * {@code SQL_SERVER_STATE_MUTATION} is applied to it to add the database name. The returned map
 * then holds just that single migrated entry.
 *
 * @param mapAsString offset entries restored from the saved state; may be empty
 * @return the input map when no migration is needed, otherwise a new map holding the migrated
 *         first entry (or an empty map when the input is empty)
 * @throws IllegalStateException if the first key has no well-formed '[' ... ']' section and so
 *         cannot be migrated
 */
private Map<String, String> updateStateForDebezium2_1(final Map<String, String> mapAsString) {
  final Map<String, String> updatedMap = new LinkedHashMap<>();
  if (mapAsString.isEmpty()) {
    return updatedMap;
  }

  final String key = mapAsString.keySet().iterator().next();
  final int start = key.indexOf('[');
  final int end = key.lastIndexOf(']');

  if (start == 0 && end == key.length() - 1) {
    // The state is Debezium 2.1 compatible. No need to change anything.
    return mapAsString;
  }

  // Without this guard, substring() below would fail with an opaque index error (or silently
  // yield an empty key when ']' is missing), hiding which offset key was malformed.
  if (start < 0 || end < start) {
    throw new IllegalStateException("Unexpected Debezium offset key format, cannot make state Debezium 2.1 compatible: " + key);
  }

  LOGGER.info("Mutating state to make it Debezium 2.1 compatible");
  final String bracketedKey = key.substring(start, end + 1);
  final String newKey = dbName
      .map(databaseName -> SQL_SERVER_STATE_MUTATION.apply(bracketedKey, databaseName))
      .orElse(bracketedKey);
  // NOTE(review): only the first entry is carried over; presumably the saved offset always holds
  // a single entry - confirm against the callers.
  updatedMap.put(newKey, mapAsString.get(key));
  return updatedMap;
}

private static String byteBufferToString(final ByteBuffer byteBuffer) {
Preconditions.checkNotNull(byteBuffer);
return new String(byteBuffer.array(), StandardCharsets.UTF_8);
Expand Down Expand Up @@ -127,7 +156,7 @@ private void save(final Map<ByteBuffer, ByteBuffer> data) {
}
}

public static AirbyteFileOffsetBackingStore initializeState(final JsonNode cdcState) {
public static AirbyteFileOffsetBackingStore initializeState(final JsonNode cdcState, final Optional<String> dbName) {
final Path cdcWorkingDir;
try {
cdcWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-state-offset");
Expand All @@ -136,7 +165,7 @@ public static AirbyteFileOffsetBackingStore initializeState(final JsonNode cdcSt
}
final Path cdcOffsetFilePath = cdcWorkingDir.resolve("offset.dat");

final AirbyteFileOffsetBackingStore offsetManager = new AirbyteFileOffsetBackingStore(cdcOffsetFilePath);
final AirbyteFileOffsetBackingStore offsetManager = new AirbyteFileOffsetBackingStore(cdcOffsetFilePath, dbName);
offsetManager.persist(cdcState);
return offsetManager;
}
Expand All @@ -150,7 +179,7 @@ public static AirbyteFileOffsetBackingStore initializeDummyStateForSnapshotPurpo
}
final Path cdcOffsetFilePath = cdcWorkingDir.resolve("offset.dat");

return new AirbyteFileOffsetBackingStore(cdcOffsetFilePath);
return new AirbyteFileOffsetBackingStore(cdcOffsetFilePath, Optional.empty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.HistoryRecord;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
Expand All @@ -19,15 +18,13 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;

/**
* The purpose of this class is to: 1. Read the contents of the file {@link #path} which contains
* the schema history at the end of the sync so that it can be saved in state for future syncs.
* Check {@link #read()} 2. Write the saved content back to the file {@link #path} at the beginning
* of the sync so that debezium can function smoothly. Check persist(Optional&lt;JsonNode&gt;). To
* understand more about file, please refer {@link FilteredFileDatabaseHistory}
* of the sync so that debezium can function smoothly. Check persist(Optional&lt;JsonNode&gt;).
*/
public class AirbyteSchemaHistoryStorage {

Expand All @@ -44,10 +41,6 @@ public Path getPath() {
return path;
}

/**
* This implementation is kind of similar to
* {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)}
*/
public String read() {
final StringBuilder fileAsString = new StringBuilder();
try {
Expand All @@ -65,10 +58,6 @@ public String read() {
}
}

/**
* This implementation is kind of similar to
* {@link io.debezium.relational.history.FileDatabaseHistory#start()}
*/
private void makeSureFileExists() {
try {
// Make sure the file exists ...
Expand Down Expand Up @@ -105,9 +94,6 @@ public void persist(final Optional<JsonNode> schemaHistory) {
}

/**
* This implementation is kind of similar to
* {@link io.debezium.relational.history.FileDatabaseHistory#storeRecord(HistoryRecord)}
*
* @param fileAsString Represents the contents of the file saved in state from previous syncs
*/
private void writeToFile(final String fileAsString) {
Expand Down
Loading

0 comments on commit f6ec876

Please sign in to comment.