Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
feat!: add upgrader options (#290)
Browse files Browse the repository at this point in the history
Allow upgrader to accept a StreamMuxerFactory and skip encryption & protection for transports that inherently support encryption and/or multiplexing (WebRTC, WebTransport, etc.) or don't support protection.

BREAKING CHANGE: the return type of StreamMuxer.newStream can now return a promise

Co-authored-by: Marco Munizaga <marco@marcopolo.io>
  • Loading branch information
ckousik and MarcoPolo authored Oct 6, 2022
1 parent 242608d commit c502b66
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 22 deletions.
2 changes: 1 addition & 1 deletion packages/interface-mocks/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class MockConnection implements Connection {
}

const id = `${Math.random()}`
const stream: Stream = this.muxer.newStream(id)
const stream: Stream = await this.muxer.newStream(id)
const result = await mss.select(stream, protocols, options)

const streamWithProtocol: Stream = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const conn = dialer.newStream()
const conn = await dialer.newStream()
expect(dialer.streams).to.include(conn)
expect(isValidTick(conn.stat.timeline.open)).to.equal(true)

Expand Down Expand Up @@ -91,7 +91,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const conn = listener.newStream()
const conn = await listener.newStream()

void drainAndClose(conn)

Expand Down Expand Up @@ -128,8 +128,8 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const dialerInitiatorStream = dialer.newStream()
const listenerInitiatorStream = listener.newStream()
const dialerInitiatorStream = await dialer.newStream()
const listenerInitiatorStream = await listener.newStream()

await Promise.all([
drainAndClose(dialerInitiatorStream),
Expand Down Expand Up @@ -167,8 +167,8 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const dialerConn = dialer.newStream()
const listenerConn = listener.newStream()
const dialerConn = await dialer.newStream()
const listenerConn = await listener.newStream()

void pipe([new Uint8ArrayList(uint8ArrayFromString('hey'))], dialerConn)
void pipe([new Uint8ArrayList(uint8ArrayFromString('hello'))], listenerConn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream())
const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream()))

void Promise.all(
streams.map(async stream => {
Expand Down Expand Up @@ -89,7 +89,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream())
const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream()))

void Promise.all(
streams.map(async stream => {
Expand Down Expand Up @@ -132,7 +132,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = Array(expectedStreams).fill(0).map(() => dialer.newStream())
const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream()))

const streamPipes = streams.map(async stream => {
return await pipe(
Expand Down Expand Up @@ -176,7 +176,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
dialer.close()

try {
dialer.newStream()
await dialer.newStream()
expect.fail('newStream should throw if called after close')
} catch (e) {
expect(dialer.streams, 'closed muxer should have no streams').to.have.lengthOf(0)
Expand All @@ -200,8 +200,8 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = dialer.newStream()
const streams = Array.from(Array(5), () => dialer.newStream())
const stream = await dialer.newStream()
const streams = await Promise.all(Array.from(Array(5), () => dialer.newStream()))
let closed = false
const controllers: AbortController[] = []

Expand Down Expand Up @@ -271,7 +271,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = dialer.newStream()
const stream = await dialer.newStream()
await stream.sink(data)

const err = await deferred.promise
Expand All @@ -297,7 +297,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = dialer.newStream()
const stream = await dialer.newStream()
await stream.closeRead()

// Source should be done
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export default async (createMuxer: (init?: StreamMuxerInit) => Promise<StreamMux
void pipe(dialerSocket, dialer, dialerSocket)

const spawnStream = async () => {
const stream = dialer.newStream()
const stream = await dialer.newStream()
expect(stream).to.exist // eslint-disable-line

const res = await pipe(
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-stream-muxer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
- [`const muxer = new Muxer([options])`](#const-muxer--new-muxeroptions)
- [`muxer.onStream`](#muxeronstream)
- [`muxer.onStreamEnd`](#muxeronstreamend)
- [`const stream = muxer.newStream([options])`](#const-stream--muxernewstreamoptions)
- [`const stream = await muxer.newStream([options])`](#const-stream--muxernewstreamoptions)
- [`const streams = muxer.streams`](#const-streams--muxerstreams)
- [License](#license)
- [Contribution](#contribution)
Expand Down Expand Up @@ -150,15 +150,15 @@ const muxer = new Muxer()
muxer.onStreamEnd = stream => { /* ... */ }
```
### `const stream = muxer.newStream([options])`
### `const stream = await muxer.newStream([options])`
Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it).
e.g.
```js
// Create a new stream on the muxed connection
const stream = muxer.newStream()
const stream = await muxer.newStream()

// Use this new stream like any other duplex stream:
pipe([1, 2, 3], stream, consume)
Expand Down
2 changes: 1 addition & 1 deletion packages/interface-stream-muxer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export interface StreamMuxer extends Duplex<Uint8Array> {
* Initiate a new stream with the given name. If no name is
* provided, the id of the stream will be used.
*/
newStream: (name?: string) => Stream
newStream: (name?: string) => Stream | Promise<Stream>

/**
* Close or abort all tracked streams and stop the muxer
Expand Down
1 change: 1 addition & 0 deletions packages/interface-transport/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
"@libp2p/interface-connection": "^3.0.0",
"@libp2p/interfaces": "^3.0.0",
"@multiformats/multiaddr": "^11.0.0",
"@libp2p/interface-stream-muxer": "^2.0.2",
"it-stream-types": "^1.0.4"
},
"devDependencies": {
Expand Down
11 changes: 9 additions & 2 deletions packages/interface-transport/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { AbortOptions } from '@libp2p/interfaces'
import type { EventEmitter } from '@libp2p/interfaces/events'
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection'
import type { Duplex } from 'it-stream-types'
Expand Down Expand Up @@ -78,16 +79,22 @@ export interface UpgraderEvents {
'connectionEnd': CustomEvent<Connection>
}

export interface UpgraderOptions {
skipEncryption?: boolean
skipProtection?: boolean
muxerFactory?: StreamMuxerFactory
}

export interface Upgrader extends EventEmitter<UpgraderEvents> {
/**
* Upgrades an outbound connection on `transport.dial`.
*/
upgradeOutbound: (maConn: MultiaddrConnection) => Promise<Connection>
upgradeOutbound: (maConn: MultiaddrConnection, opts?: UpgraderOptions) => Promise<Connection>

/**
* Upgrades an inbound connection on transport listener.
*/
upgradeInbound: (maConn: MultiaddrConnection) => Promise<Connection>
upgradeInbound: (maConn: MultiaddrConnection, opts?: UpgraderOptions) => Promise<Connection>
}

export interface ProtocolHandler {
Expand Down

0 comments on commit c502b66

Please sign in to comment.