From 5bf63c6b2f467e8724affc3f1e5e27943c0a8e8d Mon Sep 17 00:00:00 2001 From: plenti-jacob-roe <92408892+plenti-jacob-roe@users.noreply.github.com> Date: Sat, 25 Feb 2023 01:25:35 +1100 Subject: [PATCH] Source MSSQL: Added event_serial_no to cdc metadata (#16798) * Added event_serial_no as cdc metadata for sql server source connector * fix SourceAcceptanceTest * upgrade version * auto-bump connector version --------- Co-authored-by: Marcos Marx Co-authored-by: Subodh Kant Chaturvedi Co-authored-by: Octavia Squidington III --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- .../init/src/main/resources/seed/source_specs.yaml | 2 +- .../standardtest/source/SourceAcceptanceTest.java | 2 ++ .../connectors/source-mssql-strict-encrypt/Dockerfile | 2 +- airbyte-integrations/connectors/source-mssql/Dockerfile | 2 +- .../source/mssql/MssqlCdcConnectorMetadataInjector.java | 3 +++ .../io/airbyte/integrations/source/mssql/MssqlSource.java | 2 ++ .../integrations/source/mssql/CdcMssqlSourceTest.java | 5 +++++ connectors.md | 2 +- docs/integrations/sources/mssql.md | 1 + 10 files changed, 18 insertions(+), 5 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 9e3e617571c8..6067dbb7163c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1082,7 +1082,7 @@ - name: Microsoft SQL Server (MSSQL) sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 dockerRepository: airbyte/source-mssql - dockerImageTag: 0.4.28 + dockerImageTag: 0.4.29 documentationUrl: https://docs.airbyte.com/integrations/sources/mssql icon: mssql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e45eeaeff07a..d20ad2b750da 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -8234,7 +8234,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mssql:0.4.28" +- dockerImage: "airbyte/source-mssql:0.4.29" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/mssql" connectionSpecification: diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java index a83064eabcfb..a4bdc9fd5178 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java @@ -43,6 +43,7 @@ public abstract class SourceAcceptanceTest extends AbstractSourceConnectorTest { public static final String CDC_DELETED_AT = "_ab_cdc_deleted_at"; public static final String CDC_LOG_FILE = "_ab_cdc_log_file"; public static final String CDC_LOG_POS = "_ab_cdc_log_pos"; + public static final String CDC_EVENT_SERIAL_NO = "_ab_cdc_event_serial_no"; private static final Logger LOGGER = LoggerFactory.getLogger(SourceAcceptanceTest.class); @@ -357,6 +358,7 @@ private AirbyteRecordMessage pruneCdcMetadata(final AirbyteRecordMessage m) { ((ObjectNode) clone.getData()).remove(CDC_LOG_POS); ((ObjectNode) clone.getData()).remove(CDC_UPDATED_AT); ((ObjectNode) clone.getData()).remove(CDC_DELETED_AT); + ((ObjectNode) clone.getData()).remove(CDC_EVENT_SERIAL_NO); return clone; } diff --git a/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile index 97644ffc9f9d..4fbbf3ff091e 100644 --- a/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.28 +LABEL io.airbyte.version=0.4.29 LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index 44663ce0fe9c..28c9bc0e1b8c 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.28 +LABEL io.airbyte.version=0.4.29 LABEL io.airbyte.name=airbyte/source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcConnectorMetadataInjector.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcConnectorMetadataInjector.java index a3a0a52cc5be..f739978ac7b9 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcConnectorMetadataInjector.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcConnectorMetadataInjector.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.source.mssql; import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_LSN; +import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_EVENT_SERIAL_NO; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -15,7 +16,9 @@ public class MssqlCdcConnectorMetadataInjector implements CdcMetadataInjector { @Override public void addMetaData(final ObjectNode event, final JsonNode source) { final String commitLsn = source.get("commit_lsn").asText(); + final String eventSerialNo = source.get("event_serial_no").asText(); event.put(CDC_LSN, commitLsn); + event.put(CDC_EVENT_SERIAL_NO, eventSerialNo); } @Override diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index d7a784103b47..99adab9a61d5 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -67,6 +67,7 @@ public class MssqlSource extends AbstractJdbcSource implements Source public static final String MSSQL_CDC_OFFSET = "mssql_cdc_offset"; public static final String MSSQL_DB_HISTORY = "mssql_db_history"; public static final String CDC_LSN = "_ab_cdc_lsn"; + public static final String CDC_EVENT_SERIAL_NO = "_ab_cdc_event_serial_no"; private static final String HIERARCHYID = "hierarchyid"; private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000; private List schemas; @@ -426,6 +427,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) { properties.set(CDC_LSN, stringType); properties.set(CDC_UPDATED_AT, stringType); properties.set(CDC_DELETED_AT, stringType); + properties.set(CDC_EVENT_SERIAL_NO, stringType); return stream; } diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java index da4a72e97187..e33b4b6ff731 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java @@ -7,6 +7,7 @@ import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_LSN; +import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_EVENT_SERIAL_NO; import static io.airbyte.integrations.source.mssql.MssqlSource.DRIVER_CLASS; import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET; import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY; @@ -356,6 +357,7 @@ protected void removeCDCColumns(final ObjectNode data) { data.remove(CDC_LSN); data.remove(CDC_UPDATED_AT); data.remove(CDC_DELETED_AT); + data.remove(CDC_EVENT_SERIAL_NO); } @Override @@ -389,11 +391,13 @@ protected void assertNullCdcMetaData(final JsonNode data) { assertNull(data.get(CDC_LSN)); assertNull(data.get(CDC_UPDATED_AT)); assertNull(data.get(CDC_DELETED_AT)); + assertNull(data.get(CDC_EVENT_SERIAL_NO)); } @Override protected void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull) { assertNotNull(data.get(CDC_LSN)); + assertNotNull(data.get(CDC_EVENT_SERIAL_NO)); assertNotNull(data.get(CDC_UPDATED_AT)); if (deletedAtNull) { assertTrue(data.get(CDC_DELETED_AT).isNull()); @@ -411,6 +415,7 @@ protected void addCdcMetadataColumns(final AirbyteStream stream) { properties.set(CDC_LSN, stringType); properties.set(CDC_UPDATED_AT, stringType); properties.set(CDC_DELETED_AT, stringType); + properties.set(CDC_EVENT_SERIAL_NO, stringType); } diff --git a/connectors.md b/connectors.md index 87751f2db0fe..16a56f495f74 100644 --- a/connectors.md +++ b/connectors.md @@ -136,7 +136,7 @@ | **Marketo** | Marketo icon | Source | airbyte/source-marketo:1.0.2 | generally_available | [link](https://docs.airbyte.com/integrations/sources/marketo) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-marketo) | `9e0556f4-69df-4522-a3fb-03264d36b348` | | **Metabase** | Metabase icon | Source | airbyte/source-metabase:0.3.1 | beta | [link](https://docs.airbyte.com/integrations/sources/metabase) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-metabase) | `c7cb421b-942e-4468-99ee-e369bcabaec5` | | **Microsoft Dataverse** | Microsoft Dataverse icon | Source | airbyte/source-microsoft-dataverse:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/microsoft-dataverse) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-microsoft-dataverse) | `9220e3de-3b60-4bb2-a46f-046d59ea235a` | -| **Microsoft SQL Server (MSSQL)** | Microsoft SQL Server (MSSQL) icon | Source | airbyte/source-mssql:0.4.28 | alpha | [link](https://docs.airbyte.com/integrations/sources/mssql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mssql) | `b5ea17b1-f170-46dc-bc31-cc744ca984c1` | +| **Microsoft SQL Server (MSSQL)** | Microsoft SQL Server (MSSQL) icon | Source | airbyte/source-mssql:0.4.29 | alpha | [link](https://docs.airbyte.com/integrations/sources/mssql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mssql) | `b5ea17b1-f170-46dc-bc31-cc744ca984c1` | | **Microsoft teams** | Microsoft teams icon | Source | airbyte/source-microsoft-teams:0.2.5 | alpha | [link](https://docs.airbyte.com/integrations/sources/microsoft-teams) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-microsoft-teams) | `eaf50f04-21dd-4620-913b-2a83f5635227` | | **Mixpanel** | Mixpanel icon | Source | airbyte/source-mixpanel:0.1.30 | generally_available | [link](https://docs.airbyte.com/integrations/sources/mixpanel) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mixpanel) | `12928b32-bf0a-4f1e-964f-07e12e37153a` | | **Monday** | Monday icon | Source | airbyte/source-monday:0.2.2 | beta | [link](https://docs.airbyte.com/integrations/sources/monday) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-monday) | `80a54ea2-9959-4040-aac1-eee42423ec9b` | diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index eaadcae5f40b..afa2c0cf0d51 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -341,6 +341,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.4.29 | 2023-02-24 | [16798](https://github.com/airbytehq/airbyte/pull/16798) | Add event_serial_no to cdc metadata | | 0.4.28 | 2023-01-18 | [21348](https://github.com/airbytehq/airbyte/pull/21348) | Fix error introduced in [18959](https://github.com/airbytehq/airbyte/pull/18959) in which option `initial_waiting_seconds` was removed | | 0.4.27 | 2022-12-14 | [20436](https://github.com/airbytehq/airbyte/pull/20346) | Consolidate date/time values mapping for JDBC sources | | 0.4.26 | 2022-12-12 | [18959](https://github.com/airbytehq/airbyte/pull/18959) | CDC : Don't timeout if snapshot is not complete. |