[SNOW-594195] Add correlationId to logging (#487)
* smh static doesnt work

* logginghandler with static cid, logging -> enablelogging

* test gitignore

* passes tests

* corrid at beginning

* remove yml files from git cache

* add yml, copyright; change logger class name

* ran formatter

* remove wildcard imports

* added logging handler tests

* review changes

* formatter
sfc-gh-rcheng authored Sep 21, 2022
1 parent 164632b commit 4d35849
Showing 37 changed files with 812 additions and 573 deletions.
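The change replaces the plain slf4j Logger plus Logging.logMessage(...) wrapping with a LoggerHandler that does the wrapping itself and, once the connector has started, prepends a correlation UUID to every message. The real class lives in com.snowflake.kafka.connector.internal and is not part of this excerpt; the following is a minimal sketch reconstructed from the call sites visible in the diff, assuming slf4j delegation and a static correlation id, with the exact prefix format being a guess.

import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch of LoggerHandler, inferred from the call sites below.
public class LoggerHandler {
  // correlation id shared by every LoggerHandler instance in the worker once set
  private static UUID correlationUuid = null;

  private final Logger logger;

  public LoggerHandler(String name) {
    this.logger = LoggerFactory.getLogger(name);
  }

  // called once from SnowflakeSinkConnector.start() to tag all subsequent log lines
  public static void setCorrelationUuid(UUID uuid) {
    correlationUuid = uuid;
  }

  public void info(String format, Object... args) {
    logger.info(tag(format), args);
  }

  public void debug(String format, Object... args) {
    logger.debug(tag(format), args);
  }

  public void warn(String format, Object... args) {
    logger.warn(tag(format), args);
  }

  public void error(String format, Object... args) {
    logger.error(tag(format), args);
  }

  // put the correlation id at the beginning of the message when one has been set
  private static String tag(String format) {
    return correlationUuid == null ? format : "[" + correlationUuid + "] " + format;
  }
}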
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java
@@ -16,7 +16,7 @@
*/
package com.snowflake.kafka.connector;

import com.snowflake.kafka.connector.internal.Logging;
import com.snowflake.kafka.connector.internal.LoggerHandler;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
@@ -26,12 +26,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* SnowflakeSinkConnector implements SinkConnector for Kafka Connect framework.
@@ -43,6 +42,9 @@
* running on Kafka Connect Workers.
*/
public class SnowflakeSinkConnector extends SinkConnector {
// create logger without correlationId for now
private static LoggerHandler LOGGER = new LoggerHandler(SnowflakeSinkConnector.class.getName());

private Map<String, String> config; // connector configuration, provided by
// user through kafka connect framework
private String connectorName; // unique name of this connector instance
@@ -55,8 +57,6 @@ public class SnowflakeSinkConnector extends SinkConnector {
private SnowflakeTelemetryService telemetryClient;
private long connectorStartTime;

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSinkConnector.class);

// Kafka Connect starts sink tasks without waiting for setup in
// SnowflakeSinkConnector to finish.
// This causes race conditions for: config validation, tables and stages
@@ -80,7 +80,11 @@ public SnowflakeSinkConnector() {
@Override
public void start(final Map<String, String> parsedConfig) {
Utils.checkConnectorVersion();
LOGGER.info(Logging.logMessage("SnowflakeSinkConnector:start"));

// initialize logging with correlationId
LoggerHandler.setCorrelationUuid(UUID.randomUUID());

LOGGER.info("SnowflakeSinkConnector:start");
setupComplete = false;
connectorStartTime = System.currentTimeMillis();

@@ -118,7 +122,7 @@ public void start(final Map<String, String> parsedConfig) {
@Override
public void stop() {
setupComplete = false;
LOGGER.info(Logging.logMessage("SnowflakeSinkConnector:stop"));
LOGGER.info("SnowflakeSinkConnector:stop");
telemetryClient.reportKafkaConnectStop(connectorStartTime);
}

@@ -156,10 +160,10 @@ public List<Map<String, String>> taskConfigs(final int maxTasks) {
} else {
counter++;
try {
LOGGER.info(Logging.logMessage("Sleeping 5000ms to allow setup to " + "complete."));
LOGGER.info("Sleeping 5000ms to allow setup to " + "complete.");
Thread.sleep(5000);
} catch (InterruptedException ex) {
LOGGER.warn(Logging.logMessage("Waiting for setup to complete got " + "interrupted"));
LOGGER.warn("Waiting for setup to complete got " + "interrupted");
}
}
}
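Because the correlation id is a static field on the handler ("logginghandler with static cid" in the commit list above), the single setCorrelationUuid call in SnowflakeSinkConnector.start() is enough for every LoggerHandler created in the same worker, including the ones in SnowflakeSinkTask, to tag its output. A usage sketch under that assumption; the tagged output format is illustrative only:

// seeded once in SnowflakeSinkConnector.start(), as in the hunk above
LoggerHandler.setCorrelationUuid(UUID.randomUUID());

// any handler created afterwards in the same worker tags its output with that id
LoggerHandler taskLogger = new LoggerHandler(SnowflakeSinkTask.class.getName());
taskLogger.info("SnowflakeSinkTask[ID:{}]:start", "0");
// example output: [5f1c2e7a-...] SnowflakeSinkTask[ID:0]:start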
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
@@ -18,7 +18,7 @@

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.snowflake.kafka.connector.internal.Logging;
import com.snowflake.kafka.connector.internal.LoggerHandler;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.StreamingUtils;
import java.util.Arrays;
@@ -31,8 +31,6 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* SnowflakeSinkConnectorConfig class is used for specifying the set of expected configurations. For
@@ -116,8 +114,8 @@ public class SnowflakeSinkConnectorConfig {
public static final String REBALANCING = "snowflake.test.rebalancing";
public static final boolean REBALANCING_DEFAULT = false;

private static final Logger LOGGER =
LoggerFactory.getLogger(SnowflakeSinkConnectorConfig.class.getName());
private static final LoggerHandler LOGGER =
new LoggerHandler(SnowflakeSinkConnectorConfig.class.getName());

private static final ConfigDef.Validator nonEmptyStringValidator = new ConfigDef.NonEmptyString();
private static final ConfigDef.Validator topicToTableValidator = new TopicToTableValidator();
@@ -187,7 +185,7 @@ public static void setDefaultValues(Map<String, String> config) {
static void setFieldToDefaultValues(Map<String, String> config, String field, Long value) {
if (!config.containsKey(field)) {
config.put(field, value + "");
LOGGER.info(Logging.logMessage("{} set to default {} seconds", field, value));
LOGGER.info("{} set to default {} seconds", field, value);
}
}

75 changes: 29 additions & 46 deletions src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java
@@ -20,7 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.Logging;
import com.snowflake.kafka.connector.internal.LoggerHandler;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
@@ -42,8 +42,6 @@
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* SnowflakeSinkTask implements SinkTask for Kafka Connect framework.
@@ -79,7 +77,7 @@ public class SnowflakeSinkTask extends SinkTask {
// check connect-distributed.properties file used to start kafka connect
private final int rebalancingSleepTime = 370000;

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSinkTask.class);
private static final LoggerHandler LOGGER = new LoggerHandler(SnowflakeSinkTask.class.getName());

/** default constructor, invoked by kafka connect framework */
public SnowflakeSinkTask() {
@@ -124,7 +122,7 @@ public void start(final Map<String, String> parsedConfig) {
long startTime = System.currentTimeMillis();
this.id = parsedConfig.getOrDefault(Utils.TASK_ID, "-1");

LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:start", this.id));
LOGGER.info("SnowflakeSinkTask[ID:{}]:start", this.id);
// connector configuration

// generate topic to table map
@@ -215,10 +213,9 @@ public void start(final Map<String, String> parsedConfig) {
.build();

LOGGER.info(
Logging.logMessage(
"SnowflakeSinkTask[ID:{}]:start. Time: {} seconds",
this.id,
(System.currentTimeMillis() - startTime) / 1000));
"SnowflakeSinkTask[ID:{}]:start. Time: {} seconds",
this.id,
(System.currentTimeMillis() - startTime) / 1000);
}

/**
Expand All @@ -227,7 +224,7 @@ public void start(final Map<String, String> parsedConfig) {
*/
@Override
public void stop() {
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:stop", this.id));
LOGGER.info("SnowflakeSinkTask[ID:{}]:stop", this.id);
if (this.sink != null) {
this.sink.setIsStoppedToTrue(); // close cleaner thread
}
@@ -242,18 +239,14 @@ public void stop() {
public void open(final Collection<TopicPartition> partitions) {
long startTime = System.currentTimeMillis();
LOGGER.info(
Logging.logMessage(
"SnowflakeSinkTask[ID:{}]:open, TopicPartition number: {}",
this.id,
partitions.size()));
"SnowflakeSinkTask[ID:{}]:open, TopicPartition number: {}", this.id, partitions.size());
partitions.forEach(
tp -> this.sink.startTask(Utils.tableName(tp.topic(), this.topic2table), tp));

LOGGER.info(
Logging.logMessage(
"SnowflakeSinkTask[ID:{}]:open. Time: {} seconds",
this.id,
(System.currentTimeMillis() - startTime) / 1000));
"SnowflakeSinkTask[ID:{}]:open. Time: {} seconds",
this.id,
(System.currentTimeMillis() - startTime) / 1000);
}

/**
@@ -267,16 +260,15 @@ public void open(final Collection<TopicPartition> partitions) {
@Override
public void close(final Collection<TopicPartition> partitions) {
long startTime = System.currentTimeMillis();
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:close", this.id));
LOGGER.info("SnowflakeSinkTask[ID:{}]:close", this.id);
if (this.sink != null) {
this.sink.close(partitions);
}

LOGGER.info(
Logging.logMessage(
"SnowflakeSinkTask[ID:{}]:close. Time: {} seconds",
this.id,
(System.currentTimeMillis() - startTime) / 1000));
"SnowflakeSinkTask[ID:{}]:close. Time: {} seconds",
this.id,
(System.currentTimeMillis() - startTime) / 1000);
}

/**
Expand All @@ -291,8 +283,7 @@ public void put(final Collection<SinkRecord> records) {
}

long startTime = System.currentTimeMillis();
LOGGER.debug(
Logging.logMessage("SnowflakeSinkTask[ID:{}]:put {} records", this.id, records.size()));
LOGGER.debug("SnowflakeSinkTask[ID:{}]:put {} records", this.id, records.size());

getSink().insert(records);

@@ -313,19 +304,15 @@ public void put(final Collection<SinkRecord> records) {
public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> offsets) throws RetriableException {
long startTime = System.currentTimeMillis();
LOGGER.debug(
Logging.logMessage("SnowflakeSinkTask[ID:{}]:preCommit {}", this.id, offsets.size()));
LOGGER.debug("SnowflakeSinkTask[ID:{}]:preCommit {}", this.id, offsets.size());

// return an empty map means that offset commitment is not desired
if (sink == null || sink.isClosed()) {
LOGGER.warn(
Logging.logMessage(
"SnowflakeSinkTask[ID:{}]: sink " + "not initialized or closed before preCommit",
this.id));
"SnowflakeSinkTask[ID:{}]: sink not initialized or closed before preCommit", this.id);
return new HashMap<>();
} else if (sink.getPartitionCount() == 0) {
LOGGER.warn(
Logging.logMessage("SnowflakeSinkTask[ID:{}]: no partition is assigned", this.id));
LOGGER.warn("SnowflakeSinkTask[ID:{}]: no partition is assigned", this.id);
return new HashMap<>();
}

@@ -341,10 +328,7 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
});
} catch (Exception e) {
LOGGER.error(
Logging.logMessage(
"SnowflakeSinkTask[ID:{}]: Error " + "while preCommit: {} ",
this.id,
e.getMessage()));
"SnowflakeSinkTask[ID:{}]: Error " + "while preCommit: {} ", this.id, e.getMessage());
return new HashMap<>();
}

@@ -371,7 +355,7 @@ static Map<String, String> getTopicToTableMap(Map<String, String> config) {
if (result != null) {
return result;
}
LOGGER.error(Logging.logMessage("Invalid Input, Topic2Table Map disabled"));
LOGGER.error("Invalid Input, Topic2Table Map disabled");
}
return new HashMap<>();
}
@@ -399,15 +383,14 @@ void logWarningForPutAndPrecommit(long startTime, int size, String apiName) {
// seconds.
// But having this warning helps customer to debug their Kafka Connect config.
LOGGER.warn(
Logging.logMessage(
"SnowflakeSinkTask[ID:{}]:{} {}. Time: {} seconds > 300 seconds. If there is"
+ " CommitFailedException in the log or there is duplicated records, refer to"
+ " this link for solution: "
+ "https://docs.snowflake.com/en/user-guide/kafka-connector-ts.html#resolving-specific-issues",
this.id,
apiName,
size,
executionTime));
"SnowflakeSinkTask[ID:{}]:{} {}. Time: {} seconds > 300 seconds. If there is"
+ " CommitFailedException in the log or there is duplicated records, refer to this"
+ " link for solution: "
+ "https://docs.snowflake.com/en/user-guide/kafka-connector-ts.html#resolving-specific-issues",
this.id,
apiName,
size,
executionTime);
}
}

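Most of the 37-file diff is this same mechanical substitution at every logging call site; condensed to a before/after pair taken from the SnowflakeSinkTask hunks above:

// before: raw slf4j logger, every call wrapped in Logging.logMessage(...)
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSinkTask.class);
LOGGER.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:start", this.id));

// after: LoggerHandler formats and tags the message itself, so call sites pass the format directly
private static final LoggerHandler LOGGER = new LoggerHandler(SnowflakeSinkTask.class.getName());
LOGGER.info("SnowflakeSinkTask[ID:{}]:start", this.id);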
(diffs for the remaining 34 changed files not loaded)
