-
Notifications
You must be signed in to change notification settings - Fork 55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(transport): add experimental QUIC Transport (not production ready) #725
Changes from all commits
c7eef2e
9b7e1c6
40ab9ff
7d6e66a
54d6ca9
f3705a5
0f6b934
8743bb9
3910728
af0a9ac
905f221
e6136ea
51c6639
6287bb0
c8c2d8e
e7bae90
4078bf7
2501102
6681116
5403143
e61a190
beaed7c
ee3fdad
b3f9385
2fa44df
cab28c2
7e58972
56240f1
f04b2df
628bfa8
330bbb3
f8c1899
948d937
81aa2bb
e393835
cb715ff
f39951f
9dbf9d6
3319424
4078b66
6ddafca
3301ae6
c238812
830ec3e
0a88fca
739f2ae
b8db7be
85dfdf2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,224 @@ | ||
import std/sequtils | ||
import pkg/chronos | ||
import pkg/chronicles | ||
import pkg/quic | ||
import ../multiaddress | ||
import ../multicodec | ||
import ../stream/connection | ||
import ../wire | ||
import ../muxers/muxer | ||
import ../upgrademngrs/upgrade | ||
import ./transport | ||
|
||
export multiaddress | ||
export multicodec | ||
export connection | ||
export transport | ||
|
||
logScope: | ||
topics = "libp2p quictransport" | ||
|
||
type | ||
P2PConnection = connection.Connection | ||
QuicConnection = quic.Connection | ||
|
||
# Stream | ||
type QuicStream* = ref object of P2PConnection | ||
stream: Stream | ||
cached: seq[byte] | ||
|
||
proc new( | ||
_: type QuicStream, stream: Stream, oaddr: Opt[MultiAddress], peerId: PeerId | ||
): QuicStream = | ||
let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId) | ||
procCall P2PConnection(quicstream).initStream() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should specify an objName for |
||
quicstream | ||
|
||
template mapExceptions(body: untyped) = | ||
try: | ||
body | ||
except QuicError: | ||
raise newLPStreamEOFError() | ||
except CatchableError: | ||
raise newLPStreamEOFError() | ||
Comment on lines
+40
to
+43
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After a recent discussion with @Ivansete-status, I think those errors need a small rework, they lack precision (not in this PR though). But nevertheless, everytime you use |
||
|
||
method readOnce*( | ||
stream: QuicStream, pbytes: pointer, nbytes: int | ||
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = | ||
try: | ||
if stream.cached.len == 0: | ||
stream.cached = await stream.stream.read() | ||
result = min(nbytes, stream.cached.len) | ||
copyMem(pbytes, addr stream.cached[0], result) | ||
stream.cached = stream.cached[result ..^ 1] | ||
except CatchableError as exc: | ||
raise newLPStreamEOFError() | ||
|
||
{.push warning[LockLevel]: off.} | ||
method write*( | ||
stream: QuicStream, bytes: seq[byte] | ||
) {.async: (raises: [CancelledError, LPStreamError]).} = | ||
mapExceptions(await stream.stream.write(bytes)) | ||
|
||
{.pop.} | ||
|
||
method closeImpl*(stream: QuicStream) {.async: (raises: []).} = | ||
try: | ||
await stream.stream.close() | ||
except CatchableError as exc: | ||
discard | ||
await procCall P2PConnection(stream).closeImpl() | ||
|
||
# Session | ||
type QuicSession* = ref object of P2PConnection | ||
connection: QuicConnection | ||
Comment on lines
+72
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure to understand why you need two P2PConnections here (with QuicSession and QuicStream). It's a bit confusing. And I think it's better to create your custom |
||
|
||
method close*(session: QuicSession) {.async, base.} = | ||
await session.connection.close() | ||
await procCall P2PConnection(session).close() | ||
|
||
proc getStream*( | ||
session: QuicSession, direction = Direction.In | ||
): Future[QuicStream] {.async.} = | ||
var stream: Stream | ||
case direction | ||
of Direction.In: | ||
stream = await session.connection.incomingStream() | ||
of Direction.Out: | ||
stream = await session.connection.openStream() | ||
await stream.write(@[]) # QUIC streams do not exist until data is sent | ||
return QuicStream.new(stream, session.observedAddr, session.peerId) | ||
|
||
method getWrapped*(self: QuicSession): P2PConnection = | ||
nil | ||
|
||
# Muxer | ||
type QuicMuxer = ref object of Muxer | ||
quicSession: QuicSession | ||
handleFut: Future[void] | ||
|
||
method newStream*( | ||
m: QuicMuxer, name: string = "", lazy: bool = false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think the |
||
): Future[P2PConnection] {. | ||
async: (raises: [CancelledError, LPStreamError, MuxerError]) | ||
.} = | ||
try: | ||
return await m.quicSession.getStream(Direction.Out) | ||
except CatchableError as exc: | ||
raise newException(MuxerError, exc.msg, exc) | ||
|
||
proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} = | ||
## call the muxer stream handler for this channel | ||
## | ||
try: | ||
await m.streamHandler(chann) | ||
trace "finished handling stream" | ||
doAssert(chann.closed, "connection not closed by handler!") | ||
except CatchableError as exc: | ||
trace "Exception in mplex stream handler", msg = exc.msg | ||
await chann.close() | ||
|
||
method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} = | ||
try: | ||
while not m.quicSession.atEof: | ||
let incomingStream = await m.quicSession.getStream(Direction.In) | ||
asyncSpawn m.handleStream(incomingStream) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you find an alternative to the asyncspawn, it would be great. |
||
except CatchableError as exc: | ||
trace "Exception in mplex handler", msg = exc.msg | ||
|
||
method close*(m: QuicMuxer) {.async: (raises: []).} = | ||
try: | ||
await m.quicSession.close() | ||
m.handleFut.cancel() | ||
except CatchableError as exc: | ||
discard | ||
|
||
# Transport | ||
type QuicUpgrade = ref object of Upgrade | ||
|
||
type QuicTransport* = ref object of Transport | ||
listener: Listener | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above, not a fan of un-prefixed object like that. QuicListener is great. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is defined in nim-quic. |
||
connections: seq[P2PConnection] | ||
|
||
func new*(_: type QuicTransport, u: Upgrade): QuicTransport = | ||
QuicTransport(upgrader: QuicUpgrade(ms: u.ms)) | ||
|
||
method handles*(transport: QuicTransport, address: MultiAddress): bool = | ||
if not procCall Transport(transport).handles(address): | ||
return false | ||
QUIC_V1.match(address) | ||
|
||
method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} = | ||
doAssert transport.listener.isNil, "start() already called" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this really be a |
||
#TODO handle multiple addr | ||
transport.listener = listen(initTAddress(addrs[0]).tryGet) | ||
await procCall Transport(transport).start(addrs) | ||
transport.addrs[0] = | ||
MultiAddress.init(transport.listener.localAddress(), IPPROTO_UDP).tryGet() & | ||
MultiAddress.init("/quic-v1").get() | ||
transport.running = true | ||
|
||
method stop*(transport: QuicTransport) {.async.} = | ||
if transport.running: | ||
for c in transport.connections: | ||
await c.close() | ||
Comment on lines
+163
to
+164
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about something like that: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any semantic difference? Not sure if it adds enough value to justify the change. |
||
await procCall Transport(transport).stop() | ||
await transport.listener.stop() | ||
transport.running = false | ||
transport.listener = nil | ||
|
||
proc wrapConnection( | ||
transport: QuicTransport, connection: QuicConnection | ||
): P2PConnection {.raises: [Defect, TransportOsError, LPError].} = | ||
let | ||
remoteAddr = connection.remoteAddress() | ||
observedAddr = | ||
MultiAddress.init(remoteAddr, IPPROTO_UDP).get() & | ||
MultiAddress.init("/quic-v1").get() | ||
conres = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr)) | ||
conres.initStream() | ||
|
||
transport.connections.add(conres) | ||
proc onClose() {.async.} = | ||
await conres.join() | ||
transport.connections.keepItIf(it != conres) | ||
trace "Cleaned up client" | ||
|
||
asyncSpawn onClose() | ||
return conres | ||
|
||
method accept*(transport: QuicTransport): Future[P2PConnection] {.async.} = | ||
doAssert not transport.listener.isNil, "call start() before calling accept()" | ||
let connection = await transport.listener.accept() | ||
return transport.wrapConnection(connection) | ||
|
||
method dial*( | ||
transport: QuicTransport, | ||
hostname: string, | ||
address: MultiAddress, | ||
peerId: Opt[PeerId] = Opt.none(PeerId), | ||
): Future[P2PConnection] {.async, gcsafe.} = | ||
let connection = await dial(initTAddress(address).tryGet) | ||
return transport.wrapConnection(connection) | ||
Comment on lines
+201
to
+202
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should probably check if |
||
|
||
method upgrade*( | ||
self: QuicTransport, conn: P2PConnection, peerId: Opt[PeerId] | ||
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = | ||
let qs = QuicSession(conn) | ||
if peerId.isSome: | ||
qs.peerId = peerId.get() | ||
|
||
let muxer = QuicMuxer(quicSession: qs, connection: conn) | ||
muxer.streamHandler = proc(conn: P2PConnection) {.async: (raises: []).} = | ||
trace "Starting stream handler" | ||
try: | ||
await self.upgrader.ms.handle(conn) # handle incoming connection | ||
except CancelledError as exc: | ||
return | ||
except CatchableError as exc: | ||
trace "exception in stream handler", conn, msg = exc.msg | ||
finally: | ||
await conn.closeWithEOF() | ||
trace "Stream handler done", conn | ||
muxer.handleFut = muxer.handle() | ||
return muxer |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
{.used.} | ||
|
||
import sequtils | ||
import chronos, stew/byteutils | ||
import | ||
../libp2p/[ | ||
stream/connection, | ||
transports/transport, | ||
transports/quictransport, | ||
upgrademngrs/upgrade, | ||
multiaddress, | ||
errors, | ||
wire, | ||
] | ||
|
||
import ./helpers, ./commontransport | ||
|
||
suite "Quic transport": | ||
asyncTest "can handle local address": | ||
let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()] | ||
let transport1 = QuicTransport.new() | ||
await transport1.start(ma) | ||
check transport1.handles(transport1.addrs[0]) | ||
await transport1.stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a huge fan of this un-prefixed
Stream
. Maybe renaming itQuicStream
and renaming the currentQuicStream
intoLPQuicStream
will be clearer.