Skip to content

Commit 74e5956

Browse files
garyrussellartembilan
authored andcommitted
More tracing and debug logging
- Add trace log for successfull send - Add debug log for failed send (debug because the user code should react to the future) - Add trace logging for records received (topic-partition@offset) (container) - Add debug logging for records received (topic-partition@offset) (test utils)
1 parent 9d3e326 commit 74e5956

File tree

3 files changed

+21
-1
lines changed

3 files changed

+21
-1
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import java.util.stream.Collectors;
2324

2425
import org.apache.commons.logging.Log;
2526
import org.apache.commons.logging.LogFactory;
@@ -163,7 +164,12 @@ public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, l
163164
logger.debug("Polling...");
164165
ConsumerRecords<K, V> received = consumer.poll(timeout);
165166
if (logger.isDebugEnabled()) {
166-
logger.debug("Received: " + received.count());
167+
logger.debug("Received: " + received.count() + ", "
168+
+ received.partitions().stream()
169+
.flatMap(p -> received.records(p).stream())
170+
// map to same format as send metadata toString()
171+
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
172+
.collect(Collectors.toList()));
167173
}
168174
assertThat(received).as("null received from consumer.poll()").isNotNull();
169175
return received;

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,9 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
346346
KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
347347
producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
348348
}
349+
if (KafkaTemplate.this.logger.isTraceEnabled()) {
350+
KafkaTemplate.this.logger.trace("Sent ok: " + producerRecord + ", metadata: " + metadata);
351+
}
349352
}
350353
else {
351354
future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
@@ -356,6 +359,9 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
356359
producerRecord.value(),
357360
exception);
358361
}
362+
if (KafkaTemplate.this.logger.isDebugEnabled()) {
363+
KafkaTemplate.this.logger.debug("Failed to send: " + producerRecord, exception);
364+
}
359365
}
360366
}
361367
finally {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.ConcurrentMap;
3333
import java.util.concurrent.LinkedBlockingQueue;
3434
import java.util.concurrent.ScheduledFuture;
35+
import java.util.stream.Collectors;
3536

3637
import org.apache.commons.logging.Log;
3738
import org.apache.commons.logging.LogFactory;
@@ -657,6 +658,13 @@ public void run() {
657658
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
658659
if (records != null && this.logger.isDebugEnabled()) {
659660
this.logger.debug("Received: " + records.count() + " records");
661+
if (records.count() > 0 && this.logger.isTraceEnabled()) {
662+
this.logger.trace(records.partitions().stream()
663+
.flatMap(p -> records.records(p).stream())
664+
// map to same format as send metadata toString()
665+
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
666+
.collect(Collectors.toList()));
667+
}
660668
}
661669
if (records != null && records.count() > 0) {
662670
if (this.containerProperties.getIdleEventInterval() != null) {

0 commit comments

Comments
 (0)