Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request #22377

Merged
merged 1 commit into from
Mar 28, 2024

Conversation

coderzc
Copy link
Member

@coderzc coderzc commented Mar 28, 2024

Motivation

When the consumer uses AUTO_CONSUME to subscribe to PatternTopic, we get the Field 'topic' is not set error. The root cause is that we incorrectly used the commandGetSchema variable, which is a shared variable in the thread, but we're passing it on to another thread, this is unsafe since next request will clear this command data, this case only occurs when the topic does not exist or the scheme does not exist when request GetSchema. This issue Introduced from #20932

java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"java.lang.IllegalStateException: Field 'topic' is not set","reqId":3644652631296588088, "remote":"localhost/127.0.0.1:62361", "local":"/127.0.0.1:62369"}
	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
	at org.apache.pulsar.client.impl.ClientCnx.handleGetSchemaResponse(ClientCnx.java:970) ~[classes/:?]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:361) ~[classes/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[netty-handler-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.105.Final.jar:4.1.105.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.105.Final.jar:4.1.105.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"java.lang.IllegalStateException: Field 'topic' is not set","reqId":3644652631296588088, "remote":"localhost/127.0.0.1:62361", "local":"/127.0.0.1:62369"}
	at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1350) ~[classes/:?]
	at org.apache.pulsar.client.impl.ClientCnx.lambda$sendGetSchema$15(ClientCnx.java:1047) ~[classes/:?]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) ~[?:?]
	... 29 more

Modifications

Do not pass commandGetSchema to another thread instance of export the topic field for logging.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Mar 28, 2024
@Technoboy- Technoboy- added this to the 3.3.0 milestone Mar 28, 2024
@Technoboy- Technoboy- changed the title [fix][bug] Fix issue of Field 'topic' is not set when handle GetSchema request [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request Mar 28, 2024
@Technoboy- Technoboy- closed this Mar 28, 2024
@Technoboy- Technoboy- reopened this Mar 28, 2024
Copy link
Member

@shibd shibd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/LGTM

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM Good catch!

@lhotari lhotari merged commit d8903da into apache:master Mar 28, 2024
50 checks passed
lhotari pushed a commit that referenced this pull request Mar 28, 2024
lhotari pushed a commit that referenced this pull request Mar 28, 2024
lhotari pushed a commit that referenced this pull request Mar 28, 2024
lhotari pushed a commit that referenced this pull request Mar 28, 2024
…hema request (#22377)

(cherry picked from commit d8903da)

# Conflicts:
#	pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
lhotari pushed a commit that referenced this pull request Mar 28, 2024
…hema request (#22377)

(cherry picked from commit d8903da)

# Conflicts:
#	pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
Technoboy- pushed a commit to Technoboy-/pulsar that referenced this pull request Apr 1, 2024
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 3, 2024
…hema request (apache#22377)

(cherry picked from commit d8903da)
(cherry picked from commit 585fc54)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 4, 2024
…hema request (apache#22377)

(cherry picked from commit d8903da)
(cherry picked from commit 585fc54)
nodece pushed a commit to ascentstream/pulsar that referenced this pull request May 13, 2024
…hema request (apache#22377)

(cherry picked from commit d8903da)

# Conflicts:
#	pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants