diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/d4353156-9217-4cad-8dd7-c108fd4f74cf.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/d4353156-9217-4cad-8dd7-c108fd4f74cf.json new file mode 100644 index 000000000000..2b2c09083f45 --- /dev/null +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/d4353156-9217-4cad-8dd7-c108fd4f74cf.json @@ -0,0 +1,7 @@ +{ + "destinationDefinitionId": "d4353156-9217-4cad-8dd7-c108fd4f74cf", + "name": "MS SQL Server", + "dockerRepository": "airbyte/destination-mssql", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/mssql" +} diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index b829e5fda638..8e0200ddfaf2 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -40,3 +40,8 @@ dockerRepository: airbyte/destination-mysql dockerImageTag: 0.1.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql +- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf + name: MS SQL Server + dockerRepository: airbyte/destination-mssql + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/destinations/mssql diff --git a/airbyte-db/src/main/java/io/airbyte/db/Databases.java b/airbyte-db/src/main/java/io/airbyte/db/Databases.java index c0318d7d2b7a..b38f2c47f287 100644 --- a/airbyte-db/src/main/java/io/airbyte/db/Databases.java +++ b/airbyte-db/src/main/java/io/airbyte/db/Databases.java @@ -42,6 +42,10 @@ public static JdbcDatabase createRedshiftDatabase(String username, String passwo return createJdbcDatabase(username, password, jdbcConnectionString, "com.amazon.redshift.jdbc.Driver"); } + public static Database createSqlServerDatabase(String username, String password, String jdbcConnectionString) { + return createDatabase(username, password, jdbcConnectionString, "com.microsoft.sqlserver.jdbc.SQLServerDriver", SQLDialect.DEFAULT); + } + public static Database createDatabase(final String username, final String password, final String jdbcConnectionString, diff --git a/airbyte-integrations/connectors/destination-mssql/.dockerignore b/airbyte-integrations/connectors/destination-mssql/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-mssql/Dockerfile b/airbyte-integrations/connectors/destination-mssql/Dockerfile new file mode 100644 index 000000000000..5da3719e0252 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql/Dockerfile @@ -0,0 +1,12 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte + +ENV APPLICATION destination-mssql + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-mssql diff --git a/airbyte-integrations/connectors/destination-mssql/build.gradle b/airbyte-integrations/connectors/destination-mssql/build.gradle new file mode 100644 index 000000000000..8aab66880d58 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql/build.gradle @@ -0,0 +1,26 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.mssql.MSSQLDestination' +} + +dependencies { + implementation project(':airbyte-db') + implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:connectors:destination-jdbc') + + implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14' + + testImplementation 'org.apache.commons:commons-lang3:3.11' + testImplementation "org.testcontainers:mssqlserver:1.15.3" + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs) +} diff --git a/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLDestination.java b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLDestination.java new file mode 100644 index 000000000000..7a78753cbcac --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLDestination.java @@ -0,0 +1,118 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.mssql; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MSSQLDestination extends AbstractJdbcDestination implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(MSSQLDestination.class); + + public static final String DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; + + public MSSQLDestination() { + super(DRIVER_CLASS, new MSSQLNameTransformer(), new SqlServerOperations()); + } + + @Override + public JsonNode toJdbcConfig(JsonNode config) { + final String schema = Optional.ofNullable(config.get("schema")).map(JsonNode::asText).orElse("public"); + + List additionalParameters = new ArrayList<>(); + + final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:sqlserver://%s:%s;databaseName=%s;", + config.get("host").asText(), + config.get("port").asText(), + config.get("database").asText())); + + if (config.has("ssl_method")) { + readSsl(config, additionalParameters); + } + + if (!additionalParameters.isEmpty()) { + jdbcUrl.append(String.join(";", additionalParameters)); + } + + final ImmutableMap.Builder configBuilder = ImmutableMap.builder() + .put("jdbc_url", jdbcUrl.toString()) + .put("username", config.get("username").asText()) + .put("password", config.get("password").asText()) + .put("schema", schema); + + return Jsons.jsonNode(configBuilder.build()); + } + + private void readSsl(JsonNode config, List additionalParameters) { + switch (config.get("ssl_method").asText()) { + case "unencrypted": + additionalParameters.add("encrypt=false"); + break; + case "encrypted_trust_server_certificate": + additionalParameters.add("encrypt=true"); + additionalParameters.add("trustServerCertificate=true"); + break; + case "encrypted_verify_certificate": + additionalParameters.add("encrypt=true"); + + // trust store location code found at https://stackoverflow.com/a/56570588 + String trustStoreLocation = Optional.ofNullable(System.getProperty("javax.net.ssl.trustStore")) + .orElseGet(() -> System.getProperty("java.home") + "/lib/security/cacerts"); + File trustStoreFile = new File(trustStoreLocation); + if (!trustStoreFile.exists()) { + throw new RuntimeException("Unable to locate the Java TrustStore: the system property javax.net.ssl.trustStore is undefined or " + + trustStoreLocation + " does not exist."); + } + String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword"); + + additionalParameters.add("trustStore=" + trustStoreLocation); + if (trustStorePassword != null && !trustStorePassword.isEmpty()) { + additionalParameters.add("trustStorePassword=" + config.get("trustStorePassword").asText()); + } + if (config.has("hostNameInCertificate")) { + additionalParameters.add("hostNameInCertificate=" + config.get("hostNameInCertificate").asText()); + } + break; + } + } + + public static void main(String[] args) throws Exception { + final Destination destination = new MSSQLDestination(); + LOGGER.info("starting destination: {}", MSSQLDestination.class); + new IntegrationRunner(destination).run(args); + LOGGER.info("completed destination: {}", MSSQLDestination.class); + } + +} diff --git a/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLNameTransformer.java b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLNameTransformer.java new file mode 100644 index 000000000000..8a7d8ecc3f98 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLNameTransformer.java @@ -0,0 +1,36 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.mssql; + +import io.airbyte.integrations.destination.ExtendedNameTransformer; + +public class MSSQLNameTransformer extends ExtendedNameTransformer { + + @Override + protected String applyDefaultCase(String input) { + return input.toUpperCase(); + } + +} diff --git a/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/SqlServerOperations.java b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/SqlServerOperations.java new file mode 100644 index 000000000000..3c4dccc2e381 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/SqlServerOperations.java @@ -0,0 +1,106 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.mssql; + +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.SqlOperationsUtils; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class SqlServerOperations implements SqlOperations { + + @Override + public void createSchemaIfNotExists(JdbcDatabase database, String schemaName) throws Exception { + final String query = String.format("IF NOT EXISTS ( SELECT * FROM sys.schemas WHERE name = '%s') EXEC('CREATE SCHEMA [%s]')", + schemaName, + schemaName); + database.execute(query); + } + + @Override + public void createTableIfNotExists(JdbcDatabase database, String schemaName, String tableName) throws Exception { + database.execute(createTableQuery(schemaName, tableName)); + } + + @Override + public String createTableQuery(String schemaName, String tableName) { + return String.format( + "IF NOT EXISTS (SELECT * FROM sys.tables t JOIN sys.schemas s ON t.schema_id = s.schema_id " + + "WHERE s.name = '%s' AND t.name = '%s') " + + "CREATE TABLE %s.%s ( \n" + + "%s VARCHAR(64) PRIMARY KEY,\n" + + "%s VARCHAR(MAX),\n" + + "%s DATETIMEOFFSET(7) DEFAULT SYSDATETIMEOFFSET()\n" + + ");\n", + schemaName, tableName, schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + } + + @Override + public void dropTableIfExists(JdbcDatabase database, String schemaName, String tableName) throws Exception { + final String query = String.format( + "IF EXISTS (SELECT * FROM sys.tables t JOIN sys.schemas s ON t.schema_id = s.schema_id " + + "WHERE s.name = '%s' AND t.name = '%s') " + + "DROP TABLE %s.%s", + schemaName, tableName, schemaName, tableName); + database.execute(query); + } + + @Override + public String truncateTableQuery(String schemaName, String tableName) { + return String.format("TRUNCATE TABLE %s.%s\n", schemaName, tableName); + } + + @Override + public void insertRecords(JdbcDatabase database, Stream recordsStream, String schemaName, String tempTableName) + throws Exception { + final List records = recordsStream.collect(Collectors.toList()); + + final String insertQueryComponent = String.format( + "INSERT INTO %s.%s (%s, %s, %s) VALUES\n", + schemaName, + tempTableName, + JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_DATA, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + final String recordQueryComponent = "(?, ?, ?),\n"; + SqlOperationsUtils.insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, database, records); + } + + @Override + public String copyTableQuery(String schemaName, String sourceTableName, String destinationTableName) { + return String.format("INSERT INTO %s.%s SELECT * FROM %s.%s;\n", schemaName, destinationTableName, schemaName, sourceTableName); + } + + @Override + public void executeTransaction(JdbcDatabase database, String queries) throws Exception { + database.execute("BEGIN TRAN;\n" + queries + "COMMIT TRAN;"); + } + +} diff --git a/airbyte-integrations/connectors/destination-mssql/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-mssql/src/main/resources/spec.json new file mode 100644 index 000000000000..48284392671a --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql/src/main/resources/spec.json @@ -0,0 +1,110 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/postgres", + "supportsIncremental": true, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Postgres Destination Spec", + "type": "object", + "required": ["host", "port", "username", "database", "schema"], + "additionalProperties": false, + "properties": { + "host": { + "title": "Host", + "description": "Hostname of the database.", + "type": "string", + "order": 0 + }, + "port": { + "title": "Port", + "description": "Port of the database.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 5432, + "examples": ["5432"], + "order": 1 + }, + "database": { + "title": "DB Name", + "description": "Name of the database.", + "type": "string", + "order": 2 + }, + "schema": { + "title": "Default Schema", + "description": "The default schema tables are written to if the source does not specify a namespace. The usual value for this field is \"public\".", + "type": "string", + "examples": ["public"], + "default": "public", + "order": 3 + }, + "username": { + "title": "User", + "description": "Username to use to access the database.", + "type": "string", + "order": 4 + }, + "password": { + "title": "Password", + "description": "Password associated with the username.", + "type": "string", + "airbyte_secret": true, + "order": 5 + }, + "ssl_method": { + "title": "SSL Method", + "type": "object", + "description": "Encryption method to use when communicating with the database", + "order": 6, + "oneOf": [ + { + "title": "Unencrypted", + "additionalProperties": false, + "description": "Data transfer will not be encrypted.", + "required": ["ssl_method"], + "properties": { + "ssl_method": { + "type": "string", + "enum": ["unencrypted"], + "default": "unencrypted" + } + } + }, + { + "title": "Encrypted (trust server certificate)", + "additionalProperties": false, + "description": "Use the cert provided by the server without verification. (For testing purposes only!)", + "required": ["ssl_method"], + "properties": { + "ssl_method": { + "type": "string", + "enum": ["encrypted_trust_server_certificate"], + "default": "encrypted_trust_server_certificate" + } + } + }, + { + "title": "Encrypted (verify certificate)", + "additionalProperties": false, + "description": "Verify and use the cert provided by the server.", + "required": ["ssl_method", "trustStoreName", "trustStorePassword"], + "properties": { + "ssl_method": { + "type": "string", + "enum": ["encrypted_verify_certificate"], + "default": "encrypted_verify_certificate" + }, + "hostNameInCertificate": { + "title": "Host Name In Certificate", + "type": "string", + "description": "Specifies the host name of the server. The value of this property must match the subject property of the certificate.", + "order": 7 + } + } + } + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLIntegrationTest.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLIntegrationTest.java new file mode 100644 index 000000000000..8c78b651d5b8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLIntegrationTest.java @@ -0,0 +1,190 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.mssql; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.standardtest.destination.TestDestination; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.lang3.RandomStringUtils; +import org.jooq.JSONFormat; +import org.jooq.JSONFormat.RecordFormat; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.MSSQLServerContainer; + +public class MSSQLIntegrationTest extends TestDestination { + + private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT); + + private static MSSQLServerContainer db; + private ExtendedNameTransformer namingResolver = new ExtendedNameTransformer(); + private JsonNode configWithoutDbName; + private JsonNode config; + + @Override + protected String getImageName() { + return "airbyte/destination-mssql:dev"; + } + + private JsonNode getConfig(MSSQLServerContainer db) { + return Jsons.jsonNode(ImmutableMap.builder() + .put("host", db.getHost()) + .put("port", db.getFirstMappedPort()) + .put("username", db.getUsername()) + .put("password", db.getPassword()) + .put("schema", "testSchema") + .build()); + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected JsonNode getFailCheckConfig() { + return Jsons.jsonNode(ImmutableMap.builder() + .put("host", db.getHost()) + .put("username", db.getUsername()) + .put("password", "wrong password") + .put("schema", "public") + .put("port", db.getFirstMappedPort()) + .put("ssl", false) + .build()); + } + + @Override + protected List retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + .stream() + .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) + .collect(Collectors.toList()); + } + + @Override + protected boolean implementsBasicNormalization() { + return false; + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected List retrieveNormalizedRecords(TestDestinationEnv env, String streamName, String namespace) + throws Exception { + String tableName = namingResolver.getIdentifier(streamName); + return retrieveRecordsFromTable(tableName, namespace); + } + + @Override + protected List resolveIdentifier(String identifier) { + final List result = new ArrayList<>(); + final String resolved = namingResolver.getIdentifier(identifier); + result.add(identifier); + result.add(resolved); + if (!resolved.startsWith("\"")) { + result.add(resolved.toLowerCase()); + result.add(resolved.toUpperCase()); + } + return result; + } + + private List retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException { + return Databases.createSqlServerDatabase(db.getUsername(), db.getPassword(), + db.getJdbcUrl()).query( + ctx -> { + ctx.fetch(String.format("USE %s;", config.get("database"))); + return ctx + .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .stream() + .map(r -> r.formatJSON(JSON_FORMAT)) + .map(Jsons::deserialize) + .collect(Collectors.toList()); + }); + } + + @BeforeAll + protected static void init() { + db = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense(); + db.start(); + } + + private static Database getDatabase(JsonNode config) { + // todo (cgardens) - rework this abstraction so that we do not have to pass a null into the + // constructor. at least explicitly handle it, even if the impl doesn't change. + return Databases.createDatabase( + config.get("username").asText(), + config.get("password").asText(), + String.format("jdbc:sqlserver://%s:%s", + config.get("host").asText(), + config.get("port").asInt()), + "com.microsoft.sqlserver.jdbc.SQLServerDriver", + null); + } + + // how to interact with the mssql test container manaully. + // 1. exec into mssql container (not the test container container) + // 2. /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "A_Str0ng_Required_Password" + @Override + protected void setup(TestDestinationEnv testEnv) throws SQLException { + configWithoutDbName = getConfig(db); + final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + + final Database database = getDatabase(configWithoutDbName); + database.query(ctx -> { + ctx.fetch(String.format("CREATE DATABASE %s;", dbName)); + ctx.fetch(String.format("USE %s;", dbName)); + ctx.fetch("CREATE TABLE id_and_name(id INTEGER NOT NULL, name VARCHAR(200), born DATETIMEOFFSET(7));"); + ctx.fetch( + "INSERT INTO id_and_name (id, name, born) VALUES (1,'picard', '2124-03-04T01:01:01Z'), (2, 'crusher', '2124-03-04T01:01:01Z'), (3, 'vash', '2124-03-04T01:01:01Z');"); + return null; + }); + + config = Jsons.clone(configWithoutDbName); + ((ObjectNode) config).put("database", dbName); + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) {} + + @AfterAll + static void cleanUp() { + db.stop(); + db.close(); + } + +} diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLIntegrationTestSSL.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLIntegrationTestSSL.java new file mode 100644 index 000000000000..72f4d6b9fbe3 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLIntegrationTestSSL.java @@ -0,0 +1,199 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.mssql; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.standardtest.destination.TestDestination; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.lang3.RandomStringUtils; +import org.jooq.JSONFormat; +import org.jooq.JSONFormat.RecordFormat; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.MSSQLServerContainer; +import org.testcontainers.utility.DockerImageName; + +public class MSSQLIntegrationTestSSL extends TestDestination { + + private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT); + + private static MSSQLServerContainer db; + private ExtendedNameTransformer namingResolver = new ExtendedNameTransformer(); + private JsonNode configWithoutDbName; + private JsonNode config; + + @Override + protected String getImageName() { + return "airbyte/destination-mssql:dev"; + } + + private JsonNode getConfig(MSSQLServerContainer db) { + return Jsons.jsonNode(ImmutableMap.builder() + .put("host", db.getHost()) + .put("port", db.getFirstMappedPort()) + .put("username", db.getUsername()) + .put("password", db.getPassword()) + .put("schema", "testSchema") + .put("ssl_method", "encrypted_trust_server_certificate") + .build()); + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected JsonNode getFailCheckConfig() { + return Jsons.jsonNode(ImmutableMap.builder() + .put("host", db.getHost()) + .put("username", db.getUsername()) + .put("password", "wrong password") + .put("schema", "public") + .put("port", db.getFirstMappedPort()) + .put("ssl", false) + .build()); + } + + @Override + protected List retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + .stream() + .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) + .collect(Collectors.toList()); + } + + @Override + protected boolean implementsBasicNormalization() { + return false; + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected List retrieveNormalizedRecords(TestDestinationEnv env, String streamName, String namespace) + throws Exception { + String tableName = namingResolver.getIdentifier(streamName); + // Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785) so we don't + // use quoted names + // if (!tableName.startsWith("\"")) { + // // Currently, Normalization always quote tables identifiers + // //tableName = "\"" + tableName + "\""; + // } + return retrieveRecordsFromTable(tableName, namespace); + } + + @Override + protected List resolveIdentifier(String identifier) { + final List result = new ArrayList<>(); + final String resolved = namingResolver.getIdentifier(identifier); + result.add(identifier); + result.add(resolved); + if (!resolved.startsWith("\"")) { + result.add(resolved.toLowerCase()); + result.add(resolved.toUpperCase()); + } + return result; + } + + private List retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException { + return Databases.createSqlServerDatabase(db.getUsername(), db.getPassword(), + db.getJdbcUrl()).query( + ctx -> { + ctx.fetch(String.format("USE %s;", config.get("database"))); + return ctx + .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .stream() + .map(r -> r.formatJSON(JSON_FORMAT)) + .map(Jsons::deserialize) + .collect(Collectors.toList()); + }); + } + + @BeforeAll + protected static void init() { + db = new MSSQLServerContainer<>(DockerImageName.parse("airbyte/mssql_ssltest:dev").asCompatibleSubstituteFor("mcr.microsoft.com/mssql/server")) + .acceptLicense(); + db.start(); + } + + private static Database getDatabase(JsonNode config) { + // todo (cgardens) - rework this abstraction so that we do not have to pass a null into the + // constructor. at least explicitly handle it, even if the impl doesn't change. + return Databases.createDatabase( + config.get("username").asText(), + config.get("password").asText(), + String.format("jdbc:sqlserver://%s:%s", + config.get("host").asText(), + config.get("port").asInt()), + "com.microsoft.sqlserver.jdbc.SQLServerDriver", + null); + } + + // how to interact with the mssql test container manaully. + // 1. exec into mssql container (not the test container container) + // 2. /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "A_Str0ng_Required_Password" + @Override + protected void setup(TestDestinationEnv testEnv) throws SQLException { + configWithoutDbName = getConfig(db); + final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + + final Database database = getDatabase(configWithoutDbName); + database.query(ctx -> { + ctx.fetch(String.format("CREATE DATABASE %s;", dbName)); + ctx.fetch(String.format("USE %s;", dbName)); + ctx.fetch("CREATE TABLE id_and_name(id INTEGER NOT NULL, name VARCHAR(200), born DATETIMEOFFSET(7));"); + ctx.fetch( + "INSERT INTO id_and_name (id, name, born) VALUES (1,'picard', '2124-03-04T01:01:01Z'), (2, 'crusher', '2124-03-04T01:01:01Z'), (3, 'vash', '2124-03-04T01:01:01Z');"); + return null; + }); + + config = Jsons.clone(configWithoutDbName); + ((ObjectNode) config).put("database", dbName); + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) {} + + @AfterAll + static void cleanUp() { + db.stop(); + db.close(); + } + +} diff --git a/docs/integrations/destinations/mssql.md b/docs/integrations/destinations/mssql.md new file mode 100644 index 000000000000..d08414de91cd --- /dev/null +++ b/docs/integrations/destinations/mssql.md @@ -0,0 +1,68 @@ +# MS SQL Server + +## Overview + +The Airbyte MS SQL Server destination allows you to sync data to SQL Server databases. + +### Sync overview + +#### Output schema + +Each stream will be output into its own table in SQL Server. Each table will contain 3 columns: + +* `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in SQL Server is `VARCHAR(64)`. +* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in SQL Server is `DATETIMEOFFSET(7)`. +* `_airbyte_data`: a JSON blob representing with the event data. The column type in SQL Server is `VARCHAR(MAX)`. + +#### Features + +| Feature | Supported?\(Yes/No\) | Notes | +| :--- | :--- | :--- | +| Full Refresh Sync | Yes | | +| Incremental - Append Sync | Yes | | +| Namespaces | Yes | | + +## Getting started + +### Requirements + +To use the SQL Server destination, you'll need: + +MS SQL Server: `Azure SQL Database`, `Azure Synapse Analytics`, `Azure SQL Managed Instance`, `SQL Server 2019`, `SQL Server 2017`, `SQL Server 2016`, `SQL Server 2014`, `SQL Server 2012`, or `PDW 2008R2 AU34`. + +### Setup guide + +#### Network Access + +Make sure your SQL Server database can be accessed by Airbyte. If your database is within a VPC, you may need to allow access from the IP you're using to expose Airbyte. + +#### **Permissions** + +You need a user configured in SQL Server that can create tables and write rows. We highly recommend creating an Airbyte-specific user for this purpose. + +#### Target Database + +You will need to choose an existing database or create a new database that will be used to store synced data from Airbyte. + +#### SSL configuration (optional) + +Airbyte supports a SSL-encrypted connection to the database. If you want to use SSL to securely access your database, ensure that [the server is configured to use an SSL certificate.](https://support.microsoft.com/en-us/topic/how-to-enable-ssl-encryption-for-an-instance-of-sql-server-by-using-microsoft-management-console-1c7ae22f-8518-2b3e-93eb-d735af9e344c) + +### Setup the MSSQL destination in Airbyte + +You should now have all the requirements needed to configure SQL Server as a destination in the UI. You'll need the following information to configure the MSSQL destination: + +* **Host** +* **Port** +* **Username** +* **Password** +* **Schema** +* **Database** + * This database needs to exist within the schema provided. +* **SSL Method**: + * The SSL configuration supports three modes: Unencrypted, Encrypted (trust server certificate), and Encrypted (verify certificate). + * **Unencrypted**: Do not use SSL encryption on the database connection + * **Encrypted (trust server certificate)**: Use SSL encryption without verifying the server's certificate. This is useful for self-signed certificates in testing scenarios, but should not be used in production. + * **Encrypted (verify certificate)**: Use the server's SSL certificate, after standard certificate verification. + * **Host Name In Certificate** (optional): When using certificate verification, this property can be set to specify an expected name for added security. If this value is present, and the server's certificate's host name does not match it, certificate verification will fail. + \ No newline at end of file diff --git a/tools/integrations-test-ssl/MSSQL.Dockerfile b/tools/integrations-test-ssl/MSSQL.Dockerfile new file mode 100644 index 000000000000..9d57b1d449bd --- /dev/null +++ b/tools/integrations-test-ssl/MSSQL.Dockerfile @@ -0,0 +1,11 @@ +FROM mcr.microsoft.com/mssql/server:2019-latest + +COPY mssql.key /etc/ssl/private/mssql.key +COPY mssql.pem /etc/ssl/certs/mssql.pem +COPY mssql.conf /var/opt/mssql/mssql.conf + +EXPOSE 1433 + +USER root +RUN chmod 755 /etc/ssl/private +USER mssql \ No newline at end of file diff --git a/tools/integrations-test-ssl/generate_crts_mssql.sh b/tools/integrations-test-ssl/generate_crts_mssql.sh new file mode 100755 index 000000000000..c25c66c95730 --- /dev/null +++ b/tools/integrations-test-ssl/generate_crts_mssql.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +set -euo pipefail + +openssl req -x509 -nodes -newkey rsa:2048 -subj '/CN=sqltest.airbyte.com' -keyout mssql.key -out mssql.pem +chmod 440 mssql.pem +chmod 440 mssql.key diff --git a/tools/integrations-test-ssl/mssql.conf b/tools/integrations-test-ssl/mssql.conf new file mode 100644 index 000000000000..44d9eef21182 --- /dev/null +++ b/tools/integrations-test-ssl/mssql.conf @@ -0,0 +1,5 @@ +[network] +tlscert = /etc/ssl/certs/mssql.pem +tlskey = /etc/ssl/private/mssql.key +tlsprotocols = 1.2 +forceencryption = 1 \ No newline at end of file diff --git a/tools/integrations-test-ssl/mssql.key b/tools/integrations-test-ssl/mssql.key new file mode 100644 index 000000000000..e2e20c6e0aaf --- /dev/null +++ b/tools/integrations-test-ssl/mssql.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC5bUoMylrFiaAC +1sYCQlMqqX/yvGwZAIoWq5mo4Kc1PB1JDS8+vB7s8SMsJSMldB1udSEsv0P4k1sV +pymRZp0Y2IAepcJ1qpFEUJGH5E1A6JIDW3EyvddO5u4BoA+ZDUFvlRyozZumSbP8 +hvBtFv8OR/guB1938XB185CM5EFZydKeTkKiPhvTA2fbqmmi96DAxDMI2ggBBKDI +UVTUWJqJMOcj/sLGY87jPiltgQGknSxKZoJYeqFeyhYPrzvUrzLJJQPBJ9L7HwoO +In4jxIN8sDhl/4yXSmX8bGRRXOLhrspGp0KRIQXUeClZLswz+YT07pPl6oMXQnD/ +dl/WcSuZAgMBAAECggEABbzoCbVJUcuMdAoJXpCG2k8ccnp6LdviagktXBh3lCIk +Fdqel6ZinppnqDoN+F67emuNd0ED7XFB5E2j76fpPJeWf1xJxDJfBGop1rat3VBV +FF2EBznwq7RhsRMu6GGMoNNQa7jRFDg7pZjXX8jSY7K+b04zGhcSj9PVqUZ27zxO +AwKsRVDwbJ3SkQKhUTMvzgJiaIinvE8qMjFby+ar5R7HrQOXKKKwZDuTlLQGinMF +Nskb6e9uI5T4KVnp7JDGPw/rojBYEJnCnUv3nL51UYjRDl1lX+KPEf8a3nK37Nk5 +Xalj7IKAbyw6vbqr8qPcRWsOloC3KF/O5v+5nxSAMQKBgQDzahPceQxeZHpeWpC3 +9nUIHBt9zqkg9eszNZ9MyVjUbBraBRwYX1sBjZz+l5ohMA5VqyPlv31LFp9B6V/A +GI6b1qFWQ0RnzNT7DT0xSq5Ble6DKmksJjcONEVy6oROhj/GGQuufx1+sstARxk8 +H3A6uFtL53Hjgf8vGQEeFjIqcwKBgQDDA6sW41xwfGPGfu355oN6s4fVrfA8wocv +zNGXSMtJbAXLAC1fpOhbWzAWGRt8KqQWhTcpmBAClotNi4ollWlzHit6u34l4tL+ +dba7Vo2qbEs6IggE4zLHznEO/yKBOXVNB2PV+fnQo90kZ76AxEbGywz4BJ5ImO2C +rpV+flZSwwKBgQCbCyY7eJ74QOfw0Z78jm9dCwo3yDrSU9HMfItLTbTXGUTBOh/7 +JkHBa4JkaAw0t3dp+eiTnrUf7vjh8tSadwnfGYcKey5HL6E5h+VCUF9OR0H1Kj5z +cKQA2CqkV9yOZ9SXSby3GSCgYyIzfxYDxcKmpGcCohlY4KS6SyL7Fwg9IQKBgQCk +ktTw1ODvANqG6hlU+ubcRuQMPOTvsc66VSRPgpwkEyh0X2rrO1Tnu/XBwGCEkcu2 +QagCzxQ7yuY2g9sKyqOaBcz1n4Le4CPloFucj3ewagG2Rn/z9/Sj0CFzYXayDVZj +sifbrUDYhWEb1v1a18lO/I6uQ998LqrJzSHWBTI+VwKBgBek9td3SMUCjQ49toer +WKb69wqHIwgniOZmemX7EStfI7oQUtUZi0ZRvQd220oJI/jmarNlmi4Myuj2E8bI +pn3TRi3tAsxZ8y59BFNZkeHNAv4//ei0+6Y9HN43Ie7rpXIvEmL/8QMDUsfOXkto +s4bzmT5NcOMZuDr7fjKic/hl +-----END PRIVATE KEY----- diff --git a/tools/integrations-test-ssl/mssql.pem b/tools/integrations-test-ssl/mssql.pem new file mode 100644 index 000000000000..480667e84667 --- /dev/null +++ b/tools/integrations-test-ssl/mssql.pem @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDHTCCAgWgAwIBAgIUXEXm761ENtHmsrdb12sJyPedfW8wDQYJKoZIhvcNAQEL +BQAwHjEcMBoGA1UEAwwTc3FsdGVzdC5haXJieXRlLmNvbTAeFw0yMTA1MDcyMzA3 +NDBaFw0yMTA2MDYyMzA3NDBaMB4xHDAaBgNVBAMME3NxbHRlc3QuYWlyYnl0ZS5j +b20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC5bUoMylrFiaAC1sYC +QlMqqX/yvGwZAIoWq5mo4Kc1PB1JDS8+vB7s8SMsJSMldB1udSEsv0P4k1sVpymR +Zp0Y2IAepcJ1qpFEUJGH5E1A6JIDW3EyvddO5u4BoA+ZDUFvlRyozZumSbP8hvBt +Fv8OR/guB1938XB185CM5EFZydKeTkKiPhvTA2fbqmmi96DAxDMI2ggBBKDIUVTU +WJqJMOcj/sLGY87jPiltgQGknSxKZoJYeqFeyhYPrzvUrzLJJQPBJ9L7HwoOIn4j +xIN8sDhl/4yXSmX8bGRRXOLhrspGp0KRIQXUeClZLswz+YT07pPl6oMXQnD/dl/W +cSuZAgMBAAGjUzBRMB0GA1UdDgQWBBQO7E0qXXC6/C63Ph02pmr1DGhTfjAfBgNV +HSMEGDAWgBQO7E0qXXC6/C63Ph02pmr1DGhTfjAPBgNVHRMBAf8EBTADAQH/MA0G +CSqGSIb3DQEBCwUAA4IBAQA9uejsECQbFqNw2oXnOZfcHSvfslWq11GdPLqMWFYW +NBWAEw2PS5uiB3B8Q+sFZ/7sXcGyK7e55JPMUxnsH3yIiE0NB0S56pcfFBL9k9xB +4zL7h1LMnmYTueIhUWOInbc1VNrdycMjpTqkVjNYabiXwvza/iWG+EQfxh3bABtE ++t1omtwMGtOB/XF7jPndfBk7Tdj2PgsTGBru3HVP7hTwHOSlhpGt+p5hsWQVAbBl +PSQyvP1xX/KfjGOs8WtKtpwc6RMNbreJfA4ktqvYTYPCvVm9+LpdLZ1jj0OAxKe6 +dcshkMOfJUPBb4HmDX0RZrDJH+4UKbqQ9vC6sTvPQtDO +-----END CERTIFICATE-----