015-storage-messaging #16
# Storage messaging

Created on 19.01.22

This is an alternative to 014-safekeeper-gossip.

## Motivation

As in 014-safekeeper-gossip, we need to solve the following problems:

* Trim WAL on safekeepers
* Decide on which SK should push WAL to the S3
* Decide on which SK should forward WAL to the pageserver
* Decide on when to shut down SK<->pageserver connection

This RFC suggests a more generic and hopefully more manageable way to address those problems. However, unlike 014-safekeeper-gossip, it does not bring us any closer to safekeeper-to-safekeeper recovery; rather, it unties two different sets of issues that we previously wanted to solve with gossip.

Also, with this approach, we would not need "call me maybe" anymore, and the pageserver will have all the data required to understand that it needs to reconnect to another safekeeper.

## Summary

Instead of p2p gossip, let's have a centralized broker where all the storage nodes report per-timeline state. Each storage node should have a `--broker-url=1.2.3.4` CLI param.

Here I propose two ways to do that. After a lot of arguing with myself, I'm leaning towards the etcd approach; my arguments for it are in the pros/cons section. Both options require adding a gRPC client to our codebase, either directly or as an etcd dependency.

## Non-goals

This RFC does *not* suggest moving the compute-to-pageserver and compute-to-safekeeper mappings out of the console. The console is still the only place in the cluster responsible for the persistence of that info. So I'm assuming that each pageserver and safekeeper knows exactly which timelines it serves, as is currently the case. We need some mechanism for a new pageserver to discover the mapping info, but that is out of the scope of this RFC.

## Impacted components

pageserver, safekeeper.
Adds either etcd or the console as a storage dependency.

## Possible implementation: custom message broker in the console

We've decided to go with an etcd approach instead of the message broker.

<details closed>
<summary>Original suggestion</summary>
<br>
We can add a gRPC service in the console that acts as a message broker, since the console knows the addresses of all the components. The broker can ignore the payload and only redirect messages. So, for example, each safekeeper may send a message to the peering safekeepers or to the pageserver responsible for a given timeline.

Message format could be `{sender, destination, payload}`.

The destination is either:
1. `sk_#{tenant}_#{timeline}` -- to be broadcast to all safekeepers responsible for that timeline, or
2. `pserver_#{tenant}_#{timeline}` -- to be broadcast to all pageservers responsible for that timeline

The sender is either:
1. `sk_#{sk_id}`, or
2. `pserver_#{pserver_id}`
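
For illustration only (this broker approach was not chosen), the envelope might map onto a struct like the following sketch; the field types are assumptions, not part of the proposal:

```rust
/// Hypothetical envelope for the console message broker (sketch only).
/// The broker routes on `destination` and never inspects `payload`.
struct BrokerMessage {
    /// e.g. "sk_3" or "pserver_1"
    sender: String,
    /// e.g. "sk_<tenant>_<timeline>" or "pserver_<tenant>_<timeline>"
    destination: String,
    /// Opaque to the broker; interpreted only by the receiving node.
    payload: Vec<u8>,
}
```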

I can think of the following behavior to address our original problems:

* WAL trimming
Each safekeeper periodically broadcasts `(write_lsn, commit_lsn)` to all peering safekeepers (peering == responsible for that timeline).

* Decide on which SK should push WAL to the S3

Each safekeeper periodically broadcasts an `i_am_alive_#{current_timestamp}` message to all peering safekeepers. That way, safekeepers can maintain a vector of alive peers (a loose one, with false negatives). The alive safekeeper with the minimal id pushes data to S3.

* Decide on which SK should forward WAL to the pageserver

Each safekeeper periodically sends (write_lsn, commit_lsn, compute_connected) to the relevant pageservers. With that info, a pageserver can maintain a view of the safekeepers' state, connect to a random one, and detect the moments (e.g., one of the safekeepers is not making progress or is down) when it needs to reconnect to another safekeeper. The pageserver should resolve exact IP addresses through the console, e.g., resolve `sk_#{sk_id}` to `4.5.6.7:6400`.

A pageserver connection to the safekeeper is triggered by the state change `compute_connected: false -> true`. With that, we don't need "call me maybe" anymore.

Also, we don't have the "peer address amnesia" problem of the gossip approach (with gossip, after a simultaneous reboot, safekeepers wouldn't know each other's addresses until the next compute connection).

* Decide on when to shut down the sk<->pageserver connection

Again, pageserver would have all the info to understand when to shut down the safekeeper connection.

### Scalability

One node is enough (c) No, seriously, it is enough.

### High Availability

The broker lives in the console, so we can rely on k8s to keep the console app alive.

If the console is down, we won't trim WAL or reconnect the pageserver to another safekeeper. But, at the same time, if the console is down, we already can't accept new compute connections or start stopped computes, so this makes things a bit worse, but not dramatically.

### Interactions

```
.________________.
sk_1 <-> | | <-> pserver_1
... | Console broker | ...
sk_n <-> |________________| <-> pserver_m
```
</details>


## Implementation: etcd state store

Alternatively, we can set up `etcd` and maintain the following data structure in it:

```ruby
"compute_#{tenant}_#{timeline}" => {
safekeepers => {
"sk_#{sk_id}" => {
write_lsn: "0/AEDF130",
commit_lsn: "0/AEDF100",
compute_connected: true,
last_updated: 1642621138,
},
}
}
```

As etcd doesn't support field updates in nested objects, that translates to the following set of keys:

```ruby
"compute_#{tenant}_#{timeline}/safekeepers/sk_#{sk_id}/write_lsn",
"compute_#{tenant}_#{timeline}/safekeepers/sk_#{sk_id}/commit_lsn",
...
```

Each storage node can subscribe to the relevant sets of keys and maintain a local view of that structure. So in terms of data flow, everything is the same as in the previous approach, but we avoid implementing the message broker and avoid a runtime storage dependency on the console.
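
As a sketch of that subscription (using the Rust `etcd-client` crate purely as an example client; the endpoint and key names are placeholders following the layout above), a node could watch a per-timeline prefix and fold updates into a local map:

```rust
use std::collections::HashMap;

use etcd_client::{Client, EventType, WatchOptions};

#[tokio::main]
async fn main() -> Result<(), etcd_client::Error> {
    // The endpoint would come from the proposed --broker-url CLI param.
    let mut client = Client::connect(["127.0.0.1:2379"], None).await?;

    // Watch everything under one timeline's prefix, e.g.
    // "compute_<tenant>_<timeline>/safekeepers/sk_1/commit_lsn".
    let prefix = "compute_<tenant>_<timeline>/";
    let (_watcher, mut stream) = client
        .watch(prefix, Some(WatchOptions::new().with_prefix()))
        .await?;

    // Local view of the nested structure: flattened key -> latest value.
    let mut view: HashMap<String, String> = HashMap::new();

    while let Some(resp) = stream.message().await? {
        for event in resp.events() {
            if let (EventType::Put, Some(kv)) = (event.event_type(), event.kv()) {
                view.insert(kv.key_str()?.to_owned(), kv.value_str()?.to_owned());
            }
        }
        // ...react to the updated view here: pick a safekeeper to pull WAL
        // from, decide on trimming, etc.
    }
    Ok(())
}
```

A pageserver would presumably run one such watch per timeline it serves (or a single watch over a broader prefix) and keep the map updated as safekeepers publish.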

### Safekeeper address discovery

During startup, the safekeeper should publish the address it is listening on as part of `{"sk_#{sk_id}" => ip_address}`. Then the pageserver can resolve `sk_#{sk_id}` to the actual address. This way it would work both locally and in the cloud setup. The safekeeper should have an `--advertised-address` CLI option so that we can listen on e.g. 0.0.0.0 but advertise something more useful.
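
A minimal sketch of that registration step, again with the `etcd-client` crate; the exact key name and value format are assumptions:

```rust
use etcd_client::Client;

/// Publish the address this safekeeper advertises to other storage nodes.
/// `advertised_addr` would come from the proposed --advertised-address
/// option, not from the (possibly 0.0.0.0) listen address.
async fn publish_address(
    client: &mut Client,
    sk_id: u64,
    advertised_addr: &str, // e.g. "4.5.6.7:6400"
) -> Result<(), etcd_client::Error> {
    client
        .put(format!("sk_{}", sk_id), advertised_addr, None)
        .await?;
    Ok(())
}
```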

### Safekeeper behavior

For each timeline, the safekeeper periodically broadcasts the `compute_#{tenant}_#{timeline}/safekeepers/sk_#{sk_id}/*` fields. It also subscribes to changes of `compute_#{tenant}_#{timeline}` -- that way the safekeeper has information about its peering safekeepers.
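
A sketch of that publishing loop (the crate, the helper, and the one-second period are assumptions for illustration; `current_state` stands in for reading the safekeeper's in-memory timeline state):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use etcd_client::Client;

/// Periodically publish this safekeeper's view of one timeline to etcd.
async fn broadcast_timeline_state(
    client: &mut Client,
    tenant: &str,
    timeline: &str,
    sk_id: u64,
) -> Result<(), etcd_client::Error> {
    loop {
        let prefix = format!("compute_{tenant}_{timeline}/safekeepers/sk_{sk_id}");
        // In real code these come from the safekeeper's timeline state.
        let (write_lsn, commit_lsn, compute_connected) = current_state(tenant, timeline);
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        client.put(format!("{prefix}/write_lsn"), write_lsn, None).await?;
        client.put(format!("{prefix}/commit_lsn"), commit_lsn, None).await?;
        client
            .put(format!("{prefix}/compute_connected"), compute_connected.to_string(), None)
            .await?;
        client.put(format!("{prefix}/last_updated"), now.to_string(), None).await?;

        // ~1 second; see the discussion of the update period in the review
        // thread below.
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

/// Placeholder for reading the current per-timeline state.
fn current_state(_tenant: &str, _timeline: &str) -> (String, String, bool) {
    ("0/AEDF130".into(), "0/AEDF100".into(), true)
}
```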

**Review comment:**

This is not specific to the safekeeper, but what is the period? And how can the reader identify the data as stale, or its advertiser as down? There could be some threshold value, e.g. no updates in 2 * update_period. Do we need to guard against possible clock skew?

**Author reply:**

> what is the period?

The period would delay WAL trimming / S3 pushdown switchover / pageserver reconnection time (only for stale connections, though; usually the pageserver will receive a TCP error). I think 1 second is fine.

> And how can the reader identify the data as stale or its advertiser as down?

Safekeepers only care about the median(lsn) among the group, so they can avoid that.

The pageserver needs to check that the safekeeper it is connected to is not extremely behind the peering safekeepers. We could do that based on the LSN lag between our safekeeper and the rest of them, but that approach can lead to a livelock. Alternatively, we can advertise the time of the last heartbeat from compute in each safekeeper (right now heartbeats are sent every second) and use quite a big timeout to force a reconnection in the pageserver (let's say 30 sec) to defend against clock skew. That should be fine since it is a last-resort measure: with most of the usual problems on the safekeeper (crash, restart, OOM, etc.) we should be able to detect the disconnect on the pageserver, and the 30 sec timeout would matter only in rare connectivity / freeze scenarios. IIRC, protocol-level heartbeats between the safekeeper and the pageserver are not enabled right now -- we could enable them too and use smaller disconnect timeouts there (since we can just measure the delay without looking at the exact value of the timestamp).

**@LizardWizzard, Jan 26, 2022:**

> median(lsn)

MIN for commit and MAX to calculate the lag, right?

> to defend against the time skew

I think there is one more option: we can defend even against a connection thread on the safekeeper being stuck, not only against clock skew. We can compare the last heartbeat time not with the pageserver's current system time, but with the values advertised by the other safekeepers. In that case the safekeeper should forward the system time from compute, avoiding generating its own timestamp. Then all the compared values come from the same time source. The only exception is when there is a different compute, which shouldn't be a problem; even if it is, we can add a compute id to avoid comparing timestamps from different computes. What do you think?

I've tried to make some sketches; this is the version with the livelock that I used to imagine different scenarios:

[mermaid diagram: mermaid-diagram-20220126193556]

I can fix it to use timestamp comparison, and if you have some failure scenarios in mind I can add them too. I think it is a good idea to add a strict sequence of actions for different kinds of failures.

Another possible diagram might describe concurrent checkpoint handling by pageservers. I'm still thinking about what type of coordination we need here. Can we even avoid it somehow?

For that case, the pageserver that is currently streaming to S3 can advertise a flag that it is streaming, the latest checkpoint lsn, and a timestamp; this timestamp may be the max compute heartbeat timestamp it has seen. This way another pageserver can detect the difference between the current max timestamp and the advertised one and take over the streaming role. Thoughts?

**@arssher, Feb 3, 2022:**

I doubt we need to fight clock skew here. How about the following straw man for choosing the safekeeper to fetch WAL from:

1. If there is no live connection to a safekeeper, choose the one with the max commit_lsn.
2. Otherwise, switch to the safekeeper with the higher commit_lsn iff either 1) a timeout (tracked locally on the pageserver) passed with no data coming through the current connection (prevents livelock), or 2) the diff between the commit_lsn of our safekeeper and the max commit_lsn exceeded some threshold -- we are talking to a deeply lagging safekeeper.

This procedure can be run regularly, plus on interesting events like losing the connection to a safekeeper or getting new data from the broker.

Generally, tracking live servers without bothering about time skew can be done by pushing keys to etcd with an expiration period, but I'm not sure where we need that currently.
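
That policy could be expressed as a pure decision function along these lines (a sketch; the names and thresholds are made up for illustration, and LSNs are shown as plain u64 byte positions):

```rust
/// Per-safekeeper state as seen by the pageserver through the broker.
struct SafekeeperInfo {
    sk_id: u64,
    commit_lsn: u64,
}

/// The pageserver's current safekeeper connection, if any.
struct CurrentConnection {
    sk_id: u64,
    secs_since_last_data: u64,
}

/// Straw-man policy from the comment above: returns Some(sk_id) if the
/// pageserver should (re)connect to that safekeeper, or None to keep the
/// current connection.
fn choose_safekeeper(
    peers: &[SafekeeperInfo],
    current: Option<&CurrentConnection>,
    no_data_timeout_secs: u64, // tracked on the pageserver's own clock
    max_lag_bytes: u64,        // threshold on the commit_lsn difference
) -> Option<u64> {
    // Candidate: the safekeeper with the max commit_lsn.
    let best = peers.iter().max_by_key(|p| p.commit_lsn)?;

    match current {
        // No live connection: just take the best candidate.
        None => Some(best.sk_id),
        Some(cur) => {
            let cur_lsn = peers
                .iter()
                .find(|p| p.sk_id == cur.sk_id)
                .map(|p| p.commit_lsn)
                .unwrap_or(0);
            let stalled = cur.secs_since_last_data > no_data_timeout_secs;
            let lagging = best.commit_lsn.saturating_sub(cur_lsn) > max_lag_bytes;
            if (stalled || lagging) && best.sk_id != cur.sk_id {
                Some(best.sk_id)
            } else {
                None
            }
        }
    }
}
```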

**Follow-up comment:**

I think it is worthwhile to avoid clock skew early on during the initial system design.

I've modified the diagram to match the picture we've agreed upon:

[diagram image]

Let me know if I missed something or you spot any errors.

**Follow-up comment:**

This is my understanding of the tenant migration process with a metadata service:

[diagram image]

That amount of information is enough to properly trim WAL. To decide who is pushing the data to S3, safekeepers may use etcd leases or broadcast a timestamp and thereby track who is alive.
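
For the lease variant, here is a sketch using the `etcd-client` crate (the TTL value and key layout are assumptions; in practice the keys would likely be scoped per timeline): each safekeeper registers itself under a leased key that vanishes if the lease stops being refreshed, and the alive safekeeper with the minimal id takes the S3 role.

```rust
use etcd_client::{Client, GetOptions, PutOptions};

/// Register this safekeeper under a key that disappears automatically
/// if we stop refreshing the lease (i.e. if this safekeeper dies).
async fn register_alive(client: &mut Client, sk_id: u64) -> Result<i64, etcd_client::Error> {
    let lease = client.lease_grant(5 /* TTL in seconds, an assumption */, None).await?;
    client
        .put(
            format!("alive_safekeepers/sk_{}", sk_id),
            sk_id.to_string(),
            Some(PutOptions::new().with_lease(lease.id())),
        )
        .await?;
    // The caller is expected to keep refreshing the lease periodically,
    // e.g. via client.lease_keep_alive(lease.id()).
    Ok(lease.id())
}

/// True if this safekeeper currently has the smallest id among the live ones
/// and therefore should be the one pushing WAL to S3.
async fn should_push_to_s3(client: &mut Client, my_id: u64) -> Result<bool, etcd_client::Error> {
    let resp = client
        .get("alive_safekeepers/", Some(GetOptions::new().with_prefix()))
        .await?;
    let min_alive = resp
        .kvs()
        .iter()
        .filter_map(|kv| kv.value_str().ok()?.parse::<u64>().ok())
        .min();
    Ok(min_alive == Some(my_id))
}
```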

### Pageserver behavior

The pageserver subscribes to `compute_#{tenant}_#{timeline}` for each tenant it owns. With that info, the pageserver can maintain a view of the safekeepers' state, connect to a random one, and detect the moments (e.g., one of the safekeepers is not making progress or is down) when it needs to reconnect to another safekeeper. The pageserver should resolve exact IP addresses through the console, e.g., resolve `sk_#{sk_id}` to `4.5.6.7:6400`.

A pageserver connection to the safekeeper can be triggered by the state change `compute_connected: false -> true`. With that, we don't need "call me maybe" anymore.

As an alternative to `compute_connected`, we can track the timestamp of the latest message that arrived at the safekeeper from compute. Usually compute broadcasts a KeepAlive to all safekeepers every second, so the timestamp will be updated every second while the connection is OK. The connection can then be considered down when this timestamp isn't updated for several seconds.

This will help detect issues with a safekeeper faster (and switch to another one) in the following cases:

1. when compute failed but the TCP connection stays alive until a timeout (usually about a minute)
2. when the safekeeper failed and didn't set `compute_connected` to false

Another way to deal with [2] is to treat (write_lsn, commit_lsn, compute_connected) as a KeepAlive on the pageserver side and detect issues when a sk_id doesn't send anything for some time. This way is fully compliant with this RFC.
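
A sketch of that staleness check on the pageserver side (names and the threshold are illustrative; only locally observed times are compared, which sidesteps the clock-skew concern raised in the review thread above):

```rust
use std::time::{Duration, Instant};

/// What the pageserver tracks per safekeeper, built from broker updates.
struct SafekeeperView {
    /// Latest compute-originated timestamp advertised by this safekeeper.
    last_compute_ts: u64,
    /// Pageserver-local time when `last_compute_ts` last changed.
    last_change_seen_at: Instant,
}

impl SafekeeperView {
    /// Call on every broker update for this safekeeper.
    fn observe(&mut self, advertised_ts: u64) {
        if advertised_ts != self.last_compute_ts {
            self.last_compute_ts = advertised_ts;
            self.last_change_seen_at = Instant::now();
        }
    }

    /// The compute->safekeeper link (or the safekeeper itself) looks dead if
    /// the advertised timestamp has not moved for `threshold`. This also
    /// covers the case where the safekeeper stops publishing entirely.
    fn is_stale(&self, threshold: Duration) -> bool {
        self.last_change_seen_at.elapsed() > threshold
    }
}
```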

Also, we don't have the "peer address amnesia" problem of the gossip approach (with gossip, after a simultaneous reboot, safekeepers wouldn't know each other's addresses until the next compute connection).

### Interactions

```
.________________.
sk_1 <-> | | <-> pserver_1
... | etcd | ...
sk_n <-> |________________| <-> pserver_m
```

# Pros/cons

## Console broker/etcd vs gossip:

Gossip pros:
* gossip allows running storage without the console or etcd

Console broker/etcd pros:
* simpler
* solves "call me maybe" as well
* avoids the possible N-to-N connection issues of gossip without having to group safekeepers into pre-defined triples

## Console broker vs. etcd:

Initially, I wanted to avoid etcd as a dependency, mostly because I've seen how painful the ZooKeeper dependency was for ClickHouse: in each chat, at each conference, people were complaining about the configuration and maintenance barriers of ZooKeeper. It was so bad that ClickHouse re-implemented ZooKeeper to embed it: https://clickhouse.com/docs/en/operations/clickhouse-keeper/.

But with etcd we are in a bit of a different situation:

1. We don't need persistency and strong consistency guarantees for the data we store in etcd.

**Review comment:**

I don't feel great about running a single-node in-memory etcd because we'd probably be the first to try that in production. If we need a third-party in-memory kv store, Redis would run faster and be easier to manage.

**Author reply:**

Yeah, I don't suggest running it in a single-node setup in production. My point was that if we later want to drop etcd as a dependency for self-hosted installations, it would be easier to do so if we do not store any persistent data there -- we could change it to Redis or swap it for a custom gRPC service. But once we start keeping reliable info there, it would be much harder to switch to something in-house.

2. etcd uses gRPC as its protocol, and the messages are pretty simple.

So it looks like implementing an in-memory store with an etcd interface is a straightforward thing _if we want that in the future_. At the same time, we can avoid implementing it right now, and we will be able to run a local zenith installation with etcd running somewhere in the background (as opposed to building and running the console, which in turn requires Postgres).