Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Statistics-based Room Directory Queries #5947

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5947.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Perform room directory searches more efficiently, using room statistics.
8 changes: 8 additions & 0 deletions synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,10 @@ async def on_GET(self, origin, content, query):
else:
network_tuple = ThirdPartyInstanceID(None, None)

if limit == 0:
# zero is a special value which corresponds to no limit.
limit = None

data = await maybeDeferred(
self.handler.get_local_public_room_list,
limit,
Expand Down Expand Up @@ -796,6 +800,10 @@ async def on_POST(self, origin, content, query):
if search_filter is None:
logger.warning("Nonefilter")

if limit == 0:
# zero is a special value which corresponds to no limit.
limit = None

data = await self.handler.get_local_public_room_list(
limit=limit,
since_token=since_token,
Expand Down
281 changes: 96 additions & 185 deletions synapse/handlers/room_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@
from collections import namedtuple

from six import PY3, iteritems
from six.moves import range

import msgpack
from unpaddedbase64 import decode_base64, encode_base64

from twisted.internet import defer

from synapse.api.constants import EventTypes, JoinRules
from synapse.api.errors import Codes, HttpResponseException
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.types import ThirdPartyInstanceID
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache

Expand All @@ -37,7 +35,6 @@

REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000


# This is used to indicate we should only return rooms published to the main list.
EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)

Expand Down Expand Up @@ -72,6 +69,8 @@ def get_local_public_room_list(
This can be (None, None) to indicate the main list, or a particular
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
from_federation (bool): true iff the request comes from the federation
API
"""
if not self.enable_room_list_search:
return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
Expand Down Expand Up @@ -133,200 +132,113 @@ def _get_public_room_list(
from_federation (bool): Whether this request originated from a
federating server or a client. Used for room filtering.
timeout (int|None): Amount of seconds to wait for a response before
timing out.
timing out. TODO
"""
pagination_token = None
if since_token and since_token != "END":
since_token = RoomListNextBatch.from_token(since_token)
if since_token[0] in ("+", "-"):
forwards = since_token[0] == "+"
pagination_token = since_token[1:]
else:
raise SynapseError(400, "Invalid since token.")
else:
since_token = None
forwards = True

rooms_to_order_value = {}
rooms_to_num_joined = {}

newly_visible = []
newly_unpublished = []
if since_token:
stream_token = since_token.stream_ordering
current_public_id = yield self.store.get_current_public_room_stream_id()
public_room_stream_id = since_token.public_room_stream_id
newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
public_room_stream_id, current_public_id, network_tuple=network_tuple
)
else:
stream_token = yield self.store.get_room_max_stream_ordering()
public_room_stream_id = yield self.store.get_current_public_room_stream_id()
# we request one more than wanted to see if there are more pages to come
probing_limit = limit + 1 if limit is not None else None

room_ids = yield self.store.get_public_room_ids_at_stream_id(
public_room_stream_id, network_tuple=network_tuple
results = yield self.store.get_largest_public_rooms(
network_tuple,
search_filter,
probing_limit,
pagination_token,
forwards,
fetch_creation_event_ids=from_federation,
)

# We want to return rooms in a particular order: the number of joined
# users. We then arbitrarily use the room_id as a tie breaker.

@defer.inlineCallbacks
def get_order_for_room(room_id):
# Most of the rooms won't have changed between the since token and
# now (especially if the since token is "now"). So, we can ask what
# the current users are in a room (that will hit a cache) and then
# check if the room has changed since the since token. (We have to
# do it in that order to avoid races).
# If things have changed then fall back to getting the current state
# at the since token.
joined_users = yield self.store.get_users_in_room(room_id)
if self.store.has_room_changed_since(room_id, stream_token):
latest_event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_token
)
def build_room_entry(room):
entry = {
"room_id": room["room_id"],
"name": room["name"],
"topic": room["topic"],
"canonical_alias": room["canonical_alias"],
"num_joined_members": room["joined_members"],
"avatar_url": room["avatar"],
"world_readable": room["history_visibility"] == "world_readable",
}

# Filter out Nones – rather omit the field altogether
return {k: v for k, v in entry.items() if v is not None}

if from_federation:
room_creation_event_ids = [r["creation_event_id"] for r in results]

results = [build_room_entry(r) for r in results]

response = {}
num_results = len(results)
if num_results > 0:
final_room_id = results[-1]["room_id"]
initial_room_id = results[0]["room_id"]
if limit is not None:
more_to_come = num_results == probing_limit
results = results[0:limit]
else:
more_to_come = False

if not forwards or (forwards and more_to_come):
response["next_batch"] = "+%s" % (final_room_id,)

if since_token and (forwards or (not forwards and more_to_come)):
if num_results > 0:
response["prev_batch"] = "-%s" % (initial_room_id,)
else:
response["prev_batch"] = "-%s" % (pagination_token,)

if from_federation:
# only show rooms with m.federate=True or absent (default is True)

if not latest_event_ids:
return
# we already have rooms' creation state events' IDs
# so get rooms' creation state events
creation_events_by_id = yield self.store.get_events(room_creation_event_ids)

joined_users = yield self.state_handler.get_current_users_in_room(
room_id, latest_event_ids
# now filter out rooms with m.federate: False in their create event
results = [
room
for (room, room_creation_event_id) in zip(
results, room_creation_event_ids
)
if creation_events_by_id[room_creation_event_id].content.get(
"m.federate", True
)
]

num_joined_users = len(joined_users)
rooms_to_num_joined[room_id] = num_joined_users
for room in results:
# populate search result entries with additional fields, namely
# 'aliases' and 'guest_can_join'
room_id = room["room_id"]

if num_joined_users == 0:
return
aliases = yield self.store.get_aliases_for_room(room_id)
if aliases:
room["aliases"] = aliases

# We want larger rooms to be first, hence negating num_joined_users
rooms_to_order_value[room_id] = (-num_joined_users, room_id)
state_ids = yield self.store.get_current_state_ids(room_id)
guests_can_join = False
guest_access_state_id = state_ids.get((EventTypes.GuestAccess, ""))
if guest_access_state_id is not None:
guest_access = yield self.store.get_event(guest_access_state_id)
if guest_access is not None:
if guest_access.content.get("guest_access") == "can_join":
guests_can_join = True
room["guest_can_join"] = guests_can_join

logger.info(
"Getting ordering for %i rooms since %s", len(room_ids), stream_token
)
yield concurrently_execute(get_order_for_room, room_ids, 10)

sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
sorted_rooms = [room_id for room_id, _ in sorted_entries]

# `sorted_rooms` should now be a list of all public room ids that is
# stable across pagination. Therefore, we can use indices into this
# list as our pagination tokens.

# Filter out rooms that we don't want to return
rooms_to_scan = [
r
for r in sorted_rooms
if r not in newly_unpublished and rooms_to_num_joined[r] > 0
]

total_room_count = len(rooms_to_scan)

if since_token:
# Filter out rooms we've already returned previously
# `since_token.current_limit` is the index of the last room we
# sent down, so we exclude it and everything before/after it.
if since_token.direction_is_forward:
rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
else:
rooms_to_scan = rooms_to_scan[: since_token.current_limit]
rooms_to_scan.reverse()

logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))

# _append_room_entry_to_chunk will append to chunk but will stop if
# len(chunk) > limit
#
# Normally we will generate enough results on the first iteration here,
# but if there is a search filter, _append_room_entry_to_chunk may
# filter some results out, in which case we loop again.
#
# We don't want to scan over the entire range either as that
# would potentially waste a lot of work.
#
# XXX if there is no limit, we may end up DoSing the server with
# calls to get_current_state_ids for every single room on the
# server. Surely we should cap this somehow?
#
if limit:
step = limit + 1
else:
# step cannot be zero
step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1

chunk = []
for i in range(0, len(rooms_to_scan), step):
if timeout and self.clock.time() > timeout:
raise Exception("Timed out searching room directory")

batch = rooms_to_scan[i : i + step]
logger.info("Processing %i rooms for result", len(batch))
yield concurrently_execute(
lambda r: self._append_room_entry_to_chunk(
r,
rooms_to_num_joined[r],
chunk,
limit,
search_filter,
from_federation=from_federation,
),
batch,
5,
)
logger.info("Now %i rooms in result", len(chunk))
if len(chunk) >= limit + 1:
break

chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))

# Work out the new limit of the batch for pagination, or None if we
# know there are no more results that would be returned.
# i.e., [since_token.current_limit..new_limit] is the batch of rooms
# we've returned (or the reverse if we paginated backwards)
# We tried to pull out limit + 1 rooms above, so if we have <= limit
# then we know there are no more results to return
new_limit = None
if chunk and (not limit or len(chunk) > limit):

if not since_token or since_token.direction_is_forward:
if limit:
chunk = chunk[:limit]
last_room_id = chunk[-1]["room_id"]
else:
if limit:
chunk = chunk[-limit:]
last_room_id = chunk[0]["room_id"]

new_limit = sorted_rooms.index(last_room_id)

results = {"chunk": chunk, "total_room_count_estimate": total_room_count}

if since_token:
results["new_rooms"] = bool(newly_visible)

if not since_token or since_token.direction_is_forward:
if new_limit is not None:
results["next_batch"] = RoomListNextBatch(
stream_ordering=stream_token,
public_room_stream_id=public_room_stream_id,
current_limit=new_limit,
direction_is_forward=True,
).to_token()

if since_token:
results["prev_batch"] = since_token.copy_and_replace(
direction_is_forward=False,
current_limit=since_token.current_limit + 1,
).to_token()
else:
if new_limit is not None:
results["prev_batch"] = RoomListNextBatch(
stream_ordering=stream_token,
public_room_stream_id=public_room_stream_id,
current_limit=new_limit,
direction_is_forward=False,
).to_token()

if since_token:
results["next_batch"] = since_token.copy_and_replace(
direction_is_forward=True,
current_limit=since_token.current_limit - 1,
).to_token()

return results
response["chunk"] = results

# TODO for federation, we currently don't remove m.federate=False rooms
# from the total room count estimate.
response["total_room_count_estimate"] = yield self.store.count_public_rooms()

return response

@defer.inlineCallbacks
def _append_room_entry_to_chunk(
Expand Down Expand Up @@ -587,7 +499,6 @@ class RoomListNextBatch(
),
)
):

KEY_DICT = {
"stream_ordering": "s",
"public_room_stream_id": "p",
Expand Down
8 changes: 8 additions & 0 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ def on_GET(self, request):
limit = parse_integer(request, "limit", 0)
since_token = parse_string(request, "since", None)

if limit == 0:
# zero is a special value which corresponds to no limit.
limit = None

handler = self.hs.get_room_list_handler()
if server:
data = yield handler.get_remote_public_room_list(
Expand Down Expand Up @@ -387,6 +391,10 @@ def on_POST(self, request):
else:
network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)

if limit == 0:
# zero is a special value which corresponds to no limit.
limit = None

handler = self.hs.get_room_list_handler()
if server:
data = yield handler.get_remote_public_room_list(
Expand Down
Loading