
ClickHouse sharding plan #8652

Closed
fuziontech opened this issue Feb 16, 2022 · 8 comments
Labels: deployments (Anything tied to self-hosted deployments of PostHog), enhancement (New feature or request)

Comments

@fuziontech (Member)

Plan for how we are going to shard ClickHouse on clickhouse-operator-based installs and Cloud (and externally hosted ClickHouse as well).

@fuziontech added the helm and P0 (Critical, breaking issue) labels on Feb 16, 2022
@macobo added the clickhouse and enhancement (New feature or request) labels and removed the P0 label on Feb 22, 2022
@macobo (Contributor) commented Feb 22, 2022

There are multiple parts to this. Here's my thinking right now:

Problem being solved

  • Scaling vertically eventually hits a limit for users
  • Schemas locally/for self-hosted users and on Cloud are out of sync

Also related: https://github.com/PostHog/product-internal/blob/main/requests-for-comments/2022-01-01-highscale-querying.md

Steps

1. Get schemas in sync

PostHog Cloud and self-hosted currently have different schemas. The CLICKHOUSE_REPLICATION env flag brings some of the schema in line, but not all of it (and some of it is wrong, e.g. #7571).
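
A quick way to see how far a given installation has drifted is to dump every table's engine definition and diff it against another environment. A minimal sketch (a generic query, not a tool referenced in this issue; 'posthog' is the database name used elsewhere here):

```sql
-- List every table in the posthog database together with its full engine definition,
-- so the output can be diffed between cloud and a self-hosted install.
SELECT name, engine_full
FROM system.tables
WHERE database = 'posthog'
ORDER BY name;
```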

Related PRs:

2. Support sharding in the helm chart

Code-wise this should be relatively straightforward, as clickhouse-operator has options for this.

Some known subtasks:

  • Expose sharding options in values.yaml
  • Figure out how to load-balance and how to set CLICKHOUSE_STABLE_HOST
  • The ClickHouse service is created before all nodes are up, causing a race when creating the schema (see the readiness-check sketch after this list)
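
For that last point, one way to sanity-check that the whole topology is actually visible before running schema creation would be a query along these lines (an illustrative check, not the guard the chart actually uses; 'posthog' is the cluster name used elsewhere in this issue):

```sql
-- List every shard/replica ClickHouse currently knows about for the cluster.
-- If rows are missing compared to the layout requested in values.yaml,
-- schema creation is about to race ahead of nodes that aren't up yet.
SELECT shard_num, replica_num, host_name, is_local
FROM system.clusters
WHERE cluster = 'posthog'
ORDER BY shard_num, replica_num;
```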

Related PRs:

3. Figure out how to upgrade from old schema to new

This is slightly tricky since table engines are changing, meaning we can't simply rename the data-containing tables.

ATTACH PARTITION might work well for this without requiring any data swapping. See also https://stackoverflow.com/questions/68716267/is-it-possible-to-change-a-table-engine-of-an-existed-clickhouse-table
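
In rough terms the approach would look like the sketch below. Table and partition names are assumed for illustration, and the real change would be wrapped in a migration rather than run by hand:

```sql
-- Sketch only: create the new table with the target (replicated) engine but the same
-- columns, partition key and sorting key as the existing table.
CREATE TABLE sharded_events AS events
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/posthog.events', '{replica}', _timestamp)
PARTITION BY toYYYYMM(timestamp)
ORDER BY (team_id, toDate(timestamp), event, cityHash64(distinct_id), cityHash64(uuid));

-- Attach an existing partition from the old table into the new one. This reuses the
-- existing parts on disk instead of re-inserting rows (202202 is an example value
-- for a toYYYYMM(timestamp) partition key).
ALTER TABLE sharded_events ATTACH PARTITION 202202 FROM events;
```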

An annoyance here is the differentiation between async migrations and normal ClickHouse migrations. This will probably be an async migration even though it might not need to actually run anything long-running.

A related gotcha is posthog/async_migrations/migrations/0002_events_sample_by.py: it won't work because there is currently no way to run its code on all ClickHouse nodes, which makes it impossible to run that async migration if the user is already sharded.

Related PRs:

4. Rebalancing data

We should figure out a mechanism for "rebalancing" data across a cluster. Note that this isn't strictly needed right now, but it is operationally important for users scaling out.

Note that this logic can be relatively dumb for now and only needs to work for self-hosted - e.g. we can stop ingestion if needed and even duplicate data temporarily while it is being moved.

5. Documentation, removing CLICKHOUSE_REPLICATION

At this point we have a fully functioning story, but it's not yet used anywhere.

Other information

Related issue: ClickHouse/ClickHouse#13574

@yakkomajuri (Contributor)

As for 0002_events_sample_by, won't we be above the limit version by the time sharding is introduced anyway?

@macobo (Contributor) commented Feb 22, 2022

> As for 0002_events_sample_by, won't we be above the limit version by the time sharding is introduced anyway?

See this:

> An annoyance here is the differentiation between async migrations and normal ClickHouse migrations. This will probably be an async migration even though it might not need to actually run anything long-running.

So no, not really - due to the design of async migrations and the fact that we need to set up new tables in ClickHouse migrations.

@macobo (Contributor) commented Mar 1, 2022

Note: after investigating solutions for a while, I'll skip step 4 (Rebalancing data) for now.

Rationale:

@macobo (Contributor) commented Mar 11, 2022

All of the above has finished; some unexpected things have cropped up:

@macobo (Contributor) commented Mar 15, 2022

One last issue that needs resolving: clickhouse-operator only syncs some tables onto new nodes. See #8912 for the PostHog-specific solution, but one table we don't replicate that way is the ClickHouse migrations one.

I'll add a Distributed engine table to our fork of clickhouse orm.
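
For reference, the wrapper would look roughly like this (it matches the engine_full visible in the verification output further down; the exact DDL lives in the fork):

```sql
-- Distributed wrapper over the per-node migrations table, so migration state written
-- on any node is queryable cluster-wide through a single table.
CREATE TABLE IF NOT EXISTS infi_clickhouse_orm_migrations_distributed ON CLUSTER 'posthog'
AS infi_clickhouse_orm_migrations
ENGINE = Distributed('posthog', 'posthog', 'infi_clickhouse_orm_migrations', rand());
```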

@macobo (Contributor) commented Mar 18, 2022

Tested this by:

  1. Setting up a new cluster with kind.
  2. Deploying the following values.yaml:

     cloud: local

  3. Adding the "make some noise" plugin to ingest some events:

     ┌─_table──────────────────────┬─count()─┐
     │ events                      │   45765 │
     │ person                      │    7599 │
     │ person_distinct_id2         │    7595 │

  4. Deploying the following values.yaml, which includes all the sharding changes we've done for 1.34:

     cloud: "local"

     image:
       repository: macobo/posthog-test
       sha: sha256:56d7b853ed4fffa158d869bd4877388252f96e35a628a3cba4407076d7a374a9

  5. Running the sharding async migration on the cluster.
  6. Verifying schemas and event counts - all looks OK.
  7. Deploying a sharding change:

     cloud: "local"

     image:
       repository: macobo/posthog-test
       sha: sha256:56d7b853ed4fffa158d869bd4877388252f96e35a628a3cba4407076d7a374a9

     clickhouse:
       layout:
         shardsCount: 2
         replicasCount: 2

  8. Verifying the following tables on all shards:
SQL and output:
SELECT
    hostName() AS host,
    name,
    engine_full
FROM clusterAllReplicas('posthog', system, tables)
WHERE name IN ('events', 'person', 'person_distinct_id', 'session_recording_events', 'sharded_events', 'sharded_session_recording_events', 'infi_clickhouse_orm_migrations', 'infi_clickhouse_orm_migrations_distributed')
ORDER BY
    name ASC,
    host ASC
FORMAT Vertical

Query id: a060b40a-4a2b-4793-87dc-0534b191662d

Row 1:
──────
host:        chi-posthog-posthog-0-0-0
name:        events
engine_full: Distributed('posthog', 'posthog', 'sharded_events', sipHash64(distinct_id))

Row 2:
──────
host:        chi-posthog-posthog-0-0-0
name:        events
engine_full: 

Row 3:
──────
host:        chi-posthog-posthog-0-1-0
name:        events
engine_full: Distributed('posthog', 'posthog', 'sharded_events', sipHash64(distinct_id))

Row 4:
──────
host:        chi-posthog-posthog-0-1-0
name:        events
engine_full: 

Row 5:
──────
host:        chi-posthog-posthog-1-0-0
name:        events
engine_full: Distributed('posthog', 'posthog', 'sharded_events', sipHash64(distinct_id))

Row 6:
──────
host:        chi-posthog-posthog-1-0-0
name:        events
engine_full: 

Row 7:
──────
host:        chi-posthog-posthog-1-1-0
name:        events
engine_full: Distributed('posthog', 'posthog', 'sharded_events', sipHash64(distinct_id))

Row 8:
──────
host:        chi-posthog-posthog-1-1-0
name:        events
engine_full: 

Row 9:
───────
host:        chi-posthog-posthog-0-0-0
name:        infi_clickhouse_orm_migrations
engine_full: MergeTree PARTITION BY toYYYYMM(applied) ORDER BY (package_name, module_name) SETTINGS index_granularity = 8192

Row 10:
───────
host:        chi-posthog-posthog-0-1-0
name:        infi_clickhouse_orm_migrations
engine_full: MergeTree PARTITION BY toYYYYMM(applied) ORDER BY (package_name, module_name) SETTINGS index_granularity = 8192

Row 11:
───────
host:        chi-posthog-posthog-1-0-0
name:        infi_clickhouse_orm_migrations
engine_full: MergeTree PARTITION BY toYYYYMM(applied) ORDER BY (package_name, module_name) SETTINGS index_granularity = 8192

Row 12:
───────
host:        chi-posthog-posthog-1-1-0
name:        infi_clickhouse_orm_migrations
engine_full: MergeTree PARTITION BY toYYYYMM(applied) ORDER BY (package_name, module_name) SETTINGS index_granularity = 8192

Row 13:
───────
host:        chi-posthog-posthog-0-0-0
name:        infi_clickhouse_orm_migrations_distributed
engine_full: Distributed('posthog', 'posthog', 'infi_clickhouse_orm_migrations', rand())

Row 14:
───────
host:        chi-posthog-posthog-0-1-0
name:        infi_clickhouse_orm_migrations_distributed
engine_full: Distributed('posthog', 'posthog', 'infi_clickhouse_orm_migrations', rand())

Row 15:
───────
host:        chi-posthog-posthog-1-0-0
name:        infi_clickhouse_orm_migrations_distributed
engine_full: Distributed('posthog', 'posthog', 'infi_clickhouse_orm_migrations', rand())

Row 16:
───────
host:        chi-posthog-posthog-1-1-0
name:        infi_clickhouse_orm_migrations_distributed
engine_full: Distributed('posthog', 'posthog', 'infi_clickhouse_orm_migrations', rand())

Row 17:
───────
host:        chi-posthog-posthog-0-0-0
name:        person
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_noshard/posthog.person', '{replica}-{shard}', _timestamp) ORDER BY (team_id, id) SETTINGS index_granularity = 8192

Row 18:
───────
host:        chi-posthog-posthog-0-1-0
name:        person
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_noshard/posthog.person', '{replica}-{shard}', _timestamp) ORDER BY (team_id, id) SETTINGS index_granularity = 8192

Row 19:
───────
host:        chi-posthog-posthog-1-0-0
name:        person
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_noshard/posthog.person', '{replica}-{shard}', _timestamp) ORDER BY (team_id, id) SETTINGS index_granularity = 8192

Row 20:
───────
host:        chi-posthog-posthog-1-1-0
name:        person
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_noshard/posthog.person', '{replica}-{shard}', _timestamp) ORDER BY (team_id, id) SETTINGS index_granularity = 8192

Row 21:
───────
host:        chi-posthog-posthog-0-0-0
name:        person_distinct_id
engine_full: CollapsingMergeTree(_sign) ORDER BY (team_id, distinct_id, person_id) SETTINGS index_granularity = 8192

Row 22:
───────
host:        chi-posthog-posthog-0-1-0
name:        person_distinct_id
engine_full: CollapsingMergeTree(_sign) ORDER BY (team_id, distinct_id, person_id) SETTINGS index_granularity = 8192

Row 23:
───────
host:        chi-posthog-posthog-1-0-0
name:        person_distinct_id
engine_full: CollapsingMergeTree(_sign) ORDER BY (team_id, distinct_id, person_id) SETTINGS index_granularity = 8192

Row 24:
───────
host:        chi-posthog-posthog-1-1-0
name:        person_distinct_id
engine_full: CollapsingMergeTree(_sign) ORDER BY (team_id, distinct_id, person_id) SETTINGS index_granularity = 8192

Row 25:
───────
host:        chi-posthog-posthog-0-0-0
name:        session_recording_events
engine_full: Distributed('posthog', 'posthog', 'sharded_session_recording_events', sipHash64(distinct_id))

Row 26:
───────
host:        chi-posthog-posthog-0-1-0
name:        session_recording_events
engine_full: Distributed('posthog', 'posthog', 'sharded_session_recording_events', sipHash64(distinct_id))

Row 27:
───────
host:        chi-posthog-posthog-1-0-0
name:        session_recording_events
engine_full: Distributed('posthog', 'posthog', 'sharded_session_recording_events', sipHash64(distinct_id))

Row 28:
───────
host:        chi-posthog-posthog-1-1-0
name:        session_recording_events
engine_full: Distributed('posthog', 'posthog', 'sharded_session_recording_events', sipHash64(distinct_id))

Row 29:
───────
host:        chi-posthog-posthog-0-0-0
name:        sharded_events
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_{shard}/posthog.events', '{replica}', _timestamp) PARTITION BY toYYYYMM(timestamp) ORDER BY (team_id, toDate(timestamp), event, cityHash64(distinct_id), cityHash64(uuid)) SAMPLE BY cityHash64(distinct_id) SETTINGS index_granularity = 8192

Row 30:
───────
host:        chi-posthog-posthog-0-1-0
name:        sharded_events
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_{shard}/posthog.events', '{replica}', _timestamp) PARTITION BY toYYYYMM(timestamp) ORDER BY (team_id, toDate(timestamp), event, cityHash64(distinct_id), cityHash64(uuid)) SAMPLE BY cityHash64(distinct_id) SETTINGS index_granularity = 8192

Row 31:
───────
host:        chi-posthog-posthog-1-0-0
name:        sharded_events
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_{shard}/posthog.events', '{replica}', _timestamp) PARTITION BY toYYYYMM(timestamp) ORDER BY (team_id, toDate(timestamp), event, cityHash64(distinct_id), cityHash64(uuid)) SAMPLE BY cityHash64(distinct_id) SETTINGS index_granularity = 8192

Row 32:
───────
host:        chi-posthog-posthog-1-1-0
name:        sharded_events
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_{shard}/posthog.events', '{replica}', _timestamp) PARTITION BY toYYYYMM(timestamp) ORDER BY (team_id, toDate(timestamp), event, cityHash64(distinct_id), cityHash64(uuid)) SAMPLE BY cityHash64(distinct_id) SETTINGS index_granularity = 8192

Row 33:
───────
host:        chi-posthog-posthog-0-0-0
name:        sharded_session_recording_events
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_{shard}/posthog.session_recording_events', '{replica}', _timestamp) PARTITION BY toYYYYMMDD(timestamp) ORDER BY (team_id, toHour(timestamp), session_id, timestamp, uuid) TTL toDate(created_at) + toIntervalWeek(3) SETTINGS index_granularity = 512

Row 34:
───────
host:        chi-posthog-posthog-0-1-0
name:        sharded_session_recording_events
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_{shard}/posthog.session_recording_events', '{replica}', _timestamp) PARTITION BY toYYYYMMDD(timestamp) ORDER BY (team_id, toHour(timestamp), session_id, timestamp, uuid) TTL toDate(created_at) + toIntervalWeek(3) SETTINGS index_granularity = 512

Row 35:
───────
host:        chi-posthog-posthog-1-0-0
name:        sharded_session_recording_events
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_{shard}/posthog.session_recording_events', '{replica}', _timestamp) PARTITION BY toYYYYMMDD(timestamp) ORDER BY (team_id, toHour(timestamp), session_id, timestamp, uuid) TTL toDate(created_at) + toIntervalWeek(3) SETTINGS index_granularity = 512

Row 36:
───────
host:        chi-posthog-posthog-1-1-0
name:        sharded_session_recording_events
engine_full: ReplicatedReplacingMergeTree('/clickhouse/tables/am0004_20220318084324_{shard}/posthog.session_recording_events', '{replica}', _timestamp) PARTITION BY toYYYYMMDD(timestamp) ORDER BY (team_id, toHour(timestamp), session_id, timestamp, uuid) TTL toDate(created_at) + toIntervalWeek(3) SETTINGS index_granularity = 512

36 rows in set. Elapsed: 0.123 sec.

@guidoiaquinti (Contributor)

👋 @macobo can we consider this ✅ ?

@macobo closed this as completed on May 16, 2022