-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Akka.Streams improvements #105
Akka.Streams improvements #105
Conversation
Push(_stage.Out, packet); | ||
else |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was a source of some major, major bugs for TurboMqtt - it resulted in us not consistently pulling the stages above us, therefore many QoS 1 / QoS 2 packets went unacknowledged
@@ -72,89 +75,88 @@ private sealed class Logic : InAndOutGraphStageLogic | |||
public Logic(ClientAckingFlow stage) : base(stage.Shape) | |||
{ | |||
_stage = stage; | |||
_publishIds = new SimpleLruCache<NonZeroUInt16>(stage._bufferSize, stage._bufferExpiry); | |||
_pubRelIds = new SimpleLruCache<NonZeroUInt16>(stage._bufferSize, stage._bufferExpiry); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Profiled this - the explicit de-duplication code was actually a major source of slow-downs. Think O(N) for each message that came in. I'm rethinking whether or not this is the client's responsibility or not too - we should add the PacketId
to the MqttMessage
type if we want to pass the buck on de-duplication to the end-user.
Working on some throughput improvements around QoS 1 / QoS 2 handling