This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 16b1f7f

Insert a thread_id when inserting new data.
1 parent 9dc5e22 commit 16b1f7f
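
At a high level, the push-rule evaluator now derives a thread ID for each event before its push actions are written: an event inside a thread is bucketed under its thread root's event ID, and everything else falls into the catch-all "main" thread. A minimal, illustrative sketch of that derivation, using a plain dict in place of a real event (the actual code in the first diff below uses Synapse's relation_from_event helper and RelationTypes.THREAD):

    MAIN_TIMELINE = "main"
    THREAD_REL_TYPE = "m.thread"  # RelationTypes.THREAD

    def thread_id_for_event(content: dict) -> str:
        """Pick the thread_id to store alongside an event's push actions."""
        relation = content.get("m.relates_to")
        if relation and relation.get("rel_type") == THREAD_REL_TYPE:
            # Threaded events are grouped under the thread root's event ID.
            return relation["event_id"]
        return MAIN_TIMELINE

    # A reply inside a thread rooted at $root is bucketed under that root...
    assert thread_id_for_event(
        {"m.relates_to": {"rel_type": "m.thread", "event_id": "$root"}}
    ) == "$root"
    # ...while an unrelated message stays on the main timeline.
    assert thread_id_for_event({"body": "hello"}) == MAIN_TIMELINE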

File tree: 5 files changed, +39 -19 lines


synapse/push/bulk_push_rule_evaluator.py (+15 -14)
@@ -198,15 +198,15 @@ async def _get_power_levels_and_sender_level(
         return pl_event.content if pl_event else {}, sender_level

     async def _get_mutual_relations(
-        self, event: EventBase, rules: Iterable[Tuple[PushRule, bool]]
+        self, parent_id: str, rules: Iterable[Tuple[PushRule, bool]]
     ) -> Dict[str, Set[Tuple[str, str]]]:
         """
         Fetch event metadata for events which related to the same event as the given event.

         If the given event has no relation information, returns an empty dictionary.

         Args:
-            event_id: The event ID which is targeted by relations.
+            parent_id: The event ID which is targeted by relations.
             rules: The push rules which will be processed for this event.

         Returns:
@@ -220,12 +220,6 @@ async def _get_mutual_relations(
         if not self._relations_match_enabled:
             return {}

-        # If the event does not have a relation, then cannot have any mutual
-        # relations.
-        relation = relation_from_event(event)
-        if not relation:
-            return {}
-
         # Pre-filter to figure out which relation types are interesting.
         rel_types = set()
         for rule, enabled in rules:
@@ -246,9 +240,7 @@ async def _get_mutual_relations(
             return {}

         # If any valid rules were found, fetch the mutual relations.
-        return await self.store.get_mutual_event_relations(
-            relation.parent_id, rel_types
-        )
+        return await self.store.get_mutual_event_relations(parent_id, rel_types)

     @measure_func("action_for_event_by_user")
     async def action_for_event_by_user(
@@ -281,9 +273,17 @@ async def action_for_event_by_user(
             sender_power_level,
         ) = await self._get_power_levels_and_sender_level(event, context)

-        relations = await self._get_mutual_relations(
-            event, itertools.chain(*rules_by_user.values())
-        )
+        relation = relation_from_event(event)
+        # If the event does not have a relation, then cannot have any mutual
+        # relations or thread ID.
+        relations = {}
+        thread_id = "main"
+        if relation:
+            relations = await self._get_mutual_relations(
+                relation.parent_id, itertools.chain(*rules_by_user.values())
+            )
+            if relation.rel_type == RelationTypes.THREAD:
+                thread_id = relation.parent_id

         evaluator = PushRuleEvaluatorForEvent(
             event,
@@ -352,6 +352,7 @@ async def action_for_event_by_user(
             event.event_id,
             actions_by_user,
             count_as_unread,
+            thread_id,
         )
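The final hunk above is truncated mid-call by the diff context: it is the call into the storage layer at the end of action_for_event_by_user, which now forwards the derived thread ID. With the new argument, the surrounding call reads roughly as follows (a sketch; the store method gains the matching thread_id parameter in the next file):

    # Sketch of the call the last hunk modifies (not shown in full above).
    await self.store.add_push_actions_to_staging(
        event.event_id,
        actions_by_user,
        count_as_unread,
        thread_id,
    )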
synapse/storage/databases/main/event_push_actions.py (+19 -3)
@@ -759,6 +759,7 @@ async def add_push_actions_to_staging(
         event_id: str,
         user_id_actions: Dict[str, Collection[Union[Mapping, str]]],
         count_as_unread: bool,
+        thread_id: str,
     ) -> None:
         """Add the push actions for the event to the push action staging area.

@@ -767,6 +768,7 @@ async def add_push_actions_to_staging(
             user_id_actions: A mapping of user_id to list of push actions, where
                 an action can either be a string or dict.
             count_as_unread: Whether this event should increment unread counts.
+            thread_id: The thread this event is parent of, if applicable.
         """
         if not user_id_actions:
             return
@@ -775,7 +777,7 @@ async def add_push_actions_to_staging(
         # can be used to insert into the `event_push_actions_staging` table.
         def _gen_entry(
             user_id: str, actions: Collection[Union[Mapping, str]]
-        ) -> Tuple[str, str, str, int, int, int]:
+        ) -> Tuple[str, str, str, int, int, int, str]:
             is_highlight = 1 if _action_has_highlight(actions) else 0
             notif = 1 if "notify" in actions else 0
             return (
@@ -785,11 +787,20 @@ def _gen_entry(
                 notif,  # notif column
                 is_highlight,  # highlight column
                 int(count_as_unread),  # unread column
+                thread_id,  # thread_id column
             )

         await self.db_pool.simple_insert_many(
             "event_push_actions_staging",
-            keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"),
+            keys=(
+                "event_id",
+                "user_id",
+                "actions",
+                "notif",
+                "highlight",
+                "unread",
+                "thread_id",
+            ),
             values=[
                 _gen_entry(user_id, actions)
                 for user_id, actions in user_id_actions.items()
@@ -1070,6 +1081,8 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
             )

             # Replace the previous summary with the new counts.
+            #
+            # TODO(threads): Upsert per-thread instead of setting them all to main.
             self.db_pool.simple_upsert_txn(
                 txn,
                 table="event_push_summary",
@@ -1079,6 +1092,7 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
                     "unread_count": unread_count,
                     "stream_ordering": old_rotate_stream_ordering,
                     "last_receipt_stream_ordering": stream_ordering,
+                    "thread_id": "main",
                 },
             )

@@ -1227,17 +1241,19 @@ def _rotate_notifs_before_txn(

         logger.info("Rotating notifications, handling %d rows", len(summaries))

+        # TODO(threads): Update on a per-thread basis.
         self.db_pool.simple_upsert_many_txn(
             txn,
             table="event_push_summary",
             key_names=("user_id", "room_id"),
             key_values=[(user_id, room_id) for user_id, room_id in summaries],
-            value_names=("notif_count", "unread_count", "stream_ordering"),
+            value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"),
             value_values=[
                 (
                     summary.notif_count,
                     summary.unread_count,
                     summary.stream_ordering,
+                    "main",
                 )
                 for summary in summaries.values()
             ],
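
With the extra column, each row produced by _gen_entry has to line up positionally with the expanded keys tuple passed to simple_insert_many. An illustrative staged row for one user (the values are made up; the actions entry is the serialized string produced elsewhere in this function):

    # Keys: ("event_id", "user_id", "actions", "notif", "highlight", "unread", "thread_id")
    staged_row = (
        "$event:example.org",   # event_id
        "@alice:example.org",   # user_id
        '["notify"]',           # actions, serialized to a string
        1,                      # notif
        0,                      # highlight
        1,                      # unread
        "main",                 # thread_id: "main" or a thread root's event ID
    )

The summary-rotation and receipt-handling paths further down simply hard-code "main" for now, with TODO(threads) markers for the later per-thread work.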

synapse/storage/databases/main/events.py (+2 -2)
@@ -2192,9 +2192,9 @@ def _set_push_actions_for_event_and_users_txn(
         sql = """
             INSERT INTO event_push_actions (
                 room_id, event_id, user_id, actions, stream_ordering,
-                topological_ordering, notif, highlight, unread
+                topological_ordering, notif, highlight, unread, thread_id
             )
-            SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread
+            SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id
             FROM event_push_actions_staging
             WHERE event_id = ?
         """

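The placeholders in the statement above bind, in order, the room ID, stream ordering, and topological ordering, with the event ID bound in the WHERE clause; the new thread_id column is copied verbatim from the staging row. A reduced, self-contained demonstration of that copy using sqlite3 with made-up schemas and values (only the columns relevant here are modelled):

    import sqlite3

    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE event_push_actions_staging (event_id TEXT, user_id TEXT,"
        " actions TEXT, notif INT, highlight INT, unread INT, thread_id TEXT)"
    )
    db.execute(
        "CREATE TABLE event_push_actions (room_id TEXT, event_id TEXT, user_id TEXT,"
        " actions TEXT, stream_ordering INT, topological_ordering INT, notif INT,"
        " highlight INT, unread INT, thread_id TEXT)"
    )
    # One staged push action, tagged with its thread.
    db.execute(
        "INSERT INTO event_push_actions_staging VALUES (?, ?, ?, ?, ?, ?, ?)",
        ("$event", "@alice:example.org", '["notify"]', 1, 0, 1, "main"),
    )
    # The statement from the diff: thread_id travels along with the other columns.
    db.execute(
        """
        INSERT INTO event_push_actions (
            room_id, event_id, user_id, actions, stream_ordering,
            topological_ordering, notif, highlight, unread, thread_id
        )
        SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id
        FROM event_push_actions_staging
        WHERE event_id = ?
        """,
        ("!room:example.org", 42, 7, "$event"),
    )
    assert db.execute("SELECT thread_id FROM event_push_actions").fetchone() == ("main",)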
synapse/storage/databases/main/receipts.py (+2)
@@ -694,6 +694,7 @@ def _insert_linearized_receipt_txn(
                 "stream_id": stream_id,
                 "event_id": event_id,
                 "data": json_encoder.encode(data),
+                "thread_id": None,
             },
             # receipts_linearized has a unique constraint on
             # (user_id, room_id, receipt_type), so no need to lock
@@ -841,6 +842,7 @@ def _insert_graph_receipt_txn(
             values={
                 "event_ids": json_encoder.encode(event_ids),
                 "data": json_encoder.encode(data),
+                "thread_id": None,
             },
             # receipts_graph has a unique constraint on
             # (user_id, room_id, receipt_type), so no need to lock

tests/replication/slave/storage/test_events.py (+1)
@@ -404,6 +404,7 @@ def build_event(
                 event.event_id,
                 {user_id: actions for user_id, actions in push_actions},
                 False,
+                "main",
             )
         )
         return event, context
