diff --git a/packages/interface-ipfs-core/src/name-pubsub/pubsub.js b/packages/interface-ipfs-core/src/name-pubsub/pubsub.js index 688be1cf0b..77c2dd4cc8 100644 --- a/packages/interface-ipfs-core/src/name-pubsub/pubsub.js +++ b/packages/interface-ipfs-core/src/name-pubsub/pubsub.js @@ -88,6 +88,14 @@ export function testPubsub (factory, options) { const topic = `${namespace}${uint8ArrayToString(routingKey, 'base64url')}` await nodeB.pubsub.subscribe(topic, () => {}) + + // wait for nodeA to see nodeB's subscription + await waitFor(async () => { + const peers = await nodeA.pubsub.peers(topic) + + return peers.map(p => p.toString()).includes(idB.id.toString()) + }) + await nodeA.name.publish(ipfsRef, { resolve: false }) await delay(1000) // guarantee record is written @@ -149,6 +157,14 @@ export function testPubsub (factory, options) { const topic = `${namespace}${uint8ArrayToString(routingKey, 'base64url')}` await nodeB.pubsub.subscribe(topic, checkMessage) + + // wait for nodeA to see nodeB's subscription + await waitFor(async () => { + const peers = await nodeA.pubsub.peers(topic) + + return peers.map(p => p.toString()).includes(idB.id.toString()) + }) + await nodeA.name.publish(ipfsRef, { resolve: false, key: testAccountName }) await waitFor(alreadySubscribed) diff --git a/packages/interface-ipfs-core/src/pubsub/publish.js b/packages/interface-ipfs-core/src/pubsub/publish.js index 3986adac4b..7af4b1c0ba 100644 --- a/packages/interface-ipfs-core/src/pubsub/publish.js +++ b/packages/interface-ipfs-core/src/pubsub/publish.js @@ -5,11 +5,30 @@ import { nanoid } from 'nanoid' import { getTopic } from './utils.js' import { expect } from 'aegir/chai' import { getDescribe, getIt } from '../utils/mocha.js' +import pWaitFor from 'p-wait-for' /** * @typedef {import('ipfsd-ctl').Factory} Factory + * @typedef {import('ipfs-core-types').IPFS} IPFS */ +/** + * @param {string} topic + * @param {IPFS} ipfs + * @param {IPFS} remote + */ +async function waitForRemoteToBeSubscribed (topic, ipfs, remote) { + await remote.pubsub.subscribe(topic, () => {}) + const remoteId = await remote.id() + + // wait for remote to be subscribed to topic + await pWaitFor(async () => { + const peers = await ipfs.pubsub.peers(topic) + + return peers.map(p => p.toString()).includes(remoteId.id.toString()) + }) +} + /** * @param {Factory} factory * @param {object} options @@ -21,23 +40,37 @@ export function testPublish (factory, options) { describe('.pubsub.publish', function () { this.timeout(80 * 1000) - /** @type {import('ipfs-core-types').IPFS} */ + /** @type {IPFS} */ let ipfs + /** @type {IPFS} */ + let remote + before(async () => { ipfs = (await factory.spawn()).api + remote = (await factory.spawn()).api + + // ensure we have peers to allow publishing + const remoteId = await remote.id() + await ipfs.swarm.connect(remoteId.addresses[0]) }) after(() => factory.clean()) it('should fail with undefined msg', async () => { const topic = getTopic() + + await waitForRemoteToBeSubscribed(topic, ipfs, remote) + // @ts-expect-error invalid parameter await expect(ipfs.pubsub.publish(topic)).to.eventually.be.rejected() }) - it('should publish message from buffer', () => { + it('should publish message from buffer', async () => { const topic = getTopic() + + await waitForRemoteToBeSubscribed(topic, ipfs, remote) + return ipfs.pubsub.publish(topic, uint8ArrayFromString(nanoid())) }) @@ -45,6 +78,8 @@ export function testPublish (factory, options) { const count = 10 const topic = getTopic() + await waitForRemoteToBeSubscribed(topic, ipfs, remote) + for (let i = 0; i < count; i++) { await ipfs.pubsub.publish(topic, uint8ArrayFromString(nanoid())) } diff --git a/packages/ipfs-core-config/src/libp2p-pubsub-routers.browser.js b/packages/ipfs-core-config/src/libp2p-pubsub-routers.browser.js index a5d6dba740..99fce4374b 100644 --- a/packages/ipfs-core-config/src/libp2p-pubsub-routers.browser.js +++ b/packages/ipfs-core-config/src/libp2p-pubsub-routers.browser.js @@ -5,7 +5,6 @@ import { gossipsub } from '@chainsafe/libp2p-gossipsub' /** @type {() => Record PubSub>}>} */ export const routers = () => ({ gossipsub: gossipsub({ - allowPublishToZeroPeers: true, fallbackToFloodsub: true, emitSelf: true, maxInboundStreams: 64, diff --git a/packages/ipfs-core-config/src/libp2p-pubsub-routers.js b/packages/ipfs-core-config/src/libp2p-pubsub-routers.js index 06154d1c35..c67c71f99d 100644 --- a/packages/ipfs-core-config/src/libp2p-pubsub-routers.js +++ b/packages/ipfs-core-config/src/libp2p-pubsub-routers.js @@ -6,7 +6,6 @@ import { floodsub } from '@libp2p/floodsub' /** @type {() => Record PubSub>}>} */ export const routers = () => ({ gossipsub: gossipsub({ - allowPublishToZeroPeers: true, fallbackToFloodsub: true, emitSelf: true, maxInboundStreams: 64,