From 2703a44a4443696a579846f2faf0038212f210b8 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Thu, 23 Mar 2023 10:11:13 -0700 Subject: [PATCH] Cleanup createDatabase() code in AbstractJdbcSource (#24343) * Cleanup createDatabase() code Move methods from AbstractJdbcSource Moved methods until createDataSource outside of the abstract class * Quick cleanups/renames * Add tests --- .../source/cockroachdb/CockroachDbSource.java | 12 +--- .../source/jdbc/AbstractJdbcSource.java | 61 ++----------------- .../source/jdbc/JdbcDataSourceUtils.java | 54 ++++++++++++++++ .../jdbc/DefaultJdbcSourceAcceptanceTest.java | 2 +- .../source/jdbc/JdbcDataSourceUtilsTest.java | 33 ++++++++++ .../source/oracle/OracleSource.java | 9 +-- .../source/postgres/PostgresSource.java | 5 -- .../SnowflakeSource.java | 18 +++++- 8 files changed, 114 insertions(+), 80 deletions(-) create mode 100644 airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/JdbcDataSourceUtils.java create mode 100644 airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcDataSourceUtilsTest.java diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java index 3890e5ebeb06..b95e57c3d12f 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java @@ -10,7 +10,6 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.DefaultJdbcDatabase; @@ -22,9 +21,6 @@ import io.airbyte.integrations.base.ssh.SshWrappedSource; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import java.sql.Connection; import java.sql.JDBCType; import java.sql.PreparedStatement; @@ -107,9 +103,10 @@ protected boolean isNotInternalSchema(final JsonNode jsonNode, final Set } @Override - protected DataSource createDataSource(final JsonNode sourceConfig) { + public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException { final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig); + // Create the JDBC data source final DataSource dataSource = DataSourceFactory.create( jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText(), jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null, @@ -117,12 +114,7 @@ protected DataSource createDataSource(final JsonNode sourceConfig) { jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(), JdbcUtils.parseJdbcParameters(jdbcConfig, JdbcUtils.CONNECTION_PROPERTIES_KEY)); dataSources.add(dataSource); - return dataSource; - } - @Override - public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException { - final DataSource dataSource = createDataSource(sourceConfig); final JdbcDatabase database = new DefaultJdbcDatabase(dataSource, sourceOperations); quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString); final CockroachJdbcDatabase cockroachJdbcDatabase = new CockroachJdbcDatabase(database, sourceOperations); diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index ddf9e47b8ab0..990315c86a97 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -31,7 +31,6 @@ import datadog.trace.api.Trace; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.map.MoreMaps; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.db.JdbcCompatibleSourceOperations; @@ -64,7 +63,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.function.Predicate; import java.util.function.Supplier; @@ -391,22 +389,19 @@ protected long getActualCursorRecordCount(final Connection connection, } } - protected DataSource createDataSource(final JsonNode sourceConfig) { + @Override + public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException { final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig); + // Create the data source final DataSource dataSource = DataSourceFactory.create( jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() : null, jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null, driverClass, jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(), - getConnectionProperties(sourceConfig)); + JdbcDataSourceUtils.getConnectionProperties(sourceConfig)); // Record the data source so that it can be closed. dataSources.add(dataSource); - return dataSource; - } - @Override - public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException { - final DataSource dataSource = createDataSource(sourceConfig); final JdbcDatabase database = new StreamingJdbcDatabase( dataSource, sourceOperations, @@ -414,56 +409,10 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLExcept quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString); database.setSourceConfig(sourceConfig); - database.setDatabaseConfig(toDatabaseConfig(sourceConfig)); + database.setDatabaseConfig(jdbcConfig); return database; } - /** - * Retrieves connection_properties from config and also validates if custom jdbc_url parameters - * overlap with the default properties - * - * @param config A configuration used to check Jdbc connection - * @return A mapping of connection properties - */ - protected Map getConnectionProperties(final JsonNode config) { - final Map customProperties = JdbcUtils.parseJdbcParameters(config, JdbcUtils.JDBC_URL_PARAMS_KEY); - final Map defaultProperties = getDefaultConnectionProperties(config); - assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties); - return MoreMaps.merge(customProperties, defaultProperties); - } - - /** - * Validates for duplication parameters - * - * @param customParameters custom connection properties map as specified by each Jdbc source - * @param defaultParameters connection properties map as specified by each Jdbc source - * @throws IllegalArgumentException - */ - protected static void assertCustomParametersDontOverwriteDefaultParameters(final Map customParameters, - final Map defaultParameters) { - for (final String key : defaultParameters.keySet()) { - if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) { - throw new IllegalArgumentException("Cannot overwrite default JDBC parameter " + key); - } - } - } - - /** - * Retrieves default connection_properties from config - * - * TODO: make this method abstract and add parity features to destination connectors - * - * @param config A configuration used to check Jdbc connection - * @return A mapping of the default connection properties - */ - protected Map getDefaultConnectionProperties(final JsonNode config) { - return JdbcUtils.parseJdbcParameters(config, "connection_properties", getJdbcParameterDelimiter()); - }; - - protected String getJdbcParameterDelimiter() { - return "&"; - } - @Override public void close() { dataSources.forEach(d -> { diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/JdbcDataSourceUtils.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/JdbcDataSourceUtils.java new file mode 100644 index 000000000000..8c6012669b4f --- /dev/null +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/JdbcDataSourceUtils.java @@ -0,0 +1,54 @@ +package io.airbyte.integrations.source.jdbc; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.map.MoreMaps; +import io.airbyte.db.jdbc.JdbcUtils; +import java.util.Map; +import java.util.Objects; + +public class JdbcDataSourceUtils { + + public static String DEFAULT_JDBC_PARAMETERS_DELIMITER = "&"; + /** + * Validates for duplication parameters + * + * @param customParameters custom connection properties map as specified by each Jdbc source + * @param defaultParameters connection properties map as specified by each Jdbc source + * @throws IllegalArgumentException + */ + public static void assertCustomParametersDontOverwriteDefaultParameters(final Map customParameters, + final Map defaultParameters) { + for (final String key : defaultParameters.keySet()) { + if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) { + throw new IllegalArgumentException("Cannot overwrite default JDBC parameter " + key); + } + } + } + + /** + * Retrieves connection_properties from config and also validates if custom jdbc_url parameters + * overlap with the default properties + * + * @param config A configuration used to check Jdbc connection + * @return A mapping of connection properties + */ + public static Map getConnectionProperties(final JsonNode config) { + final Map customProperties = JdbcUtils.parseJdbcParameters(config, JdbcUtils.JDBC_URL_PARAMS_KEY); + final Map defaultProperties = JdbcDataSourceUtils.getDefaultConnectionProperties(config); + assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties); + return MoreMaps.merge(customProperties, defaultProperties); + } + + /** + * Retrieves default connection_properties from config + * + * TODO: make this method abstract and add parity features to destination connectors + * + * @param config A configuration used to check Jdbc connection + * @return A mapping of the default connection properties + */ + private static Map getDefaultConnectionProperties(final JsonNode config) { + // NOTE that Postgres returns an empty map for some reason? + return JdbcUtils.parseJdbcParameters(config, "connection_properties", DEFAULT_JDBC_PARAMETERS_DELIMITER); + }; +} diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java index be0b8de32903..8971fe9adea7 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.source.jdbc; -import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.assertCustomParametersDontOverwriteDefaultParameters; +import static io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils.assertCustomParametersDontOverwriteDefaultParameters; import static org.junit.jupiter.api.Assertions.assertThrows; import com.fasterxml.jackson.databind.JsonNode; diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcDataSourceUtilsTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcDataSourceUtilsTest.java new file mode 100644 index 000000000000..3aab6edd8821 --- /dev/null +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcDataSourceUtilsTest.java @@ -0,0 +1,33 @@ +package io.airbyte.integrations.source.jdbc; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +public class JdbcDataSourceUtilsTest { + + @Test + void test() { + final String validConfigString = "{\"jdbc_url_params\":\"key1=val1&key3=key3\",\"connection_properties\":\"key1=val1&key2=val2\"}"; + final JsonNode validConfig = Jsons.deserialize(validConfigString); + final Map connectionProperties = JdbcDataSourceUtils.getConnectionProperties(validConfig); + final List validKeys = List.of("key1", "key2", "key3"); + validKeys.forEach(key -> assertTrue(connectionProperties.containsKey(key))); + + // For an invalid config, there is a conflict betweeen the values of keys in jdbc_url_params and connection_properties + final String invalidConfigString = "{\"jdbc_url_params\":\"key1=val2&key3=key3\",\"connection_properties\":\"key1=val1&key2=val2\"}"; + final JsonNode invalidConfig = Jsons.deserialize(invalidConfigString); + final Exception exception = assertThrows(IllegalArgumentException.class, () -> { + JdbcDataSourceUtils.getConnectionProperties(invalidConfig); + }); + + final String expectedMessage = "Cannot overwrite default JDBC parameter key1"; + assertThat(expectedMessage.equals(exception.getMessage())); + } +} diff --git a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java index e78cf52fdff4..b8531d3a1751 100644 --- a/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java +++ b/airbyte-integrations/connectors/source-oracle/src/main/java/io/airbyte/integrations/source/oracle/OracleSource.java @@ -47,6 +47,8 @@ public class OracleSource extends AbstractJdbcSource implements Source private static final String UNRECOGNIZED = "Unrecognized"; private static final String CONNECTION_DATA = "connection_data"; + private static final String ORACLE_JDBC_PARAMETER_DELIMITER = ";"; + enum Protocol { TCP, TCPS @@ -115,7 +117,7 @@ public JsonNode toDatabaseConfig(final JsonNode config) { } if (!additionalParameters.isEmpty()) { - final String connectionParams = String.join(getJdbcParameterDelimiter(), additionalParameters); + final String connectionParams = String.join(ORACLE_JDBC_PARAMETER_DELIMITER, additionalParameters); configBuilder.put(JdbcUtils.CONNECTION_PROPERTIES_KEY, connectionParams); } @@ -193,11 +195,6 @@ public Set getExcludedInternalNameSpaces() { return Set.of(); } - @Override - protected String getJdbcParameterDelimiter() { - return ";"; - } - @Override protected int getStateEmissionFrequency() { return INTERMEDIATE_STATE_EMISSION_FREQUENCY; diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 29236eef54f3..9126c2e352a4 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -125,11 +125,6 @@ public static Source sshWrappedSource() { this.featureFlags = new EnvVariableFeatureFlags(); } - @Override - protected Map getDefaultConnectionProperties(final JsonNode config) { - return Collections.emptyMap(); - } - @Override public JsonNode toDatabaseConfig(final JsonNode config) { final List additionalParameters = new ArrayList<>(); diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java index 75d9f631c362..b5187e545072 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java +++ b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java @@ -12,12 +12,15 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.factory.DatabaseDriver; +import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.db.jdbc.StreamingJdbcDatabase; import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import java.io.IOException; import java.sql.JDBCType; +import java.sql.SQLException; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -41,10 +44,21 @@ public SnowflakeSource(final String airbyteEnvironment) { } @Override - protected DataSource createDataSource(final JsonNode sourceConfig) { + public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException { + final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig); + // Create the data source final DataSource dataSource = SnowflakeDataSourceUtils.createDataSource(sourceConfig, airbyteEnvironment); dataSources.add(dataSource); - return dataSource; + + final JdbcDatabase database = new StreamingJdbcDatabase( + dataSource, + sourceOperations, + streamingQueryConfigProvider); + + quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString); + database.setSourceConfig(sourceConfig); + database.setDatabaseConfig(jdbcConfig); + return database; } @Override