Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL Source : add CDC heartbeat support #23984

Merged
merged 18 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants
* to use debezium for CDC, it should use this class
*/
public class AirbyteDebeziumHandler {
public class AirbyteDebeziumHandler<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteDebeziumHandler.class);
/**
Expand All @@ -38,12 +38,12 @@ public class AirbyteDebeziumHandler {
private static final int QUEUE_CAPACITY = 10000;

private final JsonNode config;
private final CdcTargetPosition targetPosition;
private final CdcTargetPosition<T> targetPosition;
private final boolean trackSchemaHistory;
private final Duration firstRecordWaitTime;

public AirbyteDebeziumHandler(final JsonNode config,
final CdcTargetPosition targetPosition,
final CdcTargetPosition<T> targetPosition,
final boolean trackSchemaHistory,
final Duration firstRecordWaitTime) {
this.config = config;
Expand All @@ -70,7 +70,7 @@ public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(
schemaHistoryManager(new EmptySavedInfo()));
tableSnapshotPublisher.start(queue);

final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator(
final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator<>(
queue,
targetPosition,
tableSnapshotPublisher::hasClosed,
Expand Down Expand Up @@ -102,7 +102,7 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Confi
publisher.start(queue);

// handle state machine around pub/sub logic.
final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator(
final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator<>(
queue,
targetPosition,
publisher::hasClosed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;

/**
* This interface is used to define the target position at the beginning of the sync so that once we
Expand All @@ -13,23 +14,25 @@
* we might end up syncing forever. In order to tackle that, we need to define a point to end at the
* beginning of the sync
*/
public interface CdcTargetPosition {
public interface CdcTargetPosition<T> {

/**
* Reads a position value (lsn) from a change event and compares it to target lsn
* Reads a position value (ex: LSN) from a change event and compares it to target position
*
* @param valueAsJson json representation of a change event
* @return true if event lsn is equal or greater than targer lsn, or if last snapshot event
* @return true if event position is equal or greater than target position, or if last snapshot
* event
*/
boolean reachedTargetPosition(JsonNode valueAsJson);
boolean reachedTargetPosition(final JsonNode valueAsJson);

/**
* Checks if a specified lsn has reached the target lsn.
* Reads a position value (lsn) from a change event and compares it to target lsn
*
* @param lsn an lsn value
* @return true if lsn is equal or greater than target lsn
* @param positionFromHeartbeat is the position extracted out of a heartbeat event (if the connector
* supports heartbeat)
* @return true if heartbeat position is equal or greater than target position
*/
default boolean reachedTargetPosition(final Long lsn) {
default boolean reachedTargetPosition(final T positionFromHeartbeat) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just remove the default?
It just throw an exception as it's an interface and all the classes needs to implement it.
Otherwise we can make default the method extractPositionFromHeartbeatOffset.

throw new UnsupportedOperationException();
}

Expand All @@ -42,4 +45,12 @@ default boolean isHeartbeatSupported() {
return false;
}

/**
* Returns a position value from a heartbeat event offset.
*
* @param sourceOffset source offset params from heartbeat change event
* @return the heartbeat position in a heartbeat change event or null
*/
T extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset);

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* publisher is not closed. Even after the publisher is closed, the consumer will finish processing
* any produced records before closing.
*/
public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String, String>>
public class DebeziumRecordIterator<T> extends AbstractIterator<ChangeEvent<String, String>>
implements AutoCloseableIterator<ChangeEvent<String, String>> {

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class);
Expand All @@ -46,19 +46,19 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,

private final Map<Class<? extends ChangeEvent>, Field> heartbeatEventSourceField;
private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
private final CdcTargetPosition targetPosition;
private final CdcTargetPosition<T> targetPosition;
private final Supplier<Boolean> publisherStatusSupplier;
private final VoidCallable requestClose;
private final Duration firstRecordWaitTime;

private boolean receivedFirstRecord;
private boolean hasSnapshotFinished;
private LocalDateTime tsLastHeartbeat;
private long lastHeartbeatPosition;
private T lastHeartbeatPosition;
private int maxInstanceOfNoRecordsFound;

public DebeziumRecordIterator(final LinkedBlockingQueue<ChangeEvent<String, String>> queue,
final CdcTargetPosition targetPosition,
final CdcTargetPosition<T> targetPosition,
final Supplier<Boolean> publisherStatusSupplier,
final VoidCallable requestClose,
final Duration firstRecordWaitTime) {
Expand All @@ -72,7 +72,7 @@ public DebeziumRecordIterator(final LinkedBlockingQueue<ChangeEvent<String, Stri
this.receivedFirstRecord = false;
this.hasSnapshotFinished = true;
this.tsLastHeartbeat = null;
this.lastHeartbeatPosition = -1;
this.lastHeartbeatPosition = null;
this.maxInstanceOfNoRecordsFound = 0;
}

Expand Down Expand Up @@ -115,14 +115,14 @@ protected ChangeEvent<String, String> computeNext() {
continue;
}

final long heartbeatPos = getHeartbeatPosition(next);
final T heartbeatPos = getHeartbeatPosition(next);
// wrap up sync if heartbeat position crossed the target OR heartbeat position hasn't changed for
// too long
if (hasSyncFinished(heartbeatPos)) {
LOGGER.info("Closing: Heartbeat indicates sync is done");
requestClose();
}
if (heartbeatPos != this.lastHeartbeatPosition) {
if (!heartbeatPos.equals(lastHeartbeatPosition)) {
this.tsLastHeartbeat = LocalDateTime.now();
this.lastHeartbeatPosition = heartbeatPos;
}
Expand All @@ -138,17 +138,17 @@ protected ChangeEvent<String, String> computeNext() {
requestClose();
}
this.tsLastHeartbeat = null;
this.lastHeartbeatPosition = -1L;
this.lastHeartbeatPosition = null;
this.receivedFirstRecord = true;
this.maxInstanceOfNoRecordsFound = 0;
return next;
}
return endOfData();
}

private boolean hasSyncFinished(final long heartbeatPos) {
private boolean hasSyncFinished(final T heartbeatPos) {
return targetPosition.reachedTargetPosition(heartbeatPos)
|| (heartbeatPos == this.lastHeartbeatPosition && heartbeatPosNotChanging());
|| (heartbeatPos.equals(this.lastHeartbeatPosition) && heartbeatPosNotChanging());
}

/**
Expand Down Expand Up @@ -209,10 +209,7 @@ private void throwExceptionIfSnapshotNotFinished() {
* reflection to setAccessible for each event
*/
@VisibleForTesting
protected long getHeartbeatPosition(final ChangeEvent<String, String> heartbeatEvent) {
if (heartbeatEvent == null) {
return -1;
}
protected T getHeartbeatPosition(final ChangeEvent<String, String> heartbeatEvent) {

try {
final Class<? extends ChangeEvent> eventClass = heartbeatEvent.getClass();
Expand All @@ -230,11 +227,9 @@ protected long getHeartbeatPosition(final ChangeEvent<String, String> heartbeatE
}

final SourceRecord sr = (SourceRecord) f.get(heartbeatEvent);
final long hbLsn = (long) sr.sourceOffset().get("lsn");
LOGGER.debug("Found heartbeat lsn: {}", hbLsn);
return hbLsn;
return targetPosition.extractPositionFromHeartbeatOffset(sr.sourceOffset());
} catch (final NoSuchFieldException | IllegalAccessException e) {
LOGGER.info("failed to get heartbeat lsn");
LOGGER.info("failed to get heartbeat source offset");
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.debezium.CdcTargetPosition;
import io.debezium.engine.ChangeEvent;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;
Expand All @@ -19,8 +21,20 @@ public class DebeziumRecordIteratorTest {

@Test
public void getHeartbeatPositionTest() {
final DebeziumRecordIterator debeziumRecordIterator = new DebeziumRecordIterator(mock(LinkedBlockingQueue.class),
mock(CdcTargetPosition.class),
final DebeziumRecordIterator<Long> debeziumRecordIterator = new DebeziumRecordIterator<>(mock(LinkedBlockingQueue.class),
new CdcTargetPosition<>() {

@Override
public boolean reachedTargetPosition(JsonNode valueAsJson) {
return false;
}

@Override
public Long extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset) {
return (long) sourceOffset.get("lsn");
}

},
() -> false,
() -> {},
Duration.ZERO);
Expand Down Expand Up @@ -50,7 +64,6 @@ public SourceRecord sourceRecord() {
});

assertEquals(lsn, 358824993496L);
assertEquals(-1, debeziumRecordIterator.getHeartbeatPosition(null));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should keep this test, but instead of making it equal to -1 to null.

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MssqlCdcTargetPosition implements CdcTargetPosition {
public class MssqlCdcTargetPosition implements CdcTargetPosition<Lsn> {

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcTargetPosition.class);
public final Lsn targetLsn;
Expand Down Expand Up @@ -46,6 +47,11 @@ public boolean reachedTargetPosition(final JsonNode valueAsJson) {
}
}

@Override
public Lsn extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset) {
throw new RuntimeException("Heartbeat is not supported for MSSQL");
}

private Lsn extractLsn(final JsonNode valueAsJson) {
return Optional.ofNullable(valueAsJson.get("source"))
.flatMap(source -> Optional.ofNullable(source.get("commit_lsn").asText()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.SyncMode;
import io.debezium.connector.sqlserver.Lsn;
import java.io.File;
import java.sql.Connection;
import java.sql.JDBCType;
Expand Down Expand Up @@ -429,8 +430,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
if (MssqlCdcHelper.isCdc(sourceConfig) && shouldUseCDC(catalog)) {
LOGGER.info("using CDC: {}", true);
final Duration firstRecordWaitTime = FirstRecordWaitTimeUtil.getFirstRecordWaitTime(sourceConfig);
final AirbyteDebeziumHandler handler =
new AirbyteDebeziumHandler(sourceConfig,
final AirbyteDebeziumHandler<Lsn> handler =
new AirbyteDebeziumHandler<>(sourceConfig,
MssqlCdcTargetPosition.getTargetPosition(database, sourceConfig.get(JdbcUtils.DATABASE_KEY).asText()), true, firstRecordWaitTime);

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.airbyte.integrations.source.mysql;

import java.util.Objects;

public class MySqlCdcPosition {
public final String fileName;
public final Long position;

public MySqlCdcPosition(final String fileName, final Long position) {
this.fileName = fileName;
this.position = position;
}

@Override
public boolean equals(final Object obj) {
if (obj instanceof final MySqlCdcPosition mySqlCdcPosition) {
return fileName.equals(mySqlCdcPosition.fileName) && mySqlCdcPosition.position.equals(position);
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(fileName, position);
}

@Override
public String toString() {
return "FileName: " + fileName + ", Position : " + position;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlCdcProperties {

private static final Logger LOGGER = LoggerFactory.getLogger(MySqlCdcProperties.class);
private static final Duration HEARTBEAT_FREQUENCY = Duration.ofSeconds(10);

static Properties getDebeziumProperties(final JdbcDatabase database) {
final JsonNode sourceConfig = database.getSourceConfig();
Expand Down Expand Up @@ -57,6 +59,7 @@ private static Properties commonProperties(final JdbcDatabase database) {
props.setProperty("converters", "boolean, datetime");
props.setProperty("boolean.type", "io.airbyte.integrations.debezium.internals.CustomMySQLTinyIntOneToBooleanConverter");
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter");
props.setProperty("heartbeat.interval.ms", Long.toString(HEARTBEAT_FREQUENCY.toMillis()));

// For CDC mode, the user cannot provide timezone arguments as JDBC parameters - they are
// specifically defined in the replication_method
Expand Down
Loading