Skip to content

Commit

Permalink
Problem (Fix #125): can't manage chain processes in pystarport (#126)
Browse files Browse the repository at this point in the history
Solution:
- integrate supervisord into pystarport
- redirect all logs to stdout
- add command supervisorctl to pystarport cli
  • Loading branch information
yihuang authored Sep 29, 2020
1 parent 70a2050 commit 2c6e9ac
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 75 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/nix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- run: nix run nixpkgs.python3Packages.flake8 -c flake8 --max-line-length 88 --extend-ignore E203 integration-tests/tests pystarport
- run: nix run nixpkgs.python3Packages.black -c black --check --diff integration-tests/tests pystarport
- run: nix-shell --run "isort -c --diff -rc integration-tests/tests pystarport"
- run: nix-shell --run "python -mpytest integration-tests/tests"
- run: nix-shell --run "python -mpytest -v integration-tests/tests"
- run: nix-build -o pystarportImage -A pystarportImage docker.nix
- uses: actions/upload-artifact@v2
if: github.ref == 'refs/heads/master'
Expand Down
31 changes: 12 additions & 19 deletions integration-tests/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,15 @@ def event_loop(request):

@pytest.fixture(scope="session")
async def cluster():
data_dir = tempfile.TemporaryDirectory(suffix="chain-test")
cluster = Cluster(
yaml.safe_load(open(CURRENT_DIR / "config.yml")),
Path(data_dir.name),
CLUSTER_BASE_PORT,
)
await cluster.init()
await cluster.start()
log_task = asyncio.create_task(cluster.watch_logs())

yield cluster

log_task.cancel()
try:
await log_task
except asyncio.CancelledError:
pass
await cluster.terminate()
data_dir.cleanup()
with tempfile.TemporaryDirectory(suffix="chain-test") as tempdir:
cluster = Cluster(
yaml.safe_load(open(CURRENT_DIR / "config.yml")),
Path(tempdir),
CLUSTER_BASE_PORT,
)
await cluster.init()
await cluster.start()

yield cluster

await cluster.terminate()
17 changes: 16 additions & 1 deletion pystarport/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pystarport/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ jsonmerge = "^1.7.0"
PyYAML = "^5.3.1"
python-dateutil = "^2.8.1"
durations = "^0.3.3"
supervisor = "^4.2.1"

[tool.poetry.dev-dependencies]

Expand Down
17 changes: 16 additions & 1 deletion pystarport/pystarport/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import signal
from pathlib import Path

import fire
Expand Down Expand Up @@ -32,7 +33,16 @@ def init(self):

async def _start(self):
await self._cluster.start()
await self._cluster.watch_logs()

# register signal to quit supervisord
loop = asyncio.get_event_loop()
for signame in ("SIGINT", "SIGTERM"):
loop.add_signal_handler(
getattr(signal, signame),
lambda: asyncio.ensure_future(self._cluster.terminate()),
)

await self._cluster.wait()

def start(self):
"""
Expand All @@ -50,6 +60,11 @@ def serve(self):
"""
asyncio.run(self._serve())

def supervisorctl(self):
from supervisor.supervisorctl import main

main(["-c", self._cluster.data_dir / "tasks.ini"])


def main():
fire.Fire(CLI)
Expand Down
113 changes: 60 additions & 53 deletions pystarport/pystarport/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
import datetime
import json
import os
import re
import subprocess
import sys
from pathlib import Path

import dateutil.parser
import durations
import jsonmerge
import tomlkit
from supervisor import xmlrpc
from supervisor.compat import xmlrpclib

from . import ports
from .utils import interact, local_ip
from .utils import interact, local_ip, write_ini

CHAIN = "chain-maind" # edit by nix-build

Expand Down Expand Up @@ -126,62 +126,41 @@ def __init__(self, config, data_dir, base_port, cmd=CHAIN):
self.data_dir = data_dir
self.base_port = base_port
self.cli = ClusterCLI(data_dir, base_port, config["chain_id"], cmd)
# subprocesses spawned
self.processes = []
self.supervisord_process = None
self._supervisorctl = None

@property
def supervisorctl(self):
# https://github.com/Supervisor/supervisor/blob/76df237032f7d9fbe80a0adce3829c8b916d5b58/supervisor/options.py#L1718
if self._supervisorctl is None:
self._supervisorctl = xmlrpclib.ServerProxy(
# dumbass ServerProxy won't allow us to pass in a non-HTTP url,
# so we fake the url we pass into it and always use the transport's
# 'serverurl' to figure out what to attach to
"http://127.0.0.1",
transport=xmlrpc.SupervisorTransport(
serverurl=f"unix://{self.data_dir}/supervisor.sock"
),
)
return self._supervisorctl

async def start(self):
assert not self.processes, "already started"

count = len(
[name for name in os.listdir(self.data_dir) if re.match(r"^node\d+$", name)]
)
if count == 0:
print("not initialized yet", file=sys.stderr)
return
for i in range(count):
(self.data_dir / f"node{i}.log").touch()
self.processes = [
await asyncio.create_subprocess_exec(
self.cmd,
"start",
"--home",
str(self.cli.home(i)),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
for i in range(count)
]

async def watch_logs(self):
async def watch_one(reader, prefix):
while True:
line = await reader.readline()
if not line:
break
log = prefix + line.decode()
sys.stdout.write(log)

await asyncio.wait(
[
watch_one(p.stdout, f"node{i}-stdout: ")
for i, p in enumerate(self.processes)
]
+ [
watch_one(p.stderr, f"node{i}-stderr: ")
for i, p in enumerate(self.processes)
]
assert not self.supervisord_process, "already started"
self.supervisord_process = await asyncio.create_subprocess_exec(
sys.executable,
"-msupervisor.supervisord",
"-c",
self.data_dir / "tasks.ini",
)

async def wait(self):
await asyncio.wait(
[p.wait() for p in self.processes], return_when=asyncio.FIRST_COMPLETED
)
await self.supervisord_process.wait()

async def terminate(self):
for p in self.processes:
p.terminate()
for p in self.processes:
await p.wait()
self.supervisord_process.terminate()
await self.supervisord_process.wait()
self.supervisord_process = None
self._supervisorctl = None

async def init(self):
"""
Expand Down Expand Up @@ -258,6 +237,34 @@ async def init(self):
)
edit_app_cfg(self.data_dir / f"node{i}/config/app.toml", self.base_port, i)

# write supervisord config file
supervisord_ini = {
"supervisord": {
"pidfile": "%(here)s/supervisord.pid",
"nodaemon": "true",
"logfile": "/dev/null",
"logfile_maxbytes": "0",
},
"rpcinterface:supervisor": {
"supervisor.rpcinterface_factory": "supervisor.rpcinterface:"
"make_main_rpcinterface",
},
"unix_http_server": {"file": "%(here)s/supervisor.sock"},
"supervisorctl": {"serverurl": "unix://%(here)s/supervisor.sock"},
}
for i, node in enumerate(self.config["validators"]):
supervisord_ini[f"program:node{i}"] = {
"command": f"{self.cmd} start --home {self.cli.home(i)}",
# redirect to supervisord's stdout, easier to collect all logs
"stdout_logfile": "/dev/fd/1",
"stdout_logfile_maxbytes": "0",
"autostart": "true",
"autorestart": "true",
"redirect_stderr": "true",
"startsecs": "3",
}
write_ini(open(self.data_dir / "tasks.ini", "w"), supervisord_ini)


def edit_tm_cfg(path, base_port, i, peers):
doc = tomlkit.parse(open(path).read())
Expand Down Expand Up @@ -290,6 +297,6 @@ async def test():
)
await c.init()
await c.start()
await c.watch_logs()
await c.wait()

asyncio.run(test())
10 changes: 10 additions & 0 deletions pystarport/pystarport/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import configparser


async def interact(cmd, ignore_error=False, input=None, **kwargs):
Expand All @@ -19,3 +20,12 @@ async def interact(cmd, ignore_error=False, input=None, **kwargs):

def local_ip():
return "127.0.0.1"


def write_ini(fp, cfg):
ini = configparser.ConfigParser()
for section, items in cfg.items():
ini.add_section(section)
sec = ini[section]
sec.update(items)
ini.write(fp)
1 change: 1 addition & 0 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ in
buildInputs = [
chain
pystarport
python3Packages.poetry
python3Packages.pytest-asyncio
python3Packages.pytest
python3Packages.flake8
Expand Down

0 comments on commit 2c6e9ac

Please sign in to comment.