Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Update code to refer to "workers" #15606

Merged
merged 7 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15606.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Update internal terminology for workers.
6 changes: 0 additions & 6 deletions docs/replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ minimal.

See [the TCP replication documentation](tcp_replication.md).

### The Slaved DataStore

There are read-only version of the synapse storage layer in
`synapse/replication/slave/storage` that use the response of the
replication API to invalidate their caches.

### The TCP Replication Module
Information about how the tcp replication module is structured, including how
the classes interact, can be found in
Expand Down
4 changes: 2 additions & 2 deletions synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
logger = logging.getLogger("synapse.app.admin_cmd")


class AdminCmdSlavedStore(
class AdminCmdStore(
FilteringWorkerStore,
ClientIpWorkerStore,
DeviceWorkerStore,
Expand Down Expand Up @@ -103,7 +103,7 @@ def __init__(


class AdminCmdServer(HomeServer):
DATASTORE_CLASS = AdminCmdSlavedStore # type: ignore
DATASTORE_CLASS = AdminCmdStore # type: ignore


async def export_data_command(hs: HomeServer, args: argparse.Namespace) -> None:
Expand Down
4 changes: 2 additions & 2 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
logger = logging.getLogger("synapse.app.generic_worker")


class GenericWorkerSlavedStore(
class GenericWorkerStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
UserDirectoryStore,
Expand Down Expand Up @@ -154,7 +154,7 @@ class GenericWorkerSlavedStore(


class GenericWorkerServer(HomeServer):
DATASTORE_CLASS = GenericWorkerSlavedStore # type: ignore
DATASTORE_CLASS = GenericWorkerStore # type: ignore

def _listen_http(self, listener_config: ListenerConfig) -> None:
assert listener_config.http_options is not None
Expand Down
6 changes: 2 additions & 4 deletions synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@
from synapse.util.frozenutils import freeze

if TYPE_CHECKING:
from synapse.app.generic_worker import GenericWorkerSlavedStore
from synapse.app.generic_worker import GenericWorkerStore
from synapse.server import HomeServer


Expand Down Expand Up @@ -237,9 +237,7 @@ def __init__(self, hs: "HomeServer", auth_handler: AuthHandler) -> None:

# TODO: Fix this type hint once the types for the data stores have been ironed
# out.
self._store: Union[
DataStore, "GenericWorkerSlavedStore"
] = hs.get_datastores().main
self._store: Union[DataStore, "GenericWorkerStore"] = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._auth = hs.get_auth()
self._auth_handler = auth_handler
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
class ReplicationDataHandler:
"""Handles incoming stream updates from replication.

This instance notifies the slave data store about updates. Can be subclassed
This instance notifies the data store about updates. Can be subclassed
to handle updates in additional ways.
"""

Expand Down Expand Up @@ -91,7 +91,7 @@ async def on_rdata(
) -> None:
"""Called to handle a batch of replication data with a given stream token.

By default this just pokes the slave store. Can be overridden in subclasses to
By default, this just pokes the data store. Can be overridden in subclasses to
handle more.

Args:
Expand Down
7 changes: 2 additions & 5 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,10 @@ def __init__(
writers=hs.config.worker.writers.account_data,
)
else:
# Multiple writers are not supported for SQLite.
#
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
#
# If this process is the writer than we need to use
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
self._account_data_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
Expand Down
14 changes: 7 additions & 7 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,11 @@ def _invalidate_caches_for_event(
async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
) -> None:
"""Invalidates the cache and adds it to the cache stream so slaves
"""Invalidates the cache and adds it to the cache stream so other workers
will know to invalidate their caches.

This should only be used to invalidate caches where slaves won't
otherwise know from other replication streams that the cache should
This should only be used to invalidate caches where other workers won't
otherwise have known from other replication streams that the cache should
be invalidated.
"""
cache_func = getattr(self, cache_name, None)
Expand All @@ -297,11 +297,11 @@ def _invalidate_cache_and_stream(
cache_func: CachedFunction,
keys: Tuple[Any, ...],
) -> None:
"""Invalidates the cache and adds it to the cache stream so slaves
"""Invalidates the cache and adds it to the cache stream so other workers
will know to invalidate their caches.

This should only be used to invalidate caches where slaves won't
otherwise know from other replication streams that the cache should
This should only be used to invalidate caches where other workers won't
otherwise have known from other replication streams that the cache should
be invalidated.
"""
txn.call_after(cache_func.invalidate, keys)
Expand All @@ -310,7 +310,7 @@ def _invalidate_cache_and_stream(
def _invalidate_all_cache_and_stream(
self, txn: LoggingTransaction, cache_func: CachedFunction
) -> None:
"""Invalidates the entire cache and adds it to the cache stream so slaves
"""Invalidates the entire cache and adds it to the cache stream so other workers
will know to invalidate their caches.
"""

Expand Down
2 changes: 0 additions & 2 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ def __init__(
is_writer=hs.config.worker.worker_app is None,
)

# Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
# StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).
device_list_max = self._device_list_id_gen.get_current_token()
device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
db_conn,
Expand Down
7 changes: 2 additions & 5 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,10 @@ def __init__(
writers=hs.config.worker.writers.events,
)
else:
# Multiple writers are not supported for SQLite.
#
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
#
# If this process is the writer than we need to use
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
self._stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
Expand Down
7 changes: 2 additions & 5 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,10 @@ def __init__(
else:
self._can_write_to_receipts = True

# Multiple writers are not supported for SQLite.
#
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
#
# If this process is the writer than we need to use
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
self._receipts_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/schema/main/delta/34/cache_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
logger = logging.getLogger(__name__)


# This stream is used to notify replication slaves that some caches have
# This stream is used to notify workers over replication that some caches have
# been invalidated that they cannot infer from the other streams.
CREATE_TABLE = """
CREATE TABLE cache_invalidation_stream (
Expand Down
2 changes: 1 addition & 1 deletion tests/app/test_openid_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:

def default_config(self) -> JsonDict:
conf = super().default_config()
# we're using FederationReaderServer, which uses a SlavedStore, so we
# we're using GenericWorkerServer, which uses a GenericWorkerStore, so we
# have to tell the FederationHandler not to try to access stuff that is only
# in the primary store.
conf["worker_app"] = "yes"
Expand Down
13 changes: 0 additions & 13 deletions tests/replication/slave/storage/__init__.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from tests.replication._base import BaseStreamTestCase


class BaseSlavedStoreTestCase(BaseStreamTestCase):
class BaseWorkerStoreTestCase(BaseStreamTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
return self.setup_test_homeserver(federation_client=Mock())

Expand All @@ -34,7 +34,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.reconnect()

self.master_store = hs.get_datastores().main
self.slaved_store = self.worker_hs.get_datastores().main
self.worker_store = self.worker_hs.get_datastores().main
persistence = hs.get_storage_controllers().persistence
assert persistence is not None
self.persistance = persistence
Expand All @@ -50,7 +50,7 @@ def check(
self, method: str, args: Iterable[Any], expected_result: Optional[Any] = None
) -> None:
master_result = self.get_success(getattr(self.master_store, method)(*args))
slaved_result = self.get_success(getattr(self.slaved_store, method)(*args))
worker_result = self.get_success(getattr(self.worker_store, method)(*args))
if expected_result is not None:
self.assertEqual(
master_result,
Expand All @@ -59,14 +59,14 @@ def check(
% (expected_result, master_result),
)
self.assertEqual(
slaved_result,
worker_result,
expected_result,
"Expected slave result to be %r but was %r"
% (expected_result, slaved_result),
"Expected worker result to be %r but was %r"
% (expected_result, worker_result),
)
self.assertEqual(
master_result,
slaved_result,
"Slave result %r does not match master result %r"
% (slaved_result, master_result),
worker_result,
"Worker result %r does not match master result %r"
% (worker_result, master_result),
)
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

from tests.server import FakeTransport

from ._base import BaseSlavedStoreTestCase
from ._base import BaseWorkerStoreTestCase

USER_ID = "@feeling:test"
USER_ID_2 = "@bright:test"
Expand All @@ -63,7 +63,7 @@ def unpatch() -> None:
return unpatch


class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase):
class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
STORE_TYPE = EventsWorkerStore

def setUp(self) -> None:
Expand Down Expand Up @@ -294,7 +294,7 @@ def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
assert j2.internal_metadata.stream_ordering is not None

event_source = RoomEventSource(self.hs)
event_source.store = self.slaved_store
event_source.store = self.worker_store
current_token = event_source.get_current_key()

# gradually stream out the replication
Expand All @@ -310,12 +310,12 @@ def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
#
# First, we get a list of the rooms we are joined to
joined_rooms = self.get_success(
self.slaved_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
self.worker_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
)

# Then, we get a list of the events since the last sync
membership_changes = self.get_success(
self.slaved_store.get_membership_changes_for_user(
self.worker_store.get_membership_changes_for_user(
USER_ID_2, prev_token, current_token
)
)
Expand Down