diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index ffd7787dbac9..39ad858c8c41 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1250,7 +1250,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 2.0.5 + dockerImageTag: 2.0.6 documentationUrl: https://docs.airbyte.com/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 412ef860a03e..beab64167c09 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -9339,7 +9339,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:2.0.5" +- dockerImage: "airbyte/source-mysql:2.0.6" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql" connectionSpecification: diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java index 87c8dfadcd48..4dc5929afc4e 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java @@ -27,7 +27,7 @@ * This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants * to use debezium for CDC, it should use this class */ -public class AirbyteDebeziumHandler { +public class AirbyteDebeziumHandler { private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteDebeziumHandler.class); /** @@ -38,12 +38,12 @@ public class AirbyteDebeziumHandler { private static final int QUEUE_CAPACITY = 10000; private final JsonNode config; - private final CdcTargetPosition targetPosition; + private final CdcTargetPosition targetPosition; private final boolean trackSchemaHistory; private final Duration firstRecordWaitTime; public AirbyteDebeziumHandler(final JsonNode config, - final CdcTargetPosition targetPosition, + final CdcTargetPosition targetPosition, final boolean trackSchemaHistory, final Duration firstRecordWaitTime) { this.config = config; @@ -70,7 +70,7 @@ public AutoCloseableIterator getSnapshotIterators( schemaHistoryManager(new EmptySavedInfo())); tableSnapshotPublisher.start(queue); - final AutoCloseableIterator> eventIterator = new DebeziumRecordIterator( + final AutoCloseableIterator> eventIterator = new DebeziumRecordIterator<>( queue, targetPosition, tableSnapshotPublisher::hasClosed, @@ -102,7 +102,7 @@ public AutoCloseableIterator getIncrementalIterators(final Confi publisher.start(queue); // handle state machine around pub/sub logic. - final AutoCloseableIterator> eventIterator = new DebeziumRecordIterator( + final AutoCloseableIterator> eventIterator = new DebeziumRecordIterator<>( queue, targetPosition, publisher::hasClosed, diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java index 62d292a679cb..d55c04a9c474 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.debezium; import com.fasterxml.jackson.databind.JsonNode; +import java.util.Map; /** * This interface is used to define the target position at the beginning of the sync so that once we @@ -13,23 +14,25 @@ * we might end up syncing forever. In order to tackle that, we need to define a point to end at the * beginning of the sync */ -public interface CdcTargetPosition { +public interface CdcTargetPosition { /** - * Reads a position value (lsn) from a change event and compares it to target lsn + * Reads a position value (ex: LSN) from a change event and compares it to target position * * @param valueAsJson json representation of a change event - * @return true if event lsn is equal or greater than targer lsn, or if last snapshot event + * @return true if event position is equal or greater than target position, or if last snapshot + * event */ - boolean reachedTargetPosition(JsonNode valueAsJson); + boolean reachedTargetPosition(final JsonNode valueAsJson); /** - * Checks if a specified lsn has reached the target lsn. + * Reads a position value (lsn) from a change event and compares it to target lsn * - * @param lsn an lsn value - * @return true if lsn is equal or greater than target lsn + * @param positionFromHeartbeat is the position extracted out of a heartbeat event (if the connector + * supports heartbeat) + * @return true if heartbeat position is equal or greater than target position */ - default boolean reachedTargetPosition(final Long lsn) { + default boolean reachedTargetPosition(final T positionFromHeartbeat) { throw new UnsupportedOperationException(); } @@ -42,4 +45,12 @@ default boolean isHeartbeatSupported() { return false; } + /** + * Returns a position value from a heartbeat event offset. + * + * @param sourceOffset source offset params from heartbeat change event + * @return the heartbeat position in a heartbeat change event or null + */ + T extractPositionFromHeartbeatOffset(final Map sourceOffset); + } diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java index 45bb02cac62b..4bb0b43c0e7c 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java @@ -37,7 +37,7 @@ * publisher is not closed. Even after the publisher is closed, the consumer will finish processing * any produced records before closing. */ -public class DebeziumRecordIterator extends AbstractIterator> +public class DebeziumRecordIterator extends AbstractIterator> implements AutoCloseableIterator> { private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class); @@ -46,7 +46,7 @@ public class DebeziumRecordIterator extends AbstractIterator, Field> heartbeatEventSourceField; private final LinkedBlockingQueue> queue; - private final CdcTargetPosition targetPosition; + private final CdcTargetPosition targetPosition; private final Supplier publisherStatusSupplier; private final VoidCallable requestClose; private final Duration firstRecordWaitTime; @@ -54,11 +54,11 @@ public class DebeziumRecordIterator extends AbstractIterator> queue, - final CdcTargetPosition targetPosition, + final CdcTargetPosition targetPosition, final Supplier publisherStatusSupplier, final VoidCallable requestClose, final Duration firstRecordWaitTime) { @@ -72,7 +72,7 @@ public DebeziumRecordIterator(final LinkedBlockingQueue computeNext() { continue; } - final long heartbeatPos = getHeartbeatPosition(next); + final T heartbeatPos = getHeartbeatPosition(next); // wrap up sync if heartbeat position crossed the target OR heartbeat position hasn't changed for // too long if (hasSyncFinished(heartbeatPos)) { LOGGER.info("Closing: Heartbeat indicates sync is done"); requestClose(); } - if (heartbeatPos != this.lastHeartbeatPosition) { + if (!heartbeatPos.equals(lastHeartbeatPosition)) { this.tsLastHeartbeat = LocalDateTime.now(); this.lastHeartbeatPosition = heartbeatPos; } @@ -138,7 +138,7 @@ protected ChangeEvent computeNext() { requestClose(); } this.tsLastHeartbeat = null; - this.lastHeartbeatPosition = -1L; + this.lastHeartbeatPosition = null; this.receivedFirstRecord = true; this.maxInstanceOfNoRecordsFound = 0; return next; @@ -146,9 +146,9 @@ protected ChangeEvent computeNext() { return endOfData(); } - private boolean hasSyncFinished(final long heartbeatPos) { + private boolean hasSyncFinished(final T heartbeatPos) { return targetPosition.reachedTargetPosition(heartbeatPos) - || (heartbeatPos == this.lastHeartbeatPosition && heartbeatPosNotChanging()); + || (heartbeatPos.equals(this.lastHeartbeatPosition) && heartbeatPosNotChanging()); } /** @@ -209,10 +209,7 @@ private void throwExceptionIfSnapshotNotFinished() { * reflection to setAccessible for each event */ @VisibleForTesting - protected long getHeartbeatPosition(final ChangeEvent heartbeatEvent) { - if (heartbeatEvent == null) { - return -1; - } + protected T getHeartbeatPosition(final ChangeEvent heartbeatEvent) { try { final Class eventClass = heartbeatEvent.getClass(); @@ -230,11 +227,9 @@ protected long getHeartbeatPosition(final ChangeEvent heartbeatE } final SourceRecord sr = (SourceRecord) f.get(heartbeatEvent); - final long hbLsn = (long) sr.sourceOffset().get("lsn"); - LOGGER.debug("Found heartbeat lsn: {}", hbLsn); - return hbLsn; + return targetPosition.extractPositionFromHeartbeatOffset(sr.sourceOffset()); } catch (final NoSuchFieldException | IllegalAccessException e) { - LOGGER.info("failed to get heartbeat lsn"); + LOGGER.info("failed to get heartbeat source offset"); throw new RuntimeException(e); } } diff --git a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIteratorTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIteratorTest.java index ee3279843de1..28db1da6b687 100644 --- a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIteratorTest.java +++ b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIteratorTest.java @@ -7,10 +7,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.debezium.CdcTargetPosition; import io.debezium.engine.ChangeEvent; import java.time.Duration; import java.util.Collections; +import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; @@ -19,8 +21,20 @@ public class DebeziumRecordIteratorTest { @Test public void getHeartbeatPositionTest() { - final DebeziumRecordIterator debeziumRecordIterator = new DebeziumRecordIterator(mock(LinkedBlockingQueue.class), - mock(CdcTargetPosition.class), + final DebeziumRecordIterator debeziumRecordIterator = new DebeziumRecordIterator<>(mock(LinkedBlockingQueue.class), + new CdcTargetPosition<>() { + + @Override + public boolean reachedTargetPosition(JsonNode valueAsJson) { + return false; + } + + @Override + public Long extractPositionFromHeartbeatOffset(final Map sourceOffset) { + return (long) sourceOffset.get("lsn"); + } + + }, () -> false, () -> {}, Duration.ZERO); @@ -50,7 +64,6 @@ public SourceRecord sourceRecord() { }); assertEquals(lsn, 358824993496L); - assertEquals(-1, debeziumRecordIterator.getHeartbeatPosition(null)); } } diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java index c6d01be92ce1..c5801d47e260 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java @@ -14,11 +14,12 @@ import java.io.IOException; import java.sql.SQLException; import java.util.List; +import java.util.Map; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MssqlCdcTargetPosition implements CdcTargetPosition { +public class MssqlCdcTargetPosition implements CdcTargetPosition { private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcTargetPosition.class); public final Lsn targetLsn; @@ -46,6 +47,11 @@ public boolean reachedTargetPosition(final JsonNode valueAsJson) { } } + @Override + public Lsn extractPositionFromHeartbeatOffset(final Map sourceOffset) { + throw new RuntimeException("Heartbeat is not supported for MSSQL"); + } + private Lsn extractLsn(final JsonNode valueAsJson) { return Optional.ofNullable(valueAsJson.get("source")) .flatMap(source -> Optional.ofNullable(source.get("commit_lsn").asText())) diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 15c17d3008ae..13e6a9374216 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -41,6 +41,7 @@ import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.SyncMode; +import io.debezium.connector.sqlserver.Lsn; import java.io.File; import java.sql.Connection; import java.sql.JDBCType; @@ -429,8 +430,8 @@ public List> getIncrementalIterators( if (MssqlCdcHelper.isCdc(sourceConfig) && shouldUseCDC(catalog)) { LOGGER.info("using CDC: {}", true); final Duration firstRecordWaitTime = FirstRecordWaitTimeUtil.getFirstRecordWaitTime(sourceConfig); - final AirbyteDebeziumHandler handler = - new AirbyteDebeziumHandler(sourceConfig, + final AirbyteDebeziumHandler handler = + new AirbyteDebeziumHandler<>(sourceConfig, MssqlCdcTargetPosition.getTargetPosition(database, sourceConfig.get(JdbcUtils.DATABASE_KEY).asText()), true, firstRecordWaitTime); final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog, diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index ca59f68bbf10..d359ef8d269b 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.5 +LABEL io.airbyte.version=2.0.6 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index f1dd41e69f15..7e16a35286e0 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.5 +LABEL io.airbyte.version=2.0.6 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcPosition.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcPosition.java new file mode 100644 index 000000000000..5603dd023c0c --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcPosition.java @@ -0,0 +1,31 @@ +package io.airbyte.integrations.source.mysql; + +import java.util.Objects; + +public class MySqlCdcPosition { + public final String fileName; + public final Long position; + + public MySqlCdcPosition(final String fileName, final Long position) { + this.fileName = fileName; + this.position = position; + } + + @Override + public boolean equals(final Object obj) { + if (obj instanceof final MySqlCdcPosition mySqlCdcPosition) { + return fileName.equals(mySqlCdcPosition.fileName) && mySqlCdcPosition.position.equals(position); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(fileName, position); + } + + @Override + public String toString() { + return "FileName: " + fileName + ", Position : " + position; + } +} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java index f5bb422c0741..c25613c07408 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java @@ -16,6 +16,7 @@ import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode; import java.net.URI; import java.nio.file.Path; +import java.time.Duration; import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,6 +24,7 @@ public class MySqlCdcProperties { private static final Logger LOGGER = LoggerFactory.getLogger(MySqlCdcProperties.class); + private static final Duration HEARTBEAT_FREQUENCY = Duration.ofSeconds(10); static Properties getDebeziumProperties(final JdbcDatabase database) { final JsonNode sourceConfig = database.getSourceConfig(); @@ -57,6 +59,7 @@ private static Properties commonProperties(final JdbcDatabase database) { props.setProperty("converters", "boolean, datetime"); props.setProperty("boolean.type", "io.airbyte.integrations.debezium.internals.CustomMySQLTinyIntOneToBooleanConverter"); props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter"); + props.setProperty("heartbeat.interval.ms", Long.toString(HEARTBEAT_FREQUENCY.toMillis())); // For CDC mode, the user cannot provide timezone arguments as JDBC parameters - they are // specifically defined in the replication_method diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java index ebe00f508153..7b04737069c7 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java @@ -10,38 +10,40 @@ import io.airbyte.integrations.debezium.internals.SnapshotMetadata; import java.sql.SQLException; import java.util.List; -import java.util.Objects; +import java.util.Map; import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MySqlCdcTargetPosition implements CdcTargetPosition { +public class MySqlCdcTargetPosition implements CdcTargetPosition { private static final Logger LOGGER = LoggerFactory.getLogger(MySqlCdcTargetPosition.class); - public final String fileName; - public final Integer position; + private final MySqlCdcPosition targetPosition; - public MySqlCdcTargetPosition(final String fileName, final Integer position) { - this.fileName = fileName; - this.position = position; + public MySqlCdcTargetPosition(final String fileName, final Long position) { + this(new MySqlCdcPosition(fileName, position)); } @Override public boolean equals(final Object obj) { if (obj instanceof final MySqlCdcTargetPosition cdcTargetPosition) { - return fileName.equals(cdcTargetPosition.fileName) && cdcTargetPosition.position.equals(position); + return targetPosition.equals(cdcTargetPosition.targetPosition); } return false; } @Override public int hashCode() { - return Objects.hash(fileName, position); + return targetPosition.hashCode(); } @Override public String toString() { - return "FileName: " + fileName + ", Position : " + position; + return targetPosition.toString(); + } + + public MySqlCdcTargetPosition(final MySqlCdcPosition targetPosition) { + this.targetPosition = targetPosition; } public static MySqlCdcTargetPosition targetPosition(final JdbcDatabase database) { @@ -49,7 +51,7 @@ public static MySqlCdcTargetPosition targetPosition(final JdbcDatabase database) connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"), resultSet -> { final String file = resultSet.getString("File"); - final int position = resultSet.getInt("Position"); + final long position = resultSet.getLong("Position"); if (file == null || position == 0) { return new MySqlCdcTargetPosition(null, null); } @@ -75,17 +77,35 @@ public boolean reachedTargetPosition(final JsonNode valueAsJson) { LOGGER.info("Signalling close because Snapshot is complete"); return true; } else { - final int eventPosition = valueAsJson.get("source").get("pos").asInt(); + final long eventPosition = valueAsJson.get("source").get("pos").asLong(); final boolean isEventPositionAfter = - eventFileName.compareTo(fileName) > 0 || (eventFileName.compareTo(fileName) == 0 && eventPosition >= position); + eventFileName.compareTo(targetPosition.fileName) > 0 || (eventFileName.compareTo( + targetPosition.fileName) == 0 && eventPosition >= targetPosition.position); if (isEventPositionAfter) { LOGGER.info("Signalling close because record's binlog file : " + eventFileName + " , position : " + eventPosition + " is after target file : " - + fileName + " , target position : " + position); + + targetPosition.fileName + " , target position : " + targetPosition.position); } return isEventPositionAfter; } } + @Override + public boolean reachedTargetPosition(final MySqlCdcPosition positionFromHeartbeat) { + return positionFromHeartbeat.fileName.compareTo(targetPosition.fileName) > 0 || + (positionFromHeartbeat.fileName.compareTo(targetPosition.fileName) == 0 + && positionFromHeartbeat.position >= targetPosition.position); + } + + @Override + public boolean isHeartbeatSupported() { + return true; + } + + @Override + public MySqlCdcPosition extractPositionFromHeartbeatOffset(final Map sourceOffset) { + return new MySqlCdcPosition(sourceOffset.get("file").toString(), (Long) sourceOffset.get("pos")); + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index f19afa9f469e..1d1b43a4c08d 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -285,8 +285,8 @@ public List> getIncrementalIterators(final if (isCdc(sourceConfig) && shouldUseCDC(catalog)) { final Duration firstRecordWaitTime = FirstRecordWaitTimeUtil.getFirstRecordWaitTime(sourceConfig); LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds()); - final AirbyteDebeziumHandler handler = - new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, firstRecordWaitTime); + final AirbyteDebeziumHandler handler = + new AirbyteDebeziumHandler<>(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), true, firstRecordWaitTime); final MySqlCdcStateHandler mySqlCdcStateHandler = new MySqlCdcStateHandler(stateManager); final MySqlCdcConnectorMetadataInjector mySqlCdcConnectorMetadataInjector = new MySqlCdcConnectorMetadataInjector(); diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index 24c0ed6e98a5..8b6cc64b6c4f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -146,7 +146,7 @@ protected CdcTargetPosition cdcLatestTargetPosition() { @Override protected CdcTargetPosition extractPosition(final JsonNode record) { - return new MySqlCdcTargetPosition(record.get(CDC_LOG_FILE).asText(), record.get(CDC_LOG_POS).asInt()); + return new MySqlCdcTargetPosition(record.get(CDC_LOG_FILE).asText(), record.get(CDC_LOG_POS).asLong()); } @Override diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java index c2ee04726ce4..e43e6fff4d11 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java @@ -12,12 +12,13 @@ import io.airbyte.integrations.debezium.CdcTargetPosition; import io.airbyte.integrations.debezium.internals.SnapshotMetadata; import java.sql.SQLException; +import java.util.Map; import java.util.Objects; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PostgresCdcTargetPosition implements CdcTargetPosition { +public class PostgresCdcTargetPosition implements CdcTargetPosition { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCdcTargetPosition.class); @VisibleForTesting @@ -71,8 +72,8 @@ public boolean reachedTargetPosition(final JsonNode valueAsJson) { } @Override - public boolean reachedTargetPosition(final Long lsn) { - return lsn != null && lsn.compareTo(targetLsn.asLong()) >= 0; + public boolean reachedTargetPosition(final Long positionFromHeartbeat) { + return positionFromHeartbeat != null && positionFromHeartbeat.compareTo(targetLsn.asLong()) >= 0; } private PgLsn extractLsn(final JsonNode valueAsJson) { @@ -88,4 +89,9 @@ public boolean isHeartbeatSupported() { return true; } + @Override + public Long extractPositionFromHeartbeatOffset(final Map sourceOffset) { + return (long) sourceOffset.get("lsn"); + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index aaf7bf5258a7..54e4bb6de55d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -381,7 +381,7 @@ public List> getIncrementalIterators(final PostgresUtils.getPluginValue(sourceConfig.get("replication_method"))); } - final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig, + final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(sourceConfig, PostgresCdcTargetPosition.targetPosition(database), false, firstRecordWaitTime); final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager); final List streamsToSnapshot = identifyStreamsToSnapshot(catalog, stateManager); diff --git a/connectors.md b/connectors.md index ddb3a43cc453..9d0512476e27 100644 --- a/connectors.md +++ b/connectors.md @@ -144,7 +144,7 @@ | **Monday** | Monday icon | Source | airbyte/source-monday:0.2.3 | beta | [link](https://docs.airbyte.com/integrations/sources/monday) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-monday) | `80a54ea2-9959-4040-aac1-eee42423ec9b` | | **MongoDb** | MongoDb icon | Source | airbyte/source-mongodb-v2:0.1.19 | alpha | [link](https://docs.airbyte.com/integrations/sources/mongodb-v2) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mongodb-v2) | `b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e` | | **My Hours** | My Hours icon | Source | airbyte/source-my-hours:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/my-hours) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-my-hours) | `722ba4bf-06ec-45a4-8dd5-72e4a5cf3903` | -| **MySQL** | MySQL icon | Source | airbyte/source-mysql:2.0.5 | beta | [link](https://docs.airbyte.com/integrations/sources/mysql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mysql) | `435bb9a5-7887-4809-aa58-28c27df0d7ad` | +| **MySQL** | MySQL icon | Source | airbyte/source-mysql:2.0.6 | beta | [link](https://docs.airbyte.com/integrations/sources/mysql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mysql) | `435bb9a5-7887-4809-aa58-28c27df0d7ad` | | **NASA** | NASA icon | Source | airbyte/source-nasa:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/nasa) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-nasa) | `1a8667d7-7978-43cd-ba4d-d32cbd478971` | | **Netsuite** | Netsuite icon | Source | airbyte/source-netsuite:0.1.3 | alpha | [link](https://docs.airbyte.com/integrations/sources/netsuite) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-netsuite) | `4f2f093d-ce44-4121-8118-9d13b7bfccd0` | | **New York Times** | New York Times icon | Source | airbyte/source-nytimes:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/nytimes) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-nytimes) | `0fae6a9a-04eb-44d4-96e1-e02d3dbc1d83` | diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index f5fb25e05b55..47fa25b24584 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -256,6 +256,7 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.0.6 | 2023-03-21 | [23984](https://github.com/airbytehq/airbyte/pull/23984) | Support CDC heartbeats | | 2.0.5 | 2023-03-21 | [24147](https://github.com/airbytehq/airbyte/pull/24275) | Fix error with CDC checkpointing | | 2.0.4 | 2023-03-20 | [24147](https://github.com/airbytehq/airbyte/pull/24147) | Support different table structure during "DESCRIBE" query | | 2.0.3 | 2023-03-15 | [24082](https://github.com/airbytehq/airbyte/pull/24082) | Fixed NPE during cursor values validation |