Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Don't send normal presence updates over federation replication stream (
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Apr 19, 2021
1 parent c571736 commit 2b7dd21
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 183 deletions.
1 change: 1 addition & 0 deletions changelog.d/9828.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for handling presence on a worker.
70 changes: 2 additions & 68 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ def __init__(self, hs: "HomeServer"):
# Pending presence map user_id -> UserPresenceState
self.presence_map = {} # type: Dict[str, UserPresenceState]

# Stream position -> list[user_id]
self.presence_changed = SortedDict() # type: SortedDict[int, List[str]]

# Stores the destinations we need to explicitly send presence to about a
# given user.
# Stream position -> (user_id, destinations)
Expand All @@ -96,7 +93,7 @@ def __init__(self, hs: "HomeServer"):

self.edus = SortedDict() # type: SortedDict[int, Edu]

# stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
# stream ID for the next entry into keyed_edu_changed/edus.
self.pos = 1

# map from stream ID to the time that stream entry was generated, so that we
Expand All @@ -117,7 +114,6 @@ def register(name: str, queue: Sized) -> None:

for queue_name in [
"presence_map",
"presence_changed",
"keyed_edu",
"keyed_edu_changed",
"edus",
Expand Down Expand Up @@ -155,23 +151,12 @@ def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_changed.keys()
i = self.presence_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]

user_ids = {
user_id for uids in self.presence_changed.values() for user_id in uids
}

keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_destinations[key]

user_ids.update(
user_id for user_id, _ in self.presence_destinations.values()
)
user_ids = {user_id for user_id, _ in self.presence_destinations.values()}

to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
Expand Down Expand Up @@ -244,23 +229,6 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""
# nothing to do here: the replication listener will handle it.

def send_presence(self, states: List[UserPresenceState]) -> None:
"""As per FederationSender
Args:
states
"""
pos = self._next_pos()

# We only want to send presence for our own users, so lets always just
# filter here just in case.
local_states = [s for s in states if self.is_mine_id(s.user_id)]

self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]

self.notifier.on_new_replication_data()

def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
Expand Down Expand Up @@ -325,18 +293,6 @@ async def get_replication_rows(
# of the federation stream.
rows = [] # type: List[Tuple[int, BaseFederationRow]]

# Fetch changed presence
i = self.presence_changed.bisect_right(from_token)
j = self.presence_changed.bisect_right(to_token) + 1
dest_user_ids = [
(pos, user_id)
for pos, user_id_list in self.presence_changed.items()[i:j]
for user_id in user_id_list
]

for (key, user_id) in dest_user_ids:
rows.append((key, PresenceRow(state=self.presence_map[user_id])))

# Fetch presence to send to destinations
i = self.presence_destinations.bisect_right(from_token)
j = self.presence_destinations.bisect_right(to_token) + 1
Expand Down Expand Up @@ -427,22 +383,6 @@ def add_to_buffer(self, buff):
raise NotImplementedError()


class PresenceRow(
BaseFederationRow, namedtuple("PresenceRow", ("state",)) # UserPresenceState
):
TypeId = "p"

@staticmethod
def from_data(data):
return PresenceRow(state=UserPresenceState.from_dict(data))

def to_data(self):
return self.state.as_dict()

def add_to_buffer(self, buff):
buff.presence.append(self.state)


class PresenceDestinationsRow(
BaseFederationRow,
namedtuple(
Expand Down Expand Up @@ -506,7 +446,6 @@ def add_to_buffer(self, buff):


_rowtypes = (
PresenceRow,
PresenceDestinationsRow,
KeyedEduRow,
EduRow,
Expand All @@ -518,7 +457,6 @@ def add_to_buffer(self, buff):
ParsedFederationStreamData = namedtuple(
"ParsedFederationStreamData",
(
"presence", # list(UserPresenceState)
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
Expand All @@ -543,7 +481,6 @@ def process_rows_for_federation(
# them into the appropriate collection and then send them off.

buff = ParsedFederationStreamData(
presence=[],
presence_destinations=[],
keyed_edus={},
edus={},
Expand All @@ -559,9 +496,6 @@ def process_rows_for_federation(
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)

if buff.presence:
transaction_queue.send_presence(buff.presence)

for state, destinations in buff.presence_destinations:
transaction_queue.send_presence_to_destinations(
states=[state], destinations=destinations
Expand Down
96 changes: 1 addition & 95 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.handlers.presence import get_interested_remotes
from synapse.logging.context import preserve_fn
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
Expand All @@ -34,7 +32,7 @@
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
from synapse.util.metrics import Measure

if TYPE_CHECKING:
from synapse.events.presence_router import PresenceRouter
Expand Down Expand Up @@ -79,15 +77,6 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""
raise NotImplementedError()

@abc.abstractmethod
def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
"""
raise NotImplementedError()

@abc.abstractmethod
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
Expand Down Expand Up @@ -176,11 +165,6 @@ def __init__(self, hs: "HomeServer"):
),
)

# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {} # type: Dict[str, UserPresenceState]

LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
Expand All @@ -201,8 +185,6 @@ def __init__(self, hs: "HomeServer"):
self._is_processing = False
self._last_poked_id = -1

self._processing_pending_presence = False

# map from room_id to a set of PerDestinationQueues which we believe are
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
Expand Down Expand Up @@ -546,48 +528,6 @@ def _flush_rrs_for_room(self, room_id: str) -> None:
for queue in queues:
queue.flush_read_receipts_for_room(room_id)

@preserve_fn # the caller should not yield on this
async def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return

# First we queue up the new presence by user ID, so multiple presence
# updates in quick succession are correctly handled.
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update(
{state.user_id: state for state in states if self.is_mine_id(state.user_id)}
)

# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
# to attempt a new transaction. We linearize this so that we don't
# accidentally mess up the ordering and send multiple presence updates
# in the wrong order
if self._processing_pending_presence:
return

self._processing_pending_presence = True
try:
while True:
states_map = self.pending_presence
self.pending_presence = {}

if not states_map:
break

await self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
self._processing_pending_presence = False

def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
Expand All @@ -608,40 +548,6 @@ def send_presence_to_destinations(
continue
self._get_per_destination_queue(destination).send_presence(states)

@measure_func("txnqueue._process_presence")
async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
# We pull the presence router here instead of __init__
# to prevent a dependency cycle:
#
# AuthHandler -> Notifier -> FederationSender
# -> PresenceRouter -> ModuleApi -> AuthHandler
if self._presence_router is None:
self._presence_router = self.hs.get_presence_router()

assert self._presence_router is not None

hosts_and_states = await get_interested_remotes(
self.store,
self._presence_router,
states,
self.state,
)

for destinations, states in hosts_and_states:
for destination in destinations:
if destination == self.server_name:
continue

if not self._federation_shard_config.should_handle(
self._instance_name, destination
):
continue

self._get_per_destination_queue(destination).send_presence(states)

def build_and_send_edu(
self,
destination: str,
Expand Down
Loading

0 comments on commit 2b7dd21

Please sign in to comment.