Skip to content

Commit

Permalink
Experimental support of recvscreen
Browse files Browse the repository at this point in the history
  • Loading branch information
ptr-yudai committed Jan 2, 2024
1 parent 0f4c164 commit 2e371d1
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 0 deletions.
1 change: 1 addition & 0 deletions ptrlib/binary/encoding/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .ansi import *
from .bitconv import *
from .byteconv import *
from .dump import *
Expand Down
193 changes: 193 additions & 0 deletions ptrlib/binary/encoding/ansi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import functools
import re

try:
cache = functools.cache
except AttributeError:
cache = functools.lru_cache


@cache
def escape_codes():
codes = {}
# Cursor
codes['CSI_CURSOR_MOVE'] = re.compile(b'^\x1b\[([1-9]\d*);([1-9]\d*)[Hf]')
codes['CSI_CURSOR_ROW'] = re.compile(b'^\x1b\[([1-9]\d*)d')
codes['CSI_CURSOR_COLUMN'] = re.compile(b'^\x1b\[([1-9]\d*)[`G]')
codes['CSI_CURSOR_UP'] = re.compile(b'^\x1b\[(\d+)A')
codes['CSI_CURSOR_DOWN'] = re.compile(b'^\x1b\[(\d+)B')
codes['CSI_CURSOR_RIGHT'] = re.compile(b'^\x1b\[(\d+)C')
codes['CSI_CURSOR_LEFT'] = re.compile(b'^\x1b\[(\d+)D')
codes['CSI_CURSOR_UP_HEAD'] = re.compile(b'^\x1b\[(\d+)F')
codes['CSI_CURSOR_DOWN_HEAD'] = re.compile(b'^\x1b\[(\d+)E')
codes['CSI_CURSOR_SAVE'] = re.compile(b'^\x1b\[s')
codes['CSI_CURSOR_RESTORE'] = re.compile(b'^\x1b\[u')
codes['CSI_CURSOR_REQUEST'] = re.compile(b'^\x1b\[6n')
codes['FP_CURSOR_SAVE'] = re.compile(b'^\x1b7')
codes['FP_CURSOR_RESTORE'] = re.compile(b'^\x1b8')
codes['FE_CURSOR_ONEUP'] = re.compile(b'^\x1bM')

# Character
codes['CSI_CHAR_REPEAT'] = re.compile(b'^\x1b\[(\d+)b')

# Erase
codes['CSI_ERASE_DISPLAY_FORWARD'] = re.compile(b'^\x1b\[[0]J')
codes['CSI_ERASE_DISPLAY_BACKWARD'] = re.compile(b'^\x1b\[1J')
codes['CSI_ERASE_DISPLAY_ALL'] = re.compile(b'^\x1b\[2J')
codes['CSI_ERASE_LINE_FORWARD'] = re.compile(b'^\x1b\[[0]K')
codes['CSI_ERASE_LINE_BACKWARD'] = re.compile(b'^\x1b\[1K')
codes['CSI_ERASE_LINE_ALL'] = re.compile(b'^\x1b\[2K')

# Others
codes['CSI_COLOR'] = re.compile(b'^\x1b\[(\d+)m')
codes['CSI_MODE'] = re.compile(b'^\x1b\[=(\d+)[hl]')
codes['CSI_PRIVATE_MODE'] = re.compile(b'^\x1b\[?(\d+)[hl]')

return codes


def draw_ansi(buf):
"""Interpret ANSI code sequences to screen
Args:
buf (bytes): ANSI code sequences
Returns:
list: 2D array of screen to be drawn
"""
draw = []
E = escape_codes()
width = height = x = y = 0
saved_dec = saved_sco = None
while len(buf):
if buf[0] == 13: # \r
x = 0
buf = buf[1:]
continue

elif buf[0] == 10: # \n
x = 0
y += 1
buf = buf[1:]
continue

elif buf[0] != 0x1b:
if x >= width: width = x + 1
if y >= height: height = y + 1
draw.append(('PUTCHAR', x, y, buf[0]))
x += 1
buf = buf[1:]
continue

# CSI sequences
if m := E['CSI_CURSOR_MOVE'].match(buf):
y, x = int(m.group(1)) - 1, int(m.group(2)) - 1
elif m := E['CSI_CURSOR_ROW'].match(buf):
y = int(m.group(1)) - 1
elif m := E['CSI_CURSOR_COLUMN'].match(buf):
x = int(m.group(1)) - 1
elif m := E['CSI_CURSOR_UP'].match(buf):
y = max(0, y - int(m.group(1)))
elif m := E['CSI_CURSOR_DOWN'].match(buf):
y += int(m.group(1))
elif m := E['CSI_CURSOR_LEFT'].match(buf):
x = max(0, x - int(m.group(1)))
elif m := E['CSI_CURSOR_RIGHT'].match(buf):
x += int(m.group(1))
elif m := E['CSI_CURSOR_UP_HEAD'].match(buf):
x, y = 0, max(0, y - int(m.group(1)))
elif m := E['CSI_CURSOR_DOWN_HEAD'].match(buf):
x, y = 0, y + int(m.group(1))
elif m := E['CSI_CURSOR_SAVE'].match(buf):
saved_sco = (x, y)
elif m := E['CSI_CURSOR_RESTORE'].match(buf):
if saved_sco is not None: x, y = saved_sco
elif m := E['CSI_CURSOR_REQUEST'].match(buf):
pass # Not implemented: Request cursor position
elif m := E['CSI_COLOR'].match(buf):
pass # Not implemented: Change color
elif m := E['CSI_MODE'].match(buf):
pass # Not implemented: Set mode

# Repease character
elif m := E['CSI_CHAR_REPEAT'].match(buf):
n = int(m.group(1))
draw.append(('CSI_CHAR_REPEAT', x, y, n))
x += n

# Fe escape sequences
elif m := E['FE_CURSOR_ONEUP'].match(buf):
y = max(0, y - 1) # scroll not implemented

# Fp escape sequences
elif m := E['FP_CURSOR_SAVE'].match(buf):
saved_dec = (x, y)
elif m := E['FP_CURSOR_RESTORE'].match(buf):
if saved_dec is not None: x, y = saved_dec

# Operation
else:
for k in ['CSI_ERASE_DISPLAY_FORWARD',
'CSI_ERASE_DISPLAY_BACKWARD',
'CSI_ERASE_DISPLAY_ALL',
'CSI_ERASE_LINE_FORWARD',
'CSI_ERASE_LINE_BACKWARD',
'CSI_ERASE_LINE_ALL']:
if m := E[k].match(buf):
if k == 'CSI_ERASE_DISPLAY_ALL':
draw = []
else:
draw.append((k, x, y, None))
break

# Otherwise draw text
if m:
buf = buf[m.end():]
else:
# TODO: skip ESC only?
raise NotImplementedError(f"Could not interpret code: {buf[:10]}")

# Emualte drawing
screen = [[' ' for x in range(width)] for y in range(height)]
last_char = ' '
for op, x, y, attr in draw:
if op == 'PUTCHAR':
last_char = chr(attr)
screen[y][x] = last_char

elif op == 'CSI_CHAR_REPEAT':
for j in range(attr):
screen[y][x+j] = last_char

elif op == 'CSI_ERASE_DISPLAY_FORWARD':
for j in range(x, width):
screen[y][j] = ' '
for i in range(y+1, height):
for j in range(width):
screen[i][j] = ' '

elif op == 'CSI_ERASE_DISPLAY_BACKWARD':
for j in range(x):
screen[y][j] = ' '
for i in range(y):
for j in range(width):
screen[i][j] = ' '

elif op == 'CSI_ERASE_DISPLAY_ALL':
for i in range(height):
for j in range(width):
screen[i][j] = ' '

elif op == 'CSI_ERASE_LINE_FORWARD':
for j in range(x, width):
screen[y][j] = ' '

elif op == 'CSI_ERASE_LINE_BACKWARD':
for j in range(x):
screen[y][j] = ' '

elif op == 'CSI_ERASE_LINE_ALL':
for j in range(width):
screen[y][j] = ' '

return screen
40 changes: 40 additions & 0 deletions ptrlib/connection/tube.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,21 @@ def recvregex(self,
else:
return group, data[:pos]

def recvscreen(self, delim: bytes=b'\x1b[H', timeout: Optional[Union[int, float]]=None):
"""Receive a screen
Receive a screen drawn by ncurses
Args:
delim (bytes): Refresh sequence
Returns:
str: Rectangle string drawing the screen
"""
self.recvuntil(delim, timeout=timeout)
buf = self.recvuntil(delim, drop=True, lookahead=True)
return '\n'.join(map(lambda row: ''.join(row), draw_ansi(buf)))

@abstractmethod
def _send(self, data: bytes):
pass
Expand Down Expand Up @@ -315,6 +330,31 @@ def sendlineafter(self, delim: Union[str, bytes], data: Union[str, bytes, int],

return recv_data

def sendctrl(self, name: str):
"""Send control key
Send control key given its name
Args:
name (str): Name of the control key to send
"""
if name.lower() in ['w', 'up']:
sock.send(b'\x1bOA')
elif name.lower() in ['s', 'down']:
sock.send(b'\x1bOB')
elif name.lower() in ['a', 'left']:
sock.send(b'\x1bOD')
elif name.lower() in ['d', 'right']:
sock.send(b'\x1bOC')
elif name.lower() in ['esc', 'escape']:
sock.send(b'\x1b')
elif name.lower() in ['bk', 'backspace']:
sock.send(b'\x08')
elif name.lower() in ['del', 'delete']:
sock.send(b'\x7f')
else:
raise ValueError(f"Invalid control key name: {name}")

def sh(self, timeout: Optional[Union[int, float]]=None):
"""Alias for interactive
"""
Expand Down

0 comments on commit 2e371d1

Please sign in to comment.