Skip to content

Commit

Permalink
fix: ensure identify streams are closed (#551)
Browse files Browse the repository at this point in the history
* fix: ensure identify streams are closed

* fix: call connection.addStream properly

* chore: simplify stream closure

* test: improve durability of identify push test
  • Loading branch information
jacobheun authored Feb 5, 2020
1 parent 5608178 commit f662fdc
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
10 changes: 7 additions & 3 deletions src/identify/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const debug = require('debug')
const pb = require('it-protocol-buffers')
const lp = require('it-length-prefixed')
const pipe = require('it-pipe')
const { collect, take } = require('streaming-iterables')
const { collect, take, consume } = require('streaming-iterables')

const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
Expand Down Expand Up @@ -114,7 +114,8 @@ class IdentifyService {
protocols: Array.from(this._protocols.keys())
}],
pb.encode(Message),
stream
stream,
consume
)
} catch (err) {
// Just log errors
Expand Down Expand Up @@ -153,6 +154,7 @@ class IdentifyService {
async identify (connection) {
const { stream } = await connection.newStream(MULTICODEC_IDENTIFY)
const [data] = await pipe(
[],
stream,
lp.decode(),
take(1),
Expand Down Expand Up @@ -242,7 +244,8 @@ class IdentifyService {
pipe(
[message],
lp.encode(),
stream
stream,
consume
)
}

Expand All @@ -255,6 +258,7 @@ class IdentifyService {
*/
async _handlePush ({ connection, stream }) {
const [data] = await pipe(
[],
stream,
lp.decode(),
take(1),
Expand Down
2 changes: 1 addition & 1 deletion src/upgrader.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class Upgrader {
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
log('%s: incoming stream opened on %s', direction, protocol)
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
connection.addStream(stream, protocol)
connection.addStream(muxedStream, { protocol })
this._onStream({ connection, stream, protocol })
} catch (err) {
log.error(err)
Expand Down
15 changes: 10 additions & 5 deletions test/identify/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const duplexPair = require('it-pair/duplex')
const multiaddr = require('multiaddr')
const pWaitFor = require('p-wait-for')

const { codes: Errors } = require('../../src/errors')
const { IdentifyService, multicodecs } = require('../../src/identify')
Expand Down Expand Up @@ -203,16 +204,17 @@ describe('Identify', () => {
})

sinon.spy(libp2p.identifyService, 'identify')
sinon.spy(libp2p.peerStore, 'replace')
const peerStoreSpy = sinon.spy(libp2p.peerStore, 'replace')

const connection = await libp2p.dialer.connectToPeer(remoteAddr)
expect(connection).to.exist()
// Wait for nextTick to trigger the identify call
await delay(1)

// Wait for peer store to be updated
await pWaitFor(() => peerStoreSpy.callCount === 1)
expect(libp2p.identifyService.identify.callCount).to.equal(1)
await libp2p.identifyService.identify.firstCall.returnValue

expect(libp2p.peerStore.replace.callCount).to.equal(1)
// The connection should have no open streams
expect(connection.streams).to.have.length(0)
await connection.close()
})

Expand Down Expand Up @@ -247,6 +249,9 @@ describe('Identify', () => {
const results = await call.returnValue
expect(results.length).to.equal(1)
}

// Verify the streams close
await pWaitFor(() => connection.streams.length === 0)
})
})
})

0 comments on commit f662fdc

Please sign in to comment.