Skip to content

Commit

Permalink
introduce common abstraction for CDC via debezium (#4580)
Browse files Browse the repository at this point in the history
* wip

* add file

* final structure

* few more updates

* undo unwanted changes

* add abstract test + more refinement

* remove CDC metadata to debezium

* rename class + add missing property

* move debezium to bases + upgrade debezium version + review comments

* downgrade version + minor fixes

* reset to minutes

* fix build

* address review comments

* should return Optional

* use common abstraction for CDC via debezium for mysql (#4604)

* use new cdc abstraction for mysql

* undo wanted change

* pull in latest changes

* use renamed class + move constants to MySqlSource

* bring in latest changes from cdc abstraction

* format

* bring in latest changes

* pull in latest changes

* use common abstraction for CDC via debezium for postgres (#4607)

* use cdc abstraction for postgres

* add files

* ready

* use renamed class + move constants to PostgresSource

* bring in the latest changes

* bring in latest changes

* pull in latest changes
  • Loading branch information
subodh1810 authored and gl-pix committed Jul 22, 2021
1 parent 5a8b185 commit 6bf9028
Show file tree
Hide file tree
Showing 47 changed files with 1,964 additions and 2,015 deletions.
23 changes: 23 additions & 0 deletions airbyte-integrations/bases/debezium/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Build script for the shared Debezium base module used by CDC-capable connectors.
plugins {
// Gradle's test-fixtures plugin: lets this module publish reusable test helpers
// (the testFixtures source set) that other projects can depend on.
id "java-test-fixtures"
}

project.configurations {
// Make every `implementation` dependency below also available to the test fixtures.
testFixturesImplementation.extendsFrom implementation
}
dependencies {
implementation project(':airbyte-protocol:models')

// Debezium engine plus the MySQL and Postgres connectors; all four artifacts are pinned
// to the same version (1.4.2.Final).
implementation 'io.debezium:debezium-api:1.4.2.Final'
implementation 'io.debezium:debezium-embedded:1.4.2.Final'
implementation 'io.debezium:debezium-connector-mysql:1.4.2.Final'
implementation 'io.debezium:debezium-connector-postgres:1.4.2.Final'

// Project dependencies needed only by the test fixtures.
testFixturesImplementation project(':airbyte-db')
testFixturesImplementation project(':airbyte-integrations:bases:base-java')

// JUnit 5 is required by the test fixtures themselves (all three artifacts at 5.4.2).
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-engine:5.4.2'
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-api:5.4.2'
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-params:5.4.2'

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.CompositeIterator;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;
import io.airbyte.integrations.debezium.internals.DebeziumRecordIterator;
import io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher;
import io.airbyte.integrations.debezium.internals.FilteredFileDatabaseHistory;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.debezium.engine.ChangeEvent;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bridge between Airbyte database connectors and Debezium. A connector that wants to use Debezium
 * for change data capture (CDC) builds one of these and asks it for the incremental iterators.
 */
public class AirbyteDebeziumHandler {

  private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteDebeziumHandler.class);

  /**
   * Capacity of the queue that sits between the Debezium publisher and the record iterator. 10000
   * is used because it exceeds Debezium's own defaults:
   * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_BATCH_SIZE} is 2048 and
   * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_QUEUE_SIZE} is 8192.
   */
  private static final int QUEUE_CAPACITY = 10000;

  private final Properties connectorProperties;
  private final JsonNode config;
  private final CdcTargetPosition targetPosition;
  private final ConfiguredAirbyteCatalog catalog;
  private final boolean trackSchemaHistory;

  private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;

  /**
   * Creates a handler together with the bounded queue the Debezium publisher will write into.
   *
   * @param config connector configuration; its {@code "database"} field is read when schema history
   *        tracking is enabled
   * @param targetPosition position handed to the record iterator to decide when the sync is done
   * @param connectorProperties Debezium properties forwarded to the record publisher
   * @param catalog configured catalog forwarded to the record publisher
   * @param trackSchemaHistory whether database schema history is loaded and saved with the state
   */
  public AirbyteDebeziumHandler(JsonNode config,
                                CdcTargetPosition targetPosition,
                                Properties connectorProperties,
                                ConfiguredAirbyteCatalog catalog,
                                boolean trackSchemaHistory) {
    this.config = config;
    this.targetPosition = targetPosition;
    this.connectorProperties = connectorProperties;
    this.catalog = catalog;
    this.trackSchemaHistory = trackSchemaHistory;
    this.queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
  }

  /**
   * Starts the Debezium publisher and returns a single iterator that yields every change event as
   * an Airbyte message, followed by exactly one state message.
   *
   * @param cdcSavedInfoFetcher source of the previously saved offset and schema history
   * @param cdcStateHandler turns the final offset and schema history into the state message
   * @param cdcMetadataInjector passed to the event conversion so it can add connector metadata
   * @param emittedAt timestamp passed to the event conversion
   * @return a singleton list holding the records-then-state iterator
   * @throws RuntimeException lazily, when the state message is produced, if schema history tracking
   *         is enabled but no schema history manager exists
   */
  public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(CdcSavedInfoFetcher cdcSavedInfoFetcher,
                                                                             CdcStateHandler cdcStateHandler,
                                                                             CdcMetadataInjector cdcMetadataInjector,
                                                                             Instant emittedAt) {
    LOGGER.info("using CDC: {}", true);

    final AirbyteFileOffsetBackingStore offsetStore =
        AirbyteFileOffsetBackingStore.initializeState(cdcSavedInfoFetcher.getSavedOffset());
    final Optional<AirbyteSchemaHistoryStorage> historyStore = schemaHistoryManager(cdcSavedInfoFetcher);

    final DebeziumRecordPublisher recordPublisher =
        new DebeziumRecordPublisher(connectorProperties, config, catalog, offsetStore, historyStore);
    recordPublisher.start(queue);

    // The record iterator owns the pub/sub state machine: it drains the queue and is given the
    // publisher's hasClosed/close hooks.
    final AutoCloseableIterator<ChangeEvent<String, String>> changeEvents =
        new DebeziumRecordIterator(queue, targetPosition, recordPublisher::hasClosed, recordPublisher::close);

    // Map each Debezium change event to an Airbyte message.
    final AutoCloseableIterator<AirbyteMessage> recordMessages = AutoCloseableIterators.transform(
        changeEvents,
        (changeEvent) -> DebeziumEventUtils.toAirbyteMessage(changeEvent, cdcMetadataInjector, emittedAt));

    // The state is read only when the supplier is invoked, i.e. after all record messages have been
    // produced. Wrapping it in an iterator lets it be concatenated after the records.
    final Supplier<AirbyteMessage> lazyState = () -> buildStateMessage(offsetStore, historyStore, cdcStateHandler);
    final Iterator<AirbyteMessage> trailingState = MoreIterators.singletonIteratorFromSupplier(lazyState);

    // Eager close guarantees the debezium engine is closed before the state is emitted, so the
    // debezium offset file (from which the state is built) is up-to-date at that point.
    final CompositeIterator<AirbyteMessage> recordsThenState =
        AutoCloseableIterators.concatWithEagerClose(recordMessages, AutoCloseableIterators.fromIterator(trailingState));

    return Collections.singletonList(recordsThenState);
  }

  /**
   * Reads the current offset (and the schema history, when tracked) and hands both to the state
   * handler. The schema history passed on is {@code null} when tracking is disabled.
   */
  private AirbyteMessage buildStateMessage(AirbyteFileOffsetBackingStore offsetStore,
                                           Optional<AirbyteSchemaHistoryStorage> historyStore,
                                           CdcStateHandler cdcStateHandler) {
    final Map<String, String> offset = offsetStore.read();

    final String dbHistory;
    if (trackSchemaHistory) {
      dbHistory = historyStore
          .orElseThrow(() -> new RuntimeException("Schema History Tracking is true but manager is not initialised"))
          .read();
    } else {
      dbHistory = null;
    }

    return cdcStateHandler.saveState(offset, dbHistory);
  }

  /**
   * Builds the schema history storage from the saved history when tracking is enabled, after
   * registering the configured database name with {@link FilteredFileDatabaseHistory}. Returns an
   * empty Optional when tracking is disabled.
   */
  private Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager(CdcSavedInfoFetcher cdcSavedInfoFetcher) {
    if (!trackSchemaHistory) {
      return Optional.empty();
    }

    FilteredFileDatabaseHistory.setDatabaseName(config.get("database").asText());
    return Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(cdcSavedInfoFetcher.getSavedSchemaHistory()));
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
* Adds connector-specific metadata to the records fetched from the database. For instance, in
* Postgres we add the lsn to the records. In MySql we add the file name and position to the
* records.
*/
public interface CdcMetadataInjector {

/**
* Copies CDC metadata from the Debezium source block onto the outgoing record. A debezium record
* contains multiple pieces; see
* https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-create-events
*
* @param event the actual record which contains the data and will be written to the destination;
* implementations add the metadata to this node
* @param source the metadata about the record, from which the values to add to {@code event} are
* extracted
*/
void addMetaData(ObjectNode event, JsonNode source);

/**
* Extracts the namespace (schema name) that is set on the Airbyte record.
*
* @param source part of the debezium record that contains the metadata about the record; see
* https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-create-events
* @return the namespace extracted from {@code source}
*/
String namespace(JsonNode source);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Optional;

/**
* Fetches the saved info required for debezium to run incrementally. Each connector saves its
* offset and schema history in a different manner, so each connector supplies its own
* implementation.
*/
public interface CdcSavedInfoFetcher {

/**
* Returns the previously saved debezium offset. {@link AirbyteDebeziumHandler} passes this value
* to the offset backing store when it initializes its state.
*
* @return the saved offset as JSON
*/
JsonNode getSavedOffset();

/**
* Returns the previously saved database schema history. {@link AirbyteDebeziumHandler} only calls
* this when schema history tracking is enabled.
*
* @return the saved schema history; presumably empty when nothing has been saved yet (confirm
* against the connector implementations)
*/
Optional<JsonNode> getSavedSchemaHistory();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.debezium;

import io.airbyte.protocol.models.AirbyteMessage;
import java.util.Map;

/**
* Lets each connector save the offset and schema history in the manner which suits it, by turning
* them into the state message of the sync.
*/
@FunctionalInterface
public interface CdcStateHandler {

/**
* Builds the state message from the debezium offset and schema history.
* {@link AirbyteDebeziumHandler} calls this after all record messages have been produced.
*
* @param offset the offset read from the offset store
* @param dbHistory the database schema history; {@link AirbyteDebeziumHandler} passes
* {@code null} when schema history tracking is disabled
* @return the message carrying the state to emit
*/
AirbyteMessage saveState(Map<String, String> offset, String dbHistory);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;

/**
* Defines the target position at the beginning of the sync so that once we reach the desired
* target, we can shut down the sync. This is needed because new changes may keep being made in the
* source database while we are syncing, and as a result we might end up syncing forever. To avoid
* that, the point to end at is fixed at the beginning of the sync.
*/
public interface CdcTargetPosition {

/**
* Tells whether the sync has reached the target position.
*
* @param valueAsJson presumably the value of the current debezium change event parsed as JSON
* (confirm against DebeziumRecordIterator, which is the consumer of this interface)
* @return {@code true} if the target position has been reached, {@code false} otherwise
*/
boolean reachedTargetPosition(JsonNode valueAsJson);

}
Loading

0 comments on commit 6bf9028

Please sign in to comment.