Skip to content

Commit

Permalink
Implement support for reactor-kafka 1.3+ (#8529)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek authored May 24, 2023
1 parent ac343b2 commit b6c612a
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ muzzle {
pass {
group.set("io.projectreactor.kafka")
module.set("reactor-kafka")
// TODO: add support for 1.3
versions.set("[1.0.0,1.3.0)")
versions.set("[1.0.0,)")
assertInverse.set(true)
}
}

dependencies {
compileOnly(project(":muzzle"))

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

Expand All @@ -20,28 +22,59 @@ dependencies {
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
implementation(project(":instrumentation:reactor:reactor-3.1:library"))

library("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE")
// using 1.3.0 to be able to implement several new KafkaReceiver methods added in 1.3.3
// @NoMuzzle is used to ensure that this does not break muzzle checks
compileOnly("io.projectreactor.kafka:reactor-kafka:1.3.3")

testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))
testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent"))

testImplementation("org.testcontainers:kafka")
testImplementation(project(":instrumentation:reactor:reactor-kafka-1.0:testing"))

testLibrary("io.projectreactor:reactor-test:3.1.0.RELEASE")
testLibrary("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE")

latestDepTestLibrary("io.projectreactor:reactor-core:3.4.+")
// TODO: add support for 1.3
latestDepTestLibrary("io.projectreactor.kafka:reactor-kafka:1.2.+")
}

val testLatestDeps = findProperty("testLatestDeps") as Boolean

testing {
suites {
val testV1_3_3 by registering(JvmTestSuite::class) {
dependencies {
implementation(project(":instrumentation:reactor:reactor-kafka-1.0:testing"))

if (testLatestDeps) {
implementation("io.projectreactor.kafka:reactor-kafka:+")
implementation("io.projectreactor:reactor-core:3.4.+")
} else {
implementation("io.projectreactor.kafka:reactor-kafka:1.3.3")
}
}

targets {
all {
testTask.configure {
systemProperty("hasConsumerGroupAndId", true)
}
}
}
}
}
}

tasks {
test {
withType<Test>().configureEach {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}

test {
systemProperty("hasConsumerGroupAndId", testLatestDeps)
}

check {
dependsOn(testing.suites)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.core.publisher.Flux;

// handles versions 1.3.+
public class ConsumerHandlerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("reactor.kafka.receiver.internals.ConsumerHandler");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("receive").and(returns(named("reactor.core.publisher.Flux"))),
this.getClass().getName() + "$ReceiveAdvice");
}

@SuppressWarnings("unused")
public static class ReceiveAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return(readOnly = false) Flux<?> flux) {
if (!(flux instanceof TracingDisablingKafkaFlux)) {
flux = new TracingDisablingKafkaFlux<>(flux);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,60 @@ public InstrumentedKafkaReceiver(KafkaReceiver<K, V> actual) {
this.actual = actual;
}

// added in 1.3.3
@Override
public Flux<ReceiverRecord<K, V>> receive(Integer prefetch) {
return wrap(KafkaReceiver133Access.receive(actual, prefetch));
}

@Override
public Flux<ReceiverRecord<K, V>> receive() {
return new InstrumentedKafkaFlux<>(actual.receive());
return wrap(actual.receive());
}

// added in 1.3.3
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
return KafkaReceiver133Access.receiveAutoAck(actual, prefetch)
.map(InstrumentedKafkaReceiver::wrap);
}

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new);
return actual.receiveAutoAck().map(InstrumentedKafkaReceiver::wrap);
}

// added in 1.3.3
@Override
public Flux<ConsumerRecord<K, V>> receiveAtmostOnce(Integer prefetch) {
return wrap(KafkaReceiver133Access.receiveAtmostOnce(actual, prefetch));
}

@Override
public Flux<ConsumerRecord<K, V>> receiveAtmostOnce() {
return new InstrumentedKafkaFlux<>(actual.receiveAtmostOnce());
return wrap(actual.receiveAtmostOnce());
}

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
TransactionManager transactionManager) {
return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new);
return actual.receiveAutoAck().map(InstrumentedKafkaReceiver::wrap);
}

// added in 1.3.3
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
TransactionManager transactionManager, Integer prefetch) {
return KafkaReceiver133Access.receiveExactlyOnce(actual, transactionManager, prefetch)
.map(InstrumentedKafkaReceiver::wrap);
}

@Override
public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
return actual.doOnConsumer(function);
}

private static <K, V, R extends ConsumerRecord<K, V>> Flux<R> wrap(Flux<R> flux) {
return flux instanceof InstrumentedKafkaFlux ? flux : new InstrumentedKafkaFlux<>(flux);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;

import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.TransactionManager;

final class KafkaReceiver133Access {

@NoMuzzle
static <K, V> Flux<ReceiverRecord<K, V>> receive(KafkaReceiver<K, V> receiver, Integer prefetch) {
return receiver.receive(prefetch);
}

@NoMuzzle
static <K, V> Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(
KafkaReceiver<K, V> receiver, Integer prefetch) {
return receiver.receiveAutoAck(prefetch);
}

@NoMuzzle
static <K, V> Flux<ConsumerRecord<K, V>> receiveAtmostOnce(
KafkaReceiver<K, V> receiver, Integer prefetch) {
return receiver.receiveAtmostOnce(prefetch);
}

@NoMuzzle
static <K, V> Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
KafkaReceiver<K, V> receiver, TransactionManager transactionManager, Integer prefetch) {
return receiver.receiveExactlyOnce(transactionManager, prefetch);
}

private KafkaReceiver133Access() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new KafkaReceiverInstrumentation(),
new ReceiverRecordInstrumentation(),
new DefaultKafkaReceiverInstrumentation());
new DefaultKafkaReceiverInstrumentation(),
new ConsumerHandlerInstrumentation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,63 +5,25 @@

package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;

import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.SenderRecord;

public class ReactorKafkaInstrumentationTest extends AbstractReactorKafkaTest {

@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
class ReactorKafkaInstrumentationTest extends AbstractReactorKafkaTest {

@Test
void shouldCreateSpansForSingleRecordProcess() {
Disposable disposable =
receiver.receive().subscribe(record -> testing.runWithSpan("consumer", () -> {}));
cleanup.deferCleanup(disposable::dispose);

SenderRecord<String, String, Object> record =
SenderRecord.create("testTopic", 0, null, "10", "testSpan", null);
Flux<?> producer = sender.send(Flux.just(record));
testing.runWithSpan("producer", () -> producer.blockLast(Duration.ofSeconds(30)));

AtomicReference<SpanData> producerSpan = new AtomicReference<>();
void testReceive() {
testSingleRecordProcess(recordConsumer -> receiver.receive().subscribe(recordConsumer));
}

testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer"),
span ->
span.hasName("testTopic send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(sendAttributes(record)));
@Test
void testReceiveAutoAck() {
testSingleRecordProcess(
recordConsumer ->
receiver.receiveAutoAck().subscribe(records -> records.subscribe(recordConsumer)));
}

producerSpan.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("testTopic receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasAttributesSatisfyingExactly(receiveAttributes("testTopic")),
span ->
span.hasName("testTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasAttributesSatisfyingExactly(processAttributes(record)),
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
@Test
void testReceiveAtMostOnce() {
testSingleRecordProcess(
recordConsumer -> receiver.receiveAtmostOnce().subscribe(recordConsumer));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;

import org.junit.jupiter.api.Test;

class ReactorKafka133InstrumentationTest extends AbstractReactorKafkaTest {

@Test
void testReceive() {
testSingleRecordProcess(recordConsumer -> receiver.receive(1).subscribe(recordConsumer));
}

@Test
void testReceiveAutoAck() {
testSingleRecordProcess(
recordConsumer ->
receiver.receiveAutoAck(1).subscribe(records -> records.subscribe(recordConsumer)));
}

@Test
void testReceiveAtMostOnce() {
testSingleRecordProcess(
recordConsumer -> receiver.receiveAtmostOnce(1).subscribe(recordConsumer));
}
}
10 changes: 10 additions & 0 deletions instrumentation/reactor/reactor-kafka-1.0/testing/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
plugins {
id("otel.java-conventions")
}

dependencies {
api(project(":testing-common"))

compileOnly("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE")
implementation("org.testcontainers:kafka")
}
Loading

0 comments on commit b6c612a

Please sign in to comment.