MQTT5 client, unable to get data, seems like store gets corrupted #112
"class com.baidu.bifromq.basekv.proto.Boundary cannot be cast to class java.lang.Comparable (com.baidu.bifromq.basekv.proto.Boundary is in unnamed module of loader 'app'" |
@Gujiawei-Edinburgh, we don't use the com.baidu.bifromq.basekv.proto.Boundary data structure in our plugin; here is the GitHub link for the plugin, it's pretty rudimentary: https://github.com/saraheem/Bifromqplugin
Do all your topics hit this situation, or is the error printed for only part of them?
Here's our standalone.yml file:
bootstrap: true # deprecated since 3.3.0
authProviderFQN: "com.baidu.demo.plugin.DemoAuthProvider"
resourceThrottlerFQN: "com.baidu.demo.plugin.DemoResourceThrottler"
settingProviderFQN: "com.baidu.demo.plugin.DemoSettingProvider"
authProviderFQN: "com.hyperdata.bifromq.plugin.HdcBifromqPluginAuthProvider"
tcpListener:
tlsListener:
clusterConfig:
Thanks for providing the details. I think there may be some issues. I will try your plugin code and run some tests.
@saraheem I followed your reproduction steps with the latest version of the docker image and the default authProvider plugin, but I did not observe the retain failure. I think the plugin has nothing to do with BifroMQ retaining messages. My test script is:
import time
import paho.mqtt.client as mqtt
# MQTT broker details
BROKER = "127.0.0.1"
PORT = 1883
USERNAME = "dev"
PASSWORD = "dev"
# Topic base and message count
TOPIC_BASE = "test/retain/"
MESSAGE_COUNT = 10_001
# Callback for when the client connects to the broker
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected successfully to MQTT broker")
    else:
        print(f"Failed to connect, return code {rc}")
# Initialize the MQTT client
client = mqtt.Client()
# Set username and password
client.username_pw_set(USERNAME, PASSWORD)
# Attach the on_connect callback
client.on_connect = on_connect
# Connect to the broker
client.connect(BROKER, PORT)
# Start the loop in a separate thread to process network events
client.loop_start()
try:
    for i in range(MESSAGE_COUNT):
        topic = f"{TOPIC_BASE}{i}"
        message = f"Message {i}"
        client.publish(topic, message, retain=True)
        print(f"Published message to {topic}")
        time.sleep(0.1)  # Delay of 0.1 seconds after each publish
    print("All messages published successfully.")
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    # Stop the loop and disconnect the client
    client.loop_stop()
    client.disconnect()
After adding 10,000 retained messages, I can still retrieve the retained messages from BifroMQ through a subscription.
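For reference, a minimal paho-mqtt sketch of how the retained messages can be checked via subscription (an illustrative addition, not part of the original test; it assumes the same broker credentials and the test/retain/# topic filter as the script above):
import paho.mqtt.client as mqtt

BROKER = "127.0.0.1"
PORT = 1883
USERNAME = "dev"   # assumed, same as the publish script above
PASSWORD = "dev"
TOPIC_FILTER = "test/retain/#"

received = 0

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        # Retained messages matching the filter are delivered right after subscribing.
        client.subscribe(TOPIC_FILTER)
    else:
        print(f"Failed to connect, return code {rc}")

def on_message(client, userdata, msg):
    global received
    received += 1
    print(f"Retained message {received}: {msg.topic} -> {msg.payload.decode()}")

client = mqtt.Client()
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT)
client.loop_forever()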
Following the current discussion, we enabled detailed logs in our plugin. We noticed the following event messages that might be causing this issue: CLIENT_CHANNEL_ERROR. Background: SessionInboxSize. These changes resolved the retained-message issue. However, a new challenge arose: after publishing a few thousand records, clients were unable to publish additional messages on retained or non-retained topics. When we changed the [ClientID], the issue temporarily resolved but then recurred. Given our requirements, we need to configure certain parameters with significantly large or infinite values so that our clients can send unlimited messages on non-retained topics. This is critical, as some of our services run continuously and need to publish responses at a rate of approximately 100 non-retained messages per second. If you can suggest the relevant parameters that cater to our requirements, it would be much appreciated.
From the error events, it seems you have enabled persistent sessions (cleansession=false) for your SUB clients. In persistent mode, there is an upper limit (controlled by the SessionInboxSize setting, which defaults to 1000) on undelivered messages. When the session inbox queue is full, new messages are dropped. How full the inbox queue gets depends on:
Could you provide some reproducible commands using publicly available tools?
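For illustration only (not code from the thread), the persistent vs. transient behaviour described above is selected by the clean-session flag on the client side; a minimal paho-mqtt sketch:
import paho.mqtt.client as mqtt

# Persistent session (cleansession=false): the broker keeps the subscription state and
# queues undelivered messages in the session inbox, bounded by SessionInboxSize.
persistent_sub = mqtt.Client(client_id="sub-persistent", clean_session=False)

# Transient session (cleansession=true): nothing is queued for the client while it is offline.
transient_sub = mqtt.Client(client_id="sub-transient", clean_session=True)

for c in (persistent_sub, transient_sub):
    c.username_pw_set("dev", "dev")   # assumed credentials, as in the earlier script
    c.connect("127.0.0.1", 1883)      # assumed broker address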
@popduke
Cluster Info:
docker run -d --name bifromq -p 1883:1883 -p 1884:1884 -p 8091:8091 -v /home/root/bifromq/conf:/home/bifromq/conf -v /home/root/bifromq/data:/home/bifromq/data -v /home/root/bifromq/plugins:/home/bifromq/plugins -v /home/root/bifromq/logs:/home/bifromq/logs -e EXTRA_JVM_OPTS="-Dplugin.hdcauthprovider.url=https://localhost/Auth -DMaxTopicFiltersPerSub=100 -DMaxTopicFiltersPerInbox=10000 -DRetainMessageMatchLimit=10000 -DSessionInboxSize=10000 -DMaxSessionExpirySeconds=3600 -DDForceTransient=true -DMsgPubPerSec=1000 -DReceivingMaximum=10000" bifromq/bifromq:latest
MQTT Client:
Connection Details:
For the topic "Tenant/1/Centre/3/Batch/4" there would be thousands of batches, and each batch will have its own (non-retained) topics. Pub/sub on these topics is more frequent.
Pub/Sub rate:
So, what we are looking for is a way to configure the session inbox to be effectively infinite/dynamic, since it varies with the pub/sub load. In addition, our clients should be able to subscribe to retained topics and publish/subscribe to non-retained topics without hitting any barrier or limit. Kindly suggest the relevant parameters for our requirements.
@MadniRaza Additionally, if you could provide code to reproduce the issue, similar to the previous Python script, it would be greatly appreciated.
Persistent Sessions involve disk I/O for every pub/sub operation. Therefore, they should not be expected to achieve the same level of throughput as their transient counterparts (cleanSession=true) given the same allocated resources. There are still some missing details regarding your workload and the specifications of the resources provisioned.
How many CPUs are allocated per container? And how many container instances are present in your BifroMQ cluster? What is the type of disk storage?
Providing the MQTT client library and parameter descriptions is insufficient for troubleshooting due to potential communication imprecisions. Moreover, mixed workloads in your scenario further complicate the analysis. This is why we require reproducible commands (with arguments reflecting your use case) using public, fully MQTT-compliant stress tools like
Your combination of retained messages and persistent sessions is quite confusing. In a persistent session, subscriptions are also persisted. Why do your subscribing clients with persistent sessions frequently unsubscribe and resubscribe to retained topics? Can you elaborate your user scenario?
Tuning overall performance requires a systematic analysis. It’s not just a matter of maximizing some configs, especially when workloads involve persistent I/O internally.
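To make the request concrete, here is a rough paho-mqtt sketch of the kind of reproducible workload being described (a persistent subscriber plus a publisher sending roughly 100 non-retained QoS 1 messages per second); the topic, credentials, and message count are assumptions, and this is only a starting point, not a substitute for a dedicated MQTT stress tool:
import time
import paho.mqtt.client as mqtt

BROKER, PORT = "127.0.0.1", 1883
TOPIC = "Tenant/1/Centre/3/Batch/4"   # example topic taken from the discussion above

def on_message(client, userdata, msg):
    print(f"received {msg.topic}: {msg.payload[:32]}")

# Persistent subscriber (cleansession=false), as in the reported setup.
sub = mqtt.Client(client_id="repro-sub", clean_session=False)
sub.username_pw_set("dev", "dev")
sub.on_message = on_message
sub.connect(BROKER, PORT)
sub.subscribe(TOPIC, qos=1)
sub.loop_start()

# Publisher sending ~100 non-retained QoS 1 messages per second.
pub = mqtt.Client(client_id="repro-pub")
pub.username_pw_set("dev", "dev")
pub.connect(BROKER, PORT)
pub.loop_start()

for i in range(10_000):
    pub.publish(TOPIC, f"Message {i}", qos=1, retain=False)
    time.sleep(0.01)   # ~100 messages per second

pub.loop_stop()
sub.loop_stop()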
@popduke While subscribing to events, I am getting the event Qos0Dropped with Reason = Overflow. What does Reason Overflow imply?
Overflow means the broker cannot send out any messages to the SUB client because the underlying channel (Netty) is unwritable.
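As a hypothetical illustration (not taken from the thread): one common way the channel becomes unwritable is a subscriber that consumes messages more slowly than they arrive, so the broker's outbound buffer for that connection fills up and QoS 0 messages start being dropped with Reason = Overflow. A minimal paho-mqtt sketch of such a slow consumer:
import time
import paho.mqtt.client as mqtt

# Hypothetical slow consumer: the callback blocks paho's network loop, so the client
# stops draining the socket, TCP backpressure builds up, and the broker's outbound
# (Netty) channel for this connection can become unwritable.
def on_message(client, userdata, msg):
    time.sleep(1)   # simulate slow processing of each message

slow_sub = mqtt.Client(client_id="slow-sub")
slow_sub.username_pw_set("dev", "dev")          # assumed credentials
slow_sub.on_message = on_message
slow_sub.connect("127.0.0.1", 1883)             # assumed broker address
slow_sub.subscribe("test/overflow/#", qos=0)    # hypothetical topic filter
slow_sub.loop_forever()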
I saw in the source code that Overflow happens when the underlying channel is unwritable; however, under what circumstances would the underlying channel be unwritable?
We are testing on a single instance of BifroMQ with a few client connections. Could running a single instance be the problem?
@popduke We have enabled clustering in our environments with multiple nodes. Our testing is in progress; we will get back to you with the test results and proper configurations. Thanks for your timely responses.
docker run -d --name bifromq -p 1883:1883 -p 1884:1884 -v /home/root/bifromq/conf:/home/bifromq/conf -v /home/root/bifromq/data:/home/bifromq/data -v /home/root/bifromq/plugins:/home/bifromq/plugins -v /home/root/bifromq/logs:/home/bifromq/logs -e EXTRA_JVM_OPTS=-Dplugin.hdcauthprovider.url=https:/localhost/Auth -DMaxTopicFiltersPerSub=50 -DRetainMessageMatchLimit=1000 bifromq/bifromq:latest
Steps to reproduce:
2024-12-23 19:28:13.476 ERROR [mqtt-worker-elg-3] --- c.baidu.bifromq.basescheduler.Batcher [Batcher.java:142] Unexpected exception
java.lang.ClassCastException: class com.baidu.bifromq.basekv.proto.Boundary cannot be cast to class java.lang.Comparable (com.baidu.bifromq.basekv.proto.Boundary is in unnamed module of loader 'app'; java.lang.Comparable is in module java.base of loader 'bootstrap')
at java.base/java.util.TreeMap.getEntry(TreeMap.java:347)
at java.base/java.util.TreeMap.containsKey(TreeMap.java:233)
at java.base/java.util.Collections$UnmodifiableMap.containsKey(Collections.java:1500)
at com.baidu.bifromq.dist.server.scheduler.DistCallScheduler$DistWorkerCallBatcher$BatchDistCall.rangeLookup(DistCallScheduler.java:240)
at com.baidu.bifromq.dist.server.scheduler.DistCallScheduler$DistWorkerCallBatcher$BatchDistCall.execute(DistCallScheduler.java:150)
at com.baidu.bifromq.basescheduler.Batcher.batchAndEmit(Batcher.java:170)
at com.baidu.bifromq.basescheduler.Batcher.trigger(Batcher.java:139)
at com.baidu.bifromq.basescheduler.Batcher.submit(Batcher.java:111)
at com.baidu.bifromq.basescheduler.BatchCallScheduler.lambda$schedule$2(BatchCallScheduler.java:119)
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
at com.baidu.bifromq.basescheduler.BatchCallScheduler.schedule(BatchCallScheduler.java:113)
at com.baidu.bifromq.dist.server.DistResponsePipeline.handleRequest(DistResponsePipeline.java:66)
at com.baidu.bifromq.dist.server.DistResponsePipeline.handleRequest(DistResponsePipeline.java:42)
at com.baidu.bifromq.baserpc.AbstractResponsePipeline.startHandlingRequest(AbstractResponsePipeline.java:87)
at com.baidu.bifromq.baserpc.ResponsePipeline.onNext(ResponsePipeline.java:76)
at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
at io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:334)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:319)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at com.google.common.util.concurrent.DirectExecutorService.execute(DirectExecutorService.java:51)
at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102)
at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.messagesAvailable(ServerImpl.java:842)
at io.grpc.inprocess.InProcTransport$InProcessStream$InProcessClientStream.lambda$writeMessage$3(InProcTransport.java:803)
at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:94)
at io.grpc.inprocess.InProcTransport$InProcessStream$InProcessClientStream.writeMessage(InProcTransport.java:808)
at io.grpc.internal.ForwardingClientStream.writeMessage(ForwardingClientStream.java:37)
at io.grpc.internal.RetriableStream$1SendMessageEntry.runWith(RetriableStream.java:582)
at io.grpc.internal.RetriableStream.delayOrExecute(RetriableStream.java:559)
at io.grpc.internal.RetriableStream.sendMessage(RetriableStream.java:590)
at io.grpc.internal.ClientCallImpl.sendMessageInternal(ClientCallImpl.java:522)
at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:510)
at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)
at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:368)
at com.baidu.bifromq.baserpc.ManagedRequestPipeline.sendUntilStreamNotReadyOrNoTask(ManagedRequestPipeline.java:364)
at com.baidu.bifromq.baserpc.ManagedRequestPipeline.invoke(ManagedRequestPipeline.java:255)
at com.baidu.bifromq.dist.client.scheduler.DistServerCallScheduler$DistServerCallBatcher$DistServerBatchCall.execute(DistServerCallScheduler.java:100)
at com.baidu.bifromq.basescheduler.Batcher.batchAndEmit(Batcher.java:170)
at com.baidu.bifromq.basescheduler.Batcher.trigger(Batcher.java:139)
at com.baidu.bifromq.basescheduler.Batcher.submit(Batcher.java:111)
at com.baidu.bifromq.basescheduler.BatchCallScheduler.lambda$schedule$2(BatchCallScheduler.java:119)
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
at com.baidu.bifromq.basescheduler.BatchCallScheduler.schedule(BatchCallScheduler.java:113)
at com.baidu.bifromq.dist.client.DistClient.pub(DistClient.java:55)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.doPub(MQTTSessionHandler.java:1500)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.doPub(MQTTSessionHandler.java:1402)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.lambda$handleQoS1Pub$37(MQTTSessionHandler.java:1204)
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.handleQoS1Pub(MQTTSessionHandler.java:1200)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.handlePubMsg(MQTTSessionHandler.java:459)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.channelRead(MQTTSessionHandler.java:414)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at com.baidu.bifromq.mqtt.handler.ConditionalSlowDownHandler.channelRead(ConditionalSlowDownHandler.java:78)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at com.baidu.bifromq.mqtt.handler.MQTTMessageDebounceHandler.channelRead(MQTTMessageDebounceHandler.java:61)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.traffic.AbstractTrafficShapingHandler.channelRead(AbstractTrafficShapingHandler.java:506)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1473)
at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1347)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:833)
2024-12-23 19:28:14.131 ERROR [basekv-client-executor-1] --- c.b.b.r.s.scheduler.BatchRetainCall [BatchRetainCall.java:68] topic not found in result map, tenantId: global, topic: Tenant/1/Station/19/Unit/10078/Device/10522/Auth
2024-12-23 19:40:50.895 ERROR [basekv-server-executor] --- c.b.b.baserpc.AbstractResponsePipeline [AbstractResponsePipeline.java:93] Request handling with error in pipeline@556979744
java.util.concurrent.CancellationException: null
at java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2478)
at com.baidu.bifromq.baserpc.utils.FutureTracker.stop(FutureTracker.java:38)
at com.baidu.bifromq.baserpc.AbstractResponsePipeline.cleanup(AbstractResponsePipeline.java:138)
at com.baidu.bifromq.baserpc.AbstractResponsePipeline.close(AbstractResponsePipeline.java:68)
at com.baidu.bifromq.baserpc.AbstractResponsePipeline.onError(AbstractResponsePipeline.java:42)
at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:288)
at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
at io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:375)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:364)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:910)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at io.micrometer.core.instrument.internal.TimedRunnable.run(TimedRunnable.java:49)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)