Skip to content

Commit

Permalink
postgres-cdc checkpointing: fix LSN parsing bug + refactor for efficiency (airbytehq#24582)
Browse files Browse the repository at this point in the history

* Fix LSN parsing from Integer to Long

* rebasing

* Rebase

* Rebase

* Other casting

* Lock the file only when reading, so the file is free when parsing the object.
Increased from 1 to 166 checkpoints, and from skipping hundreds of checkpoints to never skip a state.

* Update load function documentation

* bump mysql and mssql

* cdc: refactor to remove debezium dependency from connector packages

* use gradle's shared dependency

* more refactoring

* upgrade docker version

* resolve master merge conflicts

* Automated Change

* minor changes

* resolve merge conflicts

* avoid deserializing multiple times

* simplify

* enable checkpointing for Postgres

* more improvements

* enable assertions

* changelog + bump version

* auto-bump connector version

* auto-bump connector version

* manual bump

---------

Co-authored-by: subodh <subodh1810@gmail.com>
Co-authored-by: subodh1810 <subodh1810@users.noreply.github.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
4 people authored and marcosmarxm committed Jun 8, 2023
1 parent 2c1d0ea commit 0467d9a
Show file tree
Hide file tree
Showing 30 changed files with 323 additions and 303 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8055,7 +8055,7 @@
"sourceDefinitionId": "1fa90628-2b9e-11ed-a261-0242ac120002",
"name": "AlloyDB for PostgreSQL",
"dockerRepository": "airbyte/source-alloydb",
"dockerImageTag": "2.0.22",
"dockerImageTag": "2.0.23",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/alloydb",
"icon": "alloydb.svg",
"sourceType": "database",
Expand Down Expand Up @@ -17315,7 +17315,7 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "1.0.12",
"dockerImageTag": "1.0.13",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/mssql",
"icon": "mssql.svg",
"sourceType": "database",
Expand Down Expand Up @@ -18283,7 +18283,7 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "2.0.16",
"dockerImageTag": "2.0.17",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/mysql",
"icon": "mysql.svg",
"sourceType": "database",
Expand Down Expand Up @@ -21009,7 +21009,7 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "2.0.22",
"dockerImageTag": "2.0.23",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/postgres",
"icon": "postgresql.svg",
"sourceType": "database",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
- name: AlloyDB for PostgreSQL
sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerRepository: airbyte/source-alloydb
dockerImageTag: 2.0.22
dockerImageTag: 2.0.23
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
icon: alloydb.svg
sourceType: database
Expand Down Expand Up @@ -1236,7 +1236,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 1.0.12
dockerImageTag: 1.0.13
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down Expand Up @@ -1303,7 +1303,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 2.0.16
dockerImageTag: 2.0.17
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down Expand Up @@ -1623,7 +1623,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 2.0.22
dockerImageTag: 2.0.23
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-alloydb:2.0.22"
- dockerImage: "airbyte/source-alloydb:2.0.23"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -8666,7 +8666,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:1.0.12"
- dockerImage: "airbyte/source-mssql:1.0.13"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/mssql"
connectionSpecification:
Expand Down Expand Up @@ -9543,7 +9543,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:2.0.16"
- dockerImage: "airbyte/source-mysql:2.0.17"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql"
connectionSpecification:
Expand Down Expand Up @@ -12141,7 +12141,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:2.0.22"
- dockerImage: "airbyte/source-postgres:2.0.23"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(
schemaHistoryManager(new EmptySavedInfo()));
tableSnapshotPublisher.start(queue);

final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator<>(
final AutoCloseableIterator<ChangeEventWithMetadata> eventIterator = new DebeziumRecordIterator<>(
queue,
targetPosition,
tableSnapshotPublisher::hasClosed,
Expand Down Expand Up @@ -102,7 +102,7 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Confi
publisher.start(queue);

// handle state machine around pub/sub logic.
final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator<>(
final AutoCloseableIterator<ChangeEventWithMetadata> eventIterator = new DebeziumRecordIterator<>(
queue,
targetPosition,
publisher::hasClosed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.engine.ChangeEvent;
import io.airbyte.integrations.debezium.internals.ChangeEventWithMetadata;
import java.util.Map;

/**
Expand All @@ -20,11 +19,11 @@ public interface CdcTargetPosition<T> {
/**
* Reads a position value (ex: LSN) from a change event and compares it to target position
*
* @param valueAsJson json representation of a change event
* @param changeEventWithMetadata change event from Debezium with extra calculated metadata
* @return true if event position is equal or greater than target position, or if last snapshot
* event
*/
boolean reachedTargetPosition(final JsonNode valueAsJson);
boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWithMetadata);

/**
* Reads a position value (lsn) from a change event and compares it to target lsn
Expand Down Expand Up @@ -54,16 +53,6 @@ default boolean isHeartbeatSupported() {
*/
T extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset);

/**
* This function indicates if the event is part of the snapshot or not.
*
* @param event Event from the CDC load
* @return Returns `true` when the DB event is part of the snapshot load. Otherwise, returns `false`
*/
default boolean isSnapshotEvent(final ChangeEvent<String, String> event) {
return false;
}

/**
* This function checks if the event we are processing in the loop is already behind the offset so
* the process can safety save the state.
Expand All @@ -72,7 +61,7 @@ default boolean isSnapshotEvent(final ChangeEvent<String, String> event) {
* @param event Event from the CDC load
* @return Returns `true` when the record is behind the offset. Otherwise, it returns `false`
*/
default boolean isRecordBehindOffset(final Map<String, String> offset, final ChangeEvent<String, String> event) {
default boolean isRecordBehindOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,34 +108,37 @@ private static ByteBuffer stringToByteBuffer(final String s) {

/**
* See FileOffsetBackingStore#load - logic is mostly borrowed from here. duplicated because this
* method is not public.
* method is not public. Reduced the try catch block to only the read operation from original code
* to reduce errors when reading the file.
*/
@SuppressWarnings("unchecked")
private Map<ByteBuffer, ByteBuffer> load() {
Object obj;
try (final SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(offsetFilePath))) {
// todo (cgardens) - we currently suppress a security warning for this line. use of readObject from
// untrusted sources is considered unsafe. Since the source is controlled by us in this case it
// should be safe. That said, changing this implementation to not use readObject would remove some
// headache.
final Object obj = is.readObject();
if (!(obj instanceof HashMap))
throw new ConnectException("Expected HashMap but found " + obj.getClass());
final Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
for (final Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
final ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
final ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
data.put(key, value);
}

return data;
obj = is.readObject();
} catch (final NoSuchFileException | EOFException e) {
// NoSuchFileException: Ignore, may be new.
// EOFException: Ignore, this means the file was missing or corrupt
return Collections.emptyMap();
} catch (final IOException | ClassNotFoundException e) {
throw new ConnectException(e);
}

if (!(obj instanceof HashMap))
throw new ConnectException("Expected HashMap but found " + obj.getClass());
final Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
for (final Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
final ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
final ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
data.put(key, value);
}

return data;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.debezium.engine.ChangeEvent;

/**
 * Wraps a raw Debezium {@link ChangeEvent} together with metadata derived from it exactly once at
 * construction time: the deserialized JSON value and the snapshot metadata parsed from
 * {@code source.snapshot}. Computing these eagerly avoids re-deserializing the event payload every
 * time a consumer needs them.
 */
public class ChangeEventWithMetadata {

  private final ChangeEvent<String, String> event;
  private final JsonNode eventValueAsJson;
  private final SnapshotMetadata snapshotMetadata;

  /**
   * @param event the raw change event emitted by the Debezium engine; its value is assumed to be a
   *        non-null JSON string containing a {@code source.snapshot} field — TODO confirm callers
   *        filter out heartbeat/tombstone events before constructing this wrapper
   */
  public ChangeEventWithMetadata(final ChangeEvent<String, String> event) {
    this.event = event;
    final JsonNode valueAsJson = Jsons.deserialize(event.value());
    this.eventValueAsJson = valueAsJson;
    final String snapshotText = valueAsJson.get("source").get("snapshot").asText();
    this.snapshotMetadata = SnapshotMetadata.fromString(snapshotText);
  }

  /** @return the underlying raw Debezium change event */
  public ChangeEvent<String, String> event() {
    return event;
  }

  /** @return the event's value, deserialized once at construction */
  public JsonNode eventValueAsJson() {
    return eventValueAsJson;
  }

  /** @return whether this event was produced as part of the initial snapshot load */
  public boolean isSnapshotEvent() {
    return SnapshotMetadata.isSnapshotEventMetadata(snapshotMetadata);
  }

  /** @return the parsed {@code source.snapshot} metadata for this event */
  public SnapshotMetadata snapshotMetadata() {
    return snapshotMetadata;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.debezium.CdcMetadataInjector;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.debezium.engine.ChangeEvent;
import java.sql.Timestamp;
import java.time.Instant;

Expand All @@ -20,10 +18,10 @@ public class DebeziumEventUtils {
public static final String CDC_UPDATED_AT = "_ab_cdc_updated_at";
public static final String CDC_DELETED_AT = "_ab_cdc_deleted_at";

public static AirbyteMessage toAirbyteMessage(final ChangeEvent<String, String> event,
public static AirbyteMessage toAirbyteMessage(final ChangeEventWithMetadata event,
final CdcMetadataInjector cdcMetadataInjector,
final Instant emittedAt) {
final JsonNode debeziumRecord = Jsons.deserialize(event.value());
final JsonNode debeziumRecord = event.eventValueAsJson();
final JsonNode before = debeziumRecord.get("before");
final JsonNode after = debeziumRecord.get("after");
final JsonNode source = debeziumRecord.get("source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.debezium.CdcTargetPosition;
Expand Down Expand Up @@ -36,8 +34,8 @@
* publisher is not closed. Even after the publisher is closed, the consumer will finish processing
* any produced records before closing.
*/
public class DebeziumRecordIterator<T> extends AbstractIterator<ChangeEvent<String, String>>
implements AutoCloseableIterator<ChangeEvent<String, String>> {
public class DebeziumRecordIterator<T> extends AbstractIterator<ChangeEventWithMetadata>
implements AutoCloseableIterator<ChangeEventWithMetadata> {

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class);

Expand Down Expand Up @@ -85,7 +83,7 @@ public DebeziumRecordIterator(final LinkedBlockingQueue<ChangeEvent<String, Stri
// 4. If change event lsn reached target finish sync
// 5. Otherwise check message queuen again
@Override
protected ChangeEvent<String, String> computeNext() {
protected ChangeEventWithMetadata computeNext() {
// keep trying until the publisher is closed or until the queue is empty. the latter case is
// possible when the publisher has shutdown but the consumer has not yet processed all messages it
// emitted.
Expand Down Expand Up @@ -129,18 +127,18 @@ protected ChangeEvent<String, String> computeNext() {
continue;
}

final JsonNode eventAsJson = Jsons.deserialize(next.value());
hasSnapshotFinished = hasSnapshotFinished(eventAsJson);
final ChangeEventWithMetadata changeEventWithMetadata = new ChangeEventWithMetadata(next);
hasSnapshotFinished = !changeEventWithMetadata.isSnapshotEvent();

// if the last record matches the target file position, it is time to tell the producer to shutdown.
if (targetPosition.reachedTargetPosition(eventAsJson)) {
if (targetPosition.reachedTargetPosition(changeEventWithMetadata)) {
requestClose("Closing: Change event reached target position");
}
this.tsLastHeartbeat = null;
this.lastHeartbeatPosition = null;
this.receivedFirstRecord = true;
this.maxInstanceOfNoRecordsFound = 0;
return next;
return changeEventWithMetadata;
}

if (!signalledDebeziumEngineShutdown) {
Expand All @@ -158,8 +156,9 @@ protected ChangeEvent<String, String> computeNext() {
if (event == null || isHeartbeatEvent(event)) {
continue;
}
hasSnapshotFinished = hasSnapshotFinished(Jsons.deserialize(event.value()));
return event;
final ChangeEventWithMetadata changeEventWithMetadata = new ChangeEventWithMetadata(event);
hasSnapshotFinished = !changeEventWithMetadata.isSnapshotEvent();
return changeEventWithMetadata;
}
throwExceptionIfSnapshotNotFinished();
return endOfData();
Expand Down Expand Up @@ -202,11 +201,6 @@ private boolean heartbeatPosNotChanging() {
return timeElapsedSinceLastHeartbeatTs.compareTo(this.firstRecordWaitTime.dividedBy(2)) > 0;
}

private boolean hasSnapshotFinished(final JsonNode eventAsJson) {
final SnapshotMetadata snapshotMetadata = SnapshotMetadata.fromString(eventAsJson.get("source").get("snapshot").asText());
return !SnapshotMetadata.isSnapshotEventMetadata(snapshotMetadata);
}

private void requestClose(final String closeLogMessage) {
if (signalledDebeziumEngineShutdown) {
return;
Expand Down
Loading

0 comments on commit 0467d9a

Please sign in to comment.