Skip to content

Commit

Permalink
Merge pull request #18 from confluentinc/CC-29070
Browse files Browse the repository at this point in the history
Add support for Custom Credentials Provider Class in JDBC connectors
  • Loading branch information
aniketshrimal authored Sep 26, 2024
2 parents d8b1cfc + bf981ee commit 1897096
Show file tree
Hide file tree
Showing 15 changed files with 539 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.time.ZoneOffset;
import java.util.TimeZone;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
Expand Down Expand Up @@ -89,8 +90,11 @@
import io.confluent.connect.jdbc.util.ExpressionBuilder;
import io.confluent.connect.jdbc.util.ExpressionBuilder.Transform;
import io.confluent.connect.jdbc.util.IdentifierRules;
import io.confluent.connect.jdbc.util.JdbcCredentials;
import io.confluent.connect.jdbc.util.JdbcCredentialsProvider;
import io.confluent.connect.jdbc.util.JdbcDriverInfo;
import io.confluent.connect.jdbc.util.QuoteMethod;
import io.confluent.connect.jdbc.util.StringUtils;
import io.confluent.connect.jdbc.util.TableDefinition;
import io.confluent.connect.jdbc.util.TableId;
import io.confluent.connect.jdbc.util.TableType;
Expand All @@ -114,6 +118,8 @@ public class GenericDatabaseDialect implements DatabaseDialect {

private static final String PRECISION_FIELD = "connect.decimal.precision";

private final JdbcCredentialsProvider jdbcCredentialsProvider;

/**
* The provider for {@link GenericDatabaseDialect}.
*/
Expand Down Expand Up @@ -183,6 +189,8 @@ protected GenericDatabaseDialect(
this.defaultIdentifierRules = defaultIdentifierRules;
this.jdbcUrl = config.getString(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG);
this.jdbcUrlInfo = DatabaseDialects.extractJdbcUrlInfo(jdbcUrl);
this.jdbcCredentialsProvider = getJdbcCredentialsProvider(config);

if (config instanceof JdbcSinkConfig) {
JdbcSinkConfig sinkConfig = (JdbcSinkConfig) config;
catalogPattern = JdbcSourceTaskConfig.CATALOG_PATTERN_DEFAULT;
Expand Down Expand Up @@ -238,15 +246,14 @@ protected TimeZone timeZone() {

@Override
public Connection getConnection() throws SQLException {
// These config names are the same for both source and sink configs ...
String username = config.getString(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG);
Password dbPassword = config.getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG);
JdbcCredentials jdbcCredentials = jdbcCredentialsProvider.getJdbcCredentials();

Properties properties = new Properties();
if (username != null) {
properties.setProperty("user", username);
if (jdbcCredentials.getUsername() != null) {
properties.setProperty("user", jdbcCredentials.getUsername());
}
if (dbPassword != null) {
properties.setProperty("password", dbPassword.value());
if (jdbcCredentials.getPassword() != null) {
properties.setProperty("password", jdbcCredentials.getPassword());
}
properties = addConnectionProperties(properties);
// Timeout is 40 seconds to be as long as possible for customer to have a long connection
Expand Down Expand Up @@ -1972,6 +1979,51 @@ protected String sanitizedUrl(String url) {
return url.replaceAll("(?i)([?&]([^=&]*)password([^=&]*)=)[^&]*", "$1****");
}

/**
* This is a common method to get an instance of configured JdbcCredentialsProvider class
* @param config Source or sink connector config
* @return an instance of configured JdbcCredentialsProvider class
*/
@SuppressWarnings("unchecked")
protected JdbcCredentialsProvider getJdbcCredentialsProvider(AbstractConfig config) {
// All the config key variables referred in this method are same in both source and sink
// connector. Using source connector config keys here but method should work for sink
// connector as well.
String username = config.getString(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG);
Password dbPassword = config.getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG);

try {
JdbcCredentialsProvider provider = ((Class<? extends JdbcCredentialsProvider>)
config.getClass(JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)
).newInstance();

if (provider instanceof Configurable) {
Map<String, Object> configs = config.originalsWithPrefix(
JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX
);
configs.remove(
JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(
JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()
)
);

if (StringUtils.isNotBlank(username)) {
configs.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, username);
}
if (dbPassword != null && StringUtils.isNotBlank(dbPassword.value())) {
configs.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, dbPassword.value());
}

((Configurable) provider).configure(configs);
}

return provider;
} catch (ClassCastException | IllegalAccessException | InstantiationException e) {
throw new ConnectException(
"Invalid class for: " + JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, e);
}
}

@Override
public String identifier() {
return name() + " database " + sanitizedUrl(jdbcUrl);
Expand Down
44 changes: 39 additions & 5 deletions src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,19 @@
import io.confluent.connect.jdbc.util.DatabaseDialectRecommender;
import io.confluent.connect.jdbc.util.DeleteEnabledRecommender;
import io.confluent.connect.jdbc.util.EnumRecommender;
import io.confluent.connect.jdbc.util.JdbcCredentialsProvider;
import io.confluent.connect.jdbc.util.JdbcCredentialsProviderValidator;
import io.confluent.connect.jdbc.util.PrimaryKeyModeRecommender;
import io.confluent.connect.jdbc.util.QuoteMethod;
import io.confluent.connect.jdbc.util.StringUtils;
import io.confluent.connect.jdbc.util.TableType;
import io.confluent.connect.jdbc.util.TimeZoneValidator;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;

Expand Down Expand Up @@ -288,6 +294,24 @@ public enum DateTimezone {
private static final String MSSQL_USE_MERGE_HOLDLOCK_DISPLAY =
"SQL Server - Use HOLDLOCK in MERGE";

/**
* The properties that begin with this prefix will be used to configure a class, specified by
* {@code jdbc.credentials.provider.class} if it implements {@link Configurable}.
*/
public static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX =
JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX;
public static final String CREDENTIALS_PROVIDER_CLASS_CONFIG =
JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG;
public static final Class<? extends JdbcCredentialsProvider> CREDENTIALS_PROVIDER_CLASS_DEFAULT =
JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_DEFAULT;

public static final String CREDENTIALS_PROVIDER_CLASS_DISPLAY =
JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_DISPLAY;

public static final String CREDENTIALS_PROVIDER_CLASS_DOC =
JdbcSourceConnectorConfig.CREDENTIALS_PROVIDER_CLASS_DOC;


public static final ConfigDef CONFIG_DEF = new ConfigDef()
// Connection
.define(
Expand Down Expand Up @@ -322,16 +346,26 @@ public enum DateTimezone {
3,
ConfigDef.Width.MEDIUM,
CONNECTION_PASSWORD_DISPLAY
)
.define(
).define(
CREDENTIALS_PROVIDER_CLASS_CONFIG,
Type.CLASS,
CREDENTIALS_PROVIDER_CLASS_DEFAULT,
new JdbcCredentialsProviderValidator(),
Importance.LOW,
CREDENTIALS_PROVIDER_CLASS_DOC,
CONNECTION_GROUP,
4,
Width.LONG,
CREDENTIALS_PROVIDER_CLASS_DISPLAY
).define(
DIALECT_NAME_CONFIG,
ConfigDef.Type.STRING,
DIALECT_NAME_DEFAULT,
DatabaseDialectRecommender.INSTANCE,
ConfigDef.Importance.LOW,
DIALECT_NAME_DOC,
CONNECTION_GROUP,
4,
5,
ConfigDef.Width.LONG,
DIALECT_NAME_DISPLAY,
DatabaseDialectRecommender.INSTANCE
Expand All @@ -344,7 +378,7 @@ public enum DateTimezone {
ConfigDef.Importance.LOW,
CONNECTION_ATTEMPTS_DOC,
CONNECTION_GROUP,
5,
6,
ConfigDef.Width.SHORT,
CONNECTION_ATTEMPTS_DISPLAY
).define(
Expand All @@ -354,7 +388,7 @@ public enum DateTimezone {
ConfigDef.Importance.LOW,
CONNECTION_BACKOFF_DOC,
CONNECTION_GROUP,
6,
7,
ConfigDef.Width.SHORT,
CONNECTION_BACKOFF_DISPLAY
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@
import io.confluent.connect.jdbc.dialect.DatabaseDialects;
import io.confluent.connect.jdbc.util.DatabaseDialectRecommender;
import io.confluent.connect.jdbc.util.DateTimeUtils;
import io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider;
import io.confluent.connect.jdbc.util.EnumRecommender;
import io.confluent.connect.jdbc.util.JdbcCredentialsProvider;
import io.confluent.connect.jdbc.util.JdbcCredentialsProviderValidator;
import io.confluent.connect.jdbc.util.QuoteMethod;
import io.confluent.connect.jdbc.util.TimeZoneValidator;

import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.regex.Pattern;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
Expand Down Expand Up @@ -324,6 +328,23 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
"Number of times to retry SQL exceptions encountered when executing queries.";
public static final String QUERY_RETRIES_DISPLAY = "Query Retry Attempts";

/**
* The properties that begin with this prefix will be used to configure a class, specified by
* {@code jdbc.credentials.provider.class} if it implements {@link Configurable}.
*/
public static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX = "jdbc.credentials.provider.";

public static final String CREDENTIALS_PROVIDER_CLASS_CONFIG = CREDENTIALS_PROVIDER_CONFIG_PREFIX
+ "class";
public static final Class<? extends JdbcCredentialsProvider> CREDENTIALS_PROVIDER_CLASS_DEFAULT =
DefaultJdbcCredentialsProvider.class;

public static final String CREDENTIALS_PROVIDER_CLASS_DISPLAY = "JDBC Credentials Provider Class";

public static final String CREDENTIALS_PROVIDER_CLASS_DOC = "Credentials provider or provider "
+ "chain to use for authentication to database. By default the connector uses ``"
+ DefaultJdbcCredentialsProvider.class.getName() + "``.";

private static final EnumRecommender QUOTE_METHOD_RECOMMENDER =
EnumRecommender.in(QuoteMethod.values());

Expand Down Expand Up @@ -455,6 +476,17 @@ private static final void addDatabaseOptions(ConfigDef config) {
++orderInGroup,
Width.SHORT,
CONNECTION_PASSWORD_DISPLAY
).define(
CREDENTIALS_PROVIDER_CLASS_CONFIG,
Type.CLASS,
CREDENTIALS_PROVIDER_CLASS_DEFAULT,
new JdbcCredentialsProviderValidator(),
Importance.LOW,
CREDENTIALS_PROVIDER_CLASS_DOC,
DATABASE_GROUP,
++orderInGroup,
Width.LONG,
CREDENTIALS_PROVIDER_CLASS_DISPLAY
).define(
CONNECTION_ATTEMPTS_CONFIG,
Type.INT,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.jdbc.util;

public class BasicJdbcCredentials implements JdbcCredentials {

String username;
String password;

public BasicJdbcCredentials(String user, String password) {
this.username = user;
this.password = password;
}

@Override
public String getUsername() {
return username;
}

@Override
public String getPassword() {
return password;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.jdbc.util;

import java.util.Map;
import org.apache.kafka.common.Configurable;

public class DefaultJdbcCredentialsProvider implements JdbcCredentialsProvider, Configurable {

private static final String DB_USERNAME_CONFIG = "connection.user";
private static final String DB_PASSWORD_CONFIG = "connection.password";
String username;
String password;

@Override
public JdbcCredentials getJdbcCredentials() {
return new BasicJdbcCredentials(username, password);
}

@Override
public void configure(Map<String, ?> map) {
username = (String) map.get(DB_USERNAME_CONFIG);
password = (String) map.get(DB_PASSWORD_CONFIG);
}
}
31 changes: 31 additions & 0 deletions src/main/java/io/confluent/connect/jdbc/util/JdbcCredentials.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.jdbc.util;

public interface JdbcCredentials {

/**
* @return Username to use for authentication to database
*/
String getUsername();

/**
*
* @return Password/Token to use for authentication to database
*/
String getPassword();

}
Loading

0 comments on commit 1897096

Please sign in to comment.