diff --git a/examples/connection-relaying/client.js b/examples/connection-relaying/client.js new file mode 100644 index 00000000..448fbfa5 --- /dev/null +++ b/examples/connection-relaying/client.js @@ -0,0 +1,14 @@ +const DHT = require('../..') + +const [relay, server] = process.argv.slice(2) + +const dht = new DHT() + +const socket = dht.connect(Buffer.from(server, 'hex'), { + localConnection: false, + relayThrough: Buffer.from(relay, 'hex') +}) + +console.log('Client connecting from', socket.publicKey.toString('hex')) + +socket.end('Hello!') diff --git a/examples/connection-relaying/relay.js b/examples/connection-relaying/relay.js new file mode 100644 index 00000000..45bb4c16 --- /dev/null +++ b/examples/connection-relaying/relay.js @@ -0,0 +1,23 @@ +const RelayServer = require('blind-relay').Server +const DHT = require('../..') + +const dht = new DHT() + +const relay = new RelayServer({ + createStream (opts) { + return dht.createRawStream({ ...opts, framed: true }) + } +}) + +const server = dht.createServer({ shareLocalAddress: false }, (socket) => { + console.log('Connection from', socket.remotePublicKey.toString('hex')) + const session = relay.accept(socket, { id: socket.remotePublicKey }) + session + .on('pair', (isInitiator, token, stream, remoteId) => { + console.log('Pair isInitiator =', isInitiator, 'token =', token.toString('hex')) + }) +}) + +server + .listen() + .then(() => console.log('Relay listening on', server.publicKey.toString('hex'))) diff --git a/examples/connection-relaying/server.js b/examples/connection-relaying/server.js new file mode 100644 index 00000000..a353ba2f --- /dev/null +++ b/examples/connection-relaying/server.js @@ -0,0 +1,14 @@ +const DHT = require('../..') + +const dht = new DHT() + +const server = dht.createServer({ shareLocalAddress: false }, (socket) => { + console.log('Connection from', socket.remotePublicKey.toString('hex')) + socket + .on('data', (data) => console.log(data.toString())) + .end() +}) + +server + .listen() + .then(() => console.log('Server listening on', server.publicKey.toString('hex'))) diff --git a/index.js b/index.js index 19e5c6b5..7e9c1bda 100644 --- a/index.js +++ b/index.js @@ -11,6 +11,7 @@ const connect = require('./lib/connect') const { FIREWALL, BOOTSTRAP_NODES, COMMANDS } = require('./lib/constants') const { hash, createKeyPair } = require('./lib/crypto') const RawStreamSet = require('./lib/raw-stream-set') +const ConnectionPool = require('./lib/connection-pool') const { STREAM_NOT_CONNECTED } = require('./lib/errors') const maxSize = 65536 @@ -60,6 +61,10 @@ class HyperDHT extends DHT { return s } + pool () { + return new ConnectionPool(this) + } + async destroy ({ force } = {}) { if (!force) { const closing = [] diff --git a/lib/connect.js b/lib/connect.js index 3eb1ac1e..57e4d03a 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -1,5 +1,6 @@ const NoiseSecretStream = require('@hyperswarm/secret-stream') const b4a = require('b4a') +const relay = require('blind-relay') const DebuggingStream = require('debugging-stream') const { isPrivate } = require('bogon') const Semaphore = require('./semaphore') @@ -26,6 +27,10 @@ const { } = require('./errors') module.exports = function connect (dht, publicKey, opts = {}) { + const pool = opts.pool || null + + if (pool && pool.has(publicKey)) return pool.get(publicKey) + const keyPair = opts.keyPair || dht.defaultKeyPair const encryptedSocket = (opts.createSecretStream || defaultCreateSecretStream)(true, null, { publicKey: keyPair.publicKey, @@ -33,9 +38,12 @@ module.exports = function connect (dht, publicKey, opts = {}) { autoStart: false }) + if (pool) pool._attachStream(encryptedSocket, false) + const c = { dht, session: dht.session(), + pool, round: 0, target: hash(publicKey), remotePublicKey: publicKey, @@ -45,7 +53,7 @@ module.exports = function connect (dht, publicKey, opts = {}) { requesting: false, lan: opts.localConnection !== false, firewall: FIREWALL.UNKNOWN, - rawStream: dht._rawStreams.add({ framed: true, firewall }), + rawStream: dht.createRawStream({ framed: true, firewall }), connect: null, query: null, puncher: null, @@ -55,7 +63,14 @@ module.exports = function connect (dht, publicKey, opts = {}) { serverAddress: null, onsocket: null, sleeper: new Sleeper(), - encryptedSocket + encryptedSocket, + + // Relay state + relayThrough: opts.relayThrough || null, + relayToken: opts.relayThrough ? relay.token() : null, + relaySocket: null, + relayClient: null, + relayPaired: false } // If the raw stream receives an error signal pre connect (ie from the firewall hook), make sure @@ -86,6 +101,12 @@ module.exports = function connect (dht, publicKey, opts = {}) { } function firewall (socket, port, host) { + // Check if the traffic originated from the socket on which we're expecting relay traffic. If so, + // we haven't hole punched yet and the other side is just sending us traffic through the relay. + if (c.relaySocket && c.relaySocket.rawStream && c.relaySocket.rawStream.socket === socket) { + return false + } + if (c.onsocket) { c.onsocket(socket, port, host) } else { @@ -156,7 +177,9 @@ async function holepunch (c, opts) { // TODO: check all addresses also obvs } - const onabort = () => destroyEncryptedSocket(c, HOLEPUNCH_ABORTED()) + const onabort = () => { + if (c.relayToken === null) destroyEncryptedSocket(c, HOLEPUNCH_ABORTED()) + } if (c.firewall === FIREWALL.OPEN) { c.passiveConnectTimeout = setTimeout(onabort, 10000) @@ -294,7 +317,10 @@ async function connectThroughNode (c, address, socket) { id: c.rawStream.id, seq: 0 }, - secretStream: {} + secretStream: {}, + relayThrough: c.relayThrough + ? { publicKey: c.relayThrough, token: c.relayToken } + : null }) if (isDone(c)) return } @@ -336,12 +362,17 @@ async function connectThroughNode (c, address, socket) { c.onsocket = function (socket, port, host) { if (c.rawStream === null) return // Already hole punched - const rawStream = c.dht._debugStream !== null - ? new DebuggingStream(c.rawStream, c.dht._debugStream) - : c.rawStream + if (c.rawStream.connected) { + c.rawStream.changeRemote(socket, c.connect.payload.udx.id, port, host) + } else { + c.rawStream.connect(socket, c.connect.payload.udx.id, port, host) + + const rawStream = c.dht._debugStream !== null + ? new DebuggingStream(c.rawStream, c.dht._debugStream) + : c.rawStream - c.rawStream.connect(socket, c.connect.payload.udx.id, port, host) - c.encryptedSocket.start(rawStream, { handshake: hs }) + c.encryptedSocket.start(rawStream, { handshake: hs }) + } if (c.reusableSocket && payload.udx.reusableSocket) { c.dht._socketPool.routes.add(c.remotePublicKey, c.rawStream) @@ -359,6 +390,10 @@ async function connectThroughNode (c, address, socket) { c.rawStream = null } + if (payload.relayThrough || c.relayThrough) { + relayConnection(c, c.relayThrough, payload, hs) + } + if (c.serverSocket) { c.onsocket(c.serverSocket, c.serverAddress.port, c.serverAddress.host) return @@ -387,7 +422,8 @@ async function updateHolepunch (c, peerAddress, relayAddr, payload) { const { error, firewall, punching, addresses, remoteToken } = remotePayload if (error !== ERROR.NONE) { - throw REMOTE_ABORTED('Remote aborted with error code ' + error) + // Don't throw if we're being relayed + if (!c.relayToken) throw REMOTE_ABORTED('Remote aborted with error code ' + error) } const echoed = !!(remoteToken && payload.token && b4a.equals(remoteToken, payload.token)) @@ -485,11 +521,13 @@ async function roundPunch (c, serverAddress, remoteToken, clientRelay) { }) if (!c.puncher.remoteHolepunching) { - throw REMOTE_NOT_HOLEPUNCHING() + // Don't throw if we're being relayed + if (!c.relayToken) throw REMOTE_NOT_HOLEPUNCHING() } if (!await c.puncher.punch()) { - throw REMOTE_NOT_HOLEPUNCHABLE() + // Don't throw if we're being relayed + if (!c.relayToken) throw REMOTE_NOT_HOLEPUNCHABLE() } } @@ -512,7 +550,54 @@ async function abort (c, { peerAddress, relayAddress }, err) { remoteToken: null }) } catch {} - destroyEncryptedSocket(c, err) + + if (c.relayToken === null) destroyEncryptedSocket(c, err) +} + +function relayConnection (c, relayThrough, payload, hs) { + if (c.passiveConnectTimeout) clearPassiveConnectTimeout(c) + + let isInitiator + let publicKey + let token + + if (payload.relayThrough) { + isInitiator = false + publicKey = payload.relayThrough.publicKey + token = payload.relayThrough.token + } else { + isInitiator = true + publicKey = relayThrough + token = c.relayToken + } + + c.relayToken = token + c.relaySocket = c.dht.connect(publicKey) + c.relayClient = relay.Client.from(c.relaySocket, { id: c.relaySocket.publicKey }) + + c.relayClient + .pair(isInitiator, token, c.rawStream) + .on('error', () => c.relaySocket.destroy()) + .on('data', (remoteId) => { + if (c.rawStream === null) return + c.relayPaired = true + + const { + remotePort, + remoteHost, + socket + } = c.relaySocket.rawStream + + c.rawStream + .on('close', () => c.relaySocket.destroy()) + .connect(socket, remoteId, remotePort, remoteHost) + + const rawStream = c.dht._debugStream !== null + ? new DebuggingStream(c.rawStream, c.dht._debugStream) + : c.rawStream + + c.encryptedSocket.start(rawStream, { handshake: hs }) + }) } function clearPassiveConnectTimeout (c) { diff --git a/lib/connection-pool.js b/lib/connection-pool.js new file mode 100644 index 00000000..46404e30 --- /dev/null +++ b/lib/connection-pool.js @@ -0,0 +1,146 @@ +const EventEmitter = require('events') +const b4a = require('b4a') +const errors = require('./errors') + +module.exports = class ConnectionPool extends EventEmitter { + constructor (dht) { + super() + + this._dht = dht + this._servers = new Map() + this._connecting = new Map() + this._connections = new Map() + } + + _attachServer (server) { + const keyString = b4a.toString(server.publicKey, 'hex') + + this._servers.set(keyString, server) + + server + .on('close', () => { + this._servers.delete(keyString) + }) + .on('connection', (socket) => { + this._attachStream(socket, true) + }) + } + + _attachStream (stream, opened) { + const existing = this.get(stream.remotePublicKey) + + if (existing) { + const keepNew = stream.isInitiator === existing.isInitiator || b4a.compare(stream.publicKey, stream.remotePublicKey) > 0 + + if (keepNew) { + let closed = false + + const onclose = () => { + closed = true + } + + existing + .on('error', noop) + .on('close', () => { + if (closed) return + + stream + .off('error', noop) + .off('close', onclose) + + this._attachStream(stream, opened) + }) + .destroy(errors.DUPLICATE_CONNECTION()) + + stream + .on('error', noop) + .on('close', onclose) + } else { + stream + .on('error', noop) + .destroy(errors.DUPLICATE_CONNECTION()) + } + + return + } + + const session = new ConnectionRef(this, stream) + + const keyString = b4a.toString(stream.remotePublicKey, 'hex') + + if (opened) { + this._connections.set(keyString, session) + + stream.on('close', () => { + this._connections.delete(keyString) + }) + + this.emit('connection', stream, session) + } else { + this._connecting.set(keyString, session) + + stream + .on('error', noop) + .on('close', () => { + if (opened) this._connections.delete(keyString) + else this._connecting.delete(keyString) + }) + .on('open', () => { + opened = true + + this._connecting.delete(keyString) + this._connections.set(keyString, session) + + stream.off('error', noop) + + this.emit('connection', stream, session) + }) + } + + return session + } + + get connecting () { + return this._connecting.size + } + + get connections () { + return this._connections.values() + } + + has (publicKey) { + const keyString = b4a.toString(publicKey, 'hex') + + return this._connections.has(keyString) || this._connecting.has(keyString) + } + + get (publicKey) { + const keyString = b4a.toString(publicKey, 'hex') + + const existing = this._connections.get(keyString) || this._connecting.get(keyString) + + return existing?._stream || null + } +} + +class ConnectionRef { + constructor (pool, stream) { + this._pool = pool + this._stream = stream + this._refs = 0 + } + + active () { + this._refs++ + } + + inactive () { + this._refs-- + } + + release () { + this._stream.destroy() + } +} + +function noop () {} diff --git a/lib/errors.js b/lib/errors.js index 8a96c549..29260c9a 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -87,4 +87,8 @@ module.exports = class DHTError extends Error { static SERVER_ERROR (msg = 'Server returned an error') { return new DHTError(msg, 'SERVER_ERROR', DHTError.SERVER_ERROR) } + + static DUPLICATE_CONNECTION (msg = 'Duplicate connection') { + return new DHTError(msg, 'DUPLICATE_CONNECTION', DHTError.DUPLICATE_CONNECTION) + } } diff --git a/lib/messages.js b/lib/messages.js index 9760e660..86d83ee4 100644 --- a/lib/messages.js +++ b/lib/messages.js @@ -128,6 +128,31 @@ const secretStreamInfo = { } } +const relayThroughInfo = { + preencode (state, m) { + c.uint.preencode(state, 1) // version + c.uint.preencode(state, 0) // flags + c.fixed32.preencode(state, m.publicKey) + c.fixed32.preencode(state, m.token) + }, + encode (state, m) { + c.uint.encode(state, 1) + c.uint.encode(state, 0) + c.fixed32.encode(state, m.publicKey) + c.fixed32.encode(state, m.token) + }, + decode (state) { + const version = c.uint.decode(state) + c.uint.decode(state) + + return { + version, + publicKey: c.fixed32.decode(state), + token: c.fixed32.decode(state) + } + } +} + exports.noisePayload = { preencode (state, m) { state.end += 4 // version + flags + error + firewall @@ -136,6 +161,7 @@ exports.noisePayload = { if (m.addresses6 && m.addresses6.length) ipv6Array.preencode(state, m.addresses6) if (m.udx) udxInfo.preencode(state, m.udx) if (m.secretStream) secretStreamInfo.preencode(state, m.secretStream) + if (m.relayThrough) relayThroughInfo.preencode(state, m.relayThrough) }, encode (state, m) { let flags = 0 @@ -145,6 +171,7 @@ exports.noisePayload = { if (m.addresses6 && m.addresses6.length) flags |= 4 if (m.udx) flags |= 8 if (m.secretStream) flags |= 16 + if (m.relayThrough) flags |= 32 c.uint.encode(state, 1) // version c.uint.encode(state, flags) @@ -156,6 +183,7 @@ exports.noisePayload = { if (m.addresses6 && m.addresses6.length) ipv6Array.encode(state, m.addresses6) if (m.udx) udxInfo.encode(state, m.udx) if (m.secretStream) secretStreamInfo.encode(state, m.secretStream) + if (m.relayThrough) relayThroughInfo.encode(state, m.relayThrough) }, decode (state) { const version = c.uint.decode(state) @@ -171,7 +199,8 @@ exports.noisePayload = { addresses4: [], addresses6: [], udx: null, - secretStream: null + secretStream: null, + relayThrough: null } } @@ -185,7 +214,8 @@ exports.noisePayload = { addresses4: (flags & 2) !== 0 ? ipv4Array.decode(state) : [], addresses6: (flags & 4) !== 0 ? ipv6Array.decode(state) : [], udx: (flags & 8) !== 0 ? udxInfo.decode(state) : null, - secretStream: (flags & 16) !== 0 ? secretStreamInfo.decode(state) : null + secretStream: (flags & 16) !== 0 ? secretStreamInfo.decode(state) : null, + relayThrough: (flags & 32) !== 0 ? relayThroughInfo.decode(state) : null } } } diff --git a/lib/server.js b/lib/server.js index 9f13a2e4..4b616b53 100644 --- a/lib/server.js +++ b/lib/server.js @@ -2,6 +2,7 @@ const { EventEmitter } = require('events') const safetyCatch = require('safety-catch') const NoiseSecretStream = require('@hyperswarm/secret-stream') const b4a = require('b4a') +const relay = require('blind-relay') const NoiseWrap = require('./noise-wrap') const Announcer = require('./announcer') const { FIREWALL, ERROR } = require('./constants') @@ -26,6 +27,8 @@ module.exports = class Server extends EventEmitter { this.closed = false this.firewall = opts.firewall || (() => false) this.holepunch = opts.holepunch || (() => true) + this.relayThrough = opts.relayThrough || null + this.pool = opts.pool || null this.createHandshake = opts.createHandshake || defaultCreateHandshake this.createSecretStream = opts.createSecretStream || defaultCreateSecretStream @@ -119,6 +122,8 @@ module.exports = class Server extends EventEmitter { this.dht.listening.add(this) this.emit('listening') + if (this.pool) this.pool._attachServer(this) + return this } @@ -136,10 +141,17 @@ module.exports = class Server extends EventEmitter { puncher: null, payload: null, rawStream: null, + encryptedSocket: null, prepunching: null, firewalled: true, clearing: null, - onsocket: null + onsocket: null, + + // Relay state + relayToken: null, + relaySocket: null, + relayClient: null, + relayPaired: false } this._holepunches[id] = hs @@ -180,9 +192,15 @@ module.exports = class Server extends EventEmitter { if (ourLocalAddrs) addresses.push(...ourLocalAddrs) if (error === ERROR.NONE) { - hs.rawStream = this.dht._rawStreams.add({ + hs.rawStream = this.dht.createRawStream({ framed: true, firewall (socket, port, host) { + // Check if the traffic originated from the socket on which we're expecting relay traffic. If so, + // we haven't hole punched yet and the other side is just sending us traffic through the relay. + if (hs.relaySocket && hs.relaySocket.rawStream && hs.relaySocket.rawStream.socket === socket) { + return false + } + hs.onsocket(socket, port, host) return false } @@ -200,18 +218,25 @@ module.exports = class Server extends EventEmitter { hs.prepunching = null } - const rawStream = this.dht._debugStream !== null - ? new DebuggingStream(hs.rawStream, this.dht._debugStream) - : hs.rawStream - if (this._reusableSocket && remotePayload.udx.reusableSocket) { this.dht._socketPool.routes.add(handshake.remotePublicKey, hs.rawStream) } hs.rawStream.removeListener('error', autoDestroy) - hs.rawStream.connect(socket, remotePayload.udx.id, port, host) - this.onconnection(this.createSecretStream(false, rawStream, { handshake: h })) + if (hs.rawStream.connected) { + hs.rawStream.changeRemote(socket, remotePayload.udx.id, port, host) + } else { + hs.rawStream.connect(socket, remotePayload.udx.id, port, host) + + const rawStream = this.dht._debugStream !== null + ? new DebuggingStream(hs.rawStream, this.dht._debugStream) + : hs.rawStream + + hs.encryptedSocket = this.createSecretStream(false, rawStream, { handshake: h }) + + this.onconnection(hs.encryptedSocket) + } if (hs.puncher) { hs.puncher.onabort = noop @@ -226,6 +251,10 @@ module.exports = class Server extends EventEmitter { } } + const relayThrough = typeof this.relayThrough === 'function' ? this.relayThrough() : this.relayThrough + + if (relayThrough) hs.relayToken = relay.token() + try { hs.reply = await handshake.send({ error, @@ -238,7 +267,10 @@ module.exports = class Server extends EventEmitter { id: hs.rawStream ? hs.rawStream.id : 0, seq: 0 }, - secretStream: {} + secretStream: {}, + relayThrough: relayThrough + ? { publicKey: relayThrough, token: hs.relayToken } + : null }) } catch (err) { safetyCatch(err) @@ -259,6 +291,10 @@ module.exports = class Server extends EventEmitter { return hs } + if (relayThrough || remotePayload.relayThrough) { + this._relayConnection(hs, relayThrough, remotePayload, h) + } + if (remotePayload.firewall === FIREWALL.OPEN || direct) { const sock = direct ? socket : this.dht.socket hs.onsocket(sock, clientAddress.port, clientAddress.host) @@ -283,7 +319,7 @@ module.exports = class Server extends EventEmitter { const onabort = () => { if (hs.prepunching) clearTimeout(hs.prepunching) hs.prepunching = null - hs.rawStream.destroy() + if (hs.relayToken === null) hs.rawStream.destroy() this._clearLater(hs, id, k) } @@ -450,6 +486,52 @@ module.exports = class Server extends EventEmitter { return { socket: this.dht.socket, payload } } + + _relayConnection (hs, relayThrough, remotePayload, h) { + let isInitiator + let publicKey + let token + + if (relayThrough) { + isInitiator = true + publicKey = relayThrough + token = hs.relayToken + } else { + isInitiator = false + publicKey = remotePayload.relayThrough.publicKey + token = remotePayload.relayThrough.token + } + + hs.relayToken = token + hs.relaySocket = this.dht.connect(publicKey) + hs.relayClient = relay.Client.from(hs.relaySocket, { id: hs.relaySocket.publicKey }) + + hs.relayClient + .pair(isInitiator, token, hs.rawStream) + .on('error', () => hs.relaySocket.destroy()) + .on('data', (remoteId) => { + if (hs.rawStream === null) return + hs.relayPaired = true + + const { + remotePort, + remoteHost, + socket + } = hs.relaySocket.rawStream + + hs.rawStream + .on('close', () => hs.relaySocket.destroy()) + .connect(socket, remoteId, remotePort, remoteHost) + + const rawStream = this.dht._debugStream !== null + ? new DebuggingStream(hs.rawStream, this.dht._debugStream) + : hs.rawStream + + hs.encryptedSocket = this.createSecretStream(false, rawStream, { handshake: h }) + + this.onconnection(hs.encryptedSocket) + }) + } } function isConsistent (fw) { diff --git a/package.json b/package.json index 041b58f5..0f2424c3 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ "dependencies": { "@hyperswarm/secret-stream": "^6.0.0", "b4a": "^1.3.1", + "blind-relay": "^1.3.0", "bogon": "^1.0.0", "compact-encoding": "^2.4.1", "compact-encoding-net": "^1.0.1", @@ -34,7 +35,7 @@ "devDependencies": { "brittle": "^3.0.0", "keypear": "^1.1.1", - "standard": "^16.0.4" + "standard": "^17.1.0" }, "scripts": { "test": "standard && node test/all.js", diff --git a/test/all.js b/test/all.js index e3401bc4..6cdb11a2 100644 --- a/test/all.js +++ b/test/all.js @@ -9,10 +9,13 @@ async function runTests () { await import('./announces.js') await import('./connections.js') + await import('./holepuncher.js') await import('./keychain.js') await import('./messages.js') await import('./nat.js') await import('./noncustodial.js') + await import('./pool.js') + await import('./relaying.js') await import('./storing.js') test.resume() diff --git a/test/messages.js b/test/messages.js index 3019e42f..3d09e349 100644 --- a/test/messages.js +++ b/test/messages.js @@ -13,7 +13,8 @@ test('basic noise payload', function (t) { addresses4: [], addresses6: [], udx: null, - secretStream: null + secretStream: null, + relayThrough: null } m.noisePayload.preencode(state, c) @@ -54,7 +55,8 @@ test('noise payload with holepunch and addresses', function (t) { }], addresses6: [], udx: null, - secretStream: null + secretStream: null, + relayThrough: null } m.noisePayload.preencode(state, c) @@ -84,7 +86,8 @@ test('noise payload only addresses', function (t) { }], addresses6: [], udx: null, - secretStream: null + secretStream: null, + relayThrough: null } m.noisePayload.preencode(state, c) @@ -114,7 +117,8 @@ test('noise payload ipv6', function (t) { port: 42420 }], udx: null, - secretStream: null + secretStream: null, + relayThrough: null } m.noisePayload.preencode(state, c) @@ -145,7 +149,8 @@ test('noise payload newer version', function (t) { addresses4: [], addresses6: [], udx: null, - secretStream: null + secretStream: null, + relayThrough: null }) }) diff --git a/test/pool.js b/test/pool.js new file mode 100644 index 00000000..bcc8b1c2 --- /dev/null +++ b/test/pool.js @@ -0,0 +1,107 @@ +const test = require('brittle') +const { swarm } = require('./helpers') + +test('connection pool, client side', async function (t) { + const [a, b] = await swarm(t) + + const server = a.createServer((socket) => { + t.pass('connection on server') + socket.end() + }) + + await server.listen() + + const pool = b.pool() + pool.on('connection', () => t.pass('connection on pool')) + + const open = t.test('open') + open.plan(1) + + const socket = b.connect(server.publicKey, { pool }) + socket + .on('open', () => { + open.pass('stream opened') + }) + .end() + + t.is(socket, b.connect(server.publicKey, { pool })) + + await open + + await server.close() +}) + +test('connection pool, server side', async function (t) { + const [a, b] = await swarm(t) + + const pool = a.pool() + pool.on('connection', () => t.pass('connection on pool')) + + const server = a.createServer({ pool }, (socket) => { + t.pass('connection on server') + socket.end() + }) + + await server.listen() + + const open = t.test('open') + open.plan(2) + + { + const socket = b.connect(server.publicKey) + socket + .on('open', () => { + open.pass('1st stream opened') + }) + .on('error', () => { + open.pass('1st stream errored') + }) + .end() + } + { + const socket = b.connect(server.publicKey) + socket + .on('open', () => { + open.pass('2nd stream opened') + }) + .on('error', () => { + open.pass('2nd stream errored') + }) + .end() + } + + await open + + await server.close() +}) + +test('connection pool, client and server side', async function (t) { + const [a, b] = await swarm(t) + + const aPool = a.pool() + aPool.on('connection', () => t.pass('connection on pool a')) + + const bPool = b.pool() + bPool.on('connection', () => t.pass('connection on pool b')) + + const server = a.createServer({ pool: aPool }, (socket) => { + t.pass('connection on server') + socket.end() + }) + + await server.listen() + + const open = t.test('open') + open.plan(1) + + const socket = b.connect(server.publicKey, { pool: bPool }) + socket + .on('open', () => { + open.pass('stream opened') + }) + .end() + + await open + + await server.close() +}) diff --git a/test/relaying.js b/test/relaying.js new file mode 100644 index 00000000..76c50f0c --- /dev/null +++ b/test/relaying.js @@ -0,0 +1,555 @@ +const test = require('brittle') +const RelayServer = require('blind-relay').Server +const { swarm } = require('./helpers') +const DHT = require('../') + +test('relay connections through node, client side', async function (t) { + const { bootstrap } = await swarm(t) + + const a = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const b = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + + const lc = t.test('socket lifecycle') + lc.plan(5) + + const aServer = a.createServer({ shareLocalAddress: false }, function (socket) { + lc.pass('server socket opened') + socket + .on('data', (data) => { + lc.alike(data, Buffer.from('hello world')) + }) + .on('close', () => { + lc.pass('server socket closed') + }) + .end() + }) + + await aServer.listen() + + const relay = new RelayServer({ + createStream (opts) { + return b.createRawStream({ ...opts, framed: true }) + } + }) + + const bServer = b.createServer({ shareLocalAddress: false }, function (socket) { + const session = relay.accept(socket, { id: socket.remotePublicKey }) + session + .on('error', (err) => t.comment(err.message)) + }) + + await bServer.listen() + + const aSocket = c.connect(aServer.publicKey, { + localConnection: false, + relayThrough: bServer.publicKey + }) + + aSocket + .on('open', () => { + lc.pass('client socket opened') + }) + .on('close', () => { + lc.pass('client socket closed') + }) + .end('hello world') + + await lc + + await a.destroy() + await b.destroy() + await c.destroy() +}) + +test('relay connections through node, client side, client aborts hole punch', async function (t) { + const { bootstrap } = await swarm(t) + + const a = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const b = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + + const lc = t.test('socket lifecycle') + lc.plan(5) + + const aServer = a.createServer({ shareLocalAddress: false }, function (socket) { + lc.pass('server socket opened') + socket + .on('data', (data) => { + lc.alike(data, Buffer.from('hello world')) + }) + .on('close', () => { + lc.pass('server socket closed') + }) + .end() + }) + + await aServer.listen() + + const relay = new RelayServer({ + createStream (opts) { + return b.createRawStream({ ...opts, framed: true }) + } + }) + + const bServer = b.createServer({ shareLocalAddress: false }, function (socket) { + const session = relay.accept(socket, { id: socket.remotePublicKey }) + session + .on('error', (err) => t.comment(err.message)) + }) + + await bServer.listen() + + const aSocket = c.connect(aServer.publicKey, { + fastOpen: false, + localConnection: false, + holepunch: () => false, + relayThrough: bServer.publicKey + }) + + aSocket + .on('open', () => { + lc.pass('client socket opened') + }) + .on('close', () => { + lc.pass('client socket closed') + }) + .end('hello world') + + await lc + + await a.destroy() + await b.destroy() + await c.destroy() +}) + +test('relay connections through node, client side, server aborts hole punch', async function (t) { + const { bootstrap } = await swarm(t) + + const a = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const b = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + + const lc = t.test('socket lifecycle') + lc.plan(5) + + const aServer = a.createServer({ shareLocalAddress: false, holepunch: () => false }, function (socket) { + lc.pass('server socket opened') + socket + .on('data', (data) => { + lc.alike(data, Buffer.from('hello world')) + }) + .on('close', () => { + lc.pass('server socket closed') + }) + .end() + }) + + await aServer.listen() + + const relay = new RelayServer({ + createStream (opts) { + return b.createRawStream({ ...opts, framed: true }) + } + }) + + const bServer = b.createServer({ shareLocalAddress: false }, function (socket) { + const session = relay.accept(socket, { id: socket.remotePublicKey }) + session + .on('error', (err) => t.comment(err.message)) + }) + + await bServer.listen() + + const aSocket = c.connect(aServer.publicKey, { + fastOpen: false, + localConnection: false, + relayThrough: bServer.publicKey + }) + + aSocket + .on('open', () => { + lc.pass('client socket opened') + }) + .on('close', () => { + lc.pass('client socket closed') + }) + .end('hello world') + + await lc + + await a.destroy() + await b.destroy() + await c.destroy() +}) + +test('relay connections through node, server side', async function (t) { + const { bootstrap } = await swarm(t) + + const a = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const b = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + + const lc = t.test('socket lifecycle') + lc.plan(5) + + const relay = new RelayServer({ + createStream (opts) { + return a.createRawStream({ ...opts, framed: true }) + } + }) + + const aServer = a.createServer({ shareLocalAddress: false }, function (socket) { + const session = relay.accept(socket, { id: socket.remotePublicKey }) + session + .on('error', (err) => t.comment(err.message)) + }) + + await aServer.listen() + + const bServer = b.createServer({ + shareLocalAddress: false, + relayThrough: aServer.publicKey + }, function (socket) { + lc.pass('server socket opened') + socket + .on('data', (data) => { + lc.alike(data, Buffer.from('hello world')) + }) + .on('close', () => { + lc.pass('server socket closed') + }) + .end() + }) + + await bServer.listen() + + const bSocket = c.connect(bServer.publicKey, { + localConnection: false + }) + + bSocket + .on('open', () => { + lc.pass('client socket opened') + }) + .on('close', () => { + lc.pass('client socket closed') + }) + .end('hello world') + + await lc + + await a.destroy() + await b.destroy() + await c.destroy() +}) + +test('relay connections through node, server side, client aborts hole punch', async function (t) { + const { bootstrap } = await swarm(t) + + const a = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const b = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + + const lc = t.test('socket lifecycle') + lc.plan(5) + + const relay = new RelayServer({ + createStream (opts) { + return a.createRawStream({ ...opts, framed: true }) + } + }) + + const aServer = a.createServer({ shareLocalAddress: false }, function (socket) { + const session = relay.accept(socket, { id: socket.remotePublicKey }) + session + .on('error', (err) => t.comment(err.message)) + }) + + await aServer.listen() + + const bServer = b.createServer({ + shareLocalAddress: false, + relayThrough: aServer.publicKey + }, function (socket) { + lc.pass('server socket opened') + socket + .on('data', (data) => { + lc.alike(data, Buffer.from('hello world')) + }) + .on('close', () => { + lc.pass('server socket closed') + }) + .end() + }) + + await bServer.listen() + + const bSocket = c.connect(bServer.publicKey, { + fastOpen: false, + localConnection: false, + holepunch: () => false + }) + + bSocket + .on('open', () => { + lc.pass('client socket opened') + }) + .on('close', () => { + lc.pass('client socket closed') + }) + .end('hello world') + + await lc + + await a.destroy() + await b.destroy() + await c.destroy() +}) + +test('relay connections through node, server side, server aborts hole punch', async function (t) { + const { bootstrap } = await swarm(t) + + const a = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const b = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + + const lc = t.test('socket lifecycle') + lc.plan(5) + + const relay = new RelayServer({ + createStream (opts) { + return a.createRawStream({ ...opts, framed: true }) + } + }) + + const aServer = a.createServer({ shareLocalAddress: false }, function (socket) { + const session = relay.accept(socket, { id: socket.remotePublicKey }) + session + .on('error', (err) => t.comment(err.message)) + }) + + await aServer.listen() + + const bServer = b.createServer({ + shareLocalAddress: false, + holepunch: () => false, + relayThrough: aServer.publicKey + }, function (socket) { + lc.pass('server socket opened') + socket + .on('data', (data) => { + lc.alike(data, Buffer.from('hello world')) + }) + .on('close', () => { + lc.pass('server socket closed') + }) + .end() + }) + + await bServer.listen() + + const bSocket = c.connect(bServer.publicKey, { + fastOpen: false, + localConnection: false + }) + + bSocket + .on('open', () => { + lc.pass('client socket opened') + }) + .on('close', () => { + lc.pass('client socket closed') + }) + .end('hello world') + + await lc + + await a.destroy() + await b.destroy() + await c.destroy() +}) + +test('relay connections through node, client and server side', async function (t) { + const { bootstrap } = await swarm(t) + + const a = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const b = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + + const lc = t.test('socket lifecycle') + lc.plan(5) + + const relay = new RelayServer({ + createStream (opts) { + return a.createRawStream({ ...opts, framed: true }) + } + }) + + const aServer = a.createServer({ shareLocalAddress: false }, function (socket) { + const session = relay.accept(socket, { id: socket.remotePublicKey }) + session + .on('error', (err) => t.comment(err.message)) + }) + + await aServer.listen() + + const bServer = b.createServer({ + shareLocalAddress: false, + relayThrough: aServer.publicKey + }, function (socket) { + lc.pass('server socket opened') + socket + .on('data', (data) => { + lc.alike(data, Buffer.from('hello world')) + }) + .on('close', () => { + lc.pass('server socket closed') + }) + .end() + }) + + await bServer.listen() + + const bSocket = c.connect(bServer.publicKey, { + fastOpen: false, + localConnection: false, + relayThrough: aServer.publicKey + }) + + bSocket + .on('open', () => { + lc.pass('client socket opened') + }) + .on('close', () => { + lc.pass('client socket closed') + }) + .end('hello world') + + await lc + + await a.destroy() + await b.destroy() + await c.destroy() +}) + +test.skip('relay several connections through node with pool', async function (t) { + const { bootstrap } = await swarm(t) + + const a = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const b = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + + const lc = t.test('socket lifecycle') + lc.plan(10) + + const aServer = a.createServer({ shareLocalAddress: false }, function (socket) { + lc.pass('server socket opened') + socket + .on('data', (data) => { + lc.alike(data, Buffer.from('hello world')) + }) + .on('close', () => { + lc.pass('server socket closed') + }) + .end() + }) + + await aServer.listen() + + const relay = new RelayServer({ + createStream (opts) { + return b.createRawStream({ ...opts, framed: true }) + } + }) + + const bServer = b.createServer({ shareLocalAddress: false }, function (socket) { + const session = relay.accept(socket, { id: socket.remotePublicKey }) + session + .on('error', (err) => t.comment(err.message)) + }) + + await bServer.listen() + + const pool = c.pool() + + const aSocket = c.connect(aServer.publicKey, { + fastOpen: false, + localConnection: false, + relayThrough: bServer.publicKey, + pool + }) + + aSocket + .on('open', () => { + lc.pass('1st client socket opened') + }) + .on('close', () => { + lc.pass('1st client socket closed') + + const aSocket = c.connect(aServer.publicKey, { + fastOpen: false, + localConnection: false, + relayThrough: bServer.publicKey, + pool + }) + + aSocket + .on('open', () => { + lc.pass('2nd client socket opened') + }) + .on('close', () => { + lc.pass('2nd client socket closed') + }) + .end('hello world') + }) + .end('hello world') + + await lc + + await a.destroy() + await b.destroy() + await c.destroy() +}) + +test.skip('server does not support connection relaying', async function (t) { + const { bootstrap } = await swarm(t) + + const a = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const b = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true }) + + const lc = t.test('socket lifecycle') + lc.plan(4) + + const aServer = a.createServer({ shareLocalAddress: false }, function () { + t.fail() + }) + + await aServer.listen() + + const bServer = b.createServer({ shareLocalAddress: false }, function (socket) { + lc.pass('server socket opened') + socket.on('error', () => { + lc.pass('server socket timed out') + }) + }) + + await bServer.listen() + + const aSocket = c.connect(aServer.publicKey, { + fastOpen: false, + localConnection: false, + relayThrough: bServer.publicKey + }) + + aSocket.on('error', () => { + lc.pass('client socket timed out') + }) + + await lc + + await a.destroy() + await b.destroy() + await c.destroy() +})