diff --git a/airbyte-integrations/bases/debezium-v1-9-6/debezium-connector-postgres-1.9.6.Final.jar b/airbyte-integrations/bases/debezium-v1-9-6/debezium-connector-postgres-1.9.6.Final.jar deleted file mode 100644 index 6cb6edfb62c6..000000000000 Binary files a/airbyte-integrations/bases/debezium-v1-9-6/debezium-connector-postgres-1.9.6.Final.jar and /dev/null differ diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java b/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java deleted file mode 100644 index 481b7dfb5424..000000000000 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -public enum SnapshotMetadata { - TRUE, - FALSE, - LAST -} diff --git a/airbyte-integrations/bases/debezium-v1-9-6/build.gradle b/airbyte-integrations/bases/debezium/build.gradle similarity index 72% rename from airbyte-integrations/bases/debezium-v1-9-6/build.gradle rename to airbyte-integrations/bases/debezium/build.gradle index e33738b17f40..cecf02d108ac 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/build.gradle +++ b/airbyte-integrations/bases/debezium/build.gradle @@ -9,11 +9,11 @@ dependencies { implementation libs.airbyte.protocol implementation project(':airbyte-db:db-lib') - implementation 'io.debezium:debezium-api:1.9.6.Final' - implementation 'io.debezium:debezium-embedded:1.9.6.Final' - implementation 'io.debezium:debezium-connector-sqlserver:1.9.6.Final' - implementation 'io.debezium:debezium-connector-mysql:1.9.6.Final' - implementation files('debezium-connector-postgres-1.9.6.Final.jar') + implementation 'io.debezium:debezium-api:2.1.2.Final' + implementation 'io.debezium:debezium-embedded:2.1.2.Final' + implementation 'io.debezium:debezium-connector-sqlserver:2.1.2.Final' + implementation 'io.debezium:debezium-connector-mysql:2.1.2.Final' + implementation 'io.debezium:debezium-connector-postgres:2.1.2.Final' implementation 'org.codehaus.plexus:plexus-utils:3.4.2' testFixturesImplementation project(':airbyte-db:db-lib') diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java similarity index 80% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java index fb66c84c0989..c12e60822e93 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java @@ -5,7 +5,8 @@ package io.airbyte.integrations.debezium; import com.fasterxml.jackson.databind.JsonNode; -import io.debezium.engine.ChangeEvent; + +import java.util.Map; /** * This interface is used to define the target position at the beginning of the sync so that once we @@ -24,16 +25,6 @@ public interface CdcTargetPosition { */ boolean reachedTargetPosition(JsonNode valueAsJson); - /** - * Returns a position value (lsn) from a heartbeat event. - * - * @param heartbeatEvent a heartbeat change event - * @return the lsn value in a heartbeat change event or null - */ - default Long getHeartbeatPosition(final ChangeEvent heartbeatEvent) { - throw new UnsupportedOperationException(); - } - /** * Checks if a specified lsn has reached the target lsn. * @@ -52,5 +43,12 @@ default boolean reachedTargetPosition(final Long lsn) { default boolean isHeartbeatSupported() { return false; } + /** + * Returns a position value from a heartbeat event offset. + * + * @param sourceOffset source offset params from heartbeat change event + * @return the hearbeat position in a heartbeat change event or null + */ + Object getHeartbeatPositon(Map sourceOffset); } diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java similarity index 89% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java index f33d80fd0252..dde469c992ed 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java @@ -17,6 +17,7 @@ import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; @@ -59,7 +60,10 @@ public Map read() { public void persist(final JsonNode cdcState) { final Map mapAsString = cdcState != null ? Jsons.object(cdcState, Map.class) : Collections.emptyMap(); - final Map mappedAsStrings = mapAsString.entrySet().stream().collect(Collectors.toMap( + + final Map updatedMap = updateStateForDebezium2_1(mapAsString); + + final Map mappedAsStrings = updatedMap.entrySet().stream().collect(Collectors.toMap( e -> stringToByteBuffer(e.getKey()), e -> stringToByteBuffer(e.getValue()))); @@ -67,6 +71,19 @@ public void persist(final JsonNode cdcState) { save(mappedAsStrings); } + private Map updateStateForDebezium2_1(final Map mapAsString) { + final Map updatedMap = new LinkedHashMap<>(); + if (mapAsString.size() > 0) { + String key = mapAsString.keySet().stream().toList().get(0); + final int i = key.indexOf('['); + final int i1 = key.lastIndexOf(']'); + final String newKey = key.substring(i, i1 + 1); + final String value = mapAsString.get(key); + updatedMap.put(newKey, value); + } + return updatedMap; + } + private static String byteBufferToString(final ByteBuffer byteBuffer) { Preconditions.checkNotNull(byteBuffer); return new String(byteBuffer.array(), StandardCharsets.UTF_8); diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/CustomMySQLTinyIntOneToBooleanConverter.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/CustomMySQLTinyIntOneToBooleanConverter.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/CustomMySQLTinyIntOneToBooleanConverter.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/CustomMySQLTinyIntOneToBooleanConverter.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java similarity index 87% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java index 2771d2964585..4179acb67fca 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java @@ -46,7 +46,7 @@ protected Properties getDebeziumProperties() { props.putAll(properties); // debezium engine configuration - props.setProperty("name", "engine"); + // https://debezium.io/documentation/reference/stable/development/engine.html#engine-properties props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString()); props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer @@ -55,21 +55,20 @@ protected Properties getDebeziumProperties() { props.setProperty("max.queue.size", "8192"); if (schemaHistoryManager.isPresent()) { - // https://debezium.io/documentation/reference/1.9/operations/debezium-server.html#debezium-source-database-history-class + // https://debezium.io/documentation/reference/stable/operations/debezium-server.html#debezium-source-database-history-class // https://debezium.io/documentation/reference/development/engine.html#_in_the_code // As mentioned in the documents above, debezium connector for MySQL needs to track the schema // changes. If we don't do this, we can't fetch records for the table. - props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory"); - props.setProperty("database.history.file.filename", schemaHistoryManager.get().getPath().toString()); + props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory"); + props.setProperty("schema.history.internal.file.filename", schemaHistoryManager.get().getPath().toString()); } - // https://debezium.io/documentation/reference/configuration/avro.html + // https://debezium.io/documentation/reference/stable/configuration/avro.html props.setProperty("key.converter.schemas.enable", "false"); props.setProperty("value.converter.schemas.enable", "false"); // debezium names props.setProperty("name", config.get(JdbcUtils.DATABASE_KEY).asText()); - props.setProperty("database.server.name", config.get(JdbcUtils.DATABASE_KEY).asText()); // db connection configuration props.setProperty("database.hostname", config.get(JdbcUtils.HOST_KEY).asText()); @@ -84,10 +83,14 @@ protected Properties getDebeziumProperties() { // By default "decimal.handing.mode=precise" which's caused returning this value as a binary. // The "double" type may cause a loss of precision, so set Debezium's config to store it as a String // explicitly in its Kafka messages for more details see: - // https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-decimal-types + // https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-decimal-types // https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation props.setProperty("decimal.handling.mode", "string"); + // WARNING : Never change the value of this otherwise all the connectors would start syncing from + // scratch + props.setProperty("topic.prefix", config.get(JdbcUtils.DATABASE_KEY).asText()); + // table selection props.setProperty("table.include.list", getTableIncludelist(catalog)); // column selection diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java similarity index 71% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java index 4be1f42b3f6b..359105356587 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.debezium.internals; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.AbstractIterator; import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.json.Jsons; @@ -12,11 +13,16 @@ import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.integrations.debezium.CdcTargetPosition; import io.debezium.engine.ChangeEvent; +import java.lang.reflect.Field; import java.time.Duration; import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +44,7 @@ public class DebeziumRecordIterator extends AbstractIterator, Field> heartbeatEventSourceField; private final LinkedBlockingQueue> queue; private final CdcTargetPosition targetPosition; private final Supplier publisherStatusSupplier; @@ -46,9 +53,8 @@ public class DebeziumRecordIterator extends AbstractIterator> queue, @@ -61,12 +67,12 @@ public DebeziumRecordIterator(final LinkedBlockingQueue(1); this.receivedFirstRecord = false; this.hasSnapshotFinished = true; - this.signalledClose = false; - tsLastHeartbeat = null; - lastHeartbeatPosition = null; + this.tsLastHeartbeat = null; + this.lastHeartbeatPosition = -1; this.maxInstanceOfNoRecordsFound = 0; } @@ -85,10 +91,6 @@ protected ChangeEvent computeNext() { while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) { final ChangeEvent next; - // #18987: waitTime is still required with heartbeats for backward - // compatibility with connectors not implementing heartbeat - // yet (MySql, MSSql), And also due to postgres taking a long time - // initially staying on "searching for WAL resume position" final Duration waitTime = receivedFirstRecord ? SUBSEQUENT_RECORD_WAIT_TIME : this.firstRecordWaitTime; try { next = queue.poll(waitTime.getSeconds(), TimeUnit.SECONDS); @@ -98,11 +100,8 @@ protected ChangeEvent computeNext() { // if within the timeout, the consumer could not get a record, it is time to tell the producer to // shutdown. - // #18987: Noticed in testing that it's possible for DBZ to be stuck "Searching for WAL resume - // position" - // when no changes exist. In that case queue will pop after timeout with null value for next if (next == null) { - if ((!receivedFirstRecord || hasSnapshotFinished || maxInstanceOfNoRecordsFound >= 10) && !signalledClose) { + if (!receivedFirstRecord || hasSnapshotFinished || maxInstanceOfNoRecordsFound >= 10) { LOGGER.info("No records were returned by Debezium in the timeout seconds {}, closing the engine and iterator", waitTime.getSeconds()); requestClose(); } @@ -111,37 +110,36 @@ protected ChangeEvent computeNext() { continue; } - if (targetPosition.isHeartbeatSupported()) { - // check if heartbeat and read hearbeat position - LOGGER.debug("checking heartbeat lsn for: {}", next); - final Long heartbeatPos = targetPosition.getHeartbeatPosition(next); - if (heartbeatPos != null) { - // wrap up sync if heartbeat position crossed the target OR heartbeat position hasn't changed for - // too long - if (targetPosition.reachedTargetPosition(heartbeatPos) - || (heartbeatPos.equals(this.lastHeartbeatPosition) && heartbeatPosNotChanging()) && !signalledClose) { - LOGGER.info("Closing: Heartbeat indicates sync is done"); - requestClose(); - } - if (!heartbeatPos.equals(this.lastHeartbeatPosition)) { - this.tsLastHeartbeat = LocalDateTime.now(); - this.lastHeartbeatPosition = heartbeatPos; - } + if (isHeartbeatEvent(next)) { + if (!hasSnapshotFinished) { continue; } + + final long heartbeatPos = getHeartbeatPosition(next); + // wrap up sync if heartbeat position crossed the target OR heartbeat position hasn't changed for + // too long + if (targetPosition.reachedTargetPosition(heartbeatPos) + || (heartbeatPos == this.lastHeartbeatPosition && heartbeatPosNotChanging())) { + LOGGER.info("Closing: Heartbeat indicates sync is done"); + requestClose(); + } + if (heartbeatPos != this.lastHeartbeatPosition) { + this.tsLastHeartbeat = LocalDateTime.now(); + this.lastHeartbeatPosition = heartbeatPos; + } + continue; } final JsonNode eventAsJson = Jsons.deserialize(next.value()); hasSnapshotFinished = hasSnapshotFinished(eventAsJson); // if the last record matches the target file position, it is time to tell the producer to shutdown. - - if (!signalledClose && shouldSignalClose(eventAsJson)) { + if (targetPosition.reachedTargetPosition(eventAsJson)) { LOGGER.info("Closing: Change event reached target position"); requestClose(); } this.tsLastHeartbeat = null; - this.lastHeartbeatPosition = null; + this.lastHeartbeatPosition = -1L; this.receivedFirstRecord = true; this.maxInstanceOfNoRecordsFound = 0; return next; @@ -149,18 +147,6 @@ protected ChangeEvent computeNext() { return endOfData(); } - private boolean heartbeatPosNotChanging() { - final Duration tbt = Duration.between(this.tsLastHeartbeat, LocalDateTime.now()); - LOGGER.debug("Time since last hb_pos change {}s", tbt.toSeconds()); - // wait time for no change in heartbeat position is half of initial waitTime - return tbt.compareTo(this.firstRecordWaitTime.dividedBy(2)) > 0; - } - - private boolean hasSnapshotFinished(final JsonNode eventAsJson) { - final SnapshotMetadata snapshot = SnapshotMetadata.valueOf(eventAsJson.get("source").get("snapshot").asText().toUpperCase()); - return SnapshotMetadata.TRUE != snapshot; - } - /** * Debezium was built as an ever running process which keeps on listening for new changes on DB and * immediately processing them. Airbyte needs debezium to work as a start stop mechanism. In order @@ -183,14 +169,25 @@ public void close() throws Exception { requestClose(); } - private boolean shouldSignalClose(final JsonNode eventAsJson) { - return targetPosition.reachedTargetPosition(eventAsJson); + private boolean isHeartbeatEvent(final ChangeEvent event) { + return targetPosition.isHeartbeatSupported() && Objects.nonNull(event) && !event.value().contains("source"); + } + + private boolean heartbeatPosNotChanging() { + final Duration tbt = Duration.between(this.tsLastHeartbeat, LocalDateTime.now()); + LOGGER.debug("Time since last hb_pos change {}s", tbt.toSeconds()); + // wait time for no change in heartbeat position is half of initial waitTime + return tbt.compareTo(this.firstRecordWaitTime.dividedBy(2)) > 0; + } + + private boolean hasSnapshotFinished(final JsonNode eventAsJson) { + final SnapshotMetadata snapshotMetadata = SnapshotMetadata.fromString(eventAsJson.get("source").get("snapshot").asText()); + return !SnapshotMetadata.isSnapshotEventMetadata(snapshotMetadata); } private void requestClose() { try { requestClose.call(); - signalledClose = true; } catch (final Exception e) { throw new RuntimeException(e); } @@ -203,4 +200,35 @@ private void throwExceptionIfSnapshotNotFinished() { } } + @VisibleForTesting + protected Long getHeartbeatPosition(final ChangeEvent heartbeatEvent) { + try { + if (heartbeatEvent == null) { + return null; + } + + final Class eventClass = heartbeatEvent.getClass(); + final Field f; + if (heartbeatEventSourceField.containsKey(eventClass)) { + f = heartbeatEventSourceField.get(eventClass); + } else { + f = eventClass.getDeclaredField("sourceRecord"); + f.setAccessible(true); + heartbeatEventSourceField.put(eventClass, f); + + if (heartbeatEventSourceField.size() > 1) { + LOGGER.warn("Field Cache size growing beyond expected size of 1, size is " + heartbeatEventSourceField.size()); + } + } + + final SourceRecord sr = (SourceRecord) f.get(heartbeatEvent); + final long hbPosition = (long) targetPosition.getHeartbeatPositon(sr.sourceOffset()); + LOGGER.debug("Found heartbeat position: {}", hbPosition); + return hbPosition; + } catch (final NoSuchFieldException | IllegalAccessException e) { + LOGGER.info("failed to get heartbeat position"); + throw new RuntimeException(e); + } + } + } diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java similarity index 99% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java index 344895524e8f..3e7946432e91 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java @@ -71,6 +71,7 @@ public void start(final BlockingQueue> queue) { }) .using((success, message, error) -> { LOGGER.info("Debezium engine shutdown."); + LOGGER.info(message); thrownError.set(error); engineLatch.countDown(); }) diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/FirstRecordWaitTimeUtil.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/FirstRecordWaitTimeUtil.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/FirstRecordWaitTimeUtil.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/FirstRecordWaitTimeUtil.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresCustomLoader.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresCustomLoader.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresCustomLoader.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresCustomLoader.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresDebeziumStateUtil.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresDebeziumStateUtil.java similarity index 93% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresDebeziumStateUtil.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresDebeziumStateUtil.java index b7f544f7e3ba..90e88a021e7f 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresDebeziumStateUtil.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresDebeziumStateUtil.java @@ -6,6 +6,7 @@ import static io.debezium.connector.postgresql.PostgresOffsetContext.LAST_COMMIT_LSN_KEY; import static io.debezium.connector.postgresql.SourceInfo.LSN_KEY; +import static io.debezium.relational.RelationalDatabaseConnectorConfig.DATABASE_NAME; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; @@ -27,6 +28,7 @@ import java.util.Properties; import java.util.Set; import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.storage.FileOffsetBackingStore; @@ -156,20 +158,19 @@ private OptionalLong parseSavedOffset(final Properties properties) { fileOffsetBackingStore.configure(new StandaloneConfig(propertiesMap)); fileOffsetBackingStore.start(); + final Map internalConverterConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"); final JsonConverter keyConverter = new JsonConverter(); - keyConverter.configure(Configuration.from(properties).subset("internal.key.converter" + ".", true).asMap(), true); + keyConverter.configure(internalConverterConfig, true); final JsonConverter valueConverter = new JsonConverter(); - // Make sure that the JSON converter is configured to NOT enable schemas ... - final Configuration valueConverterConfig = Configuration.from(properties).edit().with("internal.value.converter" + ".schemas.enable", false) - .build(); - valueConverter.configure(valueConverterConfig.subset("internal.value.converter" + ".", true).asMap(), false); + valueConverter.configure(internalConverterConfig, false); - offsetStorageReader = new OffsetStorageReaderImpl(fileOffsetBackingStore, properties.getProperty("name"), keyConverter, - valueConverter); final PostgresConnectorConfig postgresConnectorConfig = new PostgresConnectorConfig(Configuration.from(properties)); final PostgresCustomLoader loader = new PostgresCustomLoader(postgresConnectorConfig); + final Set partitions = + Collections.singleton(new PostgresPartition(postgresConnectorConfig.getLogicalName(), properties.getProperty(DATABASE_NAME.name()))); + offsetStorageReader = new OffsetStorageReaderImpl(fileOffsetBackingStore, properties.getProperty("name"), keyConverter, + valueConverter); final OffsetReader offsetReader = new OffsetReader<>(offsetStorageReader, loader); - final Set partitions = Collections.singleton(new PostgresPartition(postgresConnectorConfig.getLogicalName())); final Map offsets = offsetReader.offsets(partitions); return extractLsn(partitions, offsets, loader); diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresReplicationConnection.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresReplicationConnection.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/PostgresReplicationConnection.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/PostgresReplicationConnection.java diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java new file mode 100644 index 000000000000..90bbcabcc032 --- /dev/null +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium.internals; + +import com.google.common.collect.ImmutableSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public enum SnapshotMetadata { + + FIRST, + FIRST_IN_DATA_COLLECTION, + LAST_IN_DATA_COLLECTION, + TRUE, + LAST, + FALSE; + + private static final Set ENTRIES_OF_SNAPSHOT_EVENTS = + ImmutableSet.of(TRUE, FIRST, FIRST_IN_DATA_COLLECTION, LAST_IN_DATA_COLLECTION); + private static final Map STRING_TO_ENUM; + static { + STRING_TO_ENUM = new HashMap<>(12); + STRING_TO_ENUM.put("true", TRUE); + STRING_TO_ENUM.put("TRUE", TRUE); + STRING_TO_ENUM.put("false", FALSE); + STRING_TO_ENUM.put("FALSE", FALSE); + STRING_TO_ENUM.put("last", LAST); + STRING_TO_ENUM.put("LAST", LAST); + STRING_TO_ENUM.put("first", FIRST); + STRING_TO_ENUM.put("FIRST", FIRST); + STRING_TO_ENUM.put("last_in_data_collection", LAST_IN_DATA_COLLECTION); + STRING_TO_ENUM.put("LAST_IN_DATA_COLLECTION", LAST_IN_DATA_COLLECTION); + STRING_TO_ENUM.put("first_in_data_collection", FIRST_IN_DATA_COLLECTION); + STRING_TO_ENUM.put("FIRST_IN_DATA_COLLECTION", FIRST_IN_DATA_COLLECTION); + } + + public static SnapshotMetadata fromString(final String value) { + if (STRING_TO_ENUM.containsKey(value)) { + return STRING_TO_ENUM.get(value); + } + throw new RuntimeException("ENUM value not found for " + value); + } + + public static boolean isSnapshotEventMetadata(final SnapshotMetadata snapshotMetadata) { + return ENTRIES_OF_SNAPSHOT_EVENTS.contains(snapshotMetadata); + } + +} diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandlerTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandlerTest.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandlerTest.java rename to airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandlerTest.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java rename to airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java rename to airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java rename to airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java rename to airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java diff --git a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIteratorTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIteratorTest.java new file mode 100644 index 000000000000..e45918e6ca23 --- /dev/null +++ b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIteratorTest.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.debezium.internals; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; + +import io.airbyte.integrations.debezium.CdcTargetPosition; +import io.debezium.engine.ChangeEvent; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.jupiter.api.Test; + +public class DebeziumRecordIteratorTest { + + @Test + public void getHeartbeatPositionTest() { + final DebeziumRecordIterator debeziumRecordIterator = new DebeziumRecordIterator(mock(LinkedBlockingQueue.class), + mock(CdcTargetPosition.class), + () -> false, + () -> {}, + Duration.ZERO); + final Long lsn = debeziumRecordIterator.getHeartbeatPosition(new ChangeEvent() { + + private final SourceRecord sourceRecord = new SourceRecord(null, Collections.singletonMap("lsn", 358824993496L), null, null, null); + + @Override + public String key() { + return null; + } + + @Override + public String value() { + return "{\"ts_ms\":1667616934701}"; + } + + @Override + public String destination() { + return null; + } + + public SourceRecord sourceRecord() { + return sourceRecord; + } + + }); + + assertEquals(lsn, 358824993496L); + assertNull(debeziumRecordIterator.getHeartbeatPosition(null)); + } + +} diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/internals/FirstRecordWaitTimeUtilTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/FirstRecordWaitTimeUtilTest.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/internals/FirstRecordWaitTimeUtilTest.java rename to airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/FirstRecordWaitTimeUtilTest.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/internals/PostgresDebeziumStateUtilTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/PostgresDebeziumStateUtilTest.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/internals/PostgresDebeziumStateUtilTest.java rename to airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/PostgresDebeziumStateUtilTest.java diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/delete_change_event.json b/airbyte-integrations/bases/debezium/src/test/resources/delete_change_event.json similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/delete_change_event.json rename to airbyte-integrations/bases/debezium/src/test/resources/delete_change_event.json diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/delete_message.json b/airbyte-integrations/bases/debezium/src/test/resources/delete_message.json similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/delete_message.json rename to airbyte-integrations/bases/debezium/src/test/resources/delete_message.json diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/insert_change_event.json b/airbyte-integrations/bases/debezium/src/test/resources/insert_change_event.json similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/insert_change_event.json rename to airbyte-integrations/bases/debezium/src/test/resources/insert_change_event.json diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/insert_message.json b/airbyte-integrations/bases/debezium/src/test/resources/insert_message.json similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/insert_message.json rename to airbyte-integrations/bases/debezium/src/test/resources/insert_message.json diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/test_debezium_offset.dat b/airbyte-integrations/bases/debezium/src/test/resources/test_debezium_offset.dat similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/test_debezium_offset.dat rename to airbyte-integrations/bases/debezium/src/test/resources/test_debezium_offset.dat diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/update_change_event.json b/airbyte-integrations/bases/debezium/src/test/resources/update_change_event.json similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/update_change_event.json rename to airbyte-integrations/bases/debezium/src/test/resources/update_change_event.json diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/update_message.json b/airbyte-integrations/bases/debezium/src/test/resources/update_message.json similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/test/resources/update_message.json rename to airbyte-integrations/bases/debezium/src/test/resources/update_message.json diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java b/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java similarity index 100% rename from airbyte-integrations/bases/debezium-v1-9-6/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java rename to airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java diff --git a/airbyte-integrations/connectors/source-mssql/build.gradle b/airbyte-integrations/connectors/source-mssql/build.gradle index 601c5306023b..03e10c0aad72 100644 --- a/airbyte-integrations/connectors/source-mssql/build.gradle +++ b/airbyte-integrations/connectors/source-mssql/build.gradle @@ -16,16 +16,16 @@ dependencies { implementation project(':airbyte-db:db-lib') implementation project(':airbyte-integrations:bases:base-java') - implementation project(':airbyte-integrations:bases:debezium-v1-9-6') + implementation project(':airbyte-integrations:bases:debezium') implementation libs.airbyte.protocol implementation project(':airbyte-integrations:connectors:source-jdbc') implementation project(':airbyte-integrations:connectors:source-relational-db') - implementation 'io.debezium:debezium-connector-sqlserver:1.9.6.Final' + implementation 'io.debezium:debezium-connector-sqlserver:2.1.2.Final' implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14' implementation 'org.codehaus.plexus:plexus-utils:3.4.2' - testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-6')) + testImplementation testFixtures(project(':airbyte-integrations:bases:debezium')) testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) testImplementation 'org.apache.commons:commons-lang3:3.11' diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java index 8d720cc2e3ee..63ab1d2b891e 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java @@ -36,7 +36,7 @@ public enum ReplicationMethod { * The default "SNAPSHOT" mode can prevent other (non-Airbyte) transactions from updating table rows * while we snapshot. References: * https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver15 - * https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode + * https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode */ public enum SnapshotIsolation { @@ -66,7 +66,7 @@ public static SnapshotIsolation from(final String jsonValue) { } - // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-mode + // https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#sqlserver-property-snapshot-mode public enum DataToSync { EXISTING_AND_NEW("Existing and New", "initial"), @@ -140,9 +140,9 @@ static Properties getDebeziumProperties(final JsonNode config, final ConfiguredA final Properties props = new Properties(); props.setProperty("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector"); - // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-include-schema-changes + // https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#sqlserver-property-include-schema-changes props.setProperty("include.schema.changes", "false"); - // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata + // https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata props.setProperty("provide.transaction.metadata", "false"); props.setProperty("converters", "mssql_converter"); diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java index a7eef6a4210d..c6d01be92ce1 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java @@ -29,9 +29,9 @@ public MssqlCdcTargetPosition(final Lsn targetLsn) { @Override public boolean reachedTargetPosition(final JsonNode valueAsJson) { - final SnapshotMetadata snapshotMetadata = SnapshotMetadata.valueOf(valueAsJson.get("source").get("snapshot").asText().toUpperCase()); + final SnapshotMetadata snapshotMetadata = SnapshotMetadata.fromString(valueAsJson.get("source").get("snapshot").asText()); - if (SnapshotMetadata.TRUE == snapshotMetadata) { + if (SnapshotMetadata.isSnapshotEventMetadata(snapshotMetadata)) { return false; } else if (SnapshotMetadata.LAST == snapshotMetadata) { LOGGER.info("Signalling close because Snapshot is complete"); diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 7d6ab67ea010..189ad748b4f7 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -14,7 +14,7 @@ application { dependencies { implementation project(':airbyte-db:db-lib') implementation project(':airbyte-integrations:bases:base-java') - implementation project(':airbyte-integrations:bases:debezium-v1-9-6') + implementation project(':airbyte-integrations:bases:debezium') implementation project(':airbyte-integrations:connectors:source-jdbc') implementation libs.airbyte.protocol implementation project(':airbyte-integrations:connectors:source-relational-db') @@ -22,7 +22,7 @@ dependencies { implementation 'mysql:mysql-connector-java:8.0.30' implementation 'org.apache.commons:commons-lang3:3.11' - testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-6')) + testImplementation testFixtures(project(':airbyte-integrations:bases:debezium')) testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) testImplementation 'org.apache.commons:commons-lang3:3.11' testImplementation 'org.hamcrest:hamcrest-all:1.3' diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java index 7f5f8af43fa0..e9ef810a7a1d 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java @@ -16,10 +16,16 @@ import io.airbyte.integrations.source.jdbc.AbstractJdbcSource.SslMode; import java.net.URI; import java.nio.file.Path; +import java.time.Duration; import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MySqlCdcProperties { + private static final Logger LOGGER = LoggerFactory.getLogger(MySqlCdcProperties.class); + private static final int HEARTBEAT_FREQUENCY_SEC = 10; + static Properties getDebeziumProperties(final JdbcDatabase database) { final JsonNode sourceConfig = database.getSourceConfig(); final Properties props = commonProperties(database); @@ -29,7 +35,7 @@ static Properties getDebeziumProperties(final JdbcDatabase database) { // initial snapshot props.setProperty("snapshot.mode", sourceConfig.get("snapshot_mode").asText()); } else { - // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode + // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-snapshot-mode props.setProperty("snapshot.mode", "when_needed"); } @@ -43,8 +49,9 @@ private static Properties commonProperties(final JdbcDatabase database) { // debezium engine configuration props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector"); - // https://debezium.io/documentation/reference/connectors/mysql.html#mysql-boolean-values - // https://debezium.io/documentation/reference/1.9/development/converters.html + props.setProperty("database.server.id", String.valueOf(generateServerID())); + // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-boolean-values + // https://debezium.io/documentation/reference/stable/development/converters.html /** * {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter} * {@link MySQLConverter} @@ -52,6 +59,7 @@ private static Properties commonProperties(final JdbcDatabase database) { props.setProperty("converters", "boolean, datetime"); props.setProperty("boolean.type", "io.airbyte.integrations.debezium.internals.CustomMySQLTinyIntOneToBooleanConverter"); props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter"); + props.setProperty("heartbeat.interval.ms", Long.toString(Duration.ofSeconds(HEARTBEAT_FREQUENCY_SEC).toMillis())); // For CDC mode, the user cannot provide timezone arguments as JDBC parameters - they are // specifically defined in the replication_method @@ -68,56 +76,37 @@ private static Properties commonProperties(final JdbcDatabase database) { if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) { if (dbConfig.has(SSL_MODE) && !dbConfig.get(SSL_MODE).asText().isEmpty()) { props.setProperty("database.ssl.mode", MySqlSource.toSslJdbcParamInternal(SslMode.valueOf(dbConfig.get(SSL_MODE).asText()))); - props.setProperty("database.history.producer.security.protocol", "SSL"); - props.setProperty("database.history.consumer.security.protocol", "SSL"); if (dbConfig.has(TRUST_KEY_STORE_URL) && !dbConfig.get(TRUST_KEY_STORE_URL).asText().isEmpty()) { props.setProperty("database.ssl.truststore", Path.of(URI.create(dbConfig.get(TRUST_KEY_STORE_URL).asText())).toString()); - props.setProperty("database.history.producer.ssl.truststore.location", - Path.of(URI.create(dbConfig.get(TRUST_KEY_STORE_URL).asText())).toString()); - props.setProperty("database.history.consumer.ssl.truststore.location", - Path.of(URI.create(dbConfig.get(TRUST_KEY_STORE_URL).asText())).toString()); - props.setProperty("database.history.producer.ssl.truststore.type", "PKCS12"); - props.setProperty("database.history.consumer.ssl.truststore.type", "PKCS12"); - } + if (dbConfig.has(TRUST_KEY_STORE_PASS) && !dbConfig.get(TRUST_KEY_STORE_PASS).asText().isEmpty()) { props.setProperty("database.ssl.truststore.password", dbConfig.get(TRUST_KEY_STORE_PASS).asText()); - props.setProperty("database.history.producer.ssl.truststore.password", dbConfig.get(TRUST_KEY_STORE_PASS).asText()); - props.setProperty("database.history.consumer.ssl.truststore.password", dbConfig.get(TRUST_KEY_STORE_PASS).asText()); - props.setProperty("database.history.producer.ssl.key.password", dbConfig.get(TRUST_KEY_STORE_PASS).asText()); - props.setProperty("database.history.consumer.ssl.key.password", dbConfig.get(TRUST_KEY_STORE_PASS).asText()); - } + if (dbConfig.has(CLIENT_KEY_STORE_URL) && !dbConfig.get(CLIENT_KEY_STORE_URL).asText().isEmpty()) { props.setProperty("database.ssl.keystore", Path.of(URI.create(dbConfig.get(CLIENT_KEY_STORE_URL).asText())).toString()); - props.setProperty("database.history.producer.ssl.keystore.location", - Path.of(URI.create(dbConfig.get(CLIENT_KEY_STORE_URL).asText())).toString()); - props.setProperty("database.history.consumer.ssl.keystore.location", - Path.of(URI.create(dbConfig.get(CLIENT_KEY_STORE_URL).asText())).toString()); - props.setProperty("database.history.producer.ssl.keystore.type", "PKCS12"); - props.setProperty("database.history.consumer.ssl.keystore.type", "PKCS12"); - } + if (dbConfig.has(CLIENT_KEY_STORE_PASS) && !dbConfig.get(CLIENT_KEY_STORE_PASS).asText().isEmpty()) { props.setProperty("database.ssl.keystore.password", dbConfig.get(CLIENT_KEY_STORE_PASS).asText()); - props.setProperty("database.history.producer.ssl.keystore.password", dbConfig.get(CLIENT_KEY_STORE_PASS).asText()); - props.setProperty("database.history.consumer.ssl.keystore.password", dbConfig.get(CLIENT_KEY_STORE_PASS).asText()); } + } else { props.setProperty("database.ssl.mode", "required"); } } - // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-locking-mode + // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-snapshot-locking-mode // This is to make sure other database clients are allowed to write to a table while Airbyte is // taking a snapshot. There is a risk involved that // if any database client makes a schema change then the sync might break props.setProperty("snapshot.locking.mode", "none"); - // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-include-schema-changes + // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-include-schema-changes props.setProperty("include.schema.changes", "false"); // This to make sure that binary data represented as a base64-encoded String. - // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-binary-handling-mode + // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-binary-handling-mode props.setProperty("binary.handling.mode", "base64"); props.setProperty("database.include.list", sourceConfig.get("database").asText()); @@ -130,4 +119,13 @@ static Properties getSnapshotProperties(final JdbcDatabase database) { return props; } + private static int generateServerID() { + int min = 5400; + int max = 6400; + + int serverId = (int) Math.floor(Math.random() * (max - min + 1) + min); + LOGGER.info("Randomly generated Server ID : " + serverId); + return serverId; + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java index 2eaa0334247c..4f391ff12921 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java @@ -10,6 +10,7 @@ import io.airbyte.integrations.debezium.internals.SnapshotMetadata; import java.sql.SQLException; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.stream.Stream; import org.slf4j.Logger; @@ -68,9 +69,8 @@ public static MySqlCdcTargetPosition targetPosition(final JdbcDatabase database) @Override public boolean reachedTargetPosition(final JsonNode valueAsJson) { final String eventFileName = valueAsJson.get("source").get("file").asText(); - final SnapshotMetadata snapshotMetadata = SnapshotMetadata.valueOf(valueAsJson.get("source").get("snapshot").asText().toUpperCase()); - - if (SnapshotMetadata.TRUE == snapshotMetadata) { + final SnapshotMetadata snapshotMetadata = SnapshotMetadata.fromString(valueAsJson.get("source").get("snapshot").asText()); + if (SnapshotMetadata.isSnapshotEventMetadata(snapshotMetadata)) { return false; } else if (SnapshotMetadata.LAST == snapshotMetadata) { LOGGER.info("Signalling close because Snapshot is complete"); @@ -89,4 +89,18 @@ public boolean reachedTargetPosition(final JsonNode valueAsJson) { } + @Override + public boolean isHeartbeatSupported() { + return true; + } + + @Override + public boolean reachedTargetPosition(final Long currentPosition) { + return currentPosition != null && currentPosition.compareTo(Long.valueOf(position)) >= 0; + } + + @Override + public Object getHeartbeatPositon(Map sourceOffset){ + return sourceOffset.get("pos"); + } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSslCaCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSslCaCertificateSourceAcceptanceTest.java index a2d5eddf5583..125fd259b740 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSslCaCertificateSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSslCaCertificateSourceAcceptanceTest.java @@ -202,4 +202,9 @@ public void testIncrementalSyncShouldNotFailIfBinlogIsDeleted() throws Exception assertEquals(6, filterRecords(runRead(configuredCatalog, latestState)).size()); } + @Override + protected boolean supportsPerStream() { + return true; + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSslRequiredSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSslRequiredSourceAcceptanceTest.java index 496fd47a5ef2..8f0137856b2f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSslRequiredSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSslRequiredSourceAcceptanceTest.java @@ -200,4 +200,9 @@ public void testIncrementalSyncShouldNotFailIfBinlogIsDeleted() throws Exception assertEquals(6, filterRecords(runRead(configuredCatalog, latestState)).size()); } + @Override + protected boolean supportsPerStream() { + return true; + } + } diff --git a/airbyte-integrations/connectors/source-postgres/acceptance-test-config.yml b/airbyte-integrations/connectors/source-postgres/acceptance-test-config.yml index 7488bb4785bb..5c89112914f1 100644 --- a/airbyte-integrations/connectors/source-postgres/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-postgres/acceptance-test-config.yml @@ -8,8 +8,12 @@ acceptance_tests: tests: - spec_path: "src/test-integration/resources/expected_spec.json" config_path: "secrets/config.json" + backward_compatibility_tests_config: + disable_for_version: "1.0.48" - spec_path: "src/test-integration/resources/expected_spec.json" config_path: "secrets/config_cdc.json" + backward_compatibility_tests_config: + disable_for_version: "1.0.48" connection: tests: - config_path: "secrets/config.json" diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index 34fb3785c894..8dc9368b6d18 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -13,10 +13,8 @@ application { dependencies { implementation project(':airbyte-db:db-lib') - implementation 'io.debezium:debezium-api:1.9.6.Final' - implementation 'io.debezium:debezium-embedded:1.9.6.Final' implementation project(':airbyte-integrations:bases:base-java') - implementation project(':airbyte-integrations:bases:debezium-v1-9-6') + implementation project(':airbyte-integrations:bases:debezium') implementation libs.airbyte.protocol implementation project(':airbyte-integrations:connectors:source-jdbc') implementation project(':airbyte-integrations:connectors:source-relational-db') @@ -24,7 +22,7 @@ dependencies { implementation 'org.apache.commons:commons-lang3:3.11' implementation libs.postgresql - testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-6')) + testImplementation testFixtures(project(':airbyte-integrations:bases:debezium')) testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) testImplementation project(":airbyte-json-validation") testImplementation project(':airbyte-test-utils') diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java index e7efda08ce78..2d797dc92531 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java @@ -65,48 +65,29 @@ private static Properties commonProperties(final JdbcDatabase database) { // https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-database-sslmode if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) { if (sourceConfig.has(JdbcUtils.SSL_MODE_KEY) && sourceConfig.get(JdbcUtils.SSL_MODE_KEY).has(JdbcUtils.MODE_KEY)) { + if (dbConfig.has(SSL_MODE) && !dbConfig.get(SSL_MODE).asText().isEmpty()) { LOGGER.debug("sslMode: {}", dbConfig.get(SSL_MODE).asText()); props.setProperty("database.sslmode", PostgresSource.toSslJdbcParamInternal(SslMode.valueOf(dbConfig.get(SSL_MODE).asText()))); - props.setProperty("database.history.producer.security.protocol", "SSL"); - props.setProperty("database.history.consumer.security.protocol", "SSL"); } if (dbConfig.has(PostgresSource.CA_CERTIFICATE_PATH) && !dbConfig.get(PostgresSource.CA_CERTIFICATE_PATH).asText().isEmpty()) { props.setProperty("database.sslrootcert", dbConfig.get(PostgresSource.CA_CERTIFICATE_PATH).asText()); - props.setProperty("database.history.producer.ssl.truststore.location", - dbConfig.get(PostgresSource.CA_CERTIFICATE_PATH).asText()); - props.setProperty("database.history.consumer.ssl.truststore.location", - dbConfig.get(PostgresSource.CA_CERTIFICATE_PATH).asText()); - props.setProperty("database.history.producer.ssl.truststore.type", "PKCS12"); - props.setProperty("database.history.consumer.ssl.truststore.type", "PKCS12"); - } + if (dbConfig.has(TRUST_KEY_STORE_PASS) && !dbConfig.get(TRUST_KEY_STORE_PASS).asText().isEmpty()) { props.setProperty("database.ssl.truststore.password", dbConfig.get(TRUST_KEY_STORE_PASS).asText()); - props.setProperty("database.history.producer.ssl.truststore.password", dbConfig.get(TRUST_KEY_STORE_PASS).asText()); - props.setProperty("database.history.consumer.ssl.truststore.password", dbConfig.get(TRUST_KEY_STORE_PASS).asText()); - props.setProperty("database.history.producer.ssl.key.password", dbConfig.get(TRUST_KEY_STORE_PASS).asText()); - props.setProperty("database.history.consumer.ssl.key.password", dbConfig.get(TRUST_KEY_STORE_PASS).asText()); - } + if (dbConfig.has(CLIENT_KEY_STORE_URL) && !dbConfig.get(CLIENT_KEY_STORE_URL).asText().isEmpty()) { props.setProperty("database.sslkey", Path.of(URI.create(dbConfig.get(CLIENT_KEY_STORE_URL).asText())).toString()); - props.setProperty("database.history.producer.ssl.keystore.location", - Path.of(URI.create(dbConfig.get(CLIENT_KEY_STORE_URL).asText())).toString()); - props.setProperty("database.history.consumer.ssl.keystore.location", - Path.of(URI.create(dbConfig.get(CLIENT_KEY_STORE_URL).asText())).toString()); - props.setProperty("database.history.producer.ssl.keystore.type", "PKCS12"); - props.setProperty("database.history.consumer.ssl.keystore.type", "PKCS12"); - } + if (dbConfig.has(CLIENT_KEY_STORE_PASS) && !dbConfig.get(CLIENT_KEY_STORE_PASS).asText().isEmpty()) { props.setProperty("database.sslpassword", dbConfig.get(CLIENT_KEY_STORE_PASS).asText()); - props.setProperty("database.history.producer.ssl.keystore.password", dbConfig.get(CLIENT_KEY_STORE_PASS).asText()); - props.setProperty("database.history.consumer.ssl.keystore.password", dbConfig.get(CLIENT_KEY_STORE_PASS).asText()); } } else { - props.setProperty("database.ssl.mode", "required"); + props.setProperty("database.sslmode", "required"); } } return props; diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java index c240f43412ac..5d0409e1d58d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java @@ -11,12 +11,10 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.debezium.CdcTargetPosition; import io.airbyte.integrations.debezium.internals.SnapshotMetadata; -import io.debezium.engine.ChangeEvent; -import java.lang.reflect.Field; import java.sql.SQLException; +import java.util.Map; import java.util.Objects; import java.util.Optional; -import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,9 +54,9 @@ static PostgresCdcTargetPosition targetPosition(final JdbcDatabase database) { @Override public boolean reachedTargetPosition(final JsonNode valueAsJson) { - final SnapshotMetadata snapshotMetadata = SnapshotMetadata.valueOf(valueAsJson.get("source").get("snapshot").asText().toUpperCase()); + final SnapshotMetadata snapshotMetadata = SnapshotMetadata.fromString(valueAsJson.get("source").get("snapshot").asText()); - if (SnapshotMetadata.TRUE == snapshotMetadata) { + if (SnapshotMetadata.isSnapshotEventMetadata(snapshotMetadata)) { return false; } else if (SnapshotMetadata.LAST == snapshotMetadata) { LOGGER.info("Signalling close because Snapshot is complete"); @@ -73,30 +71,14 @@ public boolean reachedTargetPosition(final JsonNode valueAsJson) { } } - private boolean isHeartbeatEvent(final ChangeEvent event) { - return Objects.nonNull(event) && !event.value().contains("source"); - } - @Override - public Long getHeartbeatPosition(final ChangeEvent heartbeatEvent) { - if (isHeartbeatEvent(heartbeatEvent)) { - try { - final Field f = heartbeatEvent.getClass().getDeclaredField("sourceRecord"); - f.setAccessible(true); - final SourceRecord sr = (SourceRecord) f.get(heartbeatEvent); - final Long hbLsn = (Long) sr.sourceOffset().get("lsn"); - LOGGER.debug("Found heartbeat lsn: {}", hbLsn); - return hbLsn; - } catch (final NoSuchFieldException | IllegalAccessException e) { - LOGGER.info("failed to get heartbeat lsn"); - } - } - return null; + public boolean reachedTargetPosition(final Long lsn) { + return lsn != null && lsn.compareTo(targetLsn.asLong()) >= 0; } @Override - public boolean reachedTargetPosition(final Long lsn) { - return (lsn == null) ? false : lsn.compareTo(targetLsn.asLong()) >= 0; + public Object getHeartbeatPositon(Map sourceOffset){ + return sourceOffset.get("lsn"); } private PgLsn extractLsn(final JsonNode valueAsJson) { diff --git a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json index 92db4771b994..3f2c05c974b1 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json @@ -246,8 +246,8 @@ "plugin": { "type": "string", "title": "Plugin", - "description": "A logical decoding plugin installed on the PostgreSQL server. The `pgoutput` plugin is used by default. The `wal2json` plugin is deprecated and will soon be removed so it's not recommended to use. Read more about selecting replication plugins.", - "enum": ["pgoutput", "wal2json"], + "description": "A logical decoding plugin installed on the PostgreSQL server.", + "enum": ["pgoutput"], "default": "pgoutput", "order": 2 }, diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractCdcPostgresSourceSslAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractCdcPostgresSourceSslAcceptanceTest.java index 378043c2d9c1..2a96d65d1548 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractCdcPostgresSourceSslAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractCdcPostgresSourceSslAcceptanceTest.java @@ -35,11 +35,6 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc container.start(); certs = getCertificate(container); - /** - * The publication is not being set as part of the config and because of it - * {@link io.airbyte.integrations.source.postgres.PostgresSource#isCdc(JsonNode)} returns false, as - * a result no test in this class runs through the cdc path. - */ final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() .put("method", "CDC") .put("replication_slot", SLOT_NAME_BASE) @@ -70,14 +65,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc SQLDialect.POSTGRES)) { final Database database = new Database(dslContext); - /** - * cdc expects the INCREMENTAL tables to contain primary key checkout - * {@link io.airbyte.integrations.source.postgres.PostgresSource#removeIncrementalWithoutPk(AirbyteStream)} - */ database.query(ctx -> { - ctx.execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); + ctx.execute("CREATE TABLE id_and_name(id INTEGER primary key, name VARCHAR(200));"); ctx.execute("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); - ctx.execute("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); + ctx.execute("CREATE TABLE starships(id INTEGER primary key, name VARCHAR(200));"); ctx.execute("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); ctx.execute("SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');"); ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;"); diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java index 1b1288ba352c..30346996657c 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java @@ -293,26 +293,26 @@ protected void initTests() { "08:00:2b:01:02:03:04:07") .build()); - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("money") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues( - "null", - "'999.99'", "'1,001.01'", "'-1,000'", - "'$999.99'", "'$1001.01'", "'-$1,000'" - // max values for Money type: "-92233720368547758.08", "92233720368547758.07" - // Debezium has wrong parsing for values more than 999999999999999 and less than -999999999999999 - // https://github.com/airbytehq/airbyte/issues/7338 - /* "'-92233720368547758.08'", "'92233720368547758.07'" */) - .addExpectedValues( - null, - // Double#toString method is necessary here because sometimes the output - // has unexpected decimals, e.g. Double.toString(-1000) is -1000.0 - "999.99", "1001.01", Double.toString(-1000), - "999.99", "1001.01", Double.toString(-1000) - /* "-92233720368547758.08", "92233720368547758.07" */) - .build()); +// addDataTypeTestData( +// TestDataHolder.builder() +// .sourceType("money") +// .airbyteType(JsonSchemaType.NUMBER) +// .addInsertValues( +// "null", +// "'999.99'", "'1,001.01'", "'-1,000'", +// "'$999.99'", "'$1001.01'", "'-$1,000'" +// // max values for Money type: "-92233720368547758.08", "92233720368547758.07" +// // Debezium has wrong parsing for values more than 999999999999999 and less than -999999999999999 +// // https://github.com/airbytehq/airbyte/issues/7338 +// /* "'-92233720368547758.08'", "'92233720368547758.07'" */) +// .addExpectedValues( +// null, +// // Double#toString method is necessary here because sometimes the output +// // has unexpected decimals, e.g. Double.toString(-1000) is -1000.0 +// "999.99", "1001.01", Double.toString(-1000), +// "999.99", "1001.01", Double.toString(-1000) +// /* "-92233720368547758.08", "92233720368547758.07" */) +// .build()); // Blocked by https://github.com/airbytehq/airbyte/issues/8902 for (final String type : Set.of("numeric", "decimal")) { diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java index b855dce9bd63..37f2dbcc7af4 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java @@ -23,6 +23,7 @@ import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; @@ -41,10 +42,6 @@ // todo (cgardens) - Sanity check that when configured for CDC that postgres performs like any other // incremental source. As we have more sources support CDC we will find a more reusable way of doing // this, but for now this is a solid sanity check. -/** - * None of the tests in this class use the cdc path (run the tests and search for `using CDC: false` - * in logs). This is exact same as {@link PostgresSourceAcceptanceTest} - */ public class CdcPostgresSourceAcceptanceTest extends SourceAcceptanceTest { protected static final String SLOT_NAME_BASE = "debezium_slot"; @@ -64,11 +61,6 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf"); container.start(); - /** - * The publication is not being set as part of the config and because of it - * {@link io.airbyte.integrations.source.postgres.PostgresSource#isCdc(JsonNode)} returns false, as - * a result no test in this class runs through the cdc path. - */ final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() .put("method", "CDC") .put("replication_slot", SLOT_NAME_BASE) @@ -98,14 +90,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc SQLDialect.POSTGRES)) { final Database database = new Database(dslContext); - /** - * cdc expects the INCREMENTAL tables to contain primary key checkout - * {@link io.airbyte.integrations.source.postgres.PostgresSource#removeIncrementalWithoutPk(AirbyteStream)} - */ database.query(ctx -> { - ctx.execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); + ctx.execute("CREATE TABLE id_and_name(id INTEGER primary key, name VARCHAR(200));"); ctx.execute("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); - ctx.execute("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); + ctx.execute("CREATE TABLE starships(id INTEGER primary key, name VARCHAR(200));"); ctx.execute("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); ctx.execute("SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');"); ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;"); @@ -147,55 +135,51 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) - .withCursorField(Lists.newArrayList("id")) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( STREAM_NAME, NAMESPACE, Field.of("id", JsonSchemaType.INTEGER), Field.of("name", JsonSchemaType.STRING)) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) - .withCursorField(Lists.newArrayList("id")) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( STREAM_NAME2, NAMESPACE, Field.of("id", JsonSchemaType.INTEGER), Field.of("name", JsonSchemaType.STRING)) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); } protected ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() { - /** - * This catalog config is incorrect for CDC replication. We specify - * withCursorField(Lists.newArrayList("id")) but with CDC customers can't/shouldn't be able to - * specify cursor field for INCREMENTAL tables Take a look at - * {@link io.airbyte.integrations.source.postgres.PostgresSource#setIncrementalToSourceDefined(AirbyteStream)} - * We should also specify the primary keys for INCREMENTAL tables checkout - * {@link io.airbyte.integrations.source.postgres.PostgresSource#removeIncrementalWithoutPk(AirbyteStream)} - */ return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) - .withCursorField(Lists.newArrayList("id")) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( STREAM_NAME, NAMESPACE, Field.of("id", JsonSchemaType.INTEGER) /* no name field */) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) - .withCursorField(Lists.newArrayList("name")) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( STREAM_NAME2, NAMESPACE, /* no id field */ Field.of("name", JsonSchemaType.STRING)) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json b/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json index 0743b4a4700b..b585f6ca2b1d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json @@ -246,8 +246,8 @@ "plugin": { "type": "string", "title": "Plugin", - "description": "A logical decoding plugin installed on the PostgreSQL server. The `pgoutput` plugin is used by default. The `wal2json` plugin is deprecated and will soon be removed so it's not recommended to use. Read more about selecting replication plugins.", - "enum": ["pgoutput", "wal2json"], + "description": "A logical decoding plugin installed on the PostgreSQL server.", + "enum": ["pgoutput"], "default": "pgoutput", "order": 2 }, diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_strict_encrypt_spec.json b/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_strict_encrypt_spec.json index d8d4704cedfc..564c05f7d2f3 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_strict_encrypt_spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_strict_encrypt_spec.json @@ -226,8 +226,8 @@ "plugin": { "type": "string", "title": "Plugin", - "description": "A logical decoding plugin installed on the PostgreSQL server. The `pgoutput` plugin is used by default. The `wal2json` plugin is deprecated and will soon be removed so it's not recommended to use. Read more about selecting replication plugins.", - "enum": ["pgoutput", "wal2json"], + "description": "A logical decoding plugin installed on the PostgreSQL server.", + "enum": ["pgoutput"], "default": "pgoutput", "order": 2 }, diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourcePgoutputTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourcePgoutputTest.java deleted file mode 100644 index 0426432cc605..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourcePgoutputTest.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.postgres; - -class CdcPostgresSourcePgoutputTest extends CdcPostgresSourceTest { - - @Override - protected String getPluginName() { - return "pgoutput"; - } - -} diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index a8ba92ad4b95..1baf130aa8a9 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -50,15 +50,11 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.SyncMode; import io.airbyte.test.utils.PostgreSQLContainerHelper; -import io.debezium.engine.ChangeEvent; - import java.sql.SQLException; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; import javax.sql.DataSource; -import org.apache.kafka.connect.source.SourceRecord; import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.junit.jupiter.api.AfterEach; @@ -73,7 +69,7 @@ import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; @ExtendWith(SystemStubsExtension.class) -abstract class CdcPostgresSourceTest extends CdcSourceTest { +public class CdcPostgresSourceTest extends CdcSourceTest { @SystemStub private EnvironmentVariables environmentVariables; @@ -92,7 +88,9 @@ abstract class CdcPostgresSourceTest extends CdcSourceTest { private final String cleanUserName = "airbyte_test"; private final String cleanUserPassword = "password"; - protected abstract String getPluginName(); + protected String getPluginName() { + return "pgoutput"; + } @AfterEach void tearDown() { @@ -463,40 +461,6 @@ void testReachedTargetPosition() { assertFalse(ctp.reachedTargetPosition((Long) null)); } - @Test - void testGetHeartbeatPosition() { - final CdcTargetPosition ctp = cdcLatestTargetPosition(); - final PostgresCdcTargetPosition pctp = (PostgresCdcTargetPosition) ctp; - final Long lsn = pctp.getHeartbeatPosition(new ChangeEvent() { - - private final SourceRecord sourceRecord = new SourceRecord(null, Collections.singletonMap("lsn", 358824993496L), null, null, null); - - @Override - public String key() { - return null; - } - - @Override - public String value() { - return "{\"ts_ms\":1667616934701}"; - } - - @Override - public String destination() { - return null; - } - - public SourceRecord sourceRecord() { - return sourceRecord; - } - - }); - - assertEquals(lsn, 358824993496L); - - assertNull(pctp.getHeartbeatPosition(null)); - } - @Test protected void syncShouldIncrementLSN() throws Exception { final int recordsToCreate = 20; diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceWal2jsonTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceWal2jsonTest.java deleted file mode 100644 index 0f750f09c50d..000000000000 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceWal2jsonTest.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.postgres; - -class CdcPostgresSourceWal2jsonTest extends CdcPostgresSourceTest { - - @Override - protected String getPluginName() { - return "wal2json"; - } - -} diff --git a/settings.gradle b/settings.gradle index e34004f613fb..8b4654e06999 100644 --- a/settings.gradle +++ b/settings.gradle @@ -107,7 +107,7 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD" include ':airbyte-integrations:bases:s3-destination-base-integration-test' include ':airbyte-integrations:bases:standard-source-test' include ':airbyte-integrations:connector-templates:generator' - include ':airbyte-integrations:bases:debezium-v1-9-6' + include ':airbyte-integrations:bases:debezium' // Needed by normalization integration tests include ':airbyte-integrations:connectors:destination-bigquery'