-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Yamux] Allow max ACK backlog of 256 streams #340
[Yamux] Allow max ACK backlog of 256 streams #340
Conversation
//CC @thomaseizinger who pushed a lot for this to land in rust-libp2p and specs. |
Just to make sure I'm getting the Yamux spec correctly: There could be two kinds of unacknowledged streams:
So the questions:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some questions remain
import java.util.concurrent.atomic.AtomicInteger | ||
import kotlin.math.max | ||
import kotlin.properties.Delegates | ||
|
||
const val ACK_BACKLOG_ALLOWANCE = 256 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would parametrize it for the YamuxHandler
making this constant default value
createYamuxStreamHandler(child.id).onLocalOpen() | ||
if (totalUnacknowledgedStreams() < ACK_BACKLOG_ALLOWANCE) { | ||
createYamuxStreamHandler(child.id).onLocalOpen() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we throw a specific WriteMuxerException
here?
They should be accounted separately. A should not ACK inbound streams from B, in case A's application layer has not yet read or written to a stream. B should not open a new stream to A in case 256 outbound streams to A have not yet been ACKed. Same vice versa for streams from A to B. |
Do you have any way to backpressure the application? In Rust and I am assuming in Go as well, we suspend the green thread. |
In rust-yamux today it is best effort only, enforced at the outbound end for now. In case you want to enforce it at the receiving side, I would consider resetting the stream exceeding the limit. |
Not with the present API
Yeah, that's a good solution.
If both peers has the same |
Correct. Though I am not aware of a single libp2p based network out there today that supports the yamux ack backlog feature across all nodes. Thus treating it as best effort everywhere. |
@@ -240,8 +245,11 @@ open class YamuxHandler( | |||
streamHandlers.remove(child.id)?.dispose() | |||
} | |||
|
|||
private fun calculateTotalBufferedWrites(): Int { | |||
return streamHandlers.values.sumOf { it.sendBuffer.readableBytes() } | |||
override fun checkCanOpenNewStream() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just throw from onLocalOpen()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So essentially in newStream
method in AbstractMuxHandler
, if onLocalOpen
fails then outboundInitializer
is never called and createStream
in MuxHandler
controller future is never complete. And I am thinking also in order to avoid calling createChild
with he new id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if onLocalOpen fails then outboundInitializer is never called and createStream in MuxHandler controller future is never complete.
Right that probably should be fixed. register()
doesn't rethrow exceptions but rather wraps them into ChannelFuture
, so the best way is to call sync()
to rethrow. register()
call shouldn't block so the sync()
should be synchronous:
Index: libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt
--- a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt (revision 1927bcae6804fc5b24662a40221e217b95298944)
+++ b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt (date 1698247944849)
@@ -138,7 +138,7 @@
): MuxChannel<TData> {
val child = MuxChannel(this, id, initializer, initiator)
streamMap[id] = child
- ctx!!.channel().eventLoop().register(child)
+ ctx!!.channel().eventLoop().register(child).sync()
return child
}
And I am thinking also in order to avoid calling createChild with he new id.
Doesn't look like a big issue: the stream seems to immediately disposed in that case and removed from the stream map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the fix above throwing from onLocalOpen()
worked for me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. I like it. Done.
@StefanBratanov are you planning to address this here yet or later? |
Working on it now. Think it makes sense to be in this PR as well. |
Done. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just some nits 👍
https://github.com/libp2p/specs/blob/master/yamux/README.md#ack-backlog--backpressure
Implements
SHOULD at most allow an ACK backlog of 256 streams.
ackBacklogLimit
when initializing Yamuxacknowledged
when initializing yamux stream