Skip to content

Commit

Permalink
Filter out scalar Mono/Flux instances (#8571)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek authored May 26, 2023
1 parent ba4eea2 commit 262d771
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ public void resetOnEachOperator() {

private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {
return Operators.lift(new Lifter<>(asyncOperationEndStrategy));
return Operators.lift(
ContextPropagationOperator::shouldInstrument, new Lifter<>(asyncOperationEndStrategy));
}

/** Forces Mono to run in traceContext scope. */
Expand Down Expand Up @@ -220,7 +221,12 @@ public reactor.util.context.Context apply(reactor.util.context.Context context)
}
}

public static class Lifter<T>
private static boolean shouldInstrument(Scannable publisher) {
// skip if Flux/Mono #just, #empty, #error
return !(publisher instanceof Fuseable.ScalarCallable);
}

private static class Lifter<T>
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

/** Holds reference to strategy to prevent it from being collected. */
Expand All @@ -233,10 +239,6 @@ public Lifter(ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {

@Override
public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? super T> sub) {
// if Flux/Mono #just, #empty, #error
if (publisher instanceof Fuseable.ScalarCallable) {
return sub;
}
return new TracingSubscriber<>(sub, sub.currentContext());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class HooksTest {

Expand Down Expand Up @@ -43,4 +47,28 @@ public void subscribe(CoreSubscriber<? super Integer> actual) {
subscriber.set(actual);
}
}

@Test
void testInvalidBlockUsage() throws InterruptedException {
ContextPropagationOperator operator = ContextPropagationOperator.create();
operator.registerOnEachOperator();

Callable<String> callable =
() -> {
Mono.just("test1").block();
return "call1";
};

Disposable disposable =
Mono.defer(
() ->
Mono.fromCallable(callable).publishOn(Schedulers.elastic()).flatMap(Mono::just))
.subscribeOn(Schedulers.single())
.subscribe();

TimeUnit.MILLISECONDS.sleep(100);

disposable.dispose();
operator.resetOnEachOperator();
}
}

0 comments on commit 262d771

Please sign in to comment.