Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Apr 29, 2020
1 parent 1bcfb18 commit a568346
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 27 deletions.
30 changes: 10 additions & 20 deletions tests/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@
# limitations under the License.

import logging
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, List, Optional, Tuple

import attr

from twisted.internet.interfaces import IConsumer, IPullProducer, IReactorTime
from twisted.internet.task import LoopingCall
from twisted.web.http import HTTPChannel

from synapse.app.generic_worker import GenericWorkerServer
from synapse.app.generic_worker import (
GenericWorkerReplicationHandler,
GenericWorkerServer,
)
from synapse.http.site import SynapseRequest
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.tcp.client import ReplicationDataHandler
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.server import HomeServer
from synapse.util import Clock

from tests import unittest
Expand Down Expand Up @@ -77,7 +79,7 @@ def prepare(self, reactor, clock, hs):
self._server_transport = None

def _build_replication_data_handler(self):
return TestReplicationDataHandler(self.worker_hs.get_datastore())
return TestReplicationDataHandler(self.worker_hs)

def reconnect(self):
if self._client_transport:
Expand Down Expand Up @@ -172,32 +174,20 @@ def assert_request_is_get_repl_stream_updates(
self.assertEqual(request.method, b"GET")


class TestReplicationDataHandler(ReplicationDataHandler):
class TestReplicationDataHandler(GenericWorkerReplicationHandler):
"""Drop-in for ReplicationDataHandler which just collects RDATA rows"""

def __init__(self, store: BaseSlavedStore):
super().__init__(store)

# streams to subscribe to: map from stream id to position
self.stream_positions = {} # type: Dict[str, int]
def __init__(self, hs: HomeServer):
super().__init__(hs)

# list of received (stream_name, token, row) tuples
self.received_rdata_rows = [] # type: List[Tuple[str, int, Any]]

def get_streams_to_replicate(self):
return self.stream_positions

async def on_rdata(self, stream_name, token, rows):
await super().on_rdata(stream_name, token, rows)
for r in rows:
self.received_rdata_rows.append((stream_name, token, r))

if (
stream_name in self.stream_positions
and token > self.stream_positions[stream_name]
):
self.stream_positions[stream_name] = token


@attr.s()
class OneShotRequestFactory:
Expand Down
1 change: 0 additions & 1 deletion tests/replication/tcp/streams/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def prepare(self, reactor, clock, hs):
self.user_tok = self.login("u1", "pass")

self.reconnect()
self.test_handler.stream_positions["events"] = 0

self.room_id = self.helper.create_room_as(tok=self.user_tok)
self.test_handler.received_rdata_rows.clear()
Expand Down
3 changes: 0 additions & 3 deletions tests/replication/tcp/streams/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ def _build_replication_data_handler(self):
def test_receipt(self):
self.reconnect()

# make the client subscribe to the receipts stream
self.test_handler.stream_positions.update({"receipts": 0})

# tell the master to send a new receipt
self.get_success(
self.hs.get_datastore().insert_receipt(
Expand Down
3 changes: 0 additions & 3 deletions tests/replication/tcp/streams/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ def test_typing(self):

self.reconnect()

# make the client subscribe to the typing stream
self.test_handler.stream_positions.update({"typing": 0})

typing._push_update(member=RoomMember(room_id, USER_ID), typing=True)

self.reactor.advance(0)
Expand Down

0 comments on commit a568346

Please sign in to comment.