Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dvc/dagascii: Use pager instead of AsciiCanvas._do_draw #2815

Merged
merged 14 commits into from Dec 6, 2019
136 changes: 43 additions & 93 deletions dvc/dagascii.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
from __future__ import print_function
from __future__ import unicode_literals

import logging
import math
import os
import pydoc
import sys

from grandalf.graphs import Edge
Expand All @@ -12,6 +15,42 @@
from grandalf.routing import EdgeViewer
from grandalf.routing import route_with_lines

from dvc.env import DVC_PAGER


logger = logging.getLogger(__name__)


DEFAULT_PAGER = "less"
DEFAULT_PAGER_FORMATTED = "{} --chop-long-lines --clear-screen".format(
DEFAULT_PAGER
)


def make_pager(cmd):
def pager(text):
return pydoc.tempfilepager(pydoc.plain(text), cmd)

return pager


def find_pager():
if not sys.stdout.isatty():
return pydoc.plainpager

env_pager = os.getenv(DVC_PAGER)
if env_pager:
return make_pager(env_pager)

if os.system("({}) 2>{}".format(DEFAULT_PAGER, os.devnull)) == 0:
return make_pager(DEFAULT_PAGER_FORMATTED)

logger.warning(
"Unable to find `less` in the PATH. Check out "
"man.dvc.org/doc/command-reference/pipeline/show for more info."
)
return pydoc.plainpager


class VertexViewer(object):
"""Class to define vertex box boundaries that will be accounted for during
Expand Down Expand Up @@ -60,99 +99,10 @@ def __init__(self, cols, lines):

def draw(self):
"""Draws ASCII canvas on the screen."""
if sys.stdout.isatty(): # pragma: no cover
from asciimatics.screen import Screen

Screen.wrapper(self._do_draw)
This conversation was marked as resolved.
Show resolved Hide resolved
else:
for line in self.canvas:
print("".join(line))

def _do_draw(self, screen): # pragma: no cover
This conversation was marked as resolved.
Show resolved Hide resolved
# pylint: disable=too-many-locals
# pylint: disable=too-many-branches, too-many-statements
from dvc.system import System
from asciimatics.event import KeyboardEvent

offset_x = 0
offset_y = 0
smaxrow, smaxcol = screen.dimensions
assert smaxrow > 1
assert smaxcol > 1
smaxrow -= 1
smaxcol -= 1

if self.lines + 1 > smaxrow:
max_y = self.lines + 1 - smaxrow
else:
max_y = 0

if self.cols + 1 > smaxcol:
max_x = self.cols + 1 - smaxcol
else:
max_x = 0

while True:
for y in range(smaxrow + 1):
y_index = offset_y + y
line = []
for x in range(smaxcol + 1):
x_index = offset_x + x
if (
len(self.canvas) > y_index
and len(self.canvas[y_index]) > x_index
):
line.append(self.canvas[y_index][x_index])
else:
line.append(" ")
assert len(line) == (smaxcol + 1)
screen.print_at("".join(line), 0, y)

screen.refresh()

# NOTE: get_event() doesn't block by itself,
# so we have to do the blocking ourselves.
#
# NOTE: using this workaround while waiting for PR [1]
# to get merged and released. After that need to adjust
# asciimatics version requirements.
#
# [1] https://github.com/peterbrittain/asciimatics/pull/188
System.wait_for_input(self.TIMEOUT)

event = screen.get_event()
if not isinstance(event, KeyboardEvent):
continue

k = event.key_code
if k == screen.KEY_DOWN or k == ord("s"):
offset_y += 1
elif k == screen.KEY_PAGE_DOWN or k == ord("S"):
offset_y += smaxrow
elif k == screen.KEY_UP or k == ord("w"):
offset_y -= 1
elif k == screen.KEY_PAGE_UP or k == ord("W"):
offset_y -= smaxrow
elif k == screen.KEY_RIGHT or k == ord("d"):
offset_x += 1
elif k == ord("D"):
offset_x += smaxcol
elif k == screen.KEY_LEFT or k == ord("a"):
offset_x -= 1
elif k == ord("A"):
offset_x -= smaxcol
elif k == ord("q") or k == ord("Q"):
break

if offset_y > max_y:
offset_y = max_y
elif offset_y < 0:
offset_y = 0

if offset_x > max_x:
offset_x = max_x
elif offset_x < 0:
offset_x = 0
pager = find_pager()
lines = map("".join, self.canvas)
joined_lines = os.linesep.join(lines)
pager(joined_lines)

def point(self, x, y, char):
"""Create a point on ASCII canvas.
Expand Down
1 change: 1 addition & 0 deletions dvc/env.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DVC_DAEMON = "DVC_DAEMON"
DVC_PAGER = "DVC_PAGER"
efiop marked this conversation as resolved.
Show resolved Hide resolved
35 changes: 0 additions & 35 deletions dvc/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,41 +218,6 @@ def inode(path):
assert inode < 2 ** 64
return inode

@staticmethod
def _wait_for_input_windows(timeout):
import sys
import ctypes
import msvcrt
from ctypes.wintypes import DWORD, HANDLE

# https://docs.microsoft.com/en-us/windows/desktop/api/synchapi/nf-synchapi-waitforsingleobject
from win32event import WAIT_OBJECT_0, WAIT_TIMEOUT

func = ctypes.windll.kernel32.WaitForSingleObject
func.argtypes = [HANDLE, DWORD]
func.restype = DWORD

rc = func(msvcrt.get_osfhandle(sys.stdin.fileno()), timeout * 1000)
if rc not in [WAIT_OBJECT_0, WAIT_TIMEOUT]:
raise RuntimeError(rc)

@staticmethod
def _wait_for_input_posix(timeout):
import sys
import select

try:
select.select([sys.stdin], [], [], timeout)
except select.error:
pass

@staticmethod
def wait_for_input(timeout):
if System.is_unix():
return System._wait_for_input_posix(timeout)
else:
return System._wait_for_input_windows(timeout)

@staticmethod
def is_symlink(path):
path = fspath(path)
Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def run(self):
"jsonpath-ng>=1.4.3",
"requests>=2.22.0",
"grandalf==0.6",
"asciimatics>=1.10.0",
"distro>=1.3.0",
"appdirs>=1.4.3",
"treelib>=1.5.5",
Expand Down
41 changes: 41 additions & 0 deletions tests/unit/test_dagascii.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from dvc import dagascii
from dvc.env import DVC_PAGER


def test_find_pager_uses_default_pager_when_found(mocker):
mocker.patch("sys.stdout.isatty", return_value=True)
mocker.patch("os.system", return_value=0)
m_make_pager = mocker.patch.object(dagascii, "make_pager")

dagascii.find_pager()

m_make_pager.assert_called_once_with(dagascii.DEFAULT_PAGER_FORMATTED)


def test_find_pager_returns_plain_pager_when_default_missing(mocker):
mocker.patch("sys.stdout.isatty", return_value=True)
mocker.patch("os.system", return_value=1)

pager = dagascii.find_pager()

assert pager.__name__ == "plainpager"


def test_find_pager_uses_custom_pager_when_env_var_is_defined(
mocker, monkeypatch
):
mocker.patch("sys.stdout.isatty", return_value=True)
m_make_pager = mocker.patch.object(dagascii, "make_pager")
monkeypatch.setenv(DVC_PAGER, dagascii.DEFAULT_PAGER)

dagascii.find_pager()

m_make_pager.assert_called_once_with(dagascii.DEFAULT_PAGER)


def test_find_pager_returns_plain_pager_when_is_not_atty(mocker):
mocker.patch("sys.stdout.isatty", return_value=False)

pager = dagascii.find_pager()

assert pager.__name__ == "plainpager"