Skip to content

Commit

Permalink
Mssql source: add schemas for discovery during set up (airbytehq#16002)
Browse files Browse the repository at this point in the history
* Mssql source: add schemas during discovery

* Source mssql: temp changes for testing ci

* Source mssql: update expected version for strict encrypt version

* Source mssql: update order in spec

* Source mssql: added filter by requested schemas

* Source mssql: bump versions

* Source mssql: format

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
2 people authored and robbinhan committed Sep 29, 2022
1 parent ff0a8f2 commit 67d6a6d
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.4.18
dockerImageTag: 0.4.19
documentationUrl: https://docs.airbyte.io/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down
21 changes: 16 additions & 5 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5273,7 +5273,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:0.4.18"
- dockerImage: "airbyte/source-mssql:0.4.19"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql"
connectionSpecification:
Expand Down Expand Up @@ -5307,30 +5307,41 @@
examples:
- "master"
order: 2
schemas:
title: "Schemas"
description: "The list of schemas to sync from. Defaults to user. Case sensitive."
type: "array"
items:
type: "string"
minItems: 0
uniqueItems: true
default:
- "dbo"
order: 3
username:
description: "The username which is used to access the database."
title: "Username"
type: "string"
order: 3
order: 4
password:
description: "The password associated with the username."
title: "Password"
type: "string"
airbyte_secret: true
order: 4
order: 5
jdbc_url_params:
title: "JDBC URL Params"
description: "Additional properties to pass to the JDBC URL string when\
\ connecting to the database formatted as 'key=value' pairs separated\
\ by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)."
type: "string"
order: 5
order: 6
ssl_method:
title: "SSL Method"
type: "object"
description: "The encryption method which is used when communicating with\
\ the database."
order: 6
order: 7
oneOf:
- title: "Unencrypted"
description: "Data transfer will not be encrypted."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.18
LABEL io.airbyte.version=0.4.19
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,42 @@
"examples": ["master"],
"order": 2
},
"schemas": {
"title": "Schemas",
"description": "The list of schemas to sync from. Defaults to user. Case sensitive.",
"type": "array",
"items": {
"type": "string"
},
"minItems": 0,
"uniqueItems": true,
"default": ["dbo"],
"order": 3
},
"username": {
"description": "The username which is used to access the database.",
"title": "Username",
"type": "string",
"order": 3
"order": 4
},
"password": {
"description": "The password associated with the username.",
"title": "Password",
"type": "string",
"airbyte_secret": true,
"order": 4
"order": 5
},
"jdbc_url_params": {
"title": "JDBC URL Params",
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"type": "string",
"order": 5
"order": 6
},
"ssl_method": {
"title": "SSL Method",
"type": "object",
"description": "The encryption method which is used when communicating with the database.",
"order": 6,
"order": 7,
"oneOf": [
{
"title": "Encrypted (trust server certificate)",
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.18
LABEL io.airbyte.version=0.4.19
LABEL io.airbyte.name=airbyte/source-mssql
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
public static final String CDC_LSN = "_ab_cdc_lsn";
private static final String HIERARCHYID = "hierarchyid";
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;
private List<String> schemas;

public static Source sshWrappedSource() {
return new SshWrappedSource(new MssqlSource(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
Expand Down Expand Up @@ -184,6 +185,13 @@ public JsonNode toDatabaseConfig(final JsonNode mssqlConfig) {
mssqlConfig.get(JdbcUtils.PORT_KEY).asText(),
mssqlConfig.get(JdbcUtils.DATABASE_KEY).asText()));

if (mssqlConfig.has("schemas") && mssqlConfig.get("schemas").isArray()) {
schemas = new ArrayList<>();
for (final JsonNode schema : mssqlConfig.get("schemas")) {
schemas.add(schema.asText());
}
}

if (mssqlConfig.has("ssl_method")) {
readSsl(mssqlConfig, additionalParameters);
}
Expand Down Expand Up @@ -236,6 +244,31 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
return catalog;
}

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(JdbcDatabase database) throws Exception {
final List<TableInfo<CommonField<JDBCType>>> internals = super.discoverInternal(database);
if (schemas != null && !schemas.isEmpty()) {
// process explicitly filtered (from UI) schemas
List<TableInfo<CommonField<JDBCType>>> resultInternals = internals
.stream()
.filter(this::isTableInRequestedSchema)
.toList();
for (TableInfo<CommonField<JDBCType>> info : resultInternals) {
LOGGER.debug("Found table (schema: {}): {}", info.getNameSpace(), info.getName());
}
return resultInternals;
} else {
LOGGER.info("No schemas explicitly set on UI to process, so will process all of existing schemas in DB");
return internals;
}
}

private boolean isTableInRequestedSchema(TableInfo<CommonField<JDBCType>> tableInfo) {
return schemas
.stream()
.anyMatch(schema -> schema.equals(tableInfo.getNameSpace()));
}

@Override
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,42 @@
"examples": ["master"],
"order": 2
},
"schemas": {
"title": "Schemas",
"description": "The list of schemas to sync from. Defaults to user. Case sensitive.",
"type": "array",
"items": {
"type": "string"
},
"minItems": 0,
"uniqueItems": true,
"default": ["dbo"],
"order": 3
},
"username": {
"description": "The username which is used to access the database.",
"title": "Username",
"type": "string",
"order": 3
"order": 4
},
"password": {
"description": "The password associated with the username.",
"title": "Password",
"type": "string",
"airbyte_secret": true,
"order": 4
"order": 5
},
"jdbc_url_params": {
"title": "JDBC URL Params",
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"type": "string",
"order": 5
"order": 6
},
"ssl_method": {
"title": "SSL Method",
"type": "object",
"description": "The encryption method which is used when communicating with the database.",
"order": 6,
"order": 7,
"oneOf": [
{
"title": "Unencrypted",
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ If you do not see a type in this list, assume that it is coerced into a string.

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----------------------------------------------------- |:-------------------------------------------------------------------------------------------------------|
| 0.4.19 | 2022-09-05 | [16002](https://github.com/airbytehq/airbyte/pull/16002) | Added ability to specify schemas for discovery during setting connector up |
| 0.4.18 | 2022-09-03 | [14910](https://github.com/airbytehq/airbyte/pull/14910) | Standardize spec for CDC replication. Replace the `replication_method` enum with a config object with a `method` enum field. |
| 0.4.17 | 2022-09-01 | [16261](https://github.com/airbytehq/airbyte/pull/16261) | Emit state messages more frequently |
| 0.4.16 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field |
Expand Down

0 comments on commit 67d6a6d

Please sign in to comment.