Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dvc/dagascii: Use pager instead of AsciiCanvas._do_draw #2815

Merged
merged 14 commits into from Dec 6, 2019
136 changes: 43 additions & 93 deletions dvc/dagascii.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
from __future__ import print_function
from __future__ import unicode_literals

import logging
import math
import os
import pydoc
import sys

from grandalf.graphs import Edge
Expand All @@ -12,6 +15,42 @@
from grandalf.routing import EdgeViewer
from grandalf.routing import route_with_lines

from dvc.env import DVC_PAGER


logger = logging.getLogger(__name__)


DEFAULT_PAGER = "less"
DEFAULT_PAGER_FORMATTED = "{} --chop-long-lines --clear-screen".format(
DEFAULT_PAGER
)


def find_pager():
def make_pager(cmd):
def pager(text):
return pydoc.tempfilepager(pydoc.plain(text), cmd)

pager.cmd = cmd
efiop marked this conversation as resolved.
Show resolved Hide resolved
return pager

if not sys.stdout.isatty():
return pydoc.plainpager

env_pager = os.getenv(DVC_PAGER)
if env_pager:
return make_pager(env_pager)

if os.system("({}) 2>{}".format(DEFAULT_PAGER, os.devnull)) == 0:
return make_pager(DEFAULT_PAGER_FORMATTED)

logger.warning(
"Unable to find `less` in the PATH. Check out "
"man.dvc.org/doc/command-reference/pipeline/show for more info."
)
return pydoc.plainpager


class VertexViewer(object):
"""Class to define vertex box boundaries that will be accounted for during
Expand Down Expand Up @@ -60,99 +99,10 @@ def __init__(self, cols, lines):

def draw(self):
"""Draws ASCII canvas on the screen."""
if sys.stdout.isatty(): # pragma: no cover
from asciimatics.screen import Screen

Screen.wrapper(self._do_draw)
This conversation was marked as resolved.
Show resolved Hide resolved
else:
for line in self.canvas:
print("".join(line))

def _do_draw(self, screen): # pragma: no cover
This conversation was marked as resolved.
Show resolved Hide resolved
# pylint: disable=too-many-locals
# pylint: disable=too-many-branches, too-many-statements
from dvc.system import System
from asciimatics.event import KeyboardEvent

offset_x = 0
offset_y = 0
smaxrow, smaxcol = screen.dimensions
assert smaxrow > 1
assert smaxcol > 1
smaxrow -= 1
smaxcol -= 1

if self.lines + 1 > smaxrow:
max_y = self.lines + 1 - smaxrow
else:
max_y = 0

if self.cols + 1 > smaxcol:
max_x = self.cols + 1 - smaxcol
else:
max_x = 0

while True:
for y in range(smaxrow + 1):
y_index = offset_y + y
line = []
for x in range(smaxcol + 1):
x_index = offset_x + x
if (
len(self.canvas) > y_index
and len(self.canvas[y_index]) > x_index
):
line.append(self.canvas[y_index][x_index])
else:
line.append(" ")
assert len(line) == (smaxcol + 1)
screen.print_at("".join(line), 0, y)

screen.refresh()

# NOTE: get_event() doesn't block by itself,
# so we have to do the blocking ourselves.
#
# NOTE: using this workaround while waiting for PR [1]
# to get merged and released. After that need to adjust
# asciimatics version requirements.
#
# [1] https://github.com/peterbrittain/asciimatics/pull/188
System.wait_for_input(self.TIMEOUT)

event = screen.get_event()
if not isinstance(event, KeyboardEvent):
continue

k = event.key_code
if k == screen.KEY_DOWN or k == ord("s"):
offset_y += 1
elif k == screen.KEY_PAGE_DOWN or k == ord("S"):
offset_y += smaxrow
elif k == screen.KEY_UP or k == ord("w"):
offset_y -= 1
elif k == screen.KEY_PAGE_UP or k == ord("W"):
offset_y -= smaxrow
elif k == screen.KEY_RIGHT or k == ord("d"):
offset_x += 1
elif k == ord("D"):
offset_x += smaxcol
elif k == screen.KEY_LEFT or k == ord("a"):
offset_x -= 1
elif k == ord("A"):
offset_x -= smaxcol
elif k == ord("q") or k == ord("Q"):
break

if offset_y > max_y:
offset_y = max_y
elif offset_y < 0:
offset_y = 0

if offset_x > max_x:
offset_x = max_x
elif offset_x < 0:
offset_x = 0
pager = find_pager()
lines = map("".join, self.canvas)
joined_lines = os.linesep.join(lines)
pager(joined_lines)

def point(self, x, y, char):
"""Create a point on ASCII canvas.
Expand Down
1 change: 1 addition & 0 deletions dvc/env.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DVC_DAEMON = "DVC_DAEMON"
DVC_PAGER = "DVC_PAGER"
efiop marked this conversation as resolved.
Show resolved Hide resolved
35 changes: 0 additions & 35 deletions dvc/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,41 +218,6 @@ def inode(path):
assert inode < 2 ** 64
return inode

@staticmethod
def _wait_for_input_windows(timeout):
import sys
import ctypes
import msvcrt
from ctypes.wintypes import DWORD, HANDLE

# https://docs.microsoft.com/en-us/windows/desktop/api/synchapi/nf-synchapi-waitforsingleobject
from win32event import WAIT_OBJECT_0, WAIT_TIMEOUT

func = ctypes.windll.kernel32.WaitForSingleObject
func.argtypes = [HANDLE, DWORD]
func.restype = DWORD

rc = func(msvcrt.get_osfhandle(sys.stdin.fileno()), timeout * 1000)
if rc not in [WAIT_OBJECT_0, WAIT_TIMEOUT]:
raise RuntimeError(rc)

@staticmethod
def _wait_for_input_posix(timeout):
import sys
import select

try:
select.select([sys.stdin], [], [], timeout)
except select.error:
pass

@staticmethod
def wait_for_input(timeout):
if System.is_unix():
return System._wait_for_input_posix(timeout)
else:
return System._wait_for_input_windows(timeout)

@staticmethod
def is_symlink(path):
path = fspath(path)
Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def run(self):
"jsonpath-ng>=1.4.3",
"requests>=2.22.0",
"grandalf==0.6",
"asciimatics>=1.10.0",
"distro>=1.3.0",
"appdirs>=1.4.3",
"treelib>=1.5.5",
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/test_dagascii.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import mock
import os

from dvc import dagascii
from dvc.env import DVC_PAGER


def test_less_pager_returned_when_less_found():
with mock.patch.object(os, "system") as m:
This conversation was marked as resolved.
Show resolved Hide resolved
m.return_value = 0
pager = dagascii.find_pager()

assert pager.cmd == dagascii.DEFAULT_PAGER_FORMATTED


def test_plainpager_returned_when_less_missing():
with mock.patch.object(os, "system") as m:
m.return_value = 1 # any non-zero value
pager = dagascii.find_pager()

assert pager.__name__ == "plainpager"


def test_tempfilepager_returned_when_var_defined():
os.environ[DVC_PAGER] = dagascii.DEFAULT_PAGER
This conversation was marked as resolved.
Show resolved Hide resolved
pager = dagascii.find_pager()

assert pager.cmd == dagascii.DEFAULT_PAGER