
Update event push action and receipt tables to support threads. #13753

Merged · 9 commits · Sep 14, 2022
1 change: 1 addition & 0 deletions changelog.d/13753.misc
@@ -0,0 +1 @@
Preparatory work for storing thread IDs for notifications and receipts.
29 changes: 15 additions & 14 deletions synapse/push/bulk_push_rule_evaluator.py
@@ -198,15 +198,15 @@ async def _get_power_levels_and_sender_level(
return pl_event.content if pl_event else {}, sender_level

async def _get_mutual_relations(
self, event: EventBase, rules: Iterable[Tuple[PushRule, bool]]
self, parent_id: str, rules: Iterable[Tuple[PushRule, bool]]
) -> Dict[str, Set[Tuple[str, str]]]:
"""
Fetch event metadata for events which relate to the same event as the given event.

If the given event has no relation information, returns an empty dictionary.

Args:
event_id: The event ID which is targeted by relations.
parent_id: The event ID which is targeted by relations.
rules: The push rules which will be processed for this event.

Returns:
@@ -220,12 +220,6 @@ async def _get_mutual_relations(
if not self._relations_match_enabled:
return {}

# If the event does not have a relation, then cannot have any mutual
# relations.
relation = relation_from_event(event)
if not relation:
return {}

# Pre-filter to figure out which relation types are interesting.
rel_types = set()
for rule, enabled in rules:
@@ -246,9 +240,7 @@
return {}

# If any valid rules were found, fetch the mutual relations.
return await self.store.get_mutual_event_relations(
relation.parent_id, rel_types
)
return await self.store.get_mutual_event_relations(parent_id, rel_types)

@measure_func("action_for_event_by_user")
async def action_for_event_by_user(
@@ -281,9 +273,17 @@ async def action_for_event_by_user(
sender_power_level,
) = await self._get_power_levels_and_sender_level(event, context)

relations = await self._get_mutual_relations(
event, itertools.chain(*rules_by_user.values())
)
relation = relation_from_event(event)
# If the event does not have a relation, then it cannot have any mutual
# relations or thread ID.
relations = {}
thread_id = "main"
if relation:
relations = await self._get_mutual_relations(
relation.parent_id, itertools.chain(*rules_by_user.values())
)
if relation.rel_type == RelationTypes.THREAD:
thread_id = relation.parent_id

evaluator = PushRuleEvaluatorForEvent(
event,
@@ -352,6 +352,7 @@ async def action_for_event_by_user(
event.event_id,
actions_by_user,
count_as_unread,
thread_id,
)


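The derivation above is the crux of the push-rule change: a threaded event's thread is identified by the root of its m.thread relation, and everything else lands on the "main" timeline. A minimal standalone sketch of that mapping (derive_thread_id is a hypothetical stand-in for the combination of relation_from_event and the branch above):

```python
# A minimal sketch of the thread_id derivation above. derive_thread_id is a
# hypothetical helper; the real code uses relation_from_event and
# RelationTypes.THREAD ("m.thread") from Synapse.
from typing import Any, Dict

THREAD = "m.thread"

def derive_thread_id(content: Dict[str, Any]) -> str:
    """Map an event's m.relates_to content to the thread_id used for pushes."""
    relates_to = content.get("m.relates_to")
    # No relation at all: the event sits on the main timeline.
    if not isinstance(relates_to, dict):
        return "main"
    # A thread relation points at the thread root, which names the thread.
    if relates_to.get("rel_type") == THREAD and "event_id" in relates_to:
        return relates_to["event_id"]
    # Any other relation (edits, annotations, ...) still counts as "main" here.
    return "main"

# A threaded reply resolves to its thread root; everything else is "main".
assert derive_thread_id({"m.relates_to": {"rel_type": "m.thread", "event_id": "$root"}}) == "$root"
assert derive_thread_id({"m.relates_to": {"rel_type": "m.annotation", "event_id": "$e", "key": "👍"}}) == "main"
assert derive_thread_id({"body": "hello"}) == "main"
```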
121 changes: 118 additions & 3 deletions synapse/storage/databases/main/event_push_actions.py
@@ -98,6 +98,7 @@
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached

@@ -232,6 +233,104 @@ def __init__(
replaces_index="event_push_summary_user_rm",
)

self.db_pool.updates.register_background_index_update(
"event_push_summary_unique_index2",
index_name="event_push_summary_unique_index2",
table="event_push_summary",
columns=["user_id", "room_id", "thread_id"],
unique=True,
)

self.db_pool.updates.register_background_update_handler(
"event_push_backfill_thread_id",
self._background_backfill_thread_id,
)

async def _background_backfill_thread_id(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Fill in the thread_id field for event_push_actions and event_push_summary.

This is preparatory work so that the column can be made non-nullable in
the future.

Because all current (null) data was created in an unthreaded manner, this
simply assumes it belongs on the "main" timeline. Since event_push_actions
rows are periodically cleared, it is not possible to correctly re-calculate
the thread_id.
"""
event_push_actions_done = progress.get("event_push_actions_done", False)

def add_thread_id_txn(
txn: LoggingTransaction, table_name: str, start_stream_ordering: int
) -> int:
sql = f"""
SELECT stream_ordering
FROM {table_name}
WHERE
thread_id IS NULL
AND stream_ordering > ?
ORDER BY stream_ordering
LIMIT ?
"""
txn.execute(sql, (start_stream_ordering, batch_size))

# No more rows to process.
rows = txn.fetchall()
if not rows:
progress[f"{table_name}_done"] = True
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
return 0

# Update the thread ID for any of those rows.
max_stream_ordering = rows[-1][0]

sql = f"""
UPDATE {table_name}
SET thread_id = 'main'
WHERE stream_ordering <= ? AND thread_id IS NULL
"""
txn.execute(sql, (max_stream_ordering,))

# Update progress.
processed_rows = txn.rowcount
progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)

return processed_rows

# First update the event_push_actions table, then the event_push_summary table.
#
# Note that the event_push_actions_staging table is ignored since it is
# assumed that items in that table will only exist for a short period of
# time.
if not event_push_actions_done:
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
add_thread_id_txn,
"event_push_actions",
progress.get("max_event_push_actions_stream_ordering", 0),
)
else:
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
add_thread_id_txn,
"event_push_summary",
progress.get("max_event_push_summary_stream_ordering", 0),
)

# Only done after the event_push_summary table is done.
if not result:
await self.db_pool.updates._end_background_update(
"event_push_backfill_thread_id"
)

return result

@cached(tree=True, max_entries=5000)
async def get_unread_event_push_actions_by_room_for_user(
self,
@@ -670,6 +769,7 @@ async def add_push_actions_to_staging(
event_id: str,
user_id_actions: Dict[str, Collection[Union[Mapping, str]]],
count_as_unread: bool,
thread_id: str,
) -> None:
"""Add the push actions for the event to the push action staging area.

@@ -678,6 +778,7 @@
user_id_actions: A mapping of user_id to list of push actions, where
an action can either be a string or dict.
count_as_unread: Whether this event should increment unread counts.
thread_id: The thread the event belongs to ("main" if the event is not part of a thread).
"""
if not user_id_actions:
return
@@ -686,7 +787,7 @@
# can be used to insert into the `event_push_actions_staging` table.
def _gen_entry(
user_id: str, actions: Collection[Union[Mapping, str]]
) -> Tuple[str, str, str, int, int, int]:
) -> Tuple[str, str, str, int, int, int, str]:
is_highlight = 1 if _action_has_highlight(actions) else 0
notif = 1 if "notify" in actions else 0
return (
@@ -696,11 +797,20 @@ def _gen_entry(
notif, # notif column
is_highlight, # highlight column
int(count_as_unread), # unread column
thread_id, # thread_id column
)

await self.db_pool.simple_insert_many(
"event_push_actions_staging",
keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"),
keys=(
"event_id",
"user_id",
"actions",
"notif",
"highlight",
"unread",
"thread_id",
),
values=[
_gen_entry(user_id, actions)
for user_id, actions in user_id_actions.items()
Expand Down Expand Up @@ -981,6 +1091,8 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
)

# Replace the previous summary with the new counts.
#
# TODO(threads): Upsert per-thread instead of setting them all to main.
self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
@@ -990,6 +1102,7 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
"thread_id": "main",
},
)

@@ -1138,17 +1251,19 @@ def _rotate_notifs_before_txn(

logger.info("Rotating notifications, handling %d rows", len(summaries))

# TODO(threads): Update on a per-thread basis.
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
key_names=("user_id", "room_id"),
key_values=[(user_id, room_id) for user_id, room_id in summaries],
value_names=("notif_count", "unread_count", "stream_ordering"),
value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"),
value_values=[
(
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
"main",
)
for summary in summaries.values()
],
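The backfill handler above follows Synapse's usual batched background-update pattern: read the next batch of NULL rows in stream_ordering order, stamp them, and persist how far you got. A self-contained illustration of that pattern (plain sqlite3 with a hypothetical backfill_batch helper, not Synapse's DatabasePool):

```python
# A sketch of the batched backfill above: grab the next batch of rows with a
# NULL thread_id in stream_ordering order, stamp them all as "main", and
# record how far we got so the update can resume.
import sqlite3
from typing import Tuple

def backfill_batch(conn: sqlite3.Connection, table: str, start: int, batch_size: int) -> Tuple[int, int]:
    """Returns (rows_updated, new_progress); rows_updated == 0 means done."""
    rows = conn.execute(
        f"SELECT stream_ordering FROM {table} "
        "WHERE thread_id IS NULL AND stream_ordering > ? "
        "ORDER BY stream_ordering LIMIT ?",
        (start, batch_size),
    ).fetchall()
    if not rows:
        return 0, start
    max_stream_ordering = rows[-1][0]
    cur = conn.execute(
        f"UPDATE {table} SET thread_id = 'main' "
        "WHERE stream_ordering <= ? AND thread_id IS NULL",
        (max_stream_ordering,),
    )
    return cur.rowcount, max_stream_ordering

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_push_actions (stream_ordering INTEGER, thread_id TEXT)")
conn.executemany(
    "INSERT INTO event_push_actions VALUES (?, NULL)", [(i,) for i in range(1, 8)]
)
progress = 0
while True:
    updated, progress = backfill_batch(conn, "event_push_actions", progress, 3)
    if updated == 0:
        break  # analogous to calling _end_background_update
```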
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/events.py
@@ -2192,9 +2192,9 @@ def _set_push_actions_for_event_and_users_txn(
sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
topological_ordering, notif, highlight, unread
topological_ordering, notif, highlight, unread, thread_id
)
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id
[Review comment from the author on the SELECT line above] Maybe we should be providing the thread_id like we do the room_id? (I.e. not storing it in event_push_actions_staging and only storing it in event_push_actions)

FROM event_push_actions_staging
WHERE event_id = ?
"""
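For the review comment above, a hypothetical variant of that query shows what binding thread_id as a parameter (like room_id) would look like, rather than copying it out of event_push_actions_staging. This is only a sketch of the suggestion, not what the PR implements:

```python
# Hypothetical alternative from the review comment above: thread_id is bound
# as a query parameter rather than read from event_push_actions_staging.
SQL_BIND_THREAD_ID = """
    INSERT INTO event_push_actions (
        room_id, event_id, user_id, actions, stream_ordering,
        topological_ordering, notif, highlight, unread, thread_id
    )
    SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, ?
    FROM event_push_actions_staging
    WHERE event_id = ?
"""
# It would be executed roughly as:
# txn.execute(SQL_BIND_THREAD_ID, (room_id, stream_ordering, topological_ordering, thread_id, event_id))
```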
20 changes: 20 additions & 0 deletions synapse/storage/databases/main/receipts.py
@@ -113,6 +113,24 @@ def __init__(
prefilled_cache=receipts_stream_prefill,
)

self.db_pool.updates.register_background_index_update(
"receipts_linearized_unique_index",
index_name="receipts_linearized_unique_index",
table="receipts_linearized",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

self.db_pool.updates.register_background_index_update(
"receipts_graph_unique_index",
index_name="receipts_graph_unique_index",
table="receipts_graph",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

def get_max_receipt_stream_id(self) -> int:
"""Get the current max stream ID for receipts stream"""
return self._receipts_id_gen.get_current_token()
@@ -677,6 +695,7 @@ def _insert_linearized_receipt_txn(
"event_id": event_id,
"event_stream_ordering": stream_ordering,
"data": json_encoder.encode(data),
"thread_id": None,
},
# receipts_linearized has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
Expand Down Expand Up @@ -824,6 +843,7 @@ def _insert_graph_receipt_txn(
values={
"event_ids": json_encoder.encode(event_ids),
"data": json_encoder.encode(data),
"thread_id": None,
},
# receipts_graph has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
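Both receipt indexes above are partial unique indexes: the "thread_id IS NULL" clause means they only deduplicate unthreaded receipts, leaving room for multiple threaded receipts per (room_id, receipt_type, user_id) later. A quick standalone demonstration of that behaviour (sqlite3; the real indexes are built by the registered background updates):

```python
# Demonstration of the partial unique indexes above: uniqueness is only
# enforced where thread_id IS NULL, so threaded receipts for the same
# (room_id, receipt_type, user_id) remain allowed.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE receipts_linearized "
    "(room_id TEXT, receipt_type TEXT, user_id TEXT, thread_id TEXT)"
)
conn.execute(
    "CREATE UNIQUE INDEX receipts_linearized_unique_index "
    "ON receipts_linearized (room_id, receipt_type, user_id) "
    "WHERE thread_id IS NULL"
)
conn.execute("INSERT INTO receipts_linearized VALUES ('!room', 'm.read', '@user:hs', NULL)")
try:
    # A second unthreaded read receipt for the same user is rejected...
    conn.execute("INSERT INTO receipts_linearized VALUES ('!room', 'm.read', '@user:hs', NULL)")
except sqlite3.IntegrityError:
    print("duplicate unthreaded receipt rejected")
# ...while multiple threaded receipts coexist without tripping the index.
conn.execute("INSERT INTO receipts_linearized VALUES ('!room', 'm.read', '@user:hs', '$thread1')")
conn.execute("INSERT INTO receipts_linearized VALUES ('!room', 'm.read', '@user:hs', '$thread2')")
```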
6 changes: 5 additions & 1 deletion synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

SCHEMA_VERSION = 72 # remember to update the list below when updating
SCHEMA_VERSION = 73 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema

This should be incremented whenever the codebase changes its requirements on the
@@ -77,6 +77,10 @@
- Tables related to groups are dropped.
- Unused column application_services_state.last_txn is dropped
- Cache invalidation stream id sequence now begins at 2 to match code expectation.

Changes in SCHEMA_VERSION = 73:
- thread_id column is added to event_push_actions, event_push_actions_staging,
event_push_summary, receipts_linearized, and receipts_graph.
"""


Expand Down
30 changes: 30 additions & 0 deletions synapse/storage/schema/main/delta/72/06thread_notifications.sql
@@ -0,0 +1,30 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Add a nullable column for thread ID to the event push actions tables; this
-- will be filled in with a default value for any previously existing rows.
--
-- After migration this can be made non-nullable.

ALTER TABLE event_push_actions_staging ADD COLUMN thread_id TEXT;
ALTER TABLE event_push_actions ADD COLUMN thread_id TEXT;
ALTER TABLE event_push_summary ADD COLUMN thread_id TEXT;

-- Update the unique index for `event_push_summary`.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7006, 'event_push_summary_unique_index2', '{}');

INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(7006, 'event_push_backfill_thread_id', '{}', 'event_push_summary_unique_index2');