diff --git a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java
index 518d73524..bd2eeeda8 100644
--- a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java
+++ b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java
@@ -89,6 +89,7 @@ public void start(Map<String, String> properties) throws ConnectException {
         config
     );
     cachedConnectionProvider = connectionProvider(maxConnectionAttempts, connectionRetryBackoff);
+    log.info("Cached Connection Provider created");
 
     // Initial connection attempt
     log.info("Initial connection attempt with the database.");
@@ -110,6 +111,10 @@ public void start(Map<String, String> properties) throws ConnectException {
     String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
     if (!query.isEmpty()) {
       if (whitelistSet != null || blacklistSet != null) {
+        log.error(
+            "Configuration error: {} is set, but table whitelist or blacklist is also specified. "
+                + "These settings cannot be used together.",
+            JdbcSourceConnectorConfig.QUERY_CONFIG);
         throw new ConnectException(JdbcSourceConnectorConfig.QUERY_CONFIG + " may not be combined"
             + " with whole-table copying settings.");
       }
@@ -130,6 +135,7 @@ public void start(Map<String, String> properties) throws ConnectException {
     );
     if (query.isEmpty()) {
       tableMonitorThread.start();
+      log.info("Started Table Monitor Thread");
     }
   }
@@ -156,6 +162,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
     String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
     List<Map<String, String>> taskConfigs;
     if (!query.isEmpty()) {
+      log.info("Custom query provided, generating task configuration for the query");
       Map<String, String> taskProps = new HashMap<>(configProperties);
       taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, "");
       taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true");
@@ -163,6 +170,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
       log.trace("Producing task configs with custom query");
       return taskConfigs;
     } else {
+      log.info("No custom query provided, generating task configurations for tables");
       List<TableId> currentTables = tableMonitorThread.tables();
       if (currentTables == null || currentTables.isEmpty()) {
         taskConfigs = new ArrayList<>(1);
diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java
index 0162a17f6..d50a08c19 100644
--- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java
+++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java
@@ -615,6 +615,7 @@ public void setConnectionIsolationMode(
       Connection connection,
       TransactionIsolationMode transactionIsolationMode
   ) {
+    log.info("Setting connection isolation mode to: {}", transactionIsolationMode.name());
     if (transactionIsolationMode == TransactionIsolationMode.DEFAULT) {
       return;
diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java
index 33a60cf13..d0af7d8d9 100644
--- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java
+++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java
@@ -83,16 +83,21 @@ void write(final Collection<SinkRecord> records)
         buffer.flush();
         buffer.close();
       }
+      log.trace("Committing transaction");
       connection.commit();
     } catch (SQLException | TableAlterOrCreateException e) {
+      log.error("Error during write operation. Attempting rollback.", e);
      try {
        connection.rollback();
+        log.info("Successfully rolled back transaction");
      } catch (SQLException sqle) {
+        log.error("Failed to rollback transaction", sqle);
        e.addSuppressed(sqle);
      } finally {
        throw e;
      }
    }
+    log.info("Completed write operation for {} records to the database", records.size());
  }
 
  void closeQuietly() {
diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java
index dc6707855..0b3f336f1 100644
--- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java
+++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java
@@ -88,6 +88,7 @@ public void put(Collection<SinkRecord> records) {
     );
     try {
       writer.write(records);
+      log.info("Successfully wrote {} records.", recordsCount);
     } catch (TableAlterOrCreateException tace) {
       if (reporter != null) {
         unrollAndRetry(records);
@@ -139,6 +140,7 @@ public void put(Collection<SinkRecord> records) {
   private void unrollAndRetry(Collection<SinkRecord> records) {
     writer.closeQuietly();
     initWriter();
+    log.warn("Retrying write operation for {} records.", records.size());
     for (SinkRecord record : records) {
       try {
         writer.write(Collections.singletonList(record));
diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
index 9a984b774..4ce538aa1 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
@@ -408,11 +408,14 @@ public Config validateMultiConfigs(Config config) {
         )
     ) {
       configValues
-          .get(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG)
-          .addErrorMessage("Isolation mode of `"
-              + TransactionIsolationMode.SQL_SERVER_SNAPSHOT.name()
-              + "` can only be configured with a Sql Server Dialect"
-          );
+          .get(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG)
+          .addErrorMessage(
+              "Isolation mode of `"
+                  + TransactionIsolationMode.SQL_SERVER_SNAPSHOT.name()
+                  + "` can only be configured with a Sql Server Dialect");
+      LOG.warn(
+          "Isolation mode of '{}' can only be configured with a Sql Server Dialect",
+          TransactionIsolationMode.SQL_SERVER_SNAPSHOT.name());
     }
   }
@@ -854,6 +857,7 @@ public CachedRecommenderValues(
     public List<Object> cachedValue(Map<String, ?> config, long currentTimeInMillis) {
       if (currentTimeInMillis < expiryTimeInMillis
           && lastConfig != null && lastConfig.equals(config)) {
+        LOG.trace("Returning cached values for the given configuration.");
        return results;
       }
       return null;
diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java
index 413e1658d..39dec4595 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java
@@ -204,6 +204,7 @@ public void start(Map<String, String> properties) {
     for (String tableOrQuery : tablesOrQuery) {
       final List<Map<String, String>> tablePartitionsToCheck;
       final Map<String, String> partition;
+      log.trace("Task executing in {} mode", queryMode);
       switch (queryMode) {
         case TABLE:
           if (validateNonNulls) {
@@ -319,6 +320,7 @@ private void validateColumnsExist(
     final Connection conn = cachedConnectionProvider.getConnection();
     boolean autoCommit = conn.getAutoCommit();
     try {
+      log.info("Validating columns exist for table");
      conn.setAutoCommit(true);
       Map<ColumnId, ColumnDefinition> defnsById = dialect.describeColumns(conn, table, null);
       Set<String> columnNames = defnsById.keySet().stream().map(ColumnId::name)
@@ -380,8 +382,10 @@ protected Map<String, Object> computeInitialOffset(
       Map<String, Object> partitionOffset,
       TimeZone timezone) {
     if (!(partitionOffset == null)) {
+      log.info("Partition offset for '{}' is not null. Using existing offset.", tableOrQuery);
       return partitionOffset;
     } else {
+      log.info("Partition offset for '{}' is null. Computing initial offset.", tableOrQuery);
       Map<String, Object> initialPartitionOffset = null;
       // no offsets found
       Long timestampInitial = config.getLong(JdbcSourceConnectorConfig.TIMESTAMP_INITIAL_CONFIG);
@@ -571,6 +575,7 @@ private void validateNonNullable(
       String incrementingColumn,
       List<String> timestampColumns
   ) {
+    log.info("Validating non-nullable fields for table: {}", table);
     try {
       Set<String> lowercaseTsColumns = new HashSet<>();
       for (String timestampColumn: timestampColumns) {
diff --git a/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java b/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java
index 86136dbb3..3cec4a95e 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java
@@ -110,6 +110,7 @@ public List<TableId> tables() {
     awaitTablesReady(startupMs);
     List<TableId> tablesSnapshot = tables.get();
     if (tablesSnapshot == null) {
+      log.info("No tables snapshot available");
       return null;
     }
@@ -120,7 +121,7 @@ public List<TableId> tables() {
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
     if (tablesSnapshot.isEmpty()) {
-      log.debug(
+      log.info(
          "Based on the supplied filtering rules, there are no matching tables to read from"
      );
    } else {
diff --git a/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java
index 6be4f68be..014ff9203 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java
@@ -115,6 +115,10 @@ public TimestampIncrementingTableQuerier(DatabaseDialect dialect, QueryMode mode
     this.timeZone = timeZone;
     this.timestampGranularity = timestampGranularity;
+    log.trace(
+        "TimestampIncrementingTableQuerier initialized with timeZone: {}, timestampGranularity: {}",
+        timeZone,
+        timestampGranularity);
   }
 
   /**
@@ -124,6 +128,7 @@ public TimestampIncrementingTableQuerier(DatabaseDialect dialect, QueryMode mode
   @Override
   protected void createPreparedStatement(Connection db) throws SQLException {
+    log.debug("Creating PreparedStatement");
     findDefaultAutoIncrementingColumn(db);
 
     ColumnId incrementingColumn = null;
@@ -166,6 +171,7 @@ public void maybeStartQuery(Connection db) throws SQLException, ConnectException
       ResultSetMetaData metadata = resultSet.getMetaData();
       dialect.validateSpecificColumnTypes(metadata, timestampColumns);
       schemaMapping = SchemaMapping.create(schemaName, metadata, dialect);
+      log.info("Current ResultSet was null. Executed query.");
     } else {
       log.trace("Current ResultSet {} isn't null. Continuing to seek.", resultSet.hashCode());
     }
@@ -200,6 +206,7 @@ private void findDefaultAutoIncrementingColumn(Connection db) throws SQLExceptio
     for (ColumnDefinition defn : dialect.describeColumnsByQuerying(db, tableId).values()) {
       if (defn.isAutoIncrement()) {
         incrementingColumnName = defn.id().name();
+        log.info("Found auto incrementing column after fallback: {}", incrementingColumnName);
         break;
       }
     }
diff --git a/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java
index 625660629..4c892caa1 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java
@@ -124,6 +124,7 @@ protected ResultSet executeQuery() throws SQLException {
   @Override
   public SourceRecord extractRecord() {
     if (nextRecord == null) {
+      log.error("No more records are available");
       throw new IllegalStateException("No more records are available");
     }
     PendingRecord currentRecord = nextRecord;
diff --git a/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java b/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java
index e8863e88a..041845f68 100644
--- a/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java
+++ b/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java
@@ -61,7 +61,7 @@ public synchronized Connection getConnection() {
       log.debug("Could not establish connection with database.", sqle);
       throw new ConnectException(sqle);
     }
-    log.debug("Database connection established.");
+    log.info("Database connection established.");
     return connection;
   }
diff --git a/src/main/java/io/confluent/connect/jdbc/util/JdbcDriverInfo.java b/src/main/java/io/confluent/connect/jdbc/util/JdbcDriverInfo.java
index 86c0b2845..cc4dfac98 100644
--- a/src/main/java/io/confluent/connect/jdbc/util/JdbcDriverInfo.java
+++ b/src/main/java/io/confluent/connect/jdbc/util/JdbcDriverInfo.java
@@ -15,11 +15,17 @@
 package io.confluent.connect.jdbc.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A summary of the version information about a JDBC driver and the database.
  */
 public class JdbcDriverInfo {
 
+  private static final Logger log = LoggerFactory.getLogger(JdbcDriverInfo.class);
+
   private final int jdbcMajorVersion;
   private final int jdbcMinorVersion;
   private final String jdbcDriverName;
@@ -108,9 +114,13 @@ public boolean jdbcVersionAtLeast(
       int jdbcMinorVersion
   ) {
     if (this.jdbcMajorVersion() > jdbcMajorVersion) {
+      log.info("JDBC driver version {}.{} is newer than required version {}.{}",
+          jdbcMajorVersion(), jdbcMinorVersion(), jdbcMajorVersion, jdbcMinorVersion);
       return true;
     }
     if (jdbcMajorVersion == jdbcMajorVersion() && jdbcMinorVersion() >= jdbcMinorVersion) {
+      log.info("JDBC driver version {}.{} meets required version {}.{}",
+          jdbcMajorVersion(), jdbcMinorVersion(), jdbcMajorVersion, jdbcMinorVersion);
       return true;
     }
     return false;