Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Only run one background update at a time #7190

Merged
merged 8 commits into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7190.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Only run one background database update at a time.
114 changes: 56 additions & 58 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ def __init__(self, hs, database):
self._clock = hs.get_clock()
self.db = database

# if a background update is currently running, its name.
self._current_background_update = None # type: Optional[str]

self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
self._all_done = False

Expand All @@ -111,34 +113,33 @@ async def run_background_updates(self, sleep=True):
except Exception:
logger.exception("Error doing update")
else:
if result is None:
if result:
logger.info(
"No more background updates to do."
" Unscheduling background update task."
)
self._all_done = True
return None

@defer.inlineCallbacks
def has_completed_background_updates(self):
async def has_completed_background_updates(self) -> bool:
"""Check if all the background updates have completed

Returns:
Deferred[bool]: True if all background updates have completed
True if all background updates have completed
"""
# if we've previously determined that there is nothing left to do, that
# is easy
if self._all_done:
return True

# obviously, if we have things in our queue, we're not done.
if self._background_update_queue:
# obviously, if we are currently processing an update, we're not done.
if self._current_background_update:
return False

# otherwise, check if there are updates to be run. This is important,
# as we may be running on a worker which doesn't perform the bg updates
# itself, but still wants to wait for them to happen.
updates = yield self.db.simple_select_onecol(
updates = await self.db.simple_select_onecol(
"background_updates",
keyvalues=None,
retcol="1",
Expand All @@ -153,11 +154,10 @@ def has_completed_background_updates(self):
async def has_completed_background_update(self, update_name) -> bool:
"""Check if the given background update has finished running.
"""

if self._all_done:
return True

if update_name in self._background_update_queue:
if update_name == self._current_background_update:
return False

update_exists = await self.db.simple_select_one_onecol(
Expand All @@ -170,9 +170,7 @@ async def has_completed_background_update(self, update_name) -> bool:

return not update_exists

async def do_next_background_update(
self, desired_duration_ms: float
) -> Optional[int]:
async def do_next_background_update(self, desired_duration_ms: float) -> bool:
"""Does some amount of work on the next queued background update

Returns once some amount of work is done.
Expand All @@ -181,33 +179,51 @@ async def do_next_background_update(
desired_duration_ms(float): How long we want to spend
updating.
Returns:
None if there is no more work to do, otherwise an int
True if we have finished running all the background updates, otherwise False
"""
if not self._background_update_queue:
updates = await self.db.simple_select_list(
"background_updates",
keyvalues=None,
retcols=("update_name", "depends_on"),

def get_background_updates_txn(txn):
txn.execute(
"""
SELECT update_name, depends_on FROM background_updates
ORDER BY ordering, update_name
"""
)
in_flight = {update["update_name"] for update in updates}
for update in updates:
if update["depends_on"] not in in_flight:
self._background_update_queue.append(update["update_name"])
return self.db.cursor_to_dict(txn)

if not self._background_update_queue:
# no work left to do
return None
if not self._current_background_update:
all_pending_updates = await self.db.runInteraction(
"background_updates", get_background_updates_txn,
)
if not all_pending_updates:
# no work left to do
return True

# find the first update which isn't dependent on another one in the queue.
pending = {update["update_name"] for update in all_pending_updates}
for upd in all_pending_updates:
depends_on = upd["depends_on"]
if not depends_on or depends_on not in pending:
break
logger.info(
"Not starting on bg update %s until %s is done",
upd["update_name"],
depends_on,
)
else:
# if we get to the end of that for loop, there is a problem
raise Exception(
"Unable to find a background update which doesn't depend on "
"another: dependency cycle?"
)

# pop from the front, and add back to the back
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)
self._current_background_update = upd["update_name"]

res = await self._do_background_update(update_name, desired_duration_ms)
return res
await self._do_background_update(desired_duration_ms)
return False

async def _do_background_update(
self, update_name: str, desired_duration_ms: float
) -> int:
async def _do_background_update(self, desired_duration_ms: float) -> int:
update_name = self._current_background_update
logger.info("Starting update batch on background update '%s'", update_name)

update_handler = self._background_update_handlers[update_name]
Expand Down Expand Up @@ -400,27 +416,6 @@ def updater(progress, batch_size):

self.register_background_update_handler(update_name, updater)

def start_background_update(self, update_name, progress):
"""Starts a background update running.

Args:
update_name: The update to set running.
progress: The initial state of the progress of the update.

Returns:
A deferred that completes once the task has been added to the
queue.
"""
# Clear the background update queue so that we will pick up the new
# task on the next iteration of do_background_update.
self._background_update_queue = []
progress_json = json.dumps(progress)

return self.db.simple_insert(
"background_updates",
{"update_name": update_name, "progress_json": progress_json},
)

def _end_background_update(self, update_name):
"""Removes a completed background update task from the queue.

Expand All @@ -429,9 +424,12 @@ def _end_background_update(self, update_name):
Returns:
A deferred that completes once the task is removed.
"""
self._background_update_queue = [
name for name in self._background_update_queue if name != update_name
]
if update_name != self._current_background_update:
raise Exception(
"Cannot end background update %s which isn't currently running"
% update_name
)
self._current_background_update = None
return self.db.simple_delete_one(
"background_updates", keyvalues={"update_name": update_name}
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 57
SCHEMA_VERSION = 58

dir_path = os.path.abspath(os.path.dirname(__file__))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* add an "ordering" column to background_updates, which can be used to sort them
to achieve some level of consistency. */

ALTER TABLE background_updates ADD COLUMN ordering INT NOT NULL DEFAULT 0;
24 changes: 15 additions & 9 deletions tests/storage/test_background_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, homeserver):
self.updates = self.hs.get_datastore().db.updates # type: BackgroundUpdater
# the base test class should have run the real bg updates for us
self.assertTrue(self.updates.has_completed_background_updates())
self.assertTrue(
self.get_success(self.updates.has_completed_background_updates())
)

self.update_handler = Mock()
self.updates.register_background_update_handler(
Expand All @@ -25,12 +27,20 @@ def test_do_background_update(self):
# the target runtime for each bg update
target_background_update_duration_ms = 50000

store = self.hs.get_datastore()
self.get_success(
store.db.simple_insert(
"background_updates",
values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
)
)

# first step: make a bit of progress
@defer.inlineCallbacks
def update(progress, count):
yield self.clock.sleep((count * duration_ms) / 1000)
progress = {"my_key": progress["my_key"] + 1}
yield self.hs.get_datastore().db.runInteraction(
yield store.db.runInteraction(
"update_progress",
self.updates._background_update_progress_txn,
"test_update",
Expand All @@ -39,18 +49,14 @@ def update(progress, count):
return count

self.update_handler.side_effect = update

self.get_success(
self.updates.start_background_update("test_update", {"my_key": 1})
)
self.update_handler.reset_mock()
res = self.get_success(
self.updates.do_next_background_update(
target_background_update_duration_ms
),
by=0.1,
)
self.assertIsNotNone(res)
self.assertFalse(res)

# on the first call, we should get run with the default background update size
self.update_handler.assert_called_once_with(
Expand All @@ -73,13 +79,13 @@ def update(progress, count):
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
self.assertIsNotNone(result)
self.assertFalse(result)
self.update_handler.assert_called_once()

# third step: we don't expect to be called any more
self.update_handler.reset_mock()
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
self.assertIsNone(result)
self.assertTrue(result)
self.assertFalse(self.update_handler.called)
11 changes: 7 additions & 4 deletions tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.logging.context import (
SENTINEL_CONTEXT,
LoggingContext,
current_context,
set_current_context,
)
Expand Down Expand Up @@ -419,15 +420,17 @@ def setup_test_homeserver(self, *args, **kwargs):
config_obj.parse_config_dict(config, "", "")
kwargs["config"] = config_obj

async def run_bg_updates():
with LoggingContext("run_bg_updates", request="run_bg_updates-1"):
while not await stor.db.updates.has_completed_background_updates():
await stor.db.updates.do_next_background_update(1)

hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
stor = hs.get_datastore()

# Run the database background updates, when running against "master".
if hs.__class__.__name__ == "TestHomeServer":
while not self.get_success(
stor.db.updates.has_completed_background_updates()
):
self.get_success(stor.db.updates.do_next_background_update(1))
self.get_success(run_bg_updates())

return hs

Expand Down