Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add unit test for event persister sharding (#8433)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Oct 2, 2020
1 parent 05ee048 commit 6c5d5e5
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 27 deletions.
1 change: 1 addition & 0 deletions changelog.d/8433.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add unit test for event persister sharding.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,6 @@ ignore_missing_imports = True

[mypy-nacl.*]
ignore_missing_imports = True

[mypy-hiredis]
ignore_missing_imports = True
20 changes: 19 additions & 1 deletion stubs/txredisapi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"""Contains *incomplete* type hints for txredisapi.
"""

from typing import List, Optional, Union
from typing import List, Optional, Union, Type

class RedisProtocol:
def publish(self, channel: str, message: bytes): ...
Expand All @@ -42,3 +42,21 @@ def lazyConnection(

class SubscriberFactory:
def buildProtocol(self, addr): ...

class ConnectionHandler: ...

class RedisFactory:
continueTrying: bool
handler: RedisProtocol
def __init__(
self,
uuid: str,
dbid: Optional[int],
poolsize: int,
isLazy: bool = False,
handler: Type = ConnectionHandler,
charset: str = "utf-8",
password: Optional[str] = None,
replyTimeout: Optional[int] = None,
convertNumbers: Optional[int] = True,
): ...
6 changes: 3 additions & 3 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,9 @@ def start_replication(self, hs):
using TCP.
"""
if hs.config.redis.redis_enabled:
import txredisapi

from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
lazyConnection,
)

logger.info(
Expand All @@ -271,7 +270,8 @@ def start_replication(self, hs):
# connection after SUBSCRIBE is called).

# First create the connection for sending commands.
outbound_redis_connection = txredisapi.lazyConnection(
outbound_redis_connection = lazyConnection(
reactor=hs.get_reactor(),
host=hs.config.redis_host,
port=hs.config.redis_port,
password=hs.config.redis.redis_password,
Expand Down
40 changes: 39 additions & 1 deletion synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import logging
from inspect import isawaitable
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

import txredisapi

Expand Down Expand Up @@ -228,3 +228,41 @@ def buildProtocol(self, addr):
p.password = self.password

return p


def lazyConnection(
reactor,
host: str = "localhost",
port: int = 6379,
dbid: Optional[int] = None,
reconnect: bool = True,
charset: str = "utf-8",
password: Optional[str] = None,
connectTimeout: Optional[int] = None,
replyTimeout: Optional[int] = None,
convertNumbers: bool = True,
) -> txredisapi.RedisProtocol:
"""Equivalent to `txredisapi.lazyConnection`, except allows specifying a
reactor.
"""

isLazy = True
poolsize = 1

uuid = "%s:%d" % (host, port)
factory = txredisapi.RedisFactory(
uuid,
dbid,
poolsize,
isLazy,
txredisapi.ConnectionHandler,
charset,
password,
replyTimeout,
convertNumbers,
)
factory.continueTrying = reconnect
for x in range(poolsize):
reactor.connectTCP(host, port, factory, connectTimeout)

return factory.handler
Loading

0 comments on commit 6c5d5e5

Please sign in to comment.