From 9e94126fd6e1bd19edd21945536ca5a532becc3c Mon Sep 17 00:00:00 2001 From: VitaliiMaltsev <39538064+VitaliiMaltsev@users.noreply.github.com> Date: Wed, 15 Mar 2023 21:27:31 +0200 Subject: [PATCH] MySQL/MsSQL sources fixed NPE during cursor values validation (#24082) * MySQL/MsSQL sources fixed NPE during cursor values validation * added logging * bump versions * auto-bump connector version * manually update mssql definitions * Automated Change * Automated Commit - Formatting Changes --------- Co-authored-by: Octavia Squidington III Co-authored-by: VitaliiMaltsev --- .../resources/seed/source_definitions.yaml | 4 +- .../src/main/resources/seed/source_specs.yaml | 4 +- .../source-mssql-strict-encrypt/Dockerfile | 2 +- .../connectors/source-mssql/Dockerfile | 2 +- .../source/mssql/MssqlSource.java | 10 ++- .../source-mysql-strict-encrypt/Dockerfile | 2 +- .../connectors/source-mysql/Dockerfile | 2 +- .../source/mysql/MySqlSource.java | 62 ++++++++++--------- connectors.md | 4 +- docs/integrations/sources/mssql.md | 1 + docs/integrations/sources/mysql.md | 1 + 11 files changed, 52 insertions(+), 42 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index d0e3f2d16821..0d5ada5f63f8 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1173,7 +1173,7 @@ - name: Microsoft SQL Server (MSSQL) sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 dockerRepository: airbyte/source-mssql - dockerImageTag: 1.0.2 + dockerImageTag: 1.0.3 documentationUrl: https://docs.airbyte.com/integrations/sources/mssql icon: mssql.svg sourceType: database @@ -1236,7 +1236,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 2.0.2 + dockerImageTag: 2.0.3 documentationUrl: https://docs.airbyte.com/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 25dd1da3c945..83378918905b 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -8411,7 +8411,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mssql:1.0.2" +- dockerImage: "airbyte/source-mssql:1.0.3" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/mssql" connectionSpecification: @@ -9286,7 +9286,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:2.0.2" +- dockerImage: "airbyte/source-mysql:2.0.3" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile index 3dd7fbd3fde1..c9bf2cf9df6a 100644 --- a/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.2 +LABEL io.airbyte.version=1.0.3 LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index d559518a7bf6..448a64f19da4 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.2 +LABEL io.airbyte.version=1.0.3 LABEL io.airbyte.name=airbyte/source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index a853e8afd4b9..15c17d3008ae 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -260,6 +260,9 @@ protected boolean verifyCursorColumnValues(final JdbcDatabase database, final St .executeQuery(descQuery), resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)) .stream() + .peek(x -> LOGGER.info("MsSQL Table Structure {}, {}, {}", x.toString(), schema, tableName)) + .filter(x -> x.get("TABLE_OWNER") != null) + .filter(x -> x.get("COLUMN_NAME") != null) .filter(x -> x.get("TABLE_OWNER").asText().equals(schema)) .filter(x -> x.get("COLUMN_NAME").asText().equalsIgnoreCase(columnName)) .findFirst(); @@ -276,12 +279,13 @@ protected boolean verifyCursorColumnValues(final JdbcDatabase database, final St resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); Preconditions.checkState(jsonNodes.size() == 1); nullValExist = jsonNodes.get(0).get(resultColName).booleanValue(); - LOGGER.info("null cursor value for MsSQL source : {}, shema {} , tableName {}, columnName {} ", nullValExist, schema, tableName, columnName); + LOGGER.info("null cursor value for MsSQL source : {}, shema {} , tableName {}, columnName {} ", nullValExist, schema, tableName, + columnName); } } } -// return !nullValExist; -// will enable after we have sent comms to users this affects + // return !nullValExist; + // will enable after we have sent comms to users this affects return true; } diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index 090687d43815..defe4f7684d4 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.2 +LABEL io.airbyte.version=2.0.3 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index cfd4b0e00747..0f58b6256f27 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.2 +LABEL io.airbyte.version=2.0.3 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 6bf44f689c75..02a10a846e3e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -71,21 +71,21 @@ public class MySqlSource extends AbstractJdbcSource implements Source private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSource.class); private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000; public static final String NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY = - """ - SELECT (EXISTS (SELECT * from `%s`.`%s` where `%s` IS NULL LIMIT 1)) AS %s - """; + """ + SELECT (EXISTS (SELECT * from `%s`.`%s` where `%s` IS NULL LIMIT 1)) AS %s + """; public static final String NULL_CURSOR_VALUE_WITHOUT_SCHEMA_QUERY = - """ - SELECT (EXISTS (SELECT * from %s where `%s` IS NULL LIMIT 1)) AS %s - """; + """ + SELECT (EXISTS (SELECT * from %s where `%s` IS NULL LIMIT 1)) AS %s + """; public static final String DESCRIBE_TABLE_WITHOUT_SCHEMA_QUERY = - """ - DESCRIBE %s - """; + """ + DESCRIBE %s + """; public static final String DESCRIBE_TABLE_WITH_SCHEMA_QUERY = - """ - DESCRIBE `%s`.`%s` - """; + """ + DESCRIBE `%s`.`%s` + """; public static final String DRIVER_CLASS = DatabaseDriver.MYSQL.getDriverClassName(); public static final String MYSQL_CDC_OFFSET = "mysql_cdc_offset"; @@ -333,39 +333,43 @@ public Set getExcludedInternalNameSpaces() { @Override protected boolean verifyCursorColumnValues(final JdbcDatabase database, final String schema, final String tableName, final String columnName) - throws SQLException { + throws SQLException { boolean nullValExist = false; final String resultColName = "nullValue"; final String descQuery = schema == null || schema.isBlank() - ? String.format(DESCRIBE_TABLE_WITHOUT_SCHEMA_QUERY, tableName) - : String.format(DESCRIBE_TABLE_WITH_SCHEMA_QUERY, schema, tableName); + ? String.format(DESCRIBE_TABLE_WITHOUT_SCHEMA_QUERY, tableName) + : String.format(DESCRIBE_TABLE_WITH_SCHEMA_QUERY, schema, tableName); final Optional field = database.bufferedResultSetQuery(conn -> conn.createStatement() - .executeQuery(descQuery), - resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)) - .stream().filter(x -> x.get("Field").asText().equalsIgnoreCase(columnName)) - .findFirst(); - if(field.isPresent()){ + .executeQuery(descQuery), + resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)) + .stream() + .peek(x -> LOGGER.info("MySQL Table Structure {}, {}, {}", x.toString(), schema, tableName)) + .filter(x -> x.get("Field") != null) + .filter(x -> x.get("Field").asText().equalsIgnoreCase(columnName)) + .findFirst(); + if (field.isPresent()) { final JsonNode jsonNode = field.get(); final JsonNode isNullable = jsonNode.get("Null"); - if (isNullable!=null){ - if (isNullable.asText().equalsIgnoreCase("YES")){ + if (isNullable != null) { + if (isNullable.asText().equalsIgnoreCase("YES")) { final String query = schema == null || schema.isBlank() - ? String.format(NULL_CURSOR_VALUE_WITHOUT_SCHEMA_QUERY, + ? String.format(NULL_CURSOR_VALUE_WITHOUT_SCHEMA_QUERY, tableName, columnName, resultColName) - : String.format(NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY, - schema, tableName, columnName, resultColName) ; + : String.format(NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY, + schema, tableName, columnName, resultColName); LOGGER.debug("null value query: {}", query); final List jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(query), - resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); + resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); Preconditions.checkState(jsonNodes.size() == 1); nullValExist = convertToBoolean(jsonNodes.get(0).get(resultColName).toString()); - LOGGER.info("null cursor value for MySQL source : {}, shema {} , tableName {}, columnName {} ", nullValExist, schema, tableName, columnName); + LOGGER.info("null cursor value for MySQL source : {}, shema {} , tableName {}, columnName {} ", nullValExist, schema, tableName, + columnName); } } } -// return !nullValExist; -// will enable after we have sent comms to users this affects + // return !nullValExist; + // will enable after we have sent comms to users this affects return true; } diff --git a/connectors.md b/connectors.md index a5b5daee7444..487decb4536c 100644 --- a/connectors.md +++ b/connectors.md @@ -137,13 +137,13 @@ | **Marketo** | Marketo icon | Source | airbyte/source-marketo:1.0.2 | generally_available | [link](https://docs.airbyte.com/integrations/sources/marketo) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-marketo) | `9e0556f4-69df-4522-a3fb-03264d36b348` | | **Metabase** | Metabase icon | Source | airbyte/source-metabase:0.3.1 | beta | [link](https://docs.airbyte.com/integrations/sources/metabase) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-metabase) | `c7cb421b-942e-4468-99ee-e369bcabaec5` | | **Microsoft Dataverse** | Microsoft Dataverse icon | Source | airbyte/source-microsoft-dataverse:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/microsoft-dataverse) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-microsoft-dataverse) | `9220e3de-3b60-4bb2-a46f-046d59ea235a` | -| **Microsoft SQL Server (MSSQL)** | Microsoft SQL Server (MSSQL) icon | Source | airbyte/source-mssql:1.0.2 | alpha | [link](https://docs.airbyte.com/integrations/sources/mssql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mssql) | `b5ea17b1-f170-46dc-bc31-cc744ca984c1` | +| **Microsoft SQL Server (MSSQL)** | Microsoft SQL Server (MSSQL) icon | Source | airbyte/source-mssql:1.0.3 | alpha | [link](https://docs.airbyte.com/integrations/sources/mssql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mssql) | `b5ea17b1-f170-46dc-bc31-cc744ca984c1` | | **Microsoft teams** | Microsoft teams icon | Source | airbyte/source-microsoft-teams:0.2.5 | alpha | [link](https://docs.airbyte.com/integrations/sources/microsoft-teams) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-microsoft-teams) | `eaf50f04-21dd-4620-913b-2a83f5635227` | | **Mixpanel** | Mixpanel icon | Source | airbyte/source-mixpanel:0.1.30 | generally_available | [link](https://docs.airbyte.com/integrations/sources/mixpanel) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mixpanel) | `12928b32-bf0a-4f1e-964f-07e12e37153a` | | **Monday** | Monday icon | Source | airbyte/source-monday:0.2.3 | beta | [link](https://docs.airbyte.com/integrations/sources/monday) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-monday) | `80a54ea2-9959-4040-aac1-eee42423ec9b` | | **MongoDb** | MongoDb icon | Source | airbyte/source-mongodb-v2:0.1.19 | alpha | [link](https://docs.airbyte.com/integrations/sources/mongodb-v2) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mongodb-v2) | `b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e` | | **My Hours** | My Hours icon | Source | airbyte/source-my-hours:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/my-hours) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-my-hours) | `722ba4bf-06ec-45a4-8dd5-72e4a5cf3903` | -| **MySQL** | MySQL icon | Source | airbyte/source-mysql:2.0.2 | beta | [link](https://docs.airbyte.com/integrations/sources/mysql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mysql) | `435bb9a5-7887-4809-aa58-28c27df0d7ad` | +| **MySQL** | MySQL icon | Source | airbyte/source-mysql:2.0.3 | beta | [link](https://docs.airbyte.com/integrations/sources/mysql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mysql) | `435bb9a5-7887-4809-aa58-28c27df0d7ad` | | **NASA** | NASA icon | Source | airbyte/source-nasa:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/nasa) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-nasa) | `1a8667d7-7978-43cd-ba4d-d32cbd478971` | | **Netsuite** | Netsuite icon | Source | airbyte/source-netsuite:0.1.3 | alpha | [link](https://docs.airbyte.com/integrations/sources/netsuite) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-netsuite) | `4f2f093d-ce44-4121-8118-9d13b7bfccd0` | | **New York Times** | New York Times icon | Source | airbyte/source-nytimes:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/nytimes) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-nytimes) | `0fae6a9a-04eb-44d4-96e1-e02d3dbc1d83` | diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index bf6446f9de1a..2c06925a1313 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -341,6 +341,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.3 | 2023-03-15 | [24082](https://github.com/airbytehq/airbyte/pull/24082) | Fixed NPE during cursor values validation | | 1.0.2 | 2023-03-14 | [23908](https://github.com/airbytehq/airbyte/pull/23908) | Log warning on null cursor values | | 1.0.1 | 2023-03-10 | [23939](https://github.com/airbytehq/airbyte/pull/23939) | For network isolation, source connector accepts a list of hosts it is allowed to connect | | 1.0.0 | 2023-03-06 | [23112](https://github.com/airbytehq/airbyte/pull/23112) | Upgrade Debezium version to 2.1.2 | diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 1c838fd1d9e5..ba992e958629 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -256,6 +256,7 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.0.3 | 2023-03-15 | [24082](https://github.com/airbytehq/airbyte/pull/24082) | Fixed NPE during cursor values validation | | 2.0.2 | 2023-03-14 | [23908](https://github.com/airbytehq/airbyte/pull/23908) | Log warning on null cursor values | | 2.0.1 | 2023-03-10 | [23939](https://github.com/airbytehq/airbyte/pull/23939) | For network isolation, source connector accepts a list of hosts it is allowed to connect | | 2.0.0 | 2023-03-06 | [23112](https://github.com/airbytehq/airbyte/pull/23112) | Upgrade Debezium version to 2.1.2 |