This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove the unused public_room_list_stream #10565

Merged: 11 commits merged on Aug 17, 2021
1 change: 1 addition & 0 deletions changelog.d/10565.misc
@@ -0,0 +1 @@
Remove the unused public rooms replication stream.
22 changes: 4 additions & 18 deletions synapse/replication/slave/storage/room.py
@@ -1,4 +1,4 @@
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2015-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,26 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.replication.tcp.streams import PublicRoomsStream
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.room import RoomWorkerStore

from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker


class RoomStore(RoomWorkerStore, BaseSlavedStore):
A reviewer (Member) commented:

I wonder if we should just directly use RoomWorkerStore in generic_worker. We seem to do a mixture though so I wouldn't block on that.

The PR author replied:

We do import the *WorkerStore variant in GenericWorkerSlavedStore in some cases, so doing so here probably isn't too contentious.

Where is this mixture you mentioned?

The reviewer replied:

> Where is this mixture you mentioned?

See where we directly use non-worker stores:

from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
from synapse.storage.databases.main.lock import LockStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.search import SearchStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore

The PR author replied:

I see, OK. I've removed the class for now as it's probably best to not keep it around unless needed.

Tests pass, though it probably warrants a double-check to make sure I did the right thing :)
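
For illustration, a minimal, hypothetical sketch of the approach discussed in this thread, i.e. inheriting RoomWorkerStore directly in the generic worker's combined store. The trimmed-down mixin list and the class body below are assumptions for illustration only, not code from this PR:

```python
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore


class GenericWorkerSlavedStore(
    # Hypothetical, trimmed-down mixin list; the real class pulls in many
    # more stores (see the import list quoted above).
    UserDirectoryStore,  # a non-worker store used directly
    StatsStore,          # a non-worker store used directly
    RoomWorkerStore,     # the worker variant this thread suggests using directly
    BaseSlavedStore,
):
    pass
```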

def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
self._public_room_id_gen = SlavedIdTracker(
db_conn, "public_room_list_stream", "stream_id"
)

def get_current_public_room_stream_id(self):
return self._public_room_id_gen.get_current_token()

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == PublicRoomsStream.NAME:
self._public_room_id_gen.advance(instance_name, token)

return super().process_replication_rows(stream_name, instance_name, token, rows)
"""This class is used to tie together worker storage methods
and those that can only be called from the main process.
"""
3 changes: 0 additions & 3 deletions synapse/replication/tcp/streams/__init__.py
@@ -32,7 +32,6 @@
GroupServerStream,
PresenceFederationStream,
PresenceStream,
PublicRoomsStream,
PushersStream,
PushRulesStream,
ReceiptsStream,
@@ -57,7 +56,6 @@
PushRulesStream,
PushersStream,
CachesStream,
PublicRoomsStream,
DeviceListsStream,
ToDeviceStream,
FederationStream,
@@ -79,7 +77,6 @@
"PushRulesStream",
"PushersStream",
"CachesStream",
"PublicRoomsStream",
"DeviceListsStream",
"ToDeviceStream",
"TagAccountDataStream",
25 changes: 0 additions & 25 deletions synapse/replication/tcp/streams/_base.py
@@ -447,31 +447,6 @@ def __init__(self, hs):
)


class PublicRoomsStream(Stream):
"""The public rooms list changed"""

PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(
"room_id", # str
"visibility", # str
"appservice_id", # str, optional
"network_id", # str, optional
),
)

NAME = "public_rooms"
ROW_TYPE = PublicRoomsStreamRow

def __init__(self, hs):
store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_current_public_room_stream_id),
store.get_all_new_public_rooms,
)


class DeviceListsStream(Stream):
"""Either a user has updated their devices or a remote server needs to be
told about a device update.
4 changes: 1 addition & 3 deletions synapse/storage/databases/main/__init__.py
@@ -127,9 +127,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._clock = hs.get_clock()
self.database_engine = database.engine

self._public_room_id_gen = StreamIdGenerator(
db_conn, "public_room_list_stream", "stream_id"
)
self._device_list_id_gen = StreamIdGenerator(
db_conn,
"device_lists_stream",
@@ -170,6 +167,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
sequence_name="cache_invalidation_stream_seq",
writers=[],
)

else:
self._cache_id_gen = None

201 changes: 30 additions & 171 deletions synapse/storage/databases/main/room.py
@@ -890,55 +890,6 @@ def _quarantine_media_txn(

return total_media_quarantined

async def get_all_new_public_rooms(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
"""Get updates for public rooms replication stream.

Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.

Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exists
between the requested tokens due to the limit.

The token returned can be used in a subsequent call to this
function to get further updatees.

The updates are a list of 2-tuples of stream ID and the row data
"""
if last_id == current_id:
return [], current_id, False

def get_all_new_public_rooms(txn):
sql = """
SELECT stream_id, room_id, visibility, appservice_id, network_id
FROM public_room_list_stream
WHERE stream_id > ? AND stream_id <= ?
ORDER BY stream_id ASC
LIMIT ?
"""

txn.execute(sql, (last_id, current_id, limit))
updates = [(row[0], row[1:]) for row in txn]
limited = False
upto_token = current_id
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True

return updates, upto_token, limited

return await self.db_pool.runInteraction(
"get_all_new_public_rooms", get_all_new_public_rooms
)
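
The removed docstring above describes the generic replication update contract: last_id is exclusive, current_id is inclusive, limit is a soft cap, and the return value is (updates, token, limited). A rough, hypothetical sketch of how a caller could page through any update function with this contract; the helper name and signature are illustrative, not a Synapse API:

```python
async def drain_updates(get_updates, instance_name, from_token, upto_token, batch_size=100):
    """Collect every row between two stream tokens from an update function
    that follows the (last_id exclusive, current_id inclusive, limit) contract."""
    rows = []
    while from_token < upto_token:
        updates, from_token, limited = await get_updates(
            instance_name, from_token, upto_token, batch_size
        )
        rows.extend(updates)
        if not limited:
            # Fewer rows than the limit were left, so everything up to
            # upto_token has been returned.
            break
    return rows
```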

async def get_rooms_for_retention_period_in_range(
self, min_ms: Optional[int], max_ms: Optional[int], include_null: bool = False
) -> Dict[str, dict]:
@@ -1410,34 +1361,17 @@ async def store_room(
StoreError if the room could not be stored.
"""
try:

def store_room_txn(txn, next_id):
self.db_pool.simple_insert_txn(
txn,
"rooms",
{
"room_id": room_id,
"creator": room_creator_user_id,
"is_public": is_public,
"room_version": room_version.identifier,
"has_auth_chain_index": True,
},
)
if is_public:
self.db_pool.simple_insert_txn(
txn,
table="public_room_list_stream",
values={
"stream_id": next_id,
"room_id": room_id,
"visibility": is_public,
},
)

async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"store_room_txn", store_room_txn, next_id
)
await self.db_pool.simple_insert(
"rooms",
{
"room_id": room_id,
"creator": room_creator_user_id,
"is_public": is_public,
"room_version": room_version.identifier,
"has_auth_chain_index": True,
},
desc="store_room",
)
except Exception as e:
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
@@ -1471,48 +1405,13 @@ async def maybe_store_room_on_outlier_membership(
)

async def set_room_is_public(self, room_id, is_public):
def set_room_is_public_txn(txn, next_id):
self.db_pool.simple_update_one_txn(
txn,
table="rooms",
keyvalues={"room_id": room_id},
updatevalues={"is_public": is_public},
)

entries = self.db_pool.simple_select_list_txn(
txn,
table="public_room_list_stream",
keyvalues={
"room_id": room_id,
"appservice_id": None,
"network_id": None,
},
retcols=("stream_id", "visibility"),
)

entries.sort(key=lambda r: r["stream_id"])

add_to_stream = True
if entries:
add_to_stream = bool(entries[-1]["visibility"]) != is_public

if add_to_stream:
self.db_pool.simple_insert_txn(
txn,
table="public_room_list_stream",
values={
"stream_id": next_id,
"room_id": room_id,
"visibility": is_public,
"appservice_id": None,
"network_id": None,
},
)
await self.db_pool.simple_update_one(
table="rooms",
keyvalues={"room_id": room_id},
updatevalues={"is_public": is_public},
desc="set_room_is_public",
)

async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"set_room_is_public", set_room_is_public_txn, next_id
)
self.hs.get_notifier().on_new_replication_data()

async def set_room_is_public_appservice(
@@ -1533,68 +1432,31 @@ async def set_room_is_public_appservice(
list.
"""

def set_room_is_public_appservice_txn(txn, next_id):
if is_public:
try:
self.db_pool.simple_insert_txn(
txn,
table="appservice_room_list",
values={
"appservice_id": appservice_id,
"network_id": network_id,
"room_id": room_id,
},
)
except self.database_engine.module.IntegrityError:
# We've already inserted, nothing to do.
return
else:
self.db_pool.simple_delete_txn(
txn,
if is_public:
try:
await self.db_pool.simple_insert(
table="appservice_room_list",
keyvalues={
values={
"appservice_id": appservice_id,
"network_id": network_id,
"room_id": room_id,
},
desc="set_room_is_public_appservice_true",
)

entries = self.db_pool.simple_select_list_txn(
txn,
table="public_room_list_stream",
except self.database_engine.module.IntegrityError:
# We've already inserted, nothing to do.
return
else:
await self.db_pool.simple_delete(
table="appservice_room_list",
keyvalues={
"room_id": room_id,
"appservice_id": appservice_id,
"network_id": network_id,
"room_id": room_id,
},
retcols=("stream_id", "visibility"),
desc="set_room_is_public_appservice_false",
)

entries.sort(key=lambda r: r["stream_id"])

add_to_stream = True
if entries:
add_to_stream = bool(entries[-1]["visibility"]) != is_public

if add_to_stream:
self.db_pool.simple_insert_txn(
txn,
table="public_room_list_stream",
values={
"stream_id": next_id,
"room_id": room_id,
"visibility": is_public,
"appservice_id": appservice_id,
"network_id": network_id,
},
)

async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"set_room_is_public_appservice",
set_room_is_public_appservice_txn,
next_id,
)
self.hs.get_notifier().on_new_replication_data()

async def add_event_report(
@@ -1787,9 +1649,6 @@ def _get_event_reports_paginate_txn(txn):
"get_event_reports_paginate", _get_event_reports_paginate_txn
)

def get_current_public_room_stream_id(self):
return self._public_room_id_gen.get_current_token()

async def block_room(self, room_id: str, user_id: str) -> None:
"""Marks the room as blocked. Can be called multiple times.
