pyln-testing: use files for stdout and stderr, not threads.
Some flakes are caused by weird races in this code.  Plus, if we
get things to write straight to files, we might see things in
there on post-mortem which happen after the python runner exits.

It's a bit less efficient, but much simpler.  Let's see if it helps!

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
rustyrussell committed May 27, 2022
1 parent 6449e44 commit 1390b0b
Showing 4 changed files with 135 additions and 192 deletions.
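
The core pattern, as a minimal standalone sketch (this is not pyln-testing's actual API; the directory and demo command are illustrative): redirect the child's output straight to a file via subprocess.Popen, keep a second read handle open on the same path, and poll readlines() for whatever was appended since the last read.

    import os
    import subprocess
    import time

    outdir = "/tmp/proc-logs"               # illustrative output directory
    os.makedirs(outdir, exist_ok=True)
    logpath = os.path.join(outdir, "log")

    log_write = open(logpath, "wt")         # the child process writes here...
    log_read = open(logpath, "rt")          # ...and we tail the same file here

    proc = subprocess.Popen(["sh", "-c", "echo hello; sleep 1; echo world"],
                            stdout=log_write, stderr=subprocess.STDOUT)

    lines = []
    while proc.poll() is None:
        new = log_read.readlines()          # only lines appended since last read
        lines += [l.rstrip() for l in new]
        if not new:
            time.sleep(0.25)                # nothing new yet; back off briefly

    # Final catch-up: anything written between the last poll and process exit
    # is still in the file, which is the post-mortem benefit described above.
    lines += [l.rstrip() for l in log_read.readlines()]
    print(lines)                            # ['hello', 'world']
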
148 changes: 56 additions & 92 deletions contrib/pyln-testing/pyln/testing/utils.py
@@ -174,13 +174,20 @@ class TailableProc(object):
     tail the processes and react to their output.
     """
 
-    def __init__(self, outputDir=None, verbose=True):
+    def __init__(self, outputDir, verbose=True):
         self.logs = []
-        self.logs_cond = threading.Condition(threading.RLock())
         self.env = os.environ.copy()
-        self.running = False
         self.proc = None
         self.outputDir = outputDir
+        if not os.path.exists(outputDir):
+            os.makedirs(outputDir)
+        # Create and open them.
+        self.stdout_filename = os.path.join(outputDir, "log")
+        self.stderr_filename = os.path.join(outputDir, "errlog")
+        self.stdout_write = open(self.stdout_filename, "wt")
+        self.stderr_write = open(self.stderr_filename, "wt")
+        self.stdout_read = open(self.stdout_filename, "rt")
+        self.stderr_read = open(self.stderr_filename, "rt")
         self.logsearch_start = 0
         self.err_logs = []
         self.prefix = ""
@@ -192,29 +199,17 @@ def __init__(self, outputDir=None, verbose=True):
         # pass it to the log matcher and not print it to stdout).
         self.log_filter = lambda line: False
 
-    def start(self, stdin=None, stdout=None, stderr=None):
+    def start(self, stdin=None):
         """Start the underlying process and start monitoring it.
         """
         logging.debug("Starting '%s'", " ".join(self.cmd_line))
         self.proc = subprocess.Popen(self.cmd_line,
                                      stdin=stdin,
-                                     stdout=stdout if stdout else subprocess.PIPE,
-                                     stderr=stderr,
+                                     stdout=self.stdout_write,
+                                     stderr=self.stderr_write,
                                      env=self.env)
-        self.thread = threading.Thread(target=self.tail)
-        self.thread.daemon = True
-        self.thread.start()
-        self.running = True
 
-    def save_log(self):
-        if self.outputDir:
-            logpath = os.path.join(self.outputDir, 'log')
-            with open(logpath, 'w') as f:
-                for l in self.logs:
-                    f.write(l + '\n')
-
     def stop(self, timeout=10):
-        self.save_log()
         self.proc.terminate()
 
         # Now give it some time to react to the signal
@@ -224,56 +219,32 @@ def stop(self, timeout=10):
             self.proc.kill()
 
         self.proc.wait()
-        self.thread.join()
 
         return self.proc.returncode
 
     def kill(self):
         """Kill process without giving it warning."""
         self.proc.kill()
         self.proc.wait()
-        self.thread.join()
 
-    def tail(self):
-        """Tail the stdout of the process and remember it.
-
-        Stores the lines of output produced by the process in
-        self.logs and signals that a new line was read so that it can
-        be picked up by consumers.
+    def logs_catchup(self):
+        """Save the latest stdout / stderr contents; return true if we got anything.
         """
-        for line in iter(self.proc.stdout.readline, ''):
-            if len(line) == 0:
-                break
-
-            line = line.decode('UTF-8', 'replace').rstrip()
-
-            if self.log_filter(line):
-                continue
-
-            if self.verbose:
-                sys.stdout.write("{}: {}\n".format(self.prefix, line))
-
-            with self.logs_cond:
-                self.logs.append(line)
-                self.logs_cond.notifyAll()
-
-        self.running = False
-        self.proc.stdout.close()
-
-        if self.proc.stderr:
-            for line in iter(self.proc.stderr.readline, ''):
-
-                if line is None or len(line) == 0:
-                    break
-
-                line = line.rstrip().decode('UTF-8', 'replace')
-                self.err_logs.append(line)
-
-            self.proc.stderr.close()
+        new_stdout = self.stdout_read.readlines()
+        if self.verbose:
+            for line in new_stdout:
+                sys.stdout.write("{}: {}".format(self.prefix, line))
+        self.logs += [l.rstrip() for l in new_stdout]
+        new_stderr = self.stderr_read.readlines()
+        if self.verbose:
+            for line in new_stderr:
+                sys.stderr.write("{}-stderr: {}".format(self.prefix, line))
+        self.err_logs += [l.rstrip() for l in new_stderr]
+        return len(new_stdout) > 0 or len(new_stderr) > 0
 
     def is_in_log(self, regex, start=0):
         """Look for `regex` in the logs."""
-
+        self.logs_catchup()
         ex = re.compile(regex)
         for l in self.logs[start:]:
             if ex.search(l):
@@ -286,6 +257,7 @@ def is_in_log(self, regex, start=0):
     def is_in_stderr(self, regex):
         """Look for `regex` in stderr."""
 
+        self.logs_catchup()
         ex = re.compile(regex)
         for l in self.err_logs:
             if ex.search(l):
@@ -311,31 +283,30 @@ def wait_for_logs(self, regexs, timeout=TIMEOUT):
         logging.debug("Waiting for {} in the logs".format(regexs))
         exs = [re.compile(r) for r in regexs]
         start_time = time.time()
-        pos = self.logsearch_start
         while True:
-            if timeout is not None and time.time() > start_time + timeout:
-                print("Time-out: can't find {} in logs".format(exs))
-                for r in exs:
-                    if self.is_in_log(r):
-                        print("({} was previously in logs!)".format(r))
-                raise TimeoutError('Unable to find "{}" in logs.'.format(exs))
-
-            with self.logs_cond:
-                if pos >= len(self.logs):
-                    if not self.running:
-                        raise ValueError('Process died while waiting for logs')
-                    self.logs_cond.wait(1)
-                    continue
-
-                for r in exs.copy():
-                    self.logsearch_start = pos + 1
-                    if r.search(self.logs[pos]):
-                        logging.debug("Found '%s' in logs", r)
-                        exs.remove(r)
-                        break
-                if len(exs) == 0:
-                    return self.logs[pos]
-                pos += 1
+            if self.logsearch_start >= len(self.logs):
+                if not self.logs_catchup():
+                    time.sleep(0.25)
+
+                if timeout is not None and time.time() > start_time + timeout:
+                    print("Time-out: can't find {} in logs".format(exs))
+                    for r in exs:
+                        if self.is_in_log(r):
+                            print("({} was previously in logs!)".format(r))
+                    raise TimeoutError('Unable to find "{}" in logs.'.format(exs))
+                continue
+
+            line = self.logs[self.logsearch_start]
+            print("Looking for {} in {}".format(exs[0], line))
+            self.logsearch_start += 1
+            for r in exs.copy():
+                if r.search(line):
+                    logging.debug("Found '%s' in logs", r)
+                    exs.remove(r)
+                    if len(exs) == 0:
+                        return line
+                    # Don't match same line with different regexs!
+                    break
 
     def wait_for_log(self, regex, timeout=TIMEOUT):
         """Look for `regex` in the logs.
@@ -620,10 +591,9 @@ def cmd_line(self):
 
         return self.cmd_prefix + [self.executable] + opts
 
-    def start(self, stdin=None, stdout=None, stderr=None,
-              wait_for_initialized=True):
+    def start(self, stdin=None, wait_for_initialized=True):
         self.opts['bitcoin-rpcport'] = self.rpcproxy.rpcport
-        TailableProc.start(self, stdin, stdout, stderr)
+        TailableProc.start(self, stdin)
         if wait_for_initialized:
             self.wait_for_log("Server started with public key")
         logging.info("LightningD started")
@@ -852,8 +822,8 @@ def is_synced_with_bitcoin(self, info=None):
         info = self.rpc.getinfo()
         return 'warning_bitcoind_sync' not in info and 'warning_lightningd_sync' not in info
 
-    def start(self, wait_for_bitcoind_sync=True, stderr=None):
-        self.daemon.start(stderr=stderr)
+    def start(self, wait_for_bitcoind_sync=True):
+        self.daemon.start()
         # Cache `getinfo`, we'll be using it a lot
         self.info = self.rpc.getinfo()
         # This shortcut is sufficient for our simple tests.
Expand All @@ -878,7 +848,6 @@ def stop(self, timeout=10):
if self.rc is None:
self.rc = self.daemon.stop()

self.daemon.save_log()
self.daemon.cleanup()

if self.rc != 0 and not self.may_fail:
@@ -1417,12 +1386,7 @@ def get_node(self, node_id=None, options=None, dbfile=None,
 
         if start:
             try:
-                # Capture stderr if we're failing
-                if expect_fail:
-                    stderr = subprocess.PIPE
-                else:
-                    stderr = None
-                node.start(wait_for_bitcoind_sync, stderr=stderr)
+                node.start(wait_for_bitcoind_sync)
             except Exception:
                 if expect_fail:
                     return node
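With stderr now always captured to the errlog file, tests that expect lightningd to die no longer thread stderr=subprocess.PIPE through start(). The test changes below all reduce to one idiom, roughly (the node name and error text here are illustrative):

    l1.daemon.start(wait_for_initialized=False)
    assert l1.daemon.wait() == 1                       # daemon exits non-zero
    assert l1.daemon.is_in_stderr('some fatal error')  # read back from errlog
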
59 changes: 32 additions & 27 deletions tests/test_misc.py
@@ -109,13 +109,13 @@ def crash_bitcoincli(r):
     # Ignore BROKEN log message about blocksonly mode.
     l2 = node_factory.get_node(start=False, expect_fail=True,
                                allow_broken_log=True)
-    with pytest.raises(ValueError):
-        l2.start(stderr=subprocess.PIPE)
+    l2.daemon.start(wait_for_initialized=False)
+    # Will exit with failure code.
+    assert l2.daemon.wait() == 1
     assert l2.daemon.is_in_stderr(r".*deactivating transaction relay is not"
-                                  " supported.") is not None
-    # wait_for_log gets upset since daemon is not running.
-    wait_for(lambda: l2.daemon.is_in_log('deactivating transaction'
-                                         ' relay is not supported'))
+                                  " supported.")
+    assert l2.daemon.is_in_log('deactivating transaction'
+                               ' relay is not supported')
 
 
 def test_bitcoin_ibd(node_factory, bitcoind):
@@ -1208,8 +1208,10 @@ def test_rescan(node_factory, bitcoind):
     l1.daemon.opts['rescan'] = -500000
     l1.stop()
     bitcoind.generate_block(4)
-    with pytest.raises(ValueError):
-        l1.start()
+    l1.daemon.start(wait_for_initialized=False)
+    # Will exit with failure code.
+    assert l1.daemon.wait() == 1
+    assert l1.daemon.is_in_stderr(r"bitcoind has gone backwards from 500000 to 105 blocks!")
 
     # Restarting with future absolute blockheight is fine if we can find it.
     l1.daemon.opts['rescan'] = -105
@@ -1237,14 +1239,19 @@ def test_bitcoind_goes_backwards(node_factory, bitcoind):
     bitcoind.start()
 
     # Will simply refuse to start.
-    with pytest.raises(ValueError):
-        l1.start()
+    l1.daemon.start(wait_for_initialized=False)
+    # Will exit with failure code.
+    assert l1.daemon.wait() == 1
+    assert l1.daemon.is_in_stderr('bitcoind has gone backwards')
 
     # Nor will it start if we ask for a reindex of fewer blocks.
     l1.daemon.opts['rescan'] = 3
 
-    with pytest.raises(ValueError):
-        l1.start()
+    # Will simply refuse to start.
+    l1.daemon.start(wait_for_initialized=False)
+    # Will exit with failure code.
+    assert l1.daemon.wait() == 1
+    assert l1.daemon.is_in_stderr('bitcoind has gone backwards')
 
     # This will force it, however.
     l1.daemon.opts['rescan'] = -100
@@ -1673,7 +1680,7 @@ def test_newaddr(node_factory, chainparams):
     assert both['bech32'].startswith(chainparams['bip173_prefix'])
 
 
-def test_bitcoind_fail_first(node_factory, bitcoind, executor):
+def test_bitcoind_fail_first(node_factory, bitcoind):
     """Make sure we handle spurious bitcoin-cli failures during startup
 
     See [#2687](https://github.com/ElementsProject/lightning/issues/2687) for
@@ -1682,7 +1689,9 @@ def test_bitcoind_fail_first(node_factory, bitcoind, executor):
     """
     # Do not start the lightning node since we need to instrument bitcoind
    # first.
-    l1 = node_factory.get_node(start=False)
+    l1 = node_factory.get_node(start=False,
+                               allow_broken_log=True,
+                               may_fail=True)
 
     # Instrument bitcoind to fail some queries first.
     def mock_fail(*args):
@@ -1691,22 +1700,17 @@ def mock_fail(*args):
     l1.daemon.rpcproxy.mock_rpc('getblockhash', mock_fail)
     l1.daemon.rpcproxy.mock_rpc('estimatesmartfee', mock_fail)
 
-    f = executor.submit(l1.start)
-
-    wait_for(lambda: l1.daemon.running)
-    # Make sure it fails on the first `getblock` call (need to use `is_in_log`
-    # since the `wait_for_log` in `start` sets the offset)
-    wait_for(lambda: l1.daemon.is_in_log(
-        r'getblockhash [a-z0-9]* exited with status 1'))
-    wait_for(lambda: l1.daemon.is_in_log(
-        r'Unable to estimate opening fees'))
+    l1.daemon.start(wait_for_initialized=False)
+    l1.daemon.wait_for_logs([r'getblockhash [a-z0-9]* exited with status 1',
+                             r'Unable to estimate opening fees',
+                             r'BROKEN.*we have been retrying command for --bitcoin-retry-timeout=60 seconds'])
+    # Will exit with failure code.
+    assert l1.daemon.wait() == 1
 
     # Now unset the mock, so calls go through again
     l1.daemon.rpcproxy.mock_rpc('getblockhash', None)
     l1.daemon.rpcproxy.mock_rpc('estimatesmartfee', None)
 
-    f.result()
-
 
 @pytest.mark.developer("needs --dev-force-bip32-seed")
 @unittest.skipIf(TEST_NETWORK != 'regtest', "Addresses are network specific")
@@ -2057,8 +2061,9 @@ def test_new_node_is_mainnet(node_factory):
     del l1.daemon.opts['network']
 
     # Wrong chain, will fail to start, but that's OK.
-    with pytest.raises(ValueError):
-        l1.start()
+    l1.daemon.start(wait_for_initialized=False)
+    # Will exit with failure code.
+    assert l1.daemon.wait() == 1
 
     # Should create these
     assert os.path.isfile(os.path.join(netdir, "hsm_secret"))
