Client does not receive pubsub after api node reboots #3465
Thank you for submitting your first issue to this repository! A maintainer will be here shortly to triage and review.
Finally, remember to use https://discuss.ipfs.io if you just need general support.
I have no knowledge of the history of this part of the code base, so someone with more knowledge should chime in. With that said, I did some investigation and, as far as I can tell, this is a combination of an API design problem and a lack of documentation for it. What follows is a breakdown.
@valmack from the provided example I can see no … What follows is an API design critique and a proposal to improve it:
I would like to propose an API that addresses those issues:

````ts
interface PubSub {
/**
* Creates a subscription for the given topic. You can provide your own custom queuingStrategy;
* if no queuingStrategy is supplied, the default is used, which holds just a single message in the
* queue. The subscription will error if the internal queue overflows.
*/
subscribe <Topic extends string>(topic:Topic, options?:AbortOptions & QueuingStrategy):SubscriptionResult<Topic>
/**
* Returns all active subscriptions (excluding cancelled and not-yet-established subscriptions; in other words, `sub`
* from `sub = pubsub.subscribe(subject)` is not included in `pubsub.ls()` until `await sub` resolves).
*/
ls():Subscription<string>[]
/**
* Publish a data message to a pubsub topic.
*/
publish(topic:string, data:Uint8Array, options?:AbortOptions):Promise<void>
}
/**
* `SubscriptionResult` represents a `Subscription` that may fail (e.g. if the http-client can't connect
* to the API endpoint). For convenience it implements the `Subscription` interface so the user can start
* consuming messages without having to await `pubsub.subscribe`:
*
* @example
* ```js
* try {
* for await (const msg of pubsub.subscribe(topic)) {
* }
* } catch(error) {
* // handle failure
* }
* ```
*
* However, in some cases it is desirable to tell whether the subscription was successful without consuming its messages. For
* this reason `SubscriptionResult` also implements the promise interface:
* @example
* ```js
* const subscription = await pubsub.subscribe(topic)
* ```
*/
interface SubscriptionResult<Topic> extends Subscription<Topic> {
then(succeed:(messages:Subscription<Topic>) => void, fail:(error:Error) => void): Promise<void>
catch(fail:(error:Error) => void): Promise<void>
finally(then:() => void):Promise<void>
}
interface Subscription<Topic> {
/**
* Topic of this subscription, useful for bookkeeping.
*/
readonly topic: Topic
/**
* Whether or not the subscription is locked by a reader (only one reader is allowed at a time)
*/
readonly locked: boolean
/**
* Provides an async iteration interface (via `for await`) over the messages of this subscription.
* Obtains an exclusive lock, preventing more than one consumer at a time.
* Cancels the subscription when async iteration is complete (e.g. by breaking out of a for await loop
* or calling `return()` on the returned `SubscriptionIterator`).
*/
[Symbol.asyncIterator](): SubscriptionIterator<Topic>
/**
* Just like the method above, except it can be passed `{ preventCancel: true }`
* in order to iterate over messages without cancelling this subscription.
*/
values(options?:ReadableStreamIteratorOptions): SubscriptionIterator<Topic>
/**
* A convenience function to cancel the subscription.
*/
cancel():void
}
interface SubscriptionIterator<Topic> {
/**
* Topic of this subscription.
*/
readonly topic: Topic
/**
* Reads the next message from the subscription.
*/
next():Promise<IteratorResult<Message, void>>
/**
* Cancels the subscription unless `preventCancel` was used. Either way, cancels iteration, meaning
* calling `next()` will return `{done: true}`.
*/
return():Promise<IteratorReturnResult<void>>
/**
* Provides [Symbol.asyncIterator]()
*/
[Symbol.asyncIterator](): this
}
interface Message {
from: string
topicIDs: string[]
data: Uint8Array
seqno: Uint8Array
}
// Same as in ReadableStream's
// See: https://streams.spec.whatwg.org/
interface ReadableStreamIteratorOptions {
preventCancel: boolean
}
interface AbortOptions {
timeout?: number
signal?: AbortSignal
}
````

Such an API would avoid all of the above pitfalls: …
Here are a couple more examples illustrating various points:

```ts
// Example which consumes messages for a pubsub topic, recovering from errors
// by resubscribing
const forEach = async (pubsub:PubSub, topic:string, fn:(message:Message) => void) => {
try {
for await (const message of pubsub.subscribe(topic)) {
fn(message)
}
} finally {
// Restart the subscription if it finished for whatever reason. You should probably use
// something like an exponential backoff strategy here instead of retrying right away.
forEach(pubsub, topic, fn)
}
}
// Example that drops `n` messages from the subscription
const dropN = async (pubsub:PubSub, topic:string, n:number) => {
const subscription = pubsub.subscribe(topic)
// Use preventCancel to keep the subscription from being cancelled
for await (const _ of subscription.values({ preventCancel: true })) {
if (--n <= 0) {
break
}
}
// return the subscription with the rest of the messages
return subscription
}
// Example which creates multiple subscriptions, but only if all are successful and
// without consuming any of the messages.
const subs = async function * (pubsub:PubSub, topics:string[]) {
const subscriptions = []
try {
for (const topic of topics) {
// use await here to wait for the subscription to be successful
subscriptions.push(await pubsub.subscribe(topic))
}
} catch (error) {
// If one of the subscriptions failed, cancel the ones that were successful
for (const subscription of subscriptions) {
subscription.cancel()
}
throw error
}
return subscriptions
}
```
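One thing the examples above do not show is the publish side. Here is a small hedged sketch (the `publishEvery` helper is hypothetical) of publishing under the proposed API, using `AbortOptions` to bound each publish:

```ts
// Hypothetical helper illustrating the publish side of the proposed API:
// publish `data` to `topic` every `ms` milliseconds, aborting any single
// publish that takes longer than 500ms via the AbortOptions timeout.
const publishEvery = (pubsub: PubSub, topic: string, data: Uint8Array, ms: number) =>
  setInterval(async () => {
    try {
      await pubsub.publish(topic, data, { timeout: 500 })
    } catch (error) {
      // a timed-out or failed publish surfaces here instead of being swallowed
      console.error('publish failed', error)
    }
  }, ms)
```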
Just confirmed that; this is what I see after publishing two messages and then stopping the daemon: …
@Gozala thank you for your investigation into this! Can you tell me what package and version you are using for the ipfs client in your test? I am still having the problem with my code below and …

```js
const ipfsClient = require("ipfs-http-client")
const topic = process.env.TOPIC || '/ceramic/dev-unstable'
const ipfsApiUrl = process.env.IPFS_API_URL || 'http://localhost:5002'
const run = async () => {
const ipfs = ipfsClient({ url: ipfsApiUrl })
const receiveMsg = (msg) => { console.log('receiving', msg) }
const onError = (error) => { console.error('caught error', error) }
await ipfs.pubsub.subscribe(topic, receiveMsg, { onError })
.then(() => {
console.log('subscribe success')
})
.catch((err) => {
console.error('subscribe failure', err.message)
})
setInterval(async () => {
await ipfs.pubsub.publish(
topic,
'val was here',
{ timeout: 500 })
}, 5000);
}
run()
```
@Gozala I'm unable to replicate the behavior that you are describing with the … in Node.js:
@oed @valmack the linked example on observablehq loads the http client from https://unpkg.com/ipfs-http-client@48.1.0/dist/index.min.js and I was running a go-ipfs@0.6.0 daemon.
I could also see in the network panel when the connection was dropped.
See js-ipfs/packages/ipfs-http-client/src/pubsub/subscribe.js, lines 53 to 56 at 7b48f14,
and js-ipfs/packages/ipfs-http-client/src/pubsub/subscribe.js, lines 83 to 93 at 7b48f14.
Are you running the client in Node.js? I have not tried that yet & there is a chance that node-fetch has a bug and, unlike the browser, does not produce an error (I'll check; both Firefox & Chrome reported errors as expected). I also have not tried with the js-ipfs daemon and there is a chance it behaves differently there, so I'll check that as well. How are you two going about restarting a (…
@Gozala Coincidentally I just found the … I am running the client in Node.js and have used the jsipfs daemon CLI, and from a dockerized node app.
I meant to say (A) in my previous comment as well. Working on a code sandbox example for this.
Looks like the http-client in Node, unlike in browsers, is not triggering … I suspect an issue in node-fetch; I'll investigate this further & report updates here. My guess is it's treated as a clean connection close that calls …, which just causes an unsubscribe.
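To illustrate the suspected failure mode, here is a simplified hedged sketch (not the actual subscribe.js source; the names are illustrative): if node-fetch ends the body stream cleanly when the connection drops, the message loop exits without throwing, so only the cleanup path runs and onError never fires:

```ts
type Handlers = {
  onMessage: (message: unknown) => void
  onError: (error: Error) => void
  onEnd: () => void
}

const readMessages = async (
  messages: AsyncIterable<unknown>, // streamed pubsub messages from the HTTP response
  { onMessage, onError, onEnd }: Handlers
): Promise<void> => {
  try {
    for await (const message of messages) {
      onMessage(message)
    }
  } catch (error) {
    // browsers land here when the connection drops mid-stream…
    onError(error as Error)
  } finally {
    // …but in Node the stream just ends, so only this cleanup runs,
    // which unsubscribes the handler without onError ever firing
    onEnd()
  }
}
```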
Created a reproducible test case for this issue, which as expected fails on Node but passes in browsers. Expect a fix in pull request #3468.
As expected, the problem is coming from node-fetch, which does not seem to check whether the server fully transmitted a message before the connection was terminated. However, upstream node-fetch (we seem to be on 2.6.1) appears to have switched to the stream pipeline API, which I do not believe would handle this either, and all this makes fixing it quite a bit more challenging. That said, we could possibly avoid all this by either: …
That is not to say we should ignore the node-fetch problem; on the contrary, we should try to get it fixed. But it seems reasonable to pursue a fix here without blocking on the node-fetch fix, because even with node-fetch fixed the connection may still be closed by the server cleanly (I would actually argue that if the node is stopped, as opposed to crashing, it should probably close connections gracefully), in which case …
Reported the issue to node-fetch (node-fetch/node-fetch#1055) with a reproducible test case attached (node-fetch/node-fetch#1056). It also appears that node-fetch internally does some additional checks when … In the http module of ipfs-utils, when the end of response.body is reached, we could check whether …
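For reference, a hedged sketch of the kind of repro attached upstream (assumptions: node-fetch@2 and a local HTTP server that destroys the socket mid-body). In browsers reading the body rejects, while node-fetch@2 just ends the stream cleanly:

```ts
import http from 'http'
import type { AddressInfo } from 'net'
import fetch from 'node-fetch'

const server = http.createServer((_req, res) => {
  res.writeHead(200, { 'Content-Type': 'application/json' })
  res.write('{"partial":') // start a chunked body…
  // …then terminate the connection without ever finishing it
  setTimeout(() => res.destroy(), 100)
})

server.listen(0, async () => {
  const { port } = server.address() as AddressInfo
  const response = await fetch(`http://localhost:${port}/`)
  try {
    for await (const chunk of response.body as AsyncIterable<Buffer>) {
      console.log('received chunk of', chunk.length, 'bytes')
    }
    // node-fetch@2 reaches here: the truncated body "ends" without an error
    console.log('stream ended cleanly despite the truncated response')
  } catch (error) {
    // a premature-close aware client would reject instead
    console.log('premature close detected:', error)
  } finally {
    server.close()
  }
})
```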
@vasco-santos do you mind sharing your feedback regarding the proposed API (#3465 (comment)) for addressing the usability issues? The reason I am asking is that, having started on the implementation, I keep feeling that maybe all this should go into libp2p instead and be exposed directly, rather than wrapping the current API with the proposed one.
@Gozala Just read through your proposal, thanks for pinging me. I think this approach makes sense taking into account all the pitfalls of forwarding libp2p pubsub messages from the node to the IPFS client. The reason is that in the IPFS client an HTTP request happens on subscribe, and the client then needs to wait for messages to be received in a specific form, which does not work as typical events. However, this approach makes some assumptions that will make other simple pubsub use cases more complex. Having an API where subscribe returns an …
Subscriptions should be long lived and valid until unsubscribe(topic) is called. This is also a reason that makes me feel that the usability of blocking a code flow in a … The error concerns are a good point. From the libp2p side of things, errors can occur when sending subscription messages and when receiving/parsing received messages. While the first might be worth throwing an error (especially if we cannot send a subscription message to any peer -- we currently don't do this!), the second should just be caught and logged for possible inspection. We will not throw on errors when handling received messages.
Could you please elaborate on this a bit? I'm not sure what you mean by "not valid as typical events" or what those simple use cases are.
That is still possible; it's just that the actor doing the subscribing will have to pass the "subscription" object to the actor doing the listening. As I understand it, this is the other way around from today, where the listener needs to pass a handler that the subscriber then subscribes with; either way the separation of concerns is there.
It is true that it is different from …
An additional benefit is that once all readers are done, the writer end gets aborted and can do the necessary cleanup.
They might indeed, but I do not think that is inherently bad, as it forces error handling.
Honestly, that makes me think something along the lines of the example below might be even better, because there is nothing worse than errors that you cannot act upon:

```js
try {
for await (const message of libp2p.pubsub.subscribe(topic)) {
try {
const data = message.decode()
// use data
} catch(parseError) {
console.error(parseError)
}
}
} catch(networkError) {
// either recover or lose the subscription
}
```

P.S. I mentioned readable and writable streams not to imply using them, but to convey the idea of a channel / pipe where one end reads what the other writes into. This is conceptually not that different from event emitters, except they decouple the reader and writer ends and tend to encourage a channel per "event type" as opposed to multiplexed channels, which is also possible.
What I mean is that you can do something as follows:

```js
const topic = 'example-topic'
await libp2p.pubsub.subscribe(topic)
// In any other places in the codebase with the libp2p reference you can simply do multiple times:
libp2p.pubsub.on(topic, (msg) => {})
```

You can add event listeners for the same topic in multiple places in the codebase, and they can be set up even before the subscribe happens (I know, this might also be problematic if the subscribe never happens and creates false expectations). I quite often see dapps using pubsub approaches where there are multiple topic handlers for the same subscription.
Yes, of course this can be done in the user layer. The biggest concern I have with this change is the large breaking change for projects using libp2p pubsub, which might need to completely reorganize their components in order to move received pubsub messages around. I can see this being a daunting task that might make projects delay updating libp2p. In my opinion, the current API is easy to use and flexible compared to returning an async iterator, which will require users to propagate data around their codebase. That said, I can see advantages in returning async iterators as you suggest.
This is a + compared to requiring users to remove event handlers when they unsubscribe/stop.
I totally agree with this in the context of the IPFS client. There might be network errors when dealing with connections between the IPFS client and an IPFS daemon. However, what types of errors might exist from libp2p sending subscribe messages to all connected peers? For instance, if a given libp2p node is not connected to any other node running pubsub and a … Also FYI, taking this change into account in the context of the client: we currently have the API proposed here in the libp2p-daemon-client, which might have network errors related to the … Overall, I am not totally against returning an async iterator; I would probably prefer that approach if we were creating this now. However, taking into consideration the pros and cons, including the migration that would need to happen, I think we should not do this now. We can keep it in mind as an improvement.
That is a fair point. I think there are a couple of options available, however (one is sketched below): …
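For example, here is a hedged sketch of one such option: a backwards-compatibility adapter (`subscribeWithHandler` is a hypothetical name) that wraps the proposed async-iterator API back into the current handler-based one, so existing code could migrate gradually:

```ts
const subscribeWithHandler = async (
  pubsub: PubSub,
  topic: string,
  handler: (message: Message) => void,
  onError?: (error: Error) => void
): Promise<void> => {
  // wait for the subscription itself to be established
  const subscription = await pubsub.subscribe(topic)
  // then consume it in the background, dispatching to the legacy handler
  void (async () => {
    try {
      for await (const message of subscription) {
        handler(message)
      }
    } catch (error) {
      // surface network/subscription errors to the optional legacy onError
      onError?.(error as Error)
    }
  })()
}
```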
What about actual platform errors, e.g. dropped connections with an inability to reopen them? While that sounds fairly unlikely, and an argument could be made to just retry creating connections in the future, it might also be warranted to produce a subscription error allowing user-space code to act (e.g. suggest the user troubleshoot network connectivity). On the flip side, we could also make the HTTP client try to reconnect instead of failing; in fact, @hugomrdias and I were considering doing just that. However, I can't shake the feeling that it's best to let user-space code act upon errors, even if it just retries the subscription.
Makes sense, thanks for providing insightful feedback.
Let's try this with IPFS first then. I think we can have the backwards compatibility adapter as you suggest later on.
That should probably be part of the subsystem itself (combined with the connection manager), not a subscription. Managing the open connections of a subsystem -- deciding whether we should proactively connect to more pubsub peers, trim connections, or retry connections -- should not affect API consumption. We should have a clearer picture of how to handle possible errors in this regard once I start work on the connection manager overhaul, which will look into these specifics.
I think that we should also do that, but we should also change the pubsub API in this context, even if the client itself can recover.
Pubsub is a weird bit of the API; in the long term I'd like to remove it from the top-level … I like @Gozala's suggestion, this seems natural to me:

```js
try {
for await (const message of ipfs.pubsub.subscribe('a-topic')) {
// ...
}
} catch (err) {
// ...
}
```

As an addition, I like the idea of breaking out of the loop to unsubscribe if the subscription is an async generator:

```js
const subscription = {
[Symbol.asyncIterator]: () => subscription,
async next () {
//...
},
return () {
// do unsubscribe here
}
}
for await (const message of subscription) {
break // or throw or what have you
}
// no longer subscribed
```

Though we'd have to think about what it means to be unsubscribed if there are multiple subscribers within an app context. I have questions about this:

```js
const subscription = await ipfs.pubsub.subscribe('a-topic')
for await (const message of subscription) {
// ...
}
```

What happens when messages arrive between the subscribe and the `for await`? This could be exacerbated by the user doing async work between the two.
I think we can handle this in an efficient way internally as @Gozala suggests.
How do you define 'complete' here? Topics are global within your peers or overlay; what's to stop another peer from sending messages? Or do you just mean it gets invoked when … That said, the issue here is that the …
I don't think we should do this, at least not by default -- there are no guarantees reconnecting will be possible, and there are loads of different strategies to use, etc. I think it'd be better if we fail fast and leave what to do next up to the user. To move this forward, I think we should: …
Basically it means you will no longer get messages on this subscription handler; that could be due to an unsubscribe or due to some error. Either way, it's a signal that if you want to get more messages on this topic with this handler, you've got to resubscribe.
I think communicating sequential processes (CSP) is a perfect solution here. Essentially you want to put a queue between the producer/writer and the consumer/reader so they can act on their own schedules. In other words, incoming messages get queued until the buffer is filled, at which point the subscription could be terminated. This would allow the consumer to either: … (a rough sketch follows)
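A minimal sketch of such a queue (all names illustrative; the overflow policy here simply terminates, matching the "subscription will error if the internal queue overflows" behavior from the proposal above):

```ts
class Channel<T> implements AsyncIterableIterator<T> {
  private buffer: T[] = []
  private takers: ((result: IteratorResult<T>) => void)[] = []
  private closed = false

  constructor (private readonly capacity: number) {}

  // producer side: queue a message, or terminate on overflow
  put (value: T): void {
    const take = this.takers.shift()
    if (take) {
      take({ done: false, value })
    } else if (this.buffer.length < this.capacity) {
      this.buffer.push(value)
    } else {
      // buffer is full and nobody is reading: terminate the subscription
      this.close()
    }
  }

  close (): void {
    this.closed = true
    for (const take of this.takers.splice(0)) {
      take({ done: true, value: undefined })
    }
  }

  // consumer side: pull messages at your own pace
  async next (): Promise<IteratorResult<T>> {
    if (this.buffer.length > 0) {
      return { done: false, value: this.buffer.shift() as T }
    }
    if (this.closed) {
      return { done: true, value: undefined }
    }
    return new Promise(resolve => this.takers.push(resolve))
  }

  [Symbol.asyncIterator] (): this {
    return this
  }
}
```

The subscription's producer would call `put` for each incoming network message, while `for await` over the channel drives `next()` at the consumer's pace.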
Any update here @Gozala? Looks like the PR with a fix was merged, although I'm not sure if it has been released yet: node-fetch/node-fetch#1064
We should try to figure out what the right approach here is. Seems like the fix to node-fetch won't be backported to node-fetch v2. What are the options we have here @Gozala? This issue is causing a bunch of trouble for us on our infrastructure.
@oed I apologize for not getting around to finishing this up; there has been a reorg and I find myself in a position where I have almost no time to work on js-ipfs. In terms of what's remaining, PR #3468 just needs to be updated and merged. I will do it as soon as I get a chance. @BigLep can we put this on the project board to make sure this gets enough attention?
So #3468 can be fixed without the fix in node-fetch? That's great if true! 🙏
It is a kind of workaround, but I think it is a reasonable one; it will make sure that onError fires once the subscription will no longer produce any output.
From @Gozala on discord: … So it sounds like getting node-fetch/node-fetch#1064 backported to node-fetch v2 might be our only hope here?
…ed (#3468) This change fixes #3465 by upgrading to a temporary fork of node-fetch with node-fetch/node-fetch#1172 applied. Co-authored-by: achingbrain <alex@achingbrain.net>
Version (for the Client): …
Version (for the API): …
Platform: Darwin MacBook-Pro.local 19.6.0 Darwin Kernel Version 19.6.0: Mon Aug 31 22:12:52 PDT 2020; root:xnu-6153.141.2~1/RELEASE_X86_64 x86_64
Subsystem: pubsub
Severity: Medium
Description:
I'm running an IPFS client `C` in docker connected to an IPFS API node `A`, also in docker. I'm also running an additional node on my local machine, `L`. `C` receives the api url of `A` and subscribes to `topic`. When I send messages to this topic from `L`, I can see them coming through my handler on `C`.

If `A` stops running, `C` no longer receives messages from the topic.

When `A` restarts, `C` still does not receive messages from the topic -- I must resubscribe to `topic` from `C` in order to start receiving messages again.

It seems the handler passed to `ipfs.pubsub.subscribe` somehow gets broken.

Steps to reproduce the error:
1. Start the jsipfs daemon
2. Subscribe to a pubsub topic from the client
3. Stop the jsipfs daemon
   You will see errors from the client script, but keep it running.
4. Start the jsipfs daemon again
   Errors from the client script stop, and it keeps sending messages to the API (you should see debug logs with 200 responses),
   but it stops receiving messages (from itself and from others -- you can test this by running another node that sends messages to the topic).
Also, the `onError` handler never gets triggered here.