Skip to content

Commit

Permalink
Revert "chore: Remove dynamic relaying (#31)"
Browse files Browse the repository at this point in the history
This reverts commit 73ff4be.
  • Loading branch information
elementbound committed Jul 9, 2023
1 parent fa13c32 commit edc4aae
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 1 deletion.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@foxssake/noray",
"version": "1.3.0",
"version": "1.3.1",
"description": "Online multiplayer orchestrator and potential game platform",
"main": "src/noray.mjs",
"bin": {
Expand Down
88 changes: 88 additions & 0 deletions src/relay/dynamic.relaying.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/* eslint-disable */
import { NetAddress } from './net.address.mjs'
import { UDPRelayHandler } from './udp.relay.handler.mjs'
/* eslint-enable */
import logger from '../logger.mjs'
import { RelayEntry } from './relay.entry.mjs'

const log = logger.child({ name: 'DynamicRelaying' })

/**
* Implementation for dynamic relaying.
*
* Whenever an unknown client tries to send data to a known host through its
* relay address, dynamic relaying will create a new relay.
*
* While it's waiting for the relay to be created, it will buffer any incoming
* data and send it all once the relay is created.
*/
export class DynamicRelaying {
/** @type {Map<string, Buffer[]>} */
#buffers = new Map()

/**
* Apply dynamic relay creation to relay handler.
* @param {UDPRelayHandler} relayHandler Relay handler
*/
apply (relayHandler) {
relayHandler.on('drop',
(senderRelay, targetRelay, senderAddress, targetPort, message) =>
this.#handle(relayHandler, senderRelay, targetRelay, senderAddress, targetPort, message)
)
}

/**
* @param {UDPRelayHandler} relayHandler
* @param {RelayEntry} senderRelay
* @param {RelayEntry} targetRelay
* @param {NetAddress} senderAddress
* @param {number} targetPort
* @param {Buffer} message
*/
async #handle (relayHandler, senderRelay, targetRelay, senderAddress, targetPort, message) {
// Unknown host or client already has relay, ignore
if (senderRelay || !targetRelay) {
return
}

const key = senderAddress.toString() + '>' + targetPort

// We're already buffering for client, save data end return
if (this.#buffers.has(key)) {
this.#buffers.get(key).push(message)
return
}

// No buffer for client yet, start buffering and create relay
log.info(
{ from: senderAddress, to: targetRelay.address },
'Creating dynamic relay'
)
this.#buffers.set(key, [message])
const port = await relayHandler.socketPool.allocatePort()
const relay = new RelayEntry({
address: senderAddress,
port
})
await relayHandler.createRelay(relay)

log.info(
{ relay },
'Relay created, sending %d packets',
this.#buffers.get(key)?.length ?? 0
)
this.#buffers.get(key).forEach(msg =>
relayHandler.relay(msg, senderAddress, targetPort)
)

this.#buffers.delete(key)
}
}

/**
* Apply dynamic relaying to relay handler.
* @param {UDPRelayHandler} relayHandler Relay handler
*/
export function useDynamicRelay (relayHandler) {
new DynamicRelaying().apply(relayHandler)
}
4 changes: 4 additions & 0 deletions src/relay/relay.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import logger from '../logger.mjs'
import { formatByteSize, formatDuration } from '../utils.mjs'
import { UDPRemoteRegistrar } from './udp.remote.registrar.mjs'
import { hostRepository } from '../hosts/host.mjs'
import { useDynamicRelay } from './dynamic.relaying.mjs'
import { UDPSocketPool } from './udp.socket.pool.mjs'

export const udpSocketPool = new UDPSocketPool()
Expand Down Expand Up @@ -65,6 +66,9 @@ Noray.hook(async noray => {
constrainLifetime(udpRelayHandler, config.udpRelay.maxLifetimeDuration)
constrainTraffic(udpRelayHandler, config.udpRelay.maxLifetimeTraffic)

log.info('Applying dynamic relaying')
useDynamicRelay(udpRelayHandler)

log.info('Adding shutdown hooks')
noray.on('close', () => {
log.info('Noray shutting down, cancelling UDP relay cleanup job')
Expand Down
130 changes: 130 additions & 0 deletions test/spec/relay/dynamic.relaying.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { beforeEach, afterEach, describe, it } from 'node:test'
import assert from 'node:assert'
import sinon from 'sinon'
import { UDPRelayHandler } from '../../../src/relay/udp.relay.handler.mjs'
import { sleep } from '../../../src/utils.mjs'
import { useDynamicRelay } from '../../../src/relay/dynamic.relaying.mjs'
import { RelayEntry } from '../../../src/relay/relay.entry.mjs'
import { NetAddress } from '../../../src/relay/net.address.mjs'
import { UDPSocketPool } from '../../../src/relay/udp.socket.pool.mjs'

describe('DynamicRelaying', () => {
let clock

beforeEach(() => {
clock = sinon.useFakeTimers()
})

it('should create relay', async () => {
// Given
const socketPool = sinon.createStubInstance(UDPSocketPool)
socketPool.allocatePort.resolves(10000)

const relayHandler = sinon.createStubInstance(UDPRelayHandler)
relayHandler.on.callThrough()
relayHandler.emit.callThrough()
sinon.stub(relayHandler, 'socketPool').value(socketPool)

relayHandler.createRelay.resolves(true)
useDynamicRelay(relayHandler)

const senderRelay = undefined
const targetRelay = new RelayEntry({
address: new NetAddress({ address: '87.54.0.16', port: 16752 }),
port: 10007
})
const senderAddress = new NetAddress({ address: '97.32.4.16', port: 32775 })
const targetPort = targetRelay.port
const messages = [
'hello', 'world', 'use', 'noray'
].map(message => Buffer.from(message))

// When
messages.forEach(message =>
relayHandler.emit('drop', senderRelay, targetRelay, senderAddress, targetPort, message)
)
clock.restore()
await sleep(0.05) // Wait for relay to be created
clock = sinon.useFakeTimers()

// Then
const createdRelay = relayHandler.createRelay.lastCall.args[0]
assert(createdRelay, 'Relay was not created!')
assert.equal(createdRelay.address, senderAddress)
assert.equal(createdRelay.port, 10000)

const sent = relayHandler.relay.getCalls().map(call => call.args[0]?.toString())
messages.forEach(message =>
assert(
sent.includes(message.toString()),
`Message "${message.toString()}" was not sent!`
)
)
})

it('should ignore known sender', async () => {
// Given
const relayHandler = sinon.createStubInstance(UDPRelayHandler)
relayHandler.on.callThrough()
relayHandler.emit.callThrough()

useDynamicRelay(relayHandler)

const senderRelay = new RelayEntry({
address: new NetAddress({ address: '87.54.0.16', port: 16752 }),
port: 10007
})

const targetRelay = undefined
const senderAddress = new NetAddress(senderRelay.address)
const targetPort = 10057
const messages = [
'hello', 'world', 'use', 'noray'
].map(message => Buffer.from(message))

// When
messages.forEach(message =>
relayHandler.emit('drop', senderRelay, targetRelay, senderAddress, targetPort, message)
)
clock.restore()
await sleep(0.05) // Wait for relay to be created
clock = sinon.useFakeTimers()

// Then
assert(relayHandler.createRelay.notCalled)
assert(relayHandler.relay.notCalled)
})

it('should ignore unknown target', async () => {
// Given
const relayHandler = sinon.createStubInstance(UDPRelayHandler)
relayHandler.on.callThrough()
relayHandler.emit.callThrough()

useDynamicRelay(relayHandler)

const senderRelay = undefined
const targetRelay = undefined
const senderAddress = new NetAddress({ address: '87.54.0.16', port: 16752 })
const targetPort = 10057
const messages = [
'hello', 'world', 'use', 'noray'
].map(message => Buffer.from(message))

// When
messages.forEach(message =>
relayHandler.emit('drop', senderRelay, targetRelay, senderAddress, targetPort, message)
)
clock.restore()
await sleep(0.05) // Wait for relay to be created
clock = sinon.useFakeTimers()

// Then
assert(relayHandler.createRelay.notCalled)
assert(relayHandler.relay.notCalled)
})

afterEach(() => {
clock.restore()
})
})

0 comments on commit edc4aae

Please sign in to comment.