From 665e4f73ade37b8b2fb74a5a4946b0e1d1eca291 Mon Sep 17 00:00:00 2001 From: Duy Nguyen Date: Wed, 29 Nov 2023 06:45:28 -0800 Subject: [PATCH] source-mysql: merge strict encrypt variant into standard main mysql source (#31062) Co-authored-by: erohmensing Co-authored-by: nguyenaiden Co-authored-by: erohmensing Co-authored-by: alafanechere Co-authored-by: alafanechere Co-authored-by: Marius Posta --- .../acceptance-test-config.yml | 6 - .../source-mysql-strict-encrypt/build.gradle | 31 -- .../gradle.properties | 1 - .../source-mysql-strict-encrypt/icon.svg | 1 - .../source-mysql-strict-encrypt/metadata.yaml | 29 -- .../MySqlStrictEncryptSource.java | 87 ---- ...cateStrictEncryptSourceAcceptanceTest.java | 33 -- ...ySqlStrictEncryptSourceAcceptanceTest.java | 116 ----- ...StrictEncryptJdbcSourceAcceptanceTest.java | 453 ------------------ .../src/test/resources/config.json | 7 - .../source-mysql/acceptance-test-config.yml | 5 +- .../connectors/source-mysql/build.gradle | 1 - .../connectors/source-mysql/metadata.yaml | 3 +- .../source/mysql/MySqlSource.java | 56 ++- ...udDeploymentMySqlSourceAcceptanceTest.java | 26 + ...lSslCaCertificateSourceAcceptanceTest.java | 49 ++ ...lFullCertificateSourceAcceptanceTest.java} | 20 +- .../resources/expected_cloud_spec.json} | 34 +- ...ected_spec.json => expected_oss_spec.json} | 0 .../mysql/CloudDeploymentMySqlSslTest.java} | 37 +- .../test/resources/expected_cloud_spec.json | 227 +++++++++ .../src/test/resources/expected_oss_spec.json | 233 +++++++++ docs/integrations/sources/mysql.md | 3 +- 23 files changed, 650 insertions(+), 808 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-mysql-strict-encrypt/acceptance-test-config.yml delete mode 100644 airbyte-integrations/connectors/source-mysql-strict-encrypt/build.gradle delete mode 100644 airbyte-integrations/connectors/source-mysql-strict-encrypt/gradle.properties delete mode 100644 airbyte-integrations/connectors/source-mysql-strict-encrypt/icon.svg delete mode 100644 airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml delete mode 100644 airbyte-integrations/connectors/source-mysql-strict-encrypt/src/main/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSource.java delete mode 100644 airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslCaCertificateStrictEncryptSourceAcceptanceTest.java delete mode 100644 airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java delete mode 100644 airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java delete mode 100644 airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/config.json create mode 100644 airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSourceAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSslCaCertificateSourceAcceptanceTest.java rename airbyte-integrations/connectors/{source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslFullCertificateStrictEncryptSourceAcceptanceTest.java => source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSslFullCertificateSourceAcceptanceTest.java} (53%) rename airbyte-integrations/connectors/{source-mysql-strict-encrypt/src/test/resources/expected_spec.json => source-mysql/src/test-integration/resources/expected_cloud_spec.json} (94%) rename airbyte-integrations/connectors/source-mysql/src/test-integration/resources/{expected_spec.json => expected_oss_spec.json} (100%) rename airbyte-integrations/connectors/{source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSslTest.java => source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CloudDeploymentMySqlSslTest.java} (79%) create mode 100644 airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json create mode 100644 airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/acceptance-test-config.yml b/airbyte-integrations/connectors/source-mysql-strict-encrypt/acceptance-test-config.yml deleted file mode 100644 index c2ef1564874e..000000000000 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/acceptance-test-config.yml +++ /dev/null @@ -1,6 +0,0 @@ -# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) -# for more information about how to configure these tests -connector_image: airbyte/source-mysql-strict-encrypt:dev -tests: - spec: - - spec_path: "src/test/resources/expected_spec.json" diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/build.gradle b/airbyte-integrations/connectors/source-mysql-strict-encrypt/build.gradle deleted file mode 100644 index 3d908370e97d..000000000000 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/build.gradle +++ /dev/null @@ -1,31 +0,0 @@ -plugins { - id 'application' - id 'airbyte-java-connector' -} - -airbyteJavaConnector { - cdkVersionRequired = '0.5.1' - features = ['db-sources'] - useLocalCdk = false -} - -configurations.all { - resolutionStrategy { - force libs.jooq - } -} - -application { - mainClass = 'io.airbyte.integrations.source.mysql_strict_encrypt.MySqlStrictEncryptSource' - applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] -} - -dependencies { - implementation project(':airbyte-integrations:connectors:source-mysql') - implementation libs.jooq - - testImplementation testFixtures(project(':airbyte-integrations:connectors:source-mysql')) - testImplementation libs.junit.jupiter.system.stubs - testImplementation 'org.hamcrest:hamcrest-all:1.3' - testImplementation libs.testcontainers.mysql -} diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/gradle.properties b/airbyte-integrations/connectors/source-mysql-strict-encrypt/gradle.properties deleted file mode 100644 index 8ef098d20b92..000000000000 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/gradle.properties +++ /dev/null @@ -1 +0,0 @@ -testExecutionConcurrency=-1 \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/icon.svg b/airbyte-integrations/connectors/source-mysql-strict-encrypt/icon.svg deleted file mode 100644 index 607d361ed765..000000000000 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/icon.svg +++ /dev/null @@ -1 +0,0 @@ - \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml deleted file mode 100644 index 97c938a53050..000000000000 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml +++ /dev/null @@ -1,29 +0,0 @@ -data: - allowedHosts: - hosts: - - ${host} - - ${tunnel_method.tunnel_host} - registries: - cloud: - enabled: false # strict encrypt connectors are deployed to Cloud by their non strict encrypt sibling. - oss: - enabled: false # strict encrypt connectors are not used on OSS. - connectorSubtype: database - connectorType: source - definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.1.9 - dockerRepository: airbyte/source-mysql-strict-encrypt - githubIssueLabel: source-mysql - icon: mysql.svg - license: ELv2 - name: MySQL - releaseStage: generally_available - documentationUrl: https://docs.airbyte.com/integrations/sources/mysql - tags: - - language:java - releases: - breakingChanges: - 3.0.0: - message: "Add default cursor for cdc" - upgradeDeadline: "2023-08-17" -metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/main/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSource.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/main/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSource.java deleted file mode 100644 index 05583c81ddaa..000000000000 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/main/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSource.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mysql_strict_encrypt; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.base.IntegrationRunner; -import io.airbyte.cdk.integrations.base.Source; -import io.airbyte.cdk.integrations.base.spec_modification.SpecModifyingSource; -import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.source.mysql.MySqlSource; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.v0.ConnectorSpecification; -import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Secure-only version of MySQL source that can be used in the Airbyte cloud. This connector - * inherently prevent certain insecure connections such as connecting to a database over the public - * internet without encryption. - */ -public class MySqlStrictEncryptSource extends SpecModifyingSource implements Source { - - public static final String TUNNEL_METHOD = "tunnel_method"; - public static final String NO_TUNNEL = "NO_TUNNEL"; - public static final String SSL_MODE = "ssl_mode"; - public static final String MODE = "mode"; - public static final String SSL_MODE_PREFERRED = "preferred"; - public static final String SSL_MODE_REQUIRED = "required"; - - private static final Logger LOGGER = LoggerFactory.getLogger(MySqlStrictEncryptSource.class); - private static final String SSL_MODE_DESCRIPTION = "SSL connection modes. " + - "
  • required - Always connect with SSL. If the MySQL server doesn’t support SSL, the connection will not be established. Certificate Authority (CA) and Hostname are not verified.
  • " - + - "
  • verify-ca - Always connect with SSL. Verifies CA, but allows connection even if Hostname does not match.
  • " + - "
  • Verify Identity - Always connect with SSL. Verify both CA and Hostname.
  • Read more in the docs."; - - MySqlStrictEncryptSource() { - this(new MySqlSource()); - } - - MySqlStrictEncryptSource(MySqlSource source) { - super(MySqlSource.sshWrappedSource(source)); - } - - @Override - public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) { - final ConnectorSpecification spec = Jsons.clone(originalSpec); - ((ObjectNode) spec.getConnectionSpecification().get("properties")).remove(JdbcUtils.SSL_KEY); - ((ObjectNode) spec.getConnectionSpecification().get("properties").get(SSL_MODE)).put("default", SSL_MODE_REQUIRED); - return spec; - } - - @Override - public AirbyteConnectionStatus check(final JsonNode config) throws Exception { - // #15808 Disallow connecting to db with disable, prefer or allow SSL mode when connecting directly - // and not over SSH tunnel - if (config.has(TUNNEL_METHOD) - && config.get(TUNNEL_METHOD).has(TUNNEL_METHOD) - && config.get(TUNNEL_METHOD).get(TUNNEL_METHOD).asText().equals(NO_TUNNEL)) { - // If no SSH tunnel - if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)) { - if (Set.of(SSL_MODE_PREFERRED).contains(config.get(SSL_MODE).get(MODE).asText())) { - // Fail in case SSL mode is preferred - return new AirbyteConnectionStatus() - .withStatus(Status.FAILED) - .withMessage( - "Unsecured connection not allowed. If no SSH Tunnel set up, please use one of the following SSL modes: required, verify-ca, verify-identity"); - } - } - } - return super.check(config); - } - - public static void main(final String[] args) throws Exception { - final Source source = new MySqlStrictEncryptSource(); - LOGGER.info("starting source: {}", MySqlStrictEncryptSource.class); - new IntegrationRunner(source).run(args); - LOGGER.info("completed source: {}", MySqlStrictEncryptSource.class); - } - -} diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslCaCertificateStrictEncryptSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslCaCertificateStrictEncryptSourceAcceptanceTest.java deleted file mode 100644 index c0efd449d2c0..000000000000 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslCaCertificateStrictEncryptSourceAcceptanceTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mysql_strict_encrypt; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.cdk.db.jdbc.JdbcUtils; -import java.util.stream.Stream; - -public class MySqlSslCaCertificateStrictEncryptSourceAcceptanceTest extends MySqlStrictEncryptSourceAcceptanceTest { - - private static final String PASSWORD = "Passw0rd"; - - @Override - protected Stream extraContainerFactoryMethods() { - return Stream.of("withRootAndServerCertificates"); - } - - @Override - protected JsonNode getConfig() { - return testdb.integrationTestConfigBuilder() - .withStandardReplication() - .withSsl(ImmutableMap.builder() - .put(JdbcUtils.MODE_KEY, "verify_ca") - .put("ca_certificate", testdb.getCaCertificate()) - .put("client_key_password", PASSWORD) - .build()) - .build(); - } - -} diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java deleted file mode 100644 index 35fafab62790..000000000000 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mysql_strict_encrypt; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.base.ssh.SshHelpers; -import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest; -import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv; -import io.airbyte.commons.features.FeatureFlags; -import io.airbyte.commons.features.FeatureFlagsWrapper; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.integrations.source.mysql.MySQLContainerFactory; -import io.airbyte.integrations.source.mysql.MySQLTestDatabase; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.v0.CatalogHelpers; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.ConnectorSpecification; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import io.airbyte.protocol.models.v0.SyncMode; -import java.util.HashMap; -import java.util.stream.Stream; - -public class MySqlStrictEncryptSourceAcceptanceTest extends SourceAcceptanceTest { - - private static final String STREAM_NAME = "id_and_name"; - private static final String STREAM_NAME2 = "public.starships"; - - protected MySQLTestDatabase testdb; - - @Override - protected void setupEnvironment(final TestDestinationEnv environment) { - final var container = new MySQLContainerFactory().shared("mysql:8.0", extraContainerFactoryMethods().toArray(String[]::new)); - testdb = new MySQLTestDatabase(container) - .withConnectionProperty("useSSL", "true") - .withConnectionProperty("requireSSL", "true") - .initialized() - .with("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));") - .with("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');") - .with("CREATE TABLE starships(id INTEGER, name VARCHAR(200));") - .with("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); - } - - protected Stream extraContainerFactoryMethods() { - return Stream.empty(); - } - - @Override - protected FeatureFlags featureFlags() { - return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true); - } - - @Override - protected void tearDown(final TestDestinationEnv testEnv) { - testdb.close(); - } - - @Override - protected String getImageName() { - return "airbyte/source-mysql-strict-encrypt:dev"; - } - - @Override - protected ConnectorSpecification getSpec() throws Exception { - return SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class)); - } - - @Override - protected JsonNode getConfig() { - return testdb.integrationTestConfigBuilder() - .withSsl(ImmutableMap.of(JdbcUtils.MODE_KEY, "required")) - .withStandardReplication() - .build(); - } - - @Override - protected ConfiguredAirbyteCatalog getConfiguredCatalog() { - return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( - new ConfiguredAirbyteStream() - .withSyncMode(SyncMode.INCREMENTAL) - .withCursorField(Lists.newArrayList("id")) - .withDestinationSyncMode(DestinationSyncMode.APPEND) - .withStream(CatalogHelpers.createAirbyteStream( - String.format("%s.%s", testdb.getDatabaseName(), STREAM_NAME), - Field.of("id", JsonSchemaType.NUMBER), - Field.of("name", JsonSchemaType.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), - new ConfiguredAirbyteStream() - .withSyncMode(SyncMode.INCREMENTAL) - .withCursorField(Lists.newArrayList("id")) - .withDestinationSyncMode(DestinationSyncMode.APPEND) - .withStream(CatalogHelpers.createAirbyteStream( - String.format("%s.%s", testdb.getDatabaseName(), STREAM_NAME2), - Field.of("id", JsonSchemaType.NUMBER), - Field.of("name", JsonSchemaType.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); - } - - @Override - protected JsonNode getState() { - return Jsons.jsonNode(new HashMap<>()); - } - - @Override - protected boolean supportsPerStream() { - return true; - } - -} diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java deleted file mode 100644 index a0381a0dc7c9..000000000000 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java +++ /dev/null @@ -1,453 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mysql_strict_encrypt; - -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.STATE_TYPE_KEY; -import static java.util.stream.Collectors.toList; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.base.ssh.SshHelpers; -import io.airbyte.cdk.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; -import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState; -import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.commons.features.FeatureFlagsWrapper; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.commons.util.MoreIterators; -import io.airbyte.integrations.source.mysql.MySQLContainerFactory; -import io.airbyte.integrations.source.mysql.MySQLTestDatabase; -import io.airbyte.integrations.source.mysql.MySqlSource; -import io.airbyte.integrations.source.mysql.internal.models.CursorBasedStatus; -import io.airbyte.integrations.source.mysql.internal.models.InternalModels.StateType; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.v0.AirbyteCatalog; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteMessage.Type; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.AirbyteStateMessage; -import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.AirbyteStreamState; -import io.airbyte.protocol.models.v0.CatalogHelpers; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.ConnectorSpecification; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import io.airbyte.protocol.models.v0.StreamDescriptor; -import io.airbyte.protocol.models.v0.SyncMode; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.junit.jupiter.api.Test; - -class MySqlStrictEncryptJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { - - @Override - protected JsonNode config() { - return testdb.testConfigBuilder().build(); - } - - @Override - protected MySqlStrictEncryptSource source() { - final var source = new MySqlSource(); - source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true)); - return new MySqlStrictEncryptSource(source); - } - - @Override - protected MySQLTestDatabase createTestDatabase() { - final var container = new MySQLContainerFactory().shared("mysql:8.0"); - return new MySQLTestDatabase(container) - .withConnectionProperty("useSSL", "true") - .withConnectionProperty("requireSSL", "true") - .initialized(); - } - - @Override - protected void maybeSetShorterConnectionTimeout(final JsonNode config) { - ((ObjectNode) config).put(JdbcUtils.JDBC_URL_PARAMS_KEY, "connectTimeout=1000"); - } - - // MySql does not support schemas in the way most dbs do. Instead we namespace by db name. - @Override - public boolean supportsSchemas() { - return false; - } - - @Test - void testSpec() throws Exception { - final ConnectorSpecification actual = source().spec(); - final ConnectorSpecification expected = - SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class)); - assertEquals(expected, actual); - } - - @Override - protected AirbyteCatalog getCatalog(final String defaultNamespace) { - return new AirbyteCatalog().withStreams(List.of( - CatalogHelpers.createAirbyteStream( - TABLE_NAME, - defaultNamespace, - Field.of(COL_ID, JsonSchemaType.INTEGER), - Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) - .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), - CatalogHelpers.createAirbyteStream( - TABLE_NAME_WITHOUT_PK, - defaultNamespace, - Field.of(COL_ID, JsonSchemaType.INTEGER), - Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) - .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(Collections.emptyList()), - CatalogHelpers.createAirbyteStream( - TABLE_NAME_COMPOSITE_PK, - defaultNamespace, - Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), - Field.of(COL_LAST_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) - .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey( - List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); - } - - @Test - void testReadMultipleTablesIncrementally() throws Exception { - final var config = config(); - ((ObjectNode) config).put("sync_checkpoint_records", 1); - final String namespace = getDefaultNamespace(); - final String streamOneName = TABLE_NAME + "one"; - // Create a fresh first table - testdb.with(""" - CREATE TABLE %s ( - id int PRIMARY KEY, - name VARCHAR(200) NOT NULL, - updated_at VARCHAR(200) NOT NULL - );""", streamOneName) - .with("INSERT INTO %s(id, name, updated_at) VALUES (1,'picard', '2004-10-19')", - getFullyQualifiedTableName(streamOneName)) - .with("INSERT INTO %s(id, name, updated_at) VALUES (2, 'crusher', '2005-10-19')", - getFullyQualifiedTableName(streamOneName)) - .with("INSERT INTO %s(id, name, updated_at) VALUES (3, 'vash', '2006-10-19')", - getFullyQualifiedTableName(streamOneName)); - - // Create a fresh second table - final String streamTwoName = TABLE_NAME + "two"; - final String streamTwoFullyQualifiedName = getFullyQualifiedTableName(streamTwoName); - // Insert records into second table - testdb.with(""" - CREATE TABLE %s ( - id int PRIMARY KEY, - name VARCHAR(200) NOT NULL, - updated_at DATE NOT NULL - );""", streamTwoName) - .with("INSERT INTO %s(id, name, updated_at) VALUES (40,'Jean Luc','2006-10-19')", - streamTwoFullyQualifiedName) - .with("INSERT INTO %s(id, name, updated_at) VALUES (41, 'Groot', '2006-10-19')", - streamTwoFullyQualifiedName) - .with("INSERT INTO %s(id, name, updated_at) VALUES (42, 'Thanos','2006-10-19')", - streamTwoFullyQualifiedName); - // Create records list that we expect to see in the state message - final List streamTwoExpectedRecords = Arrays.asList( - createRecord(streamTwoName, namespace, ImmutableMap.of( - COL_ID, 40, - COL_NAME, "Jean Luc", - COL_UPDATED_AT, "2006-10-19")), - createRecord(streamTwoName, namespace, ImmutableMap.of( - COL_ID, 41, - COL_NAME, "Groot", - COL_UPDATED_AT, "2006-10-19")), - createRecord(streamTwoName, namespace, ImmutableMap.of( - COL_ID, 42, - COL_NAME, "Thanos", - COL_UPDATED_AT, "2006-10-19"))); - - // Prep and create a configured catalog to perform sync - final AirbyteStream streamOne = getAirbyteStream(streamOneName, namespace); - final AirbyteStream streamTwo = getAirbyteStream(streamTwoName, namespace); - - final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog( - new AirbyteCatalog().withStreams(List.of(streamOne, streamTwo))); - configuredCatalog.getStreams().forEach(airbyteStream -> { - airbyteStream.setSyncMode(SyncMode.INCREMENTAL); - airbyteStream.setCursorField(List.of(COL_ID)); - airbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND); - airbyteStream.withPrimaryKey(List.of(List.of(COL_ID))); - }); - - // Perform initial sync - final List messagesFromFirstSync = MoreIterators - .toList(source().read(config, configuredCatalog, null)); - - final List recordsFromFirstSync = filterRecords(messagesFromFirstSync); - - setEmittedAtToNull(messagesFromFirstSync); - // All records in the 2 configured streams should be present - assertThat(filterRecords(recordsFromFirstSync)).containsExactlyElementsOf( - Stream.concat(getTestMessages(streamOneName).stream().parallel(), - streamTwoExpectedRecords.stream().parallel()).collect(toList())); - - final List actualFirstSyncState = extractStateMessage(messagesFromFirstSync); - // Since we are emitting a state message after each record, we should have 1 state for each record - - // 3 from stream1 and 3 from stream2 - assertEquals(6, actualFirstSyncState.size()); - - // The expected state type should be 2 primaryKey's and the last one being standard - final List expectedStateTypesFromFirstSync = List.of("primary_key", "primary_key", "cursor_based"); - final List stateTypeOfStreamOneStatesFromFirstSync = - extractSpecificFieldFromCombinedMessages(messagesFromFirstSync, streamOneName, STATE_TYPE_KEY); - final List stateTypeOfStreamTwoStatesFromFirstSync = - extractSpecificFieldFromCombinedMessages(messagesFromFirstSync, streamTwoName, STATE_TYPE_KEY); - // It should be the same for stream1 and stream2 - assertEquals(stateTypeOfStreamOneStatesFromFirstSync, expectedStateTypesFromFirstSync); - assertEquals(stateTypeOfStreamTwoStatesFromFirstSync, expectedStateTypesFromFirstSync); - - // Create the expected primaryKeys that we should see - final List expectedPrimaryKeysFromFirstSync = List.of("1", "2"); - final List primaryKeyFromStreamOneStatesFromFirstSync = - extractSpecificFieldFromCombinedMessages(messagesFromFirstSync, streamOneName, "pk_val"); - final List primaryKeyFromStreamTwoStatesFromFirstSync = - extractSpecificFieldFromCombinedMessages(messagesFromFirstSync, streamOneName, "pk_val"); - - // Verifying each element and its index to match. - // Only checking the first 2 elements since we have verified that the last state_type is - // "cursor_based" - assertEquals(primaryKeyFromStreamOneStatesFromFirstSync.get(0), expectedPrimaryKeysFromFirstSync.get(0)); - assertEquals(primaryKeyFromStreamOneStatesFromFirstSync.get(1), expectedPrimaryKeysFromFirstSync.get(1)); - assertEquals(primaryKeyFromStreamTwoStatesFromFirstSync.get(0), expectedPrimaryKeysFromFirstSync.get(0)); - assertEquals(primaryKeyFromStreamTwoStatesFromFirstSync.get(1), expectedPrimaryKeysFromFirstSync.get(1)); - - // Extract only state messages for each stream - final List streamOneStateMessagesFromFirstSync = extractStateMessage(messagesFromFirstSync, streamOneName); - final List streamTwoStateMessagesFromFirstSync = extractStateMessage(messagesFromFirstSync, streamTwoName); - // Extract the incremental states of each stream's first and second state message - final List streamOneIncrementalStatesFromFirstSync = - List.of(streamOneStateMessagesFromFirstSync.get(0).getStream().getStreamState().get("incremental_state"), - streamOneStateMessagesFromFirstSync.get(1).getStream().getStreamState().get("incremental_state")); - final JsonNode streamOneFinalStreamStateFromFirstSync = streamOneStateMessagesFromFirstSync.get(2).getStream().getStreamState(); - - final List streamTwoIncrementalStatesFromFirstSync = - List.of(streamTwoStateMessagesFromFirstSync.get(0).getStream().getStreamState().get("incremental_state"), - streamTwoStateMessagesFromFirstSync.get(1).getStream().getStreamState().get("incremental_state")); - final JsonNode streamTwoFinalStreamStateFromFirstSync = streamTwoStateMessagesFromFirstSync.get(2).getStream().getStreamState(); - - // The incremental_state of each stream's first and second incremental states is expected - // to be identical to the stream_state of the final state message for each stream - assertEquals(streamOneIncrementalStatesFromFirstSync.get(0), streamOneFinalStreamStateFromFirstSync); - assertEquals(streamOneIncrementalStatesFromFirstSync.get(1), streamOneFinalStreamStateFromFirstSync); - assertEquals(streamTwoIncrementalStatesFromFirstSync.get(0), streamTwoFinalStreamStateFromFirstSync); - assertEquals(streamTwoIncrementalStatesFromFirstSync.get(1), streamTwoFinalStreamStateFromFirstSync); - - // Sync should work with a primaryKey state AND a cursor-based state from each stream - // Forcing a sync with - // - stream one state still being the first record read via Primary Key. - // - stream two state being the Primary Key state before the final emitted state before the cursor - // switch - final List messagesFromSecondSyncWithMixedStates = MoreIterators - .toList(source().read(config, configuredCatalog, - Jsons.jsonNode(List.of(streamOneStateMessagesFromFirstSync.get(0), - streamTwoStateMessagesFromFirstSync.get(1))))); - - // Extract only state messages for each stream after second sync - final List streamOneStateMessagesFromSecondSync = - extractStateMessage(messagesFromSecondSyncWithMixedStates, streamOneName); - final List stateTypeOfStreamOneStatesFromSecondSync = - extractSpecificFieldFromCombinedMessages(messagesFromSecondSyncWithMixedStates, streamOneName, STATE_TYPE_KEY); - - final List streamTwoStateMessagesFromSecondSync = - extractStateMessage(messagesFromSecondSyncWithMixedStates, streamTwoName); - final List stateTypeOfStreamTwoStatesFromSecondSync = - extractSpecificFieldFromCombinedMessages(messagesFromSecondSyncWithMixedStates, streamTwoName, STATE_TYPE_KEY); - - // Stream One states after the second sync are expected to have 2 stream states - // - 1 with PrimaryKey state_type and 1 state that is of cursorBased state type - assertEquals(2, streamOneStateMessagesFromSecondSync.size()); - assertEquals(List.of("primary_key", "cursor_based"), stateTypeOfStreamOneStatesFromSecondSync); - - // Stream Two states after the second sync are expected to have 1 stream state - // - The state that is of cursorBased state type - assertEquals(1, streamTwoStateMessagesFromSecondSync.size()); - assertEquals(List.of("cursor_based"), stateTypeOfStreamTwoStatesFromSecondSync); - - // Add some data to each table and perform a third read. - // Expect to see all records be synced via cursorBased method and not primaryKey - testdb.with("INSERT INTO %s(id, name, updated_at) VALUES (4,'Hooper','2006-10-19')", - getFullyQualifiedTableName(streamOneName)) - .with("INSERT INTO %s(id, name, updated_at) VALUES (43, 'Iron Man', '2006-10-19')", - streamTwoFullyQualifiedName); - - final List messagesFromThirdSync = MoreIterators - .toList(source().read(config, configuredCatalog, - Jsons.jsonNode(List.of(streamOneStateMessagesFromSecondSync.get(1), - streamTwoStateMessagesFromSecondSync.get(0))))); - - // Extract only state messages, state type, and cursor for each stream after second sync - final List streamOneStateMessagesFromThirdSync = - extractStateMessage(messagesFromThirdSync, streamOneName); - final List stateTypeOfStreamOneStatesFromThirdSync = - extractSpecificFieldFromCombinedMessages(messagesFromThirdSync, streamOneName, STATE_TYPE_KEY); - final List cursorOfStreamOneStatesFromThirdSync = - extractSpecificFieldFromCombinedMessages(messagesFromThirdSync, streamOneName, "cursor"); - - final List streamTwoStateMessagesFromThirdSync = - extractStateMessage(messagesFromThirdSync, streamTwoName); - final List stateTypeOfStreamTwoStatesFromThirdSync = - extractSpecificFieldFromCombinedMessages(messagesFromThirdSync, streamTwoName, STATE_TYPE_KEY); - final List cursorOfStreamTwoStatesFromThirdSync = - extractSpecificFieldFromCombinedMessages(messagesFromThirdSync, streamTwoName, "cursor"); - - // Both streams should now be synced via standard cursor and have updated max cursor values - // cursor: 4 for stream one - // cursor: 43 for stream two - assertEquals(1, streamOneStateMessagesFromThirdSync.size()); - assertEquals(List.of("cursor_based"), stateTypeOfStreamOneStatesFromThirdSync); - assertEquals(List.of("4"), cursorOfStreamOneStatesFromThirdSync); - - assertEquals(1, streamTwoStateMessagesFromThirdSync.size()); - assertEquals(List.of("cursor_based"), stateTypeOfStreamTwoStatesFromThirdSync); - assertEquals(List.of("43"), cursorOfStreamTwoStatesFromThirdSync); - } - - @Override - protected boolean supportsPerStream() { - return true; - } - - // Override from parent class as we're no longer including the legacy Data field. - @Override - protected List createExpectedTestMessages(final List states) { - return supportsPerStream() - ? states.stream() - .map(s -> new AirbyteMessage().withType(Type.STATE) - .withState( - new AirbyteStateMessage().withType(AirbyteStateType.STREAM) - .withStream(new AirbyteStreamState() - .withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName())) - .withStreamState(Jsons.jsonNode(s))))) - .collect( - Collectors.toList()) - : List.of(new AirbyteMessage().withType(Type.STATE).withState(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY))); - } - - @Override - protected List createState(final List states) { - return supportsPerStream() - ? states.stream() - .map(s -> new AirbyteStateMessage().withType(AirbyteStateType.STREAM) - .withStream(new AirbyteStreamState() - .withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName())) - .withStreamState(Jsons.jsonNode(s)))) - .collect( - Collectors.toList()) - : List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY)); - } - - @Override - protected JsonNode getStateData(final AirbyteMessage airbyteMessage, final String streamName) { - final JsonNode streamState = airbyteMessage.getState().getStream().getStreamState(); - if (streamState.get("stream_name").asText().equals(streamName)) { - return streamState; - } - - throw new IllegalArgumentException("Stream not found in state message: " + streamName); - } - - @Override - protected DbStreamState buildStreamState(final ConfiguredAirbyteStream configuredAirbyteStream, - final String cursorField, - final String cursorValue) { - return new CursorBasedStatus().withStateType(StateType.CURSOR_BASED).withVersion(2L) - .withStreamName(configuredAirbyteStream.getStream().getName()) - .withStreamNamespace(configuredAirbyteStream.getStream().getNamespace()) - .withCursorField(List.of(cursorField)) - .withCursor(cursorValue) - .withCursorRecordCount(1L); - } - - @Override - protected List getExpectedAirbyteMessagesSecondSync(final String namespace) { - final List expectedMessages = new ArrayList<>(); - expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(streamName()).withNamespace(namespace) - .withData(Jsons.jsonNode(ImmutableMap - .of(COL_ID, ID_VALUE_4, - COL_NAME, "riker", - COL_UPDATED_AT, "2006-10-19"))))); - expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(streamName()).withNamespace(namespace) - .withData(Jsons.jsonNode(ImmutableMap - .of(COL_ID, ID_VALUE_5, - COL_NAME, "data", - COL_UPDATED_AT, "2006-10-19"))))); - final DbStreamState state = new CursorBasedStatus() - .withStateType(StateType.CURSOR_BASED) - .withVersion(2L) - .withStreamName(streamName()) - .withStreamNamespace(namespace) - .withCursorField(ImmutableList.of(COL_ID)) - .withCursor("5") - .withCursorRecordCount(1L); - - expectedMessages.addAll(createExpectedTestMessages(List.of(state))); - return expectedMessages; - } - - @Override - protected List getTestMessages() { - return getTestMessages(streamName()); - } - - protected List getTestMessages(final String streamName) { - return List.of( - new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) - .withData(Jsons.jsonNode(Map - .of(COL_ID, ID_VALUE_1, - COL_NAME, "picard", - COL_UPDATED_AT, "2004-10-19")))), - new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) - .withData(Jsons.jsonNode(Map - .of(COL_ID, ID_VALUE_2, - COL_NAME, "crusher", - COL_UPDATED_AT, - "2005-10-19")))), - new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) - .withData(Jsons.jsonNode(Map - .of(COL_ID, ID_VALUE_3, - COL_NAME, "vash", - COL_UPDATED_AT, "2006-10-19"))))); - } - - private AirbyteStream getAirbyteStream(final String tableName, final String namespace) { - return CatalogHelpers.createAirbyteStream( - tableName, - namespace, - Field.of(COL_ID, JsonSchemaType.INTEGER), - Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))); - } - -} diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/config.json b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/config.json deleted file mode 100644 index e17733f16b23..000000000000 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/config.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "host": "default", - "port": 5555, - "database": "default", - "username": "default", - "replication_method": { "method": "STANDARD" } -} diff --git a/airbyte-integrations/connectors/source-mysql/acceptance-test-config.yml b/airbyte-integrations/connectors/source-mysql/acceptance-test-config.yml index a3dd76400d68..575c91b8bb87 100644 --- a/airbyte-integrations/connectors/source-mysql/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-mysql/acceptance-test-config.yml @@ -3,5 +3,8 @@ connector_image: airbyte/source-mysql:dev tests: spec: - - spec_path: "src/test-integration/resources/expected_spec.json" + - spec_path: "src/test-integration/resources/expected_oss_spec.json" + config_path: "src/test-integration/resources/dummy_config.json" + - deployment_mode: cloud + spec_path: "src/test-integration/resources/expected_cloud_spec.json" config_path: "src/test-integration/resources/dummy_config.json" diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index b63a4120fe48..70966b334176 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -19,7 +19,6 @@ configurations.all { } - application { mainClass = 'io.airbyte.integrations.source.mysql.MySqlSource' applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 552643198bbb..8a3792b505df 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.1.9 + dockerImageTag: 3.2.0 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql @@ -18,7 +18,6 @@ data: name: MySQL registries: cloud: - dockerRepository: airbyte/source-mysql-strict-encrypt enabled: true oss: enabled: true diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index b942d468cdc6..4128dc2fcc20 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -10,7 +10,6 @@ import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; import static io.airbyte.cdk.integrations.source.jdbc.JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER; import static io.airbyte.cdk.integrations.source.jdbc.JdbcDataSourceUtils.assertCustomParametersDontOverwriteDefaultParameters; -import static io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils.SSL_MODE; import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getCursorBasedSyncStatusForStreams; import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams; import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.logStreamSyncStatus; @@ -33,6 +32,7 @@ import io.airbyte.cdk.db.jdbc.StreamingJdbcDatabase; import io.airbyte.cdk.integrations.base.IntegrationRunner; import io.airbyte.cdk.integrations.base.Source; +import io.airbyte.cdk.integrations.base.adaptive.AdaptiveSourceRunner; import io.airbyte.cdk.integrations.base.ssh.SshWrappedSource; import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil; import io.airbyte.cdk.integrations.source.jdbc.AbstractJdbcSource; @@ -60,12 +60,15 @@ import io.airbyte.integrations.source.mysql.internal.models.CursorBasedStatus; import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.v0.AirbyteCatalog; +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.v0.ConnectorSpecification; import io.airbyte.protocol.models.v0.SyncMode; import java.sql.SQLException; import java.time.Instant; @@ -87,6 +90,13 @@ public class MySqlSource extends AbstractJdbcSource implements Source { + public static final String TUNNEL_METHOD = "tunnel_method"; + public static final String NO_TUNNEL = "NO_TUNNEL"; + public static final String SSL_MODE = "ssl_mode"; + private static final String MODE = "mode"; + public static final String SSL_MODE_PREFERRED = "preferred"; + public static final String SSL_MODE_REQUIRED = "required"; + private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSource.class); private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000; public static final String NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY = @@ -118,6 +128,46 @@ public static Source sshWrappedSource(MySqlSource source) { return new SshWrappedSource(source, JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY); } + private ConnectorSpecification getCloudDeploymentSpec(final ConnectorSpecification originalSpec) { + final ConnectorSpecification spec = Jsons.clone(originalSpec); + // Remove the SSL options + ((ObjectNode) spec.getConnectionSpecification().get("properties")).remove(JdbcUtils.SSL_KEY); + // Set SSL_MODE to required by default + ((ObjectNode) spec.getConnectionSpecification().get("properties").get(SSL_MODE)).put("default", SSL_MODE_REQUIRED); + return spec; + } + + @Override + public ConnectorSpecification spec() throws Exception { + if (cloudDeploymentMode()) { + return getCloudDeploymentSpec(super.spec()); + } + return super.spec(); + } + + @Override + public AirbyteConnectionStatus check(final JsonNode config) throws Exception { + // #15808 Disallow connecting to db with disable, prefer or allow SSL mode when connecting directly + // and not over SSH tunnel + if (cloudDeploymentMode()) { + if (config.has(TUNNEL_METHOD) + && config.get(TUNNEL_METHOD).has(TUNNEL_METHOD) + && config.get(TUNNEL_METHOD).get(TUNNEL_METHOD).asText().equals(NO_TUNNEL)) { + // If no SSH tunnel + if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)) { + if (Set.of(SSL_MODE_PREFERRED).contains(config.get(SSL_MODE).get(MODE).asText())) { + // Fail in case SSL mode is preferred + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage( + "Unsecured connection not allowed. If no SSH Tunnel set up, please use one of the following SSL modes: required, verify-ca, verify-identity"); + } + } + } + } + return super.check(config); + } + public MySqlSource() { super(DRIVER_CLASS, MySqlStreamingQueryConfig::new, new MySqlSourceOperations()); } @@ -456,6 +506,10 @@ private String toSslJdbcParam(final SslMode sslMode) { return toSslJdbcParamInternal(sslMode); } + private boolean cloudDeploymentMode() { + return AdaptiveSourceRunner.CLOUD_MODE.equalsIgnoreCase(featureFlags.deploymentMode()); + } + @Override protected int getStateEmissionFrequency() { return INTERMEDIATE_STATE_EMISSION_FREQUENCY; diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSourceAcceptanceTest.java new file mode 100644 index 000000000000..8b607cc092f8 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSourceAcceptanceTest.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import io.airbyte.cdk.integrations.base.ssh.SshHelpers; +import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.features.FeatureFlagsWrapper; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.v0.ConnectorSpecification; + +public class CloudDeploymentMySqlSourceAcceptanceTest extends MySqlSslSourceAcceptanceTest { + + @Override + protected FeatureFlags featureFlags() { + return FeatureFlagsWrapper.overridingDeploymentMode(super.featureFlags(), "CLOUD"); + } + + @Override + protected ConnectorSpecification getSpec() throws Exception { + return SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_cloud_spec.json"), ConnectorSpecification.class)); + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSslCaCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSslCaCertificateSourceAcceptanceTest.java new file mode 100644 index 000000000000..2e9baade07dc --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSslCaCertificateSourceAcceptanceTest.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.cdk.integrations.base.ssh.SshHelpers; +import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.features.FeatureFlagsWrapper; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.v0.ConnectorSpecification; +import java.util.stream.Stream; + +public class CloudDeploymentMySqlSslCaCertificateSourceAcceptanceTest extends MySqlSourceAcceptanceTest { + + private static final String PASSWORD = "Passw0rd"; + + @Override + protected FeatureFlags featureFlags() { + return FeatureFlagsWrapper.overridingDeploymentMode(super.featureFlags(), "CLOUD"); + } + + @Override + protected Stream extraContainerFactoryMethods() { + return Stream.of("withRootAndServerCertificates"); + } + + @Override + protected JsonNode getConfig() { + return testdb.integrationTestConfigBuilder() + .withStandardReplication() + .withSsl(ImmutableMap.builder() + .put(JdbcUtils.MODE_KEY, "verify_ca") + .put("ca_certificate", testdb.getCaCertificate()) + .put("client_key_password", PASSWORD) + .build()) + .build(); + } + + @Override + protected ConnectorSpecification getSpec() throws Exception { + return SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_cloud_spec.json"), ConnectorSpecification.class)); + } + +} diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslFullCertificateStrictEncryptSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSslFullCertificateSourceAcceptanceTest.java similarity index 53% rename from airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslFullCertificateStrictEncryptSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSslFullCertificateSourceAcceptanceTest.java index 6df92b5e507d..b0d27490798a 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslFullCertificateStrictEncryptSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CloudDeploymentMySqlSslFullCertificateSourceAcceptanceTest.java @@ -2,17 +2,28 @@ * Copyright (c) 2023 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql_strict_encrypt; +package io.airbyte.integrations.io.airbyte.integration_tests.sources; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.cdk.integrations.base.ssh.SshHelpers; +import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.features.FeatureFlagsWrapper; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.v0.ConnectorSpecification; import java.util.stream.Stream; -public class MySqlSslFullCertificateStrictEncryptSourceAcceptanceTest extends MySqlStrictEncryptSourceAcceptanceTest { +public class CloudDeploymentMySqlSslFullCertificateSourceAcceptanceTest extends MySqlSourceAcceptanceTest { private static final String PASSWORD = "Passw0rd"; + @Override + protected FeatureFlags featureFlags() { + return FeatureFlagsWrapper.overridingDeploymentMode(super.featureFlags(), "CLOUD"); + } + @Override protected Stream extraContainerFactoryMethods() { return Stream.of("withRootAndServerCertificates", "withClientCertificate"); @@ -32,4 +43,9 @@ protected JsonNode getConfig() { .build(); } + @Override + protected ConnectorSpecification getSpec() throws Exception { + return SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_cloud_spec.json"), ConnectorSpecification.class)); + } + } diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json similarity index 94% rename from airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json rename to airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json index 30d138e263aa..50d717a95886 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json @@ -53,18 +53,13 @@ "description": "SSL connection modes. Read more in the docs.", "type": "object", "order": 7, - "default": "required", "oneOf": [ { "title": "preferred", "description": "Automatically attempt SSL connection. If the MySQL server does not support SSL, continue with a regular connection.", "required": ["mode"], "properties": { - "mode": { - "type": "string", - "const": "preferred", - "order": 0 - } + "mode": { "type": "string", "const": "preferred", "order": 0 } } }, { @@ -72,11 +67,7 @@ "description": "Always connect with SSL. If the MySQL server doesn’t support SSL, the connection will not be established. Certificate Authority (CA) and Hostname are not verified.", "required": ["mode"], "properties": { - "mode": { - "type": "string", - "const": "required", - "order": 0 - } + "mode": { "type": "string", "const": "required", "order": 0 } } }, { @@ -84,11 +75,7 @@ "description": "Always connect with SSL. Verifies CA, but allows connection even if Hostname does not match.", "required": ["mode", "ca_certificate"], "properties": { - "mode": { - "type": "string", - "const": "verify_ca", - "order": 0 - }, + "mode": { "type": "string", "const": "verify_ca", "order": 0 }, "ca_certificate": { "type": "string", "title": "CA certificate", @@ -169,7 +156,8 @@ } } } - ] + ], + "default": "required" }, "replication_method": { "type": "object", @@ -184,11 +172,7 @@ "description": "Recommended - Incrementally reads new inserts, updates, and deletes using the MySQL binary log. This must be enabled on your database.", "required": ["method"], "properties": { - "method": { - "type": "string", - "const": "CDC", - "order": 0 - }, + "method": { "type": "string", "const": "CDC", "order": 0 }, "initial_waiting_seconds": { "type": "integer", "title": "Initial Waiting Time in Seconds (Advanced)", @@ -213,11 +197,7 @@ "description": "Incrementally detects new inserts and updates using the cursor column chosen when configuring a connection (e.g. created_at, updated_at).", "required": ["method"], "properties": { - "method": { - "type": "string", - "const": "STANDARD", - "order": 0 - } + "method": { "type": "string", "const": "STANDARD", "order": 0 } } } ] diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_spec.json b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json similarity index 100% rename from airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_spec.json rename to airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSslTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CloudDeploymentMySqlSslTest.java similarity index 79% rename from airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSslTest.java rename to airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CloudDeploymentMySqlSslTest.java index 66f6713dabec..92fded997c4d 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSslTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CloudDeploymentMySqlSslTest.java @@ -2,24 +2,29 @@ * Copyright (c) 2023 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql_strict_encrypt; +package io.airbyte.integrations.source.mysql; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.cdk.integrations.base.Source; import io.airbyte.cdk.integrations.base.ssh.SshBastionContainer; +import io.airbyte.cdk.integrations.base.ssh.SshHelpers; import io.airbyte.cdk.integrations.base.ssh.SshTunnel; -import io.airbyte.integrations.source.mysql.MySQLContainerFactory; -import io.airbyte.integrations.source.mysql.MySQLTestDatabase; +import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlagsWrapper; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; +import io.airbyte.protocol.models.v0.ConnectorSpecification; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; @Execution(ExecutionMode.CONCURRENT) -public class MySqlStrictEncryptSslTest { +public class CloudDeploymentMySqlSslTest { private MySQLTestDatabase createTestDatabase(String... containerFactoryMethods) { final var container = new MySQLContainerFactory().shared("mysql:8.0", containerFactoryMethods); @@ -29,6 +34,20 @@ private MySQLTestDatabase createTestDatabase(String... containerFactoryMethods) .initialized(); } + private Source source() { + final var source = new MySqlSource(); + source.setFeatureFlags(FeatureFlagsWrapper.overridingDeploymentMode(new EnvVariableFeatureFlags(), "CLOUD")); + return MySqlSource.sshWrappedSource(source); + } + + @Test + void testSpec() throws Exception { + final ConnectorSpecification actual = source().spec(); + final ConnectorSpecification expected = + SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_cloud_spec.json"), ConnectorSpecification.class)); + assertEquals(expected, actual); + } + @Test void testStrictSSLUnsecuredNoTunnel() throws Exception { try (final var testdb = createTestDatabase()) { @@ -42,7 +61,7 @@ void testStrictSSLUnsecuredNoTunnel() throws Exception { .put(JdbcUtils.MODE_KEY, "preferred") .build()) .build(); - final AirbyteConnectionStatus actual = new MySqlStrictEncryptSource().check(config); + final AirbyteConnectionStatus actual = source().check(config); assertEquals(AirbyteConnectionStatus.Status.FAILED, actual.getStatus()); assertTrue(actual.getMessage().contains("Unsecured connection not allowed"), actual.getMessage()); } @@ -62,7 +81,7 @@ void testStrictSSLSecuredNoTunnel() throws Exception { .put("client_key_password", PASSWORD) .build()) .build(); - final AirbyteConnectionStatus actual = new MySqlStrictEncryptSource().check(config); + final AirbyteConnectionStatus actual = source().check(config); assertEquals(AirbyteConnectionStatus.Status.FAILED, actual.getStatus()); assertTrue(actual.getMessage().contains("Failed to create keystore for Client certificate"), actual.getMessage()); } @@ -86,7 +105,7 @@ void testStrictSSLSecuredWithTunnel() throws Exception { .build()) .with("tunnel_method", ImmutableMap.builder().put("tunnel_method", "SSH_KEY_AUTH").build()) .build(); - final AirbyteConnectionStatus actual = new MySqlStrictEncryptSource().check(config); + final AirbyteConnectionStatus actual = source().check(config); assertEquals(AirbyteConnectionStatus.Status.FAILED, actual.getStatus()); assertTrue(actual.getMessage().contains("Could not connect with provided SSH configuration."), actual.getMessage()); } @@ -105,7 +124,7 @@ void testStrictSSLUnsecuredWithTunnel() throws Exception { .build()) .with("tunnel_method", ImmutableMap.builder().put("tunnel_method", "SSH_KEY_AUTH").build()) .build(); - final AirbyteConnectionStatus actual = new MySqlStrictEncryptSource().check(config); + final AirbyteConnectionStatus actual = source().check(config); assertEquals(AirbyteConnectionStatus.Status.FAILED, actual.getStatus()); assertTrue(actual.getMessage().contains("Could not connect with provided SSH configuration."), actual.getMessage()); } @@ -120,7 +139,7 @@ void testCheckWithSslModeDisabled() throws Exception { .with("tunnel_method", bastion.getTunnelMethod(SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH, false)) .withoutSsl() .build(); - final AirbyteConnectionStatus actual = new MySqlStrictEncryptSource().check(config); + final AirbyteConnectionStatus actual = source().check(config); assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, actual.getStatus()); } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json new file mode 100644 index 000000000000..52441e124b17 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json @@ -0,0 +1,227 @@ +{ + "documentationUrl": "https://docs.airbyte.com/integrations/sources/mysql", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "MySql Source Spec", + "type": "object", + "required": ["host", "port", "database", "username", "replication_method"], + "properties": { + "host": { + "description": "The host name of the database.", + "title": "Host", + "type": "string", + "order": 0 + }, + "port": { + "description": "The port to connect to.", + "title": "Port", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 3306, + "examples": ["3306"], + "order": 1 + }, + "database": { + "description": "The database name.", + "title": "Database", + "type": "string", + "order": 2 + }, + "username": { + "description": "The username which is used to access the database.", + "title": "Username", + "type": "string", + "order": 3 + }, + "password": { + "description": "The password associated with the username.", + "title": "Password", + "type": "string", + "airbyte_secret": true, + "order": 4, + "always_show": true + }, + "jdbc_url_params": { + "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). For more information read about JDBC URL parameters.", + "title": "JDBC URL Parameters (Advanced)", + "type": "string", + "order": 5 + }, + "ssl_mode": { + "title": "SSL modes", + "description": "SSL connection modes. Read more in the docs.", + "type": "object", + "order": 7, + "oneOf": [ + { + "title": "preferred", + "description": "Automatically attempt SSL connection. If the MySQL server does not support SSL, continue with a regular connection.", + "required": ["mode"], + "properties": { + "mode": { + "type": "string", + "const": "preferred", + "order": 0 + } + } + }, + { + "title": "required", + "description": "Always connect with SSL. If the MySQL server doesn’t support SSL, the connection will not be established. Certificate Authority (CA) and Hostname are not verified.", + "required": ["mode"], + "properties": { + "mode": { + "type": "string", + "const": "required", + "order": 0 + } + } + }, + { + "title": "Verify CA", + "description": "Always connect with SSL. Verifies CA, but allows connection even if Hostname does not match.", + "required": ["mode", "ca_certificate"], + "properties": { + "mode": { + "type": "string", + "const": "verify_ca", + "order": 0 + }, + "ca_certificate": { + "type": "string", + "title": "CA certificate", + "description": "CA certificate", + "airbyte_secret": true, + "multiline": true, + "order": 1 + }, + "client_certificate": { + "type": "string", + "title": "Client certificate", + "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", + "airbyte_secret": true, + "multiline": true, + "order": 2, + "always_show": true + }, + "client_key": { + "type": "string", + "title": "Client key", + "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", + "airbyte_secret": true, + "multiline": true, + "order": 3, + "always_show": true + }, + "client_key_password": { + "type": "string", + "title": "Client key password", + "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", + "airbyte_secret": true, + "order": 4 + } + } + }, + { + "title": "Verify Identity", + "description": "Always connect with SSL. Verify both CA and Hostname.", + "required": ["mode", "ca_certificate"], + "properties": { + "mode": { + "type": "string", + "const": "verify_identity", + "order": 0 + }, + "ca_certificate": { + "type": "string", + "title": "CA certificate", + "description": "CA certificate", + "airbyte_secret": true, + "multiline": true, + "order": 1 + }, + "client_certificate": { + "type": "string", + "title": "Client certificate", + "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", + "airbyte_secret": true, + "multiline": true, + "order": 2, + "always_show": true + }, + "client_key": { + "type": "string", + "title": "Client key", + "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", + "airbyte_secret": true, + "multiline": true, + "order": 3, + "always_show": true + }, + "client_key_password": { + "type": "string", + "title": "Client key password", + "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", + "airbyte_secret": true, + "order": 4 + } + } + } + ], + "default": "required" + }, + "replication_method": { + "type": "object", + "title": "Update Method", + "description": "Configures how data is extracted from the database.", + "order": 8, + "default": "CDC", + "display_type": "radio", + "oneOf": [ + { + "title": "Read Changes using Binary Log (CDC)", + "description": "Recommended - Incrementally reads new inserts, updates, and deletes using the MySQL binary log. This must be enabled on your database.", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "CDC", + "order": 0 + }, + "initial_waiting_seconds": { + "type": "integer", + "title": "Initial Waiting Time in Seconds (Advanced)", + "description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about initial waiting time.", + "default": 300, + "min": 120, + "max": 1200, + "order": 1, + "always_show": true + }, + "server_time_zone": { + "type": "string", + "title": "Configured server timezone for the MySQL source (Advanced)", + "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", + "order": 2, + "always_show": true + } + } + }, + { + "title": "Scan Changes with User Defined Cursor", + "description": "Incrementally detects new inserts and updates using the cursor column chosen when configuring a connection (e.g. created_at, updated_at).", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "STANDARD", + "order": 0 + } + } + } + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json new file mode 100644 index 000000000000..841fa1f3bdba --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json @@ -0,0 +1,233 @@ +{ + "documentationUrl": "https://docs.airbyte.com/integrations/sources/mysql", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "MySql Source Spec", + "type": "object", + "required": ["host", "port", "database", "username", "replication_method"], + "properties": { + "host": { + "description": "The host name of the database.", + "title": "Host", + "type": "string", + "order": 0 + }, + "port": { + "description": "The port to connect to.", + "title": "Port", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 3306, + "examples": ["3306"], + "order": 1 + }, + "database": { + "description": "The database name.", + "title": "Database", + "type": "string", + "order": 2 + }, + "username": { + "description": "The username which is used to access the database.", + "title": "Username", + "type": "string", + "order": 3 + }, + "password": { + "description": "The password associated with the username.", + "title": "Password", + "type": "string", + "airbyte_secret": true, + "order": 4, + "always_show": true + }, + "jdbc_url_params": { + "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). For more information read about JDBC URL parameters.", + "title": "JDBC URL Parameters (Advanced)", + "type": "string", + "order": 5 + }, + "ssl": { + "title": "SSL Connection", + "description": "Encrypt data using SSL.", + "type": "boolean", + "default": true, + "order": 6 + }, + "ssl_mode": { + "title": "SSL modes", + "description": "SSL connection modes. Read more in the docs.", + "type": "object", + "order": 7, + "oneOf": [ + { + "title": "preferred", + "description": "Automatically attempt SSL connection. If the MySQL server does not support SSL, continue with a regular connection.", + "required": ["mode"], + "properties": { + "mode": { + "type": "string", + "const": "preferred", + "order": 0 + } + } + }, + { + "title": "required", + "description": "Always connect with SSL. If the MySQL server doesn’t support SSL, the connection will not be established. Certificate Authority (CA) and Hostname are not verified.", + "required": ["mode"], + "properties": { + "mode": { + "type": "string", + "const": "required", + "order": 0 + } + } + }, + { + "title": "Verify CA", + "description": "Always connect with SSL. Verifies CA, but allows connection even if Hostname does not match.", + "required": ["mode", "ca_certificate"], + "properties": { + "mode": { + "type": "string", + "const": "verify_ca", + "order": 0 + }, + "ca_certificate": { + "type": "string", + "title": "CA certificate", + "description": "CA certificate", + "airbyte_secret": true, + "multiline": true, + "order": 1 + }, + "client_certificate": { + "type": "string", + "title": "Client certificate", + "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", + "airbyte_secret": true, + "multiline": true, + "order": 2, + "always_show": true + }, + "client_key": { + "type": "string", + "title": "Client key", + "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", + "airbyte_secret": true, + "multiline": true, + "order": 3, + "always_show": true + }, + "client_key_password": { + "type": "string", + "title": "Client key password", + "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", + "airbyte_secret": true, + "order": 4 + } + } + }, + { + "title": "Verify Identity", + "description": "Always connect with SSL. Verify both CA and Hostname.", + "required": ["mode", "ca_certificate"], + "properties": { + "mode": { + "type": "string", + "const": "verify_identity", + "order": 0 + }, + "ca_certificate": { + "type": "string", + "title": "CA certificate", + "description": "CA certificate", + "airbyte_secret": true, + "multiline": true, + "order": 1 + }, + "client_certificate": { + "type": "string", + "title": "Client certificate", + "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", + "airbyte_secret": true, + "multiline": true, + "order": 2, + "always_show": true + }, + "client_key": { + "type": "string", + "title": "Client key", + "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", + "airbyte_secret": true, + "multiline": true, + "order": 3, + "always_show": true + }, + "client_key_password": { + "type": "string", + "title": "Client key password", + "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", + "airbyte_secret": true, + "order": 4 + } + } + } + ] + }, + "replication_method": { + "type": "object", + "title": "Update Method", + "description": "Configures how data is extracted from the database.", + "order": 8, + "default": "CDC", + "display_type": "radio", + "oneOf": [ + { + "title": "Read Changes using Binary Log (CDC)", + "description": "Recommended - Incrementally reads new inserts, updates, and deletes using the MySQL binary log. This must be enabled on your database.", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "CDC", + "order": 0 + }, + "initial_waiting_seconds": { + "type": "integer", + "title": "Initial Waiting Time in Seconds (Advanced)", + "description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about initial waiting time.", + "default": 300, + "min": 120, + "max": 1200, + "order": 1, + "always_show": true + }, + "server_time_zone": { + "type": "string", + "title": "Configured server timezone for the MySQL source (Advanced)", + "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", + "order": 2, + "always_show": true + } + } + }, + { + "title": "Scan Changes with User Defined Cursor", + "description": "Incrementally detects new inserts and updates using the cursor column chosen when configuring a connection (e.g. created_at, updated_at).", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "STANDARD", + "order": 0 + } + } + } + ] + } + } + } +} diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 25af67167da8..e855e669eaaf 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -221,7 +221,8 @@ Any database or table encoding combination of charset and collation is supported ## Changelog | Version | Date | Pull Request | Subject | -|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| :------ | :--------- | :--------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------- | +| 3.2.0 | 2023-11-29 | [31062](https://github.com/airbytehq/airbyte/pull/31062) | enforce SSL on Airbyte Cloud | | 3.1.9 | 2023-11-27 | [32662](https://github.com/airbytehq/airbyte/pull/32662) | Apply initial setup time to debezium engine warmup time. | | 3.1.8 | 2023-11-22 | [32656](https://github.com/airbytehq/airbyte/pull/32656) | Adopt java CDK version 0.5.0. | | 3.1.7 | 2023-11-08 | [32125](https://github.com/airbytehq/airbyte/pull/32125) | fix compilation warnings |