Skip to content

Commit

Permalink
fix: allow mss lazy select on read (#2246)
Browse files Browse the repository at this point in the history
Updates the multistream lazy select to support lazy select on streams that are only read from.
  • Loading branch information
achingbrain authored Nov 20, 2023
1 parent 13a870c commit d8f5bc2
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 21 deletions.
6 changes: 4 additions & 2 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,8 @@ export class DefaultUpgrader implements Upgrader {
.then(async () => {
const protocols = this.components.registrar.getProtocols()
const { stream, protocol } = await mss.handle(muxedStream, protocols, {
log: muxedStream.log
log: muxedStream.log,
yieldBytes: false
})

if (connection == null) {
Expand Down Expand Up @@ -458,7 +459,8 @@ export class DefaultUpgrader implements Upgrader {

const { stream, protocol } = await mss.select(muxedStream, protocols, {
...options,
log: muxedStream.log
log: muxedStream.log,
yieldBytes: false
})

connection.log('negotiated protocol stream %s with id %s', protocol, muxedStream.id)
Expand Down
1 change: 1 addition & 0 deletions packages/multistream-select/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"it-length-prefixed-stream": "^1.1.1",
"it-pipe": "^3.0.1",
"it-stream-types": "^2.0.1",
"uint8-varint": "^2.0.2",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.6"
},
Expand Down
22 changes: 15 additions & 7 deletions packages/multistream-select/src/handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,30 @@ import type { Duplex } from 'it-stream-types'
*/
export async function handle <Stream extends Duplex<any, any, any>> (stream: Stream, protocols: string | string[], options: MultistreamSelectInit): Promise<ProtocolStream<Stream>> {
protocols = Array.isArray(protocols) ? protocols : [protocols]
options.log.trace('handle: available protocols %s', protocols)

const lp = lpStream(stream, {
maxDataLength: MAX_PROTOCOL_LENGTH
...options,
maxDataLength: MAX_PROTOCOL_LENGTH,
maxLengthLength: 2 // 2 bytes is enough to length-prefix MAX_PROTOCOL_LENGTH
})

while (true) {
options.log.trace('handle - available protocols %s', protocols)
options?.log.trace('handle: reading incoming string')
const protocol = await multistream.readString(lp, options)
options.log.trace('read "%s"', protocol)
options.log.trace('handle: read "%s"', protocol)

if (protocol === PROTOCOL_ID) {
options.log.trace('respond with "%s" for "%s"', PROTOCOL_ID, protocol)
options.log.trace('handle: respond with "%s" for "%s"', PROTOCOL_ID, protocol)
await multistream.write(lp, uint8ArrayFromString(`${PROTOCOL_ID}\n`), options)
options.log.trace('handle: responded with "%s" for "%s"', PROTOCOL_ID, protocol)
continue
}

if (protocols.includes(protocol)) {
options.log.trace('respond with "%s" for "%s"', protocol, protocol)
options.log.trace('handle: respond with "%s" for "%s"', protocol, protocol)
await multistream.write(lp, uint8ArrayFromString(`${protocol}\n`), options)
options.log.trace('handle: responded with "%s" for "%s"', protocol, protocol)

return { stream: lp.unwrap(), protocol }
}
Expand All @@ -84,12 +90,14 @@ export async function handle <Stream extends Duplex<any, any, any>> (stream: Str
uint8ArrayFromString('\n')
)

options.log.trace('handle: respond with "%s" for %s', protocols, protocol)
await multistream.write(lp, protos, options)
options.log.trace('respond with "%s" for %s', protocols, protocol)
options.log.trace('handle: responded with "%s" for %s', protocols, protocol)
continue
}

options.log('respond with "na" for "%s"', protocol)
options.log('handle: respond with "na" for "%s"', protocol)
await multistream.write(lp, uint8ArrayFromString('na\n'), options)
options.log('handle: responded with "na" for "%s"', protocol)
}
}
3 changes: 2 additions & 1 deletion packages/multistream-select/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import { PROTOCOL_ID } from './constants.js'
import type { AbortOptions, LoggerOptions } from '@libp2p/interface'
import type { LengthPrefixedStreamOpts } from 'it-length-prefixed-stream'

export { PROTOCOL_ID }

Expand All @@ -30,7 +31,7 @@ export interface ProtocolStream<Stream> {
protocol: string
}

export interface MultistreamSelectInit extends AbortOptions, LoggerOptions {
export interface MultistreamSelectInit extends AbortOptions, LoggerOptions, Partial<LengthPrefixedStreamOpts> {

}

Expand Down
52 changes: 42 additions & 10 deletions packages/multistream-select/src/select.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { CodeError } from '@libp2p/interface/errors'
import { lpStream } from 'it-length-prefixed-stream'
import * as varint from 'uint8-varint'
import { Uint8ArrayList } from 'uint8arraylist'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { MAX_PROTOCOL_LENGTH } from './constants.js'
import * as multistream from './multistream.js'
Expand Down Expand Up @@ -53,6 +55,7 @@ import type { Duplex } from 'it-stream-types'
export async function select <Stream extends Duplex<any, any, any>> (stream: Stream, protocols: string | string[], options: MultistreamSelectInit): Promise<ProtocolStream<Stream>> {
protocols = Array.isArray(protocols) ? [...protocols] : [protocols]
const lp = lpStream(stream, {
...options,
maxDataLength: MAX_PROTOCOL_LENGTH
})
const protocol = protocols.shift()
Expand Down Expand Up @@ -109,29 +112,58 @@ export async function select <Stream extends Duplex<any, any, any>> (stream: Str
export function lazySelect <Stream extends Duplex<any, any, any>> (stream: Stream, protocol: string, options: MultistreamSelectInit): ProtocolStream<Stream> {
const originalSink = stream.sink.bind(stream)
const originalSource = stream.source
let selected = false

const lp = lpStream({
sink: originalSink,
source: originalSource
}, {
...options,
maxDataLength: MAX_PROTOCOL_LENGTH
})

stream.sink = async source => {
options?.log.trace('lazy: write ["%s", "%s"]', PROTOCOL_ID, protocol)

await lp.writeV([
uint8ArrayFromString(`${PROTOCOL_ID}\n`),
uint8ArrayFromString(`${protocol}\n`)
])

options?.log.trace('lazy: writing rest of "%s" stream', protocol)
await lp.unwrap().sink(source)
const { sink } = lp.unwrap()

await sink(async function * () {
for await (const buf of source) {
// if writing before selecting, send selection with first data chunk
if (!selected) {
selected = true
options?.log.trace('lazy: write ["%s", "%s", data] in sink', PROTOCOL_ID, protocol)

const protocolString = `${protocol}\n`

// send protocols in first chunk of data written to transport
yield new Uint8ArrayList(
Uint8Array.from([19]), // length of PROTOCOL_ID plus newline
uint8ArrayFromString(`${PROTOCOL_ID}\n`),
varint.encode(protocolString.length),
uint8ArrayFromString(protocolString),
buf
).subarray()

options?.log.trace('lazy: wrote ["%s", "%s", data] in sink', PROTOCOL_ID, protocol)
} else {
yield buf
}
}
}())
}

stream.source = (async function * () {
options?.log.trace('lazy: reading multistream select header')
// if reading before selecting, send selection before first data chunk
if (!selected) {
selected = true
options?.log.trace('lazy: write ["%s", "%s", data] in source', PROTOCOL_ID, protocol)
await lp.writeV([
uint8ArrayFromString(`${PROTOCOL_ID}\n`),
uint8ArrayFromString(`${protocol}\n`)
])
options?.log.trace('lazy: wrote ["%s", "%s", data] in source', PROTOCOL_ID, protocol)
}

options?.log.trace('lazy: reading multistream select header')
let response = await multistream.readString(lp, options)
options?.log.trace('lazy: read multistream select header "%s"', response)

Expand Down
70 changes: 70 additions & 0 deletions packages/multistream-select/test/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,76 @@ describe('Dialer and Listener integration', () => {
expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice())
})

it('should handle and lazySelect that fails', async () => {
const protocol = '/echo/1.0.0'
const otherProtocol = '/echo/2.0.0'
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

const dialerSelection = mss.lazySelect(pair[0], protocol, {
log: logger('mss:test')
})
expect(dialerSelection.protocol).to.equal(protocol)

// the listener handles the incoming stream
void mss.handle(pair[1], otherProtocol, {
log: logger('mss:test')
})

// should fail when we interact with the stream
const input = [randomBytes(10), randomBytes(64), randomBytes(3)]
await expect(pipe(input, dialerSelection.stream, async source => all(source)))
.to.eventually.be.rejected.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL')
})

it('should handle and lazySelect only by reading', async () => {
const protocol = '/echo/1.0.0'
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

const dialerSelection = mss.lazySelect(pair[0], protocol, {
log: logger('mss:dialer')
})
expect(dialerSelection.protocol).to.equal(protocol)

// ensure stream is usable after selection
const input = [randomBytes(10), randomBytes(64), randomBytes(3)]

const [, dialerOut] = await Promise.all([
// the listener handles the incoming stream
mss.handle(pair[1], protocol, {
log: logger('mss:listener')
}).then(async result => {
// the listener writes to the incoming stream
await pipe(input, result.stream)
}),

// the dialer just reads from the stream
pipe(dialerSelection.stream, async source => all(source))
])

expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice())
})

it('should handle and lazySelect only by reading that fails', async () => {
const protocol = '/echo/1.0.0'
const otherProtocol = '/echo/2.0.0'
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

// lazy succeeds
const dialerSelection = mss.lazySelect(pair[0], protocol, {
log: logger('mss:dialer')
})
expect(dialerSelection.protocol).to.equal(protocol)

// the listener handles the incoming stream
void mss.handle(pair[1], otherProtocol, {
log: logger('mss:listener')
})

// should fail when we interact with the stream
await expect(pipe(dialerSelection.stream, async source => all(source)))
.to.eventually.be.rejected.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL')
})

it('should abort an unhandled lazySelect', async () => {
const protocol = '/echo/1.0.0'
const pair = duplexPair<Uint8ArrayList | Uint8Array>()
Expand Down
2 changes: 1 addition & 1 deletion packages/multistream-select/test/multistream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ describe('Multistream', () => {
const duplexes = duplexPair<Uint8Array>()
const inputStream = lpStream(duplexes[0])
const outputStream = lpStream(duplexes[1], {
maxDataLength: 100
maxDataLength: 9999
})

void inputStream.write(input)
Expand Down

0 comments on commit d8f5bc2

Please sign in to comment.