Skip to content

Commit

Permalink
Test Prompt changes in test_readme.
Browse files Browse the repository at this point in the history
  • Loading branch information
byllyfish committed Jul 13, 2023
1 parent f57b3d0 commit f635bba
Showing 1 changed file with 12 additions and 21 deletions.
33 changes: 12 additions & 21 deletions tests/test_readme.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,33 @@
from shellous.harvest import harvest_results
from shellous.log import LOGGER

_PROMPT = ">>> "
_PROMPT_ELLIPSIS = "... "
_PS1 = "<+==+>"


class Prompt:
"Utility class to help with an interactive prompt."

def __init__(self, stdin, stdout, errbuf):
def __init__(self, stdin, stdout):
self.stdin = stdin
self.stdout = stdout
self.errbuf = errbuf
self.prompt_bytes = _PROMPT.encode("utf-8")
self.prompt_bytes = _PS1.encode("utf-8")

async def prompt(self, input_text=""):
"Write some input text to stdin, then await the response."
if input_text:
self.stdin.write(input_text.encode("utf-8") + b"\n")

# Drain our write to stdin, and wait for prompt from stdout.
cancelled, (out, _) = await harvest_results(
cancelled, (buf, _) = await harvest_results(
self.stdout.readuntil(self.prompt_bytes),
self.stdin.drain(),
timeout=5.0,
)
assert isinstance(buf, bytes)

if cancelled:
raise asyncio.CancelledError()

# If there are ellipsis bytes in the beginning of out, remove them.
assert isinstance(out, bytes)
out = re.sub(rb"^(?:\.\.\. )+", b"", out)

# Combine stderr and stdout, then clear stderr.
buf = self.errbuf + out
self.errbuf.clear()

# Clean up the output to remove the prompt, then return as string.
buf = buf.replace(b"\r\n", b"\n")
assert buf.endswith(self.prompt_bytes)
Expand All @@ -57,21 +49,20 @@ async def prompt(self, input_text=""):

async def run_asyncio_repl(cmds, logfile=None):
"Helper function to run the asyncio REPL and feed it commands."
errbuf = bytearray()
repl = (
sh(sys.executable, "-m", "asyncio")
.stdin(sh.CAPTURE)
.stdout(sh.CAPTURE)
.stderr(errbuf)
.set(_return_result=True, inherit_env=False)
.stderr(sh.STDOUT)
.set(inherit_env=False)
.env(**_current_env())
)

async with repl as run:
assert run.stdin is not None

p = Prompt(run.stdin, run.stdout, errbuf)
await p.prompt()
p = Prompt(run.stdin, run.stdout)
await p.prompt(f"import sys; sys.ps1 = '{_PS1}'; sys.ps2 = ''")

# Optionally redirect logging to a file.
await p.prompt("import shellous.log, logging")
Expand Down Expand Up @@ -225,12 +216,12 @@ def _separate_cmds_and_outputs(lines):
output = ""

for line in lines:
if line.startswith(_PROMPT):
if line.startswith(">>> "):
if cmd is not None:
yield (cmd, output.rstrip("\n"))
cmd = line[4:]
output = ""
elif line.startswith(_PROMPT_ELLIPSIS):
elif line.startswith("... "):
assert cmd
cmd += "\n" + line[4:]
else:
Expand Down

0 comments on commit f635bba

Please sign in to comment.