Skip to content

Commit

Permalink
fix: UDP relaying (#41)
Browse files Browse the repository at this point in the history
Removing dynamic relaying broke UDP relaying for clients. After the handshake, generally the original UDP socket is released, and the multiplayer framework will create a new one. This will have the same external address, but a completely different port, meaning noray won't recognize it and won't relay any data.

Fixes #40
  • Loading branch information
elementbound authored Jul 9, 2023
1 parent b4e93e0 commit 4047174
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 1 deletion.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@foxssake/noray",
"version": "1.3.0",
"version": "1.3.1",
"description": "Online multiplayer orchestrator and potential game platform",
"main": "src/noray.mjs",
"bin": {
Expand Down
88 changes: 88 additions & 0 deletions src/relay/dynamic.relaying.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/* eslint-disable */
import { NetAddress } from './net.address.mjs'
import { UDPRelayHandler } from './udp.relay.handler.mjs'
/* eslint-enable */
import logger from '../logger.mjs'
import { RelayEntry } from './relay.entry.mjs'

const log = logger.child({ name: 'DynamicRelaying' })

/**
* Implementation for dynamic relaying.
*
* Whenever an unknown client tries to send data to a known host through its
* relay address, dynamic relaying will create a new relay.
*
* While it's waiting for the relay to be created, it will buffer any incoming
* data and send it all once the relay is created.
*/
export class DynamicRelaying {
/** @type {Map<string, Buffer[]>} */
#buffers = new Map()

/**
* Apply dynamic relay creation to relay handler.
* @param {UDPRelayHandler} relayHandler Relay handler
*/
apply (relayHandler) {
relayHandler.on('drop',
(senderRelay, targetRelay, senderAddress, targetPort, message) =>
this.#handle(relayHandler, senderRelay, targetRelay, senderAddress, targetPort, message)
)
}

/**
* @param {UDPRelayHandler} relayHandler
* @param {RelayEntry} senderRelay
* @param {RelayEntry} targetRelay
* @param {NetAddress} senderAddress
* @param {number} targetPort
* @param {Buffer} message
*/
async #handle (relayHandler, senderRelay, targetRelay, senderAddress, targetPort, message) {
// Unknown host or client already has relay, ignore
if (senderRelay || !targetRelay) {
return
}

const key = senderAddress.toString() + '>' + targetPort

// We're already buffering for client, save data end return
if (this.#buffers.has(key)) {
this.#buffers.get(key).push(message)
return
}

// No buffer for client yet, start buffering and create relay
log.info(
{ from: senderAddress, to: targetRelay.address },
'Creating dynamic relay'
)
this.#buffers.set(key, [message])
const port = await relayHandler.socketPool.allocatePort()
const relay = new RelayEntry({
address: senderAddress,
port
})
await relayHandler.createRelay(relay)

log.info(
{ relay },
'Relay created, sending %d packets',
this.#buffers.get(key)?.length ?? 0
)
this.#buffers.get(key).forEach(msg =>
relayHandler.relay(msg, senderAddress, targetPort)
)

this.#buffers.delete(key)
}
}

/**
* Apply dynamic relaying to relay handler.
* @param {UDPRelayHandler} relayHandler Relay handler
*/
export function useDynamicRelay (relayHandler) {
new DynamicRelaying().apply(relayHandler)
}
4 changes: 4 additions & 0 deletions src/relay/relay.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import logger from '../logger.mjs'
import { formatByteSize, formatDuration } from '../utils.mjs'
import { UDPRemoteRegistrar } from './udp.remote.registrar.mjs'
import { hostRepository } from '../hosts/host.mjs'
import { useDynamicRelay } from './dynamic.relaying.mjs'
import { UDPSocketPool } from './udp.socket.pool.mjs'

export const udpSocketPool = new UDPSocketPool()
Expand Down Expand Up @@ -65,6 +66,9 @@ Noray.hook(async noray => {
constrainLifetime(udpRelayHandler, config.udpRelay.maxLifetimeDuration)
constrainTraffic(udpRelayHandler, config.udpRelay.maxLifetimeTraffic)

log.info('Applying dynamic relaying')
useDynamicRelay(udpRelayHandler)

log.info('Adding shutdown hooks')
noray.on('close', () => {
log.info('Noray shutting down, cancelling UDP relay cleanup job')
Expand Down
130 changes: 130 additions & 0 deletions test/spec/relay/dynamic.relaying.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { beforeEach, afterEach, describe, it } from 'node:test'
import assert from 'node:assert'
import sinon from 'sinon'
import { UDPRelayHandler } from '../../../src/relay/udp.relay.handler.mjs'
import { sleep } from '../../../src/utils.mjs'
import { useDynamicRelay } from '../../../src/relay/dynamic.relaying.mjs'
import { RelayEntry } from '../../../src/relay/relay.entry.mjs'
import { NetAddress } from '../../../src/relay/net.address.mjs'
import { UDPSocketPool } from '../../../src/relay/udp.socket.pool.mjs'

describe('DynamicRelaying', () => {
let clock

beforeEach(() => {
clock = sinon.useFakeTimers()
})

it('should create relay', async () => {
// Given
const socketPool = sinon.createStubInstance(UDPSocketPool)
socketPool.allocatePort.resolves(10000)

const relayHandler = sinon.createStubInstance(UDPRelayHandler)
relayHandler.on.callThrough()
relayHandler.emit.callThrough()
sinon.stub(relayHandler, 'socketPool').value(socketPool)

relayHandler.createRelay.resolves(true)
useDynamicRelay(relayHandler)

const senderRelay = undefined
const targetRelay = new RelayEntry({
address: new NetAddress({ address: '87.54.0.16', port: 16752 }),
port: 10007
})
const senderAddress = new NetAddress({ address: '97.32.4.16', port: 32775 })
const targetPort = targetRelay.port
const messages = [
'hello', 'world', 'use', 'noray'
].map(message => Buffer.from(message))

// When
messages.forEach(message =>
relayHandler.emit('drop', senderRelay, targetRelay, senderAddress, targetPort, message)
)
clock.restore()
await sleep(0.05) // Wait for relay to be created
clock = sinon.useFakeTimers()

// Then
const createdRelay = relayHandler.createRelay.lastCall.args[0]
assert(createdRelay, 'Relay was not created!')
assert.equal(createdRelay.address, senderAddress)
assert.equal(createdRelay.port, 10000)

const sent = relayHandler.relay.getCalls().map(call => call.args[0]?.toString())
messages.forEach(message =>
assert(
sent.includes(message.toString()),
`Message "${message.toString()}" was not sent!`
)
)
})

it('should ignore known sender', async () => {
// Given
const relayHandler = sinon.createStubInstance(UDPRelayHandler)
relayHandler.on.callThrough()
relayHandler.emit.callThrough()

useDynamicRelay(relayHandler)

const senderRelay = new RelayEntry({
address: new NetAddress({ address: '87.54.0.16', port: 16752 }),
port: 10007
})

const targetRelay = undefined
const senderAddress = new NetAddress(senderRelay.address)
const targetPort = 10057
const messages = [
'hello', 'world', 'use', 'noray'
].map(message => Buffer.from(message))

// When
messages.forEach(message =>
relayHandler.emit('drop', senderRelay, targetRelay, senderAddress, targetPort, message)
)
clock.restore()
await sleep(0.05) // Wait for relay to be created
clock = sinon.useFakeTimers()

// Then
assert(relayHandler.createRelay.notCalled)
assert(relayHandler.relay.notCalled)
})

it('should ignore unknown target', async () => {
// Given
const relayHandler = sinon.createStubInstance(UDPRelayHandler)
relayHandler.on.callThrough()
relayHandler.emit.callThrough()

useDynamicRelay(relayHandler)

const senderRelay = undefined
const targetRelay = undefined
const senderAddress = new NetAddress({ address: '87.54.0.16', port: 16752 })
const targetPort = 10057
const messages = [
'hello', 'world', 'use', 'noray'
].map(message => Buffer.from(message))

// When
messages.forEach(message =>
relayHandler.emit('drop', senderRelay, targetRelay, senderAddress, targetPort, message)
)
clock.restore()
await sleep(0.05) // Wait for relay to be created
clock = sinon.useFakeTimers()

// Then
assert(relayHandler.createRelay.notCalled)
assert(relayHandler.relay.notCalled)
})

afterEach(() => {
clock.restore()
})
})

0 comments on commit 4047174

Please sign in to comment.