Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refuse to upgrade database on worker processes #8266

Merged
merged 4 commits into from
Sep 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8266.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Do not attempt to upgrade upgrade database schema on worker processes.
78 changes: 61 additions & 17 deletions synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ class UpgradeDatabaseException(PrepareDatabaseException):
pass


OUTDATED_SCHEMA_ON_WORKER_ERROR = (
"Expected database schema version %i but got %i: run the main synapse process to "
"upgrade the database schema before starting worker processes."
)

EMPTY_DATABASE_ON_WORKER_ERROR = (
"Uninitialised database: run the main synapse process to prepare the database "
"schema before starting worker processes."
)

UNAPPLIED_DELTA_ON_WORKER_ERROR = (
"Database schema delta %s has not been applied: run the main synapse process to "
"upgrade the database schema before starting worker processes."
)


def prepare_database(db_conn, database_engine, config, databases=["main", "state"]):
"""Prepares a physical database for usage. Will either create all necessary tables
or upgrade from an older schema version.
Expand All @@ -71,25 +87,35 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state
if version_info:
user_version, delta_files, upgraded = version_info

# config should only be None when we are preparing an in-memory SQLite db,
# which should be empty.
if config is None:
if user_version != SCHEMA_VERSION:
# If we don't pass in a config file then we are expecting to
# have already upgraded the DB.
raise UpgradeDatabaseException(
"Expected database schema version %i but got %i"
% (SCHEMA_VERSION, user_version)
)
else:
_upgrade_existing_database(
cur,
user_version,
delta_files,
upgraded,
database_engine,
config,
databases=databases,
raise ValueError(
"config==None in prepare_database, but databse is not empty"
)

# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
if config.worker_app is not None and user_version != SCHEMA_VERSION:
raise UpgradeDatabaseException(
OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version)
)

_upgrade_existing_database(
cur,
user_version,
delta_files,
upgraded,
database_engine,
config,
databases=databases,
)
else:
# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
if config and config.worker_app is not None:
raise UpgradeDatabaseException(EMPTY_DATABASE_ON_WORKER_ERROR)

_setup_new_database(cur, database_engine, databases=databases)

# check if any of our configured dynamic modules want a database
Expand Down Expand Up @@ -295,6 +321,8 @@ def _upgrade_existing_database(
else:
assert config

is_worker = config and config.worker_app is not None

if current_version > SCHEMA_VERSION:
raise ValueError(
"Cannot use this database as it is too "
Expand Down Expand Up @@ -322,7 +350,7 @@ def _upgrade_existing_database(
specific_engine_extensions = (".sqlite", ".postgres")

for v in range(start_ver, SCHEMA_VERSION + 1):
logger.info("Upgrading schema to v%d", v)
logger.info("Applying schema deltas for v%d", v)

# We need to search both the global and per data store schema
# directories for schema updates.
Expand Down Expand Up @@ -382,9 +410,15 @@ def _upgrade_existing_database(
continue

root_name, ext = os.path.splitext(file_name)

if ext == ".py":
# This is a python upgrade module. We need to import into some
# package and then execute its `run_upgrade` function.
if is_worker:
raise PrepareDatabaseException(
UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
)

module_name = "synapse.storage.v%d_%s" % (v, root_name)
with open(absolute_path) as python_file:
module = imp.load_source(module_name, absolute_path, python_file)
Expand All @@ -399,10 +433,18 @@ def _upgrade_existing_database(
continue
elif ext == ".sql":
# A plain old .sql file, just read and execute it
if is_worker:
raise PrepareDatabaseException(
UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
)
logger.info("Applying schema %s", relative_path)
executescript(cur, absolute_path)
elif ext == specific_engine_extension and root_name.endswith(".sql"):
# A .sql file specific to our engine; just read and execute it
if is_worker:
raise PrepareDatabaseException(
UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
)
logger.info("Applying engine-specific schema %s", relative_path)
executescript(cur, absolute_path)
elif ext in specific_engine_extensions and root_name.endswith(".sql"):
Expand Down Expand Up @@ -432,6 +474,8 @@ def _upgrade_existing_database(
(v, True),
)

logger.info("Schema now up to date")


def _apply_module_schemas(txn, database_engine, config):
"""Apply the module schemas for the dynamic modules, if any
Expand Down