Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
fix: quick reconnects (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun authored Jul 7, 2020
1 parent 66529fa commit f86dfbc
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 28 deletions.
14 changes: 2 additions & 12 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,6 @@ class PubsubBaseProtocol extends EventEmitter {

const peer = this._addPeer(peerId, this.multicodecs)

if (peer.isConnected) {
return
}

try {
const { stream } = await conn.newStream(this.multicodecs)
peer.attachConnection(stream)
Expand Down Expand Up @@ -239,7 +235,6 @@ class PubsubBaseProtocol extends EventEmitter {

peer.once('close', () => this._removePeer(peer))
}
++existing._references

return existing
}
Expand All @@ -254,13 +249,8 @@ class PubsubBaseProtocol extends EventEmitter {
if (!peer) return
const id = peer.id.toB58String()

this.log('remove', id, peer._references)

// Only delete when no one else is referencing this peer.
if (--peer._references === 0) {
this.log('delete peer', id)
this.peers.delete(id)
}
this.log('delete peer', id)
this.peers.delete(id)

return peer
}
Expand Down
34 changes: 23 additions & 11 deletions src/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ const EventEmitter = require('events')
const lp = require('it-length-prefixed')
const pushable = require('it-pushable')
const pipe = require('it-pipe')
const debug = require('debug')

const log = debug('libp2p-pubsub:peer')
log.error = debug('libp2p-pubsub:peer:error')

const { RPC } = require('./message')

Expand Down Expand Up @@ -39,8 +43,6 @@ class Peer extends EventEmitter {
* @type {Pushable}
*/
this.stream = null

this._references = 0
}

/**
Expand Down Expand Up @@ -83,25 +85,38 @@ class Peer extends EventEmitter {
* @param {Connection} conn
* @returns {void}
*/
attachConnection (conn) {
this.conn = conn
async attachConnection (conn) {
const _prevStream = this.stream
if (_prevStream) {
// End the stream without emitting a close event
await _prevStream.end(false)
}

this.stream = pushable({
onEnd: () => {
onEnd: (emit) => {
// close readable side of the stream
this.conn.reset && this.conn.reset()
this.conn = null
this.stream = null
this.emit('close')
if (emit !== false) {
this.emit('close')
}
}
})
this.conn = conn

pipe(
this.stream,
lp.encode(),
conn
)
).catch(err => {
log.error(err)
})

this.emit('connection')
// Only emit if the connection is new
if (!_prevStream) {
this.emit('connection')
}
}

_sendRawSubscriptions (topics, subscribe) {
Expand Down Expand Up @@ -173,9 +188,6 @@ class Peer extends EventEmitter {
* @returns {void}
*/
close () {
// Force removal of peer
this._references = 1

// End the pushable
if (this.stream) {
this.stream.end()
Expand Down
26 changes: 21 additions & 5 deletions test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,15 @@ describe('pubsub base protocol', () => {
expect(pubsubB.peers.size).to.be.eql(1)
})

it('should not create a new stream if onConnect is called twice', async () => {
it('should use the latest connection if onConnect is called more than once', async () => {
const onConnectA = registrarRecordA[protocol].onConnect
const handlerB = registrarRecordB[protocol].handler

// Notice peers of connection
const [c0, c1] = ConnectionPair()
const [c2] = ConnectionPair()

const spyNewStream = sinon.spy(c0, 'newStream')
sinon.spy(c0, 'newStream')

await onConnectA(peerIdB, c0)
await handlerB({
Expand All @@ -206,10 +207,25 @@ describe('pubsub base protocol', () => {
remotePeer: peerIdA
}
})
expect(spyNewStream).to.have.property('callCount', 1)
expect(c0.newStream).to.have.property('callCount', 1)

await onConnectA(peerIdB, c0)
expect(spyNewStream).to.have.property('callCount', 1)
sinon.spy(pubsubA, '_removePeer')

sinon.spy(c2, 'newStream')

await onConnectA(peerIdB, c2)
expect(c2.newStream).to.have.property('callCount', 1)
expect(pubsubA._removePeer).to.have.property('callCount', 0)

// Verify the first stream was closed
const { stream: firstStream } = await c0.newStream.returnValues[0]
try {
await firstStream.sink(['test'])
} catch (err) {
expect(err).to.exist()
return
}
expect.fail('original stream should have ended')
})

it('should handle newStream errors in onConnect', async () => {
Expand Down

0 comments on commit f86dfbc

Please sign in to comment.