Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document Server state machine + clean up edge cases #202

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,32 @@ const { ALREADY_LISTENING, NODE_DESTROYED } = require('./errors')
const HANDSHAKE_CLEAR_WAIT = 10000
const HANDSHAKE_INITIAL_TIMEOUT = 10000

// Developer info
// While DHT Server is open (after listen finished, and before close is called)
// it listens for new connections and passes them on to the main DHT object which owns it
// The _router object of the main DHT object provides the entrypoint
// for new connections, through its onpeerhandshake callback
// The lifecycle of a new connection is:
// 1) onpeerhandshake is called by the router
// 2) A new handshake is performed for that peer if none exists
// (if one exists, the next steps are skipped)
// 3) _addHandshake runs:
// a) Verify the remote is not firewalled
// b) Ask the main DHT object to create a new raw stream
// c) define callbacks to run when the connection is established (when it has a socket).
// Note: to that end, it (ab)uses the firewall to detect when it's no longer a relayed stream, but has a direct connection
// 4) When a connection is established (onsocket callback of the raw stream):
// a) Create a Hyperswarm secret stream on top of the raw connection
// b) trigger the onconnection callback, informing the main DHT object of the connection
// => From here on, the DHT object is responsible for the connection's lifecycle
// An important takeaway here is that there is a period of time during which a connection
// is being established, but does not yet exist for the DHT itself.
//
// On top of opening/opened/closing/closed states,
// the Server also has suspend logic, adding suspended, suspending and resuming
// states (which can overlap with the open/close states)
// Being suspended means it has no active connections and does not accept
// new connections until resumed again
module.exports = class Server extends EventEmitter {
constructor (dht, opts = {}) {
super()
Expand Down Expand Up @@ -178,6 +204,10 @@ module.exports = class Server extends EventEmitter {
}

if (this._closing) return

// In case we start listening while already
// TODO: pass the suspended state in when creating the announcer instead?
// That way it never creates any queries (needs changes in _announcer)
if (this.suspended) await this._announcer.suspend()

if (this._closing) return
Expand Down Expand Up @@ -230,6 +260,8 @@ module.exports = class Server extends EventEmitter {

let remotePayload
try {
// TODO: figure out why this is awaited
// (default handshake.recv is sync, which makes sense since noise is just a buffer)
remotePayload = await handshake.recv(noise)
} catch (err) {
safetyCatch(err)
Expand Down Expand Up @@ -283,6 +315,13 @@ module.exports = class Server extends EventEmitter {
hs.rawStream.on('error', autoDestroy)

hs.onsocket = (socket, port, host) => {
if (this._closing || this.suspended) {
// TODO: which error to use? There's a SUSPENDED error.
// Add a CLOSED/CLOSING error too?
socket.destroy(new Error('TODO: correct error message'))
return
}

if (hs.rawStream === null) return // Already hole punched

this._clearLater(hs, id, k)
Expand Down Expand Up @@ -442,6 +481,11 @@ module.exports = class Server extends EventEmitter {
}

async _onpeerhandshake ({ noise, peerAddress }, req) {
// TODO: consider alternatives
// (an alternative is destroying the router before the servers in index.js/destroy.
// which solves the _closing case and leaves only the suspended case.)
if (this._closing || this.suspended) return

const k = b4a.toString(noise, 'hex')

// The next couple of statements MUST run within the same tick to prevent
Expand Down
Loading