Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert run_as_background_process inner function into an async function #8032

Merged
merged 5 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8032.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
2 changes: 1 addition & 1 deletion synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def handle_event(event):

async def start_scheduler():
try:
return self.scheduler.start()
return await self.scheduler.start()
except Exception:
logger.error("Application Services Failure")

Expand Down
5 changes: 2 additions & 3 deletions synapse/http/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,9 @@ def processing(self):

Returns a context manager; the correct way to use this is:

@defer.inlineCallbacks
def handle_request(request):
async def handle_request(request):
with request.processing("FooServlet"):
yield really_handle_the_request()
await really_handle_the_request()

Once the context manager is closed, the completion of the request will be logged,
and the various metrics will be updated.
Expand Down
34 changes: 12 additions & 22 deletions synapse/metrics/background_process_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
import logging
import threading
from asyncio import iscoroutine
from functools import wraps
from typing import TYPE_CHECKING, Dict, Optional, Set

from prometheus_client.core import REGISTRY, Counter, Gauge

from twisted.internet import defer
from twisted.python.failure import Failure

from synapse.logging.context import LoggingContext, PreserveLoggingContext

Expand Down Expand Up @@ -167,7 +166,7 @@ def update_metrics(self):
)


def run_as_background_process(desc, func, *args, **kwargs):
def run_as_background_process(desc: str, func, *args, **kwargs):
"""Run the given function in its own logcontext, with resource metrics

This should be used to wrap processes which are fired off to run in the
Expand All @@ -179,7 +178,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
normal synapse inlineCallbacks function).

Args:
desc (str): a description for this background process type
desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
args: positional args for func
kwargs: keyword args for func
Expand All @@ -188,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
follow the synapse logcontext rules.
"""

@defer.inlineCallbacks
def run():
async def run():
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
_background_process_counts[desc] = count + 1
Expand All @@ -203,29 +201,21 @@ def run():
try:
result = func(*args, **kwargs)

# We probably don't have an ensureDeferred in our call stack to handle
# coroutine results, so we need to ensureDeferred here.
#
# But we need this check because ensureDeferred doesn't like being
# called on immediate values (as opposed to Deferreds or coroutines).
if iscoroutine(result):
result = defer.ensureDeferred(result)
if inspect.isawaitable(result):
result = await result

return (yield result)
return result
except Exception:
# failure.Failure() fishes the original Failure out of our stack, and
# thus gives us a sensible stack trace.
f = Failure()
logger.error(
"Background process '%s' threw an exception",
desc,
exc_info=(f.type, f.value, f.getTracebackObject()),
logger.exception(
"Background process '%s' threw an exception", desc,
)
finally:
_background_process_in_flight_count.labels(desc).dec()

with PreserveLoggingContext():
return run()
# Note that we return a Deferred here so that it can be used in a
# looping_call and other places that expect a Deferred.
return defer.ensureDeferred(run())


def wrap_as_background_process(desc):
Expand Down