Skip to content

Commit

Permalink
Cleanup createDatabase() code in AbstractJdbcSource (#24343)
Browse files Browse the repository at this point in the history
* Cleanup createDatabase() code

Move methods from AbstractJdbcSource

Moved methods until createDataSource outside of the abstract class

* Quick cleanups/renames

* Add tests
  • Loading branch information
akashkulk authored Mar 23, 2023
1 parent 9b7b30f commit 2703a44
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
Expand All @@ -22,9 +21,6 @@
import io.airbyte.integrations.base.ssh.SshWrappedSource;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -107,22 +103,18 @@ protected boolean isNotInternalSchema(final JsonNode jsonNode, final Set<String>
}

@Override
protected DataSource createDataSource(final JsonNode sourceConfig) {
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig);

// Create the JDBC data source
final DataSource dataSource = DataSourceFactory.create(
jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText(),
jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null,
driverClass,
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(),
JdbcUtils.parseJdbcParameters(jdbcConfig, JdbcUtils.CONNECTION_PROPERTIES_KEY));
dataSources.add(dataSource);
return dataSource;
}

@Override
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
final DataSource dataSource = createDataSource(sourceConfig);
final JdbcDatabase database = new DefaultJdbcDatabase(dataSource, sourceOperations);
quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString);
final CockroachJdbcDatabase cockroachJdbcDatabase = new CockroachJdbcDatabase(database, sourceOperations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import datadog.trace.api.Trace;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.JdbcCompatibleSourceOperations;
Expand Down Expand Up @@ -64,7 +63,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -391,79 +389,30 @@ protected long getActualCursorRecordCount(final Connection connection,
}
}

protected DataSource createDataSource(final JsonNode sourceConfig) {
@Override
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig);
// Create the data source
final DataSource dataSource = DataSourceFactory.create(
jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() : null,
jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null,
driverClass,
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(),
getConnectionProperties(sourceConfig));
JdbcDataSourceUtils.getConnectionProperties(sourceConfig));
// Record the data source so that it can be closed.
dataSources.add(dataSource);
return dataSource;
}

@Override
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
final DataSource dataSource = createDataSource(sourceConfig);
final JdbcDatabase database = new StreamingJdbcDatabase(
dataSource,
sourceOperations,
streamingQueryConfigProvider);

quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString);
database.setSourceConfig(sourceConfig);
database.setDatabaseConfig(toDatabaseConfig(sourceConfig));
database.setDatabaseConfig(jdbcConfig);
return database;
}

/**
* Retrieves connection_properties from config and also validates if custom jdbc_url parameters
* overlap with the default properties
*
* @param config A configuration used to check Jdbc connection
* @return A mapping of connection properties
*/
protected Map<String, String> getConnectionProperties(final JsonNode config) {
final Map<String, String> customProperties = JdbcUtils.parseJdbcParameters(config, JdbcUtils.JDBC_URL_PARAMS_KEY);
final Map<String, String> defaultProperties = getDefaultConnectionProperties(config);
assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties);
return MoreMaps.merge(customProperties, defaultProperties);
}

/**
* Validates for duplication parameters
*
* @param customParameters custom connection properties map as specified by each Jdbc source
* @param defaultParameters connection properties map as specified by each Jdbc source
* @throws IllegalArgumentException
*/
protected static void assertCustomParametersDontOverwriteDefaultParameters(final Map<String, String> customParameters,
final Map<String, String> defaultParameters) {
for (final String key : defaultParameters.keySet()) {
if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) {
throw new IllegalArgumentException("Cannot overwrite default JDBC parameter " + key);
}
}
}

/**
* Retrieves default connection_properties from config
*
* TODO: make this method abstract and add parity features to destination connectors
*
* @param config A configuration used to check Jdbc connection
* @return A mapping of the default connection properties
*/
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
return JdbcUtils.parseJdbcParameters(config, "connection_properties", getJdbcParameterDelimiter());
};

protected String getJdbcParameterDelimiter() {
return "&";
}

@Override
public void close() {
dataSources.forEach(d -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.airbyte.integrations.source.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.db.jdbc.JdbcUtils;
import java.util.Map;
import java.util.Objects;

public class JdbcDataSourceUtils {

public static String DEFAULT_JDBC_PARAMETERS_DELIMITER = "&";
/**
* Validates for duplication parameters
*
* @param customParameters custom connection properties map as specified by each Jdbc source
* @param defaultParameters connection properties map as specified by each Jdbc source
* @throws IllegalArgumentException
*/
public static void assertCustomParametersDontOverwriteDefaultParameters(final Map<String, String> customParameters,
final Map<String, String> defaultParameters) {
for (final String key : defaultParameters.keySet()) {
if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) {
throw new IllegalArgumentException("Cannot overwrite default JDBC parameter " + key);
}
}
}

/**
* Retrieves connection_properties from config and also validates if custom jdbc_url parameters
* overlap with the default properties
*
* @param config A configuration used to check Jdbc connection
* @return A mapping of connection properties
*/
public static Map<String, String> getConnectionProperties(final JsonNode config) {
final Map<String, String> customProperties = JdbcUtils.parseJdbcParameters(config, JdbcUtils.JDBC_URL_PARAMS_KEY);
final Map<String, String> defaultProperties = JdbcDataSourceUtils.getDefaultConnectionProperties(config);
assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties);
return MoreMaps.merge(customProperties, defaultProperties);
}

/**
* Retrieves default connection_properties from config
*
* TODO: make this method abstract and add parity features to destination connectors
*
* @param config A configuration used to check Jdbc connection
* @return A mapping of the default connection properties
*/
private static Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
// NOTE that Postgres returns an empty map for some reason?
return JdbcUtils.parseJdbcParameters(config, "connection_properties", DEFAULT_JDBC_PARAMETERS_DELIMITER);
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.integrations.source.jdbc;

import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.assertCustomParametersDontOverwriteDefaultParameters;
import static io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils.assertCustomParametersDontOverwriteDefaultParameters;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.airbyte.integrations.source.jdbc;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;

public class JdbcDataSourceUtilsTest {

@Test
void test() {
final String validConfigString = "{\"jdbc_url_params\":\"key1=val1&key3=key3\",\"connection_properties\":\"key1=val1&key2=val2\"}";
final JsonNode validConfig = Jsons.deserialize(validConfigString);
final Map<String, String> connectionProperties = JdbcDataSourceUtils.getConnectionProperties(validConfig);
final List<String> validKeys = List.of("key1", "key2", "key3");
validKeys.forEach(key -> assertTrue(connectionProperties.containsKey(key)));

// For an invalid config, there is a conflict betweeen the values of keys in jdbc_url_params and connection_properties
final String invalidConfigString = "{\"jdbc_url_params\":\"key1=val2&key3=key3\",\"connection_properties\":\"key1=val1&key2=val2\"}";
final JsonNode invalidConfig = Jsons.deserialize(invalidConfigString);
final Exception exception = assertThrows(IllegalArgumentException.class, () -> {
JdbcDataSourceUtils.getConnectionProperties(invalidConfig);
});

final String expectedMessage = "Cannot overwrite default JDBC parameter key1";
assertThat(expectedMessage.equals(exception.getMessage()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class OracleSource extends AbstractJdbcSource<JDBCType> implements Source
private static final String UNRECOGNIZED = "Unrecognized";
private static final String CONNECTION_DATA = "connection_data";

private static final String ORACLE_JDBC_PARAMETER_DELIMITER = ";";

enum Protocol {
TCP,
TCPS
Expand Down Expand Up @@ -115,7 +117,7 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
}

if (!additionalParameters.isEmpty()) {
final String connectionParams = String.join(getJdbcParameterDelimiter(), additionalParameters);
final String connectionParams = String.join(ORACLE_JDBC_PARAMETER_DELIMITER, additionalParameters);
configBuilder.put(JdbcUtils.CONNECTION_PROPERTIES_KEY, connectionParams);
}

Expand Down Expand Up @@ -193,11 +195,6 @@ public Set<String> getExcludedInternalNameSpaces() {
return Set.of();
}

@Override
protected String getJdbcParameterDelimiter() {
return ";";
}

@Override
protected int getStateEmissionFrequency() {
return INTERMEDIATE_STATE_EMISSION_FREQUENCY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,6 @@ public static Source sshWrappedSource() {
this.featureFlags = new EnvVariableFeatureFlags();
}

@Override
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
return Collections.emptyMap();
}

@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
final List<String> additionalParameters = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import java.io.IOException;
import java.sql.JDBCType;
import java.sql.SQLException;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -41,10 +44,21 @@ public SnowflakeSource(final String airbyteEnvironment) {
}

@Override
protected DataSource createDataSource(final JsonNode sourceConfig) {
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig);
// Create the data source
final DataSource dataSource = SnowflakeDataSourceUtils.createDataSource(sourceConfig, airbyteEnvironment);
dataSources.add(dataSource);
return dataSource;

final JdbcDatabase database = new StreamingJdbcDatabase(
dataSource,
sourceOperations,
streamingQueryConfigProvider);

quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString);
database.setSourceConfig(sourceConfig);
database.setDatabaseConfig(jdbcConfig);
return database;
}

@Override
Expand Down

0 comments on commit 2703a44

Please sign in to comment.