Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add unit test for event persister sharding #8433

Merged
merged 7 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8433.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add unit test for event persister sharding.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,6 @@ ignore_missing_imports = True

[mypy-nacl.*]
ignore_missing_imports = True

[mypy-hiredis]
ignore_missing_imports = True
20 changes: 19 additions & 1 deletion stubs/txredisapi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"""Contains *incomplete* type hints for txredisapi.
"""

from typing import List, Optional, Union
from typing import List, Optional, Union, Type

class RedisProtocol:
def publish(self, channel: str, message: bytes): ...
Expand All @@ -42,3 +42,21 @@ def lazyConnection(

class SubscriberFactory:
def buildProtocol(self, addr): ...

class ConnectionHandler: ...

class RedisFactory:
continueTrying: bool
handler: RedisProtocol
def __init__(
self,
uuid: str,
dbid: Optional[int],
poolsize: int,
isLazy: bool = False,
handler: Type = ConnectionHandler,
charset: str = "utf-8",
password: Optional[str] = None,
replyTimeout: Optional[int] = None,
convertNumbers: Optional[int] = True,
): ...
6 changes: 3 additions & 3 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,9 @@ def start_replication(self, hs):
using TCP.
"""
if hs.config.redis.redis_enabled:
import txredisapi

from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
lazyConnection,
)

logger.info(
Expand All @@ -271,7 +270,8 @@ def start_replication(self, hs):
# connection after SUBSCRIBE is called).

# First create the connection for sending commands.
outbound_redis_connection = txredisapi.lazyConnection(
outbound_redis_connection = lazyConnection(
reactor=hs.get_reactor(),
host=hs.config.redis_host,
port=hs.config.redis_port,
password=hs.config.redis.redis_password,
Expand Down
40 changes: 39 additions & 1 deletion synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import logging
from inspect import isawaitable
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

import txredisapi

Expand Down Expand Up @@ -228,3 +228,41 @@ def buildProtocol(self, addr):
p.password = self.password

return p


def lazyConnection(
reactor,
host: str = "localhost",
port: int = 6379,
dbid: Optional[int] = None,
reconnect: bool = True,
charset: str = "utf-8",
password: Optional[str] = None,
connectTimeout: Optional[int] = None,
replyTimeout: Optional[int] = None,
convertNumbers: bool = True,
) -> txredisapi.RedisProtocol:
"""Equivalent to `txredisapi.lazyConnection`, except allows specifying a
reactor.
Comment on lines +245 to +246
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""

isLazy = True
poolsize = 1

uuid = "%s:%d" % (host, port)
factory = txredisapi.RedisFactory(
uuid,
dbid,
poolsize,
isLazy,
txredisapi.ConnectionHandler,
charset,
password,
replyTimeout,
convertNumbers,
)
factory.continueTrying = reconnect
for x in range(poolsize):
reactor.connectTCP(host, port, factory, connectTimeout)

return factory.handler
Loading