Skip to content

Commit

Permalink
Source MSSQL: Added event_serial_no to cdc metadata (#16798)
Browse files Browse the repository at this point in the history
* Added event_serial_no as cdc metadata for sql server source connector

* fix SourceAcceptanceTest

* upgrade version

* auto-bump connector version

---------

Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Subodh Kant Chaturvedi <subodh1810@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
4 people authored Feb 24, 2023
1 parent e174647 commit 5bf63c6
Show file tree
Hide file tree
Showing 10 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.4.28
dockerImageTag: 0.4.29
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8234,7 +8234,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:0.4.28"
- dockerImage: "airbyte/source-mssql:0.4.29"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/mssql"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public abstract class SourceAcceptanceTest extends AbstractSourceConnectorTest {
public static final String CDC_DELETED_AT = "_ab_cdc_deleted_at";
public static final String CDC_LOG_FILE = "_ab_cdc_log_file";
public static final String CDC_LOG_POS = "_ab_cdc_log_pos";
public static final String CDC_EVENT_SERIAL_NO = "_ab_cdc_event_serial_no";

private static final Logger LOGGER = LoggerFactory.getLogger(SourceAcceptanceTest.class);

Expand Down Expand Up @@ -357,6 +358,7 @@ private AirbyteRecordMessage pruneCdcMetadata(final AirbyteRecordMessage m) {
((ObjectNode) clone.getData()).remove(CDC_LOG_POS);
((ObjectNode) clone.getData()).remove(CDC_UPDATED_AT);
((ObjectNode) clone.getData()).remove(CDC_DELETED_AT);
((ObjectNode) clone.getData()).remove(CDC_EVENT_SERIAL_NO);
return clone;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.28
LABEL io.airbyte.version=0.4.29
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.28
LABEL io.airbyte.version=0.4.29
LABEL io.airbyte.name=airbyte/source-mssql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.mssql;

import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_LSN;
import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_EVENT_SERIAL_NO;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -15,7 +16,9 @@ public class MssqlCdcConnectorMetadataInjector implements CdcMetadataInjector {
@Override
public void addMetaData(final ObjectNode event, final JsonNode source) {
final String commitLsn = source.get("commit_lsn").asText();
final String eventSerialNo = source.get("event_serial_no").asText();
event.put(CDC_LSN, commitLsn);
event.put(CDC_EVENT_SERIAL_NO, eventSerialNo);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
public static final String MSSQL_CDC_OFFSET = "mssql_cdc_offset";
public static final String MSSQL_DB_HISTORY = "mssql_db_history";
public static final String CDC_LSN = "_ab_cdc_lsn";
public static final String CDC_EVENT_SERIAL_NO = "_ab_cdc_event_serial_no";
private static final String HIERARCHYID = "hierarchyid";
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;
private List<String> schemas;
Expand Down Expand Up @@ -426,6 +427,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
properties.set(CDC_LSN, stringType);
properties.set(CDC_UPDATED_AT, stringType);
properties.set(CDC_DELETED_AT, stringType);
properties.set(CDC_EVENT_SERIAL_NO, stringType);

return stream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_LSN;
import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_EVENT_SERIAL_NO;
import static io.airbyte.integrations.source.mssql.MssqlSource.DRIVER_CLASS;
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY;
Expand Down Expand Up @@ -356,6 +357,7 @@ protected void removeCDCColumns(final ObjectNode data) {
data.remove(CDC_LSN);
data.remove(CDC_UPDATED_AT);
data.remove(CDC_DELETED_AT);
data.remove(CDC_EVENT_SERIAL_NO);
}

@Override
Expand Down Expand Up @@ -389,11 +391,13 @@ protected void assertNullCdcMetaData(final JsonNode data) {
assertNull(data.get(CDC_LSN));
assertNull(data.get(CDC_UPDATED_AT));
assertNull(data.get(CDC_DELETED_AT));
assertNull(data.get(CDC_EVENT_SERIAL_NO));
}

@Override
protected void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull) {
assertNotNull(data.get(CDC_LSN));
assertNotNull(data.get(CDC_EVENT_SERIAL_NO));
assertNotNull(data.get(CDC_UPDATED_AT));
if (deletedAtNull) {
assertTrue(data.get(CDC_DELETED_AT).isNull());
Expand All @@ -411,6 +415,7 @@ protected void addCdcMetadataColumns(final AirbyteStream stream) {
properties.set(CDC_LSN, stringType);
properties.set(CDC_UPDATED_AT, stringType);
properties.set(CDC_DELETED_AT, stringType);
properties.set(CDC_EVENT_SERIAL_NO, stringType);

}

Expand Down
2 changes: 1 addition & 1 deletion connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
| **Marketo** | <img alt="Marketo icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/marketo.svg" height="30" height="30"/> | Source | airbyte/source-marketo:1.0.2 | generally_available | [link](https://docs.airbyte.com/integrations/sources/marketo) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-marketo) | <small>`9e0556f4-69df-4522-a3fb-03264d36b348`</small> |
| **Metabase** | <img alt="Metabase icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/metabase.svg" height="30" height="30"/> | Source | airbyte/source-metabase:0.3.1 | beta | [link](https://docs.airbyte.com/integrations/sources/metabase) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-metabase) | <small>`c7cb421b-942e-4468-99ee-e369bcabaec5`</small> |
| **Microsoft Dataverse** | <img alt="Microsoft Dataverse icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/microsoftdataverse.svg" height="30" height="30"/> | Source | airbyte/source-microsoft-dataverse:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/microsoft-dataverse) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-microsoft-dataverse) | <small>`9220e3de-3b60-4bb2-a46f-046d59ea235a`</small> |
| **Microsoft SQL Server (MSSQL)** | <img alt="Microsoft SQL Server (MSSQL) icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/mssql.svg" height="30" height="30"/> | Source | airbyte/source-mssql:0.4.28 | alpha | [link](https://docs.airbyte.com/integrations/sources/mssql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mssql) | <small>`b5ea17b1-f170-46dc-bc31-cc744ca984c1`</small> |
| **Microsoft SQL Server (MSSQL)** | <img alt="Microsoft SQL Server (MSSQL) icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/mssql.svg" height="30" height="30"/> | Source | airbyte/source-mssql:0.4.29 | alpha | [link](https://docs.airbyte.com/integrations/sources/mssql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mssql) | <small>`b5ea17b1-f170-46dc-bc31-cc744ca984c1`</small> |
| **Microsoft teams** | <img alt="Microsoft teams icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/microsoft-teams.svg" height="30" height="30"/> | Source | airbyte/source-microsoft-teams:0.2.5 | alpha | [link](https://docs.airbyte.com/integrations/sources/microsoft-teams) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-microsoft-teams) | <small>`eaf50f04-21dd-4620-913b-2a83f5635227`</small> |
| **Mixpanel** | <img alt="Mixpanel icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/mixpanel.svg" height="30" height="30"/> | Source | airbyte/source-mixpanel:0.1.30 | generally_available | [link](https://docs.airbyte.com/integrations/sources/mixpanel) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mixpanel) | <small>`12928b32-bf0a-4f1e-964f-07e12e37153a`</small> |
| **Monday** | <img alt="Monday icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/monday.svg" height="30" height="30"/> | Source | airbyte/source-monday:0.2.2 | beta | [link](https://docs.airbyte.com/integrations/sources/monday) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-monday) | <small>`80a54ea2-9959-4040-aac1-eee42423ec9b`</small> |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.29 | 2023-02-24 | [16798](https://github.com/airbytehq/airbyte/pull/16798) | Add event_serial_no to cdc metadata |
| 0.4.28 | 2023-01-18 | [21348](https://github.com/airbytehq/airbyte/pull/21348) | Fix error introduced in [18959](https://github.com/airbytehq/airbyte/pull/18959) in which option `initial_waiting_seconds` was removed |
| 0.4.27 | 2022-12-14 | [20436](https://github.com/airbytehq/airbyte/pull/20346) | Consolidate date/time values mapping for JDBC sources |
| 0.4.26 | 2022-12-12 | [18959](https://github.com/airbytehq/airbyte/pull/18959) | CDC : Don't timeout if snapshot is not complete. |
Expand Down

0 comments on commit 5bf63c6

Please sign in to comment.