diff --git a/lib/connect.js b/lib/connect.js index ce8753d9..5d5d7de0 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -1,5 +1,6 @@ const NoiseSecretStream = require('@hyperswarm/secret-stream') const b4a = require('b4a') +const relay = require('protomux-bridging-relay') const DebuggingStream = require('debugging-stream') const { isPrivate } = require('bogon') const Semaphore = require('./semaphore') @@ -24,7 +25,6 @@ const { SERVER_ERROR, SERVER_INCOMPATIBLE } = require('./errors') -const connectionRelay = require('./connection-relay') module.exports = function connect (dht, publicKey, opts = {}) { const pool = opts.pool || null @@ -64,7 +64,7 @@ module.exports = function connect (dht, publicKey, opts = {}) { onsocket: null, sleeper: new Sleeper(), relayThrough: opts.relayThrough || null, - relayToken: opts.relayThrough ? connectionRelay.token() : null, + relayToken: opts.relayThrough ? relay.token() : null, encryptedSocket } @@ -389,9 +389,23 @@ async function connectThroughNode (c, address, socket) { token = c.relayToken } - const request = connectionRelay.Client - .from(c.dht.connect(publicKey, { pool: c.pool })) - .rendezvous(isInitiator, token, c.rawStream, c.onsocket) + const relaySocket = c.dht.connect(publicKey, { pool: c.pool }) + + const request = relay.Client + .from(relaySocket, { id: relaySocket.publicKey }) + .pair(isInitiator, token, c.rawStream) + .on('error', () => { /* TODO */ }) + .on('data', (remoteId) => { + const { + remotePort, + remoteHost, + socket + } = relaySocket.rawStream + + c.rawStream.connect(socket, remoteId, remotePort, remoteHost) + + c.onsocket(socket, remotePort, remoteHost) + }) const onerror = () => request.destroy() diff --git a/lib/connection-pool.js b/lib/connection-pool.js index 5a83608c..38e2f98d 100644 --- a/lib/connection-pool.js +++ b/lib/connection-pool.js @@ -64,7 +64,7 @@ module.exports = class ConnectionPool extends EventEmitter { return } - const session = new ConnectionSession(this, stream) + const session = new ConnectionRef(this, stream) const keyString = stream.remotePublicKey.toString('hex') @@ -123,10 +123,23 @@ module.exports = class ConnectionPool extends EventEmitter { } } -class ConnectionSession { +class ConnectionRef { constructor (pool, stream) { this._pool = pool this._stream = stream + this._refs = 0 + } + + active () { + this._refs++ + } + + inactive () { + this._refs-- + } + + release () { + this._stream.destroy() } } diff --git a/lib/connection-relay.js b/lib/connection-relay.js deleted file mode 100644 index 577e6089..00000000 --- a/lib/connection-relay.js +++ /dev/null @@ -1,253 +0,0 @@ -const Protomux = require('protomux') -const sodium = require('sodium-universal') -const b4a = require('b4a') -const c = require('compact-encoding') - -exports.Server = class ConnectionRelayServer { - constructor (dht) { - this.dht = dht - this.pairs = new Map() - this.sessions = new Set() - } - - accept (stream) { - const session = new ConnectionRelaySession(this, stream) - - this.sessions.add(session) - - stream.on('close', () => session.close()) - } -} - -class ConnectionRelaySession { - constructor (server, stream) { - this.server = server - this.mux = Protomux.from(stream) - - this.channel = this.mux.createChannel({ - protocol: 'hyperdht/relay', - id: stream.remotePublicKey, - onclose: this._onclose.bind(this) - }) - - this.messages = { - rendezvous: this.channel.addMessage({ - encoding: m.rendezvous, - onmessage: this._onrendezvous.bind(this) - }) - } - - this.channel.open() - } - - get dht () { - return this.server.dht - } - - _onclose () { - this.server.sessions.delete(this) - } - - _onrendezvous ({ isInitiator, token, id: remoteId }) { - const keyString = token.toString('hex') - - let pair = this.server.pairs.get(keyString) - - if (pair === undefined) { - pair = new ConnectionRelaySessionPair() - this.server.pairs.set(keyString, pair) - } - - pair.sessions[+isInitiator] = { - isInitiator, - session: this, - remoteId, - rawStream: null - } - - if (pair.paired) { - this.server.pairs.delete(keyString) - - for (const session of pair.sessions) { - const { session: { dht }, remoteId } = session - - const rawStream = dht.createRawStream({ - framed: true, - firewall (socket, port, host) { - rawStream.connect(socket, remoteId, port, host) - return false - } - }) - - session.rawStream = rawStream - } - - for (const session of pair.sessions) { - const { isInitiator, session: { messages }, rawStream } = session - - rawStream.relayTo(pair.remote(session).rawStream) - - messages.rendezvous.send({ - isInitiator, - token, - id: rawStream.id, - seq: 0 - }) - } - } - } - - close () { - // TODO - // this.channel.close() - } -} - -class ConnectionRelaySessionPair { - constructor () { - this.sessions = [null, null] - } - - get paired () { - return this.sessions[0] !== null && this.sessions[1] !== null - } - - remote (session) { - return this.sessions[session.isInitiator ? 0 : 1] - } -} - -exports.Client = class ConnectionRelayClient { - static clients = new WeakMap() - - static from (stream) { - let client = this.clients.get(stream) - if (client) return client - client = new ConnectionRelayClient(stream) - this.clients.set(stream, client) - return client - } - - constructor (stream) { - this.mux = Protomux.from(stream) - this.requests = new Map() - - this.channel = this.mux.createChannel({ - protocol: 'hyperdht/relay', - id: stream.publicKey, - onclose: this._onclose.bind(this) - }) - - this.messages = { - rendezvous: this.channel.addMessage({ - encoding: m.rendezvous, - onmessage: this._onrendezvous.bind(this) - }) - } - - this.channel.open() - } - - get stream () { - return this.mux.stream - } - - _onclose () { - ConnectionRelayClient.clients.delete(this.stream) - } - - _onrendezvous ({ isInitiator, token, id: remoteId }) { - const keyString = token.toString('hex') - - const request = this.requests.get(keyString) - - if (request === undefined || request.isInitiator !== isInitiator) return - - const { - remotePort, - remoteHost, - socket - } = this.stream.rawStream - - request.rawStream.connect(socket, remoteId, remotePort, remoteHost) - - request.cb(socket, remotePort, remoteHost) - - request.destroy() - } - - rendezvous (isInitiator, token, rawStream, cb) { - const request = new ConnectionRelayRequest(this, isInitiator, token, rawStream, cb) - - this.requests.set(token.toString('hex'), request) - - this.messages.rendezvous.send({ - isInitiator, - token, - id: rawStream.id, - seq: 0 - }) - - return request - } - - _closeMaybe () { - // TODO - // if (this.requests.size === 0) this.close() - } - - close () { - this.channel.close() - } -} - -class ConnectionRelayRequest { - constructor (client, isInitiator, token, rawStream, cb) { - this.client = client - this.isInitiator = isInitiator - this.token = token - this.rawStream = rawStream - this.cb = cb - } - - destroy () { - this.client.requests.delete(this.token.toString('hex')) - this.client._closeMaybe() - } -} - -exports.token = function token (buf = b4a.allocUnsafe(32)) { - sodium.randombytes_buf(buf) - return buf -} - -const m = exports.messages = {} - -m.rendezvous = { - preencode (state, m) { - c.uint.preencode(state, 0) - c.fixed32.preencode(state, m.token) - c.uint.preencode(state, m.id) - c.uint.preencode(state, m.seq) - }, - encode (state, m) { - let flags = 0 - - if (m.isInitiator) flags |= 1 - - c.uint.encode(state, flags) - c.fixed32.encode(state, m.token) - c.uint.encode(state, m.id) - c.uint.encode(state, m.seq) - }, - decode (state) { - const flags = c.uint.decode(state) - - return { - isInitiator: (flags & 1) !== 0, - token: c.fixed32.decode(state), - id: c.uint.decode(state), - seq: c.uint.decode(state) - } - } -} diff --git a/lib/server.js b/lib/server.js index f6ca7fa5..05432013 100644 --- a/lib/server.js +++ b/lib/server.js @@ -2,13 +2,13 @@ const { EventEmitter } = require('events') const safetyCatch = require('safety-catch') const NoiseSecretStream = require('@hyperswarm/secret-stream') const b4a = require('b4a') +const relay = require('protomux-bridging-relay') const NoiseWrap = require('./noise-wrap') const Announcer = require('./announcer') const { FIREWALL, ERROR } = require('./constants') const { hash } = require('./crypto') const SecurePayload = require('./secure-payload') const Holepuncher = require('./holepuncher') -const connectionRelay = require('./connection-relay') const DebuggingStream = require('debugging-stream') const { isPrivate } = require('bogon') const { ALREADY_LISTENING, NODE_DESTROYED } = require('./errors') @@ -37,7 +37,6 @@ module.exports = class Server extends EventEmitter { this._keyPair = null this._announcer = null this._connects = new Map() - this._connectionRelay = opts.connectionRelay ? new connectionRelay.Server(dht) : null this._holepunches = [] this._listening = false this._closing = null @@ -221,11 +220,7 @@ module.exports = class Server extends EventEmitter { hs.rawStream.connect(socket, remotePayload.udx.id, port, host) } - const encryptedSocket = this.createSecretStream(false, rawStream, { handshake: h }) - - if (this._connectionRelay) this._connectionRelay.accept(encryptedSocket) - - this.onconnection(encryptedSocket) + this.onconnection(this.createSecretStream(false, rawStream, { handshake: h })) if (hs.puncher) { hs.puncher.onabort = noop @@ -238,7 +233,7 @@ module.exports = class Server extends EventEmitter { } } - if (this.relayThrough) hs.relayToken = connectionRelay.token() + if (this.relayThrough) hs.relayToken = relay.token() try { hs.reply = await handshake.send({ @@ -290,9 +285,23 @@ module.exports = class Server extends EventEmitter { token = remotePayload.relayThrough.token } - const request = connectionRelay.Client - .from(this.dht.connect(publicKey, { pool: this.pool })) - .rendezvous(isInitiator, token, hs.rawStream, hs.onsocket) + const relaySocket = this.dht.connect(publicKey, { pool: this.pool }) + + const request = relay.Client + .from(relaySocket, { id: relaySocket.publicKey }) + .pair(isInitiator, token, hs.rawStream) + .on('error', () => { /* TODO */ }) + .on('data', (remoteId) => { + const { + remotePort, + remoteHost, + socket + } = relaySocket.rawStream + + hs.rawStream.connect(socket, remoteId, remotePort, remoteHost) + + hs.onsocket(socket, remotePort, remoteHost) + }) const onerror = () => request.destroy() diff --git a/test/relaying.js b/test/relaying.js index 43d41ec7..c6b09935 100644 --- a/test/relaying.js +++ b/test/relaying.js @@ -1,4 +1,5 @@ const test = require('brittle') +const RelayServer = require('protomux-bridging-relay').Server const { swarm } = require('./helpers') const DHT = require('../') @@ -31,9 +32,16 @@ test('relay connections through node, client side', async function (t) { await aServer.listen() + const relay = new RelayServer({ + createStream (opts) { + return b.createRawStream({ ...opts, framed: true }) + } + }) + const bServer = b.createServer({ - shareLocalAddress: false, - connectionRelay: true + shareLocalAddress: false + }, function (socket) { + relay.accept(socket, { id: socket.remotePublicKey }) }) await bServer.listen() @@ -70,9 +78,16 @@ test('relay connections through node, server side', async function (t) { const lc = t.test('socket lifecycle') lc.plan(5) + const relay = new RelayServer({ + createStream (opts) { + return a.createRawStream({ ...opts, framed: true }) + } + }) + const aServer = a.createServer({ - shareLocalAddress: false, - connectionRelay: true + shareLocalAddress: false + }, function (socket) { + relay.accept(socket, { id: socket.remotePublicKey }) }) await aServer.listen() @@ -128,9 +143,16 @@ test('relay connections through node, client and server side', async function (t const lc = t.test('socket lifecycle') lc.plan(5) + const relay = new RelayServer({ + createStream (opts) { + return a.createRawStream({ ...opts, framed: true }) + } + }) + const aServer = a.createServer({ - shareLocalAddress: false, - connectionRelay: true + shareLocalAddress: false + }, function (socket) { + relay.accept(socket, { id: socket.remotePublicKey }) }) await aServer.listen() @@ -206,9 +228,16 @@ test('relay several connections through node with pool', async function (t) { await aServer.listen() + const relay = new RelayServer({ + createStream (opts) { + return b.createRawStream({ ...opts, framed: true }) + } + }) + const bServer = b.createServer({ - shareLocalAddress: false, - connectionRelay: true + shareLocalAddress: false + }, function (socket) { + relay.accept(socket, { id: socket.remotePublicKey }) }) await bServer.listen() @@ -224,10 +253,10 @@ test('relay several connections through node with pool', async function (t) { aSocket .on('open', () => { - lc.pass('client socket opened') + lc.pass('1st client socket opened') }) .on('close', () => { - lc.pass('client socket closed') + lc.pass('1st client socket closed') const aSocket = c.connect(aServer.publicKey, { fastOpen: false, @@ -238,10 +267,10 @@ test('relay several connections through node with pool', async function (t) { aSocket .on('open', () => { - lc.pass('client socket opened') + lc.pass('2nd client socket opened') }) .on('close', () => { - lc.pass('client socket closed') + lc.pass('2nd client socket closed') }) .end('hello world') })