Skip to content
This repository has been archived by the owner on May 7, 2024. It is now read-only.

Commit

Permalink
fix: Close stream after sink (#23)
Browse files Browse the repository at this point in the history
* Close stream after sink

* Update src/index.ts

Co-authored-by: Alex Potsides <alex@achingbrain.net>

* Update src/index.ts

Co-authored-by: Alex Potsides <alex@achingbrain.net>

* Update src/index.ts

Co-authored-by: Alex Potsides <alex@achingbrain.net>

* Update test/browser.ts

Co-authored-by: Alex Potsides <alex@achingbrain.net>

* Update test/browser.ts

Co-authored-by: Alex Potsides <alex@achingbrain.net>

* Set type for err

* Don't use errCode

* Wait for pong before closing write side

* Update go-libp2p for tests

* Undo implementing ping again

* go mod tidy

* Fix go.mod - don't reuire newer quic-go

* Revert "Undo implementing ping again"

This reverts commit e67b51f.

* Update comment

* Make aegir lint happy

Co-authored-by: Alex Potsides <alex@achingbrain.net>
  • Loading branch information
MarcoPolo and achingbrain authored Nov 16, 2022
1 parent 97de951 commit a95720c
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 18 deletions.
4 changes: 2 additions & 2 deletions go-libp2p-webtransport-server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/libp2p/js-libp2p-webtransport/go-libp2p-webtransport-server/m/
go 1.19

require (
github.com/libp2p/go-libp2p v0.23.2
github.com/multiformats/go-multiaddr v0.7.0
github.com/libp2p/go-libp2p v0.23.4
github.com/multiformats/go-multiaddr v0.8.0
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go-libp2p-webtransport-server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38y
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
github.com/libp2p/go-libp2p v0.23.2 h1:yqyTeKQJyofWXxEv/eEVUvOrGdt/9x+0PIQ4N1kaxmE=
github.com/libp2p/go-libp2p v0.23.2/go.mod h1:s9DEa5NLR4g+LZS+md5uGU4emjMWFiqkZr6hBTY8UxI=
github.com/libp2p/go-libp2p v0.23.4 h1:hWi9XHSOVFR1oDWRk7rigfyA4XNMuYL20INNybP9LP8=
github.com/libp2p/go-libp2p v0.23.4/go.mod h1:s9DEa5NLR4g+LZS+md5uGU4emjMWFiqkZr6hBTY8UxI=
github.com/libp2p/go-libp2p-asn-util v0.2.0 h1:rg3+Os8jbnO5DxkC7K/Utdi+DkY3q/d1/1q+8WeNAsw=
github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI=
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
Expand Down Expand Up @@ -320,8 +320,8 @@ github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ8
github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM=
github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo=
github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4=
github.com/multiformats/go-multiaddr v0.7.0 h1:gskHcdaCyPtp9XskVwtvEeQOG465sCohbQIirSyqxrc=
github.com/multiformats/go-multiaddr v0.7.0/go.mod h1:Fs50eBDWvZu+l3/9S6xAE7ZYj6yhxlvaVZjakWN7xRs=
github.com/multiformats/go-multiaddr v0.8.0 h1:aqjksEcqK+iD/Foe1RRFsGZh8+XFiGo7FgUCZlpv3LU=
github.com/multiformats/go-multiaddr v0.8.0/go.mod h1:Fs50eBDWvZu+l3/9S6xAE7ZYj6yhxlvaVZjakWN7xRs=
github.com/multiformats/go-multiaddr-dns v0.3.1 h1:QgQgR+LQVt3NPTjbrLLpsaT2ufAA2y0Mkk+QRVJbW3A=
github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTdB37AMA5AMOVuO6NY3JwTk=
github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E=
Expand Down
7 changes: 7 additions & 0 deletions go-libp2p-webtransport-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package main

import (
"fmt"
"io"
"os"
"os/signal"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
"github.com/multiformats/go-multiaddr"
)
Expand All @@ -22,6 +24,11 @@ func main() {
panic(err)
}

h.SetStreamHandler("echo", func(s network.Stream) {
io.Copy(s, s)
s.Close()
})

for _, a := range h.Addrs() {
withP2p := a.Encapsulate(multiaddr.StringCast("/p2p/" + h.ID().String()))
fmt.Printf("addr=%s\n", withP2p.String())
Expand Down
35 changes: 26 additions & 9 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import type { StreamMuxerFactory, StreamMuxerInit, StreamMuxer } from '@libp2p/i
import { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:webtransport')

declare global {
interface Window {
WebTransport: any
Expand Down Expand Up @@ -63,7 +62,13 @@ async function webtransportBiDiStreamToStream (bidiStream: any, streamId: string
let writerClosed = false
let readerClosed = false;
(async function () {
await (writer.closed.then((ok: any) => ({ ok })).catch((err: any) => ({ err })))
const err: Error | undefined = await writer.closed.catch((err: Error) => err)
if (err != null) {
const msg = err.message
if (!(msg.includes('aborted by the remote server') || msg.includes('STOP_SENDING'))) {
log.error(`WebTransport writer closed unexpectedly: streamId=${streamId} err=${err.message}`)
}
}
writerClosed = true
if (writerClosed && readerClosed) {
cleanupStreamFromActiveStreams()
Expand All @@ -73,7 +78,10 @@ async function webtransportBiDiStreamToStream (bidiStream: any, streamId: string
});

(async function () {
await (reader.closed.then((ok: any) => ({ ok })).catch((err: any) => ({ err })))
const err: Error | undefined = await reader.closed.catch((err: Error) => err)
if (err != null) {
log.error(`WebTransport reader closed unexpectedly: streamId=${streamId} err=${err.message}`)
}
readerClosed = true
if (writerClosed && readerClosed) {
cleanupStreamFromActiveStreams()
Expand All @@ -82,6 +90,7 @@ async function webtransportBiDiStreamToStream (bidiStream: any, streamId: string
log.error('WebTransport failed to cleanup closed stream')
})

let sinkSunk = false
const stream: Stream = {
id: streamId,
abort (_err: Error) {
Expand Down Expand Up @@ -148,14 +157,22 @@ async function webtransportBiDiStreamToStream (bidiStream: any, streamId: string
}
})(),
sink: async function (source: Source<Uint8Array | Uint8ArrayList>) {
for await (const chunks of source) {
if (chunks.constructor === Uint8Array) {
await writer.write(chunks)
} else {
for (const buf of chunks) {
await writer.write(buf)
if (sinkSunk) {
throw new Error('sink already called on stream')
}
sinkSunk = true
try {
for await (const chunks of source) {
if (chunks instanceof Uint8Array) {
await writer.write(chunks)
} else {
for (const buf of chunks) {
await writer.write(buf)
}
}
}
} finally {
stream.closeWrite()
}
}
}
Expand Down
84 changes: 81 additions & 3 deletions test/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,46 @@ describe('libp2p-webtransport', () => {
})

await node.start()
const res = await node.ping(ma)
console.log('Ping ', res)
expect(res).to.greaterThan(0)

// Ping many times
for (let index = 0; index < 100; index++) {
const now = Date.now()

// Note we're re-implementing the ping protocol here because as of this
// writing, go-libp2p will reset the stream instead of close it. The next
// version of go-libp2p v0.24.0 will have this fix. When that's released
// we can use the builtin ping system
const stream = await node.dialProtocol(ma, '/ipfs/ping/1.0.0')

const data = new Uint8Array(32)
globalThis.crypto.getRandomValues(data)

const pong = new Promise<void>((resolve, reject) => {
(async () => {
for await (const chunk of stream.source) {
const v = chunk.subarray()
const byteMatches: boolean = v.every((byte: number, i: number) => byte === data[i])
if (byteMatches) {
resolve()
} else {
reject(new Error('Wrong pong'))
}
}
})().catch(reject)
})

let res = -1
await stream.sink((async function * () {
yield data
// Wait for the pong before we close the write side
await pong
res = Date.now() - now
})())

await stream.close()

expect(res).to.be.greaterThan(-1)
}

await node.stop()
const conns = node.connectionManager.getConnections()
Expand All @@ -48,6 +85,47 @@ describe('libp2p-webtransport', () => {

const err = await expect(node.dial(ma)).to.eventually.be.rejected()
expect(err.errors[0].toString()).to.contain('WebTransportError: Opening handshake failed.')

await node.stop()
})

it('Closes writes of streams after they have sunk a source', async () => {
// This is the behavor of stream muxers: (see mplex, yamux and compliance tests: https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-stream-muxer-compliance-tests/src/close-test.ts)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const maStr: string = process.env.serverAddr!
const ma = multiaddr(maStr)
const node = await createLibp2p({
transports: [webTransport()],
connectionEncryption: [() => new Noise()]
})

async function * gen () {
yield new Uint8Array([0])
yield new Uint8Array([1, 2, 3, 4])
yield new Uint8Array([5, 6, 7])
yield new Uint8Array([8, 9, 10, 11])
yield new Uint8Array([12, 13, 14, 15])
}

await node.start()
const stream = await node.dialProtocol(ma, 'echo')

await stream.sink(gen())

let expectedNextNumber = 0
for await (const chunk of stream.source) {
for (const byte of chunk.subarray()) {
expect(byte).to.equal(expectedNextNumber++)
}
}
expect(expectedNextNumber).to.equal(16)

// Close read, we've should have closed the write side during sink
stream.closeRead()

expect(stream.stat.timeline.close).to.be.greaterThan(0)

await node.stop()
})
})

Expand Down

0 comments on commit a95720c

Please sign in to comment.