From c8de8079667ab6bf3c3ca0ccaa3b93450d2127c5 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Tue, 21 Jan 2020 18:02:11 +0100 Subject: [PATCH 1/2] Added py-spy and changed the stress script to use it --- requirements/requirements-ci.txt | 1 + requirements/requirements-dev.in | 1 + requirements/requirements-dev.txt | 1 + requirements/requirements-docs.txt | 2 +- tools/debugging/stress_test_transfers.py | 92 +++++++++++++++++------- 5 files changed, 72 insertions(+), 25 deletions(-) diff --git a/requirements/requirements-ci.txt b/requirements/requirements-ci.txt index ccb58a9d0b..c47274fe1c 100644 --- a/requirements/requirements-ci.txt +++ b/requirements/requirements-ci.txt @@ -109,6 +109,7 @@ ptyprocess==0.6.0 py-ecc==1.4.7 py-geth==2.1.0 py-solc==3.2.0 +py-spy==0.3.2 py==1.8.0 pyasn1-modules==0.2.5 pyasn1==0.4.5 diff --git a/requirements/requirements-dev.in b/requirements/requirements-dev.in index fd597ff69b..c11a777d44 100644 --- a/requirements/requirements-dev.in +++ b/requirements/requirements-dev.in @@ -33,6 +33,7 @@ responses # Debugging pdbpp colour +py-spy # Continuous Integration coverage diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 31d1c15992..1b65ce4e8c 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -108,6 +108,7 @@ ptyprocess==0.6.0 py-ecc==1.4.7 py-geth==2.1.0 py-solc==3.2.0 +py-spy==0.3.2 py==1.8.0 # via pytest pyasn1-modules==0.2.5 # via matrix-synapse, service-identity pyasn1==0.4.5 # via matrix-synapse, pyasn1-modules, service-identity diff --git a/requirements/requirements-docs.txt b/requirements/requirements-docs.txt index 3dbcbdbe3b..03a93b62c8 100644 --- a/requirements/requirements-docs.txt +++ b/requirements/requirements-docs.txt @@ -33,4 +33,4 @@ urllib3==1.25.3 # via requests wheel==0.33.4 # via astunparse # The following packages are considered to be unsafe in a requirements file: -# setuptools==44.0.0 # via sphinx +# setuptools==45.1.0 # via sphinx diff --git a/tools/debugging/stress_test_transfers.py b/tools/debugging/stress_test_transfers.py index 79ef390e87..e49902ccff 100755 --- a/tools/debugging/stress_test_transfers.py +++ b/tools/debugging/stress_test_transfers.py @@ -9,11 +9,12 @@ import signal from abc import ABC, abstractmethod from dataclasses import dataclass +from datetime import datetime from http import HTTPStatus from itertools import chain, count, product from time import time from types import TracebackType -from typing import Any, Callable, Iterator, List, NewType, NoReturn, Optional, Set, Type +from typing import Any, Callable, Iterable, Iterator, List, NewType, NoReturn, Optional, Set, Type import gevent import requests @@ -22,7 +23,7 @@ from gevent.event import AsyncResult, Event from gevent.greenlet import Greenlet from gevent.pool import Pool -from gevent.subprocess import STDOUT, Popen +from gevent.subprocess import DEVNULL, STDOUT, Popen from greenlet import greenlet from raiden.network.utils import get_free_port @@ -99,6 +100,16 @@ class InitiatorAndTarget: @dataclass class StressTestConfiguration: + port_generator: Iterator[Port] + retry_timeout: int + capacity_lower_bound: Amount + token_address: str + iteration_counter: Iterable[int] + profiler_data_directory: Optional[str] + + +@dataclass +class StressTestPlan: # These values can NOT be iterables because they will be consumed multiple # times. @@ -246,7 +257,7 @@ def start_and_wait_for_server( nursery: Nursery, port_generator: Iterator[Port], node: NodeConfig, retry_timeout: int ) -> RunningNode: # redirect the process output for debugging - os.makedirs(node.data_dir, exist_ok=True) + os.makedirs(os.path.expanduser(node.data_dir), exist_ok=True) stdout = open(os.path.join(node.data_dir, "stress_test.out"), "a") port = next(port_generator) @@ -446,25 +457,45 @@ def scheduler_preserve_order( yield Transfer(from_to, Amount(transfer)) +def run_profiler( + nursery: Nursery, running_nodes: List[RunningNode], profiler_data_directory: str +) -> List[Popen]: + os.makedirs(os.path.expanduser(profiler_data_directory), exist_ok=True) + + profiler_processes: List[Popen] = list() + for node in running_nodes: + args = [ + "py-spy", + "record", + "--pid", + str(node.process.pid), + "--output", + os.path.join( + profiler_data_directory, + f"{node.config.address}-{datetime.utcnow().isoformat()}.data", + ), + ] + profiler = Popen(args, stdout=DEVNULL, stderr=DEVNULL) + + nursery.track(profiler) + + return profiler_processes + + def run_stress_test( - nursery: Nursery, - port_generator: Iterator[Port], - retry_timeout: int, - running_nodes: List[RunningNode], - capacity_lower_bound: Amount, - token_address: str, - iteration_counter: Iterator[int], + nursery: Nursery, running_nodes: List[RunningNode], config: StressTestConfiguration ) -> None: identifier_generator = count(start=FIRST_VALID_PAYMENT_ID) + profiler_processes: List[Popen] = list() # TODO: Add tests with fees. This may require changes to the transfer plan, # since ATM it depends only in the `capacity_lower_bound` settings. - for iteration in iteration_counter: + for iteration in config.iteration_counter: log.info(f"Starting iteration {iteration}") # The configuration has to be re-created on every iteration because the # port numbers change - config = StressTestConfiguration( + plan = StressTestPlan( initiator_target_pairs=[paths_for_mediated_transfers(running_nodes)], concurrency=[50], planners=[do_fifty_transfer_up_to], @@ -475,7 +506,7 @@ def run_stress_test( # their channel status. The script should assert the open channels have # at least `capacity_lower_bound` together. for concurent_paths, concurrency, transfer_planner, scheduler in zip( - config.initiator_target_pairs, config.concurrency, config.planners, config.schedulers + plan.initiator_target_pairs, plan.concurrency, plan.planners, plan.schedulers ): log.info( f"Starting run {concurent_paths}, {concurrency}, {transfer_planner}, {scheduler}" @@ -484,22 +515,32 @@ def run_stress_test( # The plan MUST be executed successfully until exhaustion, # otherwise the next plan may try to use an amount that is not # available. - transfer_plan = transfer_planner(capacity_lower_bound) + transfer_plan = transfer_planner(config.capacity_lower_bound) transfers = list(scheduler(concurent_paths, transfer_plan)) + if config.profiler_data_directory: + profiler_processes = run_profiler( + nursery, running_nodes, config.profiler_data_directory + ) + # TODO: `do_transfers` should return the amount of tokens # transferred with each `(from, to)` pair, and the total amount # must be lower than the `capacity_lower_bound`. do_transfers( transfers=transfers, - token_address=token_address, + token_address=config.token_address, identifier_generator=identifier_generator, pool_size=concurrency, ) # After each `do_transfers` the state of the system must be # reset, otherwise there is a bug in the planner or Raiden. - running_nodes = restart_network(nursery, port_generator, running_nodes, retry_timeout) + running_nodes = restart_network( + nursery, config.port_generator, running_nodes, config.retry_timeout + ) + + for profiler in profiler_processes: + profiler.send_signal(signal.SIGINT) # TODO: cancel the spawn_later if `greenlet` exits normally. @@ -532,10 +573,14 @@ def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--nodes-data-dir", default=os.getcwd()) parser.add_argument("--wait-after-first-sync", default=False, action="store_true") + parser.add_argument("--profiler-data-directory", default=None) parser.add_argument("--interface", default="127.0.0.1") parser.add_argument("config") args = parser.parse_args() + if args.profiler_data_directory is not None and os.geteuid() != 0: + raise RuntimeError("To enable profiling the script has to be executed with root.") + config = configparser.ConfigParser() config.read(args.config) @@ -588,8 +633,6 @@ def main() -> None: "--proportional-imbalance-fee", "0xf9BA8aDF7F7024D7de8eB37b4c981CFFe3C88Ea7", "0", - "--flamegraph", - "/tmp/flamegraph", ] raiden_args.extend(chain.from_iterable(node.items())) @@ -604,6 +647,7 @@ def main() -> None: iterations = 5 token_address = config.defaults()["token-address"] + profiler_data_directory = args.profiler_data_directory if not is_checksum_address(token_address): raise ValueError(f"Invalid token address {token_address}, check it is checksummed.") @@ -642,16 +686,16 @@ def main() -> None: print("All nodes are ready! Press Enter to continue and perform the stress tests.") input() - nursery.spawn_under_watch( - run_stress_test, - nursery, + test_config = StressTestConfiguration( port_generator, retry_timeout, - nodes_running, - capacity_lower_bound, + Amount(capacity_lower_bound), token_address, iteration_counter, - ).get() + profiler_data_directory, + ) + + nursery.spawn_under_watch(run_stress_test, nursery, nodes_running, test_config).get() if __name__ == "__main__": From 026c1c58c200db9dcdafca022bd299700788c8f6 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Mon, 27 Jan 2020 16:41:47 +0100 Subject: [PATCH 2/2] pr review --- requirements/requirements-ci.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements/requirements-ci.txt b/requirements/requirements-ci.txt index c47274fe1c..42f3853ce8 100644 --- a/requirements/requirements-ci.txt +++ b/requirements/requirements-ci.txt @@ -77,6 +77,7 @@ jinja2==2.10.1 jsonschema==3.0.1 lazy-object-proxy==1.4.1 lru-dict==1.1.6 +macholib==1.14 # via pyinstaller markupsafe==1.1.1 marshmallow-dataclass==6.0.0 marshmallow-enum==1.5.1