Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Re-implement unread counts (again) #8059

Merged
merged 17 commits into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8059.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add unread messages count to sync responses, as specified in [MSC2654](https://github.com/matrix-org/matrix-doc/pull/2654).
33 changes: 18 additions & 15 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ def __nonzero__(self) -> bool:
__bool__ = __nonzero__ # python3


@attr.s(slots=True, frozen=True)
# We can't freeze this class, because we need to update it after it's instantiated to
# update its unread count. This is because we calculate the unread count for a room only
# if there are updates for it, which we check after the instance has been created.
# This should not be a big deal because we update the notification counts afterwards as
# well anyway.
@attr.s(slots=True)
class JoinedSyncResult:
room_id = attr.ib(type=str)
timeline = attr.ib(type=TimelineBatch)
Expand All @@ -104,6 +109,7 @@ class JoinedSyncResult:
account_data = attr.ib(type=List[JsonDict])
unread_notifications = attr.ib(type=JsonDict)
summary = attr.ib(type=Optional[JsonDict])
unread_count = attr.ib(type=int)

def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
Expand Down Expand Up @@ -931,23 +937,18 @@ async def compute_state_delta(

async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> Optional[Dict[str, str]]:
) -> Dict[str, int]:
with Measure(self.clock, "unread_notifs_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
room_id=room_id,
receipt_type="m.read",
)

if last_unread_event_id:
notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
return notifs

# There is no new information in this period, so your notification
# count is whatever it was last time.
return None
notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
return notifs
babolivier marked this conversation as resolved.
Show resolved Hide resolved

async def generate_sync_result(
self,
Expand Down Expand Up @@ -1886,7 +1887,7 @@ async def _generate_room_entry(
)

if room_builder.rtype == "joined":
unread_notifications = {} # type: Dict[str, str]
unread_notifications = {} # type: Dict[str, int]
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
Expand All @@ -1895,14 +1896,16 @@ async def _generate_room_entry(
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
unread_count=0,
)

if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)

if notifs is not None:
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]

room_sync.unread_count = notifs["unread_count"]

sync_result_builder.joined.append(room_sync)

Expand Down
92 changes: 64 additions & 28 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

from prometheus_client import Counter

from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import EventTypes, Membership, RelationTypes
from synapse.event_auth import get_user_power_level
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.state import POWER_KEY
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import register_cache
Expand Down Expand Up @@ -51,6 +53,48 @@
)


STATE_EVENT_TYPES_TO_MARK_UNREAD = {
EventTypes.Topic,
EventTypes.Name,
EventTypes.RoomAvatar,
EventTypes.Tombstone,
}


def _should_count_as_unread(event: EventBase, context: EventContext) -> bool:
# Exclude rejected and soft-failed events.
if context.rejected or event.internal_metadata.is_soft_failed():
return False

# Exclude notices.
if (
not event.is_state()
and event.type == EventTypes.Message
and event.content.get("msgtype") == "m.notice"
):
return False

# Exclude edits.
relates_to = event.content.get("m.relates_to", {})
if relates_to.get("rel_type") == RelationTypes.REPLACE:
return False

# Mark events that have a non-empty string body as unread.
body = event.content.get("body")
if isinstance(body, str) and body:
return True

# Mark some state events as unread.
if event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD:
return True

# Mark encrypted events as unread.
if not event.is_state() and event.type == EventTypes.Encrypted:
return True

return False


class BulkPushRuleEvaluator(object):
"""Calculates the outcome of push rules for an event for all users in the
room at once.
Expand Down Expand Up @@ -133,9 +177,12 @@ async def _get_power_levels_and_sender_level(self, event, context):
return pl_event.content if pl_event else {}, sender_level

async def action_for_event_by_user(self, event, context) -> None:
"""Given an event and context, evaluate the push rules and insert the
results into the event_push_actions_staging table.
"""Given an event and context, evaluate the push rules, check if the message
should increment the unread count, and insert the results into the
event_push_actions_staging table.
"""
count_as_unread = _should_count_as_unread(event, context)

rules_by_user = await self._get_rules_for_event(event, context)
actions_by_user = {}

Expand Down Expand Up @@ -172,6 +219,8 @@ async def action_for_event_by_user(self, event, context) -> None:
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)

actions_by_user[uid] = []

for rule in rules:
if "enabled" in rule and not rule["enabled"]:
continue
Expand All @@ -189,7 +238,9 @@ async def action_for_event_by_user(self, event, context) -> None:
# Mark in the DB staging area the push actions for users who should be
# notified for this event. (This will then get handled when we persist
# the event)
await self.store.add_push_actions_to_staging(event.event_id, actions_by_user)
await self.store.add_push_actions_to_staging(
event.event_id, actions_by_user, count_as_unread,
)


def _condition_checker(evaluator, conditions, uid, display_name, cache):
Expand Down Expand Up @@ -369,8 +420,8 @@ async def _update_rules_with_member_event_ids(
Args:
ret_rules_by_user (dict): Partiallly filled dict of push rules. Gets
updated with any new rules.
member_event_ids (list): List of event ids for membership events that
have happened since the last time we filled rules_by_user
member_event_ids (dict): Dict of user id to event id for membership events
that have happened since the last time we filled rules_by_user
state_group: The state group we are currently computing push rules
for. Used when updating the cache.
"""
Expand All @@ -390,34 +441,19 @@ async def _update_rules_with_member_event_ids(
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Found members %r: %r", self.room_id, members.values())

interested_in_user_ids = {
user_ids = {
user_id
for user_id, membership in members.values()
if membership == Membership.JOIN
}

logger.debug("Joined: %r", interested_in_user_ids)

if_users_with_pushers = await self.store.get_if_users_have_pushers(
interested_in_user_ids, on_invalidate=self.invalidate_all_cb
)

user_ids = {
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
}

logger.debug("With pushers: %r", user_ids)

users_with_receipts = await self.store.get_users_with_read_receipts_in_room(
self.room_id, on_invalidate=self.invalidate_all_cb
)

logger.debug("With receipts: %r", users_with_receipts)
logger.debug("Joined: %r", user_ids)

# any users with pushers must be ours: they have pushers
for uid in users_with_receipts:
if uid in interested_in_user_ids:
user_ids.add(uid)
# Previously we only considered users with pushers or read receipts in that
# room. We can't do this anymore because we use push actions to calculate unread
# counts, which don't rely on the user having pushers or sent a read receipt into
# the room. Therefore we just need to filter for local users here.
user_ids = list(filter(self.is_mine_id, user_ids))

rules_by_user = await self.store.bulk_get_push_rules(
user_ids, on_invalidate=self.invalidate_all_cb
Expand Down
2 changes: 1 addition & 1 deletion synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def get_badge_count(store, user_id):
)
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
badge += 1 if notifs["notify_count"] else 0
badge += 1 if notifs["unread_count"] else 0
return badge


Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ def serialize(events):
result["ephemeral"] = {"events": ephemeral_events}
result["unread_notifications"] = room.unread_notifications
result["summary"] = room.summary
result["org.matrix.msc2654.unread_count"] = room.unread_count

return result

Expand Down
Loading