diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index d50a08c19..f393413a5 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -18,6 +18,7 @@ import java.time.ZoneOffset; import java.util.TimeZone; +import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.types.Password; @@ -89,8 +90,11 @@ import io.confluent.connect.jdbc.util.ExpressionBuilder; import io.confluent.connect.jdbc.util.ExpressionBuilder.Transform; import io.confluent.connect.jdbc.util.IdentifierRules; +import io.confluent.connect.jdbc.util.JdbcCredentials; +import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; import io.confluent.connect.jdbc.util.JdbcDriverInfo; import io.confluent.connect.jdbc.util.QuoteMethod; +import io.confluent.connect.jdbc.util.StringUtils; import io.confluent.connect.jdbc.util.TableDefinition; import io.confluent.connect.jdbc.util.TableId; import io.confluent.connect.jdbc.util.TableType; @@ -114,6 +118,8 @@ public class GenericDatabaseDialect implements DatabaseDialect { private static final String PRECISION_FIELD = "connect.decimal.precision"; + private final JdbcCredentialsProvider jdbcCredentialsProvider; + /** * The provider for {@link GenericDatabaseDialect}. */ @@ -183,6 +189,8 @@ protected GenericDatabaseDialect( this.defaultIdentifierRules = defaultIdentifierRules; this.jdbcUrl = config.getString(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG); this.jdbcUrlInfo = DatabaseDialects.extractJdbcUrlInfo(jdbcUrl); + this.jdbcCredentialsProvider = getJdbcCredentialsProvider(config); + if (config instanceof JdbcSinkConfig) { JdbcSinkConfig sinkConfig = (JdbcSinkConfig) config; catalogPattern = JdbcSourceTaskConfig.CATALOG_PATTERN_DEFAULT; @@ -238,15 +246,14 @@ protected TimeZone timeZone() { @Override public Connection getConnection() throws SQLException { - // These config names are the same for both source and sink configs ... - String username = config.getString(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG); - Password dbPassword = config.getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); + JdbcCredentials jdbcCredentials = jdbcCredentialsProvider.getJdbcCredentials(); + Properties properties = new Properties(); - if (username != null) { - properties.setProperty("user", username); + if (jdbcCredentials.getUsername() != null) { + properties.setProperty("user", jdbcCredentials.getUsername()); } - if (dbPassword != null) { - properties.setProperty("password", dbPassword.value()); + if (jdbcCredentials.getPassword() != null) { + properties.setProperty("password", jdbcCredentials.getPassword()); } properties = addConnectionProperties(properties); // Timeout is 40 seconds to be as long as possible for customer to have a long connection @@ -1972,6 +1979,51 @@ protected String sanitizedUrl(String url) { return url.replaceAll("(?i)([?&]([^=&]*)password([^=&]*)=)[^&]*", "$1****"); } + /** + * This is a common method to get an instance of configured JdbcCredentialsProvider class + * @param config Source or sink connector config + * @return an instance of configured JdbcCredentialsProvider class + */ + @SuppressWarnings("unchecked") + protected JdbcCredentialsProvider getJdbcCredentialsProvider(AbstractConfig config) { + // All the config key variables referred in this method are same in both source and sink + // connector. Using source connector config keys here but method should work for sink + // connector as well. + String username = config.getString(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG); + Password dbPassword = config.getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); + + try { + JdbcCredentialsProvider provider = ((Class) + config.getClass(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG) + ).newInstance(); + + if (provider instanceof Configurable) { + Map configs = config.originalsWithPrefix( + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX + ); + configs.remove( + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() + ) + ); + + if (StringUtils.isNotBlank(username)) { + configs.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, username); + } + if (dbPassword != null && StringUtils.isNotBlank(dbPassword.value())) { + configs.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, dbPassword.value()); + } + + ((Configurable) provider).configure(configs); + } + + return provider; + } catch (ClassCastException | IllegalAccessException | InstantiationException e) { + throw new ConnectException( + "Invalid class for: " + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, e); + } + } + @Override public String identifier() { return name() + " database " + sanitizedUrl(jdbcUrl); diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java index b6d7586e0..fa883f494 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java @@ -34,13 +34,19 @@ import io.confluent.connect.jdbc.util.DatabaseDialectRecommender; import io.confluent.connect.jdbc.util.DeleteEnabledRecommender; import io.confluent.connect.jdbc.util.EnumRecommender; +import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.JdbcCredentialsProviderValidator; import io.confluent.connect.jdbc.util.PrimaryKeyModeRecommender; import io.confluent.connect.jdbc.util.QuoteMethod; import io.confluent.connect.jdbc.util.StringUtils; import io.confluent.connect.jdbc.util.TableType; import io.confluent.connect.jdbc.util.TimeZoneValidator; +import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.types.Password; @@ -288,6 +294,24 @@ public enum DateTimezone { private static final String MSSQL_USE_MERGE_HOLDLOCK_DISPLAY = "SQL Server - Use HOLDLOCK in MERGE"; + /** + * The properties that begin with this prefix will be used to configure a class, specified by + * {@code jdbc.credentials.provider.class} if it implements {@link Configurable}. + */ + public static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX = + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX; + public static final String CREDENTIALS_PROVIDER_CLASS_CONFIG = + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG; + public static final Class CREDENTIALS_PROVIDER_CLASS_DEFAULT = + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_DEFAULT; + + public static final String CREDENTIALS_PROVIDER_CLASS_DISPLAY = + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_DISPLAY; + + public static final String CREDENTIALS_PROVIDER_CLASS_DOC = + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_DOC; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() // Connection .define( @@ -322,8 +346,18 @@ public enum DateTimezone { 3, ConfigDef.Width.MEDIUM, CONNECTION_PASSWORD_DISPLAY - ) - .define( + ).define( + CREDENTIALS_PROVIDER_CLASS_CONFIG, + Type.CLASS, + CREDENTIALS_PROVIDER_CLASS_DEFAULT, + new JdbcCredentialsProviderValidator(), + Importance.LOW, + CREDENTIALS_PROVIDER_CLASS_DOC, + CONNECTION_GROUP, + 4, + Width.LONG, + CREDENTIALS_PROVIDER_CLASS_DISPLAY + ).define( DIALECT_NAME_CONFIG, ConfigDef.Type.STRING, DIALECT_NAME_DEFAULT, @@ -331,7 +365,7 @@ public enum DateTimezone { ConfigDef.Importance.LOW, DIALECT_NAME_DOC, CONNECTION_GROUP, - 4, + 5, ConfigDef.Width.LONG, DIALECT_NAME_DISPLAY, DatabaseDialectRecommender.INSTANCE @@ -344,7 +378,7 @@ public enum DateTimezone { ConfigDef.Importance.LOW, CONNECTION_ATTEMPTS_DOC, CONNECTION_GROUP, - 5, + 6, ConfigDef.Width.SHORT, CONNECTION_ATTEMPTS_DISPLAY ).define( @@ -354,7 +388,7 @@ public enum DateTimezone { ConfigDef.Importance.LOW, CONNECTION_BACKOFF_DOC, CONNECTION_GROUP, - 6, + 7, ConfigDef.Width.SHORT, CONNECTION_BACKOFF_DISPLAY ) diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 4ce538aa1..7bee77fc8 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -32,7 +32,10 @@ import io.confluent.connect.jdbc.dialect.DatabaseDialects; import io.confluent.connect.jdbc.util.DatabaseDialectRecommender; import io.confluent.connect.jdbc.util.DateTimeUtils; +import io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider; import io.confluent.connect.jdbc.util.EnumRecommender; +import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.JdbcCredentialsProviderValidator; import io.confluent.connect.jdbc.util.QuoteMethod; import io.confluent.connect.jdbc.util.TimeZoneValidator; @@ -40,6 +43,7 @@ import java.util.function.Function; import java.util.regex.Pattern; +import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; @@ -324,6 +328,23 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { "Number of times to retry SQL exceptions encountered when executing queries."; public static final String QUERY_RETRIES_DISPLAY = "Query Retry Attempts"; + /** + * The properties that begin with this prefix will be used to configure a class, specified by + * {@code jdbc.credentials.provider.class} if it implements {@link Configurable}. + */ + public static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX = "jdbc.credentials.provider."; + + public static final String CREDENTIALS_PROVIDER_CLASS_CONFIG = CREDENTIALS_PROVIDER_CONFIG_PREFIX + + "class"; + public static final Class CREDENTIALS_PROVIDER_CLASS_DEFAULT = + DefaultJdbcCredentialsProvider.class; + + public static final String CREDENTIALS_PROVIDER_CLASS_DISPLAY = "JDBC Credentials Provider Class"; + + public static final String CREDENTIALS_PROVIDER_CLASS_DOC = "Credentials provider or provider " + + "chain to use for authentication to database. By default the connector uses ``" + + DefaultJdbcCredentialsProvider.class.getName() + "``."; + private static final EnumRecommender QUOTE_METHOD_RECOMMENDER = EnumRecommender.in(QuoteMethod.values()); @@ -455,6 +476,17 @@ private static final void addDatabaseOptions(ConfigDef config) { ++orderInGroup, Width.SHORT, CONNECTION_PASSWORD_DISPLAY + ).define( + CREDENTIALS_PROVIDER_CLASS_CONFIG, + Type.CLASS, + CREDENTIALS_PROVIDER_CLASS_DEFAULT, + new JdbcCredentialsProviderValidator(), + Importance.LOW, + CREDENTIALS_PROVIDER_CLASS_DOC, + DATABASE_GROUP, + ++orderInGroup, + Width.LONG, + CREDENTIALS_PROVIDER_CLASS_DISPLAY ).define( CONNECTION_ATTEMPTS_CONFIG, Type.INT, diff --git a/src/main/java/io/confluent/connect/jdbc/util/BasicJdbcCredentials.java b/src/main/java/io/confluent/connect/jdbc/util/BasicJdbcCredentials.java new file mode 100644 index 000000000..f99e8c9d5 --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/BasicJdbcCredentials.java @@ -0,0 +1,37 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.jdbc.util; + +public class BasicJdbcCredentials implements JdbcCredentials { + + String username; + String password; + + public BasicJdbcCredentials(String user, String password) { + this.username = user; + this.password = password; + } + + @Override + public String getUsername() { + return username; + } + + @Override + public String getPassword() { + return password; + } +} diff --git a/src/main/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProvider.java b/src/main/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProvider.java new file mode 100644 index 000000000..8ff4c079d --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProvider.java @@ -0,0 +1,38 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.jdbc.util; + +import java.util.Map; +import org.apache.kafka.common.Configurable; + +public class DefaultJdbcCredentialsProvider implements JdbcCredentialsProvider, Configurable { + + private static final String DB_USERNAME_CONFIG = "connection.user"; + private static final String DB_PASSWORD_CONFIG = "connection.password"; + String username; + String password; + + @Override + public JdbcCredentials getJdbcCredentials() { + return new BasicJdbcCredentials(username, password); + } + + @Override + public void configure(Map map) { + username = (String) map.get(DB_USERNAME_CONFIG); + password = (String) map.get(DB_PASSWORD_CONFIG); + } +} diff --git a/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentials.java b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentials.java new file mode 100644 index 000000000..9e201b7ec --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentials.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.jdbc.util; + +public interface JdbcCredentials { + + /** + * @return Username to use for authentication to database + */ + String getUsername(); + + /** + * + * @return Password/Token to use for authentication to database + */ + String getPassword(); + +} diff --git a/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProvider.java b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProvider.java new file mode 100644 index 000000000..d27a03e07 --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProvider.java @@ -0,0 +1,32 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.jdbc.util; + +public interface JdbcCredentialsProvider { + + /** + * @return JdbcCredentials which caller can use for authentication purpose + */ + JdbcCredentials getJdbcCredentials(); + + /** + * Instructs the provider to refresh or renew credentials. + * Default behavior is no-op. + */ + default void refresh() { + // no need to refresh anything by default + } +} diff --git a/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProviderValidator.java b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProviderValidator.java new file mode 100644 index 000000000..f05dc1ea9 --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/JdbcCredentialsProviderValidator.java @@ -0,0 +1,40 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.jdbc.util; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +public class JdbcCredentialsProviderValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object provider) { + if (provider != null && provider instanceof Class + && JdbcCredentialsProvider.class.isAssignableFrom((Class) provider)) { + return; + } + throw new ConfigException( + name, + provider, + "Class must extend: " + JdbcCredentialsProvider.class + ); + } + + @Override + public String toString() { + return "Any class implementing: " + JdbcCredentialsProvider.class; + } +} + diff --git a/src/main/java/io/confluent/connect/jdbc/util/StringUtils.java b/src/main/java/io/confluent/connect/jdbc/util/StringUtils.java index a24382860..200d5bc5a 100644 --- a/src/main/java/io/confluent/connect/jdbc/util/StringUtils.java +++ b/src/main/java/io/confluent/connect/jdbc/util/StringUtils.java @@ -15,6 +15,8 @@ package io.confluent.connect.jdbc.util; +import static org.apache.kafka.common.utils.Utils.isBlank; + import org.apache.kafka.connect.data.Schema; /** @@ -72,4 +74,8 @@ public static String schemaTypeOrNull(Schema schema) { return schema.type().getName(); } } + + public static boolean isNotBlank(String string) { + return !isBlank(string); + } } diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java index 353b506fa..a8d01cac1 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java @@ -16,7 +16,6 @@ package io.confluent.connect.jdbc.dialect; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -59,6 +58,12 @@ import io.confluent.connect.jdbc.util.TableDefinition; import io.confluent.connect.jdbc.util.TableId; import io.confluent.connect.jdbc.util.TableType; +import io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.JdbcCredentials; +import io.confluent.connect.jdbc.util.JdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.TestConfigurableJdbcCredentialsProvider; +import io.confluent.connect.jdbc.util.TestRefreshJdbcCredentialsProvider; + import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; @@ -649,4 +654,88 @@ public void shouldAddExtraProperties() { assertFalse(modified.containsKey("foo2")); assertFalse(modified.containsKey("connection.foo2")); } + + @Test + public void shouldCreateDialectWithValidCredentialsProviderClass() { + connProps.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + DefaultJdbcCredentialsProvider.class.getName()); + config = new JdbcSourceConnectorConfig(connProps); + dialect = createDialect(config); + + + JdbcCredentialsProvider provider = dialect.getJdbcCredentialsProvider(config); + assertNotNull(provider); + assertTrue(provider instanceof DefaultJdbcCredentialsProvider); + } + + @Test + public void testConfigurableCredentialsProviderClass() { + // Test username and password value + String username = "test_user"; + String password = "test_password"; + + connProps.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + TestConfigurableJdbcCredentialsProvider.class.getName()); + + // Adding custom config with prefix - jdbc.credentials.provider. to verify Configurable + // functionality + connProps.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX + "username", username); + connProps.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX + "password", password); + config = new JdbcSourceConnectorConfig(connProps); + dialect = createDialect(config); + + JdbcCredentialsProvider provider = dialect.getJdbcCredentialsProvider(config); + + assertNotNull(provider); + assertTrue(provider instanceof TestConfigurableJdbcCredentialsProvider); + + // Assert Username and password are returned from config provider instance correctly + JdbcCredentials basicJdbcCredentials = provider.getJdbcCredentials(); + assertEquals(username, basicJdbcCredentials.getUsername()); + assertEquals(password, basicJdbcCredentials.getPassword()); + } + + @Test + public void testRefreshFunctionalityOfCredentialsProviderClass() { + + connProps.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + TestRefreshJdbcCredentialsProvider.class.getName()); + + config = new JdbcSourceConnectorConfig(connProps); + dialect = createDialect(config); + + JdbcCredentialsProvider provider = dialect.getJdbcCredentialsProvider(config); + + assertNotNull(provider); + assertTrue(provider instanceof TestRefreshJdbcCredentialsProvider); + + // Assert Username and password are returned from config provider instance correctly. Also + // password field is rotated everytime password is fetched. + for (int i = 0; i < 5; i++) { + JdbcCredentials basicJdbcCredentials = provider.getJdbcCredentials(); + assertEquals("test-user", basicJdbcCredentials.getUsername()); + assertEquals("test-password-" + i, basicJdbcCredentials.getPassword()); + } + } + + @Test + public void testDefaultBehaviorWhenConnectionConfigsArePresent() { + // Test username and password value + String username = "test_user"; + String password = "test_password"; + + connProps.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG , username); + connProps.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, password); + config = new JdbcSourceConnectorConfig(connProps); + dialect = createDialect(config); + + JdbcCredentialsProvider provider = dialect.getJdbcCredentialsProvider(config); + assertTrue(provider instanceof DefaultJdbcCredentialsProvider); + + // Assert username and password are updated in provider class instance according to + // connection.user and connection.password config values. + JdbcCredentials basicJdbcCredentials = provider.getJdbcCredentials(); + assertEquals(username, basicJdbcCredentials.getUsername()); + assertEquals(password, basicJdbcCredentials.getPassword()); + } } \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java index 0775dd145..822e76bfb 100644 --- a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java +++ b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkConfigTest.java @@ -133,6 +133,14 @@ public void shouldCreateConfigWithTrailingWhitespaceInTableTypes() { assertTableTypes(TableType.TABLE); } + @Test(expected = ConfigException.class) + public void shouldFailToCreateConfigWithInvalidCredentialsProviderClass() { + // Configuring SqliteHelper Class here which does not extends JdbcCredentialsProvider Interface + props.put(JdbcSinkConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + SqliteHelper.class.getName()); + createConfig(); + } + protected void createConfig() { config = new JdbcSinkConfig(props); } diff --git a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java index 7059cafcc..50873a9e5 100644 --- a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java +++ b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java @@ -14,6 +14,7 @@ */ package io.confluent.connect.jdbc.source; +import io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Recommender; import org.apache.kafka.common.config.ConfigValue; @@ -253,6 +254,31 @@ public void testTooLongTopicPrefix() { assertFalse(connectionAttemptsConfig.errorMessages().isEmpty()); } + @Test + public void testCredentialsProviderClassConfig() { + // Configuring MockTime Class here which does not extends JdbcCredentialsProvider Interface + props.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + MockTime.class.getName()); + Map validatedConfig = + JdbcSourceConnectorConfig.baseConfigDef().validateAll(props); + ConfigValue credentialsProviderConfig = + validatedConfig.get(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG); + + assertNotNull(credentialsProviderConfig); + assertFalse(credentialsProviderConfig.errorMessages().isEmpty()); + + // Configuring DefaultJdbcCredentialsProvider Class here which extends JdbcCredentialsProvider + // Interface + props.put(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + DefaultJdbcCredentialsProvider.class.getName()); + validatedConfig = + JdbcSourceConnectorConfig.baseConfigDef().validateAll(props); + credentialsProviderConfig = + validatedConfig.get(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG); + assertNotNull(credentialsProviderConfig); + assertTrue(credentialsProviderConfig.errorMessages().isEmpty()); + } + @SuppressWarnings("unchecked") protected void assertContains(Collection actual, T... expected) { for (T e : expected) { diff --git a/src/test/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProviderTest.java b/src/test/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProviderTest.java new file mode 100644 index 000000000..3355ff5af --- /dev/null +++ b/src/test/java/io/confluent/connect/jdbc/util/DefaultJdbcCredentialsProviderTest.java @@ -0,0 +1,46 @@ +package io.confluent.connect.jdbc.util; + +import static org.junit.Assert.assertEquals; + +import java.util.HashMap; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; + +public class DefaultJdbcCredentialsProviderTest { + + DefaultJdbcCredentialsProvider defaultCredentialsProvider; + String testUsername = "username"; + String testPassword = "password"; + + @Before + public void setup() { + Map configMap = new HashMap<>(); + configMap.put("connection.user", testUsername); + configMap.put("connection.password", testPassword); + + defaultCredentialsProvider = new DefaultJdbcCredentialsProvider(); + defaultCredentialsProvider.configure(configMap); + + } + @Test + public void testDatabaseCredsAreConfigurable() { + // Assert username and password are configured to test values which is configured in setup method + assertEquals(testUsername, defaultCredentialsProvider.getJdbcCredentials().getUsername()); + assertEquals(testPassword, defaultCredentialsProvider.getJdbcCredentials().getPassword()); + } + + // Test Default Refresh Functionality (which is NoOp) + @Test + public void testDefaultRefreshFunctionality() { + + assertEquals(testUsername, defaultCredentialsProvider.getJdbcCredentials().getUsername()); + assertEquals(testPassword, defaultCredentialsProvider.getJdbcCredentials().getPassword()); + + defaultCredentialsProvider.refresh(); + + // Assert username and password are same + assertEquals(testUsername, defaultCredentialsProvider.getJdbcCredentials().getUsername()); + assertEquals(testPassword, defaultCredentialsProvider.getJdbcCredentials().getPassword()); + } +} diff --git a/src/test/java/io/confluent/connect/jdbc/util/TestConfigurableJdbcCredentialsProvider.java b/src/test/java/io/confluent/connect/jdbc/util/TestConfigurableJdbcCredentialsProvider.java new file mode 100644 index 000000000..b566e1b2f --- /dev/null +++ b/src/test/java/io/confluent/connect/jdbc/util/TestConfigurableJdbcCredentialsProvider.java @@ -0,0 +1,26 @@ +package io.confluent.connect.jdbc.util; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.Configurable; + +/** + * This is a test class for JdbcCredentialsProvider Interface which is created to test the + * configurable functionality + */ +public class TestConfigurableJdbcCredentialsProvider implements JdbcCredentialsProvider, + Configurable { + + Map configMap = new HashMap<>(); + + @Override + public void configure(Map map) { + configMap = new HashMap<>(map); + } + + @Override + public JdbcCredentials getJdbcCredentials() { + return new BasicJdbcCredentials((String) configMap.get("username"), + (String) configMap.get("password")); + } +} diff --git a/src/test/java/io/confluent/connect/jdbc/util/TestRefreshJdbcCredentialsProvider.java b/src/test/java/io/confluent/connect/jdbc/util/TestRefreshJdbcCredentialsProvider.java new file mode 100644 index 000000000..c0b868d48 --- /dev/null +++ b/src/test/java/io/confluent/connect/jdbc/util/TestRefreshJdbcCredentialsProvider.java @@ -0,0 +1,29 @@ +package io.confluent.connect.jdbc.util; + +/** + * This is a test Class for JdbcCredentialsProvider Interface which is created to test the refresh + * functionality. The password is updated everytime credentials are fetched through + * 'getJdbcCredentials()' method. + */ +public class TestRefreshJdbcCredentialsProvider implements JdbcCredentialsProvider { + + String username = "test-user"; + String password; + private int numRotations; + + public TestRefreshJdbcCredentialsProvider() { + numRotations = 0; + } + + @Override + public JdbcCredentials getJdbcCredentials() { + refresh(); + return new BasicJdbcCredentials(username, password); + } + + @Override + public void refresh() { + password = "test-password-" + numRotations; + numRotations++; + } +}