rogervinas/spring-cloud-sleuth-in-action

Spring Cloud Sleuth in action

Spring Cloud Sleuth is the solution for distributed tracing provided by Spring and comes with a bunch of useful integrations out of the box

I've created this spring-cloud-sleuth-in-action sample to use some of these integrations executing the following flow:

Demo

To keep it simple, everything is executed within the same Spring Boot application, but in the end it is the same as if it were split across different services

Demo time!

Let's follow these steps to execute the demo:

  • Run docker-compose:
docker-compose up -d
  • Start the Spring Boot Application:
./gradlew bootRun
  • Consume from the Kafka topic my.topic with kcat:
kcat -b localhost:9094 -C -t my.topic -f '%h %s\n'
  • Execute a request to the first endpoint with curl or any other tool you like:
curl 'http://localhost:8080/request1?payload=hello' \
  -H 'X-B3-TraceId: aaaaaa1234567890' \
  -H 'X-B3-SpanId: bbbbbb1234567890'

Note: the default format for context propagation is B3, so we use headers X-B3-TraceId and X-B3-SpanId

  • Check the application output: all lines logged for the request should share the same traceId
Started MyApplicationKt in 44.739 seconds (JVM running for 49.324) - traceId ? spanId ? - main
>>> RestRequest1 hello  - traceId aaaaaa1234567890 spanId cf596e6281432fb9 - http-nio-8080-exec-7
>>> KafkaProducer hello - traceId aaaaaa1234567890 spanId cf596e6281432fb9 - http-nio-8080-exec-7
>>> KafkaConsumer hello - traceId aaaaaa1234567890 spanId 91e1b6b37334620c - KafkaConsumerDestination...
>>> RestRequest2 hello  - traceId aaaaaa1234567890 spanId a1ac0233664f5249 - http-nio-8080-exec-8
>>> RestRequest3 hello  - traceId aaaaaa1234567890 spanId bf384c3b4d97efe9 - http-nio-8080-exec-9
>>> RestRequest4 hello  - traceId aaaaaa1234567890 spanId c84470ce03e993f1 - http-nio-8080-exec-1
>>> AsyncService hello  - traceId aaaaaa1234567890 spanId acccead477b4e1c8 - task-3
  • Check kcat output:
b3=aaaaaa1234567890-331986280d41ccdc-1,
nativeHeaders={"b3":["aaaaaa1234567890-331986280d41ccdc-1"]},
contentType=application/json,
spring_json_header_types={
    "b3":"java.lang.String",
    "nativeHeaders":"org.springframework.util.LinkedMultiValueMap",
    "contentType":"java.lang.String"
}
hello

Zipkin

  • Stop the Spring Boot Application just with CTRL-C

  • Stop docker-compose:

docker-compose down
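
The b3 value in the kcat output uses the B3 single-header form, which packs the same values as the X-B3-* headers of the curl request into one string: traceId-spanId-sampled, optionally followed by a parent span id. Here is a minimal sketch of how the two forms relate. B3Context, parseB3Single and toMultiHeaders are hypothetical names for this illustration, not Sleuth or Brave classes, and sampling-only values such as "0" are deliberately not handled:

```kotlin
// Hypothetical holder for the fields carried by a B3 header (not a Sleuth/Brave class)
data class B3Context(
  val traceId: String,
  val spanId: String,
  val sampled: String? = null,
  val parentSpanId: String? = null
)

// Parses the single-header form: {traceId}-{spanId}[-{sampled}[-{parentSpanId}]]
// Sampling-only values (just "0", "1" or "d") are rejected by this sketch
fun parseB3Single(value: String): B3Context {
  val parts = value.split("-")
  require(parts.size in 2..4) { "Unexpected b3 value: $value" }
  return B3Context(
    traceId = parts[0],
    spanId = parts[1],
    sampled = parts.getOrNull(2),
    parentSpanId = parts.getOrNull(3)
  )
}

// Converts to the multi-header form used in the curl request
fun toMultiHeaders(context: B3Context): Map<String, String> {
  val headers = linkedMapOf(
    "X-B3-TraceId" to context.traceId,
    "X-B3-SpanId" to context.spanId
  )
  when (context.sampled) {
    null -> Unit                              // sampling decision deferred
    "d" -> headers["X-B3-Flags"] = "1"        // debug flag
    else -> headers["X-B3-Sampled"] = context.sampled
  }
  context.parentSpanId?.let { headers["X-B3-ParentSpanId"] = it }
  return headers
}
```

For example, parseB3Single("aaaaaa1234567890-331986280d41ccdc-1") yields the traceId sent with curl, the spanId of the producer span and the sampled flag "1".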

Show me the code!

This demo was created using this spring initializr configuration

Just adding the Sleuth dependency enables tracing by default in all of the supported integrations, so, as you will see, almost no extra code is needed (with only a few exceptions)

Logging

We need to add the traceId and spanId values to the application log. In production we would use the logstash-logback-encoder to generate logs in JSON format and send them to an ELK stack, but for the demo we use this plain text logback layout:

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg - traceId %X{traceId:-?} spanId %X{spanId:-?} - %thread%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
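
In this pattern, %X{traceId:-?} reads the traceId key from the logging MDC and falls back to ? when the key is missing, which is why the startup line of the demo shows traceId ? spanId ?. The following sketch only imitates that lookup in plain Kotlin to make the fallback visible. renderMdcPlaceholders is a hypothetical helper, not part of Logback:

```kotlin
// Matches %X{key} and %X{key:-default} placeholders
val mdcPlaceholder = Regex("""%X\{([^}:]+)(?::-([^}]*))?\}""")

// Replaces each placeholder with the MDC value, or with its default when the key is absent
fun renderMdcPlaceholders(pattern: String, mdc: Map<String, String>): String =
  mdcPlaceholder.replace(pattern) { match ->
    val key = match.groupValues[1]
    val default = match.groupValues[2] // empty string when no default was given
    mdc[key] ?: default
  }
```

With an empty map the pattern fragment "traceId %X{traceId:-?}" renders as "traceId ?". Once a traceId entry is present, its value is printed instead.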

RestController

Create your @RestController as usual

@RestController
class MyRestController {

  companion object {
    private val LOGGER = LoggerFactory.getLogger(MyRestController::class.java)
  }

  @GetMapping("/request1")
  fun request1(@RequestParam("payload") payload: String): String {
    LOGGER.info(">>> RestRequest1 $payload")
    // do more stuff
    return "ok"
  }
}

Kafka Producer & Consumer

We have a few alternatives to propagate tracing information when publishing to Kafka

For example, we can use Spring for Apache Kafka and create a Producer or Consumer using the autoconfigured ProducerFactory or ConsumerFactory. We can use the autoconfigured KafkaTemplate too

In this demo we use Spring Cloud Stream and its Reactive Functions Support

  • Configure binding and function definitions:

    spring:
      cloud:
        stream:
          kafka:
            binder:
              brokers: "localhost:9094"
          bindings:
            consumer-in-0:
              group: ${spring.application.name}
              destination: "my.topic"
            producer-out-0:
              destination: "my.topic"
        function:
          definition: consumer;producer
  • The consumer is just a bean implementing a lambda that consumes a Message<PAYLOAD>:

    @Component("consumer")
    class MyKafkaConsumer: (Message<String>) -> Unit {
    
      companion object {
        private val LOGGER = LoggerFactory.getLogger(MyKafkaConsumer::class.java)
      }
    
      override fun invoke(message: Message<String>) {
        LOGGER.info(">>> KafkaConsumer ${message.payload}")
        // do more stuff
      }
    }
  • The producer is just a bean implementing a lambda that produces a Flux<Message<PAYLOAD>>:

    In this case we have to use the MessagingSleuthOperators helper methods to preserve the tracing context, because the producer is a reactive stream function

    @Component("producer")
    class MyKafkaProducer(private val beanFactory: BeanFactory) : () -> Flux<Message<String>> {
    
      companion object {
        private val LOGGER = LoggerFactory.getLogger(MyKafkaProducer::class.java)
      }
    
      private val sink = Sinks.many().unicast().onBackpressureBuffer<Message<String>>()
    
      fun produce(payload: String) {
        LOGGER.info(">>> KafkaProducer $payload")
        sink.emitNext(createMessageWithTracing(payload), FAIL_FAST)
      }
    
      private fun createMessageWithTracing(payload: String): Message<String> {
        return MessagingSleuthOperators.handleOutputMessage(
          beanFactory,
          MessagingSleuthOperators.forInputMessage(beanFactory, GenericMessage(payload))
        )
      }
    
      override fun invoke() = sink.asFlux()
    }
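
The binding keys consumer-in-0 and producer-out-0 in the configuration above follow the Spring Cloud Stream naming convention for functional bindings: the function name, then in or out, then the index of the input or output. A small sketch of that convention for the single-input, single-output case (FunctionKind and bindingNames are hypothetical names used only for this illustration):

```kotlin
// Hypothetical classification of the function bean behind a binding
enum class FunctionKind { SUPPLIER, CONSUMER, FUNCTION }

// Derives the binding names Spring Cloud Stream expects for a function with
// a single input and/or a single output (index 0)
fun bindingNames(functionName: String, kind: FunctionKind): List<String> = when (kind) {
  FunctionKind.SUPPLIER -> listOf("$functionName-out-0")
  FunctionKind.CONSUMER -> listOf("$functionName-in-0")
  FunctionKind.FUNCTION -> listOf("$functionName-in-0", "$functionName-out-0")
}
```

So the bean named consumer, which only consumes, maps to consumer-in-0, and the bean named producer, which only supplies a Flux, maps to producer-out-0.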

RestTemplate

Just create a RestTemplate @Bean and inject it wherever it is needed

@Configuration
class MyConfiguration {
  @Bean
  fun restTemplate() = RestTemplate()
}

FeignClient

Just declare the @FeignClient as usual

@SpringBootApplication
@EnableFeignClients
class MyApplication

@FeignClient(name = "request3", url = "http://localhost:\${server.port}")
interface MyFeignClient {
  @RequestMapping(method = [RequestMethod.GET], path = ["/request3"])
  fun request3(@RequestParam("payload") payload: String) : String
}

WebClient

Just create a WebClient @Bean and inject it wherever it is needed

@Configuration
class MyConfiguration {
  @Bean
  fun webClient() = WebClient.create()
}

Async

Just annotate the method with @Async as usual. Tracing context will be preserved between threads

@SpringBootApplication
@EnableAsync
class MyApplication

@Service
class MyAsyncService {

  companion object {
    private val LOGGER = LoggerFactory.getLogger(MyAsyncService::class.java)
  }

  @Async   
  fun execute(payload: String): CompletableFuture<String> {
    LOGGER.info(">>> AsyncService $payload")
    return CompletableFuture.completedFuture("ok")
  }
}
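
To give an intuition of how a tracing context can survive a thread hop, here is a simplified illustration in plain Kotlin: the value is captured on the calling thread and restored on the worker thread around the task. This is only a sketch of the general technique, not Sleuth's actual implementation, and currentTraceId, withCapturedTraceId and runOnWorker are hypothetical names:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for the current trace context (Sleuth manages its own)
val currentTraceId = ThreadLocal<String?>()

// Wraps a task so it runs with the trace id of the thread that created the wrapper
fun <T> withCapturedTraceId(task: () -> T): Callable<T> {
  val captured = currentTraceId.get() // read on the calling thread
  return Callable<T> {
    val previous = currentTraceId.get()
    currentTraceId.set(captured) // restore on the worker thread
    try {
      task()
    } finally {
      currentTraceId.set(previous) // do not leak into the next task
    }
  }
}

// Sets a trace id on the caller, then reads it back from a worker thread
fun runOnWorker(traceId: String): String? {
  currentTraceId.set(traceId)
  val executor = Executors.newSingleThreadExecutor()
  try {
    return executor.submit(withCapturedTraceId { currentTraceId.get() }).get()
  } finally {
    executor.shutdown()
    currentTraceId.remove()
  }
}
```

Without the wrapper, the worker thread would see no trace id at all, because a ThreadLocal value is not shared between threads.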

Zipkin

In production we would send only a small percentage of all the traces to Zipkin (sampling), but for the demo we send all of them:

spring:
  sleuth:
    sampler:
      probability: 1.0
  zipkin:
    base-url: "http://localhost:9411"
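
To make the meaning of the probability value concrete: each new trace is kept with that probability, so 1.0 keeps every trace and 0.1 keeps roughly one in ten. A simplified sketch of such a decision follows. SimpleProbabilitySampler is a hypothetical class for illustration and does not reproduce how Sleuth's own sampler is implemented:

```kotlin
import kotlin.random.Random

// Simplified probability sampler: keeps roughly `probability` of all new traces.
// Illustration only; this is not Sleuth's sampler.
class SimpleProbabilitySampler(
  private val probability: Double,
  private val random: Random = Random.Default
) {
  init {
    require(probability in 0.0..1.0) { "probability must be between 0.0 and 1.0" }
  }

  // nextDouble() is in [0.0, 1.0), so 1.0 always samples and 0.0 never does
  fun shouldSample(): Boolean = random.nextDouble() < probability
}
```

With probability 1.0, as configured for the demo, shouldSample() returns true for every trace.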

Test

One easy way to test the demo is to run a @SpringBootTest with the OutputCaptureExtension and verify that every log line contains the expected message and traceId:

@SpringBootTest(webEnvironment = DEFINED_PORT)
@Testcontainers
@ExtendWith(OutputCaptureExtension::class)
class MyApplicationIntegrationTest {
  @Test
  fun `should propagate tracing`(log: CapturedOutput) {
    val traceId = "edb77ece416b3196"
    val spanId = "c58ac2aa66d238b9"

    val response = request1(traceId, spanId)

    assertThat(response.statusCode).isEqualTo(OK)
    assertThat(response.body).isEqualTo("ok")

    val logLines = await()
      .atMost(TEN_SECONDS)
      .pollDelay(ONE_SECOND)
      .until({ parseLogLines(log) }, { it.size >= 7 })

    assertThatLogLineContainsMessageAndTraceId(logLines[0], "RestRequest1 hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[1], "KafkaProducer hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[2], "KafkaConsumer hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[3], "RestRequest2 hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[4], "RestRequest3 hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[5], "RestRequest4 hello", traceId)
    assertThatLogLineContainsMessageAndTraceId(logLines[6], "AsyncService hello", traceId)
  }
}

Run the test with ./gradlew test
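
The helpers parseLogLines and assertThatLogLineContainsMessageAndTraceId used by the test are not shown here. As a rough idea of what they could look like, the sketch below parses lines written with the demo's logback pattern. These are hypothetical implementations that may differ from the ones in the repository. They use plain check calls instead of AssertJ to stay self-contained, and LogLine is an assumed helper type:

```kotlin
// Hypothetical representation of one demo log line
data class LogLine(val message: String, val traceId: String, val spanId: String, val thread: String)

// Mirrors the logback pattern: %msg - traceId %X{traceId:-?} spanId %X{spanId:-?} - %thread
val logLineRegex = Regex("""^>>> (.+?)\s+- traceId (\S+) spanId (\S+) - (.+)$""")

// Keeps only the ">>> ..." lines written by the demo and parses them.
// CapturedOutput is a CharSequence, so it can be passed in directly.
fun parseLogLines(output: CharSequence): List<LogLine> =
  output.lines().mapNotNull { line ->
    logLineRegex.find(line.trim())?.destructured?.let { (message, traceId, spanId, thread) ->
      LogLine(message, traceId, spanId, thread)
    }
  }

// Fails with a descriptive message when the line does not match the expectation
fun assertThatLogLineContainsMessageAndTraceId(line: LogLine, message: String, traceId: String) {
  check(line.message == message) { "Expected message '$message' but was '${line.message}'" }
  check(line.traceId == traceId) { "Expected traceId '$traceId' but was '${line.traceId}'" }
}
```

Lines that do not start with >>>, such as the Spring Boot startup line, are skipped, so index 0 of the result corresponds to the RestRequest1 line.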

That's it! Happy coding! 💙