Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer not reading messages when batching is disabled #271

Open
chriscameron-vertexinc opened this issue Jul 31, 2024 · 1 comment
Open

Comments

@chriscameron-vertexinc
Copy link
Contributor

chriscameron-vertexinc commented Jul 31, 2024

Hello!

I'm using a Key_Shared topic to ensure no two consumers receive messages with the same key.

To avoid collisions when scaling consumers up/down I'm trying to set my receiver queue size to 0. I don't want to have any prefetch messages, I'd like to make sure I'm only ever consuming one message at a time.

When I produce messages to my topic the consumers throw the following exceptions:

[20:18:24 Error]
consumer(0, b4e1c, -1) Closing consumer due to unsupported received batch-message with zero receiver queue size
[20:18:24 Error]
consumer(0, b4e1c, -1) Batch reading exception 42:0:-1
Pulsar.Client.Api.InvalidMessageException: Unsupported Batch message with 0 size receiver queue for [CNR.ExampleSubscriber.API]-[]
   at Pulsar.Client.Api.ZeroQueueConsumerImpl`1.ReceiveIndividualMessagesFromBatch(RawMessage _arg2, FSharpFunc`2 _arg1) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 1699
   at <StartupCode$Pulsar-Client>.$ConsumerImpl.clo@727-101.Invoke(Unit unitVar0) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 728
   at Pulsar.Client.Common.Tools.wrapException[a](FSharpFunc`2 f) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Common\Tools.fs:line 171
[20:18:25 Information]
consumer(0, b4e1c, -1) Message 42:0:-1 was discarded due to BatchDeSerializeError
[20:18:25 Information]
consumer(0, b4e1c, -1) starting close
[20:18:25 Information]
consumer(0, b4e1c, -1) UnackedTracker mailbox has stopped normally
[20:18:25 Information]
consumer(0, b4e1c, -1) GroupingTracker mailbox has stopped normally
[20:18:25 Information]
consumer(0, b4e1c, -1) NegativeTracker mailbox has stopped normally
[20:18:25 Information]
consumer(0, b4e1c, -1) ConnectionHandler ConnectionHandler mailbox has stopped normally
[20:18:25 Error]
Failed to receive message
Pulsar.Client.Api.AlreadyClosedException: Consumer is already closed
   at <StartupCode$Pulsar-Client>.$ConsumerImpl.ReceiveWrappedAsync@1437.MoveNext() in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 1439
   at Pulsar.Client.Common.Tools.reraize[a](Exception ex) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Common\Tools.fs:line 67
   at <StartupCode$Pulsar-Client>.$ConsumerImpl.Pulsar-Client-Api-IConsumer<'T>-ReceiveAsync@1707-3.MoveNext() in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 1712
   at CNR.Common.Eventing.Pulsar.Subscribers.PulsarEventSubscriber`1.ExecuteAsync(CancellationToken stoppingToken) in C:\Workspace\cnr-common-eventing\src\CNR.Common.Eventing.Pulsar\Subscribers\PulsarEventSubscriber.cs:line 76
[20:18:25 Information]
consumer(0, b4e1c, -1) stopped
[20:18:25 Information]
consumer(0, b4e1c, -1) mailbox has stopped normally

Ok, so we can't read batched messages when only handling 1 message at a time.

When I disable batching on the producer side my consumer no-longer receives any messages at all. When I check pulsar-admin I can see my msgInCounter incrementing, but msgOutCounter never goes up.

As a workaround I've had to enable batching on the producer, and set the receive queue size to 1. Is this going to be adequate for my use case?

@Lanayx
Copy link
Member

Lanayx commented Jul 31, 2024

Reading messages using zero queue is a dedicated supported case (although batched messages are not supported following Java client). Pulsar.Client even has a set of tests specially for this type of consumer. You are welcome to create a PR with a failing test to fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants