Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(p2p): fix update whitelist handler #885

Merged
merged 1 commit into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 19 additions & 39 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
from twisted.internet.task import LoopingCall
from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
from twisted.python.failure import Failure
from twisted.web.client import Agent

from hathor.conf import HathorSettings
from hathor.conf.get_settings import get_global_settings
from hathor.p2p.netfilter.factory import NetfilterFactory
from hathor.p2p.peer_discovery import PeerDiscovery
from hathor.p2p.peer_id import PeerId
Expand All @@ -39,12 +40,10 @@
from hathor.util import Random

if TYPE_CHECKING:
from twisted.internet.interfaces import IDelayedCall

from hathor.manager import HathorManager

logger = get_logger()
settings = HathorSettings()
settings = get_global_settings()

# The timeout in seconds for the whitelist GET request
WHITELIST_REQUEST_TIMEOUT = 45
Expand Down Expand Up @@ -102,6 +101,7 @@ def __init__(self,
self.log = logger.new()
self.rng = rng
self.manager = None
self._settings = get_global_settings()

self.reactor = reactor
self.my_peer = my_peer
Expand All @@ -124,7 +124,7 @@ def __init__(self,
self.client_factory = HathorClientFactory(self.network, self.my_peer, p2p_manager=self, use_ssl=self.use_ssl)

# Global maximum number of connections.
self.max_connections: int = settings.PEER_MAX_CONNECTIONS
self.max_connections: int = self._settings.PEER_MAX_CONNECTIONS

# Global rate limiter for all connections.
self.rate_limiter = RateLimiter(self.reactor)
Expand Down Expand Up @@ -168,7 +168,7 @@ def __init__(self,
self._last_sync_rotate: float = 0.

# A timer to try to reconnect to the disconnect known peers.
if settings.ENABLE_PEER_WHITELIST:
if self._settings.ENABLE_PEER_WHITELIST:
self.wl_reconnect = LoopingCall(self.update_whitelist)
self.wl_reconnect.clock = self.reactor

Expand All @@ -185,6 +185,9 @@ def __init__(self,
self._sync_factories = {}
self._enabled_sync_versions = set()

# agent to perform HTTP requests
self._http_agent = Agent(self.reactor)

def add_sync_factory(self, sync_version: SyncVersion, sync_factory: SyncAgentFactory) -> None:
"""Add factory for the given sync version, must use a sync version that does not already exist."""
# XXX: to allow code in `set_manager` to safely use the the available sync versions, we add this restriction:
Expand Down Expand Up @@ -273,7 +276,7 @@ def start(self) -> None:
self.lc_reconnect.start(5, now=False)
self.lc_sync_update.start(self.lc_sync_update_interval, now=False)

if settings.ENABLE_PEER_WHITELIST:
if self._settings.ENABLE_PEER_WHITELIST:
self._start_whitelist_reconnect()

for description in self.listen_addresses:
Expand Down Expand Up @@ -519,51 +522,28 @@ def reconnect_to_all(self) -> None:
self.connect_to_if_not_connected(peer, int(now))

def update_whitelist(self) -> Deferred[None]:
from twisted.web.client import Agent, readBody
from twisted.web.client import readBody
from twisted.web.http_headers import Headers
assert settings.WHITELIST_URL is not None
assert self._settings.WHITELIST_URL is not None
self.log.info('update whitelist')
agent = Agent(self.reactor)
d = agent.request(
d = self._http_agent.request(
b'GET',
settings.WHITELIST_URL.encode(),
self._settings.WHITELIST_URL.encode(),
Headers({'User-Agent': ['hathor-core']}),
None)
# Twisted Agent does not have a direct way to configure the HTTP client timeout
# only a TCP connection timeout.
# In this request we need a timeout that encompasses the connection and download time.
# The callLater below is a manual client timeout that includes it and
# will cancel the deferred in case it's called
timeout_call = self.reactor.callLater(WHITELIST_REQUEST_TIMEOUT, d.cancel)
d.addBoth(self._update_whitelist_timeout, timeout_call)
d.addCallback(readBody)
d.addErrback(self._update_whitelist_err)
d.addTimeout(WHITELIST_REQUEST_TIMEOUT, self.reactor)
d.addCallback(self._update_whitelist_cb)
return d

def _update_whitelist_timeout(self, param: Union[Failure, Optional[bytes]],
timeout_call: 'IDelayedCall') -> Union[Failure, Optional[bytes]]:
""" This method is always called for both cb and errback in the update whitelist get request deferred.
Because of that, the first parameter type will depend, will be a failure in case of errback
or optional bytes in case of cb (see _update_whitelist_cb).
d.addErrback(self._update_whitelist_err)

We just need to cancel the timeout call later and return the first parameter,
to continue the cb/errback sequence.
"""
if timeout_call.active():
timeout_call.cancel()
return param
return d

def _update_whitelist_err(self, *args: Any, **kwargs: Any) -> None:
self.log.error('update whitelist failed', args=args, kwargs=kwargs)

def _update_whitelist_cb(self, body: Optional[bytes]) -> None:
def _update_whitelist_cb(self, body: bytes) -> None:
assert self.manager is not None
if body is None:
self.log.warn('update whitelist got no response')
return
else:
self.log.info('update whitelist got response')
self.log.info('update whitelist got response')
try:
text = body.decode()
new_whitelist = parse_whitelist(text)
Expand Down
71 changes: 68 additions & 3 deletions tests/p2p/test_whitelist.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from unittest.mock import patch
from unittest.mock import Mock, patch

from hathor.conf import HathorSettings
from twisted.internet.defer import Deferred, TimeoutError
from twisted.python.failure import Failure
from twisted.web.client import Agent

from hathor.conf.get_settings import get_global_settings
from hathor.conf.settings import HathorSettings
from hathor.manager import HathorManager
from hathor.p2p.manager import WHITELIST_REQUEST_TIMEOUT
from hathor.p2p.sync_version import SyncVersion
from hathor.simulator import FakeConnection
from tests import unittest

settings = HathorSettings()
settings = get_global_settings()


class WhitelistTestCase(unittest.SyncV1Params, unittest.TestCase):
Expand Down Expand Up @@ -79,3 +86,61 @@ def test_sync_v11_whitelist_yes_yes(self):

self.assertFalse(conn.tr1.disconnecting)
self.assertFalse(conn.tr2.disconnecting)

def test_update_whitelist(self) -> None:
network = 'testnet'
manager: HathorManager = self.create_peer(network)
connections_manager = manager.connections

settings_mock = Mock(spec_set=HathorSettings)
settings_mock.WHITELIST_URL = 'some_url'
connections_manager._settings = settings_mock

agent_mock = Mock(spec_set=Agent)
agent_mock.request = Mock()
connections_manager._http_agent = agent_mock

with (
patch.object(connections_manager, '_update_whitelist_cb') as _update_whitelist_cb_mock,
patch.object(connections_manager, '_update_whitelist_err') as _update_whitelist_err_mock,
patch('twisted.web.client.readBody') as read_body_mock
):
# Test success
agent_mock.request.return_value = Deferred()
read_body_mock.return_value = b'body'
d = connections_manager.update_whitelist()
d.callback(None)

read_body_mock.assert_called_once_with(None)
_update_whitelist_cb_mock.assert_called_once_with(b'body')
_update_whitelist_err_mock.assert_not_called()

read_body_mock.reset_mock()
_update_whitelist_cb_mock.reset_mock()
_update_whitelist_err_mock.reset_mock()

# Test request error
agent_mock.request.return_value = Deferred()
d = connections_manager.update_whitelist()
error = Failure('some_error')
d.errback(error)

read_body_mock.assert_not_called()
_update_whitelist_cb_mock.assert_not_called()
_update_whitelist_err_mock.assert_called_once_with(error)

read_body_mock.reset_mock()
_update_whitelist_cb_mock.reset_mock()
_update_whitelist_err_mock.reset_mock()

# Test timeout
agent_mock.request.return_value = Deferred()
read_body_mock.return_value = b'body'
connections_manager.update_whitelist()

self.clock.advance(WHITELIST_REQUEST_TIMEOUT + 1)

read_body_mock.assert_not_called()
_update_whitelist_cb_mock.assert_not_called()
_update_whitelist_err_mock.assert_called_once()
assert isinstance(_update_whitelist_err_mock.call_args.args[0].value, TimeoutError)