Skip to content

Commit

Permalink
Problem (Fix #125): can't manage chain processes in pystarport
Browse files Browse the repository at this point in the history
Solution:
- integrate supervisord into pystarport
- redirect all logs to stdout
  • Loading branch information
yihuang committed Sep 29, 2020
1 parent 70a2050 commit 2b51c67
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/nix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- run: nix run nixpkgs.python3Packages.flake8 -c flake8 --max-line-length 88 --extend-ignore E203 integration-tests/tests pystarport
- run: nix run nixpkgs.python3Packages.black -c black --check --diff integration-tests/tests pystarport
- run: nix-shell --run "isort -c --diff -rc integration-tests/tests pystarport"
- run: nix-shell --run "python -mpytest integration-tests/tests"
- run: nix-shell --run "python -mpytest -v integration-tests/tests"
- run: nix-build -o pystarportImage -A pystarportImage docker.nix
- uses: actions/upload-artifact@v2
if: github.ref == 'refs/heads/master'
Expand Down
31 changes: 12 additions & 19 deletions integration-tests/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,15 @@ def event_loop(request):

@pytest.fixture(scope="session")
async def cluster():
data_dir = tempfile.TemporaryDirectory(suffix="chain-test")
cluster = Cluster(
yaml.safe_load(open(CURRENT_DIR / "config.yml")),
Path(data_dir.name),
CLUSTER_BASE_PORT,
)
await cluster.init()
await cluster.start()
log_task = asyncio.create_task(cluster.watch_logs())

yield cluster

log_task.cancel()
try:
await log_task
except asyncio.CancelledError:
pass
await cluster.terminate()
data_dir.cleanup()
with tempfile.TemporaryDirectory(suffix="chain-test") as tempdir:
cluster = Cluster(
yaml.safe_load(open(CURRENT_DIR / "config.yml")),
Path(tempdir),
CLUSTER_BASE_PORT,
)
await cluster.init()
await cluster.start()

yield cluster

await cluster.terminate()
17 changes: 16 additions & 1 deletion pystarport/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pystarport/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ jsonmerge = "^1.7.0"
PyYAML = "^5.3.1"
python-dateutil = "^2.8.1"
durations = "^0.3.3"
supervisor = "^4.2.1"

[tool.poetry.dev-dependencies]

Expand Down
12 changes: 11 additions & 1 deletion pystarport/pystarport/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import signal
from pathlib import Path

import fire
Expand Down Expand Up @@ -32,7 +33,16 @@ def init(self):

async def _start(self):
await self._cluster.start()
await self._cluster.watch_logs()

# register signal to quit supervisord
loop = asyncio.get_event_loop()
for signame in ("SIGINT", "SIGTERM"):
loop.add_signal_handler(
getattr(signal, signame),
lambda: asyncio.ensure_future(self._cluster.terminate()),
)

await self._cluster.wait()

def start(self):
"""
Expand Down
92 changes: 41 additions & 51 deletions pystarport/pystarport/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import tomlkit

from . import ports
from .utils import interact, local_ip
from .utils import interact, local_ip, write_ini

CHAIN = "chain-maind" # edit by nix-build

Expand Down Expand Up @@ -126,62 +126,24 @@ def __init__(self, config, data_dir, base_port, cmd=CHAIN):
self.data_dir = data_dir
self.base_port = base_port
self.cli = ClusterCLI(data_dir, base_port, config["chain_id"], cmd)
# subprocesses spawned
self.processes = []
self.supervisord_process = None

async def start(self):
assert not self.processes, "already started"

count = len(
[name for name in os.listdir(self.data_dir) if re.match(r"^node\d+$", name)]
)
if count == 0:
print("not initialized yet", file=sys.stderr)
return
for i in range(count):
(self.data_dir / f"node{i}.log").touch()
self.processes = [
await asyncio.create_subprocess_exec(
self.cmd,
"start",
"--home",
str(self.cli.home(i)),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
for i in range(count)
]

async def watch_logs(self):
async def watch_one(reader, prefix):
while True:
line = await reader.readline()
if not line:
break
log = prefix + line.decode()
sys.stdout.write(log)

await asyncio.wait(
[
watch_one(p.stdout, f"node{i}-stdout: ")
for i, p in enumerate(self.processes)
]
+ [
watch_one(p.stderr, f"node{i}-stderr: ")
for i, p in enumerate(self.processes)
]
assert not self.supervisord_process, "already started"
self.supervisord_process = await asyncio.create_subprocess_exec(
sys.executable,
"-msupervisor.supervisord",
"-c",
self.data_dir / "tasks.ini",
)

async def wait(self):
await asyncio.wait(
[p.wait() for p in self.processes], return_when=asyncio.FIRST_COMPLETED
)
await self.supervisord_process.wait()

async def terminate(self):
for p in self.processes:
p.terminate()
for p in self.processes:
await p.wait()
self.supervisord_process.terminate()
await self.supervisord_process.wait()
self.supervisord_process = None

async def init(self):
"""
Expand Down Expand Up @@ -258,6 +220,34 @@ async def init(self):
)
edit_app_cfg(self.data_dir / f"node{i}/config/app.toml", self.base_port, i)

# write supervisord config file
supervisord_ini = {
"supervisord": {
"pidfile": "%(here)s/supervisord.pid",
"nodaemon": "true",
"logfile": "/dev/null",
"logfile_maxbytes": "0",
},
"rpcinterface:supervisor": {
"supervisor.rpcinterface_factory": "supervisor.rpcinterface:"
"make_main_rpcinterface",
},
"unix_http_server": {"file": "%(here)s/supervisor.sock"},
"supervisorctl": {"serverurl": "unix://%(here)s/supervisor.sock"},
}
for i, node in enumerate(self.config["validators"]):
supervisord_ini[f"program:node{i}"] = {
"command": f"{self.cmd} start --home {self.cli.home(i)}",
# redirect to supervisord's stdout, easier to collect all logs
"stdout_logfile": "/dev/fd/1",
"stdout_logfile_maxbytes": "0",
"autostart": "true",
"autorestart": "true",
"redirect_stderr": "true",
"startsecs": "3",
}
write_ini(open(self.data_dir / "tasks.ini", "w"), supervisord_ini)


def edit_tm_cfg(path, base_port, i, peers):
doc = tomlkit.parse(open(path).read())
Expand Down Expand Up @@ -290,6 +280,6 @@ async def test():
)
await c.init()
await c.start()
await c.watch_logs()
await c.wait()

asyncio.run(test())
10 changes: 10 additions & 0 deletions pystarport/pystarport/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import configparser


async def interact(cmd, ignore_error=False, input=None, **kwargs):
Expand All @@ -19,3 +20,12 @@ async def interact(cmd, ignore_error=False, input=None, **kwargs):

def local_ip():
return "127.0.0.1"


def write_ini(fp, cfg):
ini = configparser.ConfigParser()
for section, items in cfg.items():
ini.add_section(section)
sec = ini[section]
sec.update(items)
ini.write(fp)
1 change: 1 addition & 0 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ in
buildInputs = [
chain
pystarport
python3Packages.poetry
python3Packages.pytest-asyncio
python3Packages.pytest
python3Packages.flake8
Expand Down

0 comments on commit 2b51c67

Please sign in to comment.