From d8386647693ae181476ef75e08528477ee792a0b Mon Sep 17 00:00:00 2001 From: Stepan Snigirev Date: Wed, 16 Nov 2022 23:05:57 +0100 Subject: [PATCH] change QR scanner to command mode, allow scanning seeds --- src/gui/screens/mnemonic.py | 4 +- src/gui/screens/prompt.py | 6 +- src/hosts/qr.py | 121 +++++++++++++++++++++--------------- src/specter.py | 39 +++++++++++- 4 files changed, 114 insertions(+), 56 deletions(-) diff --git a/src/gui/screens/mnemonic.py b/src/gui/screens/mnemonic.py index 474d25d8..d47eb7e6 100644 --- a/src/gui/screens/mnemonic.py +++ b/src/gui/screens/mnemonic.py @@ -26,10 +26,10 @@ def __init__(self, mnemonic="", title="Your recovery phrase:", note=None): class MnemonicPrompt(Prompt): def __init__(self, mnemonic="", title="Your recovery phrase:", note=None): - super().__init__(title, message="") + super().__init__(title, message="", note=note) table = MnemonicTable(self) table.set_mnemonic(mnemonic) - table.align(self.title, lv.ALIGN.OUT_BOTTOM_MID, 0, 30) + table.align(self.title, lv.ALIGN.OUT_BOTTOM_MID, 0, 50) class ExportMnemonicScreen(MnemonicScreen): diff --git a/src/gui/screens/prompt.py b/src/gui/screens/prompt.py index a5d81c77..e5081c77 100644 --- a/src/gui/screens/prompt.py +++ b/src/gui/screens/prompt.py @@ -6,9 +6,13 @@ class Prompt(Screen): def __init__(self, title="Are you sure?", message="Make a choice", - confirm_text="Confirm", cancel_text="Cancel"): + confirm_text="Confirm", cancel_text="Cancel", note=None): super().__init__() self.title = add_label(title, scr=self, style="title") + if note is not None: + self.note = add_label(note, scr=self, style="hint") + self.note.align(self.title, lv.ALIGN.OUT_BOTTOM_MID, 0, 5) + obj = self.note self.page = lv.page(self) self.page.set_size(480, 600) self.message = add_label(message, scr=self.page) diff --git a/src/hosts/qr.py b/src/hosts/qr.py index f24a7b70..e9b02089 100644 --- a/src/hosts/qr.py +++ b/src/hosts/qr.py @@ -87,7 +87,7 @@ def __init__(self, path, trigger=None, uart="YA", baudrate=9600): self.scanning = False self.parts = None self.raw = False - self.chunk_timeout = 0.1 + self.chunk_timeout = 0.5 @property def MASK(self): @@ -254,10 +254,25 @@ def clean_uart(self): self.uart.read() def _stop_scanner(self): + if self.trigger is not None: + self.trigger.on() # trigger is reversed, so on means disable + else: + self.set_setting(SCAN_ADDR, 0) + + def _start_scanner(self): + self.clean_uart() + if self.trigger is not None: + self.trigger.off() + else: + self.set_setting(SCAN_ADDR, 1) + + async def _restart_scanner(self): if self.trigger is not None: self.trigger.on() + await asyncio.sleep_ms(30) + self.trigger.off() else: - self.set_setting(SETTINGS_ADDR, self.CMD_MODE) + self.set_setting(SCAN_ADDR, 1) def stop_scanning(self): self.scanning = False @@ -273,14 +288,10 @@ def abort(self): def tmpfile(self): return self.path+"/tmp" - async def scan(self, raw=False, chunk_timeout=0.1): - self.clean_uart() + async def scan(self, raw=True, chunk_timeout=0.5): self.raw = raw self.chunk_timeout = chunk_timeout - if self.trigger is not None: - self.trigger.off() - else: - self.set_setting(SETTINGS_ADDR, self.CONT_MODE) + self._start_scanner() # clear the data with open(self.tmpfile,"wb") as f: pass @@ -312,56 +323,63 @@ async def scan(self, raw=False, chunk_timeout=0.1): self.f = open(self.path+"/data.txt", "rb") return self.f + def check_animated(self, data: bytes): + try: + # should be only ascii characters + d = data.decode().strip().lower() + if d.startswith("ur:"): # ur:bytes or ur:crypto-psbt + return True + # this will raise if it's not a valid prefix + self.parse_prefix(data.split(b" ")[0]) + return True + except Exception as e: + print("Exception at check animated", e) + return False + return False + async def update(self): if not self.scanning: self.clean_uart() return # read all available data if self.uart.any() > 0: - if self.raw: # read only one QR code + if not self.animated: # read only one QR code + # let all data to come on the first QR code await asyncio.sleep(self.chunk_timeout) d = self.uart.read() - if d[-len(self.EOL):] == self.EOL: - d = d[:-len(self.EOL)] - self._stop_scanner() - fname = self.path + "/data.txt" - with open(fname, "wb") as fout: - fout.write(d) - self.stop_scanning() - return + # if not animated -> stop and return + if not self.check_animated(d): + if d[-len(self.EOL):] == self.EOL: + d = d[:-len(self.EOL)] + self._stop_scanner() + fname = self.path + "/data.txt" + with open(fname, "wb") as fout: + fout.write(d) + self.stop_scanning() + return else: + # if animated - we process chunks one at a time d = self.uart.read() - num_lines = d.count(self.EOL) - # no new lines - just write and continue - if num_lines == 0: - with open(self.tmpfile,"ab") as f: - f.write(d) - return - # slice to write - start = 0 - end = len(d) - while num_lines >= 1: # last one is incomplete - end = d.index(self.EOL, start) - with open(self.tmpfile,"ab") as f: - f.write(d[start:end]) - try: - if self.process_chunk(): - self.stop_scanning() - break - # animated in trigger mode - elif self.trigger is not None: - self.trigger.on() - await asyncio.sleep_ms(30) - self.trigger.off() - except Exception as e: - print("QR exception", e) - self.stop_scanning() - raise e - num_lines -= 1 - start = end + len(self.EOL) - # erase the content of the file - with open(self.tmpfile, "wb") as f: - pass + # no new lines - just write and continue + if d[-len(self.EOL):] != self.EOL: + with open(self.tmpfile,"ab") as f: + f.write(d) + return + # restart scan while processing data + await self._restart_scanner() + # slice to write + d = d[:-len(self.EOL)] + with open(self.tmpfile,"ab") as f: + f.write(d) + try: + if self.process_chunk(): + self.stop_scanning() + except Exception as e: + self.stop_scanning() + raise e + # erase the content of the file + with open(self.tmpfile, "wb") as f: + pass def process_chunk(self): """Returns true when scanning complete""" @@ -532,9 +550,10 @@ def process_normal(self, f): else: return False - def parse_prefix(self, prefix): + def parse_prefix(self, prefix: bytes): + print(prefix) if not prefix.startswith(b"p") or b"of" not in prefix: - raise HostError("Invalid prefix") + raise HostError("Invalid prefix, should be in pMofN format") m, n = prefix[1:].split(b"of") m = int(m) n = int(n) @@ -542,7 +561,7 @@ def parse_prefix(self, prefix): raise HostError("Invalid prefix") return m, n - async def get_data(self, raw=False, chunk_timeout=0.1): + async def get_data(self, raw=True, chunk_timeout=0.5): delete_recursively(self.path) if self.manager is not None: # pass self so user can abort diff --git a/src/specter.py b/src/specter.py index 90b086b5..b3a43f14 100644 --- a/src/specter.py +++ b/src/specter.py @@ -235,7 +235,7 @@ async def import_mnemonic(self): last=(255, None)) if host == 255: return - stream = await host.get_data(raw=True, chunk_timeout=0.5) + stream = await host.get_data() if not stream: return data = stream.read() @@ -563,6 +563,35 @@ async def unlock(self): self.GLOBAL = settings BaseApp.GLOBAL = settings + async def maybe_import_mnemonic(self, stream, popup=False, show_fn=None): + if show_fn is None: + show_fn = self.gui.show_screen(popup) + data = stream.read(240) # one word is at most 8 chars, so total len is < 240 even if it has prefix of some kind (for future) + mnemonic_type = "" + # digital mnemonic + d = data.strip() + if len(d) >= 4*12 and len(d) <= 4*24 and len(d) % 12 == 0 and (b" " not in d): + mnemonic = " ".join([bip39.WORDLIST[int(d[4*i:4*i+4])] for i in range(len(d)//4)]) + mnemonic_type = "digital" + # binary mnemonic + elif len(data) >= 16 and len(data) <= 32: + mnemonic = bip39.mnemonic_from_bytes(data) + mnemonic_type = "binary" + # text mnemonic + else: + mnemonic = data.decode() + # split on \n and \r to avoid double-scan + mnemonic = mnemonic.split("\r")[0].split("\n")[0] + if not bip39.mnemonic_is_valid(mnemonic): + raise SpecterError("Invalid data: %r" % mnemonic) + mnemonic_type = "text" + scr = MnemonicPrompt(title="Imported mnemonic:", mnemonic=mnemonic, note="Data looks like a %s mnemonic.\nDo you want to use it?" % mnemonic_type) + # confirm mnemonic + if not await show_fn(scr): + return + self.keystore.set_mnemonic(mnemonic, "") + self.init_apps() + async def process_host_request(self, stream, popup=True, appname=None, show_fn=None): """ This method is called whenever we got data from the host. @@ -585,7 +614,13 @@ async def process_host_request(self, stream, popup=True, appname=None, show_fn=N if app.can_process(stream): matching_apps.append(app) if len(matching_apps) == 0: - raise HostError("Can't find matching app for this request:\n\n %r" % stream.read(100)) + stream.seek(0) + try: + await self.maybe_import_mnemonic(stream, popup, show_fn) + return + except Exception as e: + print(e) + raise HostError("Can't find matching app for this request:\n\n %r" % stream.read(100)) # TODO: if more than one - ask which one to use if len(matching_apps) > 1: raise HostError(