From 2de676cee38ac24df0b5d070508e4708c255258d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Sep 2020 17:23:45 +0100 Subject: [PATCH 1/7] Allow HTTP replication between workers in tests --- tests/replication/_base.py | 62 ++++++++++++++++++++++++++++++-------- tests/unittest.py | 2 +- 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index ae60874ec3c2..2f334690e30b 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -27,7 +27,7 @@ GenericWorkerServer, ) from synapse.http.server import JsonResource -from synapse.http.site import SynapseRequest +from synapse.http.site import SynapseRequest, SynapseSite from synapse.replication.http import ReplicationRestResource, streams from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol @@ -202,14 +202,20 @@ def setUp(self): self.reactor.lookups["testserv"] = "1.2.3.4" - self._worker_hs_to_resource = {} + # A map from a HS instance to the associated HTTP Site to use for + # handling inbound HTTP requests to that instance. + self._hs_to_site = {self.hs: self.site} - # When we see a connection attempt to the master replication listener we - # automatically set up the connection. This is so that tests don't + # When we see a connection attempt to the replication listener on a HS + # we automatically set up the connection. This is so that tests don't # manually have to go and explicitly set it up each time (plus sometimes # it is impossible to write the handling explicitly in the tests). + # + # This sets registers the master replication listener: self.reactor.add_tcp_client_callback( - "1.2.3.4", 8765, self._handle_http_replication_attempt + "1.2.3.4", + 8765, + lambda: self._handle_http_replication_attempt(self.hs, 8765), ) def create_test_json_resource(self): @@ -253,9 +259,31 @@ def make_worker_hs( **kwargs ) + # If the instance is in the `instance_map` config then workers may try + # and send HTTP requests to it, so we register it with + # `_handle_http_replication_attempt` like we do with the master HS. + instance_name = worker_hs.get_instance_name() + instance_loc = worker_hs.config.worker.instance_map.get(instance_name) + if instance_loc: + # Ensure the host is one that has a fake DNS entry. + if instance_loc.host not in self.reactor.lookups: + raise Exception( + "Host does not have an IP for instance_map[%r].host = %r" + % (instance_name, instance_loc.host,) + ) + + self.reactor.add_tcp_client_callback( + self.reactor.lookups[instance_loc.host], + instance_loc.port, + lambda: self._handle_http_replication_attempt( + worker_hs, instance_loc.port + ), + ) + store = worker_hs.get_datastore() store.db_pool._db_pool = self.database_pool._db_pool + # Set up TCP replication between master and the new worker. 
repl_handler = ReplicationCommandHandler(worker_hs) client = ClientReplicationStreamProtocol( worker_hs, "client", "test", self.clock, repl_handler, @@ -269,12 +297,20 @@ def make_worker_hs( server.makeConnection(server_transport) # Set up a resource for the worker - resource = ReplicationRestResource(self.hs) + resource = ReplicationRestResource(worker_hs) for servlet in self.servlets: servlet(worker_hs, resource) - self._worker_hs_to_resource[worker_hs] = resource + self._hs_to_site[worker_hs] = SynapseSite( + logger_name="synapse.access.http.fake", + site_tag="{}-{}".format( + worker_hs.config.server.server_name, worker_hs.get_instance_name() + ), + config=worker_hs.config.server.listeners[0], + resource=resource, + server_version_string="1", + ) return worker_hs @@ -285,7 +321,7 @@ def _get_worker_hs_config(self) -> dict: return config def render_on_worker(self, worker_hs: HomeServer, request: SynapseRequest): - render(request, self._worker_hs_to_resource[worker_hs], self.reactor) + render(request, self._hs_to_site[worker_hs].resource, self.reactor) def replicate(self): """Tell the master side of replication that something has happened, and then @@ -294,9 +330,9 @@ def replicate(self): self.streamer.on_notifier_poke() self.pump() - def _handle_http_replication_attempt(self): - """Handles a connection attempt to the master replication HTTP - listener. + def _handle_http_replication_attempt(self, hs, repl_port): + """Handles a connection attempt to the given HS replication HTTP + listener on the given port. """ # We should have at least one outbound connection attempt, where the @@ -305,7 +341,7 @@ def _handle_http_replication_attempt(self): self.assertGreaterEqual(len(clients), 1) (host, port, client_factory, _timeout, _bindAddress) = clients.pop() self.assertEqual(host, "1.2.3.4") - self.assertEqual(port, 8765) + self.assertEqual(port, repl_port) # Set up client side protocol client_protocol = client_factory.buildProtocol(None) @@ -315,7 +351,7 @@ def _handle_http_replication_attempt(self): # Set up the server side protocol channel = _PushHTTPChannel(self.reactor) channel.requestFactory = request_factory - channel.site = self.site + channel.site = self._hs_to_site[hs] # Connect client to server and vice versa. client_to_server_transport = FakeTransport( diff --git a/tests/unittest.py b/tests/unittest.py index e654c0442d6c..82ede9de3444 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -241,7 +241,7 @@ def setUp(self): # create a site to wrap the resource. self.site = SynapseSite( logger_name="synapse.access.http.fake", - site_tag="test", + site_tag=self.hs.config.server.server_name, config=self.hs.config.server.listeners[0], resource=self.resource, server_version_string="1", From ff35d07e9201305984ecf273f5ca85e83e5faa9d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Sep 2020 18:45:55 +0100 Subject: [PATCH 2/7] Add redis support to replication tests --- synapse/replication/tcp/handler.py | 6 +- synapse/replication/tcp/redis.py | 41 +++++++ tests/replication/_base.py | 172 ++++++++++++++++++++++++++--- 3 files changed, 203 insertions(+), 16 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index b323841f73cb..e92da7b26378 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -251,10 +251,9 @@ def start_replication(self, hs): using TCP. 
""" if hs.config.redis.redis_enabled: - import txredisapi - from synapse.replication.tcp.redis import ( RedisDirectTcpReplicationClientFactory, + lazyConnection, ) logger.info( @@ -271,7 +270,8 @@ def start_replication(self, hs): # connection after SUBSCRIBE is called). # First create the connection for sending commands. - outbound_redis_connection = txredisapi.lazyConnection( + outbound_redis_connection = lazyConnection( + reactor=hs.get_reactor(), host=hs.config.redis_host, port=hs.config.redis_port, password=hs.config.redis.redis_password, diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index f225e533de5b..92ecdfebaf69 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -228,3 +228,44 @@ def buildProtocol(self, addr): p.password = self.password return p + + +def lazyConnection( + reactor, + host="localhost", + port=6379, + dbid=None, + reconnect=True, + charset="utf-8", + password=None, + connectTimeout=None, + replyTimeout=None, + convertNumbers=True, +): + """Equivalent to `txredisapi.lazyConnection`, except allows specifying a + reactor. + """ + + isLazy = True + poolsize = 1 + + uuid = "%s:%d" % (host, port) + factory = txredisapi.RedisFactory( + uuid, + dbid, + poolsize, + isLazy, + txredisapi.ConnectionHandler, + charset, + password, + replyTimeout, + convertNumbers, + ) + factory.continueTrying = reconnect + for x in range(poolsize): + reactor.connectTCP(host, port, factory, connectTimeout) + + if isLazy: + return factory.handler + else: + return factory.deferred diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 2f334690e30b..d381e7a94235 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -12,13 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from typing import Any, Callable, List, Optional, Tuple import attr +import hiredis from twisted.internet.interfaces import IConsumer, IPullProducer, IReactorTime +from twisted.internet.protocol import Protocol from twisted.internet.task import LoopingCall from twisted.web.http import HTTPChannel @@ -197,17 +198,29 @@ def setUp(self): self.server_factory = ReplicationStreamProtocolFactory(self.hs) self.streamer = self.hs.get_replication_streamer() + # Fake in memory Redis server that servers can connect to. + self._redis_server = FakeRedisPubSubServer() + store = self.hs.get_datastore() self.database_pool = store.db_pool self.reactor.lookups["testserv"] = "1.2.3.4" + self.reactor.lookups["localhost"] = "127.0.0.1" # A map from a HS instance to the associated HTTP Site to use for # handling inbound HTTP requests to that instance. self._hs_to_site = {self.hs: self.site} - # When we see a connection attempt to the replication listener on a HS - # we automatically set up the connection. This is so that tests don't + if self.hs.config.redis.redis_enabled: + # Handle attempts to connect to fake redis server. + self.reactor.add_tcp_client_callback( + "localhost", 6379, self.connect_any_redis_attempts, + ) + + self.hs.get_tcp_replication().start_replication(self.hs) + + # When we see a connection attempt to the master replication listener we + # automatically set up the connection. This is so that tests don't # manually have to go and explicitly set it up each time (plus sometimes # it is impossible to write the handling explicitly in the tests). 
         #
@@ -283,18 +296,20 @@ def make_worker_hs(
         store = worker_hs.get_datastore()
         store.db_pool._db_pool = self.database_pool._db_pool

-        # Set up TCP replication between master and the new worker.
-        repl_handler = ReplicationCommandHandler(worker_hs)
-        client = ClientReplicationStreamProtocol(
-            worker_hs, "client", "test", self.clock, repl_handler,
-        )
-        server = self.server_factory.buildProtocol(None)
+        # Set up TCP replication between master and the new worker if we don't
+        # have Redis support enabled.
+        if not worker_hs.config.redis_enabled:
+            repl_handler = ReplicationCommandHandler(worker_hs)
+            client = ClientReplicationStreamProtocol(
+                worker_hs, "client", "test", self.clock, repl_handler,
+            )
+            server = self.server_factory.buildProtocol(None)

-        client_transport = FakeTransport(server, self.reactor)
-        client.makeConnection(client_transport)
+            client_transport = FakeTransport(server, self.reactor)
+            client.makeConnection(client_transport)

-        server_transport = FakeTransport(client, self.reactor)
-        server.makeConnection(server_transport)
+            server_transport = FakeTransport(client, self.reactor)
+            server.makeConnection(server_transport)

         # Set up a resource for the worker
         resource = ReplicationRestResource(worker_hs)
@@ -312,6 +327,9 @@ def make_worker_hs(
             server_version_string="1",
         )

+        if worker_hs.config.redis.redis_enabled:
+            worker_hs.get_tcp_replication().start_replication(worker_hs)
+
         return worker_hs

     def _get_worker_hs_config(self) -> dict:
@@ -369,6 +387,32 @@ def _handle_http_replication_attempt(self, hs, repl_port):
         # inside `connecTCP` before the connection has been passed back to the
         # code that requested the TCP connection.

+    def connect_any_redis_attempts(self):
+        """If redis is enabled we need to deal with workers connecting to a
+        redis server. We don't want to use a real Redis server so we use a
+        fake one.
+        """
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
+        self.assertEqual(host, "localhost")
+        self.assertEqual(port, 6379)
+
+        client_protocol = client_factory.buildProtocol(None)
+        server_protocol = self._redis_server.buildProtocol(None)
+
+        client_to_server_transport = FakeTransport(
+            server_protocol, self.reactor, client_protocol
+        )
+        client_protocol.makeConnection(client_to_server_transport)
+
+        server_to_client_transport = FakeTransport(
+            client_protocol, self.reactor, server_protocol
+        )
+        server_protocol.makeConnection(server_to_client_transport)
+
+        return client_to_server_transport, server_to_client_transport
+

 class TestReplicationDataHandler(GenericWorkerReplicationHandler):
     """Drop-in for ReplicationDataHandler which just collects RDATA rows"""
@@ -503,3 +547,105 @@ def _run_once(self):
             pass

         self.stopProducing()
+
+
+class FakeRedisPubSubServer:
+    """A fake Redis server for pub/sub.
+    """
+
+    def __init__(self):
+        self._subscribers = set()
+
+    def add_subscriber(self, conn):
+        """A connection has called SUBSCRIBE
+        """
+        self._subscribers.add(conn)
+
+    def remove_subscriber(self, conn):
+        """A connection has called UNSUBSCRIBE
+        """
+        self._subscribers.discard(conn)
+
+    def publish(self, conn, channel, msg) -> int:
+        """A connection wants to publish a message to subscribers.
+ """ + for sub in self._subscribers: + sub.send(["message", channel, msg]) + + return len(self._subscribers) + + def buildProtocol(self, addr): + return FakeRedisPubSubProtocol(self) + + +class FakeRedisPubSubProtocol(Protocol): + """A connection from a client talking to the fake Redis server. + """ + + def __init__(self, server: FakeRedisPubSubServer): + self._server = server + self._reader = hiredis.Reader() + + def dataReceived(self, data): + self._reader.feed(data) + + # We might get multiple messages in one packet. + while True: + msg = self._reader.gets() + + if msg is False: + # No more messages. + return + + if not isinstance(msg, list): + # Inbound commands should always be a list + raise Exception("Expected redis list") + + self.handle_command(msg[0], *msg[1:]) + + def handle_command(self, command, *args): + """Received a Redis command from the client. + """ + + # We currently only support pub/sub. + if command == b"PUBLISH": + channel, message = args + num_subscribers = self._server.publish(self, channel, message) + self.send(num_subscribers) + elif command == b"SUBSCRIBE": + (channel,) = args + self._server.add_subscriber(self) + self.send(["subscribe", channel, 1]) + else: + raise Exception("Unknown command") + + def send(self, msg): + """Send a message back to the client. + """ + raw = self.encode(msg).encode("utf-8") + + self.transport.write(raw) + self.transport.flush() + + def encode(self, obj): + """Encode an object to its Redis format. + + Supports: strings/bytes, integers and list/tuples. + """ + + if isinstance(obj, bytes): + # We assume bytes are just unicode strings. + obj = obj.decode("utf-8") + + if isinstance(obj, str): + return "${len}\r\n{str}\r\n".format(len=len(obj), str=obj) + if isinstance(obj, int): + return ":{val}\r\n".format(val=obj) + if isinstance(obj, (list, tuple)): + items = "".join(self.encode(a) for a in obj) + return "*{len}\r\n{items}".format(len=len(obj), items=items) + + raise Exception("Unrecognized type for encoding redis: %r: %r", type(obj), obj) + + def connectionList(self, reason): + self._server.remove_subscriber(self) From 3302cb4e5b06c87084c3fa721656dbe8c274ee05 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Sep 2020 18:48:08 +0100 Subject: [PATCH 3/7] Add basic sharded worker tests --- .../test_sharded_event_persister.py | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 tests/replication/test_sharded_event_persister.py diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py new file mode 100644 index 000000000000..77e3f32c4e3c --- /dev/null +++ b/tests/replication/test_sharded_event_persister.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import logging + +from synapse.rest import admin +from synapse.rest.client.v1 import login, room + +from tests.replication._base import BaseMultiWorkerStreamTestCase + +logger = logging.getLogger(__name__) + + +class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase): + """Checks event persisting sharding works + """ + + servlets = [ + admin.register_servlets_for_client_rest_resource, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + # Register a user who sends a message that we'll get notified about + self.other_user_id = self.register_user("otheruser", "pass") + self.other_access_token = self.login("otheruser", "pass") + + def default_config(self): + conf = super().default_config() + conf["redis"] = {"enabled": "true"} + conf["stream_writers"] = {"events": ["worker1", "worker2"]} + conf["instance_map"] = { + "worker1": {"host": "testserv", "port": 1001}, + "worker2": {"host": "testserv", "port": 1002}, + } + return conf + + def test_basic(self): + """Simple test that rooms can be created and joined when there are + multiple event persisters. + """ + + self.make_worker_hs( + "synapse.app.generic_worker", {"worker_name": "worker1"}, + ) + + self.make_worker_hs( + "synapse.app.generic_worker", {"worker_name": "worker2"}, + ) + + user_id = self.register_user("user", "pass") + access_token = self.login("user", "pass") + + # Create a room + room = self.helper.create_room_as(user_id, tok=access_token) + + # The other user joins + self.helper.join( + room=room, user=self.other_user_id, tok=self.other_access_token + ) + + # The other user sends some messages + self.helper.send(room, body="Hi!", tok=self.other_access_token) From 522a4e9d235201c8b183a5ff812ba10299871860 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Oct 2020 10:48:41 +0100 Subject: [PATCH 4/7] Newsfile --- changelog.d/8433.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8433.misc diff --git a/changelog.d/8433.misc b/changelog.d/8433.misc new file mode 100644 index 000000000000..05f8b5bbf41e --- /dev/null +++ b/changelog.d/8433.misc @@ -0,0 +1 @@ +Add unit test for event persister sharding. From 6a0da53685b82377817eda456925e4d008b922af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Oct 2020 10:57:34 +0100 Subject: [PATCH 5/7] Fix typing --- mypy.ini | 3 +++ stubs/txredisapi.pyi | 20 +++++++++++++++++++- synapse/replication/tcp/redis.py | 27 ++++++++++++--------------- 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/mypy.ini b/mypy.ini index 79867814328b..501ec72ed31a 100644 --- a/mypy.ini +++ b/mypy.ini @@ -142,3 +142,6 @@ ignore_missing_imports = True [mypy-nacl.*] ignore_missing_imports = True + +[mypy-hiredis] +ignore_missing_imports = True diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index c66413f003cd..522244bb57f7 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -16,7 +16,7 @@ """Contains *incomplete* type hints for txredisapi. """ -from typing import List, Optional, Union +from typing import List, Optional, Union, Type class RedisProtocol: def publish(self, channel: str, message: bytes): ... @@ -42,3 +42,21 @@ def lazyConnection( class SubscriberFactory: def buildProtocol(self, addr): ... + +class ConnectionHandler: ... 
+ +class RedisFactory: + continueTrying: bool + handler: RedisProtocol + def __init__( + self, + uuid: str, + dbid: Optional[int], + poolsize: int, + isLazy: bool = False, + handler: Type = ConnectionHandler, + charset: str = "utf-8", + password: Optional[str] = None, + replyTimeout: Optional[int] = None, + convertNumbers: Optional[int] = True, + ): ... diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 92ecdfebaf69..de19705c1f41 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -15,7 +15,7 @@ import logging from inspect import isawaitable -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional import txredisapi @@ -232,16 +232,16 @@ def buildProtocol(self, addr): def lazyConnection( reactor, - host="localhost", - port=6379, - dbid=None, - reconnect=True, - charset="utf-8", - password=None, - connectTimeout=None, - replyTimeout=None, - convertNumbers=True, -): + host: str = "localhost", + port: int = 6379, + dbid: Optional[int] = None, + reconnect: bool = True, + charset: str = "utf-8", + password: Optional[str] = None, + connectTimeout: Optional[int] = None, + replyTimeout: Optional[int] = None, + convertNumbers: bool = True, +) -> txredisapi.RedisProtocol: """Equivalent to `txredisapi.lazyConnection`, except allows specifying a reactor. """ @@ -265,7 +265,4 @@ def lazyConnection( for x in range(poolsize): reactor.connectTCP(host, port, factory, connectTimeout) - if isLazy: - return factory.handler - else: - return factory.deferred + return factory.handler From 8c78cdfc1bf9b20fa82d7870c24795d5ebf2c741 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Oct 2020 18:57:17 +0100 Subject: [PATCH 6/7] Update tests/replication/_base.py Co-authored-by: Patrick Cloke --- tests/replication/_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index d381e7a94235..81ea985b9f43 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -224,7 +224,7 @@ def setUp(self): # manually have to go and explicitly set it up each time (plus sometimes # it is impossible to write the handling explicitly in the tests). 
         #
-        # This sets registers the master replication listener:
+        # Register the master replication listener:
         self.reactor.add_tcp_client_callback(
             "1.2.3.4",
             8765,
@@ -647,5 +647,5 @@ def encode(self, obj):

         raise Exception("Unrecognized type for encoding redis: %r: %r", type(obj), obj)

-    def connectionList(self, reason):
+    def connectionLost(self, reason):
         self._server.remove_subscriber(self)

From 49323eecd26d570c0955d263d2e775783cfa6943 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 1 Oct 2020 20:09:36 +0100
Subject: [PATCH 7/7] Actually test that event persistence is sharded

---
 .../test_sharded_event_persister.py | 47 +++++++++++++++----
 1 file changed, 37 insertions(+), 10 deletions(-)

diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py
index 77e3f32c4e3c..6068d1490538 100644
--- a/tests/replication/test_sharded_event_persister.py
+++ b/tests/replication/test_sharded_event_persister.py
@@ -18,6 +18,7 @@
 from synapse.rest.client.v1 import login, room

 from tests.replication._base import BaseMultiWorkerStreamTestCase
+from tests.utils import USE_POSTGRES_FOR_TESTS

 logger = logging.getLogger(__name__)

@@ -26,6 +27,11 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
     """Checks event persisting sharding works
     """

+    # Event persister sharding requires postgres (due to needing
+    # `MultiWriterIdGenerator`).
+    if not USE_POSTGRES_FOR_TESTS:
+        skip = "Requires Postgres"
+
     servlets = [
         admin.register_servlets_for_client_rest_resource,
         room.register_servlets,
@@ -48,8 +54,8 @@ def default_config(self):
         return conf

     def test_basic(self):
-        """Simple test that rooms can be created and joined when there are
-        multiple event persisters.
+        """Simple test to ensure that multiple rooms can be created and joined,
+        and that different rooms get handled by different instances.
         """

         self.make_worker_hs(
@@ -60,16 +66,37 @@ def test_basic(self):
             "synapse.app.generic_worker", {"worker_name": "worker1"},
         )

         self.make_worker_hs(
             "synapse.app.generic_worker", {"worker_name": "worker2"},
         )

+        persisted_on_1 = False
+        persisted_on_2 = False
+
+        store = self.hs.get_datastore()
+
         user_id = self.register_user("user", "pass")
         access_token = self.login("user", "pass")

-        # Create a room
-        room = self.helper.create_room_as(user_id, tok=access_token)
+        # Keep making new rooms until we see rooms being persisted on both
+        # workers.
+        for _ in range(10):
+            # Create a room
+            room = self.helper.create_room_as(user_id, tok=access_token)

-        # The other user joins
-        self.helper.join(
-            room=room, user=self.other_user_id, tok=self.other_access_token
-        )
+            # The other user joins
+            self.helper.join(
+                room=room, user=self.other_user_id, tok=self.other_access_token
+            )
+
+            # The other user sends some messages
+            response = self.helper.send(room, body="Hi!", tok=self.other_access_token)
+            event_id = response["event_id"]
+
+            # The event position includes which instance persisted the event.
+            pos = self.get_success(store.get_position_for_event(event_id))
+
+            persisted_on_1 |= pos.instance_name == "worker1"
+            persisted_on_2 |= pos.instance_name == "worker2"
+
+            if persisted_on_1 and persisted_on_2:
+                break

-        # The other user sends some messages
-        self.helper.send(room, body="Hi!", tok=self.other_access_token)
+        self.assertTrue(persisted_on_1)
+        self.assertTrue(persisted_on_2)
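
Note (illustrative sketch, not part of the patches above): the fake Redis server added in patch 2 only needs enough of the RESP wire framing to support PUBLISH/SUBSCRIBE. The standalone helper below mirrors the rules in `FakeRedisPubSubProtocol.encode`; the `resp_encode` name is hypothetical and exists only for this example.

# A minimal standalone sketch of the RESP framing used by the fake Redis
# server in patch 2. Mirrors `FakeRedisPubSubProtocol.encode`; the name
# `resp_encode` is illustrative only.
from typing import Union

RespValue = Union[bytes, str, int, list, tuple]


def resp_encode(obj: RespValue) -> str:
    if isinstance(obj, bytes):
        # The fake server treats bytes as UTF-8 strings.
        obj = obj.decode("utf-8")
    if isinstance(obj, str):
        # Bulk string: $<length>\r\n<payload>\r\n
        return "${}\r\n{}\r\n".format(len(obj), obj)
    if isinstance(obj, int):
        # Integer: :<value>\r\n
        return ":{}\r\n".format(obj)
    if isinstance(obj, (list, tuple)):
        # Array: *<count>\r\n followed by each encoded element.
        return "*{}\r\n".format(len(obj)) + "".join(resp_encode(a) for a in obj)
    raise TypeError("Cannot encode %r as RESP" % (obj,))


# What a subscribed connection receives when a message is published:
assert resp_encode(["message", "testchannel", "hello"]) == (
    "*3\r\n$7\r\nmessage\r\n$11\r\ntestchannel\r\n$5\r\nhello\r\n"
)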