Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: close read and write streams #122

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,17 @@ In addition to `sink` and `source` properties, this stream also has the followin

#### `stream.close()`

Closes the stream for **reading**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed.
Closes the stream for **reading** and **writing**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed.

#### `stream.closeRead()`

Closes the stream for **reading**, but still allows writing. This should not typically be called by application code, but may be used for one way, push only streams.

This function is called automatically by the muxer when it receives a `CLOSE` message from the remote.

The source will return normally, the sink will continue to consume.
#### `stream.closeWrite()`

Closes the stream for **writing**, but still allows reading. This is useful for when you only ever wish to read from a stream.

#### `stream.abort([err])`

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"aegir": "^33.1.0",
"cborg": "^1.2.1",
"iso-random-stream": "^2.0.0",
"libp2p-interfaces": "^0.10.0",
"libp2p-interfaces": "github:filoozom/js-libp2p-interfaces#close-read-write",
"p-defer": "^3.0.0",
"random-int": "^2.0.0",
"streaming-iterables": "^5.0.4",
Expand Down
48 changes: 34 additions & 14 deletions src/mplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,41 @@ const pipe = require('it-pipe')
const pushable = require('it-pushable')
const log = require('debug')('libp2p:mplex')
const abortable = require('abortable-iterator')
const errCode = require('err-code')
const Coder = require('./coder')
const restrictSize = require('./restrict-size')
const { MessageTypes, MessageTypeNames } = require('./message-types')
const createStream = require('./stream')

/**
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').Sink} Sink
*/

/**
* @typedef {object} MplexOptions
* @property {(stream: MuxedStream) => void} [onStream] - Called whenever an inbound stream is created
* @property {(stream: MuxedStream) => void} [onStreamEnd] - Called whenever a stream ends
* @property {AbortSignal} [signal] - An AbortController signal
*/

class Mplex {
/**
* @class
* @param {object} options
* @param {function(*)} options.onStream - Called whenever an inbound stream is created
* @param {function(*)} options.onStreamEnd - Called whenever a stream ends
* @param {AbortSignal} options.signal - An AbortController signal
* @param {MplexOptions} options
*/
constructor (options) {
constructor (options = {}) {
options = options || {}
options = typeof options === 'function' ? { onStream: options } : options

this._streamId = 0
this._streams = {
/**
* @type {Map<number, *>} Stream to ids map
* @type {Map<number, MuxedStream>} Stream to ids map
*/
initiators: new Map(),
/**
* @type {Map<number, *>} Stream to ids map
* @type {Map<number, MuxedStream>} Stream to ids map
*/
receivers: new Map()
}
Expand Down Expand Up @@ -58,7 +68,7 @@ class Mplex {
/**
* Returns a Map of streams and their ids
*
* @returns {Map<number,*>}
* @returns {Map<number, MuxedStream>}
*/
get streams () {
// Inbound and Outbound streams may have the same ids, so we need to make those unique
Expand Down Expand Up @@ -90,10 +100,10 @@ class Mplex {
* Called whenever an inbound stream is created
*
* @private
* @param {*} options
* @param {object} options
* @param {number} options.id
* @param {string} options.name
* @returns {*} A muxed stream
* @returns {MuxedStream} A muxed stream
*/
_newReceiverStream ({ id, name }) {
const registry = this._streams.receivers
Expand All @@ -108,25 +118,32 @@ class Mplex {
* @param {number} options.id
* @param {string} options.name
* @param {string} options.type
* @param {Map<number, *>} options.registry - A map of streams to their ids
* @returns {*} A muxed stream
* @param {Map<number, MuxedStream>} options.registry - A map of streams to their ids
* @returns {MuxedStream} A muxed stream
*/
_newStream ({ id, name, type, registry }) {
if (registry.has(id)) {
throw new Error(`${type} stream ${id} already exists!`)
}

log('new %s stream %s %s', type, id, name)

const send = msg => {
if (!registry.has(id)) {
throw errCode(new Error('the stream is not in the muxer registry, it may have already been closed'), 'ERR_STREAM_DOESNT_EXIST')
}
if (log.enabled) {
log('%s stream %s %s send', type, id, name, { ...msg, type: MessageTypeNames[msg.type], data: msg.data && msg.data.slice() })
}
return this.source.push(msg)
}

const onEnd = () => {
log('%s stream %s %s ended', type, id, name)
registry.delete(id)
this.onStreamEnd && this.onStreamEnd(stream)
}

const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._options.maxMsgSize })
registry.set(id, stream)
return stream
Expand All @@ -137,7 +154,7 @@ class Mplex {
* also have their size restricted. All messages will be varint decoded.
*
* @private
* @returns {*} Returns an iterable sink
* @returns {Sink} Returns an iterable sink
*/
_createSink () {
return async source => {
Expand Down Expand Up @@ -216,14 +233,17 @@ class Mplex {
switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
// We got data from the remote, push it into our local stream
stream.source.push(data)
break
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
stream.close()
// We should expect no more data from the remote, stop reading
stream.closeRead()
break
case MessageTypes.RESET_INITIATOR:
case MessageTypes.RESET_RECEIVER:
// Stop reading and writing to the stream immediately
stream.reset()
break
default:
Expand Down
77 changes: 61 additions & 16 deletions src/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ const BufferList = require('bl/BufferList')
const errCode = require('err-code')
const { MAX_MSG_SIZE } = require('./restrict-size')
const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types')
const pDefer = require('p-defer')

const ERR_MPLEX_STREAM_RESET = 'ERR_MPLEX_STREAM_RESET'
const ERR_MPLEX_STREAM_ABORT = 'ERR_MPLEX_STREAM_ABORT'
const MPLEX_WRITE_STREAM_CLOSED = 'MPLEX_WRITE_STREAM_CLOSED'

/**
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').Sink} Sink
*/

/**
* @param {object} options
Expand All @@ -20,18 +27,21 @@ const ERR_MPLEX_STREAM_ABORT = 'ERR_MPLEX_STREAM_ABORT'
* @param {function(Error)} [options.onEnd] - Called whenever the stream ends
* @param {string} [options.type] - One of ['initiator','receiver']. Defaults to 'initiator'
* @param {number} [options.maxMsgSize] - Max size of an mplex message in bytes. Writes > size are automatically split. Defaults to 1MB
* @returns {*} A muxed stream
* @returns {MuxedStream} A muxed stream
*/
module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsgSize = MAX_MSG_SIZE }) => {
const abortController = new AbortController()
const resetController = new AbortController()
const writeCloseController = new AbortController()
const Types = type === 'initiator' ? InitiatorMessageTypes : ReceiverMessageTypes
const externalId = type === 'initiator' ? (`i${id}`) : `r${id}`

name = String(name == null ? id : name)

let sourceEnded = false
let sinkEnded = false
let sinkInProgress = false
let sinkClosedDefer
let endErr

const onSourceEnd = err => {
Expand All @@ -54,11 +64,38 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg
stream.timeline.close = Date.now()
onEnd(endErr)
}
if (sinkClosedDefer) sinkClosedDefer.resolve()
}

const _send = (message) => {
if (!sinkEnded) {
send(message)
}
}

/** @type {MuxedStream} */
const stream = {
// Close for both Reading and Writing
close: () => Promise.all([
stream.closeRead(),
stream.closeWrite()
]),
// Close for reading
close: () => stream.source.end(),
closeRead: () => stream.source.end(),
// Close for writing
closeWrite: () => {
if (sinkEnded) {
return
}

if (sinkInProgress) {
sinkClosedDefer = pDefer()
writeCloseController.abort()
return sinkClosedDefer.promise
}

return stream.sink([])
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm starting to wrap my head around these streams, and I have one question regarding the current code in relation to the spec: does it make sense to create a new stream right before closing it here? Does the spec require this, or could we simply replace it with onSinkEnd() and prohibit the sink function from being called if sinkEnded?

I don't see anything indicating that there needs to be a stream in both directions in https://github.com/libp2p/specs/tree/master/mplex#opening-a-new-stream.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The advantage of doing so is that we inform the other party that we will not write to them anymore and only expect to read, so their side will also be closed for reading.

},
// Close for reading and writing (local error)
abort: err => {
log('%s stream %s abort', type, name, err)
Expand All @@ -75,41 +112,49 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg
onSinkEnd(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem seems to be here, both in reset and abort. They call onSinkEnd and do the abort. This will cause the sink catch to be triggered and a second onSinkEnd happens in the end.

Removing onSinkEnd in abort and reset will not work as if the sink did not start the onEnd will not be triggered. So, we will likely need to improve the logic in the catch function of sink to not do the following on abort/reset:

stream.source.end(err)
return onSinkEnd(err)

Without the previous change, this was not problematic somehow. I think it is related to something now happening in a different event loop.

What do you think to be the best approach here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure I follow. Both onSourceEnd and onSinkEnd are guarded from running twice, so neither of them should do anything if run a second time, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, I guess it's because in abort, onEnd is called before the Types.RESET message is sent. And with the new _send it can't be sent at all, and apparently this makes everything hang. Maybe a bug in some other software; I guess there should be a timeout somewhere.

I'll see tomorrow if I can clean this up a bit, but at least it works now.

},
sink: async source => {
if (sinkInProgress) {
throw errCode(new Error('the sink was already opened'), 'ERR_SINK_ALREADY_OPENED')
}

sinkInProgress = true
source = abortable(source, [
{ signal: abortController.signal, options: { abortMessage: 'stream aborted', abortCode: ERR_MPLEX_STREAM_ABORT } },
{ signal: resetController.signal, options: { abortMessage: 'stream reset', abortCode: ERR_MPLEX_STREAM_RESET } }
{ signal: resetController.signal, options: { abortMessage: 'stream reset', abortCode: ERR_MPLEX_STREAM_RESET } },
{ signal: writeCloseController.signal, options: { abortMessage: 'write stream closed', abortCode: MPLEX_WRITE_STREAM_CLOSED } }
])

if (type === 'initiator') { // If initiator, open a new stream
send({ id, type: Types.NEW_STREAM, data: name })
_send({ id, type: Types.NEW_STREAM, data: name })
}

try {
for await (let data of source) {
while (data.length) {
if (data.length <= maxMsgSize) {
send({ id, type: Types.MESSAGE, data })
_send({ id, type: Types.MESSAGE, data })
break
}
data = BufferList.isBufferList(data) ? data : new BufferList(data)
send({ id, type: Types.MESSAGE, data: data.shallowSlice(0, maxMsgSize) })
_send({ id, type: Types.MESSAGE, data: data.shallowSlice(0, maxMsgSize) })
data.consume(maxMsgSize)
}
}
} catch (err) {
// Send no more data if this stream was remotely reset
if (err.code === ERR_MPLEX_STREAM_RESET) {
log('%s stream %s reset', type, name)
} else {
log('%s stream %s error', type, name, err)
send({ id, type: Types.RESET })
}
if (err.code !== MPLEX_WRITE_STREAM_CLOSED) {
// Send no more data if this stream was remotely reset
if (err.code === ERR_MPLEX_STREAM_RESET) {
log('%s stream %s reset', type, name)
} else {
log('%s stream %s error', type, name, err)
_send({ id, type: Types.RESET })
}

stream.source.end(err)
return onSinkEnd(err)
stream.source.end(err)
return onSinkEnd(err)
}
}

send({ id, type: Types.CLOSE })
_send({ id, type: Types.CLOSE })
onSinkEnd()
},
source: pushable(onSourceEnd),
Expand Down
2 changes: 1 addition & 1 deletion test/stream.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ describe('stream', () => {
map(msg => {
// when the initiator sends a CLOSE message, we call close
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
receiver.close()
receiver.closeRead()
}
return msgToBuffer(msg)
}),
Expand Down