Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 3bcd36c

Browse files
committedAug 12, 2021
feat: pubsub over gRPC
Browsers can only have six concurrently open connections to a host name. Pubsub works over HTTP by holding a connection open per subscription, which means you can only subscribe six times before things start to hang. gRPC runs over websockets so doesn't have this limitation. This PR adds pubsub support to the gRPC server and `ipfs-client` module so you can subscribe to lots and lots of channels concurrently, working around the browser connection limitation. Refs: #3741
1 parent 5ab3ced commit 3bcd36c

File tree

15 files changed

+415
-8
lines changed

15 files changed

+415
-8
lines changed
 

‎packages/interface-ipfs-core/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
"pako": "^1.0.2",
7878
"peer-id": "^0.15.1",
7979
"readable-stream": "^3.4.0",
80+
"sinon": "^11.1.1",
8081
"uint8arrays": "^2.1.6"
8182
},
8283
"contributors": [

‎packages/interface-ipfs-core/src/pubsub/peers.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ module.exports = (factory, options) => {
3737
let ipfs3Id
3838

3939
before(async () => {
40-
ipfs1 = (await factory.spawn({ type: 'proc', ipfsOptions })).api
40+
ipfs1 = (await factory.spawn({ ipfsOptions })).api
4141
// webworkers are not dialable because webrtc is not available
42-
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api
43-
ipfs3 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api
42+
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api
43+
ipfs3 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api
4444

4545
ipfs2Id = await ipfs2.id()
4646
ipfs3Id = await ipfs3.id()

‎packages/interface-ipfs-core/src/pubsub/subscribe.js

+137-4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const { AbortController } = require('native-abort-controller')
1313
const { isWebWorker, isNode } = require('ipfs-utils/src/env')
1414
const getIpfsOptions = require('../utils/ipfs-options-websockets-filter-all')
1515
const first = require('it-first')
16+
const sinon = require('sinon')
1617

1718
/**
1819
* @typedef {import('ipfsd-ctl').Factory} Factory
@@ -44,12 +45,10 @@ module.exports = (factory, options) => {
4445
let ipfs2Id
4546

4647
before(async () => {
47-
ipfs1 = (await factory.spawn({ type: 'proc', ipfsOptions })).api
48-
// TODO 'multiple connected nodes' tests fails with go in Firefox
49-
// and JS is flaky everywhere
48+
ipfs1 = (await factory.spawn({ ipfsOptions })).api
5049

5150
// webworkers are not dialable because webrtc is not available
52-
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api
51+
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api
5352

5453
ipfs1Id = await ipfs1.id()
5554
ipfs2Id = await ipfs2.id()
@@ -84,6 +83,7 @@ module.exports = (factory, options) => {
8483
await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hi'))
8584

8685
const msg = await first(msgStream)
86+
8787
expect(uint8ArrayToString(msg.data)).to.equal('hi')
8888
expect(msg).to.have.property('seqno')
8989
expect(msg.seqno).to.be.an.instanceof(Uint8Array)
@@ -410,6 +410,139 @@ module.exports = (factory, options) => {
410410
expect(uint8ArrayToString(msg.data).startsWith(msgBase)).to.be.true()
411411
})
412412
})
413+
414+
it('should receive messages from a different node on lots of topics', async () => {
415+
// @ts-ignore this is mocha
416+
this.timeout(5 * 60 * 1000)
417+
418+
const numTopics = 20
419+
const topics = []
420+
const expectedStrings = []
421+
const msgStreams = []
422+
423+
for (let i = 0; i < numTopics; i++) {
424+
const topic = `pubsub-topic-${Math.random()}`
425+
topics.push(topic)
426+
427+
const msgStream1 = pushable()
428+
const msgStream2 = pushable()
429+
430+
msgStreams.push({
431+
msgStream1,
432+
msgStream2
433+
})
434+
435+
/** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */
436+
const sub1 = msg => {
437+
msgStream1.push(msg)
438+
msgStream1.end()
439+
}
440+
/** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */
441+
const sub2 = msg => {
442+
msgStream2.push(msg)
443+
msgStream2.end()
444+
}
445+
446+
await Promise.all([
447+
ipfs1.pubsub.subscribe(topic, sub1),
448+
ipfs2.pubsub.subscribe(topic, sub2)
449+
])
450+
451+
await waitForPeers(ipfs2, topic, [ipfs1Id.id], 30000)
452+
}
453+
454+
await delay(5000) // gossipsub needs this delay https://github.com/libp2p/go-libp2p-pubsub/issues/331
455+
456+
for (let i = 0; i < numTopics; i++) {
457+
const expectedString = `hello pubsub ${Math.random()}`
458+
expectedStrings.push(expectedString)
459+
460+
await ipfs2.pubsub.publish(topics[i], uint8ArrayFromString(expectedString))
461+
}
462+
463+
for (let i = 0; i < numTopics; i++) {
464+
const [sub1Msg] = await all(msgStreams[i].msgStream1)
465+
expect(uint8ArrayToString(sub1Msg.data)).to.equal(expectedStrings[i])
466+
expect(sub1Msg.from).to.eql(ipfs2Id.id)
467+
468+
const [sub2Msg] = await all(msgStreams[i].msgStream2)
469+
expect(uint8ArrayToString(sub2Msg.data)).to.equal(expectedStrings[i])
470+
expect(sub2Msg.from).to.eql(ipfs2Id.id)
471+
}
472+
})
473+
474+
it('should unsubscribe multiple handlers', async () => {
475+
// @ts-ignore this is mocha
476+
this.timeout(2 * 60 * 1000)
477+
478+
const topic = `topic-${Math.random()}`
479+
480+
const handler1 = sinon.stub()
481+
const handler2 = sinon.stub()
482+
483+
await Promise.all([
484+
ipfs1.pubsub.subscribe(topic, sinon.stub()),
485+
ipfs2.pubsub.subscribe(topic, handler1),
486+
ipfs2.pubsub.subscribe(topic, handler2)
487+
])
488+
489+
await waitForPeers(ipfs1, topic, [ipfs2Id.id], 30000)
490+
491+
expect(handler1).to.have.property('callCount', 0)
492+
expect(handler2).to.have.property('callCount', 0)
493+
494+
await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 1'))
495+
496+
await delay(1000)
497+
498+
expect(handler1).to.have.property('callCount', 1)
499+
expect(handler2).to.have.property('callCount', 1)
500+
501+
await ipfs2.pubsub.unsubscribe(topic)
502+
503+
await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 2'))
504+
505+
await delay(1000)
506+
507+
expect(handler1).to.have.property('callCount', 1)
508+
expect(handler2).to.have.property('callCount', 1)
509+
})
510+
511+
it('should unsubscribe individual handlers', async () => {
512+
// @ts-ignore this is mocha
513+
this.timeout(2 * 60 * 1000)
514+
515+
const topic = `topic-${Math.random()}`
516+
517+
const handler1 = sinon.stub()
518+
const handler2 = sinon.stub()
519+
520+
await Promise.all([
521+
ipfs1.pubsub.subscribe(topic, sinon.stub()),
522+
ipfs2.pubsub.subscribe(topic, handler1),
523+
ipfs2.pubsub.subscribe(topic, handler2)
524+
])
525+
526+
await waitForPeers(ipfs1, topic, [ipfs2Id.id], 30000)
527+
528+
expect(handler1).to.have.property('callCount', 0)
529+
expect(handler2).to.have.property('callCount', 0)
530+
531+
await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 1'))
532+
533+
await delay(1000)
534+
535+
expect(handler1).to.have.property('callCount', 1)
536+
expect(handler2).to.have.property('callCount', 1)
537+
538+
await ipfs2.pubsub.unsubscribe(topic, handler1)
539+
await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 2'))
540+
541+
await delay(1000)
542+
543+
expect(handler1).to.have.property('callCount', 1)
544+
expect(handler2).to.have.property('callCount', 2)
545+
})
413546
})
414547
})
415548
}

‎packages/ipfs-grpc-client/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
"it-pushable": "^1.4.2",
4545
"multiaddr": "^10.0.0",
4646
"multiformats": "^9.4.1",
47+
"p-defer": "^3.0.0",
4748
"protobufjs": "^6.10.2",
4849
"wherearewe": "1.0.0",
4950
"ws": "^7.3.1"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
'use strict'
2+
3+
const serverStreamToIterator = require('../../utils/server-stream-to-iterator')
4+
const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option')
5+
const subscriptions = require('./subscriptions')
6+
const defer = require('p-defer')
7+
8+
/**
9+
* @param {import('@improbable-eng/grpc-web').grpc} grpc
10+
* @param {*} service
11+
* @param {import('../../types').Options} opts
12+
*/
13+
module.exports = function grpcPubsubSubscribe (grpc, service, opts) {
14+
/**
15+
* @type {import('ipfs-core-types/src/pubsub').API["subscribe"]}
16+
*/
17+
async function pubsubSubscribe (topic, handler, options = {}) {
18+
const request = {
19+
topic
20+
}
21+
22+
const deferred = defer()
23+
24+
Promise.resolve().then(async () => {
25+
try {
26+
for await (const result of serverStreamToIterator(grpc, service, request, {
27+
host: opts.url,
28+
debug: Boolean(process.env.DEBUG),
29+
metadata: options,
30+
agent: opts.agent
31+
})) {
32+
if (result.handler) {
33+
const subs = subscriptions.get(topic) || new Map()
34+
subs.set(result.handler, handler)
35+
subscriptions.set(topic, subs)
36+
37+
deferred.resolve()
38+
} else {
39+
handler({
40+
from: result.from,
41+
seqno: result.seqno,
42+
data: result.data,
43+
topicIDs: result.topicIDs
44+
})
45+
}
46+
}
47+
} catch (err) {
48+
if (options && options.onError) {
49+
options.onError(err)
50+
}
51+
}
52+
})
53+
54+
await deferred.promise
55+
}
56+
57+
return withTimeoutOption(pubsubSubscribe)
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
'use strict'
2+
3+
/**
4+
* @typedef {import('ipfs-core-types/src/pubsub').MessageHandlerFn} Subscription
5+
*/
6+
7+
/** @type {Map<string, Map<string, Subscription>>} */
8+
const subs = new Map()
9+
10+
module.exports = subs
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
'use strict'
2+
3+
const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option')
4+
const toHeaders = require('../../utils/to-headers')
5+
const unaryToPromise = require('../../utils/unary-to-promise')
6+
const subscriptions = require('./subscriptions')
7+
8+
/**
9+
* @param {import('@improbable-eng/grpc-web').grpc} grpc
10+
* @param {*} service
11+
* @param {import('../../types').Options} opts
12+
*/
13+
module.exports = function grpcPubsubUnsubscribe (grpc, service, opts) {
14+
/**
15+
* @type {import('ipfs-core-types/src/pubsub').API["unsubscribe"]}
16+
*/
17+
async function pubsubUnsubscribe (topic, handler, options = {}) {
18+
const handlers = []
19+
const subs = subscriptions.get(topic)
20+
21+
if (!subs) {
22+
return
23+
}
24+
25+
if (handler) {
26+
for (const [key, value] of subs.entries()) {
27+
if (value === handler) {
28+
handlers.push(key)
29+
}
30+
}
31+
} else {
32+
33+
}
34+
35+
const request = {
36+
topic,
37+
handlers
38+
}
39+
40+
await unaryToPromise(grpc, service, request, {
41+
host: opts.url,
42+
metadata: toHeaders(options),
43+
agent: opts.agent
44+
})
45+
46+
for (const handlerId of handlers) {
47+
subs.delete(handlerId)
48+
}
49+
50+
if (!subs.size) {
51+
subscriptions.delete(topic)
52+
}
53+
}
54+
55+
return withTimeoutOption(pubsubUnsubscribe)
56+
}

‎packages/ipfs-grpc-client/src/index.js

+6
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ function create (opts = { url: '' }) {
4949
ls: require('./core-api/files/ls')(grpc, service.MFS.ls, options),
5050
// @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594
5151
write: require('./core-api/files/write')(grpc, service.MFS.write, options)
52+
},
53+
pubsub: {
54+
// @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594
55+
subscribe: require('./core-api/pubsub/subscribe')(grpc, service.PubSub.subscribe, options),
56+
// @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594
57+
unsubscribe: require('./core-api/pubsub/unsubscribe')(grpc, service.PubSub.unsubscribe, options)
5258
}
5359
}
5460

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
syntax = "proto3";
2+
3+
import "common.proto";
4+
5+
package ipfs;
6+
7+
service PubSub {
8+
rpc subscribe (SubscribeRequest) returns (stream SubscribeResponse) {}
9+
rpc unsubscribe (UnSubscribeRequest) returns (UnSubscribeResponse) {}
10+
}
11+
12+
message SubscribeRequest {
13+
string topic = 1;
14+
}
15+
16+
message SubscribeResponse {
17+
string handler = 1;
18+
string from = 2;
19+
bytes seqno = 3;
20+
bytes data = 4;
21+
repeated string topicIDs = 5;
22+
}
23+
24+
message UnSubscribeRequest {
25+
string topic = 1;
26+
repeated string handlers = 2;
27+
}
28+
29+
message UnSubscribeResponse {
30+
31+
}

‎packages/ipfs-grpc-server/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"it-pipe": "^1.1.0",
4343
"it-pushable": "^1.4.2",
4444
"multiaddr": "^10.0.0",
45+
"nanoid": "3.1.23",
4546
"protobufjs": "^6.10.2",
4647
"ws": "^7.3.1"
4748
},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
'use strict'
2+
3+
const subscriptions = require('./subscriptions')
4+
const { nanoid } = require('nanoid')
5+
6+
/**
7+
* @param {import('ipfs-core-types').IPFS} ipfs
8+
* @param {import('../../types').Options} options
9+
*/
10+
module.exports = function grpcPubsubSubscribe (ipfs, options = {}) {
11+
/**
12+
* TODO: Fill out input/output types after https://github.com/ipfs/js-ipfs/issues/3594
13+
*
14+
* @type {import('../../types').ServerStreamingEndpoint<any, any, any>}
15+
*/
16+
async function pubsubSubscribe (request, sink, metadata) {
17+
const opts = {
18+
...metadata
19+
}
20+
21+
const handlerId = nanoid()
22+
const handler = {
23+
/** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */
24+
onMessage: (message) => {
25+
sink.push(message)
26+
},
27+
onUnsubscribe: () => {
28+
sink.end()
29+
}
30+
}
31+
32+
subscriptions.set(handlerId, handler)
33+
34+
sink.push({
35+
handler: handlerId
36+
})
37+
38+
await ipfs.pubsub.subscribe(request.topic, handler.onMessage, opts)
39+
}
40+
41+
return pubsubSubscribe
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
'use strict'
2+
3+
/**
4+
* @typedef {object} Subscription
5+
* @property {import('ipfs-core-types/src/pubsub').MessageHandlerFn} onMessage
6+
* @property {() => void} onUnsubscribe
7+
*/
8+
9+
/** @type {Map<string, Subscription>} */
10+
const subs = new Map()
11+
12+
module.exports = subs
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
'use strict'
2+
3+
const subscriptions = require('./subscriptions')
4+
const { callbackify } = require('util')
5+
6+
/**
7+
* @param {import('ipfs-core-types').IPFS} ipfs
8+
* @param {import('../../types').Options} options
9+
*/
10+
module.exports = function grpcPubsubUnsubscribe (ipfs, options = {}) {
11+
/**
12+
* TODO: Fill out input/output types after https://github.com/ipfs/js-ipfs/issues/3594
13+
*
14+
* @type {import('../../types').UnaryEndpoint<any, any, any>}
15+
*/
16+
async function pubsubUnsubscribe (request, metadata) {
17+
const opts = {
18+
...metadata
19+
}
20+
21+
if (!request.handlers || !request.handlers.length) {
22+
await ipfs.pubsub.unsubscribe(request.topic, undefined, opts)
23+
24+
return {}
25+
}
26+
27+
for (const handlerId of request.handlers) {
28+
const handler = subscriptions.get(handlerId)
29+
30+
if (!handler) {
31+
continue
32+
}
33+
34+
await ipfs.pubsub.unsubscribe(request.topic, handler.onMessage, opts)
35+
36+
handler.onUnsubscribe()
37+
subscriptions.delete(handlerId)
38+
}
39+
40+
return {}
41+
}
42+
43+
return callbackify(pubsubUnsubscribe)
44+
}

‎packages/ipfs-grpc-server/src/index.js

+8-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ const loadServices = require('./utils/load-services')
88

99
const {
1010
Root,
11-
MFS
11+
MFS,
12+
PubSub
1213
} = loadServices()
1314

1415
/**
@@ -31,6 +32,12 @@ module.exports = async function createServer (ipfs, options = {}) {
3132
// @ts-ignore - types differ because we only invoke via websockets - https://github.com/ipfs/js-ipfs/issues/3594
3233
write: require('./endpoints/mfs/write')(ipfs, options)
3334
})
35+
server.addService(PubSub, {
36+
// @ts-ignore - types differ because we only invoke via websockets - https://github.com/ipfs/js-ipfs/issues/3594
37+
subscribe: require('./endpoints/pubsub/subscribe')(ipfs, options),
38+
// @ts-ignore - types differ because we only invoke via websockets - https://github.com/ipfs/js-ipfs/issues/3594
39+
unsubscribe: require('./endpoints/pubsub/unsubscribe')(ipfs, options)
40+
})
3441

3542
const socket = options.socket || await webSocketServer(ipfs, options)
3643

‎packages/ipfs/test/interface-client.js

+5
Original file line numberDiff line numberDiff line change
@@ -146,4 +146,9 @@ describe('interface-ipfs-core ipfs-client tests', () => {
146146
}
147147
]
148148
})
149+
150+
tests.pubsub(factory({
151+
type: 'js',
152+
ipfsClientModule: require('ipfs-client')
153+
}))
149154
})

0 commit comments

Comments
 (0)
This repository has been archived.