Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Re-implement unread counts (again) (#8059)
Browse files Browse the repository at this point in the history
  • Loading branch information
babolivier authored Sep 2, 2020
1 parent 0d4f614 commit 5a1dd29
Show file tree
Hide file tree
Showing 12 changed files with 457 additions and 122 deletions.
1 change: 1 addition & 0 deletions changelog.d/8059.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add unread messages count to sync responses, as specified in [MSC2654](https://github.com/matrix-org/matrix-doc/pull/2654).
33 changes: 18 additions & 15 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ def __nonzero__(self) -> bool:
__bool__ = __nonzero__ # python3


@attr.s(slots=True, frozen=True)
# We can't freeze this class, because we need to update it after it's instantiated to
# update its unread count. This is because we calculate the unread count for a room only
# if there are updates for it, which we check after the instance has been created.
# This should not be a big deal because we update the notification counts afterwards as
# well anyway.
@attr.s(slots=True)
class JoinedSyncResult:
room_id = attr.ib(type=str)
timeline = attr.ib(type=TimelineBatch)
Expand All @@ -104,6 +109,7 @@ class JoinedSyncResult:
account_data = attr.ib(type=List[JsonDict])
unread_notifications = attr.ib(type=JsonDict)
summary = attr.ib(type=Optional[JsonDict])
unread_count = attr.ib(type=int)

def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
Expand Down Expand Up @@ -931,23 +937,18 @@ async def compute_state_delta(

async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> Optional[Dict[str, str]]:
) -> Dict[str, int]:
with Measure(self.clock, "unread_notifs_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
room_id=room_id,
receipt_type="m.read",
)

if last_unread_event_id:
notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
return notifs

# There is no new information in this period, so your notification
# count is whatever it was last time.
return None
notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
return notifs

async def generate_sync_result(
self,
Expand Down Expand Up @@ -1886,7 +1887,7 @@ async def _generate_room_entry(
)

if room_builder.rtype == "joined":
unread_notifications = {} # type: Dict[str, str]
unread_notifications = {} # type: Dict[str, int]
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
Expand All @@ -1895,14 +1896,16 @@ async def _generate_room_entry(
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
unread_count=0,
)

if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)

if notifs is not None:
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]

room_sync.unread_count = notifs["unread_count"]

sync_result_builder.joined.append(room_sync)

Expand Down
92 changes: 64 additions & 28 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

from prometheus_client import Counter

from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import EventTypes, Membership, RelationTypes
from synapse.event_auth import get_user_power_level
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.state import POWER_KEY
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import register_cache
Expand Down Expand Up @@ -51,6 +53,48 @@
)


STATE_EVENT_TYPES_TO_MARK_UNREAD = {
EventTypes.Topic,
EventTypes.Name,
EventTypes.RoomAvatar,
EventTypes.Tombstone,
}


def _should_count_as_unread(event: EventBase, context: EventContext) -> bool:
# Exclude rejected and soft-failed events.
if context.rejected or event.internal_metadata.is_soft_failed():
return False

# Exclude notices.
if (
not event.is_state()
and event.type == EventTypes.Message
and event.content.get("msgtype") == "m.notice"
):
return False

# Exclude edits.
relates_to = event.content.get("m.relates_to", {})
if relates_to.get("rel_type") == RelationTypes.REPLACE:
return False

# Mark events that have a non-empty string body as unread.
body = event.content.get("body")
if isinstance(body, str) and body:
return True

# Mark some state events as unread.
if event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD:
return True

# Mark encrypted events as unread.
if not event.is_state() and event.type == EventTypes.Encrypted:
return True

return False


class BulkPushRuleEvaluator(object):
"""Calculates the outcome of push rules for an event for all users in the
room at once.
Expand Down Expand Up @@ -133,9 +177,12 @@ async def _get_power_levels_and_sender_level(self, event, context):
return pl_event.content if pl_event else {}, sender_level

async def action_for_event_by_user(self, event, context) -> None:
"""Given an event and context, evaluate the push rules and insert the
results into the event_push_actions_staging table.
"""Given an event and context, evaluate the push rules, check if the message
should increment the unread count, and insert the results into the
event_push_actions_staging table.
"""
count_as_unread = _should_count_as_unread(event, context)

rules_by_user = await self._get_rules_for_event(event, context)
actions_by_user = {}

Expand Down Expand Up @@ -172,6 +219,8 @@ async def action_for_event_by_user(self, event, context) -> None:
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)

actions_by_user[uid] = []

for rule in rules:
if "enabled" in rule and not rule["enabled"]:
continue
Expand All @@ -189,7 +238,9 @@ async def action_for_event_by_user(self, event, context) -> None:
# Mark in the DB staging area the push actions for users who should be
# notified for this event. (This will then get handled when we persist
# the event)
await self.store.add_push_actions_to_staging(event.event_id, actions_by_user)
await self.store.add_push_actions_to_staging(
event.event_id, actions_by_user, count_as_unread,
)


def _condition_checker(evaluator, conditions, uid, display_name, cache):
Expand Down Expand Up @@ -369,8 +420,8 @@ async def _update_rules_with_member_event_ids(
Args:
ret_rules_by_user (dict): Partiallly filled dict of push rules. Gets
updated with any new rules.
member_event_ids (list): List of event ids for membership events that
have happened since the last time we filled rules_by_user
member_event_ids (dict): Dict of user id to event id for membership events
that have happened since the last time we filled rules_by_user
state_group: The state group we are currently computing push rules
for. Used when updating the cache.
"""
Expand All @@ -390,34 +441,19 @@ async def _update_rules_with_member_event_ids(
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Found members %r: %r", self.room_id, members.values())

interested_in_user_ids = {
user_ids = {
user_id
for user_id, membership in members.values()
if membership == Membership.JOIN
}

logger.debug("Joined: %r", interested_in_user_ids)

if_users_with_pushers = await self.store.get_if_users_have_pushers(
interested_in_user_ids, on_invalidate=self.invalidate_all_cb
)

user_ids = {
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
}

logger.debug("With pushers: %r", user_ids)

users_with_receipts = await self.store.get_users_with_read_receipts_in_room(
self.room_id, on_invalidate=self.invalidate_all_cb
)

logger.debug("With receipts: %r", users_with_receipts)
logger.debug("Joined: %r", user_ids)

# any users with pushers must be ours: they have pushers
for uid in users_with_receipts:
if uid in interested_in_user_ids:
user_ids.add(uid)
# Previously we only considered users with pushers or read receipts in that
# room. We can't do this anymore because we use push actions to calculate unread
# counts, which don't rely on the user having pushers or sent a read receipt into
# the room. Therefore we just need to filter for local users here.
user_ids = list(filter(self.is_mine_id, user_ids))

rules_by_user = await self.store.bulk_get_push_rules(
user_ids, on_invalidate=self.invalidate_all_cb
Expand Down
2 changes: 1 addition & 1 deletion synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def get_badge_count(store, user_id):
)
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
badge += 1 if notifs["notify_count"] else 0
badge += 1 if notifs["unread_count"] else 0
return badge


Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ def serialize(events):
result["ephemeral"] = {"events": ephemeral_events}
result["unread_notifications"] = room.unread_notifications
result["summary"] = room.summary
result["org.matrix.msc2654.unread_count"] = room.unread_count

return result

Expand Down
Loading

0 comments on commit 5a1dd29

Please sign in to comment.