Skip to content

Commit

Permalink
Use tcp transport connHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Sep 22, 2022
1 parent 31d5d57 commit 4b146ad
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 56 deletions.
61 changes: 5 additions & 56 deletions libp2p/transports/tortransport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ else:

import std/[oids, sequtils]
import chronos, chronicles, strutils
import stew/byteutils
import stew/endians2
import stew/[byteutils, endians2, results]
import ../multicodec
import transport,
tcptransport,
Expand Down Expand Up @@ -39,56 +38,6 @@ proc new*(
flags: flags,
tcpTransport: TcpTransport.new(upgrade = upgrade))

proc connHandler*(self: TorTransport,
client: StreamTransport,
dir: Direction): Future[Connection] {.async.} =
var observedAddr: MultiAddress = MultiAddress()
try:
observedAddr = MultiAddress.init("/ip4/0.0.0.0").tryGet()
except CatchableError as exc:
trace "Failed to create observedAddr", exc = exc.msg
if not(isNil(client) and client.closed):
await client.closeWait()
raise exc

trace "Handling tcp connection", address = $observedAddr,
dir = $dir,
clients = self.tcpTransport.clients[Direction.In].len +
self.tcpTransport.clients[Direction.Out].len

let conn = Connection(
ChronosStream.init(
client = client,
dir = dir,
observedAddr = observedAddr
))

proc onClose() {.async.} =
try:
let futs = @[client.join(), conn.join()]
await futs[0] or futs[1]
for f in futs:
if not f.finished: await f.cancelAndWait() # cancel outstanding join()

trace "Cleaning up client", addrs = $client.remoteAddress,
conn

self.tcpTransport.clients[dir].keepItIf( it != client )
await allFuturesThrowing(
conn.close(), client.closeWait())

trace "Cleaned up client", addrs = $client.remoteAddress,
conn

except CatchableError as exc:
let useExc {.used.} = exc
debug "Error cleaning up client", errMsg = exc.msg, conn

self.tcpTransport.clients[dir].add(client)
asyncSpawn onClose()

return conn

proc connectToTorServer(transportAddress: TransportAddress): Future[StreamTransport] {.async, gcsafe.} =
let transp = await connect(transportAddress)
try:
Expand Down Expand Up @@ -126,7 +75,7 @@ method dial*(

try:
await dialPeer(transp, address)
return await self.connHandler(transp, Direction.Out)
return await self.tcpTransport.connHandler(transp, Opt.none(MultiAddress), Direction.Out)
except CatchableError as err:
await transp.closeWait()
raise err
Expand All @@ -143,11 +92,11 @@ method start*(
if not self.handles(ma):
trace "Invalid address detected, skipping!", address = ma
continue

let ipTcp = ma[0..1].get()
ipTcpAddrs.add(ipTcp)
let onion3 = ma[multiCodec("onion3")].get()
onion3Addrs.add(onion3)
onion3Addrs.add(onion3)

if len(ipTcpAddrs) != 0 and len(onion3Addrs) != 0:
await procCall Transport(self).start(onion3Addrs)
Expand All @@ -163,7 +112,7 @@ method stop*(self: TorTransport) {.async, gcsafe.} =
##
await self.tcpTransport.stop()

method handles*(t: TorTransport, address: MultiAddress): bool {.gcsafe.} =
method handles*(t: TorTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address):
if address.protocols.isOk:
return ONIO3_MATCHER.match(address)
1 change: 1 addition & 0 deletions tests/testnative.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import testmultibase,
testrouting_record

import testtcptransport,
testtortransport,
testnameresolve,
testwstransport,
testmultistream,
Expand Down

0 comments on commit 4b146ad

Please sign in to comment.