Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Port receipt and read markers to async/wait #6280

Merged
merged 7 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6279.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Port `federation_server.py` to async/await.
1 change: 1 addition & 0 deletions changelog.d/6280.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Port receipt and read markers to async/wait.
205 changes: 87 additions & 118 deletions synapse/federation/federation_server.py

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

from sortedcontainers import SortedDict

from twisted.internet import defer

from synapse.metrics import LaterGauge
from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -212,7 +214,7 @@ def send_read_receipt(self, receipt):
receipt (synapse.types.ReadReceipt):
"""
# nothing to do here: the replication listener will handle it.
pass
return defer.succeed(None)

def send_presence(self, states):
"""As per FederationSender
Expand Down
13 changes: 5 additions & 8 deletions synapse/handlers/read_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.util.async_helpers import Linearizer

from ._base import BaseHandler
Expand All @@ -32,31 +30,30 @@ def __init__(self, hs):
self.read_marker_linearizer = Linearizer(name="read_marker")
self.notifier = hs.get_notifier()

@defer.inlineCallbacks
def received_client_read_marker(self, room_id, user_id, event_id):
async def received_client_read_marker(self, room_id, user_id, event_id):
"""Updates the read marker for a given user in a given room if the event ID given
is ahead in the stream relative to the current read marker.

This uses a notifier to indicate that account data should be sent down /sync if
the read marker has changed.
"""

with (yield self.read_marker_linearizer.queue((room_id, user_id))):
existing_read_marker = yield self.store.get_account_data_for_room_and_type(
with await self.read_marker_linearizer.queue((room_id, user_id)):
existing_read_marker = await self.store.get_account_data_for_room_and_type(
user_id, room_id, "m.fully_read"
)

should_update = True

if existing_read_marker:
# Only update if the new marker is ahead in the stream
should_update = yield self.store.is_event_after(
should_update = await self.store.is_event_after(
event_id, existing_read_marker["event_id"]
)

if should_update:
content = {"event_id": event_id}
max_id = yield self.store.add_account_data_to_room(
max_id = await self.store.add_account_data_to_room(
user_id, room_id, "m.fully_read", content
)
self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
37 changes: 12 additions & 25 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from synapse.handlers._base import BaseHandler
from synapse.types import ReadReceipt, get_domain_from_id
from synapse.util.async_helpers import maybe_awaitable

logger = logging.getLogger(__name__)

Expand All @@ -36,8 +37,7 @@ def __init__(self, hs):
self.clock = self.hs.get_clock()
self.state = hs.get_state_handler()

@defer.inlineCallbacks
def _received_remote_receipt(self, origin, content):
async def _received_remote_receipt(self, origin, content):
"""Called when we receive an EDU of type m.receipt from a remote HS.
"""
receipts = []
Expand All @@ -62,17 +62,16 @@ def _received_remote_receipt(self, origin, content):
)
)

yield self._handle_new_receipts(receipts)
await self._handle_new_receipts(receipts)

@defer.inlineCallbacks
def _handle_new_receipts(self, receipts):
async def _handle_new_receipts(self, receipts):
"""Takes a list of receipts, stores them and informs the notifier.
"""
min_batch_id = None
max_batch_id = None

for receipt in receipts:
res = yield self.store.insert_receipt(
res = await self.store.insert_receipt(
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
Expand All @@ -99,14 +98,15 @@ def _handle_new_receipts(self, receipts):

self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids)
# Note that the min here shouldn't be relied upon to be accurate.
yield self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
await maybe_awaitable(
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
)
)

return True

@defer.inlineCallbacks
def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
async def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
"""Called when a client tells us a local user has read up to the given
event_id in the room.
"""
Expand All @@ -118,24 +118,11 @@ def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
data={"ts": int(self.clock.time_msec())},
)

is_new = yield self._handle_new_receipts([receipt])
is_new = await self._handle_new_receipts([receipt])
if not is_new:
return

yield self.federation.send_read_receipt(receipt)

@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):
"""Gets all receipts for a room, upto the given key.
"""
result = yield self.store.get_linearized_receipts_for_room(
room_id, to_key=to_key
)

if not result:
return []

return result
await self.federation.send_read_receipt(receipt)


class ReceiptEventSource(object):
Expand Down
13 changes: 5 additions & 8 deletions synapse/rest/client/v2_alpha/read_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.http.servlet import RestServlet, parse_json_object_from_request

from ._base import client_patterns
Expand All @@ -34,17 +32,16 @@ def __init__(self, hs):
self.read_marker_handler = hs.get_read_marker_handler()
self.presence_handler = hs.get_presence_handler()

@defer.inlineCallbacks
def on_POST(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
async def on_POST(self, request, room_id):
requester = await self.auth.get_user_by_req(request)

yield self.presence_handler.bump_presence_active_time(requester.user)
await self.presence_handler.bump_presence_active_time(requester.user)

body = parse_json_object_from_request(request)

read_event_id = body.get("m.read", None)
if read_event_id:
yield self.receipts_handler.received_client_receipt(
await self.receipts_handler.received_client_receipt(
room_id,
"m.read",
user_id=requester.user.to_string(),
Expand All @@ -53,7 +50,7 @@ def on_POST(self, request, room_id):

read_marker_event_id = body.get("m.fully_read", None)
if read_marker_event_id:
yield self.read_marker_handler.received_client_read_marker(
await self.read_marker_handler.received_client_read_marker(
room_id,
user_id=requester.user.to_string(),
event_id=read_marker_event_id,
Expand Down
11 changes: 4 additions & 7 deletions synapse/rest/client/v2_alpha/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.api.errors import SynapseError
from synapse.http.servlet import RestServlet

Expand All @@ -39,16 +37,15 @@ def __init__(self, hs):
self.receipts_handler = hs.get_receipts_handler()
self.presence_handler = hs.get_presence_handler()

@defer.inlineCallbacks
def on_POST(self, request, room_id, receipt_type, event_id):
requester = yield self.auth.get_user_by_req(request)
async def on_POST(self, request, room_id, receipt_type, event_id):
requester = await self.auth.get_user_by_req(request)

if receipt_type != "m.read":
raise SynapseError(400, "Receipt type must be 'm.read'")

yield self.presence_handler.bump_presence_active_time(requester.user)
await self.presence_handler.bump_presence_active_time(requester.user)

yield self.receipts_handler.received_client_receipt(
await self.receipts_handler.received_client_receipt(
room_id, receipt_type, user_id=requester.user.to_string(), event_id=event_id
)

Expand Down
7 changes: 3 additions & 4 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2439,12 +2439,11 @@ def _purge_room_txn(self, txn, room_id):

logger.info("[purge] done")

@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
async def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
"""
to_1, so_1 = yield self._get_event_ordering(event_id1)
to_2, so_2 = yield self._get_event_ordering(event_id2)
to_1, so_1 = await self._get_event_ordering(event_id1)
to_2, so_2 = await self._get_event_ordering(event_id2)
return (to_1, so_1) > (to_2, so_2)

@cachedInlineCallbacks(max_entries=5000)
Expand Down
7 changes: 3 additions & 4 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def concurrently_execute(func, args, limit):
the number of concurrent executions.

Args:
func (func): Function to execute, should return a deferred.
func (func): Function to execute, should return a deferred or coroutine.
args (list): List of arguments to pass to func, each invocation of func
gets a signle argument.
limit (int): Maximum number of conccurent executions.
Expand All @@ -148,11 +148,10 @@ def concurrently_execute(func, args, limit):
"""
it = iter(args)

@defer.inlineCallbacks
def _concurrently_execute_inner():
async def _concurrently_execute_inner():
try:
while True:
yield func(next(it))
await maybe_awaitable(func(next(it)))
except StopIteration:
pass

Expand Down
3 changes: 3 additions & 0 deletions tests/handlers/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ def get_current_users_in_room(room_id):
self.datastore.get_to_device_stream_token = lambda: 0
self.datastore.get_new_device_msgs_for_remote = lambda *args, **kargs: ([], 0)
self.datastore.delete_device_msgs_for_remote = lambda *args, **kargs: None
self.datastore.set_received_txn_response = lambda *args, **kwargs: defer.succeed(
None
)

def test_started_typing_local(self):
self.room_members = [U_APPLE, U_BANANA]
Expand Down