Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Log4j2 instead of SLF4J for our own loggers #864

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
<slf4j.version>1.7.36</slf4j.version>
<vertx.version>4.5.2</vertx.version>
<vertx-testing.version>4.5.2</vertx-testing.version>
<netty.version>4.1.106.Final</netty.version>
Expand Down Expand Up @@ -193,9 +192,9 @@
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand Down
19 changes: 9 additions & 10 deletions src/main/java/io/strimzi/kafka/bridge/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.management.MalformedObjectNameException;
import java.io.BufferedReader;
Expand All @@ -44,8 +44,7 @@
* Apache Kafka bridge main application class
*/
public class Application {

private static final Logger log = LoggerFactory.getLogger(Application.class);
private static final Logger LOGGER = LogManager.getLogger(Application.class);

private static final String KAFKA_BRIDGE_METRICS_ENABLED = "KAFKA_BRIDGE_METRICS_ENABLED";

Expand All @@ -55,12 +54,12 @@ public class Application {
* @param args command line arguments
*/
public static void main(String[] args) {
log.info("Strimzi Kafka Bridge {} is starting", Application.class.getPackage().getImplementationVersion());
LOGGER.info("Strimzi Kafka Bridge {} is starting", Application.class.getPackage().getImplementationVersion());
try {
VertxOptions vertxOptions = new VertxOptions();
JmxCollectorRegistry jmxCollectorRegistry = null;
if (Boolean.parseBoolean(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) {
log.info("Metrics enabled and exposed on the /metrics endpoint");
LOGGER.info("Metrics enabled and exposed on the /metrics endpoint");
// setup Micrometer metrics options
vertxOptions.setMetricsOptions(metricsOptions());
jmxCollectorRegistry = getJmxCollectorRegistry();
Expand All @@ -75,7 +74,7 @@ public static void main(String[] args) {

Map<String, Object> config = ConfigRetriever.getConfig(absoluteFilePath(commandLine.getOptionValue("config-file")));
BridgeConfig bridgeConfig = BridgeConfig.fromMap(config);
log.info("Bridge configuration {}", bridgeConfig);
LOGGER.info("Bridge configuration {}", bridgeConfig);

deployHttpBridge(vertx, bridgeConfig, metricsReporter).onComplete(done -> {
if (done.succeeded()) {
Expand All @@ -84,7 +83,7 @@ public static void main(String[] args) {
}
});
} catch (RuntimeException | MalformedObjectNameException | IOException | ParseException e) {
log.error("Error starting the bridge", e);
LOGGER.error("Error starting the bridge", e);
System.exit(1);
}
}
Expand Down Expand Up @@ -122,10 +121,10 @@ private static Future<HttpBridge> deployHttpBridge(Vertx vertx, BridgeConfig bri
HttpBridge httpBridge = new HttpBridge(bridgeConfig, metricsReporter);
vertx.deployVerticle(httpBridge, done -> {
if (done.succeeded()) {
log.info("HTTP verticle instance deployed [{}]", done.result());
LOGGER.info("HTTP verticle instance deployed [{}]", done.result());
httpPromise.complete(httpBridge);
} else {
log.error("Failed to deploy HTTP verticle instance", done.cause());
LOGGER.error("Failed to deploy HTTP verticle instance", done.cause());
httpPromise.fail(done.cause());
}
});
Expand Down
30 changes: 15 additions & 15 deletions src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
Expand All @@ -27,7 +27,7 @@
* Represents a Kafka bridge admin client
*/
public class KafkaBridgeAdmin {
private final Logger log = LoggerFactory.getLogger(KafkaBridgeAdmin.class);
private static final Logger LOGGER = LogManager.getLogger(KafkaBridgeAdmin.class);

private final KafkaConfig kafkaConfig;
private AdminClient adminClient;
Expand Down Expand Up @@ -68,13 +68,13 @@ public void close() {
* @return a CompletionStage bringing the set of topics
*/
public CompletionStage<Set<String>> listTopics() {
log.trace("List topics thread {}", Thread.currentThread());
log.info("List topics");
LOGGER.trace("List topics thread {}", Thread.currentThread());
LOGGER.info("List topics");
CompletableFuture<Set<String>> promise = new CompletableFuture<>();
this.adminClient.listTopics()
.names()
.whenComplete((topics, exception) -> {
log.trace("List topics callback thread {}", Thread.currentThread());
LOGGER.trace("List topics callback thread {}", Thread.currentThread());
if (exception == null) {
promise.complete(topics);
} else {
Expand All @@ -91,13 +91,13 @@ public CompletionStage<Set<String>> listTopics() {
* @return a CompletionStage bringing the description of the specified topics.
*/
public CompletionStage<Map<String, TopicDescription>> describeTopics(List<String> topicNames) {
log.trace("Describe topics thread {}", Thread.currentThread());
log.info("Describe topics {}", topicNames);
LOGGER.trace("Describe topics thread {}", Thread.currentThread());
LOGGER.info("Describe topics {}", topicNames);
CompletableFuture<Map<String, TopicDescription>> promise = new CompletableFuture<>();
this.adminClient.describeTopics(topicNames)
.allTopicNames()
.whenComplete((topics, exception) -> {
log.trace("Describe topics callback thread {}", Thread.currentThread());
LOGGER.trace("Describe topics callback thread {}", Thread.currentThread());
if (exception == null) {
promise.complete(topics);
} else {
Expand All @@ -114,13 +114,13 @@ public CompletionStage<Map<String, TopicDescription>> describeTopics(List<String
* @return a CompletionStage bringing the configuration of the specified resources.
*/
public CompletionStage<Map<ConfigResource, Config>> describeConfigs(List<ConfigResource> configResources) {
log.trace("Describe configs thread {}", Thread.currentThread());
log.info("Describe configs {}", configResources);
LOGGER.trace("Describe configs thread {}", Thread.currentThread());
LOGGER.info("Describe configs {}", configResources);
CompletableFuture<Map<ConfigResource, Config>> promise = new CompletableFuture<>();
this.adminClient.describeConfigs(configResources)
.all()
.whenComplete((configs, exception) -> {
log.trace("Describe configs callback thread {}", Thread.currentThread());
LOGGER.trace("Describe configs callback thread {}", Thread.currentThread());
if (exception == null) {
promise.complete(configs);
} else {
Expand All @@ -137,13 +137,13 @@ public CompletionStage<Map<ConfigResource, Config>> describeConfigs(List<ConfigR
* @return a CompletionStage bringing the offset spec for the given partition.
*/
public CompletionStage<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
log.trace("Get offsets thread {}", Thread.currentThread());
log.info("Get the offset spec for partition {}", topicPartitionOffsets);
LOGGER.trace("Get offsets thread {}", Thread.currentThread());
LOGGER.info("Get the offset spec for partition {}", topicPartitionOffsets);
CompletableFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> promise = new CompletableFuture<>();
this.adminClient.listOffsets(topicPartitionOffsets)
.all()
.whenComplete((offsets, exception) -> {
log.trace("Get offsets callback thread {}", Thread.currentThread());
LOGGER.trace("Get offsets callback thread {}", Thread.currentThread());
if (exception == null) {
promise.complete(offsets);
} else {
Expand Down
39 changes: 19 additions & 20 deletions src/main/java/io/strimzi/kafka/bridge/KafkaBridgeConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.Duration;
import java.util.HashSet;
Expand All @@ -33,8 +33,7 @@
* @param <V> type of Kafka message payload
*/
public class KafkaBridgeConsumer<K, V> {

private final Logger log = LoggerFactory.getLogger(KafkaBridgeConsumer.class);
private static final Logger LOGGER = LogManager.getLogger(KafkaBridgeConsumer.class);

private final KafkaConfig kafkaConfig;
private final Deserializer<K> keyDeserializer;
Expand Down Expand Up @@ -102,18 +101,18 @@ public void subscribe(List<SinkTopicSubscription> topicSubscriptions) {
return;
}

log.info("Subscribe to topics {}", topicSubscriptions);
LOGGER.info("Subscribe to topics {}", topicSubscriptions);
Set<String> topics = topicSubscriptions.stream().map(SinkTopicSubscription::getTopic).collect(Collectors.toSet());
log.trace("Subscribe thread {}", Thread.currentThread());
LOGGER.trace("Subscribe thread {}", Thread.currentThread());
this.consumer.subscribe(topics, loggingPartitionsRebalance);
}

/**
* Unsubscribe all the topics which the consumer currently subscribes
*/
public void unsubscribe() {
log.info("Unsubscribe from topics");
log.trace("Unsubscribe thread {}", Thread.currentThread());
LOGGER.info("Unsubscribe from topics");
LOGGER.trace("Unsubscribe thread {}", Thread.currentThread());
this.consumer.unsubscribe();
}

Expand All @@ -123,8 +122,8 @@ public void unsubscribe() {
* @return set of topic partitions to which the consumer is subscribed
*/
public Set<TopicPartition> listSubscriptions() {
log.info("Listing subscribed topics");
log.trace("ListSubscriptions thread {}", Thread.currentThread());
LOGGER.info("Listing subscribed topics");
LOGGER.trace("ListSubscriptions thread {}", Thread.currentThread());
return this.consumer.assignment();
}

Expand All @@ -134,8 +133,8 @@ public Set<TopicPartition> listSubscriptions() {
* @param pattern Java regex for topics subscription
*/
public void subscribe(Pattern pattern) {
log.info("Subscribe to topics with pattern {}", pattern);
log.trace("Subscribe thread {}", Thread.currentThread());
LOGGER.info("Subscribe to topics with pattern {}", pattern);
LOGGER.trace("Subscribe thread {}", Thread.currentThread());
this.consumer.subscribe(pattern, loggingPartitionsRebalance);
}

Expand All @@ -149,7 +148,7 @@ public void assign(List<SinkTopicSubscription> topicSubscriptions) {
throw new IllegalArgumentException("Topic subscriptions cannot be null");
}

log.info("Assigning to topics partitions {}", topicSubscriptions);
LOGGER.info("Assigning to topics partitions {}", topicSubscriptions);
// TODO: maybe we don't need the SinkTopicSubscription class anymore? Removing "offset" field, it's now the same as TopicPartition class?
Set<TopicPartition> topicPartitions = new HashSet<>();
for (SinkTopicSubscription topicSubscription : topicSubscriptions) {
Expand All @@ -161,7 +160,7 @@ public void assign(List<SinkTopicSubscription> topicSubscriptions) {
return;
}

log.trace("Assign thread {}", Thread.currentThread());
LOGGER.trace("Assign thread {}", Thread.currentThread());
this.consumer.assign(topicPartitions);
}

Expand All @@ -172,7 +171,7 @@ public void assign(List<SinkTopicSubscription> topicSubscriptions) {
* @return records polled from the Kafka cluster
*/
public ConsumerRecords<K, V> poll(long timeout) {
log.trace("Poll thread {}", Thread.currentThread());
LOGGER.trace("Poll thread {}", Thread.currentThread());
return this.consumer.poll(Duration.ofMillis(timeout));
}

Expand All @@ -183,7 +182,7 @@ public ConsumerRecords<K, V> poll(long timeout) {
* @return map containing topic partitions and corresponding committed offsets
*/
public Map<TopicPartition, OffsetAndMetadata> commit(Map<TopicPartition, OffsetAndMetadata> offsetsData) {
log.trace("Commit thread {}", Thread.currentThread());
LOGGER.trace("Commit thread {}", Thread.currentThread());
// TODO: doesn't it make sense to change using the commitAsync?
// does it still make sense to return the offsets we get as parameter?
this.consumer.commitSync(offsetsData);
Expand All @@ -194,7 +193,7 @@ public Map<TopicPartition, OffsetAndMetadata> commit(Map<TopicPartition, OffsetA
* Commit offsets returned on the last poll() for all the subscribed list of topics and partitions
*/
public void commitLastPolledOffsets() {
log.trace("Commit thread {}", Thread.currentThread());
LOGGER.trace("Commit thread {}", Thread.currentThread());
// TODO: doesn't it make sense to change using the commitAsync?
this.consumer.commitSync();
}
Expand All @@ -206,7 +205,7 @@ public void commitLastPolledOffsets() {
* @param offset offset to seek to on the topic partition
*/
public void seek(TopicPartition topicPartition, long offset) {
log.trace("Seek thread {}", Thread.currentThread());
LOGGER.trace("Seek thread {}", Thread.currentThread());
this.consumer.seek(topicPartition, offset);
}

Expand All @@ -216,7 +215,7 @@ public void seek(TopicPartition topicPartition, long offset) {
* @param topicPartitionSet set of topic partition on which to seek at the beginning
*/
public void seekToBeginning(Set<TopicPartition> topicPartitionSet) {
log.trace("SeekToBeginning thread {}", Thread.currentThread());
LOGGER.trace("SeekToBeginning thread {}", Thread.currentThread());
this.consumer.seekToBeginning(topicPartitionSet);
}

Expand All @@ -226,7 +225,7 @@ public void seekToBeginning(Set<TopicPartition> topicPartitionSet) {
* @param topicPartitionSet set of topic partition on which to seek at the end
*/
public void seekToEnd(Set<TopicPartition> topicPartitionSet) {
log.trace("SeekToEnd thread {}", Thread.currentThread());
LOGGER.trace("SeekToEnd thread {}", Thread.currentThread());
this.consumer.seekToEnd(topicPartitionSet);
}
}
19 changes: 9 additions & 10 deletions src/main/java/io/strimzi/kafka/bridge/KafkaBridgeProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;
import java.util.concurrent.CompletableFuture;
Expand All @@ -24,8 +24,7 @@
* Represents a Kafka bridge producer client
*/
public class KafkaBridgeProducer<K, V> {

private final Logger log = LoggerFactory.getLogger(KafkaBridgeProducer.class);
private static final Logger LOGGER = LogManager.getLogger(KafkaBridgeProducer.class);

private final KafkaConfig kafkaConfig;
private final Serializer<K> keySerializer;
Expand Down Expand Up @@ -56,11 +55,11 @@ public KafkaBridgeProducer(KafkaConfig kafkaConfig, Serializer<K> keySerializer,
*/
public CompletionStage<RecordMetadata> send(ProducerRecord<K, V> record) {
CompletableFuture<RecordMetadata> promise = new CompletableFuture<>();
log.trace("Send thread {}", Thread.currentThread());
log.debug("Sending record {}", record);
LOGGER.trace("Send thread {}", Thread.currentThread());
LOGGER.debug("Sending record {}", record);
this.producer.send(record, (metadata, exception) -> {
log.trace("Kafka client callback thread {}", Thread.currentThread());
log.debug("Sent record {} at offset {}", record, metadata.offset());
LOGGER.trace("Kafka client callback thread {}", Thread.currentThread());
LOGGER.debug("Sent record {} at offset {}", record, metadata.offset());
if (exception == null) {
promise.complete(metadata);
} else {
Expand All @@ -76,8 +75,8 @@ public CompletionStage<RecordMetadata> send(ProducerRecord<K, V> record) {
* @param record Kafka record to send
*/
public void sendIgnoreResult(ProducerRecord<K, V> record) {
log.trace("Send ignore result thread {}", Thread.currentThread());
log.debug("Sending record {}", record);
LOGGER.trace("Send ignore result thread {}", Thread.currentThread());
LOGGER.debug("Sending record {}", record);
this.producer.send(record);
}

Expand Down
Loading
Loading