Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Propagate opentracing contexts through EDUs (#5852)
Browse files Browse the repository at this point in the history
Propagate opentracing contexts through EDUs
Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
  • Loading branch information
JorikSchellekens authored Aug 22, 2019
1 parent 0b39fa5 commit 8767b63
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 94 deletions.
1 change: 1 addition & 0 deletions changelog.d/5852.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pass opentracing contexts between servers when transmitting EDUs.
27 changes: 25 additions & 2 deletions docs/opentracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ It is up to the remote server to decide what it does with the spans
it creates. This is called the sampling policy and it can be configured
through Jaeger's settings.

For OpenTracing concepts see
For OpenTracing concepts see
https://opentracing.io/docs/overview/what-is-tracing/.

For more information about Jaeger's implementation see
Expand Down Expand Up @@ -79,7 +79,7 @@ Homeserver whitelisting

The homeserver whitelist is configured using regular expressions. A list of regular
expressions can be given and their union will be compared when propagating any
spans contexts to another homeserver.
spans contexts to another homeserver.

Though it's mostly safe to send and receive span contexts to and from
untrusted users since span contexts are usually opaque ids it can lead to
Expand All @@ -92,6 +92,29 @@ two problems, namely:
but that doesn't prevent another server sending you baggage which will be logged
to OpenTracing's logs.

==========
EDU FORMAT
==========

EDUs can contain tracing data in their content. This is not specced but
it could be of interest for other homeservers.

EDU format (if you're using jaeger):

.. code-block:: json
{
"edu_type": "type",
"content": {
"org.matrix.opentracing_context": {
"uber-trace-id": "fe57cf3e65083289"
}
}
}
Though you don't have to use jaeger you must inject the span context into
`org.matrix.opentracing_context` using the opentracing `Format.TEXT_MAP` inject method.

==================
Configuring Jaeger
==================
Expand Down
15 changes: 8 additions & 7 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import log_kv, trace
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
Expand Down Expand Up @@ -811,12 +811,13 @@ def on_edu(self, edu_type, origin, content):
if not handler:
logger.warn("No handler registered for EDU type %s", edu_type)

try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
with start_active_span_from_edu(content, "handle_edu"):
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)

def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type)
Expand Down
170 changes: 97 additions & 73 deletions synapse/federation/sender/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,19 @@
# limitations under the License.
import logging

from canonicaljson import json

from twisted.internet import defer

from synapse.api.errors import HttpResponseException
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Transaction
from synapse.logging.opentracing import (
extract_text_map,
set_tag,
start_active_span_follows_from,
tags,
)
from synapse.util.metrics import measure_func

logger = logging.getLogger(__name__)
Expand All @@ -44,93 +52,109 @@ def __init__(self, hs):
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):

# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
# Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
# no active span here, so if the edus were not received by the remote the
# span would have no causality and it would be forgotten.
# The span_contexts is a generator so that it won't be evaluated if
# opentracing is disabled. (Yay speed!)

success = True
span_contexts = (
extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
)

logger.debug("TX [%s] _attempt_new_transaction", destination)
with start_active_span_follows_from("send_transaction", span_contexts):

txn_id = str(self._next_txn_id)
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus

logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
len(edus),
)
success = True

transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
logger.debug("TX [%s] _attempt_new_transaction", destination)

self._next_txn_id += 1
txn_id = str(self._next_txn_id)

logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
len(edus),
)

# Actually send the transaction

# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data

try:
response = yield self._transport_layer.send_transaction(
transaction, json_data_cb
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response

if e.code in (401, 404, 429) or 500 <= e.code:
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise e
self._next_txn_id += 1

logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)

if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
# Actually send the transaction

# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data

try:
response = yield self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response

if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response", destination, txn_id, code
)
raise e

logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)

if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination,
txn_id,
e_id,
r,
)
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
"TX [%s] {%s} Failed to send event %s",
destination,
txn_id,
e_id,
r,
p.event_id,
)
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination,
txn_id,
p.event_id,
)
success = False
success = False

return success
set_tag(tags.ERROR, not success)
return success
3 changes: 3 additions & 0 deletions synapse/federation/units.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class Edu(JsonEncodedObject):

internal_keys = ["origin", "destination"]

def get_context(self):
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")


class Transaction(JsonEncodedObject):
""" A transaction is a list of Pdus and Edus to be sent to a remote home
Expand Down
27 changes: 21 additions & 6 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,17 @@

import logging

from canonicaljson import json

from twisted.internet import defer

from synapse.api.errors import SynapseError
from synapse.logging.opentracing import (
get_active_span_text_map,
set_tag,
start_active_span,
whitelisted_homeserver,
)
from synapse.types import UserID, get_domain_from_id
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -100,14 +108,21 @@ def send_device_message(self, sender_user_id, message_type, messages):

message_id = random_string(16)

context = get_active_span_text_map()

remote_edu_contents = {}
for destination, messages in remote_messages.items():
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
}
with start_active_span("to_device_for_user"):
set_tag("destination", destination)
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"org.matrix.opentracing_context": json.dumps(context)
if whitelisted_homeserver(destination)
else None,
}

stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
Expand Down
26 changes: 26 additions & 0 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"):
``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes
in a destination and compares it to the whitelist.
Most injection methods take a 'destination' arg. The context will only be injected
if the destination matches the whitelist or the destination is None.
=======
Gotchas
=======
Expand Down Expand Up @@ -576,6 +579,29 @@ def inject_active_span_text_map(carrier, destination, check_destination=True):
)


def get_active_span_text_map(destination=None):
"""
Gets a span context as a dict. This can be used instead of manually
injecting a span into an empty carrier.
Args:
destination (str): the name of the remote server.
Returns:
dict: the active span's context if opentracing is enabled, otherwise empty.
"""

if not opentracing or (destination and not whitelisted_homeserver(destination)):
return {}

carrier = {}
opentracing.tracer.inject(
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
)

return carrier


def active_span_context_as_string():
"""
Returns:
Expand Down
Loading

0 comments on commit 8767b63

Please sign in to comment.