Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compatibility with legacy Windows terminals #31

Merged
merged 8 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion covert/__main__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import sys
from sys import stderr, stdout

import colorama
import covert
from covert.cli import main_benchmark, main_dec, main_enc

Expand Down Expand Up @@ -151,6 +151,7 @@ def argparse():


def main():
colorama.init()
# CLI argument processing
args = argparse()
if len(args.outfile) > 1:
Expand Down
78 changes: 39 additions & 39 deletions covert/cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import mmap
import os
import sys
import itertools
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from io import BytesIO
from pathlib import Path
from sys import stderr, stdin, stdout
from time import perf_counter

import pyperclip
Expand Down Expand Up @@ -63,7 +63,7 @@ def run_decryption(infile, args, passwords, identities):
f = None
if prev and prev.name is not None:
r = '<renamed>' if prev.renamed else ''
progress.write(f'{prev.size:15,d} 📄 {prev.name:60}{r}', file=stderr)
progress.write(f'{prev.size:15,d} 📄 {prev.name:60}{r}', file=sys.stderr)
if a.curfile:
n = a.curfile.name or ''
if not n and a.curfile.size is not None and a.curfile.size < TTY_MAX_SIZE:
Expand All @@ -72,7 +72,7 @@ def run_decryption(infile, args, passwords, identities):
if not outdir:
outdir = Path(args.outfile).resolve()
outdir.mkdir(parents=True, exist_ok=True)
progress.write(f" ▶️ \x1B[1;34m Extracting to \x1B[1;37m{outdir}\x1B[0m", file=stderr)
progress.write(f" ▶️ \x1B[1;34m Extracting to \x1B[1;37m{outdir}\x1B[0m", file=sys.stderr)
name = outdir.joinpath(n)
if not name.resolve().is_relative_to(outdir) or name.is_reserved():
progress.close()
Expand All @@ -82,7 +82,7 @@ def run_decryption(infile, args, passwords, identities):
elif outdir is None:
outdir = False
progress.write(
" ▶️ \x1B[1;34m The archive contains files. To extract, use \x1B[1;37m-o PATH\x1B[0m", file=stderr
" ▶️ \x1B[1;34m The archive contains files. To extract, use \x1B[1;37m-o PATH\x1B[0m", file=sys.stderr
)

# Next file
Expand All @@ -99,26 +99,26 @@ def run_decryption(infile, args, passwords, identities):
if progress:
progress.close()
# Print any messages
pretty = stdout.isatty()
pretty = sys.stdout.isatty()
for i, m in enumerate(messages):
if pretty:
stderr.write("\x1B[1m 💬\n\x1B[1;34m")
stderr.flush()
sys.stderr.write("\x1B[1m 💬\n\x1B[1;34m")
sys.stderr.flush()
# Replace dangerous characters
m = ''.join(c if c.isprintable() or c in ' \t\n' else f'\x1B[31m{repr(c)[1:-1]}\x1B[1;34m' for c in m)
try:
print(m)
finally:
if pretty:
stderr.write(f"\x1B[0m")
stderr.flush()
sys.stderr.write(f"\x1B[0m")
sys.stderr.flush()
# Print signatures
stderr.write(f' 🔷 File hash: {a.filehash[:12].hex()}\n')
sys.stderr.write(f' 🔷 File hash: {a.filehash[:12].hex()}\n')
for valid, key, text in a.signatures:
if valid:
stderr.write(f" ✅ {key} {text}\n")
sys.stderr.write(f" ✅ {key} {text}\n")
else:
stderr.write(f"\x1B[1;31m ❌ {key} {text}\x1B[0m\n")
sys.stderr.write(f"\x1B[1;31m ❌ {key} {text}\x1B[0m\n")


def main_enc(args):
Expand Down Expand Up @@ -148,36 +148,36 @@ def main_enc(args):
l = len(recipients)
recipients = list(sorted(set(recipients), key=str))
if len(recipients) < l:
stderr.write(' ⚠️ Duplicate recipient keys dropped.\n')
sys.stderr.write(' ⚠️ Duplicate recipient keys dropped.\n')
# Signatures
signatures = {key for keystr in args.identities for key in pubkey.read_sk_any(keystr) if key.edsk}
signatures = list(sorted(signatures, key=str))
# Input files
if not args.files or True in args.files:
if stdin.isatty():
if sys.stdin.isatty():
data = tty.editor()
# Prune surrounding whitespace
data = '\n'.join([l.rstrip() for l in data.split('\n')]).strip('\n')
stin = util.encode(data)
else:
stin = stdin.buffer
stin = sys.stdin.buffer
args.files = [stin] + [f for f in args.files if f != True]
# Collect the password hashing results
if passwords and stderr.isatty():
stderr.write("Password hashing... ")
stderr.flush()
if passwords and sys.stderr.isatty():
sys.stderr.write("Password hashing... ")
sys.stderr.flush()
pwhashes = set(pwhasher)
if passwords and stderr.isatty():
stderr.write("\r\x1B[K")
stderr.flush()
if passwords and sys.stderr.isatty():
sys.stderr.write("\r\x1B[0K")
sys.stderr.flush()
del passwords
a = Archive()
a.file_index(args.files)
if signatures:
a.index['s'] = [s.edpk for s in signatures]
# Output files
realoutf = open(args.outfile, "wb") if args.outfile else stdout.buffer
if args.armor or not args.outfile and stdout.isatty():
realoutf = open(args.outfile, "wb") if args.outfile else sys.stdout.buffer
if args.armor or not args.outfile and sys.stdout.isatty():
if a.total_size > (ARMOR_MAX_SIZE if args.outfile else TTY_MAX_SIZE):
if not args.outfile:
raise ValueError("Too much data for console. How about -o FILE to write a file?")
Expand All @@ -189,10 +189,10 @@ def main_enc(args):
def nextfile_callback(prev, cur):
if prev:
s, n = prev.size, prev.name
progress.write(f'{s:15,d} 📄 {n:60}' if n else f'{s:15,d} 💬 <message>', file=stderr)
progress.write(f'{s:15,d} 📄 {n:60}' if n else f'{s:15,d} 💬 <message>', file=sys.stderr)
if not cur:
a.random_padding(padding)
progress.write(f'\x1B[1;30m{a.padding:15,d} ⬛ <padding>\x1B[0m', file=stderr)
progress.write(f'\x1B[1;30m{a.padding:15,d} ⬛ <padding>\x1B[0m', file=sys.stderr)

a.nextfilecb = nextfile_callback
# Main processing
Expand All @@ -204,7 +204,7 @@ def nextfile_callback(prev, cur):
progress.update(len(block))
outf.write(block)
# Pretty output printout
if stderr.isatty():
if sys.stderr.isatty():
# Print a list of files
lock = " 🔓 wide-open" if args.wideopen else " 🔒 covert"
methods = " ".join(
Expand All @@ -220,8 +220,8 @@ def nextfile_callback(prev, cur):
elif args.paste:
lock += f" 📋 copied\n"
out = f"\n\x1B[1m{lock}\x1B[0m\n"
stderr.write(out)
stderr.flush()
sys.stderr.write(out)
sys.stderr.flush()
if outf is not realoutf:
outf.seek(0)
data = outf.read()
Expand All @@ -233,15 +233,15 @@ def nextfile_callback(prev, cur):
with realoutf:
pretty = realoutf.isatty()
if pretty:
stderr.write("\x1B[1;30m```\x1B[0;34m\n")
stderr.flush()
sys.stderr.write("\x1B[1;30m```\x1B[0;34m\n")
sys.stderr.flush()
try:
realoutf.write(f"{data}\n".encode())
realoutf.flush()
finally:
if pretty:
stderr.write("\x1B[1;30m```\x1B[0m\n")
stderr.flush()
sys.stderr.write("\x1B[1;30m```\x1B[0m\n")
sys.stderr.flush()


def main_dec(args):
Expand All @@ -252,7 +252,7 @@ def main_dec(args):
args.askpass = 1
identities = {key for keystr in args.identities for key in pubkey.read_sk_any(keystr)}
identities = list(sorted(identities, key=str))
infile = open(args.files[0], "rb") if args.files else stdin.buffer
infile = open(args.files[0], "rb") if args.files else sys.stdin.buffer
# If ASCII armored or TTY, read all input immediately (assumed to be short enough)
total_size = os.path.getsize(args.files[0]) if args.files else 0
if infile.isatty():
Expand All @@ -278,17 +278,17 @@ def main_dec(args):
def pwhashgen():
it = itertools.chain(pwhasher, (passphrase.pwhash(passphrase.ask('Passphrase')[0]) for i in range(args.askpass)))
while True:
if stderr.isatty():
stderr.write("Password hashing... ")
stderr.flush()
if sys.stderr.isatty():
sys.stderr.write("Password hashing... ")
sys.stderr.flush()
try:
pwhash = next(it)
except StopIteration:
break
finally:
if stderr.isatty():
stderr.write("\r\x1B[K")
stderr.flush()
if sys.stderr.isatty():
sys.stderr.write("\r\x1B[0K")
sys.stderr.flush()
yield pwhash
run_decryption(infile, args, pwhashgen(), identities)

Expand Down
11 changes: 7 additions & 4 deletions covert/passphrase.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ def ask(prompt, create=False):
pwtitle, pwrest = pwhint.split('\n', 1)
else:
pwtitle, pwrest, valid = 'Covert decryption', '\n', True
out = f"\x1B[1;1H\x1B[1;37;44m{pwtitle:56}\x1B[0m\n{pwrest}{prompt}: "
out = f"\x1B[1;1H\x1B[1;37;44m{pwtitle:56}\x1B[0m\n{pwrest}"
pwdisp = pwd if visible else len(pwd) * '·'
out += f"{pwdisp[:pos]}\x1B7{pwdisp[pos:]}"
beforecursor = f"{prompt}: {pwdisp[:pos]}"
aftercursor = pwdisp[pos:]
out += f"{beforecursor}{aftercursor}"
help = ''
if pwd or not create:
help += "\n \x1B[1;34mtab \x1B[0;34m"
Expand All @@ -116,8 +118,9 @@ def ask(prompt, create=False):
else:
help += "\n \x1B[1;34mtab \x1B[0;34msuggest a strong password\n\x1B[1;34menter \x1B[0;34mgenerate and use a strong password"
help += "\n \x1B[1;34mdown \x1B[0;34mhide input" if visible else "\n \x1B[1;34mup \x1B[0;34mshow input"
out += f'\n{help}\n\x1B[0m\x1B[K\x1B8'
out = out.replace('\n', '\x1B[K\n')
out += f'\n{help}\n'
out = out.replace('\n', '\x1B[0K\n')
out += f"\x1B[0m\x1B[0K\x1B[5;{1 + len(beforecursor)}H"
term.write(out)
for ch in term.reader():
if len(ch) == 1: # Text input
Expand Down
21 changes: 12 additions & 9 deletions covert/tty.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import io
import os
import sys
import time
from contextlib import contextmanager
from shutil import get_terminal_size


def editor():
with fullscreen() as term:
term.write(f'\x1B[1;1H\x1B[1;44m 〰 ENTER MESSAGE 〰 (ESC to finish)\x1B[K\x1B[0m\n')
term.write(f'\x1B[1;1H\x1B[1;44m 〰 ENTER MESSAGE 〰 (ESC to finish)\x1B[0K\x1B[0m\n')
data = ""
startrow = row = col = 0
while True:
Expand Down Expand Up @@ -56,8 +57,8 @@ def editor():
win = get_terminal_size()
startrow = min(max(0, row - 1), startrow)
startrow = max(row - win.lines + 2, startrow)
draw = "\x1B[K\n".join(l[:win.columns - 1] for l in data.split("\n")[startrow:startrow + win.lines - 1])
term.write(f"\x1B[2;1H{draw}\x1B[J\x1B[{row - startrow + 2};{col+1}H")
draw = "\x1B[0K\n".join(l[:win.columns - 1] for l in data.split("\n")[startrow:startrow + win.lines - 1])
term.write(f"\x1B[2;1H{draw}\x1B[0J\x1B[{row - startrow + 2};{col+1}H")


def read_hidden(prompt):
Expand All @@ -80,10 +81,11 @@ def read_hidden(prompt):
elif len(key) == 1:
data += key
t = time.monotonic()
term.write(f"\x1B7 ({len(data)}) \x1B8")
status = f" ({len(data)}) "
term.write(f"{status}\x1B[{len(status)}D")
finally:
# Return to start of line and clear the prompt
term.write(f"\x1B[0m\r\x1B[K")
term.write(f"\x1B[0m\r\x1B[0K")


@contextmanager
Expand Down Expand Up @@ -132,11 +134,11 @@ def stdio_terminal():

@contextmanager
def modeswitch(term):
term.write('\x1B[?1049h')
term.write('\x1B[?1049h\x1B[2J')
try:
yield
finally:
term.write('\x1B[?1049l')
term.write('\x1B[2J\x1B[?1049l')


@contextmanager
Expand All @@ -158,8 +160,9 @@ def write(self, text):
self.tty.flush()
else:
text = text.replace('\n', '\r\n')
for ch in text:
msvcrt.putwch(ch)
sys.stderr.write(text)
#for ch in text:
# msvcrt.putwch(ch)

def reader_windows(self):
while True:
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"Operating System :: OS Independent",
],
install_requires=[
"colorama>=0.4",
"pynacl>=1.4",
"tqdm>=4.62",
"msgpack>=1.0",
Expand Down