Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: close read and write #115

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,17 @@ In addition to `sink` and `source` properties, this stream also has the followin

#### `stream.close()`

Closes the stream for **reading**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed.
Closes the stream for **reading** and **writing**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed.

#### `stream.closeRead()`

Closes the stream for **reading**, but still allows writing. This should not typically be called by application code, but may be used for one way, push only streams.

This function is called automatically by the muxer when it receives a `CLOSE` message from the remote.

The source will return normally, the sink will continue to consume.
#### `stream.closeWrite()`

Closes the stream for **writing**, but still allows reading. This is useful for when you only ever wish to read from a stream.

#### `stream.abort([err])`

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"aegir": "^33.1.0",
"cborg": "^1.2.1",
"iso-random-stream": "^2.0.0",
"libp2p-interfaces": "^0.10.0",
"libp2p-interfaces": "github:libp2p/js-libp2p-interfaces#feat/streams",
"p-defer": "^3.0.0",
"random-int": "^2.0.0",
"streaming-iterables": "^5.0.4",
Expand Down
9 changes: 8 additions & 1 deletion src/mplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const pipe = require('it-pipe')
const pushable = require('it-pushable')
const log = require('debug')('libp2p:mplex')
const abortable = require('abortable-iterator')
const errCode = require('err-code')
const Coder = require('./coder')
const restrictSize = require('./restrict-size')
const { MessageTypes, MessageTypeNames } = require('./message-types')
Expand Down Expand Up @@ -117,6 +118,9 @@ class Mplex {
}
log('new %s stream %s %s', type, id, name)
const send = msg => {
if (!registry.has(id)) {
throw errCode(new Error('the stream is not in the muxer registry, it may have already been closed'), 'ERR_STREAM_DOESNT_EXIST')
}
if (log.enabled) {
log('%s stream %s %s send', type, id, name, { ...msg, type: MessageTypeNames[msg.type], data: msg.data && msg.data.slice() })
}
Expand Down Expand Up @@ -216,14 +220,17 @@ class Mplex {
switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
// We got data from the remote, push it into our local stream
stream.source.push(data)
break
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
stream.close()
// We should expect no more data from the remote, stop reading
stream.closeRead()
break
case MessageTypes.RESET_INITIATOR:
case MessageTypes.RESET_RECEIVER:
// Stop reading and writing to the stream immediately
stream.reset()
break
default:
Expand Down
9 changes: 8 additions & 1 deletion src/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,15 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg
}

const stream = {
// Close for both Reading and Writing
close: () => Promise.all([
stream.closeRead(),
stream.closeWrite()
]),
// Close for reading
close: () => stream.source.end(),
closeRead: () => stream.source.end(),
// Close for writing
closeWrite: () => stream.sink([]),
// Close for reading and writing (local error)
abort: err => {
log('%s stream %s abort', type, name, err)
Expand Down
1 change: 0 additions & 1 deletion test/compliance.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
'use strict'

const tests = require('libp2p-interfaces/src/stream-muxer/tests')

const Mplex = require('../src')

describe('compliance', () => {
Expand Down
4 changes: 2 additions & 2 deletions test/stream.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ describe('stream', () => {
pipe(
receiver,
map(msg => {
// when the initiator sends a CLOSE message, we call close
// when the initiator sends a CLOSE message, we close for reading
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
receiver.close()
receiver.closeRead()
}
return msgToBuffer(msg)
}),
Expand Down