diff --git a/examples/connection-relaying/client.js b/examples/connection-relaying/client.js index 448fbfa5..d7b40725 100644 --- a/examples/connection-relaying/client.js +++ b/examples/connection-relaying/client.js @@ -5,7 +5,6 @@ const [relay, server] = process.argv.slice(2) const dht = new DHT() const socket = dht.connect(Buffer.from(server, 'hex'), { - localConnection: false, relayThrough: Buffer.from(relay, 'hex') }) diff --git a/lib/connect.js b/lib/connect.js index fb7fb68d..c9b8daf7 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -80,7 +80,8 @@ module.exports = function connect (dht, publicKey, opts = {}) { relayToken: relayThrough ? relay.token() : null, relaySocket: null, relayClient: null, - relayPaired: false + relayPaired: false, + relayKeepAlive: opts.relayKeepAlive || 5000 } // If the raw stream receives an error signal pre connect (ie from the firewall hook), make sure @@ -691,7 +692,7 @@ function relayConnection (c, relayThrough, payload, hs) { c.relayToken = token c.relaySocket = c.dht.connect(publicKey) - c.relaySocket.setKeepAlive(5000) + c.relaySocket.setKeepAlive(c.relayKeepAlive) c.relayClient = relay.Client.from(c.relaySocket, { id: c.relaySocket.publicKey }) c.relayTimeout = setTimeout(onabort, 15000, null) diff --git a/lib/server.js b/lib/server.js index 28395c5c..5c3ecc06 100644 --- a/lib/server.js +++ b/lib/server.js @@ -29,6 +29,7 @@ module.exports = class Server extends EventEmitter { this.firewall = opts.firewall || (() => false) this.holepunch = opts.holepunch || (() => true) this.relayThrough = opts.relayThrough || null + this.relayKeepAlive = opts.relayKeepAlive || 5000 this.pool = opts.pool || null this.createHandshake = opts.createHandshake || defaultCreateHandshake this.createSecretStream = opts.createSecretStream || defaultCreateSecretStream @@ -613,7 +614,7 @@ module.exports = class Server extends EventEmitter { hs.relayToken = token hs.relaySocket = this.dht.connect(publicKey) - hs.relaySocket.setKeepAlive(5000) + hs.relaySocket.setKeepAlive(this.relayKeepAlive) hs.relayClient = relay.Client.from(hs.relaySocket, { id: hs.relaySocket.publicKey }) hs.relayTimeout = setTimeout(onabort, 15000) diff --git a/package.json b/package.json index a2c75efb..e6a7c573 100644 --- a/package.json +++ b/package.json @@ -44,6 +44,7 @@ }, "devDependencies": { "brittle": "^3.0.0", + "graceful-goodbye": "^1.3.0", "newline-decoder": "^1.0.2", "standard": "^17.1.0" }, diff --git a/test/helpers/index.js b/test/helpers/index.js index 4ccf66c3..96767fbc 100644 --- a/test/helpers/index.js +++ b/test/helpers/index.js @@ -1,6 +1,7 @@ const createTestnet = require('../../testnet') const NewlineDecoder = require('newline-decoder') const { spawn } = require('child_process') +const goodbye = require('graceful-goodbye') module.exports = { swarm, toArray, spawnFixture } @@ -17,11 +18,14 @@ async function swarm (t, n = 32, bootstrap = []) { async function * spawnFixture (t, args) { const proc = spawn(process.execPath, args) const nl = new NewlineDecoder() + const kill = () => setTimeout(() => proc.kill('SIGKILL'), 1000) + const unregisterExitHandlers = goodbye(() => proc.kill('SIGKILL')) proc.stderr.on('data', err => t.fail(err)) - const kill = () => setTimeout(() => proc.kill('SIGKILL'), 1000) for await (const data of proc.stdout) { for (const line of nl.push(data)) yield [kill, line] } + + unregisterExitHandlers() } diff --git a/test/integration/fixtures/server-through-relay.js b/test/integration/fixtures/server-through-relay.js new file mode 100644 index 00000000..dba0360d --- /dev/null +++ b/test/integration/fixtures/server-through-relay.js @@ -0,0 +1,38 @@ +const DHT = require('../../../') +const b4a = require('b4a') + +const publicKey = b4a.from(process.argv[2], 'hex') +const secretKey = b4a.from(process.argv[3], 'hex') +const relayServer = b4a.from(process.argv[4], 'hex') +const socketKeepAlive = Number(process.argv[5] || 5000) +const relayKeepAlive = Number(process.argv[6] || 5000) +const keyPair = { publicKey, secretKey } + +main() + +async function main () { + const node = new DHT() + const server = node.createServer({ + holepunch: false, // To ensure it relies only on relaying + shareLocalAddress: false, // To help ensure it relies only on relaying (otherwise it can connect directly over LAN, without even trying to holepunch) + relayKeepAlive, + relayThrough: relayServer + }, socket => { + socket.setKeepAlive(socketKeepAlive) + socket + .on('data', data => { + console.log(`socket_ondata ${b4a.toString(data)}`) + socket.write('world') + }) + .on('open', () => console.log(`socket_onopen ${socket.rawStream.remoteHost}:${socket.rawStream.remotePort}`)) + .on('close', () => console.log('socket_onclose')) + .on('error', err => console.log(`socket_onerror ${err.code}`)) + }) + server.on('open', () => console.log('server_onopen')) + server.on('error', err => console.log(`server_onerror ${err.code}`)) + server.on('close', () => console.log('server_onclose')) + console.log('prelistening') + await server.listen(keyPair) + console.log('postlistening') + console.log('started') +} diff --git a/test/integration/keep-alive-with-relay.js b/test/integration/keep-alive-with-relay.js new file mode 100644 index 00000000..db7e24af --- /dev/null +++ b/test/integration/keep-alive-with-relay.js @@ -0,0 +1,141 @@ +const test = require('brittle') +const { spawnFixture } = require('../helpers') +const { Server: RelayServer } = require('blind-relay') +const DHT = require('../../') +const path = require('path') +const b4a = require('b4a') + +/* + When a peer connects to a relay server, there's two sockets established. One for data (between two peers) + and another "control" socket. + The relay server has a RELAY_KEEPALIVE that looks at the control socket to detect when a peer is no longer there. + The client and server both have a SOCKET_KEEPALIVE which detects when the other peer is no longer there. + + A bug that occured with udx-native < 1.8.9 is that the relay server had detected the control socket had bad been destroyed, + but didn't fully clean up internally. That meant that this information was never fully relayed to the other peer. +*/ +test('When Server is killed, Client should detect this - through relay', async t => { + t.plan(3) + + const relayTest = t.test('relay') + relayTest.plan(3) + const clientTest = t.test('client') + clientTest.plan(3) + const serverTest = t.test('server') + serverTest.plan(3) + + const RELAY_KEEPALIVE = 500 + const SOCKET_KEEPALIVE = 10 * RELAY_KEEPALIVE + + const clientKeyPair = DHT.keyPair() + const clientNode = new DHT() + const relayNode = new DHT() + const relayKeyPair = DHT.keyPair() + const serverKeyPair = DHT.keyPair() + const serverPublicKey = b4a.toString(serverKeyPair.publicKey, 'hex') + const serverSecretKey = b4a.toString(serverKeyPair.secretKey, 'hex') + let hasClientDetectedThatServerDied = false + let didClientNotDetectThatServerDiedTimer + let didClientNotDetectThatServerDiedTimerFired = false + + t.teardown(async () => { + await clientNode.destroy() + await relayNode.destroy() + }) + + await startRelayServer() + await startServer() + + async function startRelayServer () { + const relay = new RelayServer({ + createStream (opts) { + return relayNode.createRawStream({ ...opts, framed: true }) + } + }) + + const relayServer = relayNode.createServer(socket => { + relayTest.pass('Socket connected') + socket.setKeepAlive(RELAY_KEEPALIVE) + + socket.on('error', err => { + // when error is ETIMEDOUT it's the server connection that has broken + // not so long after that, the client should have detected that the connection is gone + if (err.code === 'ETIMEDOUT') { + relayTest.pass('Relay server detected that server has died. Waiting for client to detect') + + // In some cases, the client may have detected that the server has died before the relay server + if (hasClientDetectedThatServerDied) return + + const timeToDetectClientHasDied = SOCKET_KEEPALIVE + didClientNotDetectThatServerDiedTimer = setTimeout(() => { + didClientNotDetectThatServerDiedTimerFired = true + clientTest.fail('Client did not detect that the server has died') + }, timeToDetectClientHasDied) + } + }) + + const session = relay.accept(socket, { id: socket.remotePublicKey }) + session.on('error', () => { }) + }) + + await relayServer.listen(relayKeyPair) + } + + async function startServer () { + const args = [ + path.join(__dirname, 'fixtures/server-through-relay.js'), + serverPublicKey, + serverSecretKey, + b4a.toString(relayKeyPair.publicKey, 'hex'), + SOCKET_KEEPALIVE, + RELAY_KEEPALIVE + ] + + for await (const [kill, data] of spawnFixture(serverTest, args)) { + if (data === 'started') { + serverTest.pass('Started. Now starting new client') + startClient() + } + if (data === 'socket_ondata hello') { + serverTest.pass('Received "hello" from client. Sending "world" back, then wait 1 second and kill server') + setTimeout(kill, 1000) + } + } + + serverTest.pass('Server process killed. Waiting for relay server to detect') + } + + function startClient () { + const client = clientNode.connect(serverKeyPair.publicKey, { + keyPair: clientKeyPair, // To ensure same client keyPair on each connection + relayKeepAlive: RELAY_KEEPALIVE, + relayThrough: relayKeyPair.publicKey + }) + client.setKeepAlive(SOCKET_KEEPALIVE) + // If the client does not receive a ETIMEDOUT it's because it has not detected + // that the server has been killed. Important to note that all of this is through + // the relay server. Essentially it's whether or not the relay server + // detected that the server had been killed and relayed that information downstream + // to the client + client + .on('error', () => { }) + .on('open', () => { + clientTest.pass('Socket opened. Now sending "hello"') + client.write('hello') + }) + .on('data', data => { + data = b4a.toString(data) + + if (data === 'world') { + clientTest.pass('Received "world" from server') + } + }) + .on('close', () => { + if (didClientNotDetectThatServerDiedTimerFired) return // If this has fired, then it's too late + + hasClientDetectedThatServerDied = true + clearTimeout(didClientNotDetectThatServerDiedTimer) + clientTest.pass('Client correctly detected that server had died') + }) + } +}) diff --git a/test/integration/keep-alive.js b/test/integration/keep-alive.js index 0cd42b49..f1144295 100644 --- a/test/integration/keep-alive.js +++ b/test/integration/keep-alive.js @@ -2,6 +2,7 @@ const test = require('brittle') const DHT = require('../../') const path = require('path') const { swarm, spawnFixture } = require('../helpers') +const b4a = require('b4a') // Server is run in a separate proces to make sure that we can forcefully close it. // If the server called `socket.destroy()` that would send an unacked packet back @@ -16,8 +17,8 @@ test('Client use keepalive to detect disconnect - separated by processes', async const { bootstrap } = await swarm(t) const node = new DHT({ bootstrap }) const keyPair = DHT.keyPair() - const publicKey = keyPair.publicKey.toString('hex') - const secretKey = keyPair.secretKey.toString('hex') + const publicKey = b4a.toString(keyPair.publicKey, 'hex') + const secretKey = b4a.toString(keyPair.secretKey, 'hex') clientTest.plan(3) @@ -63,8 +64,8 @@ test('Client not using keepalive does not detect disconnect - separated by proce const { bootstrap } = await swarm(t) const node = new DHT({ bootstrap }) const keyPair = DHT.keyPair() - const publicKey = keyPair.publicKey.toString('hex') - const secretKey = keyPair.secretKey.toString('hex') + const publicKey = b4a.toString(keyPair.publicKey, 'hex') + const secretKey = b4a.toString(keyPair.secretKey, 'hex') let timedout = false clientTest.plan(2)