Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
billiegoose committed Jul 8, 2024
1 parent 9da9a02 commit 6d9ce7f
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 45 deletions.
19 changes: 2 additions & 17 deletions lib/connect.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
/** @import HyperDHT from '../index' */
/** @import UDXStream from 'udx-native/lib/stream' */
const NoiseSecretStream = require('@hyperswarm/secret-stream')
const b4a = require('b4a')
const relay = require('blind-relay')
Expand Down Expand Up @@ -32,12 +30,6 @@ const {
SUSPENDED
} = require('./errors')

/**
* @param {HyperDHT} dht
* @param {*} publicKey
* @param {*} opts
* @returns
*/
module.exports = function connect (dht, publicKey, opts = {}) {
dht.tracer.trace('connect', { publicKey, options: opts })

Expand Down Expand Up @@ -425,7 +417,6 @@ async function connectThroughNode (c, address, socket) {
if (c.rawStream === null) return // Already hole punched

if (c.rawStream.connected) {
// TODO: ask @mafintosh if this "un-relaying" logic is correct
c.rawStream.once('remote-changed', () => {
c.dht.tracer.trace('remote-changed', { relay: c.encryptedSocket.relay })
c.encryptedSocket.relay = {
Expand All @@ -434,9 +425,7 @@ async function connectThroughNode (c, address, socket) {
relayThrough: null,
remotePublicKey: null
}
// Note - this is really an "un-relay" event but subscribers should know
// to look at the .relay.relaying property
this.encryptedSocket.emit('relay', c.encryptedSocket.relay)
this.encryptedSocket.emit('unrelay', c.encryptedSocket.relay)
})
const remoteChanging = c.rawStream.changeRemote(socket, c.connect.payload.udx.id, port, host)

Expand Down Expand Up @@ -717,9 +706,6 @@ function relayConnection (c, relayThrough, payload, hs) {
relayThrough: publicKey,
remotePublicKey: c.remotePublicKey
}
c.encryptedSocket.emit('relay', c.encryptedSocket.relay)
c.dht.tracer.trace('relay-connect', { relayThrough: publicKey, remotePublicKey: c.remotePublicKey })

c.relayToken = token
c.relaySocket = c.dht.connect(publicKey)
c.relaySocket.setKeepAlive(c.relayKeepAlive)
Expand All @@ -732,7 +718,6 @@ function relayConnection (c, relayThrough, payload, hs) {
.on('data', ondata)

function ondata (remoteId) {
c.dht.tracer.trace('relay-connected', { relayThrough: publicKey, remotePublicKey: c.remotePublicKey })
if (c.relayTimeout) clearRelayTimeout(c)
if (c.rawStream === null) {
onabort(null)
Expand Down Expand Up @@ -772,6 +757,7 @@ function relayConnection (c, relayThrough, payload, hs) {
relayThrough: null,
remotePublicKey: null
}
c.encryptedSocket.emit('relay-aborted', c.encryptedSocket.relay)
if (socket) socket.destroy()
maybeDestroyEncryptedSocket(c, err || RELAY_ABORTED())
}
Expand All @@ -788,7 +774,6 @@ function clearRelayTimeout (c) {
}

function destroyPuncher (c) {
c.dht.tracer.trace('destroy-puncher')
if (c.puncher) c.puncher.destroy()
c.session.destroy()
}
Expand Down
4 changes: 0 additions & 4 deletions lib/raw-stream-set.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
/** @import HyperDHT from '../index' */

module.exports = class RawStreamSet {
/** @param {HyperDHT} dht */
constructor (dht) {
/** @type {HyperDHT} */
this._dht = dht

this._prefix = 16 - 1 // 16 is the default stream-set side in udx
Expand Down
26 changes: 10 additions & 16 deletions lib/server.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/** @import HyperDHT from '../index' */
const { EventEmitter } = require('events')
const safetyCatch = require('safety-catch')
const NoiseSecretStream = require('@hyperswarm/secret-stream')
Expand All @@ -22,7 +21,6 @@ const HANDSHAKE_INITIAL_TIMEOUT = 10000
module.exports = class Server extends EventEmitter {
constructor (dht, opts = {}) {
super()
/** @type {HyperDHT} */
this.dht = dht
this.target = null

Expand Down Expand Up @@ -307,7 +305,6 @@ module.exports = class Server extends EventEmitter {
hs.rawStream.removeListener('error', autoDestroy)

if (hs.rawStream.connected) {
// TODO: ask @mafintosh if this "un-relaying" logic is correct
hs.rawStream.once('remote-changed', () => {
this.tracer.trace('remote-changed', { relay: hs.encryptedSocket.relay })
hs.encryptedSocket.relay = {
Expand All @@ -316,9 +313,7 @@ module.exports = class Server extends EventEmitter {
relayThrough: null,
remotePublicKey: null
}
// Note - this is really an "un-relay" event but subscribers should know
// to look at the .relay.relaying property
hs.encryptedSocket.emit('relay', hs.encryptedSocket.relay)
hs.encryptedSocket.emit('unrelay', hs.encryptedSocket.relay)
})
const remoteChanging = hs.rawStream.changeRemote(socket, remotePayload.udx.id, port, host)

Expand All @@ -335,6 +330,13 @@ module.exports = class Server extends EventEmitter {
keepAlive: this.dht.connectionKeepAlive
})

hs.encryptedSocket.relay = {
relaying: false,
relayPaired: false,
relayThrough: null,
remotePublicKey: null
}

this.onconnection(hs.encryptedSocket)
}

Expand Down Expand Up @@ -628,14 +630,6 @@ module.exports = class Server extends EventEmitter {
token = remotePayload.relayThrough.token
}

this.emit('relay', {
relaying: true,
relayPaired: false,
relayThrough: publicKey,
remotePublicKey: h.remotePublicKey
})
hs.tracer.trace('relay-connect', { relayThrough: publicKey, remotePublicKey: h.remotePublicKey })

hs.relayToken = token
hs.relaySocket = this.dht.connect(publicKey)
hs.relaySocket.setKeepAlive(this.relayKeepAlive)
Expand All @@ -646,7 +640,6 @@ module.exports = class Server extends EventEmitter {
.pair(isInitiator, token, hs.rawStream)
.on('error', onabort)
.on('data', (remoteId) => {
hs.tracer.trace('relay-connected', { relayThrough: publicKey, remotePublicKey: hs.remotePublicKey })
if (hs.relayTimeout) clearRelayTimeout(hs)
if (hs.rawStream === null) {
onabort(null)
Expand Down Expand Up @@ -679,7 +672,7 @@ module.exports = class Server extends EventEmitter {
relayThrough: publicKey,
remotePublicKey: h.remotePublicKey
}
this.emit('relay', hs.encryptedSocket.relay)
hs.encryptedSocket.emit('relay', hs.encryptedSocket.relay)

this.onconnection(hs.encryptedSocket)
})
Expand All @@ -696,6 +689,7 @@ module.exports = class Server extends EventEmitter {
relayThrough: null,
remotePublicKey: null
}
hs.encryptedSocket.emit('relay-aborted', hs.encryptedSocket.relay)
if (socket) socket.destroy()
}
}
Expand Down
48 changes: 40 additions & 8 deletions test/relaying.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ test('relay connections through node, client side', async function (t) {
const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })

const lc = t.test('socket lifecycle')
lc.plan(5)
lc.plan(8)

const aServer = a.createServer(function (socket) {
lc.pass('server socket opened')
Expand Down Expand Up @@ -46,8 +46,13 @@ test('relay connections through node, client side', async function (t) {
const aSocket = c.connect(aServer.publicKey, { relayThrough: bServer.publicKey })

aSocket
.on('relay-aborted', (data) => {
lc.pass('relay aborted')
lc.is(aSocket.relay.relaying, false)
})
.on('open', () => {
lc.pass('client socket opened')
lc.is(aSocket.relay.relaying, true)
})
.on('close', () => {
lc.pass('client socket closed')
Expand All @@ -69,7 +74,7 @@ test('relay connections through node, client side, client aborts hole punch', as
const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })

const lc = t.test('socket lifecycle')
lc.plan(5)
lc.plan(6)

const aServer = a.createServer(function (socket) {
lc.pass('server socket opened')
Expand Down Expand Up @@ -110,6 +115,9 @@ test('relay connections through node, client side, client aborts hole punch', as
.on('close', () => {
lc.pass('client socket closed')
})
.on('relay-aborted', (data) => {
lc.pass('client relay aborted')
})
.end('hello world')

await lc
Expand All @@ -127,7 +135,7 @@ test('relay connections through node, client side, server aborts hole punch', as
const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })

const lc = t.test('socket lifecycle')
lc.plan(5)
lc.plan(6)

const aServer = a.createServer({ holepunch: () => false }, function (socket) {
lc.pass('server socket opened')
Expand Down Expand Up @@ -168,6 +176,9 @@ test('relay connections through node, client side, server aborts hole punch', as
.on('close', () => {
lc.pass('client socket closed')
})
.on('relay-aborted', (data) => {
lc.pass('client relay aborted')
})
.end('hello world')

await lc
Expand All @@ -185,7 +196,7 @@ test('relay connections through node, server side', async function (t) {
const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })

const lc = t.test('socket lifecycle')
lc.plan(5)
lc.plan(6)

const relay = new RelayServer({
createStream (opts) {
Expand Down Expand Up @@ -226,6 +237,9 @@ test('relay connections through node, server side', async function (t) {
.on('close', () => {
lc.pass('client socket closed')
})
.on('relay-aborted', (data) => {
lc.pass('client relay aborted')
})
.end('hello world')

await lc
Expand All @@ -243,7 +257,7 @@ test('relay connections through node, server side, client aborts hole punch', as
const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })

const lc = t.test('socket lifecycle')
lc.plan(5)
lc.plan(6)

const relay = new RelayServer({
createStream (opts) {
Expand Down Expand Up @@ -284,6 +298,9 @@ test('relay connections through node, server side, client aborts hole punch', as
.on('close', () => {
lc.pass('client socket closed')
})
.on('relay-aborted', (data) => {
lc.pass('client relay aborted')
})
.end('hello world')

await lc
Expand All @@ -301,7 +318,7 @@ test('relay connections through node, server side, server aborts hole punch', as
const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })

const lc = t.test('socket lifecycle')
lc.plan(5)
lc.plan(6)

const relay = new RelayServer({
createStream (opts) {
Expand Down Expand Up @@ -342,6 +359,9 @@ test('relay connections through node, server side, server aborts hole punch', as
.on('close', () => {
lc.pass('client socket closed')
})
.on('relay-aborted', (data) => {
lc.pass('client relay aborted')
})
.end('hello world')

await lc
Expand All @@ -359,7 +379,7 @@ test('relay connections through node, client and server side', async function (t
const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })

const lc = t.test('socket lifecycle')
lc.plan(5)
lc.plan(6)
const testRelay = t.test('relay server')
testRelay.plan(2) // One each for the initiator and the follower
const testRelayInitiator = t.test('relay initiator')
Expand Down Expand Up @@ -412,6 +432,12 @@ test('relay connections through node, client and server side', async function (t
const bSocket = c.connect(bServer.publicKey, { relayThrough: aServer.publicKey })

bSocket
.on('relay', () => {
lc.pass('client socket is being relayed')
})
.on('relay-aborted', (data) => {
lc.fail('client relay aborted')
})
.on('open', () => {
lc.pass('client socket opened')
})
Expand All @@ -435,7 +461,7 @@ test.skip('relay several connections through node with pool', async function (t)
const c = new DHT({ bootstrap, quickFirewall: false, ephemeral: true })

const lc = t.test('socket lifecycle')
lc.plan(10)
lc.plan(12)

const aServer = a.createServer(function (socket) {
lc.pass('server socket opened')
Expand Down Expand Up @@ -475,6 +501,9 @@ test.skip('relay several connections through node with pool', async function (t)
.on('open', () => {
lc.pass('1st client socket opened')
})
.on('relay-aborted', () => {
lc.pass('client relay aborted')
})
.on('close', () => {
lc.pass('1st client socket closed')

Expand All @@ -487,6 +516,9 @@ test.skip('relay several connections through node with pool', async function (t)
.on('close', () => {
lc.pass('2nd client socket closed')
})
.on('relay-aborted', () => {
lc.pass('client relay aborted')
})
.end('hello world')
})
.end('hello world')
Expand Down

0 comments on commit 6d9ce7f

Please sign in to comment.