Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: close read and write streams #122

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,17 @@ In addition to `sink` and `source` properties, this stream also has the followin

#### `stream.close()`

Closes the stream for **reading**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed.
Closes the stream for **reading** and **writing**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed.

#### `stream.closeRead()`

Closes the stream for **reading**, but still allows writing. This should not typically be called by application code, but may be used for one way, push only streams.

This function is called automatically by the muxer when it receives a `CLOSE` message from the remote.

The source will return normally, the sink will continue to consume.
#### `stream.closeWrite()`

Closes the stream for **writing**, but still allows reading. This is useful for when you only ever wish to read from a stream.

#### `stream.abort([err])`

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"aegir": "^33.1.0",
"cborg": "^1.2.1",
"iso-random-stream": "^2.0.0",
"libp2p-interfaces": "^0.10.0",
"libp2p-interfaces": "github:filoozom/js-libp2p-interfaces#close-read-write",
"p-defer": "^3.0.0",
"random-int": "^2.0.0",
"streaming-iterables": "^5.0.4",
Expand Down
48 changes: 34 additions & 14 deletions src/mplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,41 @@ const pipe = require('it-pipe')
const pushable = require('it-pushable')
const log = require('debug')('libp2p:mplex')
const abortable = require('abortable-iterator')
const errCode = require('err-code')
const Coder = require('./coder')
const restrictSize = require('./restrict-size')
const { MessageTypes, MessageTypeNames } = require('./message-types')
const createStream = require('./stream')

/**
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').Sink} Sink
*/

/**
* @typedef {object} MplexOptions
* @property {(stream: MuxedStream) => void} [onStream] - Called whenever an inbound stream is created
* @property {(stream: MuxedStream) => void} [onStreamEnd] - Called whenever a stream ends
* @property {AbortSignal} [signal] - An AbortController signal
*/

class Mplex {
/**
* @class
* @param {object} options
* @param {function(*)} options.onStream - Called whenever an inbound stream is created
* @param {function(*)} options.onStreamEnd - Called whenever a stream ends
* @param {AbortSignal} options.signal - An AbortController signal
* @param {MplexOptions} options
*/
constructor (options) {
constructor (options = {}) {
options = options || {}
options = typeof options === 'function' ? { onStream: options } : options

this._streamId = 0
this._streams = {
/**
* @type {Map<number, *>} Stream to ids map
* @type {Map<number, MuxedStream>} Stream to ids map
*/
initiators: new Map(),
/**
* @type {Map<number, *>} Stream to ids map
* @type {Map<number, MuxedStream>} Stream to ids map
*/
receivers: new Map()
}
Expand Down Expand Up @@ -58,7 +68,7 @@ class Mplex {
/**
* Returns a Map of streams and their ids
*
* @returns {Map<number,*>}
* @returns {Map<number, MuxedStream>}
*/
get streams () {
// Inbound and Outbound streams may have the same ids, so we need to make those unique
Expand Down Expand Up @@ -90,10 +100,10 @@ class Mplex {
* Called whenever an inbound stream is created
*
* @private
* @param {*} options
* @param {object} options
* @param {number} options.id
* @param {string} options.name
* @returns {*} A muxed stream
* @returns MuxedStream} A muxed stream
*/
_newReceiverStream ({ id, name }) {
const registry = this._streams.receivers
Expand All @@ -108,25 +118,32 @@ class Mplex {
* @param {number} options.id
* @param {string} options.name
* @param {string} options.type
* @param {Map<number, *>} options.registry - A map of streams to their ids
* @returns {*} A muxed stream
* @param {Map<number, MuxedStream>} options.registry - A map of streams to their ids
* @returns {MuxedStream} A muxed stream
*/
_newStream ({ id, name, type, registry }) {
if (registry.has(id)) {
throw new Error(`${type} stream ${id} already exists!`)
}

log('new %s stream %s %s', type, id, name)

const send = msg => {
if (!registry.has(id)) {
throw errCode(new Error('the stream is not in the muxer registry, it may have already been closed'), 'ERR_STREAM_DOESNT_EXIST')
}
if (log.enabled) {
log('%s stream %s %s send', type, id, name, { ...msg, type: MessageTypeNames[msg.type], data: msg.data && msg.data.slice() })
}
return this.source.push(msg)
}

const onEnd = () => {
log('%s stream %s %s ended', type, id, name)
registry.delete(id)
this.onStreamEnd && this.onStreamEnd(stream)
}

const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._options.maxMsgSize })
registry.set(id, stream)
return stream
Expand All @@ -137,7 +154,7 @@ class Mplex {
* also have their size restricted. All messages will be varint decoded.
*
* @private
* @returns {*} Returns an iterable sink
* @returns {Sink} Returns an iterable sink
*/
_createSink () {
return async source => {
Expand Down Expand Up @@ -216,14 +233,17 @@ class Mplex {
switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
// We got data from the remote, push it into our local stream
stream.source.push(data)
break
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
stream.close()
// We should expect no more data from the remote, stop reading
stream.closeRead()
break
case MessageTypes.RESET_INITIATOR:
case MessageTypes.RESET_RECEIVER:
// Stop reading and writing to the stream immediately
stream.reset()
break
default:
Expand Down
142 changes: 99 additions & 43 deletions src/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ const BufferList = require('bl/BufferList')
const errCode = require('err-code')
const { MAX_MSG_SIZE } = require('./restrict-size')
const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types')
const pDefer = require('p-defer')

const ERR_MPLEX_STREAM_RESET = 'ERR_MPLEX_STREAM_RESET'
const ERR_MPLEX_STREAM_ABORT = 'ERR_MPLEX_STREAM_ABORT'
const MPLEX_WRITE_STREAM_CLOSED = 'MPLEX_WRITE_STREAM_CLOSED'

/**
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').Sink} Sink
*/

/**
* @param {object} options
Expand All @@ -20,99 +27,148 @@ const ERR_MPLEX_STREAM_ABORT = 'ERR_MPLEX_STREAM_ABORT'
* @param {function(Error)} [options.onEnd] - Called whenever the stream ends
* @param {string} [options.type] - One of ['initiator','receiver']. Defaults to 'initiator'
* @param {number} [options.maxMsgSize] - Max size of an mplex message in bytes. Writes > size are automatically split. Defaults to 1MB
* @returns {*} A muxed stream
* @returns {MuxedStream} A muxed stream
*/
module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsgSize = MAX_MSG_SIZE }) => {
const abortController = new AbortController()
const resetController = new AbortController()
const writeCloseController = new AbortController()
const Types = type === 'initiator' ? InitiatorMessageTypes : ReceiverMessageTypes
const externalId = type === 'initiator' ? (`i${id}`) : `r${id}`

name = String(name == null ? id : name)

let sourceEnded = false
let sinkEnded = false
let sinkInProgress = false
let endErr
const ended = {}
const defers = {
sink: [],
source: []
}

const end = (type, err) => {
if (!ended[type]) {
ended[type] = true
log('%s stream %s %s end', type, name, type, err)

if (ended.source && ended.sink) {
stream.timeline.close = Date.now()
onEnd(endErr)
} else {
endErr = err
}
}

for (const defer of defers[type]) {
defer.resolve()
}
defers[type] = []
}

const onSourceEnd = err => {
if (sourceEnded) return
sourceEnded = true
log('%s stream %s source end', type, name, err)
if (err && !endErr) endErr = err
if (sinkEnded) {
stream.timeline.close = Date.now()
onEnd(endErr)
const closeWrite = (controller) => {
if (ended.sink) {
return
}

const defer = pDefer()
defers.sink.push(defer)

// Make sure we're still in the sink when aborting
// If the sink wasn't opened yet, open it and immediately close
sinkInProgress ? controller.abort() : stream.sink([])
return defer.promise
}

const onSinkEnd = err => {
if (sinkEnded) return
sinkEnded = true
log('%s stream %s sink end', type, name, err)
if (err && !endErr) endErr = err
if (sourceEnded) {
stream.timeline.close = Date.now()
onEnd(endErr)
const closeRead = (err) => {
// Needed because pushable doesn't call the end function multiple times
if (ended.source) {
return
}

const defer = pDefer()
defers.source.push(defer)
stream.source.end(err)
return defer.promise
}

const closeAll = (controller, err) => {
return Promise.all([
closeWrite(controller),
closeRead(err)
])
}

const _send = (message) => !ended.sink && send(message)

/** @type {MuxedStream} */
const stream = {
// Close for both Reading and Writing
close: () => closeAll(writeCloseController),
// Close for reading
close: () => stream.source.end(),
closeRead: () => closeRead(),
// Close for writing
closeWrite: () => closeWrite(writeCloseController),
// Close for reading and writing (local error)
abort: err => {
log('%s stream %s abort', type, name, err)
// End the source with the passed error
stream.source.end(err)
abortController.abort()
onSinkEnd(err)
closeAll(abortController, err)
},
// Close immediately for reading and writing (remote error)
reset: () => {
const err = errCode(new Error('stream reset'), ERR_MPLEX_STREAM_RESET)
resetController.abort()
stream.source.end(err)
onSinkEnd(err)
closeAll(resetController, err)
},
sink: async source => {
if (sinkInProgress) {
throw errCode(new Error('the sink was already opened'), 'ERR_SINK_ALREADY_OPENED')
}

if (ended.sink) {
throw errCode(new Error('the stream was already closed'), 'ERR_STREAM_CLOSED')
}

sinkInProgress = true
source = abortable(source, [
{ signal: abortController.signal, options: { abortMessage: 'stream aborted', abortCode: ERR_MPLEX_STREAM_ABORT } },
{ signal: resetController.signal, options: { abortMessage: 'stream reset', abortCode: ERR_MPLEX_STREAM_RESET } }
{ signal: resetController.signal, options: { abortMessage: 'stream reset', abortCode: ERR_MPLEX_STREAM_RESET } },
{ signal: writeCloseController.signal, options: { abortMessage: 'write stream closed', abortCode: MPLEX_WRITE_STREAM_CLOSED } }
])

if (type === 'initiator') { // If initiator, open a new stream
send({ id, type: Types.NEW_STREAM, data: name })
_send({ id, type: Types.NEW_STREAM, data: name })
}

try {
for await (let data of source) {
while (data.length) {
if (data.length <= maxMsgSize) {
send({ id, type: Types.MESSAGE, data })
_send({ id, type: Types.MESSAGE, data })
break
}
data = BufferList.isBufferList(data) ? data : new BufferList(data)
send({ id, type: Types.MESSAGE, data: data.shallowSlice(0, maxMsgSize) })
_send({ id, type: Types.MESSAGE, data: data.shallowSlice(0, maxMsgSize) })
data.consume(maxMsgSize)
}
}
} catch (err) {
// Send no more data if this stream was remotely reset
if (err.code === ERR_MPLEX_STREAM_RESET) {
log('%s stream %s reset', type, name)
} else {
log('%s stream %s error', type, name, err)
send({ id, type: Types.RESET })
}
if (err.code !== MPLEX_WRITE_STREAM_CLOSED) {
// Send no more data if this stream was remotely reset
if (err.code === ERR_MPLEX_STREAM_RESET) {
log('%s stream %s reset', type, name)
} else {
log('%s stream %s error', type, name, err)
_send({ id, type: Types.RESET })
}

stream.source.end(err)
return onSinkEnd(err)
stream.source.end(err)
return end('sink', err)
}
}

send({ id, type: Types.CLOSE })
onSinkEnd()
_send({ id, type: Types.CLOSE })
end('sink')
},
source: pushable(onSourceEnd),
source: pushable(end.bind(null, 'source')),
timeline: {
open: Date.now(),
close: null
Expand Down
2 changes: 1 addition & 1 deletion test/stream.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ describe('stream', () => {
map(msg => {
// when the initiator sends a CLOSE message, we call close
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
receiver.close()
receiver.closeRead()
}
return msgToBuffer(msg)
}),
Expand Down