Skip to content

Commit

Permalink
Merge pull request #8 from PostHog/distributed-migrations-table
Browse files Browse the repository at this point in the history
fix(sharding): Distributed migrations table
  • Loading branch information
macobo authored Mar 15, 2022
2 parents 6e2e2b7 + 5c3c2d8 commit 15a4530
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 15 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

SETUP_INFO = dict(
name = 'infi.clickhouse_orm',
version = '2.1.0.post15',
version = '2.1.0.post16',
author = 'James Greenhill',
author_email = 'fuziontech@gmail.com',

Expand Down
20 changes: 14 additions & 6 deletions src/infi/clickhouse_orm/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class Database(object):
'''

def __init__(self, db_name, db_url='http://localhost:8123/',
username=None, password=None, readonly=False, autocreate=True,
username=None, password=None, cluster=None,
readonly=False, autocreate=True,
timeout=60, verify_ssl_cert=True, log_statements=False):
'''
Initializes a database instance. Unless it's readonly, the database will be
Expand All @@ -91,6 +92,7 @@ def __init__(self, db_name, db_url='http://localhost:8123/',
- `db_url`: URL of the ClickHouse server.
- `username`: optional connection credentials.
- `password`: optional connection credentials.
- `cluster`: optional cluster to create tables on
- `readonly`: use a read-only connection.
- `autocreate`: automatically create the database if it does not exist (unless in readonly mode).
- `timeout`: the connection timeout in seconds.
Expand All @@ -99,6 +101,7 @@ def __init__(self, db_name, db_url='http://localhost:8123/',
'''
self.db_name = db_name
self.db_url = db_url
self.cluster = cluster
self.readonly = False
self.timeout = timeout
self.request_session = requests.Session()
Expand Down Expand Up @@ -352,12 +355,17 @@ def migrate(self, migrations_package_name, up_to=9999, replicated=False):
break

def _get_applied_migrations(self, migrations_package_name, replicated):
    '''
    Returns the set of module names of the migrations from the given package
    that are already recorded in the migration history.

    - `migrations_package_name`: the package whose applied migrations are looked up.
    - `replicated`: when true, both the replicated history table and the
      distributed table are created, and the lookup goes through the distributed
      table. Otherwise the plain history table is created and queried.
    '''
    from .migrations import MigrationHistory, MigrationHistoryReplicated, MigrationHistoryDistributed

    if replicated:
        # Create the replicated table before the distributed one, which is
        # declared with the same underlying table name.
        self.create_table(MigrationHistoryReplicated)
        self.create_table(MigrationHistoryDistributed)
        history_model = MigrationHistoryDistributed
    else:
        self.create_table(MigrationHistory)
        history_model = MigrationHistory

    # NOTE(review): the package name is interpolated into the SQL as-is, so it
    # is expected to come from trusted code rather than from user input.
    query = "SELECT DISTINCT module_name FROM $table WHERE package_name = '%s'" % migrations_package_name
    query = self._substitute(query, history_model)
    return set(obj.module_name for obj in self.select(query))

def _send(self, data, settings=None, stream=False):
Expand Down
10 changes: 7 additions & 3 deletions src/infi/clickhouse_orm/engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ class Distributed(Engine):
See full documentation here
https://clickhouse.tech/docs/en/engines/table-engines/special/distributed/
"""
def __init__(self, cluster, table=None, sharding_key=None):
def __init__(self, cluster=None, table=None, sharding_key=None):
"""
- `cluster`: what cluster to access data from
- `cluster`: what cluster to access data from. Defaults to db.cluster
- `table`: underlying table that actually stores data.
If you are not specifying any table here, ensure that it can be inferred
from your model's superclass (see models.DistributedModel.fix_engine_table)
Expand Down Expand Up @@ -259,7 +259,11 @@ def _build_sql_params(self, db):
raise ValueError("Cannot create {} engine: specify an underlying table".format(
self.__class__.__name__))

params = ["`%s`" % p for p in [self.cluster, db.db_name, self.table_name]]
if self.cluster is None and db.cluster is None:
raise ValueError("Cannot create engine: specify a cluster")

cluster = self.cluster if self.cluster is not None else db.cluster
params = ["`%s`" % p for p in [cluster, db.db_name, self.table_name]]
if self.sharding_key:
params.append(self.sharding_key)
return params
Expand Down
21 changes: 18 additions & 3 deletions src/infi/clickhouse_orm/migrations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from .engines import MergeTree
from .engines import Distributed, MergeTree
from .fields import DateField, StringField
from .models import BufferModel, Model
from .utils import escape, get_subclass_names
Expand Down Expand Up @@ -294,18 +294,33 @@ class MigrationHistoryReplicated(Model):
package_name = StringField()
module_name = StringField()
applied = DateField()

engine = MergeTree(
"applied",
("package_name", "module_name"),
replica_table_path="/clickhouse/prod/tables/noshard/posthog.infi_clickhouse_orm_migrations",
replica_name="{replica}-{shard}",
)

@classmethod
def table_name(cls):
return "infi_clickhouse_orm_migrations"

class MigrationHistoryDistributed(Model):
    """
    Distributed counterpart of the migration history table.

    Each row records one applied migration: the package it belongs to,
    the migration module's name and the date it was applied.
    """

    # Column order is significant, keep it in sync with the underlying table.
    package_name = StringField()
    module_name = StringField()
    applied = DateField()

    # No cluster is given here, so the engine has to obtain it elsewhere
    # when its SQL parameters are built.
    engine = Distributed(
        table="infi_clickhouse_orm_migrations",
        sharding_key="rand()",
    )

    @classmethod
    def table_name(cls):
        """
        Returns the fixed name of the distributed migrations table.
        """
        return "infi_clickhouse_orm_migrations_distributed"


# Expose only relevant classes in import *
__all__ = get_subclass_names(locals(), Operation)
4 changes: 2 additions & 2 deletions src/infi/clickhouse_orm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytz

from .fields import Field, StringField
from .utils import parse_tsv, NO_VALUE, get_subclass_names, arg_to_sql, unescape
from .utils import on_cluster, parse_tsv, NO_VALUE, get_subclass_names, arg_to_sql, unescape
from .query import QuerySet
from .funcs import F
from .engines import Merge, Distributed
Expand Down Expand Up @@ -352,7 +352,7 @@ def create_table_sql(cls, db):
'''
Returns the SQL statement for creating a table for this model.
'''
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())]
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` %s (' % (db.db_name, cls.table_name(), on_cluster(db))]
# Fields
items = []
for name, field in cls.fields().items():
Expand Down
6 changes: 6 additions & 0 deletions src/infi/clickhouse_orm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ def get_subclass_names(locals, base_class):
from inspect import isclass
return [c.__name__ for c in locals.values() if isclass(c) and issubclass(c, base_class)]

def on_cluster(db):
    '''
    Returns the `ON CLUSTER '<name>'` clause for the given database, or an
    empty string when the database has no cluster configured (`db.cluster` is None).
    '''
    cluster_name = db.cluster
    if cluster_name is None:
        return ''
    return "ON CLUSTER '{}'".format(cluster_name)


class NoValue:
'''
Expand Down

0 comments on commit 15a4530

Please sign in to comment.