
Reliable and/or Persistent Pubsub #42

Closed
keks opened this issue Oct 16, 2017 · 30 comments

@keks
Contributor

keks commented Oct 16, 2017

Reliable Pubsub

Sometimes, messages get lost. To allow the receiver to recover lost messages, we could start systematically referencing previous messages when constructing them. That way, when the receiver receives the next message, it will find an unknown message reference and can ask its peers about it (or possibly fetch it like a regular ipfs/ipld object?)

The system I propose is the one used by OrbitDB. Every peer maintains a list of messages that have not been referenced yet, the DAG leaves so to speak. Once a new message is received, all message IDs referenced by the new message are removed from the list, and the new message's ID is added.
Once we author a message, we add a field "after": [...], containing the leaf list. Afterwards our leaf list contains only the ID of the message we just authored.
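
To make the bookkeeping concrete, here is a minimal sketch of the leaf-list maintenance described above. All names (Message, HeadTracker, etc.) are hypothetical and not part of the existing floodsub code; message IDs are plain strings for brevity:

```go
package main

import "fmt"

// Message is a hypothetical pubsub message carrying references ("after")
// to the DAG leaves its author knew about at publish time.
type Message struct {
	ID    string
	After []string
	Data  []byte
}

// HeadTracker keeps the set of message IDs that no later message has
// referenced yet, i.e. the current DAG leaves for one topic.
type HeadTracker struct {
	heads map[string]struct{}
}

func NewHeadTracker() *HeadTracker {
	return &HeadTracker{heads: make(map[string]struct{})}
}

// OnReceive updates the leaf set for an incoming message: every ID the
// message references stops being a leaf, and the message itself becomes one.
func (t *HeadTracker) OnReceive(m *Message) {
	for _, ref := range m.After {
		delete(t.heads, ref)
	}
	t.heads[m.ID] = struct{}{}
}

// Author builds a new message whose "after" field is the current leaf list,
// then replaces the leaf set with just the new message's ID.
func (t *HeadTracker) Author(id string, data []byte) *Message {
	after := make([]string, 0, len(t.heads))
	for h := range t.heads {
		after = append(after, h)
	}
	t.heads = map[string]struct{}{id: {}}
	return &Message{ID: id, After: after, Data: data}
}

func main() {
	t := NewHeadTracker()
	t.OnReceive(&Message{ID: "a"})
	t.OnReceive(&Message{ID: "b", After: []string{"a"}})
	fmt.Println(t.Author("c", nil).After) // prints [b]
}
```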

Persistent Pubsub

If we have something like the above, nodes already keep state about the "newest" messages. If we send these to peers that send us a subscription request for that topic, they can start fetching old messages. How much to fetch depends on the application: in orbit it would suffice to fetch one screen's worth of backlog, but in other applications you may want to fetch the entire history.
This solves the problem described in #19.
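
Continuing the hypothetical sketch above, the subscription handshake could look roughly like this. HeadAnnouncement and onSubscribe are made-up names, and how much backlog the new subscriber then fetches is left entirely to the application:

```go
// HeadAnnouncement is a hypothetical control message sent to a peer that has
// just subscribed to a topic, telling it the current DAG leaves so it can
// start fetching older messages at its own pace.
type HeadAnnouncement struct {
	Topic string
	Heads []string // current leaf message IDs for Topic
}

// onSubscribe would run when a peer announces a subscription for topic.
func onSubscribe(topic string, t *HeadTracker, send func(HeadAnnouncement)) {
	heads := make([]string, 0, len(t.heads))
	for h := range t.heads {
		heads = append(heads, h)
	}
	send(HeadAnnouncement{Topic: topic, Heads: heads})
}
```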

Note: I want to make this optional. Whether or not this is used depends on the topic configuration.

@keks
Contributor Author

keks commented Oct 16, 2017

Additional benefit: some information on relative message timing (e.g. msg x happened before msg y) can be derived (not for all messages, of course).

@keks
Contributor Author

keks commented Oct 16, 2017

So basically I want to optionally incorporate an orbit-style concurrent log into pubsub, but without orbit's rich query features.

@keks
Contributor Author

keks commented Oct 26, 2017

@Stebalien What do you think about this? If you approve of this I'll go ahead and do it.

@keks
Contributor Author

keks commented Nov 17, 2017

bump @Stebalien :)

@Stebalien
Member

First of all, sorry for the massive delay. I keep putting this off to avoid switching in all the pubsub context and switching out all the secio/transport context, but I should have gotten to this ages ago.

That way, when the receiver receives the next message, it will find an unknown message reference and can ask its peers about it (or possibly fetch it like a regular ipfs/ipld object?)

I took a stab at this in https://github.com/libp2p/go-floodsub/tree/feat/use-ipld but got stuck in the weeds trying to figure out how to integrate CRDTs. However, you may find some of the changes I made there useful/interesting (take a look at the commit messages, I tried to write informative ones).

Once a new message is received, all message IDs referenced by the new message are removed from the list, and the new message's ID is added.

That's actually a neat solution. I couldn't do this in my attempt because I wanted to make the messages free-form (any IPLD object you want) and allow the users to specify some form of combining function (takes multiple heads and spits out a new head to be forwarded to peers). Unfortunately, there was no way for me to know how much state pubsub needed to keep to do this (a CRDT update can traverse the entire state tree). However, building in exactly one way of doing this is much simpler and avoids questions like that.

One concern I have with this is missing/dropped messages. That is, I could have two messages, a and z and not know that there exists a path from a to z because I'm missing the intermediate nodes. To resolve that, I'd need to always walk all the way back through history (unless we implemented some form of snapshot function...).

Although, in that case, will we send a message m with after: [a, z]?

@keks
Contributor Author

keks commented Nov 20, 2017

First of all, sorry for the massive delay.

Not a problem :)

However, you may find some of the changes I made there useful/interesting

I'll take a look, thanks!

take a look at the commit messages, I tried to write informative ones

That is an art I have yet to learn to master ;)

One concern I have with this is missing/dropped messages. That is, I could have two messages, a and z and not know that there exists a path from a to z because I'm missing the intermediate nodes. To resolve that, I'd need to always walk all the way back through history (unless we implemented some form of snapshot function...).

Hm, not sure what your concern is. Maybe I misunderstood the situation you described so I'll rephrase it with more detail and you can tell me whether that's what you meant.

We receive a message a, which has some messages in "after", all of which we know. Then, at some point, we receive a message z with after = ["m"], where m is a message we don't know yet. At this point we also don't know that there exists a path a <- ... <- l <- m <- z (we only know messages up to l).
In this situation we would ask the peer we got z from about m, and we would not deliver z to the subscribers before we receive and deliver m.
If our peer cannot send us m (it either replies "msg not found" or times out), we could just drop z.
Assuming that we successfully retrieved m, we would first deliver m with after = ["l"] and then z with after = ["m"].
Does that clear things up?
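
A rough sketch of that hold-and-fetch behaviour, reusing the hypothetical Message type from the sketch in the issue description; fetch stands for asking the peer we got the message from, and an error stands for either a "msg not found" reply or a timeout:

```go
// deliverWithParents holds back a message until every message it references
// has been delivered, fetching missing parents from the peer that sent it.
// If any parent cannot be retrieved, the message is dropped (an error is
// returned and deliver is never called for it).
func deliverWithParents(m *Message, seen map[string]*Message,
	fetch func(id string) (*Message, error), deliver func(*Message)) error {

	for _, ref := range m.After {
		if _, ok := seen[ref]; ok {
			continue // parent already known and delivered
		}
		parent, err := fetch(ref) // ask the sending peer for the missing parent
		if err != nil {
			return err // "msg not found" or timeout: drop m
		}
		// The parent may itself reference messages we don't know, so recurse.
		if err := deliverWithParents(parent, seen, fetch, deliver); err != nil {
			return err
		}
	}
	seen[m.ID] = m
	deliver(m)
	return nil
}
```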

For performance reasons we could keep a bloom filter for each HEAD that tracks the IDs of the messages reachable from it, but I think this is an optimization and should be done later.

@keks
Contributor Author

keks commented Nov 20, 2017

So yes, we would need to reverse-walk the path until we hit a known message, but that's what we want because we want to receive every single message.

Instead of only asking the peer about the single message we don't know yet, we could send them our HEADs and they send us everything newer than that. We could then walk this batch of messages in a single pass. That would probably reduce latency a lot when multiple messages have been dropped in a row.
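
One possible shape for that exchange, purely illustrative, reusing the hypothetical Message type and with made-up names:

```go
// CatchUpRequest carries the requester's current heads for a topic; the peer
// answers with every message it knows that is not reachable from those heads.
type CatchUpRequest struct {
	Topic string
	Heads []string
}

// CatchUpResponse lists the missing messages in a topologically sorted order
// (parents before children), so the requester can deliver them in one pass
// without any further backward walking.
type CatchUpResponse struct {
	Messages []*Message
}
```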

@vyzo
Collaborator

vyzo commented Nov 20, 2017

Messages only have a partial order; we can't expect to order them in this way. So we need a mechanism that accounts for different paths and dropped messages.

@keks
Contributor Author

keks commented Nov 20, 2017

@vyzo Of course messages don't have a total order, but that is not required. I only want to deliver messages in a valid ordering, not the valid ordering.

@vyzo
Collaborator

vyzo commented Nov 20, 2017

I don't think we should try to order at all at the transport layer; peers can simply provide a manifest of the last X messages they've seen (for some time t).

@keks
Contributor Author

keks commented Nov 20, 2017

  1. Why?
  2. That is not really reliable - it will silently drop messages if they are older than t.
  3. In the average case, in a well-connected network, the manifest will often be larger than the after array, since the latter only contains as many elements as there are concurrent messages to merge (which will often be just one), resulting in larger messages and more communication overhead.

@keks
Contributor Author

keks commented Nov 20, 2017

@vyzo could you give reasons for your preference?

@vyzo
Collaborator

vyzo commented Nov 20, 2017

simplicity, simplicity, simplicity.

@keks
Contributor Author

keks commented Nov 20, 2017

@vyzo
First off, I don't think my proposal is overly complex.

You only briefly mentioned a part of your preferred solution (the manifest), without specifying what role it plays in the final protocol/algorithm: Do you piggy-back it with every outgoing message? Or is it something your peers can request from you? When would they do so?
No matter which of the two you choose, both are incorrect for some instances of the protocol. You can't call something reliable if it silently drops messages, even when they are older than t.

Should we have a call about this? I feel like this mix of IRC and GitHub issue comments with short answers is not really helping to resolve the disagreement, and actually talking could help.

@keks
Contributor Author

keks commented Nov 20, 2017

vyzo agreed to a call on IRC

@Stebalien do you want to join the call? I think 2pm UTC or later would be good for me, is that okay for you both?

@vyzo
Collaborator

vyzo commented Nov 20, 2017

that works for me too.

@keks
Contributor Author

keks commented Nov 21, 2017

Just came off a video chat with @vyzo, and we agreed that I can move forward on this. I just want to piggy-back an array of message IDs and keep a small index of those; he was thinking I planned to run a complex protocol to infer order. That was never my plan, though.

He also asked me to not make assumptions that conflict with broadcast trees so that we can move to a more efficient algorithm in the future.

@Stebalien
Member

@keks

Hm, not sure what your concern is. Maybe I misunderstood the situation you described so I'll rephrase it with more detail and you can tell me whether that's what you meant.
...

Ah. I assumed that the "after" links would only be used to provide order within pubsub itself. It would be up to the application to hunt down the parents when it needs reliability.

Specifically, I was considering the following problem:

a -> b -> c -> ... -> forgotten past -> z -> forgotten past ...

Basically, what happens if z is so far in the past that I don't recognize it as an old/duplicate message (we don't want to keep the entire history)? We'll get z and then start iterating backwards from z forever. Really, we'll have to explore all paths backwards until we find a link from z to a.

In this case, my proposed solution is to just keep z as a leaf (maybe try exploring a bit to find a relationship between a and z but don't spend too much time). That is, the next message we send out will have after: [a, z].
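
A sketch of that bounded exploration, with maxDepth as an arbitrary cut-off and all names hypothetical; if the search gives up, z simply stays in the head set next to a and the next authored message carries after: [a, z]:

```go
// connects reports whether `from` can reach any ID in `known` by following
// "after" links, giving up after maxDepth steps so we never spend unbounded
// time walking forgotten history. lookup consults the local message store.
func connects(from *Message, known map[string]struct{},
	lookup func(id string) (*Message, bool), maxDepth int) bool {

	if maxDepth == 0 {
		return false
	}
	for _, ref := range from.After {
		if _, ok := known[ref]; ok {
			return true
		}
		if parent, ok := lookup(ref); ok &&
			connects(parent, known, lookup, maxDepth-1) {
			return true
		}
	}
	return false
}
```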

Personally, I'd rather not enforce reliability within pubsub. There are many reasonable pubsub applications (e.g., IPNS) that don't care about reliable delivery, they just want a (partial) order and validation. I assume that any application needing reliability can play catch-up itself (we could even provide an option in the IPFS API to do this for applications but that wouldn't actually be a part of pubsub).

@keks
Contributor Author

keks commented Nov 22, 2017

@Stebalien

It would be up to the application to hunt down the parents when it needs reliability.

Applications already do this. The only ipfs pubsub client I've been working on is orbit, an append-only CRDT store synced using ipfs pubsub. The biggest issue was that after subscribing to a topic (as in, opening a CRDT store) you wouldn't know about any data until someone who has been there longer than you posts something and you can reconstruct the history.
I believe append-only CRDT-stores are an important use-case of ipfs pubsub, so I'd like to allow them to do their job well. I don't want to force reliability on users who need high throughput but can take a few dropped messages. That's why I want to make this optional - you have to pass an Option argument when Subscribing to activate the code.
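
For illustration, the opt-in could be a functional option along these lines; these names are hypothetical and not an existing floodsub API:

```go
// ReliabilityOpts is a hypothetical per-subscription configuration.
type ReliabilityOpts struct {
	Reliable   bool // fetch missing parents before delivering messages
	Persistent bool // request the topic's heads/backlog on subscribe
}

// SubOption mutates the configuration for one Subscribe call.
type SubOption func(*ReliabilityOpts)

// WithReliableDelivery turns on parent tracking for a single topic.
func WithReliableDelivery() SubOption {
	return func(o *ReliabilityOpts) { o.Reliable = true }
}

// Hypothetical usage:
//   sub, err := ps.Subscribe("my-topic", WithReliableDelivery())
```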

Basically, what happens if z is so far in the past that I don't recognize it as an old/duplicate message?

I guess we discard the message that references z, because it references a message that can't be reconstructed. If I send you a message m with after = ["z"], I had better have z and be able to send it to you if you don't have it. If I can't, the message is invalid. Metaphysically you could say that the context in which a message is sent is part of the message, so if the context is undefined, so is the message.

We could, however, help the sender of m get their act together by telling them about our HEADs when we ask for z, so they can catch up on what has happened and then resend the message with a more recent after set.

we don't want to keep the entire history

In many use-cases (did I tell you about append-only CRDT stores yet? ;) keeping the entire history is actually very much an option. Peers could run garbage collection on top of that, basically "squashing" all the CRDT entries into a reduced state that everyone agrees on, then starting with a fresh append-only log of mutations.

There are many reasonable pubsub applications (e.g., IPNS) that don't care about reliable delivery, they just want a (partial) order and validation.

Totally. Let's have a few options that allow pubsub to work for all of us!

I assume that any application needing reliability can play catch-up itself

Yes, but they won't know the state of the topic when they join until someone sends something.

@vyzo
Collaborator

vyzo commented Nov 22, 2017

we don't want to keep the entire history

In many use-cases (did I tell you about append-only CRDT stores yet? ;) keeping the entire history is actually very much an option. Peers could run garbage collection on top of that, basically "squashing" all the CRDT entries into a reduced state that everyone agrees on, then starting with a fresh append-only log of mutations.

For many use cases it's unreasonable (and useless) to keep the entire history.
And there are use cases that don't care at all about reliability or previous messages, let's not forget that.
So all this reliability stuff should be optional and very carefully thought out.

@keks
Contributor Author

keks commented Nov 22, 2017

For many use cases it's unreasonable (and useless) to keep the entire history.

I agree

And there are use cases that don't care at all about reliability or previous messages, let's not forget that.

Totally!

So all this reliability stuff should be optional and very carefully thought out.

yup!

@zcstarr

zcstarr commented Jan 24, 2018

Curious where this has landed? I'd like to help! My personal use case would be to target using persistent + reliable pubsub for off-chain state channels.

@Stebalien
Member

I believe @keks is busy at the moment but I'm sure @vyzo would be happy to discuss it.

@vyzo
Collaborator

vyzo commented Jan 24, 2018

The plan is to add gossip next, and we can build some forms of reliability and persistence with that.

@keks
Contributor Author

keks commented Jan 24, 2018 via email

@vyzo
Collaborator

vyzo commented Jan 24, 2018

@keks gossip should be implemented, and work on meshsub will be well underway by then.

@jvsteiner

The idea of using links to previous messages to enable history lookup is attractive. I just wanted to add that there may be cases where only being able to look up the message that immediately precedes the current one is inefficient or undesirable.

One example might be where there is a message sent on a regular basis - say every 10 seconds, and at some point I need to look up what was published three months ago, but I'm not interested in the intervening messages. There are a number of ways I could do this more efficiently - thinking about something like a skip list, here - but the point is I might like some ability to configure how the history lookup is structured.

I wouldn't argue for the example above to be included out of the box, and I realize it would always be possible to implement the history structure in application code. I just think it really should be optional to use linear-style historical linking, in case I don't need it, and possibly there's a clever way to make the strategy configurable.

@hsanjuan
Contributor

hsanjuan commented Feb 22, 2019

By referencing previous messages you are basically creating a chain. If you want a verifiable chain you'll be basically creating a Merkle-DAG. In the end the problem comes down to [partial] DAG-syncing when receiving a pubsub message.

I have the feeling the proposal here is to implement a new chain-syncing protocol very similar to what IPFS does already when you pin an item (or should be doing - bitswap/graphsync).

I don't quite see that this is a pubsub-specific problem though. I always understood pubsub as an efficient messaging layer. What seems to be missing is an efficient graphsync protocol that can provide blocks and retrieve blocks (DAGs) that applications built on top of pubsub can use. This is what IPFS actually does (even if bitswap is not ideal at the moment).

If pubsub were to create blocks, store blocks and exchange blocks, apart from simply sending opaque messages around, doesn't it start looking a lot like IPFS?

Merkle-CRDTs (OrbitDB, now Cluster) use the same approach proposed here: each pubsub update points to a DAG which can be traversed to obtain any missed updates along with ordering. And that DAG is stored/provided by IPFS. I am not convinced that having pubsub do this internally could be a drop-in replacement, as it would remove each application's freedom to optimize and customize, for example deciding which messages to drop and under what conditions, and how to construct the messages (and where to point from them, since they may not necessarily point to the previous message only). I personally think that it's great that pubsub just does pubsub: please try to get my blob to as many subscribed peers as possible by using a clever strategy to distribute the message.

All in all, given an ipld.DAGService you can abstract everything and get a "persistent pubsub". The question is what is behind that DAGService: IPFS? A new GraphSync protocol/service? How are IPLD objects stored, with a BlockService? What about routing and discovery, are we limited to the known pubsub peers? In the end things come down to having something like IPFS that can discover, transfer, announce and store IPLD blocks. We just want it to do it faster...

@aschmahmann
Contributor

@hsanjuan I agree that your solution will work as you've done it in https://github.com/ipfs/go-ds-crdt/.

However, by trying to layer on top of PubSub instead of interacting with the internals, we end up with excessive message passing. For example, imagine you have a line of nodes (1-2-3-4...-N). My understanding of the go-ds-crdt implementation is that if you are extremely unlucky you end up with N^2 messages every heartbeat (if all nodes are perfectly synchronized in time), and even if you're not unlucky you get aN messages (where a is some function of N).

Instead, if you can manipulate the gossip/repeating layer so that each node does a persistence rebroadcast on its own time and only propagates incoming new messages then even in the worst case you only get 2N messages for nodes that are already at the same state (e.g. all nodes broadcast simultaneously and do exactly one rebroadcast before they stop propagating messages).

If you look at the WIP PR at #171 you'll see a configuration layer for PubSub and a gossipsub configuration that allows for persistence in the way described above.

@aschmahmann
Contributor

There is now a solution for adding a persistence layer on top of PubSub, courtesy of the PR above, which has been merged into https://github.com/libp2p/go-libp2p-pubsub-router.

I'm closing this issue, but feel free to reopen the issue or start a new one if I've missed something.
