Skip to content

Commit

Permalink
Merge branch '10.7.x' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
ConfluentSemaphore committed Sep 11, 2024
2 parents a336f3b + 6b469e0 commit 74402db
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void start(Map<String, String> properties) throws ConnectException {
config
);
cachedConnectionProvider = connectionProvider(maxConnectionAttempts, connectionRetryBackoff);
log.info("Cached Connection Provider created");

// Initial connection attempt
log.info("Initial connection attempt with the database.");
Expand All @@ -110,6 +111,10 @@ public void start(Map<String, String> properties) throws ConnectException {
String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
if (!query.isEmpty()) {
if (whitelistSet != null || blacklistSet != null) {
log.error(
"Configuration error: {} is set, but table whitelist or blacklist is also specified."
+ "These settings cannot be used together.",
JdbcSourceConnectorConfig.QUERY_CONFIG);
throw new ConnectException(JdbcSourceConnectorConfig.QUERY_CONFIG + " may not be combined"
+ " with whole-table copying settings.");
}
Expand All @@ -130,6 +135,7 @@ public void start(Map<String, String> properties) throws ConnectException {
);
if (query.isEmpty()) {
tableMonitorThread.start();
log.info("Starting Table Monitor Thread");
}
}

Expand All @@ -156,13 +162,15 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
List<Map<String, String>> taskConfigs;
if (!query.isEmpty()) {
log.info("Custom query provided, generating task configuration for the query");
Map<String, String> taskProps = new HashMap<>(configProperties);
taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, "");
taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true");
taskConfigs = Collections.singletonList(taskProps);
log.trace("Producing task configs with custom query");
return taskConfigs;
} else {
log.info("No custom query provided, generating task configurations for tables");
List<TableId> currentTables = tableMonitorThread.tables();
if (currentTables == null || currentTables.isEmpty()) {
taskConfigs = new ArrayList<>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ public void setConnectionIsolationMode(
Connection connection,
TransactionIsolationMode transactionIsolationMode
) {
log.info("Setting connection isolation mode to: {}", transactionIsolationMode.name());
if (transactionIsolationMode
== TransactionIsolationMode.DEFAULT) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,21 @@ void write(final Collection<SinkRecord> records)
buffer.flush();
buffer.close();
}
log.trace("Committing transaction");
connection.commit();
} catch (SQLException | TableAlterOrCreateException e) {
log.error("Error during write operation. Attempting rollback.", e);
try {
connection.rollback();
log.info("Successfully rolled back transaction");
} catch (SQLException sqle) {
log.error("Failed to rollback transaction", sqle);
e.addSuppressed(sqle);
} finally {
throw e;
}
}
log.info("Completed write operation for {} records to the database", records.size());
}

void closeQuietly() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void put(Collection<SinkRecord> records) {
);
try {
writer.write(records);
log.info("Successfully wrote {} records.", recordsCount);
} catch (TableAlterOrCreateException tace) {
if (reporter != null) {
unrollAndRetry(records);
Expand Down Expand Up @@ -139,6 +140,7 @@ public void put(Collection<SinkRecord> records) {
private void unrollAndRetry(Collection<SinkRecord> records) {
writer.closeQuietly();
initWriter();
log.warn("Retrying write operation for {} records.", records.size());
for (SinkRecord record : records) {
try {
writer.write(Collections.singletonList(record));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,14 @@ public Config validateMultiConfigs(Config config) {
)
) {
configValues
.get(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG)
.addErrorMessage("Isolation mode of `"
+ TransactionIsolationMode.SQL_SERVER_SNAPSHOT.name()
+ "` can only be configured with a Sql Server Dialect"
);
.get(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG)
.addErrorMessage(
"Isolation mode of `"
+ TransactionIsolationMode.SQL_SERVER_SNAPSHOT.name()
+ "` can only be configured with a Sql Server Dialect");
LOG.warn(
"Isolation mode of '{}' can only be configured with a Sql Server Dialect",
TransactionIsolationMode.SQL_SERVER_SNAPSHOT.name());
}
}

Expand Down Expand Up @@ -854,6 +857,7 @@ public CachedRecommenderValues(
public List<Object> cachedValue(Map<String, Object> config, long currentTimeInMillis) {
if (currentTimeInMillis < expiryTimeInMillis
&& lastConfig != null && lastConfig.equals(config)) {
LOG.trace("Returning Cached values for the given configuration.");
return results;
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ public void start(Map<String, String> properties) {
for (String tableOrQuery : tablesOrQuery) {
final List<Map<String, String>> tablePartitionsToCheck;
final Map<String, String> partition;
log.trace("Task executing in {} mode",queryMode);
switch (queryMode) {
case TABLE:
if (validateNonNulls) {
Expand Down Expand Up @@ -319,6 +320,7 @@ private void validateColumnsExist(
final Connection conn = cachedConnectionProvider.getConnection();
boolean autoCommit = conn.getAutoCommit();
try {
log.info("Validating columns exist for table");
conn.setAutoCommit(true);
Map<ColumnId, ColumnDefinition> defnsById = dialect.describeColumns(conn, table, null);
Set<String> columnNames = defnsById.keySet().stream().map(ColumnId::name)
Expand Down Expand Up @@ -380,8 +382,10 @@ protected Map<String, Object> computeInitialOffset(
Map<String, Object> partitionOffset,
TimeZone timezone) {
if (!(partitionOffset == null)) {
log.info("Partition offset for '{}' is not null. Using existing offset.", tableOrQuery);
return partitionOffset;
} else {
log.info("Partition offset for '{}' is null. Computing initial offset.", tableOrQuery);
Map<String, Object> initialPartitionOffset = null;
// no offsets found
Long timestampInitial = config.getLong(JdbcSourceConnectorConfig.TIMESTAMP_INITIAL_CONFIG);
Expand Down Expand Up @@ -571,6 +575,7 @@ private void validateNonNullable(
String incrementingColumn,
List<String> timestampColumns
) {
log.info("Validating non-nullable fields for table: {}", table);
try {
Set<String> lowercaseTsColumns = new HashSet<>();
for (String timestampColumn: timestampColumns) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public List<TableId> tables() {
awaitTablesReady(startupMs);
List<TableId> tablesSnapshot = tables.get();
if (tablesSnapshot == null) {
log.info("No Tables snapshot available");
return null;
}

Expand All @@ -120,7 +121,7 @@ public List<TableId> tables() {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (tablesSnapshot.isEmpty()) {
log.debug(
log.info(
"Based on the supplied filtering rules, there are no matching tables to read from"
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ public TimestampIncrementingTableQuerier(DatabaseDialect dialect, QueryMode mode

this.timeZone = timeZone;
this.timestampGranularity = timestampGranularity;
log.trace(
"TimestampIncrementingTableQuerier initialized with timeZone: {}, timestampGranularity: {}",
timeZone,
timestampGranularity);
}

/**
Expand All @@ -124,6 +128,7 @@ public TimestampIncrementingTableQuerier(DatabaseDialect dialect, QueryMode mode

@Override
protected void createPreparedStatement(Connection db) throws SQLException {
log.debug("Creating PreparedStatement");
findDefaultAutoIncrementingColumn(db);

ColumnId incrementingColumn = null;
Expand Down Expand Up @@ -166,6 +171,7 @@ public void maybeStartQuery(Connection db) throws SQLException, ConnectException
ResultSetMetaData metadata = resultSet.getMetaData();
dialect.validateSpecificColumnTypes(metadata, timestampColumns);
schemaMapping = SchemaMapping.create(schemaName, metadata, dialect);
log.info("Current Result is null. Executing query.");
} else {
log.trace("Current ResultSet {} isn't null. Continuing to seek.", resultSet.hashCode());
}
Expand Down Expand Up @@ -200,6 +206,7 @@ private void findDefaultAutoIncrementingColumn(Connection db) throws SQLExceptio
for (ColumnDefinition defn : dialect.describeColumnsByQuerying(db, tableId).values()) {
if (defn.isAutoIncrement()) {
incrementingColumnName = defn.id().name();
log.info("Found auto incrementing column after fallback: {}", incrementingColumnName);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ protected ResultSet executeQuery() throws SQLException {
@Override
public SourceRecord extractRecord() {
if (nextRecord == null) {
log.error("No more records are available");
throw new IllegalStateException("No more records are available");
}
PendingRecord currentRecord = nextRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public synchronized Connection getConnection() {
log.debug("Could not establish connection with database.", sqle);
throw new ConnectException(sqle);
}
log.debug("Database connection established.");
log.info("Database connection established.");
return connection;
}

Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/confluent/connect/jdbc/util/JdbcDriverInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@

package io.confluent.connect.jdbc.util;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A summary of the version information about a JDBC driver and the database.
*/
public class JdbcDriverInfo {

private static final Logger log = LoggerFactory.getLogger(JdbcDriverInfo.class);

private final int jdbcMajorVersion;
private final int jdbcMinorVersion;
private final String jdbcDriverName;
Expand Down Expand Up @@ -108,9 +114,13 @@ public boolean jdbcVersionAtLeast(
int jdbcMinorVersion
) {
if (this.jdbcMajorVersion() > jdbcMajorVersion) {
log.info("JDBC driver version {} is newer than required version {}.{}",
this.jdbcMajorVersion(), jdbcMajorVersion, jdbcMinorVersion);
return true;
}
if (jdbcMajorVersion == jdbcMajorVersion() && jdbcMinorVersion() >= jdbcMinorVersion) {
log.info("JDBC driver version {} is newer than required version {}.{}",
this.jdbcMinorVersion(), jdbcMajorVersion, jdbcMinorVersion);
return true;
}
return false;
Expand Down

0 comments on commit 74402db

Please sign in to comment.