Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set source defined cursor field for cdc #2878

Merged
merged 3 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.3.0",
"dockerImageTag": "0.3.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres",
"icon": "postgresql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.3.0
dockerImageTag: 0.3.1
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
icon: postgresql.svg
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public AirbyteCatalog discover(JsonNode config) throws Exception {
if (isCdc(config)) {
final List<AirbyteStream> streams = catalog.getStreams().stream()
.map(PostgresSource::removeIncrementalWithoutPk)
.map(PostgresSource::setIncrementalToSourceDefined)
.map(PostgresSource::addCdcMetadataColumns)
.collect(toList());

Expand Down Expand Up @@ -283,6 +284,8 @@ static boolean isCdc(JsonNode config) {
* information (like an oid) when using logical replication. By limiting to Full Refresh when we
* don't have a primary key we dodge the problem for now. As a work around a CDC and non-CDC source
* could be configured if there's a need to replicate a large non-PK table.
*
* Note: in place mutation.
*/
private static AirbyteStream removeIncrementalWithoutPk(AirbyteStream stream) {
if (stream.getSourceDefinedPrimaryKey().isEmpty()) {
Expand All @@ -292,6 +295,21 @@ private static AirbyteStream removeIncrementalWithoutPk(AirbyteStream stream) {
return stream;
}

/*
 * Marks the cursor as source-defined on any stream that supports INCREMENTAL sync, so that the
 * user cannot pick or override a cursor field themselves (CDC decides the cursor).
 *
 * Note: in place mutation.
 */
private static AirbyteStream setIncrementalToSourceDefined(AirbyteStream stream) {
final boolean supportsIncremental = stream.getSupportedSyncModes().contains(SyncMode.INCREMENTAL);
if (supportsIncremental) {
stream.setSourceDefinedCursor(true);
}
return stream;
}

// Note: in place mutation.
private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) {
ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema();
ObjectNode properties = (ObjectNode) jsonSchema.get("properties");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,6 @@ listen_addresses = '*'
# CUSTOMIZED OPTIONS
#------------------------------------------------------------------------------
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
max_wal_senders = 30
max_replication_slots = 30

Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
Expand All @@ -61,6 +62,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -480,6 +482,40 @@ void testReadWithoutReplicationSlot() throws SQLException {
});
}

// Verifies the CDC post-processing that discover() applies to the catalog: incremental-capable
// streams get a source-defined cursor, a stream whose primary key is dropped loses INCREMENTAL
// (full refresh only), and the CDC metadata columns are added to every stream's JSON schema.
@Test
void testDiscover() throws Exception {
final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG);

// stream with PK
expectedCatalog.getStreams().get(0).setSourceDefinedCursor(true);
addCdcMetadataColumns(expectedCatalog.getStreams().get(0));

// stream with no PK.
expectedCatalog.getStreams().get(1).setSourceDefinedPrimaryKey(Collections.emptyList());
expectedCatalog.getStreams().get(1).setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
addCdcMetadataColumns(expectedCatalog.getStreams().get(1));

// drop the PK so discovery sees a table without a primary key
// (presumably stream index 1 corresponds to the models table — TODO confirm against CATALOG)
database.query(ctx -> ctx.execute(String.format("ALTER TABLE %s.%s DROP CONSTRAINT models_pkey", MODELS_SCHEMA, MODELS_STREAM_NAME)));

final AirbyteCatalog actualCatalog = source.discover(getConfig(PSQL_DB, dbName));

// sort both sides by stream name so the comparison does not depend on discovery order
assertEquals(
expectedCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)).collect(Collectors.toList()),
actualCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)).collect(Collectors.toList()));
}

// Mirrors the source's CDC schema mutation: appends the _ab_cdc_* metadata columns
// (all typed "number") to the stream's JSON schema properties. Mutates the stream in place
// and returns it for chaining.
private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) {
final ObjectNode schema = (ObjectNode) stream.getJsonSchema();
final ObjectNode schemaProperties = (ObjectNode) schema.get("properties");

final JsonNode numberTypeSchema = Jsons.jsonNode(ImmutableMap.of("type", "number"));
for (final String cdcColumn : List.of(AbstractJdbcSource.CDC_LSN, AbstractJdbcSource.CDC_UPDATED_AT, AbstractJdbcSource.CDC_DELETED_AT)) {
schemaProperties.set(cdcColumn, numberTypeSchema);
}

return stream;
}

private void writeMakeRecord(DSLContext ctx, JsonNode recordJson) {
ctx.execute(String.format("INSERT INTO %s.%s (%s, %s) VALUES (%s, '%s');", MAKES_SCHEMA, MAKES_STREAM_NAME, COL_ID, COL_MAKE,
recordJson.get(COL_ID).asInt(), recordJson.get(COL_MAKE).asText()));
Expand Down