Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only one connection receive subscriber allowed. #1854

Closed
violetagg opened this issue Feb 22, 2021 · 1 comment
Closed

Only one connection receive subscriber allowed. #1854

violetagg opened this issue Feb 22, 2021 · 1 comment

Comments

@violetagg
Copy link

Describe the bug
The exception below is observed when using spring-cloud-sleuth

2021-02-22 15:12:25.427 ERROR [,77249f740f448dff,77249f740f448dff] 18936 --- [ctor-http-nio-2] reactor.core.publisher.Operators         : Operator called default onErrorDropped

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
	at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:180) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:144) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:392) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:140) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:140) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.MonoCollect$CollectSubscriber.onError(MonoCollect.java:144) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.Operators.error(Operators.java:196) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:164) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:168) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:144) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.web.client.TraceExchangeFilterFunction$TraceWebClientSubscriber.onNext(TraceWebClientBeanPostProcessor.java:220) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at org.springframework.cloud.sleuth.instrument.web.client.TraceExchangeFilterFunction$TraceWebClientSubscriber.onNext(TraceWebClientBeanPostProcessor.java:183) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2359) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:74) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]

the double subscription happens with onError:

	at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:180) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:144) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:392) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]

and onNext

	at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:168) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:144) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340) ~[reactor-netty-core-1.0.3.jar:1.0.3]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.2.jar:3.4.2]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.2.jar:3.4.2]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:88) ~[spring-cloud-sleuth-instrumentation-3.0.1.jar:3.0.1]

The issue Only one connection receive subscriber allowed. is reported as part of the memory leak issue reactor/reactor-netty#1513. However this exception is not related to the memory leak issue.

Sample
In the example https://github.com/shj95/webflux-leak-test that is provided with reactor/reactor-netty#1513, add a dependency implementation 'org.springframework.cloud:spring-cloud-starter-sleuth:3.0.1', then use the provided jmeter script to reproduce it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants