From 72d2f75d141cc53e00c508195bf804b3db695829 Mon Sep 17 00:00:00 2001 From: Aniket Shrimal Date: Thu, 19 Sep 2024 01:56:01 +0530 Subject: [PATCH 1/6] Add support for Custom Credentials Provider Class in the connectors --- .../jdbc/dialect/GenericDatabaseDialect.java | 19 +++-- .../connect/jdbc/sink/JdbcSinkConfig.java | 78 +++++++++++++++++-- .../source/JdbcSourceConnectorConfig.java | 70 +++++++++++++++++ .../jdbc/util/BasicJdbcCredentials.java | 37 +++++++++ .../util/DefaultJdbcCredentialsProvider.java | 38 +++++++++ .../connect/jdbc/util/JdbcCredentials.java | 31 ++++++++ .../jdbc/util/JdbcCredentialsProvider.java | 32 ++++++++ .../JdbcCredentialsProviderValidator.java | 40 ++++++++++ .../connect/jdbc/util/StringUtils.java | 6 ++ .../connect/jdbc/sink/JdbcSinkConfigTest.java | 67 ++++++++++++++++ .../source/JdbcSourceConnectorConfigTest.java | 26 +++++++ .../ConfigurableJdbcCredentialsProvider.java | 26 +++++++ .../DefaultJdbcCredentialsProviderTest.java | 46 +++++++++++ 13 files changed, 505 insertions(+), 11 deletions(-) create mode 100644 src/main/java/io/confluent/connect/jdbc/util/BasicJdbcCredentials.java create mode 100644 src/main/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProvider.java create mode 100644 src/main/java/io/confluent/connect/jdbc/util/JdbcCredentials.java create mode 100644 src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProvider.java create mode 100644 src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProviderValidator.java create mode 100644 src/test/java/io/confluent/connect/jdbc/util/ConfigurableJdbcCredentialsProvider.java create mode 100644 src/test/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProviderTest.java diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index d50a08c19..51d883324 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -15,6 +15,8 @@ package io.confluent.connect.jdbc.dialect; +import io.confluent.connect.jdbc.util.JdbcCredentials; +import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; import java.time.ZoneOffset; import java.util.TimeZone; @@ -114,6 +116,8 @@ public class GenericDatabaseDialect implements DatabaseDialect { private static final String PRECISION_FIELD = "connect.decimal.precision"; + private JdbcCredentialsProvider jdbcCredentialsProvider; + /** * The provider for {@link GenericDatabaseDialect}. */ @@ -191,6 +195,7 @@ protected GenericDatabaseDialect( quoteSqlIdentifiers = QuoteMethod.get( config.getString(JdbcSinkConfig.QUOTE_SQL_IDENTIFIERS_CONFIG) ); + jdbcCredentialsProvider = ((JdbcSinkConfig) config).credentialsProvider(); } else { catalogPattern = config.getString(JdbcSourceTaskConfig.CATALOG_PATTERN_CONFIG); schemaPattern = config.getString(JdbcSourceTaskConfig.SCHEMA_PATTERN_CONFIG); @@ -202,6 +207,7 @@ protected GenericDatabaseDialect( if (config instanceof JdbcSourceConnectorConfig) { mapNumerics = ((JdbcSourceConnectorConfig)config).numericMapping(); batchMaxRows = config.getInt(JdbcSourceConnectorConfig.BATCH_MAX_ROWS_CONFIG); + jdbcCredentialsProvider = ((JdbcSourceConnectorConfig) config).credentialsProvider(); } else { mapNumerics = NumericMapping.NONE; batchMaxRows = 0; @@ -238,15 +244,16 @@ protected TimeZone timeZone() { @Override public Connection getConnection() throws SQLException { - // These config names are the same for both source and sink configs ... - String username = config.getString(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG); + JdbcCredentials jdbcCredentials = jdbcCredentialsProvider.getJdbcCredentials(); + Password dbPassword = config.getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); + Properties properties = new Properties(); - if (username != null) { - properties.setProperty("user", username); + if (jdbcCredentials.getUsername() != null) { + properties.setProperty("user", jdbcCredentials.getUsername()); } - if (dbPassword != null) { - properties.setProperty("password", dbPassword.value()); + if (jdbcCredentials.getPassword() != null) { + properties.setProperty("password", jdbcCredentials.getPassword()); } properties = addConnectionProperties(properties); // Timeout is 40 seconds to be as long as possible for customer to have a long connection diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java index b6d7586e0..a3c8cb77e 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java @@ -34,15 +34,22 @@ import io.confluent.connect.jdbc.util.DatabaseDialectRecommender; import io.confluent.connect.jdbc.util.DeleteEnabledRecommender; import io.confluent.connect.jdbc.util.EnumRecommender; +import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.JdbcCredentialsProviderValidator; import io.confluent.connect.jdbc.util.PrimaryKeyModeRecommender; import io.confluent.connect.jdbc.util.QuoteMethod; import io.confluent.connect.jdbc.util.StringUtils; import io.confluent.connect.jdbc.util.TableType; import io.confluent.connect.jdbc.util.TimeZoneValidator; +import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.connect.errors.ConnectException; public class JdbcSinkConfig extends AbstractConfig { @@ -288,6 +295,27 @@ public enum DateTimezone { private static final String MSSQL_USE_MERGE_HOLDLOCK_DISPLAY = "SQL Server - Use HOLDLOCK in MERGE"; + public static final String CREDENTIALS_PROVIDER_CLASS_CONFIG = + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG; + public static final Class CREDENTIALS_PROVIDER_CLASS_DEFAULT = + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_DEFAULT; + + public static final String CREDENTIALS_PROVIDER_CLASS_DISPLAY = + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_DISPLAY; + + public static final String CREDENTIALS_PROVIDER_CLASS_DOC = + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_DOC; + + /** + * The properties that begin with this prefix will be used to configure a class, specified by + * {@code jdbc.credentials.provider.class} if it implements {@link Configurable}. + */ + public static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX = + CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( + 0, + CREDENTIALS_PROVIDER_CLASS_CONFIG.lastIndexOf(".") + 1 + ); + public static final ConfigDef CONFIG_DEF = new ConfigDef() // Connection .define( @@ -322,8 +350,18 @@ public enum DateTimezone { 3, ConfigDef.Width.MEDIUM, CONNECTION_PASSWORD_DISPLAY - ) - .define( + ).define( + CREDENTIALS_PROVIDER_CLASS_CONFIG, + Type.CLASS, + CREDENTIALS_PROVIDER_CLASS_DEFAULT, + new JdbcCredentialsProviderValidator(), + Importance.LOW, + CREDENTIALS_PROVIDER_CLASS_DOC, + CONNECTION_GROUP, + 4, + Width.LONG, + CREDENTIALS_PROVIDER_CLASS_DISPLAY + ).define( DIALECT_NAME_CONFIG, ConfigDef.Type.STRING, DIALECT_NAME_DEFAULT, @@ -331,7 +369,7 @@ public enum DateTimezone { ConfigDef.Importance.LOW, DIALECT_NAME_DOC, CONNECTION_GROUP, - 4, + 5, ConfigDef.Width.LONG, DIALECT_NAME_DISPLAY, DatabaseDialectRecommender.INSTANCE @@ -344,7 +382,7 @@ public enum DateTimezone { ConfigDef.Importance.LOW, CONNECTION_ATTEMPTS_DOC, CONNECTION_GROUP, - 5, + 6, ConfigDef.Width.SHORT, CONNECTION_ATTEMPTS_DISPLAY ).define( @@ -354,7 +392,7 @@ public enum DateTimezone { ConfigDef.Importance.LOW, CONNECTION_BACKOFF_DOC, CONNECTION_GROUP, - 6, + 7, ConfigDef.Width.SHORT, CONNECTION_BACKOFF_DISPLAY ) @@ -612,6 +650,36 @@ public JdbcSinkConfig(Map props) { tableTypes = TableType.parse(getList(TABLE_TYPES_CONFIG)); } + @SuppressWarnings("unchecked") + public JdbcCredentialsProvider credentialsProvider() { + String username = getString(JdbcSinkConfig.CONNECTION_USER); + Password dbPassword = getPassword(JdbcSinkConfig.CONNECTION_PASSWORD); + + try { + JdbcCredentialsProvider provider = ((Class) getClass( + JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); + + if (provider instanceof Configurable) { + Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); + configs.remove( + CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()) + ); + + if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(dbPassword.value())) { + configs.put(JdbcSinkConfig.CONNECTION_USER, username); + configs.put(JdbcSinkConfig.CONNECTION_PASSWORD, dbPassword.value()); + } + + ((Configurable) provider).configure(configs); + } + + return provider; + } catch (ClassCastException | IllegalAccessException | InstantiationException e) { + throw new ConnectException( + "Invalid class for: " + JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, e); + } + } + private String getPasswordValue(String key) { Password password = getPassword(key); if (password != null) { diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 4ce538aa1..29cc4e881 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -15,6 +15,7 @@ package io.confluent.connect.jdbc.source; +import io.confluent.connect.jdbc.util.StringUtils; import java.sql.Connection; import java.sql.Timestamp; import java.time.ZoneId; @@ -32,7 +33,10 @@ import io.confluent.connect.jdbc.dialect.DatabaseDialects; import io.confluent.connect.jdbc.util.DatabaseDialectRecommender; import io.confluent.connect.jdbc.util.DateTimeUtils; +import io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider; import io.confluent.connect.jdbc.util.EnumRecommender; +import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.JdbcCredentialsProviderValidator; import io.confluent.connect.jdbc.util.QuoteMethod; import io.confluent.connect.jdbc.util.TimeZoneValidator; @@ -40,6 +44,7 @@ import java.util.function.Function; import java.util.regex.Pattern; +import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; @@ -50,6 +55,7 @@ import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigValue; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; @@ -324,6 +330,26 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { "Number of times to retry SQL exceptions encountered when executing queries."; public static final String QUERY_RETRIES_DISPLAY = "Query Retry Attempts"; + public static final String CREDENTIALS_PROVIDER_CLASS_CONFIG = "jdbc.credentials.provider.class"; + public static final Class CREDENTIALS_PROVIDER_CLASS_DEFAULT = + DefaultJdbcCredentialsProvider.class; + + public static final String CREDENTIALS_PROVIDER_CLASS_DISPLAY = "JDBC Credentials Provider Class"; + + public static final String CREDENTIALS_PROVIDER_CLASS_DOC = "Credentials provider or provider " + + "chain to use for authentication to database. By default the connector uses ``" + + DefaultJdbcCredentialsProvider.class.getName() + "``."; + + /** + * The properties that begin with this prefix will be used to configure a class, specified by + * {@code jdbc.credentials.provider.class} if it implements {@link Configurable}. + */ + public static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX = + CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( + 0, + CREDENTIALS_PROVIDER_CLASS_CONFIG.lastIndexOf(".") + 1 + ); + private static final EnumRecommender QUOTE_METHOD_RECOMMENDER = EnumRecommender.in(QuoteMethod.values()); @@ -455,6 +481,17 @@ private static final void addDatabaseOptions(ConfigDef config) { ++orderInGroup, Width.SHORT, CONNECTION_PASSWORD_DISPLAY + ).define( + CREDENTIALS_PROVIDER_CLASS_CONFIG, + Type.CLASS, + CREDENTIALS_PROVIDER_CLASS_DEFAULT, + new JdbcCredentialsProviderValidator(), + Importance.LOW, + CREDENTIALS_PROVIDER_CLASS_DOC, + DATABASE_GROUP, + ++orderInGroup, + Width.LONG, + CREDENTIALS_PROVIDER_CLASS_DISPLAY ).define( CONNECTION_ATTEMPTS_CONFIG, Type.INT, @@ -798,6 +835,39 @@ public String topicPrefix() { return getString(JdbcSourceTaskConfig.TOPIC_PREFIX_CONFIG).trim(); } + + @SuppressWarnings("unchecked") + public JdbcCredentialsProvider credentialsProvider() { + String username = getString(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG); + Password dbPassword = getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); + + try { + JdbcCredentialsProvider provider = ((Class) getClass( + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); + + if (provider instanceof Configurable) { + Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); + configs.remove( + CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()) + ); + + if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(dbPassword.value())) { + configs.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, username); + configs.put( + JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, dbPassword.value() + ); + } + + ((Configurable) provider).configure(configs); + } + + return provider; + } catch (ClassCastException | IllegalAccessException | InstantiationException e) { + throw new ConnectException( + "Invalid class for: " + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, e); + } + } + /** * A recommender that caches values returned by a delegate, where the cache remains valid for a * specified duration and as long as the configuration remains unchanged. diff --git a/src/main/java/io/confluent/connect/jdbc/util/BasicJdbcCredentials.java b/src/main/java/io/confluent/connect/jdbc/util/BasicJdbcCredentials.java new file mode 100644 index 000000000..f99e8c9d5 --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/BasicJdbcCredentials.java @@ -0,0 +1,37 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.jdbc.util; + +public class BasicJdbcCredentials implements JdbcCredentials { + + String username; + String password; + + public BasicJdbcCredentials(String user, String password) { + this.username = user; + this.password = password; + } + + @Override + public String getUsername() { + return username; + } + + @Override + public String getPassword() { + return password; + } +} diff --git a/src/main/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProvider.java b/src/main/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProvider.java new file mode 100644 index 000000000..8ff4c079d --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProvider.java @@ -0,0 +1,38 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.jdbc.util; + +import java.util.Map; +import org.apache.kafka.common.Configurable; + +public class DefaultJdbcCredentialsProvider implements JdbcCredentialsProvider, Configurable { + + private static final String DB_USERNAME_CONFIG = "connection.user"; + private static final String DB_PASSWORD_CONFIG = "connection.password"; + String username; + String password; + + @Override + public JdbcCredentials getJdbcCredentials() { + return new BasicJdbcCredentials(username, password); + } + + @Override + public void configure(Map map) { + username = (String) map.get(DB_USERNAME_CONFIG); + password = (String) map.get(DB_PASSWORD_CONFIG); + } +} diff --git a/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentials.java b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentials.java new file mode 100644 index 000000000..9e201b7ec --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentials.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.jdbc.util; + +public interface JdbcCredentials { + + /** + * @return Username to use for authentication to database + */ + String getUsername(); + + /** + * + * @return Password/Token to use for authentication to database + */ + String getPassword(); + +} diff --git a/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProvider.java b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProvider.java new file mode 100644 index 000000000..d27a03e07 --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProvider.java @@ -0,0 +1,32 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.jdbc.util; + +public interface JdbcCredentialsProvider { + + /** + * @return JdbcCredentials which caller can use for authentication purpose + */ + JdbcCredentials getJdbcCredentials(); + + /** + * Instructs the provider to refresh or renew credentials. + * Default behavior is no-op. + */ + default void refresh() { + // no need to refresh anything by default + } +} diff --git a/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProviderValidator.java b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProviderValidator.java new file mode 100644 index 000000000..f05dc1ea9 --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProviderValidator.java @@ -0,0 +1,40 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.jdbc.util; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +public class JdbcCredentialsProviderValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object provider) { + if (provider != null && provider instanceof Class + && JdbcCredentialsProvider.class.isAssignableFrom((Class) provider)) { + return; + } + throw new ConfigException( + name, + provider, + "Class must extend: " + JdbcCredentialsProvider.class + ); + } + + @Override + public String toString() { + return "Any class implementing: " + JdbcCredentialsProvider.class; + } +} + diff --git a/src/main/java/io/confluent/connect/jdbc/util/StringUtils.java b/src/main/java/io/confluent/connect/jdbc/util/StringUtils.java index a24382860..200d5bc5a 100644 --- a/src/main/java/io/confluent/connect/jdbc/util/StringUtils.java +++ b/src/main/java/io/confluent/connect/jdbc/util/StringUtils.java @@ -15,6 +15,8 @@ package io.confluent.connect.jdbc.util; +import static org.apache.kafka.common.utils.Utils.isBlank; + import org.apache.kafka.connect.data.Schema; /** @@ -72,4 +74,8 @@ public static String schemaTypeOrNull(Schema schema) { return schema.type().getName(); } } + + public static boolean isNotBlank(String string) { + return !isBlank(string); + } } diff --git a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java index 0775dd145..fdfa15340 100644 --- a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java +++ b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java @@ -19,6 +19,9 @@ import java.util.HashMap; import java.util.Map; +import io.confluent.connect.jdbc.util.ConfigurableJdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; import io.confluent.connect.jdbc.util.TableType; import org.apache.kafka.common.config.ConfigException; @@ -27,6 +30,8 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class JdbcSinkConfigTest { @@ -133,6 +138,68 @@ public void shouldCreateConfigWithTrailingWhitespaceInTableTypes() { assertTableTypes(TableType.TABLE); } + @Test + public void shouldCreateConfigWithValidCredentialsProviderClass() { + props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + DefaultJdbcCredentialsProvider.class.getName()); + createConfig(); + JdbcCredentialsProvider provider = config.credentialsProvider(); + assertNotNull(provider); + assertTrue(provider instanceof DefaultJdbcCredentialsProvider); + } + + @Test(expected = ConfigException.class) + public void shouldFailToCreateConfigWithInvalidCredentialsProviderClass() { + // Configuring SqliteHelper Class here which does not extends JdbcCredentialsProvider Interface + props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + SqliteHelper.class.getName()); + createConfig(); + } + + @Test + public void testConfigurableCredentialsProviderClass() { + // Test username and password value + String username = "test_user"; + String password = "test_password"; + + props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + ConfigurableJdbcCredentialsProvider.class.getName()); + + // Adding custom config with prefix - jdbc.credentials.provider. to verify Configurable + // functionality + props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX + "username", username); + props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX + "password", password); + + createConfig(); + JdbcCredentialsProvider provider = config.credentialsProvider(); + assertNotNull(provider); + assertTrue(provider instanceof ConfigurableJdbcCredentialsProvider); + + // Assert Username and password are returned from config provider instance correctly + assertEquals(username, provider.getJdbcCredentials().getUsername()); + assertEquals(password, provider.getJdbcCredentials().getPassword()); + } + + @Test + public void testDefaultBehaviorWhenConnectionConfigsArePresent() { + // Test username and password value + String username = "test_user"; + String password = "test_password"; + + props.put(JdbcSinkConfig.CONNECTION_USER , username); + props.put(JdbcSinkConfig.CONNECTION_PASSWORD, password); + + createConfig(); + JdbcCredentialsProvider provider = config.credentialsProvider(); + assertNotNull(provider); + assertTrue(provider instanceof DefaultJdbcCredentialsProvider); + + // Assert username and password are updated in provider class instance according to + // connection.user and connection.password config values. + assertEquals(username, provider.getJdbcCredentials().getUsername()); + assertEquals(password, provider.getJdbcCredentials().getPassword()); + } + protected void createConfig() { config = new JdbcSinkConfig(props); } diff --git a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java index 7059cafcc..50873a9e5 100644 --- a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java +++ b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java @@ -14,6 +14,7 @@ */ package io.confluent.connect.jdbc.source; +import io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Recommender; import org.apache.kafka.common.config.ConfigValue; @@ -253,6 +254,31 @@ public void testTooLongTopicPrefix() { assertFalse(connectionAttemptsConfig.errorMessages().isEmpty()); } + @Test + public void testCredentialsProviderClassConfig() { + // Configuring MockTime Class here which does not extends JdbcCredentialsProvider Interface + props.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + MockTime.class.getName()); + Map validatedConfig = + JdbcSourceConnectorConfig.baseConfigDef().validateAll(props); + ConfigValue credentialsProviderConfig = + validatedConfig.get(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG); + + assertNotNull(credentialsProviderConfig); + assertFalse(credentialsProviderConfig.errorMessages().isEmpty()); + + // Configuring DefaultJdbcCredentialsProvider Class here which extends JdbcCredentialsProvider + // Interface + props.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + DefaultJdbcCredentialsProvider.class.getName()); + validatedConfig = + JdbcSourceConnectorConfig.baseConfigDef().validateAll(props); + credentialsProviderConfig = + validatedConfig.get(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG); + assertNotNull(credentialsProviderConfig); + assertTrue(credentialsProviderConfig.errorMessages().isEmpty()); + } + @SuppressWarnings("unchecked") protected void assertContains(Collection actual, T... expected) { for (T e : expected) { diff --git a/src/test/java/io/confluent/connect/jdbc/util/ConfigurableJdbcCredentialsProvider.java b/src/test/java/io/confluent/connect/jdbc/util/ConfigurableJdbcCredentialsProvider.java new file mode 100644 index 000000000..ee0f55c01 --- /dev/null +++ b/src/test/java/io/confluent/connect/jdbc/util/ConfigurableJdbcCredentialsProvider.java @@ -0,0 +1,26 @@ +package io.confluent.connect.jdbc.util; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.Configurable; + +/** + * This is a test class for JdbcCredentialsProvider Interface which is created to test the + * configurable functionality + */ +public class ConfigurableJdbcCredentialsProvider implements JdbcCredentialsProvider, + Configurable { + + Map configMap = new HashMap<>(); + + @Override + public void configure(Map map) { + configMap = new HashMap<>(map); + } + + @Override + public JdbcCredentials getJdbcCredentials() { + return new BasicJdbcCredentials((String) configMap.get("username"), + (String) configMap.get("password")); + } +} diff --git a/src/test/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProviderTest.java b/src/test/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProviderTest.java new file mode 100644 index 000000000..3355ff5af --- /dev/null +++ b/src/test/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProviderTest.java @@ -0,0 +1,46 @@ +package io.confluent.connect.jdbc.util; + +import static org.junit.Assert.assertEquals; + +import java.util.HashMap; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; + +public class DefaultJdbcCredentialsProviderTest { + + DefaultJdbcCredentialsProvider defaultCredentialsProvider; + String testUsername = "username"; + String testPassword = "password"; + + @Before + public void setup() { + Map configMap = new HashMap<>(); + configMap.put("connection.user", testUsername); + configMap.put("connection.password", testPassword); + + defaultCredentialsProvider = new DefaultJdbcCredentialsProvider(); + defaultCredentialsProvider.configure(configMap); + + } + @Test + public void testDatabaseCredsAreConfigurable() { + // Assert username and password are configured to test values which is configured in setup method + assertEquals(testUsername, defaultCredentialsProvider.getJdbcCredentials().getUsername()); + assertEquals(testPassword, defaultCredentialsProvider.getJdbcCredentials().getPassword()); + } + + // Test Default Refresh Functionality (which is NoOp) + @Test + public void testDefaultRefreshFunctionality() { + + assertEquals(testUsername, defaultCredentialsProvider.getJdbcCredentials().getUsername()); + assertEquals(testPassword, defaultCredentialsProvider.getJdbcCredentials().getPassword()); + + defaultCredentialsProvider.refresh(); + + // Assert username and password are same + assertEquals(testUsername, defaultCredentialsProvider.getJdbcCredentials().getUsername()); + assertEquals(testPassword, defaultCredentialsProvider.getJdbcCredentials().getPassword()); + } +} From 1f3f9682c472aabd17db8ef27eb219aabfda0687 Mon Sep 17 00:00:00 2001 From: Aniket Shrimal Date: Thu, 19 Sep 2024 01:59:59 +0530 Subject: [PATCH 2/6] Minor cleanup --- .../confluent/connect/jdbc/dialect/GenericDatabaseDialect.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index 51d883324..29602ce0c 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -246,8 +246,6 @@ protected TimeZone timeZone() { public Connection getConnection() throws SQLException { JdbcCredentials jdbcCredentials = jdbcCredentialsProvider.getJdbcCredentials(); - Password dbPassword = config.getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); - Properties properties = new Properties(); if (jdbcCredentials.getUsername() != null) { properties.setProperty("user", jdbcCredentials.getUsername()); From eb14f834cdbeb3a0ea78d53c828a9ad415bcbd4c Mon Sep 17 00:00:00 2001 From: Aniket Shrimal Date: Thu, 19 Sep 2024 03:11:06 +0530 Subject: [PATCH 3/6] Minor fix --- .../connect/jdbc/dialect/GenericDatabaseDialect.java | 1 - .../io/confluent/connect/jdbc/sink/JdbcSinkConfig.java | 4 +++- .../connect/jdbc/source/JdbcSourceConnectorConfig.java | 8 ++++---- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index 29602ce0c..4f845f08f 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java index a3c8cb77e..cf76fcdd4 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java @@ -665,8 +665,10 @@ public JdbcCredentialsProvider credentialsProvider() { CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()) ); - if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(dbPassword.value())) { + if (StringUtils.isNotBlank(username)) { configs.put(JdbcSinkConfig.CONNECTION_USER, username); + } + if (dbPassword != null && StringUtils.isNotBlank(dbPassword.value())) { configs.put(JdbcSinkConfig.CONNECTION_PASSWORD, dbPassword.value()); } diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 29cc4e881..956556ae9 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -851,11 +851,11 @@ public JdbcCredentialsProvider credentialsProvider() { CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()) ); - if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(dbPassword.value())) { + if (StringUtils.isNotBlank(username)) { configs.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, username); - configs.put( - JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, dbPassword.value() - ); + } + if (dbPassword != null && StringUtils.isNotBlank(dbPassword.value())) { + configs.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, dbPassword.value()); } ((Configurable) provider).configure(configs); From 2fc2a46a6080870540d13c4e1817838562b5d815 Mon Sep 17 00:00:00 2001 From: Aniket Shrimal Date: Wed, 25 Sep 2024 01:50:52 +0530 Subject: [PATCH 4/6] Minor: add test for refresh functionality --- .../connect/jdbc/sink/JdbcSinkConfigTest.java | 28 +++++++++++++++-- ...tConfigurableJdbcCredentialsProvider.java} | 2 +- .../TestRefreshJdbcCredentialsProvider.java | 31 +++++++++++++++++++ 3 files changed, 57 insertions(+), 4 deletions(-) rename src/test/java/io/confluent/connect/jdbc/util/{ConfigurableJdbcCredentialsProvider.java => TestConfigurableJdbcCredentialsProvider.java} (87%) create mode 100644 src/test/java/io/confluent/connect/jdbc/util/TestRefreshJdbcCredentialsProvider.java diff --git a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java index fdfa15340..7362b317c 100644 --- a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java +++ b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java @@ -19,9 +19,11 @@ import java.util.HashMap; import java.util.Map; -import io.confluent.connect.jdbc.util.ConfigurableJdbcCredentialsProvider; import io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider; import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.JdbcCredentials; +import io.confluent.connect.jdbc.util.TestConfigurableJdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.TestRefreshJdbcCredentialsProvider; import io.confluent.connect.jdbc.util.TableType; import org.apache.kafka.common.config.ConfigException; @@ -163,7 +165,7 @@ public void testConfigurableCredentialsProviderClass() { String password = "test_password"; props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, - ConfigurableJdbcCredentialsProvider.class.getName()); + TestConfigurableJdbcCredentialsProvider.class.getName()); // Adding custom config with prefix - jdbc.credentials.provider. to verify Configurable // functionality @@ -173,13 +175,33 @@ public void testConfigurableCredentialsProviderClass() { createConfig(); JdbcCredentialsProvider provider = config.credentialsProvider(); assertNotNull(provider); - assertTrue(provider instanceof ConfigurableJdbcCredentialsProvider); + assertTrue(provider instanceof TestConfigurableJdbcCredentialsProvider); // Assert Username and password are returned from config provider instance correctly assertEquals(username, provider.getJdbcCredentials().getUsername()); assertEquals(password, provider.getJdbcCredentials().getPassword()); } + @Test + public void testRefreshFunctionalityOfCredentialsProviderClass() { + + props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + TestRefreshJdbcCredentialsProvider.class.getName()); + + createConfig(); + JdbcCredentialsProvider provider = config.credentialsProvider(); + assertNotNull(provider); + assertTrue(provider instanceof TestRefreshJdbcCredentialsProvider); + + // Assert Username and password are returned from config provider instance correctly. Also + // password field is rotated everytime password is fetched. + for (int i = 0; i < 5; i++) { + JdbcCredentials basicJdbcCredentials = provider.getJdbcCredentials(); + assertEquals("test-user", basicJdbcCredentials.getUsername()); + assertEquals("test-password-" + i, basicJdbcCredentials.getPassword()); + } + } + @Test public void testDefaultBehaviorWhenConnectionConfigsArePresent() { // Test username and password value diff --git a/src/test/java/io/confluent/connect/jdbc/util/ConfigurableJdbcCredentialsProvider.java b/src/test/java/io/confluent/connect/jdbc/util/TestConfigurableJdbcCredentialsProvider.java similarity index 87% rename from src/test/java/io/confluent/connect/jdbc/util/ConfigurableJdbcCredentialsProvider.java rename to src/test/java/io/confluent/connect/jdbc/util/TestConfigurableJdbcCredentialsProvider.java index ee0f55c01..b566e1b2f 100644 --- a/src/test/java/io/confluent/connect/jdbc/util/ConfigurableJdbcCredentialsProvider.java +++ b/src/test/java/io/confluent/connect/jdbc/util/TestConfigurableJdbcCredentialsProvider.java @@ -8,7 +8,7 @@ * This is a test class for JdbcCredentialsProvider Interface which is created to test the * configurable functionality */ -public class ConfigurableJdbcCredentialsProvider implements JdbcCredentialsProvider, +public class TestConfigurableJdbcCredentialsProvider implements JdbcCredentialsProvider, Configurable { Map configMap = new HashMap<>(); diff --git a/src/test/java/io/confluent/connect/jdbc/util/TestRefreshJdbcCredentialsProvider.java b/src/test/java/io/confluent/connect/jdbc/util/TestRefreshJdbcCredentialsProvider.java new file mode 100644 index 000000000..4d01148e6 --- /dev/null +++ b/src/test/java/io/confluent/connect/jdbc/util/TestRefreshJdbcCredentialsProvider.java @@ -0,0 +1,31 @@ +package io.confluent.connect.jdbc.util; + +/** + * This is a test Class for JdbcCredentialsProvider Interface which is created to test the refresh + * functionality. The password is updated everytime credentials are fetched through + * 'getJdbcCredentials()' method. + */ +public class TestRefreshJdbcCredentialsProvider implements JdbcCredentialsProvider { + + + String username = "test-user"; + String password; + private int numRotations; + + public TestRefreshJdbcCredentialsProvider() { + numRotations = 0; + } + + @Override + public JdbcCredentials getJdbcCredentials() { + refresh(); + return new BasicJdbcCredentials(username, password); + } + + @Override + public void refresh() { + password = "test-password-" + numRotations; + numRotations++; + } + +} From e85a54a1222e0413196fc3ae52ae5e300f592077 Mon Sep 17 00:00:00 2001 From: Aniket Shrimal Date: Wed, 25 Sep 2024 13:34:39 +0530 Subject: [PATCH 5/6] Minor: address review comments --- .../jdbc/dialect/GenericDatabaseDialect.java | 2 +- .../connect/jdbc/sink/JdbcSinkConfig.java | 15 ++++++--------- .../source/JdbcSourceConnectorConfig.java | 19 ++++++++----------- .../TestRefreshJdbcCredentialsProvider.java | 2 -- 4 files changed, 15 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index 4f845f08f..f47408dbd 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -194,7 +194,7 @@ protected GenericDatabaseDialect( quoteSqlIdentifiers = QuoteMethod.get( config.getString(JdbcSinkConfig.QUOTE_SQL_IDENTIFIERS_CONFIG) ); - jdbcCredentialsProvider = ((JdbcSinkConfig) config).credentialsProvider(); + jdbcCredentialsProvider = sinkConfig.credentialsProvider(); } else { catalogPattern = config.getString(JdbcSourceTaskConfig.CATALOG_PATTERN_CONFIG); schemaPattern = config.getString(JdbcSourceTaskConfig.SCHEMA_PATTERN_CONFIG); diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java index cf76fcdd4..a4cce58e4 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java @@ -295,6 +295,12 @@ public enum DateTimezone { private static final String MSSQL_USE_MERGE_HOLDLOCK_DISPLAY = "SQL Server - Use HOLDLOCK in MERGE"; + /** + * The properties that begin with this prefix will be used to configure a class, specified by + * {@code jdbc.credentials.provider.class} if it implements {@link Configurable}. + */ + public static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX = + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX; public static final String CREDENTIALS_PROVIDER_CLASS_CONFIG = JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG; public static final Class CREDENTIALS_PROVIDER_CLASS_DEFAULT = @@ -306,15 +312,6 @@ public enum DateTimezone { public static final String CREDENTIALS_PROVIDER_CLASS_DOC = JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_DOC; - /** - * The properties that begin with this prefix will be used to configure a class, specified by - * {@code jdbc.credentials.provider.class} if it implements {@link Configurable}. - */ - public static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX = - CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( - 0, - CREDENTIALS_PROVIDER_CLASS_CONFIG.lastIndexOf(".") + 1 - ); public static final ConfigDef CONFIG_DEF = new ConfigDef() // Connection diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 956556ae9..8a9939b29 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -330,7 +330,14 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { "Number of times to retry SQL exceptions encountered when executing queries."; public static final String QUERY_RETRIES_DISPLAY = "Query Retry Attempts"; - public static final String CREDENTIALS_PROVIDER_CLASS_CONFIG = "jdbc.credentials.provider.class"; + /** + * The properties that begin with this prefix will be used to configure a class, specified by + * {@code jdbc.credentials.provider.class} if it implements {@link Configurable}. + */ + public static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX = "jdbc.credentials.provider."; + + public static final String CREDENTIALS_PROVIDER_CLASS_CONFIG = CREDENTIALS_PROVIDER_CONFIG_PREFIX + + "class"; public static final Class CREDENTIALS_PROVIDER_CLASS_DEFAULT = DefaultJdbcCredentialsProvider.class; @@ -340,16 +347,6 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { + "chain to use for authentication to database. By default the connector uses ``" + DefaultJdbcCredentialsProvider.class.getName() + "``."; - /** - * The properties that begin with this prefix will be used to configure a class, specified by - * {@code jdbc.credentials.provider.class} if it implements {@link Configurable}. - */ - public static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX = - CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( - 0, - CREDENTIALS_PROVIDER_CLASS_CONFIG.lastIndexOf(".") + 1 - ); - private static final EnumRecommender QUOTE_METHOD_RECOMMENDER = EnumRecommender.in(QuoteMethod.values()); diff --git a/src/test/java/io/confluent/connect/jdbc/util/TestRefreshJdbcCredentialsProvider.java b/src/test/java/io/confluent/connect/jdbc/util/TestRefreshJdbcCredentialsProvider.java index 4d01148e6..c0b868d48 100644 --- a/src/test/java/io/confluent/connect/jdbc/util/TestRefreshJdbcCredentialsProvider.java +++ b/src/test/java/io/confluent/connect/jdbc/util/TestRefreshJdbcCredentialsProvider.java @@ -7,7 +7,6 @@ */ public class TestRefreshJdbcCredentialsProvider implements JdbcCredentialsProvider { - String username = "test-user"; String password; private int numRotations; @@ -27,5 +26,4 @@ public void refresh() { password = "test-password-" + numRotations; numRotations++; } - } From bf981ee7cd0302cac33dfa968a06f14505459dab Mon Sep 17 00:00:00 2001 From: Aniket Shrimal Date: Thu, 26 Sep 2024 02:20:40 +0530 Subject: [PATCH 6/6] Address review comments --- .../jdbc/dialect/GenericDatabaseDialect.java | 58 +++++++++++- .../connect/jdbc/sink/JdbcSinkConfig.java | 33 ------- .../source/JdbcSourceConnectorConfig.java | 35 ------- .../dialect/GenericDatabaseDialectTest.java | 91 ++++++++++++++++++- .../connect/jdbc/sink/JdbcSinkConfigTest.java | 81 ----------------- 5 files changed, 143 insertions(+), 155 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index f47408dbd..f393413a5 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -15,13 +15,13 @@ package io.confluent.connect.jdbc.dialect; -import io.confluent.connect.jdbc.util.JdbcCredentials; -import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; import java.time.ZoneOffset; import java.util.TimeZone; +import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -90,8 +90,11 @@ import io.confluent.connect.jdbc.util.ExpressionBuilder; import io.confluent.connect.jdbc.util.ExpressionBuilder.Transform; import io.confluent.connect.jdbc.util.IdentifierRules; +import io.confluent.connect.jdbc.util.JdbcCredentials; +import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; import io.confluent.connect.jdbc.util.JdbcDriverInfo; import io.confluent.connect.jdbc.util.QuoteMethod; +import io.confluent.connect.jdbc.util.StringUtils; import io.confluent.connect.jdbc.util.TableDefinition; import io.confluent.connect.jdbc.util.TableId; import io.confluent.connect.jdbc.util.TableType; @@ -115,7 +118,7 @@ public class GenericDatabaseDialect implements DatabaseDialect { private static final String PRECISION_FIELD = "connect.decimal.precision"; - private JdbcCredentialsProvider jdbcCredentialsProvider; + private final JdbcCredentialsProvider jdbcCredentialsProvider; /** * The provider for {@link GenericDatabaseDialect}. @@ -186,6 +189,8 @@ protected GenericDatabaseDialect( this.defaultIdentifierRules = defaultIdentifierRules; this.jdbcUrl = config.getString(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG); this.jdbcUrlInfo = DatabaseDialects.extractJdbcUrlInfo(jdbcUrl); + this.jdbcCredentialsProvider = getJdbcCredentialsProvider(config); + if (config instanceof JdbcSinkConfig) { JdbcSinkConfig sinkConfig = (JdbcSinkConfig) config; catalogPattern = JdbcSourceTaskConfig.CATALOG_PATTERN_DEFAULT; @@ -194,7 +199,6 @@ protected GenericDatabaseDialect( quoteSqlIdentifiers = QuoteMethod.get( config.getString(JdbcSinkConfig.QUOTE_SQL_IDENTIFIERS_CONFIG) ); - jdbcCredentialsProvider = sinkConfig.credentialsProvider(); } else { catalogPattern = config.getString(JdbcSourceTaskConfig.CATALOG_PATTERN_CONFIG); schemaPattern = config.getString(JdbcSourceTaskConfig.SCHEMA_PATTERN_CONFIG); @@ -206,7 +210,6 @@ protected GenericDatabaseDialect( if (config instanceof JdbcSourceConnectorConfig) { mapNumerics = ((JdbcSourceConnectorConfig)config).numericMapping(); batchMaxRows = config.getInt(JdbcSourceConnectorConfig.BATCH_MAX_ROWS_CONFIG); - jdbcCredentialsProvider = ((JdbcSourceConnectorConfig) config).credentialsProvider(); } else { mapNumerics = NumericMapping.NONE; batchMaxRows = 0; @@ -1976,6 +1979,51 @@ protected String sanitizedUrl(String url) { return url.replaceAll("(?i)([?&]([^=&]*)password([^=&]*)=)[^&]*", "$1****"); } + /** + * This is a common method to get an instance of configured JdbcCredentialsProvider class + * @param config Source or sink connector config + * @return an instance of configured JdbcCredentialsProvider class + */ + @SuppressWarnings("unchecked") + protected JdbcCredentialsProvider getJdbcCredentialsProvider(AbstractConfig config) { + // All the config key variables referred in this method are same in both source and sink + // connector. Using source connector config keys here but method should work for sink + // connector as well. + String username = config.getString(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG); + Password dbPassword = config.getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); + + try { + JdbcCredentialsProvider provider = ((Class) + config.getClass(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG) + ).newInstance(); + + if (provider instanceof Configurable) { + Map configs = config.originalsWithPrefix( + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX + ); + configs.remove( + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() + ) + ); + + if (StringUtils.isNotBlank(username)) { + configs.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, username); + } + if (dbPassword != null && StringUtils.isNotBlank(dbPassword.value())) { + configs.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, dbPassword.value()); + } + + ((Configurable) provider).configure(configs); + } + + return provider; + } catch (ClassCastException | IllegalAccessException | InstantiationException e) { + throw new ConnectException( + "Invalid class for: " + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, e); + } + } + @Override public String identifier() { return name() + " database " + sanitizedUrl(jdbcUrl); diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java index a4cce58e4..fa883f494 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java @@ -49,7 +49,6 @@ import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.connect.errors.ConnectException; public class JdbcSinkConfig extends AbstractConfig { @@ -647,38 +646,6 @@ public JdbcSinkConfig(Map props) { tableTypes = TableType.parse(getList(TABLE_TYPES_CONFIG)); } - @SuppressWarnings("unchecked") - public JdbcCredentialsProvider credentialsProvider() { - String username = getString(JdbcSinkConfig.CONNECTION_USER); - Password dbPassword = getPassword(JdbcSinkConfig.CONNECTION_PASSWORD); - - try { - JdbcCredentialsProvider provider = ((Class) getClass( - JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); - - if (provider instanceof Configurable) { - Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); - configs.remove( - CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()) - ); - - if (StringUtils.isNotBlank(username)) { - configs.put(JdbcSinkConfig.CONNECTION_USER, username); - } - if (dbPassword != null && StringUtils.isNotBlank(dbPassword.value())) { - configs.put(JdbcSinkConfig.CONNECTION_PASSWORD, dbPassword.value()); - } - - ((Configurable) provider).configure(configs); - } - - return provider; - } catch (ClassCastException | IllegalAccessException | InstantiationException e) { - throw new ConnectException( - "Invalid class for: " + JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, e); - } - } - private String getPasswordValue(String key) { Password password = getPassword(key); if (password != null) { diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 8a9939b29..7bee77fc8 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -15,7 +15,6 @@ package io.confluent.connect.jdbc.source; -import io.confluent.connect.jdbc.util.StringUtils; import java.sql.Connection; import java.sql.Timestamp; import java.time.ZoneId; @@ -55,7 +54,6 @@ import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigValue; -import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; @@ -832,39 +830,6 @@ public String topicPrefix() { return getString(JdbcSourceTaskConfig.TOPIC_PREFIX_CONFIG).trim(); } - - @SuppressWarnings("unchecked") - public JdbcCredentialsProvider credentialsProvider() { - String username = getString(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG); - Password dbPassword = getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); - - try { - JdbcCredentialsProvider provider = ((Class) getClass( - JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); - - if (provider instanceof Configurable) { - Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); - configs.remove( - CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()) - ); - - if (StringUtils.isNotBlank(username)) { - configs.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, username); - } - if (dbPassword != null && StringUtils.isNotBlank(dbPassword.value())) { - configs.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, dbPassword.value()); - } - - ((Configurable) provider).configure(configs); - } - - return provider; - } catch (ClassCastException | IllegalAccessException | InstantiationException e) { - throw new ConnectException( - "Invalid class for: " + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, e); - } - } - /** * A recommender that caches values returned by a delegate, where the cache remains valid for a * specified duration and as long as the configuration remains unchanged. diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java index 353b506fa..a8d01cac1 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java @@ -16,7 +16,6 @@ package io.confluent.connect.jdbc.dialect; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -59,6 +58,12 @@ import io.confluent.connect.jdbc.util.TableDefinition; import io.confluent.connect.jdbc.util.TableId; import io.confluent.connect.jdbc.util.TableType; +import io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.JdbcCredentials; +import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.TestConfigurableJdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.TestRefreshJdbcCredentialsProvider; + import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; @@ -649,4 +654,88 @@ public void shouldAddExtraProperties() { assertFalse(modified.containsKey("foo2")); assertFalse(modified.containsKey("connection.foo2")); } + + @Test + public void shouldCreateDialectWithValidCredentialsProviderClass() { + connProps.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + DefaultJdbcCredentialsProvider.class.getName()); + config = new JdbcSourceConnectorConfig(connProps); + dialect = createDialect(config); + + + JdbcCredentialsProvider provider = dialect.getJdbcCredentialsProvider(config); + assertNotNull(provider); + assertTrue(provider instanceof DefaultJdbcCredentialsProvider); + } + + @Test + public void testConfigurableCredentialsProviderClass() { + // Test username and password value + String username = "test_user"; + String password = "test_password"; + + connProps.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + TestConfigurableJdbcCredentialsProvider.class.getName()); + + // Adding custom config with prefix - jdbc.credentials.provider. to verify Configurable + // functionality + connProps.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX + "username", username); + connProps.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX + "password", password); + config = new JdbcSourceConnectorConfig(connProps); + dialect = createDialect(config); + + JdbcCredentialsProvider provider = dialect.getJdbcCredentialsProvider(config); + + assertNotNull(provider); + assertTrue(provider instanceof TestConfigurableJdbcCredentialsProvider); + + // Assert Username and password are returned from config provider instance correctly + JdbcCredentials basicJdbcCredentials = provider.getJdbcCredentials(); + assertEquals(username, basicJdbcCredentials.getUsername()); + assertEquals(password, basicJdbcCredentials.getPassword()); + } + + @Test + public void testRefreshFunctionalityOfCredentialsProviderClass() { + + connProps.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + TestRefreshJdbcCredentialsProvider.class.getName()); + + config = new JdbcSourceConnectorConfig(connProps); + dialect = createDialect(config); + + JdbcCredentialsProvider provider = dialect.getJdbcCredentialsProvider(config); + + assertNotNull(provider); + assertTrue(provider instanceof TestRefreshJdbcCredentialsProvider); + + // Assert Username and password are returned from config provider instance correctly. Also + // password field is rotated everytime password is fetched. + for (int i = 0; i < 5; i++) { + JdbcCredentials basicJdbcCredentials = provider.getJdbcCredentials(); + assertEquals("test-user", basicJdbcCredentials.getUsername()); + assertEquals("test-password-" + i, basicJdbcCredentials.getPassword()); + } + } + + @Test + public void testDefaultBehaviorWhenConnectionConfigsArePresent() { + // Test username and password value + String username = "test_user"; + String password = "test_password"; + + connProps.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG , username); + connProps.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, password); + config = new JdbcSourceConnectorConfig(connProps); + dialect = createDialect(config); + + JdbcCredentialsProvider provider = dialect.getJdbcCredentialsProvider(config); + assertTrue(provider instanceof DefaultJdbcCredentialsProvider); + + // Assert username and password are updated in provider class instance according to + // connection.user and connection.password config values. + JdbcCredentials basicJdbcCredentials = provider.getJdbcCredentials(); + assertEquals(username, basicJdbcCredentials.getUsername()); + assertEquals(password, basicJdbcCredentials.getPassword()); + } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java index 7362b317c..822e76bfb 100644 --- a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java +++ b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java @@ -19,11 +19,6 @@ import java.util.HashMap; import java.util.Map; -import io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider; -import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; -import io.confluent.connect.jdbc.util.JdbcCredentials; -import io.confluent.connect.jdbc.util.TestConfigurableJdbcCredentialsProvider; -import io.confluent.connect.jdbc.util.TestRefreshJdbcCredentialsProvider; import io.confluent.connect.jdbc.util.TableType; import org.apache.kafka.common.config.ConfigException; @@ -32,8 +27,6 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; public class JdbcSinkConfigTest { @@ -140,16 +133,6 @@ public void shouldCreateConfigWithTrailingWhitespaceInTableTypes() { assertTableTypes(TableType.TABLE); } - @Test - public void shouldCreateConfigWithValidCredentialsProviderClass() { - props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, - DefaultJdbcCredentialsProvider.class.getName()); - createConfig(); - JdbcCredentialsProvider provider = config.credentialsProvider(); - assertNotNull(provider); - assertTrue(provider instanceof DefaultJdbcCredentialsProvider); - } - @Test(expected = ConfigException.class) public void shouldFailToCreateConfigWithInvalidCredentialsProviderClass() { // Configuring SqliteHelper Class here which does not extends JdbcCredentialsProvider Interface @@ -158,70 +141,6 @@ public void shouldFailToCreateConfigWithInvalidCredentialsProviderClass() { createConfig(); } - @Test - public void testConfigurableCredentialsProviderClass() { - // Test username and password value - String username = "test_user"; - String password = "test_password"; - - props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, - TestConfigurableJdbcCredentialsProvider.class.getName()); - - // Adding custom config with prefix - jdbc.credentials.provider. to verify Configurable - // functionality - props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX + "username", username); - props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX + "password", password); - - createConfig(); - JdbcCredentialsProvider provider = config.credentialsProvider(); - assertNotNull(provider); - assertTrue(provider instanceof TestConfigurableJdbcCredentialsProvider); - - // Assert Username and password are returned from config provider instance correctly - assertEquals(username, provider.getJdbcCredentials().getUsername()); - assertEquals(password, provider.getJdbcCredentials().getPassword()); - } - - @Test - public void testRefreshFunctionalityOfCredentialsProviderClass() { - - props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, - TestRefreshJdbcCredentialsProvider.class.getName()); - - createConfig(); - JdbcCredentialsProvider provider = config.credentialsProvider(); - assertNotNull(provider); - assertTrue(provider instanceof TestRefreshJdbcCredentialsProvider); - - // Assert Username and password are returned from config provider instance correctly. Also - // password field is rotated everytime password is fetched. - for (int i = 0; i < 5; i++) { - JdbcCredentials basicJdbcCredentials = provider.getJdbcCredentials(); - assertEquals("test-user", basicJdbcCredentials.getUsername()); - assertEquals("test-password-" + i, basicJdbcCredentials.getPassword()); - } - } - - @Test - public void testDefaultBehaviorWhenConnectionConfigsArePresent() { - // Test username and password value - String username = "test_user"; - String password = "test_password"; - - props.put(JdbcSinkConfig.CONNECTION_USER , username); - props.put(JdbcSinkConfig.CONNECTION_PASSWORD, password); - - createConfig(); - JdbcCredentialsProvider provider = config.credentialsProvider(); - assertNotNull(provider); - assertTrue(provider instanceof DefaultJdbcCredentialsProvider); - - // Assert username and password are updated in provider class instance according to - // connection.user and connection.password config values. - assertEquals(username, provider.getJdbcCredentials().getUsername()); - assertEquals(password, provider.getJdbcCredentials().getPassword()); - } - protected void createConfig() { config = new JdbcSinkConfig(props); }