Skip to content

upgrade debezium version to 1.9.6 #17459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 1.0.0
dockerImageTag: 1.0.1
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down Expand Up @@ -836,7 +836,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.12
dockerImageTag: 1.0.13
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6965,7 +6965,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:1.0.0"
- dockerImage: "airbyte/source-mysql:1.0.1"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down Expand Up @@ -8578,7 +8578,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.12"
- dockerImage: "airbyte/source-postgres:1.0.13"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ dependencies {
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-db:db-lib')

implementation 'io.debezium:debezium-api:1.9.2.Final'
implementation 'io.debezium:debezium-embedded:1.9.2.Final'
implementation 'io.debezium:debezium-api:1.9.6.Final'
implementation 'io.debezium:debezium-embedded:1.9.6.Final'
// implementation 'io.debezium:debezium-connector-sqlserver:1.9.2.Final'
implementation 'io.debezium:debezium-connector-mysql:1.9.2.Final'
implementation 'io.debezium:debezium-connector-postgres:1.9.2.Final'
implementation 'io.debezium:debezium-connector-mysql:1.9.6.Final'
implementation 'io.debezium:debezium-connector-postgres:1.9.6.Final'
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'

testFixturesImplementation project(':airbyte-db:db-lib')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ ENV APPLICATION source-mysql-strict-encrypt
COPY --from=build /airbyte /airbyte


LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.version=1.0.1

LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.version=1.0.1

LABEL io.airbyte.name=airbyte/source-mysql
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ application {
dependencies {
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:bases:debezium-v1-9-2')
implementation project(':airbyte-integrations:bases:debezium-v1-9-6')
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'mysql:mysql-connector-java:8.0.22'
implementation 'org.apache.commons:commons-lang3:3.11'

testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-2'))
testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-6'))
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.12
LABEL io.airbyte.version=1.0.13
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.12
LABEL io.airbyte.version=1.0.13
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ application {
dependencies {
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:bases:debezium-v1-9-2')
implementation project(':airbyte-integrations:bases:debezium-v1-9-6')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'org.apache.commons:commons-lang3:3.11'
implementation libs.postgresql

testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-2'))
testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-6'))
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation project(":airbyte-json-validation")
testImplementation project(':airbyte-test-utils')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -21,7 +20,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
Expand All @@ -45,23 +43,16 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -278,72 +269,6 @@ public String createSchemaQuery(final String schemaName) {
return "CREATE SCHEMA " + schemaName + ";";
}

@Override
@Test
public void testRecordsProducedDuringAndAfterSync() throws Exception {

final int recordsToCreate = 20;
// first batch of records. 20 created here and 6 created in setup method.
for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated));
writeModelRecord(record);
}

final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
.toListAndClose(firstBatchIterator);
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
assertEquals(1, stateAfterFirstBatch.size());
assertNotNull(stateAfterFirstBatch.get(0).getData());
assertExpectedStateMessages(stateAfterFirstBatch);
final Set<AirbyteRecordMessage> recordsFromFirstBatch = extractRecordMessages(
dataFromFirstBatch);
assertEquals((MODEL_RECORDS.size() + recordsToCreate), recordsFromFirstBatch.size());

// second batch of records again 20 being created
for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated));
writeModelRecord(record);
}

final JsonNode state = Jsons.jsonNode(stateAfterFirstBatch);
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
.toListAndClose(secondBatchIterator);

final List<AirbyteStateMessage> stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch);
assertEquals(1, stateAfterSecondBatch.size());
assertNotNull(stateAfterSecondBatch.get(0).getData());
assertExpectedStateMessages(stateAfterSecondBatch);

final Set<AirbyteRecordMessage> recordsFromSecondBatch = extractRecordMessages(
dataFromSecondBatch);
assertEquals(recordsToCreate * 2, recordsFromSecondBatch.size(),
"Expected 40 records to be replicated in the second sync.");

// sometimes there can be more than one of these at the end of the snapshot and just before the
// first incremental.
final Set<AirbyteRecordMessage> recordsFromFirstBatchWithoutDuplicates = removeDuplicates(
recordsFromFirstBatch);
final Set<AirbyteRecordMessage> recordsFromSecondBatchWithoutDuplicates = removeDuplicates(
recordsFromSecondBatch);

final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size();
assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(),
"Expected first sync to include records created while the test was running.");
assertEquals((recordsToCreate * 3) + recordsCreatedBeforeTestCount,
recordsFromFirstBatchWithoutDuplicates.size() + recordsFromSecondBatchWithoutDuplicates
.size());
}

@Override
protected String randomTableSchema() {
return MODELS_SCHEMA + "_random";
Expand Down
Loading