Skip to content
This repository was archived by the owner on Jun 27, 2023. It is now read-only.

Commit 760594e

Browse files
authored
fix: do not unsubscribe after publish (#78)
Now that we route all messages via a `'message'` event, we can't check the number of listeners for a given topic using the listener count.
1 parent 9cb24bc commit 760594e

File tree

2 files changed

+28
-8
lines changed

2 files changed

+28
-8
lines changed

src/index.ts

+4-7
Original file line numberDiff line numberDiff line change
@@ -621,10 +621,6 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
621621
super.dispatchEvent(new CustomEvent<Message>('message', {
622622
detail: rpcMessage
623623
}))
624-
625-
if (this.listenerCount(topic) === 0) {
626-
this.unsubscribe(topic)
627-
}
628624
}
629625
}
630626

@@ -655,6 +651,8 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
655651
throw new Error('Pubsub has not started')
656652
}
657653

654+
log('subscribe to topic: %s', topic)
655+
658656
if (!this.subscriptions.has(topic)) {
659657
this.subscriptions.add(topic)
660658

@@ -676,11 +674,10 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
676674
super.removeEventListener(topic)
677675

678676
const wasSubscribed = this.subscriptions.has(topic)
679-
const listeners = this.listenerCount(topic)
680677

681-
log('unsubscribe from %s - am subscribed %s, listeners %d', topic, wasSubscribed, listeners)
678+
log('unsubscribe from %s - am subscribed %s', topic, wasSubscribed)
682679

683-
if (wasSubscribed && listeners === 0) {
680+
if (wasSubscribed) {
684681
this.subscriptions.delete(topic)
685682

686683
for (const peerId of this.peers.keys()) {

test/pubsub.spec.ts

+24-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ describe('pubsub base implementation', () => {
3131
beforeEach(async () => {
3232
const peerId = await createPeerId()
3333
pubsub = new PubsubImplementation({
34-
multicodecs: [protocol]
34+
multicodecs: [protocol],
35+
emitSelf: true
3536
})
3637
pubsub.init(new Components({
3738
peerId: peerId,
@@ -75,6 +76,28 @@ describe('pubsub base implementation', () => {
7576

7677
await expect(pubsub.validate(signedMessage)).to.eventually.be.undefined()
7778
})
79+
80+
it('calls publishes messages twice', async () => {
81+
let count = 0
82+
83+
await pubsub.start()
84+
pubsub.subscribe(topic)
85+
86+
pubsub.addEventListener('message', evt => {
87+
if (evt.detail.topic === topic) {
88+
count++
89+
}
90+
})
91+
await pubsub.publish(topic, message)
92+
await pubsub.publish(topic, message)
93+
94+
// event dispatch is async
95+
await pWaitFor(() => {
96+
return count === 2
97+
})
98+
99+
expect(count).to.eql(2)
100+
})
78101
})
79102

80103
describe('subscribe', () => {

0 commit comments

Comments
 (0)