Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve logging and opentracing for to-device message handling #14598

Merged
merged 9 commits into from
Dec 6, 2022
1 change: 1 addition & 0 deletions changelog.d/14598.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve opentracing and logging for to-device message handling.
3 changes: 3 additions & 0 deletions synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ class EventContentFields:
# The authorising user for joining a restricted room.
AUTHORISING_USER: Final = "join_authorised_via_users_server"

# an unspecced field added to to-device messages to identify them uniquely-ish
TO_DEVICE_MSGID: Final = "org.matrix.msgid"


class RoomTypes:
"""Understood values of the room_type field of m.room.create events."""
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]
if not message_id:
continue

set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)

edus = [
Edu(
Expand Down
3 changes: 0 additions & 3 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,9 +578,6 @@ async def _get_to_device_messages(
device_id,
), messages in recipient_device_to_messages.items():
for message_json in messages:
# Remove 'message_id' from the to-device message, as it's an internal ID
message_json.pop("message_id", None)

message_payload.append(
{
"to_user_id": user_id,
Expand Down
36 changes: 22 additions & 14 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING, Any, Dict

from synapse.api.constants import EduTypes, ToDeviceEventTypes
from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
Expand Down Expand Up @@ -216,14 +216,24 @@ async def send_device_message(
"""
sender_user_id = requester.user.to_string()

message_id = random_string(16)
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)

log_kv({"number_of_to_device_messages": len(messages)})
set_tag("sender", sender_user_id)
set_tag(SynapseTags.TO_DEVICE_TYPE, message_type)
set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id)
local_messages = {}
remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for user_id, by_device in messages.items():
# add an opentracing log entry for each message
for device_id, message_content in by_device.items():
log_kv(
{
"event": "send_to_device_message",
"user_id": user_id,
"device_id": device_id,
EventContentFields.TO_DEVICE_MSGID: message_content.get(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean we can't search by message ID?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no. The message ID is logged further down the stack, when we store the message in the database (either as store_outgoing_to_device_message, or in add_messages_from_remote_to_device_inbox).

EventContentFields.TO_DEVICE_MSGID
),
}
)

# Ratelimit local cross-user key requests by the sending device.
if (
message_type == ToDeviceEventTypes.RoomKeyRequest
Expand All @@ -233,6 +243,7 @@ async def send_device_message(
requester, (sender_user_id, requester.device_id)
)
if not allowed:
log_kv({"message": f"dropping key requests to {user_id}"})
logger.info(
"Dropping room_key_request from %s to %s due to rate limit",
sender_user_id,
Expand All @@ -247,18 +258,11 @@ async def send_device_message(
"content": message_content,
"type": message_type,
"sender": sender_user_id,
"message_id": message_id,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do seem to use this over federation?

message_id = content["message_id"]

Or am I misreading the code? (It looks like there are more places we use message_id in general?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do seem to use this over federation?

yes, but this code is for handling local user -> local user DMs (see the condition at line 244).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aaaaaaaaaaaaaaaaaaaah

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For messages being sent over federation, messages are grouped into EDUs by destination server. We now generate a new message id for each EDU at line 268:

message_id = random_string(16)

}
for device_id, message_content in by_device.items()
}
if messages_by_device:
local_messages[user_id] = messages_by_device
log_kv(
{
"user_id": user_id,
"device_id": list(messages_by_device),
}
)
else:
destination = get_domain_from_id(user_id)
remote_messages.setdefault(destination, {})[user_id] = by_device
Expand All @@ -267,7 +271,11 @@ async def send_device_message(

remote_edu_contents = {}
for destination, messages in remote_messages.items():
log_kv({"destination": destination})
# The EDU contains a "message_id" property which is used for
# idempotence. Make up a random one.
message_id = random_string(16)
log_kv({"destination": destination, "message_id": message_id})

remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
Expand Down
26 changes: 19 additions & 7 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,20 @@
import attr
from prometheus_client import Counter

from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.logging.opentracing import (
SynapseTags,
log_kv,
set_tag,
start_active_span,
trace,
)
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
Expand Down Expand Up @@ -1584,6 +1590,7 @@ async def _generate_sync_entry_for_device_list(
else:
return DeviceListUpdates()

@trace
async def _generate_sync_entry_for_to_device(
self, sync_result_builder: "SyncResultBuilder"
) -> None:
Expand All @@ -1603,11 +1610,16 @@ async def _generate_sync_entry_for_to_device(
)

for message in messages:
# We pop here as we shouldn't be sending the message ID down
# `/sync`
message_id = message.pop("message_id", None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does mean that we'll send down message_id for "old" to-device messages, but I guess that those will drain quickly and so is fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's a detail I'm brushing under the carpet.

if message_id:
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
log_kv(
{
"event": "to_device_message",
"sender": message["sender"],
"type": message["type"],
EventContentFields.TO_DEVICE_MSGID: message["content"].get(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this supposed to be SynapseTags.TO_DEVICE_MSGID instead of EventContentFields.TO_DEVICE_MSGID?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really, no. Note that this is being used in log_kv, which adds entries to the "logs" section of the span:

image

The "logs" are not searchable, so are basically free text. I figured it made more sense to make the log fields match the fields on the wire.

SynapseTags.* are intended for use with set_tag, which adds entries to the "tags" section of the span:

image

Since these are searchable (that's the whole point of them), it seemed to make sense to use tags that were namespaced with a to_device prefix.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, thanks for the explanation!

EventContentFields.TO_DEVICE_MSGID
),
}
)

logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
Expand Down
11 changes: 9 additions & 2 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,15 @@ def report_span(self, span: "opentracing.Span") -> None:


class SynapseTags:
# The message ID of any to_device message processed
TO_DEVICE_MESSAGE_ID = "to_device.message_id"
# The message ID of any to_device EDU processed
TO_DEVICE_EDU_ID = "to_device.edu_id"

# Details about to-device messages
TO_DEVICE_TYPE = "to_device.type"
TO_DEVICE_SENDER = "to_device.sender"
TO_DEVICE_RECIPIENT = "to_device.recipient"
TO_DEVICE_RECIPIENT_DEVICE = "to_device.recipient_device"
TO_DEVICE_MSGID = "to_device.msgid" # client-generated ID

# Whether the sync response has new data to be returned to the client.
SYNC_RESULT = "sync.new_data"
Expand Down
1 change: 0 additions & 1 deletion synapse/rest/client/sendtodevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def __init__(self, hs: "HomeServer"):
def on_PUT(
self, request: SynapseRequest, message_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("message_type", message_type)
set_tag("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id
Expand Down
92 changes: 75 additions & 17 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,15 @@
cast,
)

from synapse.api.constants import EventContentFields
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.logging.opentracing import (
SynapseTags,
log_kv,
set_tag,
start_active_span,
trace,
)
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
Expand Down Expand Up @@ -397,6 +404,17 @@ def get_device_messages_txn(
(recipient_user_id, recipient_device_id), []
).append(message_dict)

# start a new span for each message, so that we can tag each separately
with start_active_span("get_to_device_message"):
set_tag(SynapseTags.TO_DEVICE_TYPE, message_dict["type"])
set_tag(SynapseTags.TO_DEVICE_SENDER, message_dict["sender"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, recipient_user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, recipient_device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
message_dict["content"].get(EventContentFields.TO_DEVICE_MSGID),
)

if limit is not None and rowcount == limit:
# We ended up bumping up against the message limit. There may be more messages
# to retrieve. Return what we have, as well as the last stream position that
Expand Down Expand Up @@ -678,12 +696,35 @@ def add_messages_txn(
],
)

if remote_messages_by_destination:
issue9533_logger.debug(
"Queued outgoing to-device messages with stream_id %i for %s",
stream_id,
list(remote_messages_by_destination.keys()),
)
for destination, edu in remote_messages_by_destination.items():
if issue9533_logger.isEnabledFor(logging.DEBUG):
issue9533_logger.debug(
"Queued outgoing to-device messages with "
"stream_id %i, EDU message_id %s, type %s for %s: %s",
stream_id,
edu["message_id"],
edu["type"],
destination,
[
f"{user_id}/{device_id} (msgid "
f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})"
for (user_id, messages_by_device) in edu["messages"].items()
for (device_id, msg) in messages_by_device.items()
],
)

for (user_id, messages_by_device) in edu["messages"].items():
for (device_id, msg) in messages_by_device.items():
with start_active_span("store_outgoing_to_device_message"):
set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"])
set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"])
set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
msg.get(EventContentFields.TO_DEVICE_MSGID),
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are quite a lot of tags. Don't suppose you have any insight into if this is expensive or not? I don't really know enough about elasticsearch to know if this is absolutely fine or if we need to be careful.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hrm, well, good question.

There's a few things to think about here.

Firstly, load on the synapse side.

  • If the active span isn't being sampled, set_tag is basically a no-op. I don't love that we'll be making all these function calls, but hopefully it's not too bad.
  • If the span is being sampled, making the tag is relatively inexpensive. All the tags are aggregated and sent over to the server once the span completes.

On the server side, I don't really know, but my understanding is that each tag is entered into an index which references a "document", where the document is a single span (which then holds references to the trace it contains). So... the index entry is just a (key, value) pair where the value is a uuid. In other words, no, I don't think adding extra tags is expensive on the server side.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!


async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec()
Expand Down Expand Up @@ -801,7 +842,19 @@ def _add_messages_to_local_device_inbox_txn(
# Only insert into the local inbox if the device exists on
# this server
device_id = row["device_id"]
message_json = json_encoder.encode(messages_by_device[device_id])

with start_active_span("serialise_to_device_message"):
msg = messages_by_device[device_id]
set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
msg["content"].get(EventContentFields.TO_DEVICE_MSGID),
)
message_json = json_encoder.encode(msg)

messages_json_for_user[device_id] = message_json

if messages_json_for_user:
Expand All @@ -821,15 +874,20 @@ def _add_messages_to_local_device_inbox_txn(
],
)

issue9533_logger.debug(
"Stored to-device messages with stream_id %i for %s",
stream_id,
[
(user_id, device_id)
for (user_id, messages_by_device) in local_by_user_then_device.items()
for device_id in messages_by_device.keys()
],
)
if issue9533_logger.isEnabledFor(logging.DEBUG):
issue9533_logger.debug(
"Stored to-device messages with stream_id %i: %s",
stream_id,
[
f"{user_id}/{device_id} (msgid "
f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})"
for (
user_id,
messages_by_device,
) in messages_by_user_then_device.items()
for (device_id, msg) in messages_by_device.items()
],
)


class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
Expand Down
7 changes: 6 additions & 1 deletion tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,12 @@ def test_application_services_receive_bursts_of_to_device(self):
fake_device_ids = [f"device_{num}" for num in range(number_of_messages - 1)]
messages = {
self.exclusive_as_user: {
device_id: to_device_message_content for device_id in fake_device_ids
device_id: {
"type": "test_to_device_message",
"sender": "@some:sender",
"content": to_device_message_content,
}
for device_id in fake_device_ids
}
}

Expand Down