Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for reactor-kafka 1.3+ #8529

Merged
merged 2 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ muzzle {
pass {
group.set("io.projectreactor.kafka")
module.set("reactor-kafka")
// TODO: add support for 1.3
versions.set("[1.0.0,1.3.0)")
versions.set("[1.0.0,)")
assertInverse.set(true)
}
}

dependencies {
compileOnly(project(":muzzle"))

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

Expand All @@ -20,28 +22,59 @@ dependencies {
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
implementation(project(":instrumentation:reactor:reactor-3.1:library"))

library("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE")
// using 1.3.0 to be able to implement several new KafkaReceiver methods added in 1.3.3
// @NoMuzzle is used to ensure that this does not break muzzle checks
compileOnly("io.projectreactor.kafka:reactor-kafka:1.3.3")

testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))
testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent"))

testImplementation("org.testcontainers:kafka")
testImplementation(project(":instrumentation:reactor:reactor-kafka-1.0:testing"))

testLibrary("io.projectreactor:reactor-test:3.1.0.RELEASE")
testLibrary("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE")

latestDepTestLibrary("io.projectreactor:reactor-core:3.4.+")
// TODO: add support for 1.3
latestDepTestLibrary("io.projectreactor.kafka:reactor-kafka:1.2.+")
}

val testLatestDeps = findProperty("testLatestDeps") as Boolean

testing {
suites {
val testV1_3_3 by registering(JvmTestSuite::class) {
dependencies {
implementation(project(":instrumentation:reactor:reactor-kafka-1.0:testing"))

if (testLatestDeps) {
implementation("io.projectreactor.kafka:reactor-kafka:+")
implementation("io.projectreactor:reactor-core:3.4.+")
} else {
implementation("io.projectreactor.kafka:reactor-kafka:1.3.3")
}
}

targets {
all {
testTask.configure {
systemProperty("hasConsumerGroupAndId", true)
}
}
}
}
}
}

tasks {
test {
withType<Test>().configureEach {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}

test {
systemProperty("hasConsumerGroupAndId", testLatestDeps)
}

check {
dependsOn(testing.suites)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.core.publisher.Flux;

// handles versions 1.3.+
public class ConsumerHandlerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("reactor.kafka.receiver.internals.ConsumerHandler");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("receive").and(returns(named("reactor.core.publisher.Flux"))),
this.getClass().getName() + "$ReceiveAdvice");
}

@SuppressWarnings("unused")
public static class ReceiveAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return(readOnly = false) Flux<?> flux) {
if (!(flux instanceof TracingDisablingKafkaFlux)) {
flux = new TracingDisablingKafkaFlux<>(flux);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,60 @@ public InstrumentedKafkaReceiver(KafkaReceiver<K, V> actual) {
this.actual = actual;
}

// added in 1.3.3
@Override
public Flux<ReceiverRecord<K, V>> receive(Integer prefetch) {
return wrap(KafkaReceiver133Access.receive(actual, prefetch));
}

@Override
public Flux<ReceiverRecord<K, V>> receive() {
return new InstrumentedKafkaFlux<>(actual.receive());
return wrap(actual.receive());
}

// added in 1.3.3
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
return KafkaReceiver133Access.receiveAutoAck(actual, prefetch)
.map(InstrumentedKafkaReceiver::wrap);
}

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new);
return actual.receiveAutoAck().map(InstrumentedKafkaReceiver::wrap);
}

// added in 1.3.3
@Override
public Flux<ConsumerRecord<K, V>> receiveAtmostOnce(Integer prefetch) {
return wrap(KafkaReceiver133Access.receiveAtmostOnce(actual, prefetch));
}

@Override
public Flux<ConsumerRecord<K, V>> receiveAtmostOnce() {
return new InstrumentedKafkaFlux<>(actual.receiveAtmostOnce());
return wrap(actual.receiveAtmostOnce());
}

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
TransactionManager transactionManager) {
return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new);
return actual.receiveAutoAck().map(InstrumentedKafkaReceiver::wrap);
}

// added in 1.3.3
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
TransactionManager transactionManager, Integer prefetch) {
return KafkaReceiver133Access.receiveExactlyOnce(actual, transactionManager, prefetch)
.map(InstrumentedKafkaReceiver::wrap);
}

@Override
public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
return actual.doOnConsumer(function);
}

private static <K, V, R extends ConsumerRecord<K, V>> Flux<R> wrap(Flux<R> flux) {
return flux instanceof InstrumentedKafkaFlux ? flux : new InstrumentedKafkaFlux<>(flux);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;

import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.TransactionManager;

final class KafkaReceiver133Access {

@NoMuzzle
static <K, V> Flux<ReceiverRecord<K, V>> receive(KafkaReceiver<K, V> receiver, Integer prefetch) {
return receiver.receive(prefetch);
}

@NoMuzzle
static <K, V> Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(
KafkaReceiver<K, V> receiver, Integer prefetch) {
return receiver.receiveAutoAck(prefetch);
}

@NoMuzzle
static <K, V> Flux<ConsumerRecord<K, V>> receiveAtmostOnce(
KafkaReceiver<K, V> receiver, Integer prefetch) {
return receiver.receiveAtmostOnce(prefetch);
}

@NoMuzzle
static <K, V> Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
KafkaReceiver<K, V> receiver, TransactionManager transactionManager, Integer prefetch) {
return receiver.receiveExactlyOnce(transactionManager, prefetch);
}

private KafkaReceiver133Access() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new KafkaReceiverInstrumentation(),
new ReceiverRecordInstrumentation(),
new DefaultKafkaReceiverInstrumentation());
new DefaultKafkaReceiverInstrumentation(),
new ConsumerHandlerInstrumentation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,63 +5,25 @@

package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;

import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.SenderRecord;

public class ReactorKafkaInstrumentationTest extends AbstractReactorKafkaTest {

@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
class ReactorKafkaInstrumentationTest extends AbstractReactorKafkaTest {

@Test
void shouldCreateSpansForSingleRecordProcess() {
Disposable disposable =
receiver.receive().subscribe(record -> testing.runWithSpan("consumer", () -> {}));
cleanup.deferCleanup(disposable::dispose);

SenderRecord<String, String, Object> record =
SenderRecord.create("testTopic", 0, null, "10", "testSpan", null);
Flux<?> producer = sender.send(Flux.just(record));
testing.runWithSpan("producer", () -> producer.blockLast(Duration.ofSeconds(30)));

AtomicReference<SpanData> producerSpan = new AtomicReference<>();
void testReceive() {
testSingleRecordProcess(recordConsumer -> receiver.receive().subscribe(recordConsumer));
}

testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer"),
span ->
span.hasName("testTopic send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(sendAttributes(record)));
@Test
void testReceiveAutoAck() {
testSingleRecordProcess(
recordConsumer ->
receiver.receiveAutoAck().subscribe(records -> records.subscribe(recordConsumer)));
}

producerSpan.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("testTopic receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasAttributesSatisfyingExactly(receiveAttributes("testTopic")),
span ->
span.hasName("testTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasAttributesSatisfyingExactly(processAttributes(record)),
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
@Test
void testReceiveAtMostOnce() {
testSingleRecordProcess(
recordConsumer -> receiver.receiveAtmostOnce().subscribe(recordConsumer));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;

import org.junit.jupiter.api.Test;

class ReactorKafka133InstrumentationTest extends AbstractReactorKafkaTest {

@Test
void testReceive() {
testSingleRecordProcess(recordConsumer -> receiver.receive(1).subscribe(recordConsumer));
}

@Test
void testReceiveAutoAck() {
testSingleRecordProcess(
recordConsumer ->
receiver.receiveAutoAck(1).subscribe(records -> records.subscribe(recordConsumer)));
}

@Test
void testReceiveAtMostOnce() {
testSingleRecordProcess(
recordConsumer -> receiver.receiveAtmostOnce(1).subscribe(recordConsumer));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
plugins {
id("otel.java-conventions")
}

dependencies {
api(project(":testing-common"))

compileOnly("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE")
implementation("org.testcontainers:kafka")
}
Loading