From 1b66f0532142b64c4a3df32d411e439dc4b28558 Mon Sep 17 00:00:00 2001 From: Casey Riebe Date: Tue, 10 Aug 2021 13:14:57 -0500 Subject: [PATCH] Adding Linux-style command chaining --- click_shell/__init__.py | 2 +- click_shell/cmd.py | 88 +++++++++++++++++++++++++++++++++++++++-- click_shell/core.py | 9 +++-- tests/conftest.py | 7 ++++ 4 files changed, 98 insertions(+), 8 deletions(-) create mode 100644 tests/conftest.py diff --git a/click_shell/__init__.py b/click_shell/__init__.py index 4c33545..2bd9e1b 100644 --- a/click_shell/__init__.py +++ b/click_shell/__init__.py @@ -15,4 +15,4 @@ '__version__', ] -__version__ = "3.0.dev0" +__version__ = "3.0.dev1" diff --git a/click_shell/cmd.py b/click_shell/cmd.py index 80fdf00..788a945 100644 --- a/click_shell/cmd.py +++ b/click_shell/cmd.py @@ -14,6 +14,66 @@ from ._compat import readline +class CommandChainException(Exception): + pass + + +class Command: + INITIAL = 'INITIAL' + AND = 'AND' + OR = 'OR' + + def __init__(self, command: str, cmd_type: Union[AND, OR, INITIAL]): + self.text = command + self.cmd_type = cmd_type + + +class CommandQueue(List): + """ + ClickCmd expects a List, so we implement our own with parsing + """ + + def __init__(self): + super().__init__() + self.queue = list() + + def parse_line(self, *, line: str): + initial_command = True + for command in line.split("&&"): + command = command.strip() + if initial_command: + self.queue.append(Command(command=command, cmd_type=Command.INITIAL)) + initial_command = False + continue + if '||' not in command: + self.queue.append(Command(command=command, cmd_type=Command.AND)) + else: + first = True + for _command in command.split("||"): + _command = _command.strip() + if first: + self.queue.append(Command(command=_command, cmd_type=Command.AND)) + first = False + continue + self.queue.append(Command(command=_command, cmd_type=Command.OR)) + + def pop(self, __index: int = ...): + return self.queue.pop(__index) + + def flush(self) -> None: + """ + Flushes out the queue by setting the property to an empty list + :return: + """ + self.queue = list() + + def __len__(self) -> int: + """ + :return: int length of self.queue + """ + return len(self.queue) + + class ClickCmd(Cmd): """ A simple wrapper around the builtin python cmd module that: @@ -57,6 +117,8 @@ def __init__( if not os.path.isdir(os.path.dirname(self.hist_file)): os.makedirs(os.path.dirname(self.hist_file)) + self.cmdqueue = CommandQueue() + def preloop(self): # read our history if readline: @@ -108,9 +170,12 @@ def cmdloop(self, intro: str = None): # pylint: disable=too-many-branches click.echo(file=self._stdout) click.echo('KeyboardInterrupt', file=self._stdout) continue - line = self.precmd(line) - stop = self.onecmd(line) - stop = self.postcmd(stop, line) + try: + line = self.precmd(line) + stop = self.onecmd(line) + stop = self.postcmd(stop, line) + except CommandChainException: + self.cmdqueue.flush() finally: self.postloop() @@ -120,6 +185,23 @@ def cmdloop(self, intro: str = None): # pylint: disable=too-many-branches if self.old_delims: readline.set_completer_delims(self.old_delims) + def precmd(self, line: Union[str, Command]) -> str: + if isinstance(line, str): + if any([chain in line for chain in ["&&", "||"]]): + self.cmdqueue.parse_line(line=line) + line = self.cmdqueue.pop(0).text + if isinstance(line, Command): + # a non-zero return code followed by and AND-ed command + if self.ctx.return_code and line.cmd_type == Command.AND: # noqa: return_code set in initial context + raise CommandChainException + # a zero return code followed by an OR-ed command + elif not self.ctx.return_code and line.cmd_type == Command.OR: # noqa: return_code set in initial context + raise CommandChainException + else: + line = line.text + + return line + def get_prompt(self) -> Optional[str]: if callable(self.prompt): kwargs = {} diff --git a/click_shell/core.py b/click_shell/core.py index 2ab46f9..b221210 100644 --- a/click_shell/core.py +++ b/click_shell/core.py @@ -34,10 +34,11 @@ def get_invoke(command: click.Command) -> Callable[[ClickCmd, str], bool]: def invoke_(self: ClickCmd, arg: str): # pylint: disable=unused-argument try: - command.main(args=shlex.split(arg), - prog_name=command.name, - standalone_mode=False, - parent=self.ctx) + self.ctx.return_code = 0 # Set initial code + self.ctx.return_code = command.main(args=shlex.split(arg), + prog_name=command.name, + standalone_mode=False, + parent=self.ctx) except click.ClickException as e: # Show the error message e.show() diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..71b463e --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,7 @@ +import pytest +from click.testing import CliRunner + + +@pytest.fixture +def cli_runner(): + return CliRunner()