Skip to content

Commit

Permalink
Rpc reconnection tests (#580)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Jul 24, 2023
2 parents 2ad7951 + 6bcae10 commit f14168b
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 3 deletions.
94 changes: 92 additions & 2 deletions pytest_tests/testsuites/failovers/test_failover_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@

import allure
import pytest
import subprocess
from cluster import StorageNode
from failover_utils import wait_all_storage_nodes_returned, wait_object_replication
from failover_utils import wait_all_storage_nodes_returned, wait_object_replication, get_morph_chain_endpoints
from file_helper import generate_file, get_file_hash
from iptables_helper import IpTablesHelper
from python_keywords.container import create_container
from python_keywords.neofs_verbs import get_object, put_object_to_random_node
from python_keywords.neofs_verbs import get_object, put_object_to_random_node, get_netmap_netinfo
from wellknown_acl import PUBLIC_ACL
from python_keywords.node_management import storage_node_healthcheck, check_node_in_map
from neofs_testlib.hosting import Hosting
from common import STORAGE_NODE_SERVICE_NAME_REGEX

from steps.cluster_test_base import ClusterTestBase

Expand Down Expand Up @@ -110,3 +114,89 @@ def test_block_storage_node_traffic(
wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
)
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

@pytest.mark.sanity
@allure.title("RPC reconnection test")
def test_rpc_reconnection(self, default_wallet, hosting: Hosting):
    """
    Check that storage nodes survive repeated loss of their morph chain RPC connection.

    When RPC connection fails (and it can), storage node reconnects to some other node
    and continues to operate.

    For every storage node service and every morph chain endpoint, ``ss -K`` is run
    ``dport_repeat`` times through ``sudo nsenter -t <pid> -n`` (i.e. in the network
    namespace of the storage node process). Afterwards every storage node of the
    cluster must report READY/ONLINE and return ``netmap netinfo`` containing all
    the required keys.

    Args:
        default_wallet: wallet used to request netmap netinfo.
        hosting: hosting used to look up service configs, hosts and service PIDs.

    Raises:
        subprocess.CalledProcessError: if the disconnect command exits with a non-zero status.
        AssertionError: if a node is not READY/ONLINE after the disconnects.
        Exception: if netmap netinfo cannot be obtained from a node or lacks required keys.
    """
    dport_repeat = 10  # Number of times each disconnect is repeated
    morph_chain_endpoints = get_morph_chain_endpoints(hosting)

    required_keys = ['epoch', 'time_per_block', 'audit_fee', 'storage_price', 'container_fee', 'eigentrust_alpha',
                     'number_of_eigentrust_iterations', 'epoch_duration', 'inner_ring_candidate_fee',
                     'maximum_object_size', 'withdrawal_fee', 'systemdns', 'homomorphic_hashing_disabled',
                     'maintenance_mode_allowed']

    for storage_node in hosting.find_service_configs(STORAGE_NODE_SERVICE_NAME_REGEX):
        host = hosting.get_host_by_service(storage_node.name)
        pid = host.get_service_pid(storage_node.name)

        for morph_chain_addr, morph_chain_port in morph_chain_endpoints:
            # Of course, it would be cleaner to enter the namespace in-process:
            #     with Namespace(pid, 'net'):
            #         subprocess.check_output(['ss', '-K', 'dst', addr, 'dport', port])
            # But that would require running the tests as root, which is bad practice,
            # and using setfacl is not possible due to GitHub ubuntu-latest runner
            # limitations. Hence `sudo nsenter` is used instead.
            #
            # The command is passed as an argument list (no shell), so the address and
            # port are never interpreted by a shell.
            argv = [
                'sudo', 'nsenter', '-t', str(pid), '-n',
                'ss', '-K', 'dst', str(morph_chain_addr), 'dport', str(morph_chain_port),
            ]
            # Human-readable form of the command, used only in log messages.
            sudo_command = ' '.join(argv)

            with allure.step(f'Disconnecting storage node {storage_node.name} '
                             f'from {morph_chain_addr} {dport_repeat} times'):
                for repeat in range(dport_repeat):
                    with allure.step(f'Disconnect number {repeat}'):
                        try:
                            output = subprocess.check_output(argv)
                            logger.info(f'Output of the command {sudo_command}: {output}')
                        except subprocess.CalledProcessError as e:
                            logger.error(
                                f'Error occurred while running command: {sudo_command}. Error message: {str(e)}')
                            raise
                        finally:
                            # Delay between shutdown attempts, emulates a real disconnection
                            sleep(1)
                logger.info(
                    f'Disconnected storage node {storage_node.name} '
                    f'from {morph_chain_addr} {dport_repeat} times')

    for node in self.cluster.storage_nodes:

        with allure.step(f'Checking if node {node} is alive'):
            try:
                health_check = storage_node_healthcheck(node)
                assert (
                    health_check.health_status == "READY"
                    and health_check.network_status == "ONLINE"
                )
            except Exception as err:
                logger.warning(f'Node {node} is not online:\n{err}')
                # Chain the original error so the root cause stays in the traceback.
                raise AssertionError(
                    f'After the RPC connection failed, the storage node {node} DID NOT reconnect '
                    f'to any other node and FAILED to continue operating. '
                ) from err

        with allure.step(f'Checking netinfo for node {node}'):
            try:
                # Query the node being checked; the default cluster endpoint would
                # exercise the same single node on every iteration.
                net_info = get_netmap_netinfo(
                    wallet=default_wallet,
                    endpoint=node.get_rpc_endpoint(),
                    shell=self.shell,
                )
                missing_keys = [key for key in required_keys if key not in net_info]
                if missing_keys:
                    raise AssertionError(
                        f'Error occurred while checking netinfo for node {node} - '
                        f'missing keys in the output: {missing_keys}.'
                        f'Netmap netinfo: {net_info}'
                    )
            except Exception as err:
                logger.warning(
                    f'Error occurred while checking netinfo for node {node}. Error message: {str(err)}')
                raise Exception(
                    f'After the RPC connection failed, the storage node {node} cannot get netmap netinfo'
                ) from err

        logger.info(f'Node {node} is alive and online')
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mmh3==3.0.0
multidict==6.0.2
mypy==0.950
mypy-extensions==0.4.3
neofs-testlib==1.1.1
neofs-testlib==1.1.2
netaddr==0.8.0
packaging==21.3
paramiko==2.10.3
Expand Down
19 changes: 19 additions & 0 deletions robot/resources/lib/python_keywords/failover_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
from time import sleep

import allure
from typing import List, Tuple, Optional
from urllib.parse import urlparse
from cluster import Cluster, StorageNode
from neofs_testlib.shell import Shell
from neofs_testlib.hosting import Hosting
from python_keywords.node_management import storage_node_healthcheck
from storage_policy import get_nodes_with_object
from common import MORPH_CHAIN_SERVICE_NAME_REGEX, ENDPOINT_INTERNAL0

logger = logging.getLogger("NeoLogger")

Expand Down Expand Up @@ -52,3 +56,18 @@ def is_all_storage_nodes_returned(cluster: Cluster) -> bool:
if health_check.health_status != "READY" or health_check.network_status != "ONLINE":
return False
return True


@allure.step("Get morph chain endpoints")
def get_morph_chain_endpoints(hosting: Hosting) -> List[Tuple[str, str]]:
    """
    Collect (host, port) pairs for every morph chain service config found in the hosting.

    The endpoint is read from the ``ENDPOINT_INTERNAL0`` attribute of each service
    config matching ``MORPH_CHAIN_SERVICE_NAME_REGEX`` and split with ``urlparse``.

    Args:
        hosting: hosting whose service configs are searched.

    Returns:
        List of ``(hostname, port)`` tuples; the port is returned as a string.

    Raises:
        ValueError: if a config lacks the ``ENDPOINT_INTERNAL0`` attribute, or if
            no host or port can be extracted from its value.
    """
    endpoints = []
    for config in hosting.find_service_configs(MORPH_CHAIN_SERVICE_NAME_REGEX):
        if ENDPOINT_INTERNAL0 not in config.attributes:
            raise ValueError(f"{ENDPOINT_INTERNAL0} is not present in the attributes of the config: {config}")
        morph_chain_addr_full = config.attributes[ENDPOINT_INTERNAL0]

        parsed_url = urlparse(morph_chain_addr_full)
        if parsed_url.hostname is None:
            # urlparse only recognises a network location introduced by "//";
            # retry so that a bare "host:port" value is handled as well.
            parsed_url = urlparse(f"//{morph_chain_addr_full}")

        addr = parsed_url.hostname
        port = parsed_url.port
        if addr is None or port is None:
            # Fail loudly instead of returning (None, "None") to the caller.
            raise ValueError(
                f"Cannot extract host and port from {ENDPOINT_INTERNAL0}={morph_chain_addr_full!r} "
                f"of the config: {config}"
            )
        endpoints.append((addr, str(port)))
    return endpoints
3 changes: 3 additions & 0 deletions robot/variables/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@

# Hosting config file; taken from the HOSTING_CONFIG_FILE environment variable,
# falling back to the ".devenv.hosting.yaml" default.
HOSTING_CONFIG_FILE = os.getenv("HOSTING_CONFIG_FILE", ".devenv.hosting.yaml")
# Regular expressions matching service names by service kind
# (each one is a fixed prefix followed by exactly two digits).
STORAGE_NODE_SERVICE_NAME_REGEX = r"s\d\d"
MORPH_CHAIN_SERVICE_NAME_REGEX = r"morph-chain\d\d"
HTTP_GATE_SERVICE_NAME_REGEX = r"http-gate\d\d"
S3_GATE_SERVICE_NAME_REGEX = r"s3-gate\d\d"

# Name of the service config attribute that is looked up to obtain a service endpoint.
ENDPOINT_INTERNAL0 = "endpoint_internal0"

# Generate wallet configs
# TODO: we should move all info about wallet configs to fixtures
# The wallet config file is located in the current working directory.
WALLET_CONFIG = os.path.join(os.getcwd(), "wallet_config.yml")
Expand Down

0 comments on commit f14168b

Please sign in to comment.