Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use subprocess instead of os.system #1477

Merged
merged 9 commits into from
Nov 30, 2024
110 changes: 79 additions & 31 deletions src/click/_termui_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import typing as t
from gettext import gettext as _
from io import StringIO
from shutil import which
from types import TracebackType

from ._compat import _default_text_stdout
Expand Down Expand Up @@ -372,31 +373,42 @@ def pager(generator: t.Iterable[str], color: t.Optional[bool] = None) -> None:
pager_cmd = (os.environ.get("PAGER", None) or "").strip()
if pager_cmd:
if WIN:
return _tempfilepager(generator, pager_cmd, color)
return _pipepager(generator, pager_cmd, color)
if _tempfilepager(generator, pager_cmd, color):
return
elif _pipepager(generator, pager_cmd, color):
return
if os.environ.get("TERM") in ("dumb", "emacs"):
return _nullpager(stdout, generator, color)
if WIN or sys.platform.startswith("os2"):
return _tempfilepager(generator, "more <", color)
if hasattr(os, "system") and os.system("(less) 2>/dev/null") == 0:
return _pipepager(generator, "less", color)
if (WIN or sys.platform.startswith("os2")) and _tempfilepager(
generator, "more", color
):
return
if _pipepager(generator, "less", color):
return

import tempfile

fd, filename = tempfile.mkstemp()
os.close(fd)
try:
if hasattr(os, "system") and os.system(f'more "{filename}"') == 0:
return _pipepager(generator, "more", color)
if _pipepager(generator, "more", color):
return
return _nullpager(stdout, generator, color)
finally:
os.unlink(filename)


def _pipepager(generator: t.Iterable[str], cmd: str, color: t.Optional[bool]) -> None:
def _pipepager(generator: t.Iterable[str], cmd: str, color: t.Optional[bool]) -> bool:
"""Page through text by feeding it to another program. Invoking a
pager through this might support colors.

Returns True if the command was found, False otherwise and thus another
pager should be attempted.
"""
cmd_absolute = which(cmd)
if cmd_absolute is None:
return False

import subprocess

env = dict(os.environ)
Expand All @@ -412,19 +424,25 @@ def _pipepager(generator: t.Iterable[str], cmd: str, color: t.Optional[bool]) ->
elif "r" in less_flags or "R" in less_flags:
color = True

c = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, env=env)
stdin = t.cast(t.BinaryIO, c.stdin)
encoding = get_best_encoding(stdin)
c = subprocess.Popen(
[cmd_absolute],
shell=True,
stdin=subprocess.PIPE,
env=env,
errors="replace",
text=True,
)
assert c.stdin is not None
try:
for text in generator:
if not color:
text = strip_ansi(text)

stdin.write(text.encode(encoding, "replace"))
c.stdin.write(text)
except (OSError, KeyboardInterrupt):
pass
else:
stdin.close()
c.stdin.close()

# Less doesn't respect ^C, but catches it for its own UI purposes (aborting
# search or other commands inside less).
Expand All @@ -442,11 +460,25 @@ def _pipepager(generator: t.Iterable[str], cmd: str, color: t.Optional[bool]) ->
else:
break

return True


def _tempfilepager(
generator: t.Iterable[str], cmd: str, color: t.Optional[bool]
) -> None:
"""Page through text by invoking a program on a temporary file."""
generator: t.Iterable[str],
cmd: str,
color: t.Optional[bool],
) -> bool:
"""Page through text by invoking a program on a temporary file.

Returns True if the command was found, False otherwise and thus another
pager should be attempted.
"""
# Which is necessary for Windows, it is also recommended in the Popen docs.
cmd_absolute = which(cmd)
if cmd_absolute is None:
return False

import subprocess
import tempfile

fd, filename = tempfile.mkstemp()
Expand All @@ -458,11 +490,16 @@ def _tempfilepager(
with open_stream(filename, "wb")[0] as f:
f.write(text.encode(encoding))
try:
os.system(f'{cmd} "{filename}"')
subprocess.call([cmd_absolute, filename])
except OSError:
# Command not found
pass
finally:
os.close(fd)
os.unlink(filename)

return True


def _nullpager(
stream: t.TextIO, generator: t.Iterable[str], color: t.Optional[bool]
Expand Down Expand Up @@ -497,7 +534,7 @@ def get_editor(self) -> str:
if WIN:
return "notepad"
for editor in "sensible-editor", "vim", "nano":
if os.system(f"which {editor} >/dev/null 2>&1") == 0:
if which(editor) is not None:
return editor
return "vi"

Expand Down Expand Up @@ -596,22 +633,33 @@ def _unquote_file(url: str) -> str:
null.close()
elif WIN:
if locate:
url = _unquote_file(url.replace('"', ""))
args = f'explorer /select,"{url}"'
url = _unquote_file(url)
args = ["explorer", f"/select,{url}"]
else:
url = url.replace('"', "")
wait_str = "/WAIT" if wait else ""
args = f'start {wait_str} "" "{url}"'
return os.system(args)
args = ["start"]
if wait:
args.append("/WAIT")
args.append("")
args.append(url)
try:
return subprocess.call(args)
except OSError:
# Command not found
return 127
elif CYGWIN:
if locate:
url = os.path.dirname(_unquote_file(url).replace('"', ""))
args = f'cygstart "{url}"'
url = _unquote_file(url)
args = ["cygstart", os.path.dirname(url)]
else:
url = url.replace('"', "")
wait_str = "-w" if wait else ""
args = f'cygstart {wait_str} "{url}"'
return os.system(args)
args = ["cygstart"]
if wait:
args.append("-w")
args.append(url)
try:
return subprocess.call(args)
except OSError:
# Command not found
return 127

try:
if locate:
Expand Down
1 change: 1 addition & 0 deletions tests/test_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def tracking_import(module, locals=None, globals=None, fromlist=None,
"typing",
"types",
"gettext",
"shutil",
}

if WIN:
Expand Down
1 change: 0 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ def _test_gen_func():
yield "abc"


@pytest.mark.skipif(WIN, reason="Different behavior on windows.")
@pytest.mark.parametrize("cat", ["cat", "cat ", "cat "])
@pytest.mark.parametrize(
"test",
Expand Down