Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge branch 'develop' into anoa/v2_lookup
Browse files Browse the repository at this point in the history
* develop:
  Drop some unused tables. (#5893)
  Avoid deep recursion in appservice recovery (#5885)
  Opentracing doc update (#5776)
  Refactor the Appservice scheduler code
  • Loading branch information
anoadragon453 committed Aug 21, 2019
2 parents 3a114fe + 4dab867 commit 5426e13
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 141 deletions.
1 change: 1 addition & 0 deletions changelog.d/5776.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Update opentracing docs to use the unified `trace` method.
1 change: 1 addition & 0 deletions changelog.d/5885.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix stack overflow when recovering an appservice which had an outage.
1 change: 1 addition & 0 deletions changelog.d/5886.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor the Appservice scheduler code.
1 change: 1 addition & 0 deletions changelog.d/5893.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Drop some unused tables.
153 changes: 90 additions & 63 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,35 +70,37 @@ def __init__(self, hs):
self.store = hs.get_datastore()
self.as_api = hs.get_application_service_api()

def create_recoverer(service, callback):
return _Recoverer(self.clock, self.store, self.as_api, service, callback)

self.txn_ctrl = _TransactionController(
self.clock, self.store, self.as_api, create_recoverer
)
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)

@defer.inlineCallbacks
def start(self):
logger.info("Starting appservice scheduler")

# check for any DOWN ASes and start recoverers for them.
recoverers = yield _Recoverer.start(
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
services = yield self.store.get_appservices_by_state(
ApplicationServiceState.DOWN
)
self.txn_ctrl.add_recoverers(recoverers)

for service in services:
self.txn_ctrl.start_recoverer(service)

def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)


class _ServiceQueuer(object):
"""Queues events for the same application service together, sending
transactions as soon as possible. Once a transaction is sent successfully,
this schedules any other events in the queue to run.
"""Queue of events waiting to be sent to appservices.
Groups events into transactions per-appservice, and sends them on to the
TransactionController. Makes sure that we only have one transaction in flight per
appservice at a given time.
"""

def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}

# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
Expand Down Expand Up @@ -136,13 +138,29 @@ def _send_request(self, service):


class _TransactionController(object):
def __init__(self, clock, store, as_api, recoverer_fn):
"""Transaction manager.
Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
if a transaction fails.
(Note we have only have one of these in the homeserver.)
Args:
clock (synapse.util.Clock):
store (synapse.storage.DataStore):
as_api (synapse.appservice.api.ApplicationServiceApi):
"""

def __init__(self, clock, store, as_api):
self.clock = clock
self.store = store
self.as_api = as_api
self.recoverer_fn = recoverer_fn
# keep track of how many recoverers there are
self.recoverers = []

# map from service id to recoverer instance
self.recoverers = {}

# for UTs
self.RECOVERER_CLASS = _Recoverer

@defer.inlineCallbacks
def send(self, service, events):
Expand All @@ -154,61 +172,63 @@ def send(self, service, events):
if sent:
yield txn.complete(self.store)
else:
run_in_background(self._start_recoverer, service)
run_in_background(self._on_txn_fail, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._start_recoverer, service)
run_in_background(self._on_txn_fail, service)

@defer.inlineCallbacks
def on_recovered(self, recoverer):
self.recoverers.remove(recoverer)
logger.info(
"Successfully recovered application service AS ID %s", recoverer.service.id
)
self.recoverers.pop(recoverer.service.id)
logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state(
recoverer.service, ApplicationServiceState.UP
)

def add_recoverers(self, recoverers):
for r in recoverers:
self.recoverers.append(r)
if len(recoverers) > 0:
logger.info("New active recoverers: %s", len(self.recoverers))

@defer.inlineCallbacks
def _start_recoverer(self, service):
def _on_txn_fail(self, service):
try:
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
logger.info(
"Application service falling behind. Starting recoverer. AS ID %s",
service.id,
)
recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer])
recoverer.recover()
self.start_recoverer(service)
except Exception:
logger.exception("Error starting AS recoverer")

def start_recoverer(self, service):
"""Start a Recoverer for the given service
Args:
service (synapse.appservice.ApplicationService):
"""
logger.info("Starting recoverer for AS ID %s", service.id)
assert service.id not in self.recoverers
recoverer = self.RECOVERER_CLASS(
self.clock, self.store, self.as_api, service, self.on_recovered
)
self.recoverers[service.id] = recoverer
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))

@defer.inlineCallbacks
def _is_service_up(self, service):
state = yield self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None


class _Recoverer(object):
@staticmethod
@defer.inlineCallbacks
def start(clock, store, as_api, callback):
services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
for r in recoverers:
logger.info(
"Starting recoverer for AS ID %s which was marked as " "DOWN",
r.service.id,
)
r.recover()
return recoverers
"""Manages retries and backoff for a DOWN appservice.
We have one of these for each appservice which is currently considered DOWN.
Args:
clock (synapse.util.Clock):
store (synapse.storage.DataStore):
as_api (synapse.appservice.api.ApplicationServiceApi):
service (synapse.appservice.ApplicationService): the service we are managing
callback (callable[_Recoverer]): called once the service recovers.
"""

def __init__(self, clock, store, as_api, service, callback):
self.clock = clock
Expand All @@ -224,7 +244,9 @@ def _retry():
"as-recoverer-%s" % (self.service.id,), self.retry
)

self.clock.call_later((2 ** self.backoff_counter), _retry)
delay = 2 ** self.backoff_counter
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
self.clock.call_later(delay, _retry)

def _backoff(self):
# cap the backoff to be around 8.5min => (2^9) = 512 secs
Expand All @@ -234,25 +256,30 @@ def _backoff(self):

@defer.inlineCallbacks
def retry(self):
logger.info("Starting retries on %s", self.service.id)
try:
txn = yield self.store.get_oldest_unsent_txn(self.service)
if txn:
while True:
txn = yield self.store.get_oldest_unsent_txn(self.service)
if not txn:
# nothing left: we're done!
self.callback(self)
return

logger.info(
"Retrying transaction %s for AS ID %s", txn.id, txn.service.id
)
sent = yield txn.send(self.as_api)
if sent:
yield txn.complete(self.store)
# reset the backoff counter and retry immediately
self.backoff_counter = 1
yield self.retry()
else:
self._backoff()
else:
self._set_service_recovered()
except Exception as e:
logger.exception(e)
self._backoff()

def _set_service_recovered(self):
self.callback(self)
if not sent:
break

yield txn.complete(self.store)

# reset the backoff counter and then process the next transaction
self.backoff_counter = 1

except Exception:
logger.exception("Unexpected error running retries")

# we didn't manage to send all of the transactions before we got an error of
# some flavour: reschedule the next retry.
self._backoff()
67 changes: 40 additions & 27 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
an optional dependency. This does however limit the number of modifiable spans
at any point in the code to one. From here out references to `opentracing`
in the code snippets refer to the Synapses module.
Most methods provided in the module have a direct correlation to those provided
by opentracing. Refer to docs there for a more in-depth documentation on some of
the args and methods.
Tracing
-------
Expand All @@ -68,52 +71,62 @@
Tracing functions
-----------------
Functions can be easily traced using decorators. There is a decorator for
'normal' function and for functions which are actually deferreds. The name of
Functions can be easily traced using decorators. The name of
the function becomes the operation name for the span.
.. code-block:: python
from synapse.logging.opentracing import trace, trace_deferred
from synapse.logging.opentracing import trace
# Start a span using 'normal_function' as the operation name
# Start a span using 'interesting_function' as the operation name
@trace
def normal_function(*args, **kwargs):
def interesting_function(*args, **kwargs):
# Does all kinds of cool and expected things
return something_usual_and_useful
# Start a span using 'deferred_function' as the operation name
@trace_deferred
@defer.inlineCallbacks
def deferred_function(*args, **kwargs):
# We start
yield we_wait
# we finish
return something_usual_and_useful
Operation names can be explicitly set for functions by using
``trace_using_operation_name`` and
``trace_deferred_using_operation_name``
``trace_using_operation_name``
.. code-block:: python
from synapse.logging.opentracing import (
trace_using_operation_name,
trace_deferred_using_operation_name
)
from synapse.logging.opentracing import trace_using_operation_name
@trace_using_operation_name("A *much* better operation name")
def normal_function(*args, **kwargs):
def interesting_badly_named_function(*args, **kwargs):
# Does all kinds of cool and expected things
return something_usual_and_useful
@trace_deferred_using_operation_name("Another exciting operation name!")
@defer.inlineCallbacks
def deferred_function(*args, **kwargs):
# We start
yield we_wait
# we finish
return something_usual_and_useful
Setting Tags
------------
To set a tag on the active span do
.. code-block:: python
from synapse.logging.opentracing import set_tag
set_tag(tag_name, tag_value)
There's a convenient decorator to tag all the args of the method. It uses
inspection in order to use the formal parameter names prefixed with 'ARG_' as
tag names. It uses kwarg names as tag names without the prefix.
.. code-block:: python
from synapse.logging.opentracing import tag_args
@tag_args
def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"):
pass
set_fates("the story", "the end", "the act")
# This will have the following tags
# - ARG_clotho: "the story"
# - ARG_lachesis: "the end"
# - ARG_atropos: "the act"
# - father: "Zues"
# - mother: "Themis"
Contexts and carriers
---------------------
Expand Down
14 changes: 2 additions & 12 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1302,15 +1302,11 @@ def _delete_existing_rows_txn(cls, txn, events_and_contexts):
"event_reference_hashes",
"event_search",
"event_to_state_groups",
"guest_access",
"history_visibility",
"local_invites",
"room_names",
"state_events",
"rejections",
"redactions",
"room_memberships",
"topics",
):
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
Expand Down Expand Up @@ -1454,23 +1450,17 @@ def _update_metadata_tables_txn(

for event, _ in events_and_contexts:
if event.type == EventTypes.Name:
# Insert into the room_names and event_search tables.
# Insert into the event_search table.
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
# Insert into the topics table and event_search table.
# Insert into the event_search table.
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Message:
# Insert into the event_search table.
self._store_room_message_txn(txn, event)
elif event.type == EventTypes.Redaction:
# Insert into the redactions table.
self._store_redaction(txn, event)
elif event.type == EventTypes.RoomHistoryVisibility:
# Insert into the event_search table.
self._store_history_visibility_txn(txn, event)
elif event.type == EventTypes.GuestAccess:
# Insert into the event_search table.
self._store_guest_access_txn(txn, event)

self._handle_event_relations(txn, event)

Expand Down
Loading

0 comments on commit 5426e13

Please sign in to comment.