Skip to content

Commit f86dfbc

Browse files
authored
fix: quick reconnects (#57)
1 parent 66529fa commit f86dfbc

File tree

3 files changed

+46
-28
lines changed

3 files changed

+46
-28
lines changed

src/index.js

+2-12
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,6 @@ class PubsubBaseProtocol extends EventEmitter {
189189

190190
const peer = this._addPeer(peerId, this.multicodecs)
191191

192-
if (peer.isConnected) {
193-
return
194-
}
195-
196192
try {
197193
const { stream } = await conn.newStream(this.multicodecs)
198194
peer.attachConnection(stream)
@@ -239,7 +235,6 @@ class PubsubBaseProtocol extends EventEmitter {
239235

240236
peer.once('close', () => this._removePeer(peer))
241237
}
242-
++existing._references
243238

244239
return existing
245240
}
@@ -254,13 +249,8 @@ class PubsubBaseProtocol extends EventEmitter {
254249
if (!peer) return
255250
const id = peer.id.toB58String()
256251

257-
this.log('remove', id, peer._references)
258-
259-
// Only delete when no one else is referencing this peer.
260-
if (--peer._references === 0) {
261-
this.log('delete peer', id)
262-
this.peers.delete(id)
263-
}
252+
this.log('delete peer', id)
253+
this.peers.delete(id)
264254

265255
return peer
266256
}

src/peer.js

+23-11
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ const EventEmitter = require('events')
55
const lp = require('it-length-prefixed')
66
const pushable = require('it-pushable')
77
const pipe = require('it-pipe')
8+
const debug = require('debug')
9+
10+
const log = debug('libp2p-pubsub:peer')
11+
log.error = debug('libp2p-pubsub:peer:error')
812

913
const { RPC } = require('./message')
1014

@@ -39,8 +43,6 @@ class Peer extends EventEmitter {
3943
* @type {Pushable}
4044
*/
4145
this.stream = null
42-
43-
this._references = 0
4446
}
4547

4648
/**
@@ -83,25 +85,38 @@ class Peer extends EventEmitter {
8385
* @param {Connection} conn
8486
* @returns {void}
8587
*/
86-
attachConnection (conn) {
87-
this.conn = conn
88+
async attachConnection (conn) {
89+
const _prevStream = this.stream
90+
if (_prevStream) {
91+
// End the stream without emitting a close event
92+
await _prevStream.end(false)
93+
}
94+
8895
this.stream = pushable({
89-
onEnd: () => {
96+
onEnd: (emit) => {
9097
// close readable side of the stream
9198
this.conn.reset && this.conn.reset()
9299
this.conn = null
93100
this.stream = null
94-
this.emit('close')
101+
if (emit !== false) {
102+
this.emit('close')
103+
}
95104
}
96105
})
106+
this.conn = conn
97107

98108
pipe(
99109
this.stream,
100110
lp.encode(),
101111
conn
102-
)
112+
).catch(err => {
113+
log.error(err)
114+
})
103115

104-
this.emit('connection')
116+
// Only emit if the connection is new
117+
if (!_prevStream) {
118+
this.emit('connection')
119+
}
105120
}
106121

107122
_sendRawSubscriptions (topics, subscribe) {
@@ -173,9 +188,6 @@ class Peer extends EventEmitter {
173188
* @returns {void}
174189
*/
175190
close () {
176-
// Force removal of peer
177-
this._references = 1
178-
179191
// End the pushable
180192
if (this.stream) {
181193
this.stream.end()

test/pubsub.spec.js

+21-5
Original file line numberDiff line numberDiff line change
@@ -189,14 +189,15 @@ describe('pubsub base protocol', () => {
189189
expect(pubsubB.peers.size).to.be.eql(1)
190190
})
191191

192-
it('should not create a new stream if onConnect is called twice', async () => {
192+
it('should use the latest connection if onConnect is called more than once', async () => {
193193
const onConnectA = registrarRecordA[protocol].onConnect
194194
const handlerB = registrarRecordB[protocol].handler
195195

196196
// Notice peers of connection
197197
const [c0, c1] = ConnectionPair()
198+
const [c2] = ConnectionPair()
198199

199-
const spyNewStream = sinon.spy(c0, 'newStream')
200+
sinon.spy(c0, 'newStream')
200201

201202
await onConnectA(peerIdB, c0)
202203
await handlerB({
@@ -206,10 +207,25 @@ describe('pubsub base protocol', () => {
206207
remotePeer: peerIdA
207208
}
208209
})
209-
expect(spyNewStream).to.have.property('callCount', 1)
210+
expect(c0.newStream).to.have.property('callCount', 1)
210211

211-
await onConnectA(peerIdB, c0)
212-
expect(spyNewStream).to.have.property('callCount', 1)
212+
sinon.spy(pubsubA, '_removePeer')
213+
214+
sinon.spy(c2, 'newStream')
215+
216+
await onConnectA(peerIdB, c2)
217+
expect(c2.newStream).to.have.property('callCount', 1)
218+
expect(pubsubA._removePeer).to.have.property('callCount', 0)
219+
220+
// Verify the first stream was closed
221+
const { stream: firstStream } = await c0.newStream.returnValues[0]
222+
try {
223+
await firstStream.sink(['test'])
224+
} catch (err) {
225+
expect(err).to.exist()
226+
return
227+
}
228+
expect.fail('original stream should have ended')
213229
})
214230

215231
it('should handle newStream errors in onConnect', async () => {

0 commit comments

Comments
 (0)