Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Support stable identifiers for MSC3440: Threading #12151

Merged
merged 13 commits into from
Mar 10, 2022
Merged
1 change: 1 addition & 0 deletions changelog.d/12151.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support the stable identifiers from [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440): threads.
4 changes: 3 additions & 1 deletion synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ class RelationTypes:
ANNOTATION: Final = "m.annotation"
REPLACE: Final = "m.replace"
REFERENCE: Final = "m.reference"
THREAD: Final = "io.element.thread"
THREAD: Final = "m.thread"
# TODO Remove this in Synapse >= v1.57.0.
UNSTABLE_THREAD: Final = "io.element.thread"


class LimitBlockingTypes:
Expand Down
28 changes: 13 additions & 15 deletions synapse/api/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@
"org.matrix.labels": {"type": "array", "items": {"type": "string"}},
"org.matrix.not_labels": {"type": "array", "items": {"type": "string"}},
# MSC3440, filtering by event relations.
"related_by_senders": {"type": "array", "items": {"type": "string"}},
"io.element.relation_senders": {"type": "array", "items": {"type": "string"}},
"related_by_rel_types": {"type": "array", "items": {"type": "string"}},
"io.element.relation_types": {"type": "array", "items": {"type": "string"}},
},
}
Expand Down Expand Up @@ -318,19 +320,15 @@ def __init__(self, hs: "HomeServer", filter_json: JsonDict):
self.labels = filter_json.get("org.matrix.labels", None)
self.not_labels = filter_json.get("org.matrix.not_labels", [])

# Ideally these would be rejected at the endpoint if they were provided
# and not supported, but that would involve modifying the JSON schema
# based on the homeserver configuration.
if hs.config.experimental.msc3440_enabled:
self.relation_senders = self.filter_json.get(
"io.element.relation_senders", None
)
self.relation_types = self.filter_json.get(
"io.element.relation_types", None
)
else:
self.relation_senders = None
self.relation_types = None
# Fallback to the unstable prefix if the stable version is not given.
self.related_by_senders = self.filter_json.get(
"related_by_senders",
self.filter_json.get("io.element.relation_senders", None),
)
self.related_by_rel_types = self.filter_json.get(
"related_by_rel_types",
self.filter_json.get("io.element.relation_types", None),
)

def filters_all_types(self) -> bool:
return "*" in self.not_types
Expand Down Expand Up @@ -461,7 +459,7 @@ async def _check_event_relations(
event_ids = [event.event_id for event in events if isinstance(event, EventBase)] # type: ignore[attr-defined]
event_ids_to_keep = set(
await self._store.events_have_relations(
event_ids, self.relation_senders, self.relation_types
event_ids, self.related_by_senders, self.related_by_rel_types
)
)

Expand All @@ -474,7 +472,7 @@ async def _check_event_relations(
async def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
result = [event for event in events if self._check(event)]

if self.relation_senders or self.relation_types:
if self.related_by_senders or self.related_by_rel_types:
return await self._check_event_relations(result)

return result
Expand Down
2 changes: 0 additions & 2 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ class ExperimentalConfig(Config):
def read_config(self, config: JsonDict, **kwargs):
experimental = config.get("experimental_features") or {}

# MSC3440 (thread relation)
self.msc3440_enabled: bool = experimental.get("msc3440_enabled", False)
# MSC3666: including bundled relations in /search.
self.msc3666_enabled: bool = experimental.get("msc3666_enabled", False)

Expand Down
4 changes: 3 additions & 1 deletion synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,11 +515,13 @@ def _inject_bundled_aggregations(
thread.latest_event, serialized_latest_event, thread.latest_edit
)

serialized_aggregations[RelationTypes.THREAD] = {
thread_summary = {
"latest_event": serialized_latest_event,
"count": thread.count,
"current_user_participated": thread.current_user_participated,
}
serialized_aggregations[RelationTypes.THREAD] = thread_summary
serialized_aggregations[RelationTypes.UNSTABLE_THREAD] = thread_summary
clokep marked this conversation as resolved.
Show resolved Hide resolved

# Include the bundled aggregations in the event.
if serialized_aggregations:
Expand Down
5 changes: 4 additions & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,10 @@ async def _validate_event_relation(self, event: EventBase) -> None:
raise SynapseError(400, "Can't send same reaction twice")

# Don't attempt to start a thread if the parent event is a relation.
elif relation_type == RelationTypes.THREAD:
elif (
relation_type == RelationTypes.THREAD
or relation_type == RelationTypes.UNSTABLE_THREAD
):
if await self.store.event_includes_relation(relates_to):
raise SynapseError(
400, "Cannot start threads from an event with a relation"
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def on_GET(self, request: Request) -> Tuple[int, JsonDict]:
# Adds support for jump to date endpoints (/timestamp_to_event) as per MSC3030
"org.matrix.msc3030": self.config.experimental.msc3030_enabled,
# Adds support for thread relations, per MSC3440.
"org.matrix.msc3440": self.config.experimental.msc3440_enabled,
"org.matrix.msc3440": True,
},
},
)
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1811,7 +1811,10 @@ def _handle_event_relations(
if rel_type == RelationTypes.REPLACE:
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))

if rel_type == RelationTypes.THREAD:
if (
rel_type == RelationTypes.THREAD
or rel_type == RelationTypes.UNSTABLE_THREAD
):
txn.call_after(
self.store.get_thread_summary.invalidate, (parent_id, event.room_id)
)
Expand Down
65 changes: 24 additions & 41 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,14 @@
from synapse.api.constants import RelationTypes
from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
from synapse.storage.databases.main.stream import generate_pagination_where_clause
from synapse.storage.engines import PostgresEngine
from synapse.storage.relations import AggregationPaginationToken, PaginationChunk
from synapse.types import JsonDict, RoomStreamToken, StreamToken
from synapse.util.caches.descriptors import cached, cachedList

if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.storage.databases.main import DataStore

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -81,16 +75,6 @@ def __bool__(self) -> bool:


class RelationsWorkerStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)

self._msc3440_enabled = hs.config.experimental.msc3440_enabled

@cached(tree=True)
async def get_relations_for_event(
self,
Expand Down Expand Up @@ -514,14 +498,15 @@ def _get_thread_summaries_txn(
AND parent.room_id = child.room_id
WHERE
%s
AND relation_type = ?
AND (relation_type = ? OR relation_type = ?)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This (and the similar changes) mean that having both m.thread and io.element.thread in a thread would work...for now (until support for io.element.thread is dropped).

I prefer doing it this way though since it does not double the queries to get stable and unstable threads. Thoughts on whether this is acceptable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above is still true, but only if you have the experimental config flag enabled.

ORDER BY child.topological_ordering DESC, child.stream_ordering DESC
"""

clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids
)
args.append(RelationTypes.THREAD)
args.append(RelationTypes.UNSTABLE_THREAD)

txn.execute(sql % (clause,), args)
latest_event_ids = {}
Expand All @@ -543,7 +528,7 @@ def _get_thread_summaries_txn(
AND parent.room_id = child.room_id
WHERE
%s
AND relation_type = ?
AND (relation_type = ? OR relation_type = ?)
GROUP BY parent.event_id
"""

Expand All @@ -553,6 +538,7 @@ def _get_thread_summaries_txn(
txn.database_engine, "relates_to_id", latest_event_ids.keys()
)
args.append(RelationTypes.THREAD)
args.append(RelationTypes.UNSTABLE_THREAD)

txn.execute(sql % (clause,), args)
counts = dict(cast(List[Tuple[str, int]], txn.fetchall()))
Expand Down Expand Up @@ -617,14 +603,14 @@ def _get_thread_summary_txn(txn: LoggingTransaction) -> Set[str]:
AND parent.room_id = child.room_id
WHERE
%s
AND relation_type = ?
AND (relation_type = ? OR relation_type = ?)
AND child.sender = ?
"""

clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids
)
args.extend((RelationTypes.THREAD, user_id))
args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD, user_id))

txn.execute(sql % (clause,), args)
return {row[0] for row in txn.fetchall()}
Expand Down Expand Up @@ -830,26 +816,23 @@ async def get_bundled_aggregations(
results.setdefault(event_id, BundledAggregations()).replace = edit

# Fetch thread summaries.
if self._msc3440_enabled:
summaries = await self._get_thread_summaries(seen_event_ids)
# Only fetch participated for a limited selection based on what had
# summaries.
participated = await self._get_threads_participated(
summaries.keys(), user_id
)
for event_id, summary in summaries.items():
if summary:
thread_count, latest_thread_event, edit = summary
results.setdefault(
event_id, BundledAggregations()
).thread = _ThreadAggregation(
latest_event=latest_thread_event,
latest_edit=edit,
count=thread_count,
# If there's a thread summary it must also exist in the
# participated dictionary.
current_user_participated=participated[event_id],
)
summaries = await self._get_thread_summaries(seen_event_ids)
# Only fetch participated for a limited selection based on what had
# summaries.
participated = await self._get_threads_participated(summaries.keys(), user_id)
for event_id, summary in summaries.items():
if summary:
thread_count, latest_thread_event, edit = summary
results.setdefault(
event_id, BundledAggregations()
).thread = _ThreadAggregation(
latest_event=latest_thread_event,
latest_edit=edit,
count=thread_count,
# If there's a thread summary it must also exist in the
# participated dictionary.
current_user_participated=participated[event_id],
)

return results

Expand Down
18 changes: 10 additions & 8 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,21 +325,23 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
args.extend(event_filter.labels)

# Filter on relation_senders / relation types from the joined tables.
if event_filter.relation_senders:
if event_filter.related_by_senders:
clauses.append(
"(%s)"
% " OR ".join(
"related_event.sender = ?" for _ in event_filter.relation_senders
"related_event.sender = ?" for _ in event_filter.related_by_senders
)
)
args.extend(event_filter.relation_senders)
args.extend(event_filter.related_by_senders)

if event_filter.relation_types:
if event_filter.related_by_rel_types:
clauses.append(
"(%s)"
% " OR ".join("relation_type = ?" for _ in event_filter.relation_types)
% " OR ".join(
"relation_type = ?" for _ in event_filter.related_by_rel_types
)
)
args.extend(event_filter.relation_types)
args.extend(event_filter.related_by_rel_types)

return " AND ".join(clauses), args

Expand Down Expand Up @@ -1203,15 +1205,15 @@ def _paginate_room_events_txn(
# If there is a filter on relation_senders and relation_types join to the
# relations table.
if event_filter and (
event_filter.relation_senders or event_filter.relation_types
event_filter.related_by_senders or event_filter.related_by_rel_types
):
# Filtering by relations could cause the same event to appear multiple
# times (since there's no limit on the number of relations to an event).
needs_distinct = True
join_clause += """
LEFT JOIN event_relations AS relation ON (event.event_id = relation.relates_to_id)
"""
if event_filter.relation_senders:
if event_filter.related_by_senders:
join_clause += """
LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id)
"""
Expand Down
8 changes: 2 additions & 6 deletions tests/rest/client/test_relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,7 @@ def test_aggregation_must_be_annotation(self) -> None:
)
self.assertEqual(400, channel.code, channel.json_body)

@unittest.override_config(
{"experimental_features": {"msc3440_enabled": True, "msc3666_enabled": True}}
)
@unittest.override_config({"experimental_features": {"msc3666_enabled": True}})
def test_bundled_aggregations(self) -> None:
"""
Test that annotations, references, and threads get correctly bundled.
Expand Down Expand Up @@ -597,6 +595,7 @@ def assert_bundle(event_json: JsonDict) -> None:
RelationTypes.ANNOTATION,
RelationTypes.REFERENCE,
RelationTypes.THREAD,
RelationTypes.UNSTABLE_THREAD,
),
)

Expand Down Expand Up @@ -758,7 +757,6 @@ def test_aggregation_get_event_for_thread(self) -> None:
},
)

@unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
def test_ignore_invalid_room(self) -> None:
"""Test that we ignore invalid relations over federation."""
# Create another room and send a message in it.
Expand Down Expand Up @@ -1065,7 +1063,6 @@ def test_edit_reply(self) -> None:
{"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
)

@unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
def test_edit_thread(self) -> None:
"""Test that editing a thread works."""

Expand Down Expand Up @@ -1383,7 +1380,6 @@ def test_redact_relation_annotation(self) -> None:
chunk = self._get_aggregations()
self.assertEqual(chunk, [{"type": "m.reaction", "key": "a", "count": 1}])

@unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
def test_redact_relation_thread(self) -> None:
"""
Test that thread replies are properly handled after the thread reply redacted.
Expand Down
Loading