Skip to content

Commit

Permalink
review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng committed Sep 20, 2022
1 parent afefb11 commit e714da2
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 23 deletions.
21 changes: 11 additions & 10 deletions src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package com.snowflake.kafka.connector;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.DELIVERY_GUARANTEE;

import com.google.common.annotations.VisibleForTesting;
import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.LoggerHandler;
Expand All @@ -28,13 +26,6 @@
import com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
Expand All @@ -43,6 +34,16 @@
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.DELIVERY_GUARANTEE;

/**
* SnowflakeSinkTask implements SinkTask for Kafka Connect framework.
*
Expand Down Expand Up @@ -309,7 +310,7 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
// return an empty map means that offset commitment is not desired
if (sink == null || sink.isClosed()) {
LOGGER.warn(
"SnowflakeSinkTask[ID:{}]: sink " + "not initialized or closed before preCommit",
"SnowflakeSinkTask[ID:{}]: sink not initialized or closed before preCommit",
this.id);
return new HashMap<>();
} else if (sink.getPartitionCount() == 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 Snowflake Inc. All rights reserved.
* Copyright (c) 2022 Snowflake Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
package com.snowflake.kafka.connector.internal;

import com.snowflake.kafka.connector.Utils;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;

/**
* Attaches additional fields to the logs
*/
public class LoggerHandler {
// static properties and methods
private static final UUID CORRELATION_ID_EMPTY = null;
private static final Logger META_LOGGER = LoggerFactory.getLogger(LoggerHandler.class.getName());
private static UUID LOGGER_CORRELATION_ID = CORRELATION_ID_EMPTY;
private static UUID loggerCorrelationId = CORRELATION_ID_EMPTY;

// should only be called on start
/**
* Sets the correlationId for all loggers. This should only be called in start so that the entire kafka connector
* instance has the same correlationId logging
* @param correlationId UUID attached for every log
*/
public static void setCorrelationUuid(UUID correlationId) {
LOGGER_CORRELATION_ID = correlationId;
loggerCorrelationId = correlationId;

if (correlationId == null) {
META_LOGGER.warn(
Expand All @@ -30,7 +38,10 @@ public static void setCorrelationUuid(UUID correlationId) {

private Logger logger;

// create logger handler without changing correlationId
/**
* Create and return a new logging handler
* @param name The class name passed for initializing the logger
*/
public LoggerHandler(String name) {
this.logger = LoggerFactory.getLogger(name);

Expand All @@ -39,92 +50,155 @@ public LoggerHandler(String name) {
Utils.formatLogMessage(
"Created loggerHandler for class: '{}' with correlationId: " + "'{}'",
name,
LOGGER_CORRELATION_ID.toString()));
loggerCorrelationId.toString()));
} else {
META_LOGGER.info(
Utils.formatLogMessage(
"Created loggerHandler for class: '{}' without a correlationId.", name));
}
}

// only message
/**
* Logs an info level message
* @param msg The message to be logged
*/
public void info(String msg) {
if (this.logger.isInfoEnabled()) {
this.logger.info(getFormattedMsg(msg));
}
}

/**
* Logs a trace level message
* @param msg The message to be logged
*/
public void trace(String msg) {
if (this.logger.isTraceEnabled()) {
this.logger.trace(getFormattedMsg(msg));
}
}

/**
* Logs a debug level message
* @param msg The message to be logged
*/
public void debug(String msg) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(getFormattedMsg(msg));
}
}

/**
* Logs a warn level message
* @param msg The message to be logged
*/
public void warn(String msg) {
if (this.logger.isWarnEnabled()) {
this.logger.warn(getFormattedMsg(msg));
}
}

/**
* Logs an error level message
* @param msg The message to be logged
*/
public void error(String msg) {
if (this.logger.isErrorEnabled()) {
this.logger.error(getFormattedMsg(msg));
}
}

// format and variables
/**
* Logs an info level message
* @param format The message format without variables
* @param vars The variables to insert into the format. These variables will be toString()'ed
*/
public void info(String format, Object... vars) {
if (this.logger.isInfoEnabled()) {
this.logger.info(getFormattedMsg(format, vars));
}
}

/**
* Logs an trace level message
* @param format The message format without variables
* @param vars The variables to insert into the format. These variables will be toString()'ed
*/
public void trace(String format, Object... vars) {
if (this.logger.isTraceEnabled()) {
this.logger.trace(getFormattedMsg(format, vars));
}
}

/**
* Logs an debug level message
* @param format The message format without variables
* @param vars The variables to insert into the format. These variables will be toString()'ed
*/
public void debug(String format, Object... vars) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(getFormattedMsg(format, vars));
}
}

/**
* Logs an warn level message
* @param format The message format without variables
* @param vars The variables to insert into the format. These variables will be toString()'ed
*/
public void warn(String format, Object... vars) {
if (this.logger.isWarnEnabled()) {
this.logger.warn(getFormattedMsg(format, vars));
}
}

/**
* Logs an error level message
* @param format The message format without variables
* @param vars The variables to insert into the format. These variables will be toString()'ed
*/
public void error(String format, Object... vars) {
if (this.logger.isErrorEnabled()) {
this.logger.error(getFormattedMsg(format, vars));
}
}

// format correctly and add correlationId tag if exists

/**
* Format the message correctly by attaching correlationId if needed and passing to Utils for more formatting
* @param msg The message that needs to be prepended with tags
* @return The fully formatted string to be logged
*/
private String getFormattedMsg(String msg) {
return Utils.formatLogMessage(getCorrelationIdStr() + msg);
}

/**
* Format the message correctly by injecting the variables, attaching correlationId if needed, and passing to Utils
* for more formatting
* @param msg The message format without variables that needs to be prepended with tags
* @param vars The variables to insert into the format. These variables will be toString()'ed
* @return The fully formatted string to be logged
*/
private String getFormattedMsg(String msg, Object... vars) {
return Utils.formatLogMessage(getCorrelationIdStr() + msg, vars);
}

/**
* Check if the correlationId is valid
* @return true if the correlationId is valid, false otherwise
*/
private static boolean isCorrelationIdValid() {
return LOGGER_CORRELATION_ID != null
&& !LOGGER_CORRELATION_ID.toString().isEmpty()
&& LOGGER_CORRELATION_ID != CORRELATION_ID_EMPTY;
return loggerCorrelationId != CORRELATION_ID_EMPTY
&& !loggerCorrelationId.toString().isEmpty();
}

/**
* Creates the correlationId tag
* @return The correlationId tag
*/
private static String getCorrelationIdStr() {
return isCorrelationIdValid() ? "[" + LOGGER_CORRELATION_ID.toString() + "] " : "";
return isCorrelationIdValid() ? "[" + loggerCorrelationId.toString() + "] " : "";
}
}

0 comments on commit e714da2

Please sign in to comment.