Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
anoadragon453 committed Feb 12, 2021
1 parent e10b3b3 commit a8b380c
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 4 deletions.
67 changes: 63 additions & 4 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,10 @@ async def _handle_state_delta(self, deltas):
"""Process current state deltas to find new joins that need to be
handled.
"""
join_deltas = []

for delta in deltas:
stream_id = delta["stream_id"]
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
Expand Down Expand Up @@ -892,12 +895,32 @@ async def _handle_state_delta(self, deltas):
# Ignore changes to join events.
continue

await self._on_user_joined_room(room_id, state_key)
join_deltas.append(delta)

await self._handle_join_deltas(deltas)

async def _handle_join_deltas(
join_event_data: List[Tuple]
) -> None:
"""Send out presence updates given a collection of user room join to a room.
async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
"""Called when we detect a user joining the room via the current state
delta stream.
We process these in batch as joining a room can produce many new joins in the
event stream at once.
...
"""
# For each delta:

# If this is a local join, just get all hosts in the room

# If this is a remote join, check if this is the first time a user from this hs
# has joined this room (at this Stream ordering!). If it is, note down this host

# Now we have a list of hosts

# Figure out which hosts we haven't seen before (whether we share a room
# with them currently. Might need to take into account the list of local joins?)

# Send presence to the remaining hosts

if self.is_mine_id(user_id):
# If this is a local user then we need to send their presence
Expand All @@ -906,9 +929,45 @@ async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user

# Not only do we want to check if we've seen this homeserver before. Really,
# we want to check that this homeserver has not seen this user before
#
# So REALLY, we want to check if whether this user already shares a room with
# another user on the remote homeserver

state = await self.current_state_for_user(user_id)
hosts = await self.state.get_current_hosts_in_room(room_id)

# Get list of rooms ID that this user is in
user_rooms = await self.store.get_rooms_for_user(user_id)

# How about we use the users_who_share_pub/private_rooms tables,
# which presumably have indexes on user1/user2.

for host in hosts:
# Get the current list of rooms that this host is in
shared_rooms = await self.store.get_shared_rooms_for_user_and_destination(
user_id, host
)

# If this user already shares a room with another user on this host,
# then that host should already have presence information for this user.
if len(shared_rooms) > 2:
continue

# Check what other rooms this user shares with other users on a homeserver
room_ids = await self.store.get_rooms_for_destination(
destination=host, max_stream_ordering=stream_id,
)

if len(room_ids) > 1:
# This homeserver has a user in another room that we're aware of
# It's not the first time we've seen this homeserver
return

# Filter out ourselves.
hosts.discard(self.server_name)

self.federation.send_presence_to_destinations(
states=[state], destinations=hosts
)
Expand Down
15 changes: 15 additions & 0 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,21 @@ async def get_rooms_for_user(self, user_id: str, on_invalidate=None):
)
return frozenset(r.room_id for r in rooms)

@cached(cache_context=True, iterable=True)
async def get_rooms_for_destination(
self, destination: str, max_stream_ordering: Optional[int] = None,
):
sql = """
select distinct e.room_id from current_state_events cse
inner join events e using (event_id)
where
cse.state_key = ?
and cse.membership = ?
and e.stream_ordering < 10000000;
"""

args = ("%:" + destination, Membership.JOIN)

@cached(max_entries=500000, iterable=True)
async def get_users_who_share_room_with_user(self, user_id: str) -> Set[str]:
"""Returns the set of users who share a room with `user_id`
Expand Down
46 changes: 46 additions & 0 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,52 @@ async def get_user_dir_rooms_user_is_in(self, user_id):
users.update(rows)
return list(users)

async def get_shared_rooms_for_user_and_destination(
self, user_id: str, destination: str
) -> Set[str]:
"""Returns the rooms that a local user shares with any user from a remote
destination
Args:
user_id: The MXID of a local user
destination: The server name of the remote destination
Returns:
A set of room ID's that both the local user and one or more users from the
destination share.
"""

def _get_shared_rooms_for_user_and_destination_txn(txn):
destination_clause = "%:" + destination

txn.execute(
"""
SELECT p1.room_id
FROM users_in_public_rooms as p1
INNER JOIN users_in_public_rooms as p2
p1.room_id = p2.room_id
AND p1.user_id = ?
AND p2.user_id LIKE ?
UNION
SELECT room_id
FROM users_who_share_private_rooms
WHERE
user_id = ?
AND other_user_id LIKE ?
""",
(user_id, destination, user_id, destination_clause),
)

rows = self.db_pool.cursor_to_dict(txn)
return rows

rows = await self.db_pool.runInteraction(
"get_shared_rooms_for_user_and_destination",
_get_shared_rooms_for_user_and_destination_txn,
)

return {row["room_id"] for row in rows}

@cached()
async def get_shared_rooms_for_users(
self, user_id: str, other_user_id: str
Expand Down

0 comments on commit a8b380c

Please sign in to comment.