Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem (Fix #125): can't manage chain processes in pystarport #126

Merged
merged 1 commit into from
Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/nix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- run: nix run nixpkgs.python3Packages.flake8 -c flake8 --max-line-length 88 --extend-ignore E203 integration-tests/tests pystarport
- run: nix run nixpkgs.python3Packages.black -c black --check --diff integration-tests/tests pystarport
- run: nix-shell --run "isort -c --diff -rc integration-tests/tests pystarport"
- run: nix-shell --run "python -mpytest integration-tests/tests"
- run: nix-shell --run "python -mpytest -v integration-tests/tests"
- run: nix-build -o pystarportImage -A pystarportImage docker.nix
- uses: actions/upload-artifact@v2
if: github.ref == 'refs/heads/master'
Expand Down
31 changes: 12 additions & 19 deletions integration-tests/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,15 @@ def event_loop(request):

@pytest.fixture(scope="session")
async def cluster():
    """Session-scoped fixture: boot a test chain cluster in a temp dir.

    The TemporaryDirectory context manager guarantees the data dir is
    removed even if startup fails part-way; the cluster itself is
    terminated after the last test of the session.
    """
    with tempfile.TemporaryDirectory(suffix="chain-test") as tempdir:
        # close the config file promptly instead of leaking the handle
        with open(CURRENT_DIR / "config.yml") as f:
            config = yaml.safe_load(f)
        cluster = Cluster(
            config,
            Path(tempdir),
            CLUSTER_BASE_PORT,
        )
        await cluster.init()
        await cluster.start()

        yield cluster

        await cluster.terminate()
17 changes: 16 additions & 1 deletion pystarport/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pystarport/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ jsonmerge = "^1.7.0"
PyYAML = "^5.3.1"
python-dateutil = "^2.8.1"
durations = "^0.3.3"
supervisor = "^4.2.1"

[tool.poetry.dev-dependencies]

Expand Down
17 changes: 16 additions & 1 deletion pystarport/pystarport/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import signal
from pathlib import Path

import fire
Expand Down Expand Up @@ -32,7 +33,16 @@ def init(self):

async def _start(self):
    """Start the cluster and block until supervisord exits.

    SIGINT/SIGTERM are translated into a graceful cluster termination,
    so Ctrl-C cleanly shuts down the supervised chain processes.
    """
    await self._cluster.start()

    # register signal handlers to quit supervisord gracefully;
    # get_running_loop() is the modern, unambiguous form inside a coroutine
    loop = asyncio.get_running_loop()
    for signame in ("SIGINT", "SIGTERM"):
        loop.add_signal_handler(
            getattr(signal, signame),
            lambda: asyncio.ensure_future(self._cluster.terminate()),
        )

    await self._cluster.wait()

def start(self):
"""
Expand All @@ -50,6 +60,11 @@ def serve(self):
"""
asyncio.run(self._serve())

def supervisorctl(self):
    """Run supervisorctl against this cluster's supervisord instance.

    Imported lazily so the CLI module stays importable without
    supervisor installed.
    """
    from supervisor.supervisorctl import main

    # supervisorctl parses its argv as strings; data_dir is a Path,
    # so convert explicitly instead of relying on implicit handling
    main(["-c", str(self._cluster.data_dir / "tasks.ini")])


def main():
    """Console entry point: expose the CLI class through python-fire."""
    fire.Fire(CLI)
Expand Down
113 changes: 60 additions & 53 deletions pystarport/pystarport/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
import datetime
import json
import os
import re
import subprocess
import sys
from pathlib import Path

import dateutil.parser
import durations
import jsonmerge
import tomlkit
from supervisor import xmlrpc
from supervisor.compat import xmlrpclib

from . import ports
from .utils import interact, local_ip
from .utils import interact, local_ip, write_ini

CHAIN = "chain-maind" # edit by nix-build

Expand Down Expand Up @@ -126,62 +126,41 @@ def __init__(self, config, data_dir, base_port, cmd=CHAIN):
self.data_dir = data_dir
self.base_port = base_port
self.cli = ClusterCLI(data_dir, base_port, config["chain_id"], cmd)
# subprocesses spawned
self.processes = []
self.supervisord_process = None
self._supervisorctl = None

@property
def supervisorctl(self):
    """Lazily-created XML-RPC proxy to the running supervisord.

    Mirrors supervisor's own client bootstrap:
    https://github.com/Supervisor/supervisor/blob/76df237032f7d9fbe80a0adce3829c8b916d5b58/supervisor/options.py#L1718
    """
    if self._supervisorctl is None:
        self._supervisorctl = xmlrpclib.ServerProxy(
            # ServerProxy refuses non-HTTP URLs, so hand it a dummy HTTP
            # address; the transport's 'serverurl' determines the actual
            # endpoint — supervisord's unix domain socket in data_dir.
            "http://127.0.0.1",
            transport=xmlrpc.SupervisorTransport(
                serverurl=f"unix://{self.data_dir}/supervisor.sock"
            ),
        )
    return self._supervisorctl

async def start(self):
    """Launch supervisord, which in turn starts every node program
    declared in ``tasks.ini``.

    Raises:
        AssertionError: if supervisord was already started.
    """
    assert not self.supervisord_process, "already started"
    self.supervisord_process = await asyncio.create_subprocess_exec(
        sys.executable,
        "-msupervisor.supervisord",
        "-c",
        # exec argv should be plain strings; data_dir is a Path
        str(self.data_dir / "tasks.ini"),
    )

async def wait(self):
    """Block until the supervisord process exits."""
    await self.supervisord_process.wait()

async def terminate(self):
    """Stop supervisord (and therefore all node processes) and reset
    the cached process handle and XML-RPC proxy so the cluster can be
    started again later.
    """
    self.supervisord_process.terminate()
    await self.supervisord_process.wait()
    self.supervisord_process = None
    self._supervisorctl = None

async def init(self):
"""
Expand Down Expand Up @@ -258,6 +237,34 @@ async def init(self):
)
edit_app_cfg(self.data_dir / f"node{i}/config/app.toml", self.base_port, i)

# write supervisord config file
supervisord_ini = {
"supervisord": {
"pidfile": "%(here)s/supervisord.pid",
"nodaemon": "true",
"logfile": "/dev/null",
"logfile_maxbytes": "0",
},
"rpcinterface:supervisor": {
"supervisor.rpcinterface_factory": "supervisor.rpcinterface:"
"make_main_rpcinterface",
},
"unix_http_server": {"file": "%(here)s/supervisor.sock"},
"supervisorctl": {"serverurl": "unix://%(here)s/supervisor.sock"},
}
for i, node in enumerate(self.config["validators"]):
supervisord_ini[f"program:node{i}"] = {
"command": f"{self.cmd} start --home {self.cli.home(i)}",
# redirect to supervisord's stdout, easier to collect all logs
"stdout_logfile": "/dev/fd/1",
"stdout_logfile_maxbytes": "0",
"autostart": "true",
"autorestart": "true",
"redirect_stderr": "true",
"startsecs": "3",
}
write_ini(open(self.data_dir / "tasks.ini", "w"), supervisord_ini)


def edit_tm_cfg(path, base_port, i, peers):
doc = tomlkit.parse(open(path).read())
Expand Down Expand Up @@ -290,6 +297,6 @@ async def test():
)
await c.init()
await c.start()
await c.watch_logs()
await c.wait()

asyncio.run(test())
10 changes: 10 additions & 0 deletions pystarport/pystarport/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import configparser


async def interact(cmd, ignore_error=False, input=None, **kwargs):
Expand All @@ -19,3 +20,12 @@ async def interact(cmd, ignore_error=False, input=None, **kwargs):

def local_ip():
    """Return the address local cluster services listen on."""
    # every process in the test cluster runs on one host, so loopback suffices
    return "127.0.0.1"


def write_ini(fp, cfg):
    """Serialize a two-level dict as an INI file.

    Args:
        fp: writable text file object.
        cfg: mapping of section name -> mapping of option -> value.
    """
    ini = configparser.ConfigParser()
    # read_dict replaces the manual add_section/update loop and also
    # coerces non-string values to str instead of raising TypeError
    ini.read_dict(cfg)
    ini.write(fp)
1 change: 1 addition & 0 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ in
buildInputs = [
chain
pystarport
python3Packages.poetry
python3Packages.pytest-asyncio
python3Packages.pytest
python3Packages.flake8
Expand Down