Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added repl context object #117

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions click_repl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
InternalCommandException as InternalCommandException,
)
from .utils import exit as exit # noqa: F401
from .core import pass_context as pass_context # noqa: F401

__version__ = "0.3.0"
28 changes: 28 additions & 0 deletions click_repl/_ctx_stack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from .core import ReplContext


# To store the ReplContext objects generated throughout the Runtime.
_context_stack: list[ReplContext] = []


def _push_context(ctx: ReplContext) -> None:
"""
Pushes a new REPL context onto the current stack.

Parameters
----------
ctx
The :class:`~click_repl.core.ReplContext` object that should be
added to the REPL context stack.
"""
_context_stack.append(ctx)


def _pop_context() -> None:
"""Removes the top-level REPL context from the stack."""
_context_stack.pop()
96 changes: 53 additions & 43 deletions click_repl/_repl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import click
import sys
from prompt_toolkit import PromptSession
from prompt_toolkit.history import InMemoryHistory

from ._completer import ClickCompleter
from .exceptions import ClickExit # type: ignore[attr-defined]
from .exceptions import CommandLineParserError, ExitReplException, InvalidGroupFormat
from .utils import _execute_internal_and_sys_cmds
from .core import ReplContext
from .globals_ import ISATTY, get_current_repl_ctx


__all__ = ["bootstrap_prompt", "register_repl", "repl"]
Expand Down Expand Up @@ -90,58 +91,67 @@ def repl(

original_command = available_commands.pop(repl_command_name, None)

if isatty:
prompt_kwargs = bootstrap_prompt(group, prompt_kwargs, group_ctx)
session = PromptSession(**prompt_kwargs)
repl_ctx = ReplContext(
group_ctx,
bootstrap_prompt(group, prompt_kwargs, group_ctx),
get_current_repl_ctx(silent=True)
)

def get_command():
return session.prompt()
if ISATTY:
# If stdin is a TTY, prompt the user for input using PromptSession.
def get_command() -> str:
return repl_ctx.session.prompt() # type: ignore

else:
get_command = sys.stdin.readline

while True:
try:
command = get_command()
except KeyboardInterrupt:
continue
except EOFError:
break

if not command:
if isatty:
# If stdin is not a TTY, read input from stdin directly.
def get_command() -> str:
inp = sys.stdin.readline().strip()
repl_ctx._history.append(inp)
return inp

with repl_ctx:
while True:
try:
command = get_command()
except KeyboardInterrupt:
continue
else:
except EOFError:
break

try:
args = _execute_internal_and_sys_cmds(
command, allow_internal_commands, allow_system_commands
)
if args is None:
continue
if not command:
if isatty:
continue
else:
break

try:
args = _execute_internal_and_sys_cmds(
command, allow_internal_commands, allow_system_commands
)
if args is None:
continue

except CommandLineParserError:
continue
except CommandLineParserError:
continue

except ExitReplException:
break
except ExitReplException:
break

try:
# The group command will dispatch based on args.
old_protected_args = group_ctx.protected_args
try:
group_ctx.protected_args = args
group.invoke(group_ctx)
finally:
group_ctx.protected_args = old_protected_args
except click.ClickException as e:
e.show()
except (ClickExit, SystemExit):
pass

except ExitReplException:
break
# The group command will dispatch based on args.
old_protected_args = group_ctx.protected_args
try:
group_ctx.protected_args = args
group.invoke(group_ctx)
finally:
group_ctx.protected_args = old_protected_args
except click.ClickException as e:
e.show()
except (ClickExit, SystemExit):
pass

except ExitReplException:
break

if original_command is not None:
available_commands[repl_command_name] = original_command
Expand Down
214 changes: 214 additions & 0 deletions click_repl/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
"""
Core functionalities for managing context of the click_repl app.
"""

from __future__ import annotations

from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, TypeVar

from click import Context
from prompt_toolkit import PromptSession
from typing_extensions import Concatenate, Final, ParamSpec, TypeAlias, TypedDict

from ._ctx_stack import _pop_context, _push_context
from .globals_ import ISATTY, get_current_repl_ctx

if TYPE_CHECKING:
from prompt_toolkit.formatted_text import AnyFormattedText


P = ParamSpec("P")
R = TypeVar("R")
F = TypeVar("F", bound=Callable[..., Any])


__all__ = ["ReplContext", "pass_context"]


_PromptSession: TypeAlias = PromptSession[Dict[str, Any]]


class ReplContextInfoDict(TypedDict):
group_ctx: Context
prompt_kwargs: dict[str, Any]
session: _PromptSession | None
parent: ReplContext | None
_history: list[str]


class ReplContext:
"""
Context object for the REPL sessions.

This class tracks the depth of nested REPLs, ensuring seamless navigation
between different levels. It facilitates nested REPL scenarios, allowing
multiple levels of interactive REPL sessions.

Each REPL's properties are stored inside this context class, allowing them to
be accessed and shared with their parent REPL.

All the settings for each REPL session persist until the session is terminated.

Parameters
----------
group_ctx
The click context object that belong to the CLI/parent Group.

prompt_kwargs
Extra keyword arguments for
:class:`~prompt_toolkit.shortcuts.PromptSession` class.

parent
REPL Context object of the parent REPL session, if exists. Otherwise, :obj:`None`.
"""

__slots__ = (
"group_ctx",
"prompt_kwargs",
"parent",
"session",
"_history",
)

def __init__(
self,
group_ctx: Context,
prompt_kwargs: dict[str, Any] = {},
parent: ReplContext | None = None,
) -> None:
"""
Initializes the `ReplContext` class.
"""
session: _PromptSession | None

if ISATTY:
session = PromptSession(**prompt_kwargs)

else:
session = None

self.group_ctx: Final[Context] = group_ctx
"""The click context object that belong to the CLI/parent Group."""

self.session = session
"""Object that's responsible for managing and executing the REPL."""

self._history: list[str] = []
"""
History of past executed commands.

Used only when :func:`~sys.stdin.isatty` is :obj:`False`.
"""

self.prompt_kwargs = prompt_kwargs
"""
Extra keyword arguments for
:class:`~prompt_toolkit.shortcuts.PromptSession` class.
"""

self.parent: Final[ReplContext | None] = parent
"""
REPL Context object of the parent REPL session, if exists.
Otherwise, :obj:`None`.
"""

def __enter__(self) -> ReplContext:
_push_context(self)
return self

def __exit__(self, *_: Any) -> None:
_pop_context()

@property
def prompt(self) -> AnyFormattedText:
"""
The prompt text of the REPL.

Returns
-------
prompt_toolkit.formatted_text.AnyFormattedText
The prompt object if :func:`~sys.stdin.isatty` is :obj:`True`,
else :obj:`None`.
"""
if ISATTY and self.session is not None:
return self.session.message
return None

@prompt.setter
def prompt(self, value: AnyFormattedText) -> None:
if ISATTY and self.session is not None:
self.session.message = value

def to_info_dict(self) -> ReplContextInfoDict:
"""
Provides a dictionary with minimal info about the current REPL.

Returns
-------
ReplContextInfoDict
A dictionary that has the instance variables and their values.
"""

res: ReplContextInfoDict = {
"group_ctx": self.group_ctx,
"prompt_kwargs": self.prompt_kwargs,
"session": self.session,
"parent": self.parent,
"_history": self._history,
}

return res

def session_reset(self) -> None:
"""
Resets values of :class:`~prompt_toolkit.session.PromptSession` to
the provided :attr:`~.prompt_kwargs`, discarding any changes done to the
:class:`~prompt_toolkit.session.PromptSession` object.
"""

if ISATTY and self.session is not None:
self.session = PromptSession(**self.prompt_kwargs)

def history(self) -> Generator[str, None, None]:
"""
Generates the history of past executed commands.

Yields
------
str
The executed command string from the history,
in chronological order from most recent to oldest.
"""

if ISATTY and self.session is not None:
yield from self.session.history.load_history_strings()

else:
yield from reversed(self._history)


def pass_context(
func: Callable[Concatenate[ReplContext | None, P], R],
) -> Callable[P, R]:
"""
Decorator that marks a callback function to receive the current
REPL context object as its first argument.

Parameters
----------
func
The callback function to pass context as its first parameter.

Returns
-------
Callable[P,R]
The decorated callback function that receives the current REPL
context object as its first argument.
"""

@wraps(func)
def decorator(*args: P.args, **kwargs: P.kwargs) -> R:
return func(get_current_repl_ctx(), *args, **kwargs)

return decorator
Loading
Loading