Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Link in both directions between persist_events spans
Browse files Browse the repository at this point in the history
  • Loading branch information
richvdh committed Jun 14, 2021
1 parent 0780047 commit f18d434
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 10 deletions.
20 changes: 18 additions & 2 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"):
import logging
import re
from functools import wraps
from typing import TYPE_CHECKING, Dict, List, Optional, Pattern, Type
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Pattern, Type

import attr

Expand Down Expand Up @@ -453,12 +453,28 @@ def start_active_span(
)


def start_active_span_follows_from(operation_name, contexts):
def start_active_span_follows_from(
operation_name: str, contexts: Collection, inherit_force_tracing=False
):
"""Starts an active opentracing span, with additional references to previous spans
Args:
operation_name: name of the operation represented by the new span
contexts: the previous spans to inherit from
inherit_force_tracing: if set, and any of the previous contexts have had tracing
forced, the new span will also have tracing forced.
"""
if opentracing is None:
return noop_context_manager()

references = [opentracing.follows_from(context) for context in contexts]
scope = start_active_span(operation_name, references=references)

if inherit_force_tracing and any(
is_context_forced_tracing(ctx) for ctx in contexts
):
force_tracing(scope.span)

return scope


Expand Down
42 changes: 34 additions & 8 deletions synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
from collections import deque
from typing import (
Any,
Awaitable,
Callable,
Collection,
Expand Down Expand Up @@ -104,12 +105,17 @@
)


@attr.s(auto_attribs=True, frozen=True, slots=True)
@attr.s(auto_attribs=True, slots=True)
class _EventPersistQueueItem:
events_and_contexts: List[Tuple[EventBase, EventContext]]
backfilled: bool
deferred: ObservableDeferred
opentracing_context: Optional[object]

parent_opentracing_span_contexts: List = []
"""A list of opentracing spans waiting for this batch"""

opentracing_span_context: Any = None
"""The opentracing span under which the persistence actually happened"""


_PersistResult = TypeVar("_PersistResult")
Expand Down Expand Up @@ -164,20 +170,36 @@ async def add_to_queue(
end_item = queue[-1]
else:
# need to make a new queue item
span = opentracing.active_span()
deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)

end_item = _EventPersistQueueItem(
events_and_contexts=[],
backfilled=backfilled,
deferred=deferred,
opentracing_context=span.context if span else None,
)
queue.append(end_item)

# add our events to the queue item
end_item.events_and_contexts.extend(events_and_contexts)

# also add our active opentracing span to the item so that we get a link back
span = opentracing.active_span()
if span:
end_item.parent_opentracing_span_contexts.append(span.context)

# start a processor for the queue, if there isn't one already
self._handle_queue(room_id)
return await make_deferred_yieldable(end_item.deferred.observe())

# wait for the queue item to complete
res = await make_deferred_yieldable(end_item.deferred.observe())

# add another opentracing span which links to the persist trace.
with opentracing.start_active_span_follows_from(
"persist_event_batch_complete", (end_item.opentracing_span_context,)
):
pass

return res

def _handle_queue(self, room_id):
"""Attempts to handle the queue for a room if not already being handled.
Expand All @@ -204,10 +226,14 @@ async def handle_queue_loop():
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
with opentracing.start_active_span(
with opentracing.start_active_span_follows_from(
"persist_event_batch",
child_of=item.opentracing_context,
):
item.parent_opentracing_span_contexts,
inherit_force_tracing=True,
) as scope:
if scope:
item.opentracing_span_context = scope.span.context

ret = await self._per_item_callback(
item.events_and_contexts, item.backfilled
)
Expand Down

0 comments on commit f18d434

Please sign in to comment.