Skip to content

Commit

Permalink
feat(sharding): add command to sync tables onto new nodes (#8912)
Browse files Browse the repository at this point in the history
* feat(sharding): add command to sync tables onto new nodes

clickhouse-operator only syncs some tables onto new nodes. This new
command ensures that when adding new shards, they are automatically
synced up on redeploying

Note that there might be timing concerns here, as resharding on Altinity Cloud does not trigger a redeploy automatically. In practice, however, this means that new nodes simply won't ingest any data until the next deploy.

* Add test to the new command

* Improve non-replicated test
  • Loading branch information
macobo authored Mar 8, 2022
1 parent 717ee74 commit c8d6b22
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 61 deletions.
3 changes: 2 additions & 1 deletion bin/migrate
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ set -e

python manage.py migrate
python manage.py migrate_clickhouse
python manage.py run_async_migrations
python manage.py run_async_migrations
python manage.py sync_replicated_schema
46 changes: 46 additions & 0 deletions ee/clickhouse/sql/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# This file contains all CREATE TABLE queries, used to sync and test schema
import re

from ee.clickhouse.sql.cohort import *
from ee.clickhouse.sql.dead_letter_queue import *
from ee.clickhouse.sql.events import *
from ee.clickhouse.sql.groups import *
from ee.clickhouse.sql.person import *
from ee.clickhouse.sql.plugin_log_entries import *
from ee.clickhouse.sql.session_recording_events import *

# Canonical list of every CREATE TABLE query making up the ClickHouse schema.
# Consumed by the sync_replicated_schema management command (to create missing
# tables on newly added nodes) and by the schema snapshot tests.
# NOTE(review): list order appears deliberate — materialized views follow the
# Kafka/base tables they read from, and distributed tables come last — confirm
# before reordering, since the sync command executes these in list order.
CREATE_TABLE_QUERIES = [
    CREATE_COHORTPEOPLE_TABLE_SQL,
    PERSON_STATIC_COHORT_TABLE_SQL,
    DEAD_LETTER_QUEUE_TABLE_SQL,
    KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL,
    DEAD_LETTER_QUEUE_TABLE_MV_SQL,
    EVENTS_TABLE_SQL,
    KAFKA_EVENTS_TABLE_SQL,
    EVENTS_TABLE_MV_SQL,
    GROUPS_TABLE_SQL,
    KAFKA_GROUPS_TABLE_SQL,
    GROUPS_TABLE_MV_SQL,
    PERSONS_TABLE_SQL,
    KAFKA_PERSONS_TABLE_SQL,
    PERSONS_TABLE_MV_SQL,
    PERSONS_DISTINCT_ID_TABLE_SQL,
    KAFKA_PERSONS_DISTINCT_ID_TABLE_SQL,
    PERSONS_DISTINCT_ID_TABLE_MV_SQL,
    PERSON_DISTINCT_ID2_TABLE_SQL,
    KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL,
    PERSON_DISTINCT_ID2_MV_SQL,
    KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL,
    PLUGIN_LOG_ENTRIES_TABLE_SQL,
    PLUGIN_LOG_ENTRIES_TABLE_MV_SQL,
    SESSION_RECORDING_EVENTS_TABLE_SQL,
    KAFKA_SESSION_RECORDING_EVENTS_TABLE_SQL,
    SESSION_RECORDING_EVENTS_TABLE_MV_SQL,
    # Sharded-setup tables: writable (ingest) and distributed (query) variants.
    WRITABLE_EVENTS_TABLE_SQL,
    DISTRIBUTED_EVENTS_TABLE_SQL,
    WRITABLE_SESSION_RECORDING_EVENTS_TABLE_SQL,
    DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL,
]

# PEP 8 (E731): use `def` rather than assigning lambdas to names — gives these
# helpers real names in tracebacks and room for documentation.
def build_query(query):
    """Return the SQL text for *query*.

    Entries in CREATE_TABLE_QUERIES are either a plain SQL string or a
    zero-argument factory returning one; normalize both to a string.
    """
    return query if isinstance(query, str) else query()


def get_table_name(query):
    """Extract the table name from a ``CREATE TABLE <name> ON CLUSTER`` query.

    Raises IndexError if the query contains no ``ON CLUSTER`` clause.
    """
    return re.findall(r" ([a-z0-9_]+) ON CLUSTER", build_query(query))[0]
48 changes: 3 additions & 45 deletions ee/clickhouse/sql/test/test_schema.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,15 @@
import re
import uuid

import pytest

from ee.clickhouse.sql.cohort import *
from ee.clickhouse.sql.dead_letter_queue import *
from ee.clickhouse.sql.events import *
from ee.clickhouse.sql.groups import *
from ee.clickhouse.sql.person import *
from ee.clickhouse.sql.plugin_log_entries import *
from ee.clickhouse.sql.session_recording_events import *

CREATE_TABLE_QUERIES = [
CREATE_COHORTPEOPLE_TABLE_SQL,
PERSON_STATIC_COHORT_TABLE_SQL,
DEAD_LETTER_QUEUE_TABLE_SQL,
DEAD_LETTER_QUEUE_TABLE_MV_SQL,
KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL,
EVENTS_TABLE_SQL,
KAFKA_EVENTS_TABLE_SQL,
EVENTS_TABLE_MV_SQL,
GROUPS_TABLE_SQL,
KAFKA_GROUPS_TABLE_SQL,
GROUPS_TABLE_MV_SQL,
PERSONS_TABLE_SQL,
KAFKA_PERSONS_TABLE_SQL,
PERSONS_TABLE_MV_SQL,
PERSONS_DISTINCT_ID_TABLE_SQL,
KAFKA_PERSONS_DISTINCT_ID_TABLE_SQL,
PERSONS_DISTINCT_ID_TABLE_MV_SQL,
PERSON_DISTINCT_ID2_TABLE_SQL,
KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL,
PERSON_DISTINCT_ID2_MV_SQL,
KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL,
PLUGIN_LOG_ENTRIES_TABLE_SQL,
PLUGIN_LOG_ENTRIES_TABLE_MV_SQL,
SESSION_RECORDING_EVENTS_TABLE_SQL,
SESSION_RECORDING_EVENTS_TABLE_MV_SQL,
KAFKA_SESSION_RECORDING_EVENTS_TABLE_SQL,
WRITABLE_EVENTS_TABLE_SQL,
DISTRIBUTED_EVENTS_TABLE_SQL,
WRITABLE_SESSION_RECORDING_EVENTS_TABLE_SQL,
DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL,
]

build_query = lambda query: query if isinstance(query, str) else query()
get_table_name = lambda query: re.findall(r" ([a-z0-9_]+) ON CLUSTER", build_query(query))[0]
from ee.clickhouse.sql.schema import CREATE_TABLE_QUERIES, KAFKA_EVENTS_TABLE_SQL, build_query, get_table_name

# Partition the canonical table list by engine keyword so tests can target
# Kafka consumer tables and MergeTree storage tables separately.
KAFKA_CREATE_TABLE_QUERIES = [query for query in CREATE_TABLE_QUERIES if "Kafka" in build_query(query)]
MERGE_TREE_TABLE_QUERIES = [query for query in CREATE_TABLE_QUERIES if "MergeTree" in build_query(query)]


@pytest.mark.parametrize("query", CREATE_TABLE_QUERIES, ids=get_table_name)
def test_create_table_query(query, snapshot, settings):
    """Snapshot every CREATE TABLE query so schema changes are reviewed explicitly."""
    # Pin the non-replicated variant of each table definition so snapshots are
    # stable regardless of the local CLICKHOUSE_REPLICATION setting.
    settings.CLICKHOUSE_REPLICATION = False

    assert build_query(query) == snapshot
Expand Down
63 changes: 63 additions & 0 deletions ee/management/commands/sync_replicated_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from typing import Dict, Set

import structlog
from django.conf import settings
from django.core.management.base import BaseCommand

from ee.clickhouse.client import sync_execute
from ee.clickhouse.sql.schema import CREATE_TABLE_QUERIES, build_query, get_table_name

logger = structlog.get_logger(__name__)


class Command(BaseCommand):
    help = "Synchronize schema across clickhouse cluster, creating missing tables on new nodes"

    def add_arguments(self, parser):
        parser.add_argument(
            "--dry-run", action="store_true", help="Exits with a non-zero status if schema changes would be required."
        )

    def handle(self, *args, **options):
        """Create any schema tables missing from nodes in the cluster.

        No-op on non-replicated setups and on cloud (MULTI_TENANCY).
        With --dry-run, exits with status 1 instead of creating tables
        when any node is out of sync.
        """
        if not settings.CLICKHOUSE_REPLICATION or settings.MULTI_TENANCY:
            logger.info("✅ Skipping non-replicated or cloud setup")
            return

        out_of_sync_hosts = self.get_out_of_sync_hosts()

        if len(out_of_sync_hosts) > 0:
            logger.info("Schema out of sync on some clickhouse nodes!", out_of_sync_hosts=out_of_sync_hosts)

            if options.get("dry_run"):
                # Prefer raising SystemExit over the builtin exit(): exit() is
                # injected by the `site` module for interactive use and is not
                # guaranteed to exist (e.g. under `python -S`). Behavior is
                # identical: process terminates with status 1.
                raise SystemExit(1)

            logger.info("Creating missing tables")
            # CREATE queries are issued cluster-wide (ON CLUSTER); nodes that
            # already have a table are expected to be unaffected.
            for query in CREATE_TABLE_QUERIES:
                sync_execute(build_query(query))

        logger.info("✅ All ClickHouse nodes schema in sync")

    def get_out_of_sync_hosts(self) -> Dict[str, Set[str]]:
        """Return a mapping of host name -> set of schema tables missing on that host."""
        table_names = list(map(get_table_name, CREATE_TABLE_QUERIES))
        # Ask every replica in the cluster which of the expected tables it has.
        rows = sync_execute(
            """
            SELECT hostName() as host, groupArray(name)
            FROM clusterAllReplicas(%(cluster)s, system, tables)
            WHERE database = %(database)s
              AND name IN %(table_names)s
            GROUP BY host
            """,
            {
                "cluster": settings.CLICKHOUSE_CLUSTER,
                "database": settings.CLICKHOUSE_DATABASE,
                "table_names": table_names,
            },
        )

        out_of_sync: Dict[str, Set[str]] = {}
        for host, host_tables in rows:
            missing_tables = set(table_names) - set(host_tables)
            if len(missing_tables) > 0:
                out_of_sync[host] = missing_tables

        return out_of_sync
Empty file.
47 changes: 47 additions & 0 deletions ee/management/commands/test/test_sync_replicated_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import pytest
from django.conf import settings

from ee.clickhouse.client import sync_execute
from ee.clickhouse.sql.events import KAFKA_EVENTS_TABLE_SQL
from ee.clickhouse.sql.schema import CREATE_TABLE_QUERIES
from ee.clickhouse.util import ClickhouseTestMixin
from ee.management.commands.sync_replicated_schema import Command
from posthog.conftest import create_clickhouse_tables
from posthog.test.base import BaseTest


@pytest.mark.ee
class TestSyncReplicatedSchema(BaseTest, ClickhouseTestMixin):
    """Tests for the sync_replicated_schema management command.

    Simulates an out-of-sync node by recreating the ClickHouse database
    with only a single table present, then verifies the command both
    detects and repairs the missing tables.
    """

    def setUp(self):
        # NOTE(review): mutates the global settings object directly (no
        # override_settings) and does not call super().setUp() — confirm
        # BaseTest does not depend on its own setUp running.
        settings.CLICKHOUSE_REPLICATION = True
        self.recreate_database()
        # Leave exactly one known table behind so the host reads as out of sync.
        sync_execute(KAFKA_EVENTS_TABLE_SQL())

    def tearDown(self):
        # Restore a clean, fully-created schema for subsequent tests.
        self.recreate_database()
        settings.CLICKHOUSE_REPLICATION = False
        create_clickhouse_tables(0)

    def recreate_database(self):
        # SYNC makes the DROP block until fully applied before the CREATE runs.
        sync_execute(f"DROP DATABASE {settings.CLICKHOUSE_DATABASE} SYNC")
        sync_execute(f"CREATE DATABASE {settings.CLICKHOUSE_DATABASE}")

    def test_get_out_of_sync_hosts(self):
        # :KLUDGE: We simulate an out-of-sync database by wiping everything but one table
        out_of_sync_hosts = Command().get_out_of_sync_hosts()

        # Single-host test cluster: exactly one entry, missing all tables but
        # the one created in setUp.
        self.assertEqual(len(out_of_sync_hosts), 1)

        [values] = list(out_of_sync_hosts.values())
        self.assertEqual(len(values), len(CREATE_TABLE_QUERIES) - 1)

    def test_handle_sync(self):
        # Running the command should create every missing table.
        Command().handle()

        self.assertEqual(len(Command().get_out_of_sync_hosts()), 0)

    def test_handle_not_replicated_does_nothing(self):
        # With replication disabled, handle() returns early and the host
        # remains out of sync.
        settings.CLICKHOUSE_REPLICATION = False

        Command().handle()
        self.assertEqual(len(Command().get_out_of_sync_hosts()), 1)
23 changes: 8 additions & 15 deletions posthog/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from django.conf import settings
from infi.clickhouse_orm import Database

from ee.clickhouse.client import sync_execute
Expand All @@ -7,14 +8,6 @@
KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL,
TRUNCATE_DEAD_LETTER_QUEUE_TABLE_MV_SQL,
)
from posthog.settings import (
CLICKHOUSE_DATABASE,
CLICKHOUSE_HTTP_URL,
CLICKHOUSE_PASSWORD,
CLICKHOUSE_REPLICATION,
CLICKHOUSE_USER,
CLICKHOUSE_VERIFY,
)
from posthog.test.base import TestMixin


Expand Down Expand Up @@ -53,7 +46,7 @@ def create_clickhouse_tables(num_tables: int):
GROUPS_TABLE_SQL(),
]

if CLICKHOUSE_REPLICATION:
if settings.CLICKHOUSE_REPLICATION:
TABLES_TO_CREATE_DROP.extend([DISTRIBUTED_EVENTS_TABLE_SQL(), DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL()])

if num_tables == len(TABLES_TO_CREATE_DROP):
Expand Down Expand Up @@ -101,11 +94,11 @@ def reset_clickhouse_tables():
@pytest.fixture(scope="package")
def django_db_setup(django_db_setup, django_db_keepdb):
database = Database(
CLICKHOUSE_DATABASE,
db_url=CLICKHOUSE_HTTP_URL,
username=CLICKHOUSE_USER,
password=CLICKHOUSE_PASSWORD,
verify_ssl_cert=CLICKHOUSE_VERIFY,
settings.CLICKHOUSE_DATABASE,
db_url=settings.CLICKHOUSE_HTTP_URL,
username=settings.CLICKHOUSE_USER,
password=settings.CLICKHOUSE_PASSWORD,
verify_ssl_cert=settings.CLICKHOUSE_VERIFY,
)

if not django_db_keepdb:
Expand All @@ -116,7 +109,7 @@ def django_db_setup(django_db_setup, django_db_keepdb):

database.create_database() # Create database if it doesn't exist
table_count = sync_execute(
"SELECT count() FROM system.tables WHERE database = %(database)s", {"database": CLICKHOUSE_DATABASE}
"SELECT count() FROM system.tables WHERE database = %(database)s", {"database": settings.CLICKHOUSE_DATABASE}
)[0][0]
create_clickhouse_tables(table_count)

Expand Down

0 comments on commit c8d6b22

Please sign in to comment.