Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL Source Support CDC heartbeats #23376

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ dependencies {
implementation libs.airbyte.protocol
implementation project(':airbyte-db:db-lib')

implementation 'io.debezium:debezium-api:1.9.6.Final'
implementation 'io.debezium:debezium-embedded:1.9.6.Final'
implementation 'io.debezium:debezium-connector-sqlserver:1.9.6.Final'
implementation 'io.debezium:debezium-connector-mysql:1.9.6.Final'
implementation files('debezium-connector-postgres-1.9.6.Final.jar')
implementation 'io.debezium:debezium-api:2.1.2.Final'
implementation 'io.debezium:debezium-embedded:2.1.2.Final'
implementation 'io.debezium:debezium-connector-sqlserver:2.1.2.Final'
implementation 'io.debezium:debezium-connector-mysql:2.1.2.Final'
implementation 'io.debezium:debezium-connector-postgres:2.1.2.Final'
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'

testFixturesImplementation project(':airbyte-db:db-lib')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.engine.ChangeEvent;

import java.util.Map;

/**
* This interface is used to define the target position at the beginning of the sync so that once we
Expand All @@ -24,16 +25,6 @@ public interface CdcTargetPosition {
*/
boolean reachedTargetPosition(JsonNode valueAsJson);

/**
* Returns a position value (lsn) from a heartbeat event.
*
* @param heartbeatEvent a heartbeat change event
* @return the lsn value in a heartbeat change event or null
*/
default Long getHeartbeatPosition(final ChangeEvent<String, String> heartbeatEvent) {
throw new UnsupportedOperationException();
}

/**
* Checks if a specified lsn has reached the target lsn.
*
Expand All @@ -52,5 +43,12 @@ default boolean reachedTargetPosition(final Long lsn) {
default boolean isHeartbeatSupported() {
return false;
}
/**
* Returns a position value from a heartbeat event offset.
*
* @param sourceOffset source offset params from heartbeat change event
* @return the heartbeat position in a heartbeat change event or null
*/
Object getHeartbeatPositon(Map<String, ?> sourceOffset);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -59,14 +60,30 @@ public Map<String, String> read() {
public void persist(final JsonNode cdcState) {
final Map<String, String> mapAsString =
cdcState != null ? Jsons.object(cdcState, Map.class) : Collections.emptyMap();
final Map<ByteBuffer, ByteBuffer> mappedAsStrings = mapAsString.entrySet().stream().collect(Collectors.toMap(

final Map<String, String> updatedMap = updateStateForDebezium2_1(mapAsString);

final Map<ByteBuffer, ByteBuffer> mappedAsStrings = updatedMap.entrySet().stream().collect(Collectors.toMap(
e -> stringToByteBuffer(e.getKey()),
e -> stringToByteBuffer(e.getValue())));

FileUtils.deleteQuietly(offsetFilePath.toFile());
save(mappedAsStrings);
}

/**
 * Rewrites the key of a saved offset entry (judging by the method name, into the form used by
 * Debezium 2.1 — confirm against the Debezium offset format).
 *
 * <p>Only the first entry of {@code mapAsString} is carried over. Its key is trimmed to the span
 * running from the first {@code '['} to the last {@code ']'} (both inclusive) and the value is
 * copied unchanged. A key that contains no such span is kept as-is rather than failing with a
 * {@link StringIndexOutOfBoundsException} or collapsing to an empty string.
 *
 * @param mapAsString offset state as key/value strings; may be empty
 * @return a new insertion-ordered map holding at most one entry
 */
private Map<String, String> updateStateForDebezium2_1(final Map<String, String> mapAsString) {
  final Map<String, String> updatedMap = new LinkedHashMap<>();
  if (mapAsString.isEmpty()) {
    return updatedMap;
  }

  final String key = mapAsString.keySet().iterator().next();
  final int open = key.indexOf('[');
  final int close = key.lastIndexOf(']');
  // Without a well-formed "[...]" span there is nothing to extract, so keep the key untouched.
  final String newKey = (open >= 0 && close > open) ? key.substring(open, close + 1) : key;
  final String value = mapAsString.get(key);
  updatedMap.put(newKey, value);
  return updatedMap;
}

private static String byteBufferToString(final ByteBuffer byteBuffer) {
Preconditions.checkNotNull(byteBuffer);
return new String(byteBuffer.array(), StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected Properties getDebeziumProperties() {
props.putAll(properties);

// debezium engine configuration
props.setProperty("name", "engine");
// https://debezium.io/documentation/reference/stable/development/engine.html#engine-properties
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
Expand All @@ -55,21 +55,20 @@ protected Properties getDebeziumProperties() {
props.setProperty("max.queue.size", "8192");

if (schemaHistoryManager.isPresent()) {
// https://debezium.io/documentation/reference/1.9/operations/debezium-server.html#debezium-source-database-history-class
// https://debezium.io/documentation/reference/stable/operations/debezium-server.html#debezium-source-database-history-class
// https://debezium.io/documentation/reference/development/engine.html#_in_the_code
// As mentioned in the documents above, debezium connector for MySQL needs to track the schema
// changes. If we don't do this, we can't fetch records for the table.
props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
props.setProperty("database.history.file.filename", schemaHistoryManager.get().getPath().toString());
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", schemaHistoryManager.get().getPath().toString());
}

// https://debezium.io/documentation/reference/configuration/avro.html
// https://debezium.io/documentation/reference/stable/configuration/avro.html
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");

// debezium names
props.setProperty("name", config.get(JdbcUtils.DATABASE_KEY).asText());
props.setProperty("database.server.name", config.get(JdbcUtils.DATABASE_KEY).asText());

// db connection configuration
props.setProperty("database.hostname", config.get(JdbcUtils.HOST_KEY).asText());
Expand All @@ -84,10 +83,14 @@ protected Properties getDebeziumProperties() {
// By default "decimal.handling.mode=precise", which causes this value to be returned as binary.
// The "double" type may cause a loss of precision, so set Debezium's config to store it as a String
// explicitly in its Kafka messages. For more details see:
// https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-decimal-types
// https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-decimal-types
// https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
props.setProperty("decimal.handling.mode", "string");

// WARNING : Never change the value of this otherwise all the connectors would start syncing from
// scratch
props.setProperty("topic.prefix", config.get(JdbcUtils.DATABASE_KEY).asText());

// table selection
props.setProperty("table.include.list", getTableIncludelist(catalog));
// column selection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,24 @@
package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.debezium.CdcTargetPosition;
import io.debezium.engine.ChangeEvent;
import java.lang.reflect.Field;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,6 +44,7 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,

private static final Duration SUBSEQUENT_RECORD_WAIT_TIME = Duration.ofMinutes(1);

private final Map<Class<? extends ChangeEvent>, Field> heartbeatEventSourceField;
private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
private final CdcTargetPosition targetPosition;
private final Supplier<Boolean> publisherStatusSupplier;
Expand All @@ -46,9 +53,8 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,

private boolean receivedFirstRecord;
private boolean hasSnapshotFinished;
private boolean signalledClose;
private LocalDateTime tsLastHeartbeat;
private Long lastHeartbeatPosition;
private long lastHeartbeatPosition;
private int maxInstanceOfNoRecordsFound;

public DebeziumRecordIterator(final LinkedBlockingQueue<ChangeEvent<String, String>> queue,
Expand All @@ -61,12 +67,12 @@ public DebeziumRecordIterator(final LinkedBlockingQueue<ChangeEvent<String, Stri
this.publisherStatusSupplier = publisherStatusSupplier;
this.requestClose = requestClose;
this.firstRecordWaitTime = firstRecordWaitTime;
this.heartbeatEventSourceField = new HashMap<>(1);

this.receivedFirstRecord = false;
this.hasSnapshotFinished = true;
this.signalledClose = false;
tsLastHeartbeat = null;
lastHeartbeatPosition = null;
this.tsLastHeartbeat = null;
this.lastHeartbeatPosition = -1;
this.maxInstanceOfNoRecordsFound = 0;
}

Expand All @@ -85,10 +91,6 @@ protected ChangeEvent<String, String> computeNext() {
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
final ChangeEvent<String, String> next;

// #18987: waitTime is still required with heartbeats for backward
// compatibility with connectors not implementing heartbeat
// yet (MySql, MSSql), And also due to postgres taking a long time
// initially staying on "searching for WAL resume position"
final Duration waitTime = receivedFirstRecord ? SUBSEQUENT_RECORD_WAIT_TIME : this.firstRecordWaitTime;
try {
next = queue.poll(waitTime.getSeconds(), TimeUnit.SECONDS);
Expand All @@ -98,11 +100,8 @@ protected ChangeEvent<String, String> computeNext() {

// if within the timeout, the consumer could not get a record, it is time to tell the producer to
// shutdown.
// #18987: Noticed in testing that it's possible for DBZ to be stuck "Searching for WAL resume
// position"
// when no changes exist. In that case queue will pop after timeout with null value for next
if (next == null) {
if ((!receivedFirstRecord || hasSnapshotFinished || maxInstanceOfNoRecordsFound >= 10) && !signalledClose) {
if (!receivedFirstRecord || hasSnapshotFinished || maxInstanceOfNoRecordsFound >= 10) {
LOGGER.info("No records were returned by Debezium in the timeout seconds {}, closing the engine and iterator", waitTime.getSeconds());
requestClose();
}
Expand All @@ -111,56 +110,43 @@ protected ChangeEvent<String, String> computeNext() {
continue;
}

if (targetPosition.isHeartbeatSupported()) {
// check if heartbeat and read hearbeat position
LOGGER.debug("checking heartbeat lsn for: {}", next);
final Long heartbeatPos = targetPosition.getHeartbeatPosition(next);
if (heartbeatPos != null) {
// wrap up sync if heartbeat position crossed the target OR heartbeat position hasn't changed for
// too long
if (targetPosition.reachedTargetPosition(heartbeatPos)
|| (heartbeatPos.equals(this.lastHeartbeatPosition) && heartbeatPosNotChanging()) && !signalledClose) {
LOGGER.info("Closing: Heartbeat indicates sync is done");
requestClose();
}
if (!heartbeatPos.equals(this.lastHeartbeatPosition)) {
this.tsLastHeartbeat = LocalDateTime.now();
this.lastHeartbeatPosition = heartbeatPos;
}
if (isHeartbeatEvent(next)) {
if (!hasSnapshotFinished) {
continue;
}

final long heartbeatPos = getHeartbeatPosition(next);
// wrap up sync if heartbeat position crossed the target OR heartbeat position hasn't changed for
// too long
if (targetPosition.reachedTargetPosition(heartbeatPos)
|| (heartbeatPos == this.lastHeartbeatPosition && heartbeatPosNotChanging())) {
LOGGER.info("Closing: Heartbeat indicates sync is done");
requestClose();
}
if (heartbeatPos != this.lastHeartbeatPosition) {
this.tsLastHeartbeat = LocalDateTime.now();
this.lastHeartbeatPosition = heartbeatPos;
}
continue;
}

final JsonNode eventAsJson = Jsons.deserialize(next.value());
hasSnapshotFinished = hasSnapshotFinished(eventAsJson);

// if the last record matches the target file position, it is time to tell the producer to shutdown.

if (!signalledClose && shouldSignalClose(eventAsJson)) {
if (targetPosition.reachedTargetPosition(eventAsJson)) {
LOGGER.info("Closing: Change event reached target position");
requestClose();
}
this.tsLastHeartbeat = null;
this.lastHeartbeatPosition = null;
this.lastHeartbeatPosition = -1L;
this.receivedFirstRecord = true;
this.maxInstanceOfNoRecordsFound = 0;
return next;
}
return endOfData();
}

private boolean heartbeatPosNotChanging() {
final Duration tbt = Duration.between(this.tsLastHeartbeat, LocalDateTime.now());
LOGGER.debug("Time since last hb_pos change {}s", tbt.toSeconds());
// wait time for no change in heartbeat position is half of initial waitTime
return tbt.compareTo(this.firstRecordWaitTime.dividedBy(2)) > 0;
}

private boolean hasSnapshotFinished(final JsonNode eventAsJson) {
final SnapshotMetadata snapshot = SnapshotMetadata.valueOf(eventAsJson.get("source").get("snapshot").asText().toUpperCase());
return SnapshotMetadata.TRUE != snapshot;
}

/**
* Debezium was built as an ever running process which keeps on listening for new changes on DB and
* immediately processing them. Airbyte needs debezium to work as a start stop mechanism. In order
Expand All @@ -183,14 +169,25 @@ public void close() throws Exception {
requestClose();
}

private boolean shouldSignalClose(final JsonNode eventAsJson) {
return targetPosition.reachedTargetPosition(eventAsJson);
/**
 * Tells whether an event should be treated as a heartbeat.
 *
 * @param event the change event to inspect; may be {@code null}
 * @return {@code true} only when the target position reports heartbeat support, the event is
 *         non-null, and the event's value does not contain the text {@code "source"}
 */
private boolean isHeartbeatEvent(final ChangeEvent<String, String> event) {
  if (!targetPosition.isHeartbeatSupported() || event == null) {
    return false;
  }
  final boolean mentionsSource = event.value().contains("source");
  return !mentionsSource;
}

/**
 * Checks whether the time elapsed since {@code tsLastHeartbeat} exceeds half of
 * {@code firstRecordWaitTime}.
 *
 * @return {@code true} when the elapsed time is strictly greater than that threshold
 */
private boolean heartbeatPosNotChanging() {
  final LocalDateTime now = LocalDateTime.now();
  final Duration sinceLastChange = Duration.between(this.tsLastHeartbeat, now);
  LOGGER.debug("Time since last hb_pos change {}s", sinceLastChange.toSeconds());
  // the allowed period without a heartbeat position change is half of the initial wait time
  final Duration threshold = this.firstRecordWaitTime.dividedBy(2);
  return sinceLastChange.compareTo(threshold) > 0;
}

/**
 * Derives the snapshot state from a change event.
 *
 * <p>Reads the text of {@code source.snapshot} from the event, converts it with
 * {@code SnapshotMetadata.fromString}, and negates
 * {@code SnapshotMetadata.isSnapshotEventMetadata} for the result.
 *
 * @param eventAsJson the change event value parsed as JSON
 * @return {@code true} when the event's snapshot metadata is not snapshot-event metadata
 */
private boolean hasSnapshotFinished(final JsonNode eventAsJson) {
  final JsonNode sourceNode = eventAsJson.get("source");
  final String snapshotText = sourceNode.get("snapshot").asText();
  final SnapshotMetadata metadata = SnapshotMetadata.fromString(snapshotText);
  final boolean stillSnapshotting = SnapshotMetadata.isSnapshotEventMetadata(metadata);
  return !stillSnapshotting;
}

private void requestClose() {
try {
requestClose.call();
signalledClose = true;
} catch (final Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -203,4 +200,35 @@ private void throwExceptionIfSnapshotNotFinished() {
}
}

/**
 * Reads the heartbeat position carried by a heartbeat change event.
 *
 * <p>The event's declared {@code sourceRecord} field is read reflectively (the {@link Field} is
 * cached per concrete event class) and the record's source offset is handed to
 * {@code targetPosition.getHeartbeatPositon}.
 *
 * @param heartbeatEvent a heartbeat change event; may be {@code null}
 * @return the heartbeat position, or {@code null} when the event is {@code null} or the target
 *         position yields no value for the event's source offset
 * @throws RuntimeException if the {@code sourceRecord} field is missing or cannot be read
 * @throws ClassCastException if the value returned by the target position is not a {@link Number}
 */
@VisibleForTesting
protected Long getHeartbeatPosition(final ChangeEvent<String, String> heartbeatEvent) {
  if (heartbeatEvent == null) {
    return null;
  }

  try {
    final Class<? extends ChangeEvent> eventClass = heartbeatEvent.getClass();
    Field f = heartbeatEventSourceField.get(eventClass);
    if (f == null) {
      // first event of this class: resolve the field once and remember it
      f = eventClass.getDeclaredField("sourceRecord");
      f.setAccessible(true);
      heartbeatEventSourceField.put(eventClass, f);

      if (heartbeatEventSourceField.size() > 1) {
        LOGGER.warn("Field Cache size growing beyond expected size of 1, size is " + heartbeatEventSourceField.size());
      }
    }

    final SourceRecord sr = (SourceRecord) f.get(heartbeatEvent);
    final Object position = targetPosition.getHeartbeatPositon(sr.sourceOffset());
    if (position == null) {
      // the target position is documented to possibly return null; do not unbox it blindly
      return null;
    }
    // accept any numeric type instead of requiring exactly java.lang.Long
    final long hbPosition = ((Number) position).longValue();
    LOGGER.debug("Found heartbeat position: {}", hbPosition);
    return hbPosition;
  } catch (final NoSuchFieldException | IllegalAccessException e) {
    LOGGER.info("failed to get heartbeat position");
    throw new RuntimeException(e);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void start(final BlockingQueue<ChangeEvent<String, String>> queue) {
})
.using((success, message, error) -> {
LOGGER.info("Debezium engine shutdown.");
LOGGER.info(message);
thrownError.set(error);
engineLatch.countDown();
})
Expand Down
Loading