From 6f59898219a63a8cba94df2412ba0b3e294a6c82 Mon Sep 17 00:00:00 2001 From: foonoxous Date: Sun, 28 Nov 2021 01:52:54 +0000 Subject: [PATCH 1/7] Add Colorama for terminal color handling. --- covert/__main__.py | 3 ++- setup.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/covert/__main__.py b/covert/__main__.py index 894fdab..dba7ee6 100644 --- a/covert/__main__.py +++ b/covert/__main__.py @@ -1,7 +1,7 @@ import os import sys from sys import stderr, stdout - +import colorama import covert from covert.cli import main_benchmark, main_dec, main_enc @@ -151,6 +151,7 @@ def argparse(): def main(): + colorama.init() # CLI argument processing args = argparse() if len(args.outfile) > 1: diff --git a/setup.py b/setup.py index adf0d1d..dcec6ce 100644 --- a/setup.py +++ b/setup.py @@ -19,6 +19,7 @@ "Operating System :: OS Independent", ], install_requires=[ + "colorama>=0.4", "pynacl>=1.4", "tqdm>=4.62", "msgpack>=1.0", From 0a076fd84083b114b7cdaab468a4d9eb03fc29ab Mon Sep 17 00:00:00 2001 From: foonoxous Date: Mon, 29 Nov 2021 22:42:03 +0000 Subject: [PATCH 2/7] Attempt using stderr in terminal as well. --- covert/tty.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/covert/tty.py b/covert/tty.py index 2711a41..7502562 100644 --- a/covert/tty.py +++ b/covert/tty.py @@ -1,5 +1,6 @@ import io import os +import sys import time from contextlib import contextmanager from shutil import get_terminal_size @@ -158,8 +159,9 @@ def write(self, text): self.tty.flush() else: text = text.replace('\n', '\r\n') - for ch in text: - msvcrt.putwch(ch) + sys.stderr.write(text) + #for ch in text: + # msvcrt.putwch(ch) def reader_windows(self): while True: From c81dfc828eab2cceefefebe659b5dd60de592628 Mon Sep 17 00:00:00 2001 From: foonoxous Date: Mon, 29 Nov 2021 22:46:00 +0000 Subject: [PATCH 3/7] Clear screen on full screen mode switching for better compatibility. --- covert/tty.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/covert/tty.py b/covert/tty.py index 7502562..060c40f 100644 --- a/covert/tty.py +++ b/covert/tty.py @@ -133,11 +133,11 @@ def stdio_terminal(): @contextmanager def modeswitch(term): - term.write('\x1B[?1049h') + term.write('\x1B[?1049h\x1B[2J') try: yield finally: - term.write('\x1B[?1049l') + term.write('\x1B[2J\x1B[?1049l') @contextmanager From 9ba2080dc4040c9a651ad0696a224bf30f5cca7d Mon Sep 17 00:00:00 2001 From: foonoxous Date: Mon, 29 Nov 2021 23:12:53 +0000 Subject: [PATCH 4/7] Use colorama-wrapped stdio on cli.py --- covert/cli.py | 78 +++++++++++++++++++++++++-------------------------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/covert/cli.py b/covert/cli.py index 4d9feec..ea97a1c 100644 --- a/covert/cli.py +++ b/covert/cli.py @@ -1,11 +1,11 @@ import mmap import os +import sys import itertools from concurrent.futures import ThreadPoolExecutor from contextlib import suppress from io import BytesIO from pathlib import Path -from sys import stderr, stdin, stdout from time import perf_counter import pyperclip @@ -63,7 +63,7 @@ def run_decryption(infile, args, passwords, identities): f = None if prev and prev.name is not None: r = '' if prev.renamed else '' - progress.write(f'{prev.size:15,d} ๐Ÿ“„ {prev.name:60}{r}', file=stderr) + progress.write(f'{prev.size:15,d} ๐Ÿ“„ {prev.name:60}{r}', file=sys.stderr) if a.curfile: n = a.curfile.name or '' if not n and a.curfile.size is not None and a.curfile.size < TTY_MAX_SIZE: @@ -72,7 +72,7 @@ def run_decryption(infile, args, passwords, identities): if not outdir: outdir = Path(args.outfile).resolve() outdir.mkdir(parents=True, exist_ok=True) - progress.write(f" โ–ถ๏ธ \x1B[1;34m Extracting to \x1B[1;37m{outdir}\x1B[0m", file=stderr) + progress.write(f" โ–ถ๏ธ \x1B[1;34m Extracting to \x1B[1;37m{outdir}\x1B[0m", file=sys.stderr) name = outdir.joinpath(n) if not name.resolve().is_relative_to(outdir) or name.is_reserved(): progress.close() @@ -82,7 +82,7 @@ def run_decryption(infile, args, passwords, identities): elif outdir is None: outdir = False progress.write( - " โ–ถ๏ธ \x1B[1;34m The archive contains files. To extract, use \x1B[1;37m-o PATH\x1B[0m", file=stderr + " โ–ถ๏ธ \x1B[1;34m The archive contains files. To extract, use \x1B[1;37m-o PATH\x1B[0m", file=sys.stderr ) # Next file @@ -99,26 +99,26 @@ def run_decryption(infile, args, passwords, identities): if progress: progress.close() # Print any messages - pretty = stdout.isatty() + pretty = sys.stdout.isatty() for i, m in enumerate(messages): if pretty: - stderr.write("\x1B[1m ๐Ÿ’ฌ\n\x1B[1;34m") - stderr.flush() + sys.stderr.write("\x1B[1m ๐Ÿ’ฌ\n\x1B[1;34m") + sys.stderr.flush() # Replace dangerous characters m = ''.join(c if c.isprintable() or c in ' \t\n' else f'\x1B[31m{repr(c)[1:-1]}\x1B[1;34m' for c in m) try: print(m) finally: if pretty: - stderr.write(f"\x1B[0m") - stderr.flush() + sys.stderr.write(f"\x1B[0m") + sys.stderr.flush() # Print signatures - stderr.write(f' ๐Ÿ”ท File hash: {a.filehash[:12].hex()}\n') + sys.stderr.write(f' ๐Ÿ”ท File hash: {a.filehash[:12].hex()}\n') for valid, key, text in a.signatures: if valid: - stderr.write(f" โœ… {key} {text}\n") + sys.stderr.write(f" โœ… {key} {text}\n") else: - stderr.write(f"\x1B[1;31m โŒ {key} {text}\x1B[0m\n") + sys.stderr.write(f"\x1B[1;31m โŒ {key} {text}\x1B[0m\n") def main_enc(args): @@ -148,36 +148,36 @@ def main_enc(args): l = len(recipients) recipients = list(sorted(set(recipients), key=str)) if len(recipients) < l: - stderr.write(' โš ๏ธ Duplicate recipient keys dropped.\n') + sys.stderr.write(' โš ๏ธ Duplicate recipient keys dropped.\n') # Signatures signatures = {key for keystr in args.identities for key in pubkey.read_sk_any(keystr) if key.edsk} signatures = list(sorted(signatures, key=str)) # Input files if not args.files or True in args.files: - if stdin.isatty(): + if sys.stdin.isatty(): data = tty.editor() # Prune surrounding whitespace data = '\n'.join([l.rstrip() for l in data.split('\n')]).strip('\n') stin = util.encode(data) else: - stin = stdin.buffer + stin = sys.stdin.buffer args.files = [stin] + [f for f in args.files if f != True] # Collect the password hashing results - if passwords and stderr.isatty(): - stderr.write("Password hashing... ") - stderr.flush() + if passwords and sys.stderr.isatty(): + sys.stderr.write("Password hashing... ") + sys.stderr.flush() pwhashes = set(pwhasher) - if passwords and stderr.isatty(): - stderr.write("\r\x1B[K") - stderr.flush() + if passwords and sys.stderr.isatty(): + sys.stderr.write("\r\x1B[K") + sys.stderr.flush() del passwords a = Archive() a.file_index(args.files) if signatures: a.index['s'] = [s.edpk for s in signatures] # Output files - realoutf = open(args.outfile, "wb") if args.outfile else stdout.buffer - if args.armor or not args.outfile and stdout.isatty(): + realoutf = open(args.outfile, "wb") if args.outfile else sys.stdout.buffer + if args.armor or not args.outfile and sys.stdout.isatty(): if a.total_size > (ARMOR_MAX_SIZE if args.outfile else TTY_MAX_SIZE): if not args.outfile: raise ValueError("Too much data for console. How about -o FILE to write a file?") @@ -189,10 +189,10 @@ def main_enc(args): def nextfile_callback(prev, cur): if prev: s, n = prev.size, prev.name - progress.write(f'{s:15,d} ๐Ÿ“„ {n:60}' if n else f'{s:15,d} ๐Ÿ’ฌ ', file=stderr) + progress.write(f'{s:15,d} ๐Ÿ“„ {n:60}' if n else f'{s:15,d} ๐Ÿ’ฌ ', file=sys.stderr) if not cur: a.random_padding(padding) - progress.write(f'\x1B[1;30m{a.padding:15,d} โฌ› \x1B[0m', file=stderr) + progress.write(f'\x1B[1;30m{a.padding:15,d} โฌ› \x1B[0m', file=sys.stderr) a.nextfilecb = nextfile_callback # Main processing @@ -204,7 +204,7 @@ def nextfile_callback(prev, cur): progress.update(len(block)) outf.write(block) # Pretty output printout - if stderr.isatty(): + if sys.stderr.isatty(): # Print a list of files lock = " ๐Ÿ”“ wide-open" if args.wideopen else " ๐Ÿ”’ covert" methods = " ".join( @@ -220,8 +220,8 @@ def nextfile_callback(prev, cur): elif args.paste: lock += f" ๐Ÿ“‹ copied\n" out = f"\n\x1B[1m{lock}\x1B[0m\n" - stderr.write(out) - stderr.flush() + sys.stderr.write(out) + sys.stderr.flush() if outf is not realoutf: outf.seek(0) data = outf.read() @@ -233,15 +233,15 @@ def nextfile_callback(prev, cur): with realoutf: pretty = realoutf.isatty() if pretty: - stderr.write("\x1B[1;30m```\x1B[0;34m\n") - stderr.flush() + sys.stderr.write("\x1B[1;30m```\x1B[0;34m\n") + sys.stderr.flush() try: realoutf.write(f"{data}\n".encode()) realoutf.flush() finally: if pretty: - stderr.write("\x1B[1;30m```\x1B[0m\n") - stderr.flush() + sys.stderr.write("\x1B[1;30m```\x1B[0m\n") + sys.stderr.flush() def main_dec(args): @@ -252,7 +252,7 @@ def main_dec(args): args.askpass = 1 identities = {key for keystr in args.identities for key in pubkey.read_sk_any(keystr)} identities = list(sorted(identities, key=str)) - infile = open(args.files[0], "rb") if args.files else stdin.buffer + infile = open(args.files[0], "rb") if args.files else sys.stdin.buffer # If ASCII armored or TTY, read all input immediately (assumed to be short enough) total_size = os.path.getsize(args.files[0]) if args.files else 0 if infile.isatty(): @@ -278,17 +278,17 @@ def main_dec(args): def pwhashgen(): it = itertools.chain(pwhasher, (passphrase.pwhash(passphrase.ask('Passphrase')[0]) for i in range(args.askpass))) while True: - if stderr.isatty(): - stderr.write("Password hashing... ") - stderr.flush() + if sys.stderr.isatty(): + sys.stderr.write("Password hashing... ") + sys.stderr.flush() try: pwhash = next(it) except StopIteration: break finally: - if stderr.isatty(): - stderr.write("\r\x1B[K") - stderr.flush() + if sys.stderr.isatty(): + sys.stderr.write("\r\x1B[K") + sys.stderr.flush() yield pwhash run_decryption(infile, args, pwhashgen(), identities) From 61b436fa8740387df4541db3ec08b8376091d101 Mon Sep 17 00:00:00 2001 From: foonoxous Date: Mon, 29 Nov 2021 23:23:46 +0000 Subject: [PATCH 5/7] Improve ANSI compatibility with colorama. --- covert/cli.py | 4 ++-- covert/passphrase.py | 4 ++-- covert/tty.py | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/covert/cli.py b/covert/cli.py index ea97a1c..43632ba 100644 --- a/covert/cli.py +++ b/covert/cli.py @@ -168,7 +168,7 @@ def main_enc(args): sys.stderr.flush() pwhashes = set(pwhasher) if passwords and sys.stderr.isatty(): - sys.stderr.write("\r\x1B[K") + sys.stderr.write("\r\x1B[0K") sys.stderr.flush() del passwords a = Archive() @@ -287,7 +287,7 @@ def pwhashgen(): break finally: if sys.stderr.isatty(): - sys.stderr.write("\r\x1B[K") + sys.stderr.write("\r\x1B[0K") sys.stderr.flush() yield pwhash run_decryption(infile, args, pwhashgen(), identities) diff --git a/covert/passphrase.py b/covert/passphrase.py index 7547726..e1c78a4 100644 --- a/covert/passphrase.py +++ b/covert/passphrase.py @@ -116,8 +116,8 @@ def ask(prompt, create=False): else: help += "\n \x1B[1;34mtab \x1B[0;34msuggest a strong password\n\x1B[1;34menter \x1B[0;34mgenerate and use a strong password" help += "\n \x1B[1;34mdown \x1B[0;34mhide input" if visible else "\n \x1B[1;34mup \x1B[0;34mshow input" - out += f'\n{help}\n\x1B[0m\x1B[K\x1B8' - out = out.replace('\n', '\x1B[K\n') + out += f'\n{help}\n\x1B[0m\x1B[0K\x1B8' + out = out.replace('\n', '\x1B[0K\n') term.write(out) for ch in term.reader(): if len(ch) == 1: # Text input diff --git a/covert/tty.py b/covert/tty.py index 060c40f..82e87fb 100644 --- a/covert/tty.py +++ b/covert/tty.py @@ -8,7 +8,7 @@ def editor(): with fullscreen() as term: - term.write(f'\x1B[1;1H\x1B[1;44m ใ€ฐ ENTER MESSAGE ใ€ฐ (ESC to finish)\x1B[K\x1B[0m\n') + term.write(f'\x1B[1;1H\x1B[1;44m ใ€ฐ ENTER MESSAGE ใ€ฐ (ESC to finish)\x1B[0K\x1B[0m\n') data = "" startrow = row = col = 0 while True: @@ -57,7 +57,7 @@ def editor(): win = get_terminal_size() startrow = min(max(0, row - 1), startrow) startrow = max(row - win.lines + 2, startrow) - draw = "\x1B[K\n".join(l[:win.columns - 1] for l in data.split("\n")[startrow:startrow + win.lines - 1]) + draw = "\x1B[0K\n".join(l[:win.columns - 1] for l in data.split("\n")[startrow:startrow + win.lines - 1]) term.write(f"\x1B[2;1H{draw}\x1B[J\x1B[{row - startrow + 2};{col+1}H") @@ -84,7 +84,7 @@ def read_hidden(prompt): term.write(f"\x1B7 ({len(data)}) \x1B8") finally: # Return to start of line and clear the prompt - term.write(f"\x1B[0m\r\x1B[K") + term.write(f"\x1B[0m\r\x1B[0K") @contextmanager From 674de8bccaf350cd8f15d72eb176d30166f8fa75 Mon Sep 17 00:00:00 2001 From: foonoxous Date: Mon, 29 Nov 2021 23:35:23 +0000 Subject: [PATCH 6/7] Improve ANSI compatibility with colorama. --- covert/passphrase.py | 7 +++++-- covert/tty.py | 5 +++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/covert/passphrase.py b/covert/passphrase.py index e1c78a4..45a4f52 100644 --- a/covert/passphrase.py +++ b/covert/passphrase.py @@ -103,9 +103,11 @@ def ask(prompt, create=False): pwtitle, pwrest = pwhint.split('\n', 1) else: pwtitle, pwrest, valid = 'Covert decryption', '\n', True - out = f"\x1B[1;1H\x1B[1;37;44m{pwtitle:56}\x1B[0m\n{pwrest}{prompt}: " + out = f"\x1B[1;1H\x1B[1;37;44m{pwtitle:56}\x1B[0m\n{pwrest}" pwdisp = pwd if visible else len(pwd) * 'ยท' - out += f"{pwdisp[:pos]}\x1B7{pwdisp[pos:]}" + beforecursor = f"{prompt}: {pwdisp[:pos]}" + aftercursor = pwdisp[pos:] + out += f"{beforecursor}{aftercursor}" help = '' if pwd or not create: help += "\n \x1B[1;34mtab \x1B[0;34m" @@ -118,6 +120,7 @@ def ask(prompt, create=False): help += "\n \x1B[1;34mdown \x1B[0;34mhide input" if visible else "\n \x1B[1;34mup \x1B[0;34mshow input" out += f'\n{help}\n\x1B[0m\x1B[0K\x1B8' out = out.replace('\n', '\x1B[0K\n') + out += f"\x1B[5;{1 + len(beforecursor)}H" term.write(out) for ch in term.reader(): if len(ch) == 1: # Text input diff --git a/covert/tty.py b/covert/tty.py index 82e87fb..1d3e32d 100644 --- a/covert/tty.py +++ b/covert/tty.py @@ -58,7 +58,7 @@ def editor(): startrow = min(max(0, row - 1), startrow) startrow = max(row - win.lines + 2, startrow) draw = "\x1B[0K\n".join(l[:win.columns - 1] for l in data.split("\n")[startrow:startrow + win.lines - 1]) - term.write(f"\x1B[2;1H{draw}\x1B[J\x1B[{row - startrow + 2};{col+1}H") + term.write(f"\x1B[2;1H{draw}\x1B[0J\x1B[{row - startrow + 2};{col+1}H") def read_hidden(prompt): @@ -81,7 +81,8 @@ def read_hidden(prompt): elif len(key) == 1: data += key t = time.monotonic() - term.write(f"\x1B7 ({len(data)}) \x1B8") + status = f" ({len(data)}) " + term.write(f"{status}\x1B[{len(status)}D") finally: # Return to start of line and clear the prompt term.write(f"\x1B[0m\r\x1B[0K") From 17edf7da147d206b00b64051012fc329e097d869 Mon Sep 17 00:00:00 2001 From: foonoxous Date: Mon, 29 Nov 2021 23:38:13 +0000 Subject: [PATCH 7/7] Improve ANSI compatibility with colorama. --- covert/passphrase.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/covert/passphrase.py b/covert/passphrase.py index 45a4f52..6aac9ab 100644 --- a/covert/passphrase.py +++ b/covert/passphrase.py @@ -118,9 +118,9 @@ def ask(prompt, create=False): else: help += "\n \x1B[1;34mtab \x1B[0;34msuggest a strong password\n\x1B[1;34menter \x1B[0;34mgenerate and use a strong password" help += "\n \x1B[1;34mdown \x1B[0;34mhide input" if visible else "\n \x1B[1;34mup \x1B[0;34mshow input" - out += f'\n{help}\n\x1B[0m\x1B[0K\x1B8' + out += f'\n{help}\n' out = out.replace('\n', '\x1B[0K\n') - out += f"\x1B[5;{1 + len(beforecursor)}H" + out += f"\x1B[0m\x1B[0K\x1B[5;{1 + len(beforecursor)}H" term.write(out) for ch in term.reader(): if len(ch) == 1: # Text input