Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Run _upgrade_existing_database on workers if at current schema_version #11346

Merged
merged 5 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11346.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in v1.47.0rc1 which caused worker processes to not halt startup in the presence of outstanding database migrations.
40 changes: 22 additions & 18 deletions synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,24 +131,16 @@ def prepare_database(
"config==None in prepare_database, but database is not empty"
)

# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
if config.worker.worker_app is None:
_upgrade_existing_database(
cur,
version_info,
database_engine,
config,
databases=databases,
)
elif version_info.current_version < SCHEMA_VERSION:
# If the DB is on an older version than we expect then we refuse
# to start the worker (as the main process needs to run first to
# update the schema).
raise UpgradeDatabaseException(
OUTDATED_SCHEMA_ON_WORKER_ERROR
% (SCHEMA_VERSION, version_info.current_version)
)
# This should be run on all processes, master or worker. The master will
# apply the deltas, while workers will check if any outstanding deltas
# exist and raise an PrepareDatabaseException if they do.
_upgrade_existing_database(
cur,
version_info,
database_engine,
config,
databases=databases,
)

else:
logger.info("%r: Initialising new database", databases)
Expand Down Expand Up @@ -358,6 +350,18 @@ def _upgrade_existing_database(

is_worker = config and config.worker.worker_app is not None

# If the schema version needs to be updated, and we are on a worker, we immediately
# know to bail out as workers cannot update the database schema. Only one worker
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
# must update the database at the time, therefore we delegate this task to the master.
if is_worker and current_schema_state.current_version < SCHEMA_VERSION:
# If the DB is on an older version than we expect then we refuse
# to start the worker (as the main process needs to run first to
# update the schema).
raise UpgradeDatabaseException(
OUTDATED_SCHEMA_ON_WORKER_ERROR
% (SCHEMA_VERSION, current_schema_state.current_version)
)

if (
current_schema_state.compat_version is not None
and current_schema_state.compat_version > SCHEMA_VERSION
Expand Down
52 changes: 51 additions & 1 deletion tests/storage/test_rollback_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from unittest import mock

from synapse.app.generic_worker import GenericWorkerServer
from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.prepare_database import PrepareDatabaseException, prepare_database
Expand All @@ -19,6 +22,22 @@
from tests.unittest import HomeserverTestCase


def fake_listdir(filepath: str) -> List[str]:
"""
A fake implementation of os.listdir which we can use to mock out the filesystem.

Args:
filepath: The directory to list files for.

Returns:
A list of files and folders in the directory.
"""
if filepath.endswith("full_schemas"):
return [SCHEMA_VERSION]

return ["99_add_unicorn_to_database.sql"]


class WorkerSchemaTests(HomeserverTestCase):
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver(
Expand Down Expand Up @@ -51,7 +70,7 @@ def test_rolling_back(self):

prepare_database(db_conn, db_pool.engine, self.hs.config)

def test_not_upgraded(self):
def test_not_upgraded_old_schema_version(self):
"""Test that workers don't start if the DB has an older schema version"""
db_pool = self.hs.get_datastore().db_pool
db_conn = LoggingDatabaseConnection(
Expand All @@ -67,3 +86,34 @@ def test_not_upgraded(self):

with self.assertRaises(PrepareDatabaseException):
prepare_database(db_conn, db_pool.engine, self.hs.config)

def test_not_upgraded_current_schema_version_with_outstanding_deltas(self):
"""
Test that workers don't start if the DB is on the current schema version,
but there are still outstanding delta migrations to run.
"""
db_pool = self.hs.get_datastore().db_pool
db_conn = LoggingDatabaseConnection(
db_pool._db_pool.connect(),
db_pool.engine,
"tests",
)

# Set the schema version of the database to the current version
cur = db_conn.cursor()
cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION,))

db_conn.commit()

# Path `os.listdir` here to make synapse think that there is a migration
# file ready to be run.
# Note that we can't patch this function for the whole method, else Synapse
# will try to find the file when building the database initially.
with mock.patch("os.listdir", mock.Mock(side_effect=fake_listdir)):
with self.assertRaises(PrepareDatabaseException):
# Synapse should think that there is an outstanding migration file due to
# patching 'os.listdir' in the function decorator.
#
# We expect Synapse to raise an exception to indicate the master process
# needs to apply this migration file.
prepare_database(db_conn, db_pool.engine, self.hs.config)