Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added py-spy and changed the stress script to use it #5729

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions requirements/requirements-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ jinja2==2.10.1
jsonschema==3.0.1
lazy-object-proxy==1.4.1
lru-dict==1.1.6
macholib==1.14 # via pyinstaller
markupsafe==1.1.1
marshmallow-dataclass==6.0.0
marshmallow-enum==1.5.1
Expand Down Expand Up @@ -109,6 +110,7 @@ ptyprocess==0.6.0
py-ecc==1.4.7
py-geth==2.1.0
py-solc==3.2.0
py-spy==0.3.2
py==1.8.0
pyasn1-modules==0.2.5
pyasn1==0.4.5
Expand Down
1 change: 1 addition & 0 deletions requirements/requirements-dev.in
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ responses
# Debugging
pdbpp
colour
py-spy

# Continuous Integration
coverage
Expand Down
1 change: 1 addition & 0 deletions requirements/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ ptyprocess==0.6.0
py-ecc==1.4.7
py-geth==2.1.0
py-solc==3.2.0
py-spy==0.3.2
py==1.8.0 # via pytest
pyasn1-modules==0.2.5 # via matrix-synapse, service-identity
pyasn1==0.4.5 # via matrix-synapse, pyasn1-modules, service-identity
Expand Down
2 changes: 1 addition & 1 deletion requirements/requirements-docs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ urllib3==1.25.3 # via requests
wheel==0.33.4 # via astunparse

# The following packages are considered to be unsafe in a requirements file:
# setuptools==44.0.0 # via sphinx
# setuptools==45.1.0 # via sphinx
92 changes: 68 additions & 24 deletions tools/debugging/stress_test_transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
import signal
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from http import HTTPStatus
from itertools import chain, count, product
from time import time
from types import TracebackType
from typing import Any, Callable, Iterator, List, NewType, NoReturn, Optional, Set, Type
from typing import Any, Callable, Iterable, Iterator, List, NewType, NoReturn, Optional, Set, Type

import gevent
import requests
Expand All @@ -22,7 +23,7 @@
from gevent.event import AsyncResult, Event
from gevent.greenlet import Greenlet
from gevent.pool import Pool
from gevent.subprocess import STDOUT, Popen
from gevent.subprocess import DEVNULL, STDOUT, Popen
from greenlet import greenlet

from raiden.network.utils import get_free_port
Expand Down Expand Up @@ -99,6 +100,16 @@ class InitiatorAndTarget:

@dataclass
class StressTestConfiguration:
    """Settings for a stress test run.

    Bundles the values `run_stress_test` needs so that they can be handed
    over as a single `config` argument.
    """

    # Source of ports; forwarded to `restart_network` whenever the nodes are
    # restarted.
    port_generator: Iterator[Port]
    # Forwarded to `restart_network` together with `port_generator`.
    retry_timeout: int
    # Handed to the transfer planner to build the plan of transfer amounts.
    capacity_lower_bound: Amount
    # Token used by `do_transfers` for every transfer.
    token_address: str
    # `run_stress_test` executes one iteration per element.
    iteration_counter: Iterable[int]
    # Directory for the profiler output. Profilers are only started when this
    # is set to a non-empty value.
    profiler_data_directory: Optional[str]


@dataclass
class StressTestPlan:
# These values can NOT be one-shot iterators (e.g. generators) because they
# will be consumed multiple times.

Expand Down Expand Up @@ -246,7 +257,7 @@ def start_and_wait_for_server(
nursery: Nursery, port_generator: Iterator[Port], node: NodeConfig, retry_timeout: int
) -> RunningNode:
# redirect the process output for debugging
os.makedirs(node.data_dir, exist_ok=True)
os.makedirs(os.path.expanduser(node.data_dir), exist_ok=True)
stdout = open(os.path.join(node.data_dir, "stress_test.out"), "a")

port = next(port_generator)
Expand Down Expand Up @@ -446,25 +457,45 @@ def scheduler_preserve_order(
yield Transfer(from_to, Amount(transfer))


def run_profiler(
    nursery: Nursery, running_nodes: List[RunningNode], profiler_data_directory: str
) -> List[Popen]:
    """Start one `py-spy record` process for every running node.

    Args:
        nursery: Every spawned profiler is registered with it through
            `nursery.track`.
        running_nodes: Nodes to profile. The profiler attaches to
            `node.process.pid`, and `node.config.address` is used as the
            prefix of the output file name.
        profiler_data_directory: Directory that receives the profiler output.
            A leading `~` is expanded, and the directory is created if it
            does not exist.

    Returns:
        The spawned profiler processes, in the same order as `running_nodes`.
        The caller needs these handles to stop the profilers (it sends them
        `SIGINT`).
    """
    # Expand `~` once and use the expanded path everywhere. The arguments are
    # passed to `Popen` as a list, so no shell will expand the path later on.
    output_directory = os.path.expanduser(profiler_data_directory)
    os.makedirs(output_directory, exist_ok=True)

    profiler_processes: List[Popen] = []
    for node in running_nodes:
        # The timestamp keeps the output of successive runs for the same node
        # from overwriting each other.
        output_file = os.path.join(
            output_directory, f"{node.config.address}-{datetime.utcnow().isoformat()}.data"
        )
        args = ["py-spy", "record", "--pid", str(node.process.pid), "--output", output_file]
        profiler = Popen(args, stdout=DEVNULL, stderr=DEVNULL)

        nursery.track(profiler)
        # Without this the function would always return an empty list and the
        # caller would never be able to signal the profilers.
        profiler_processes.append(profiler)

    return profiler_processes


def run_stress_test(
nursery: Nursery,
port_generator: Iterator[Port],
retry_timeout: int,
running_nodes: List[RunningNode],
capacity_lower_bound: Amount,
token_address: str,
iteration_counter: Iterator[int],
nursery: Nursery, running_nodes: List[RunningNode], config: StressTestConfiguration
) -> None:
identifier_generator = count(start=FIRST_VALID_PAYMENT_ID)
profiler_processes: List[Popen] = list()

# TODO: Add tests with fees. This may require changes to the transfer plan,
# since ATM it depends only on the `capacity_lower_bound` setting.
for iteration in iteration_counter:
for iteration in config.iteration_counter:
log.info(f"Starting iteration {iteration}")

# The configuration has to be re-created on every iteration because the
# port numbers change
config = StressTestConfiguration(
plan = StressTestPlan(
initiator_target_pairs=[paths_for_mediated_transfers(running_nodes)],
concurrency=[50],
planners=[do_fifty_transfer_up_to],
Expand All @@ -475,7 +506,7 @@ def run_stress_test(
# their channel status. The script should assert the open channels have
# at least `capacity_lower_bound` together.
for concurent_paths, concurrency, transfer_planner, scheduler in zip(
config.initiator_target_pairs, config.concurrency, config.planners, config.schedulers
plan.initiator_target_pairs, plan.concurrency, plan.planners, plan.schedulers
):
log.info(
f"Starting run {concurent_paths}, {concurrency}, {transfer_planner}, {scheduler}"
Expand All @@ -484,22 +515,32 @@ def run_stress_test(
# The plan MUST be executed successfully until exhaustion,
# otherwise the next plan may try to use an amount that is not
# available.
transfer_plan = transfer_planner(capacity_lower_bound)
transfer_plan = transfer_planner(config.capacity_lower_bound)
transfers = list(scheduler(concurent_paths, transfer_plan))

if config.profiler_data_directory:
profiler_processes = run_profiler(
nursery, running_nodes, config.profiler_data_directory
)

# TODO: `do_transfers` should return the amount of tokens
# transferred with each `(from, to)` pair, and the total amount
# must be lower than the `capacity_lower_bound`.
do_transfers(
transfers=transfers,
token_address=token_address,
token_address=config.token_address,
identifier_generator=identifier_generator,
pool_size=concurrency,
)

# After each `do_transfers` the state of the system must be
# reset, otherwise there is a bug in the planner or Raiden.
running_nodes = restart_network(nursery, port_generator, running_nodes, retry_timeout)
running_nodes = restart_network(
nursery, config.port_generator, running_nodes, config.retry_timeout
)

for profiler in profiler_processes:
profiler.send_signal(signal.SIGINT)


# TODO: cancel the spawn_later if `greenlet` exits normally.
Expand Down Expand Up @@ -532,10 +573,14 @@ def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--nodes-data-dir", default=os.getcwd())
parser.add_argument("--wait-after-first-sync", default=False, action="store_true")
parser.add_argument("--profiler-data-directory", default=None)
parser.add_argument("--interface", default="127.0.0.1")
parser.add_argument("config")
args = parser.parse_args()

if args.profiler_data_directory is not None and os.geteuid() != 0:
raise RuntimeError("To enable profiling the script has to be executed with root.")

config = configparser.ConfigParser()
config.read(args.config)

Expand Down Expand Up @@ -588,8 +633,6 @@ def main() -> None:
"--proportional-imbalance-fee",
"0xf9BA8aDF7F7024D7de8eB37b4c981CFFe3C88Ea7",
"0",
"--flamegraph",
"/tmp/flamegraph",
]
raiden_args.extend(chain.from_iterable(node.items()))

Expand All @@ -604,6 +647,7 @@ def main() -> None:

iterations = 5
token_address = config.defaults()["token-address"]
profiler_data_directory = args.profiler_data_directory

if not is_checksum_address(token_address):
raise ValueError(f"Invalid token address {token_address}, check it is checksummed.")
Expand Down Expand Up @@ -642,16 +686,16 @@ def main() -> None:
print("All nodes are ready! Press Enter to continue and perform the stress tests.")
input()

nursery.spawn_under_watch(
run_stress_test,
nursery,
test_config = StressTestConfiguration(
port_generator,
retry_timeout,
nodes_running,
capacity_lower_bound,
Amount(capacity_lower_bound),
token_address,
iteration_counter,
).get()
profiler_data_directory,
)

nursery.spawn_under_watch(run_stress_test, nursery, nodes_running, test_config).get()


if __name__ == "__main__":
Expand Down