Skip to content

Commit

Permalink
change QR scanner to command mode, allow scanning seeds
Browse files Browse the repository at this point in the history
  • Loading branch information
stepansnigirev committed Nov 16, 2022
1 parent 4f6ed13 commit d838664
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 56 deletions.
4 changes: 2 additions & 2 deletions src/gui/screens/mnemonic.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ def __init__(self, mnemonic="", title="Your recovery phrase:", note=None):

class MnemonicPrompt(Prompt):
def __init__(self, mnemonic="", title="Your recovery phrase:", note=None):
super().__init__(title, message="")
super().__init__(title, message="", note=note)
table = MnemonicTable(self)
table.set_mnemonic(mnemonic)
table.align(self.title, lv.ALIGN.OUT_BOTTOM_MID, 0, 30)
table.align(self.title, lv.ALIGN.OUT_BOTTOM_MID, 0, 50)


class ExportMnemonicScreen(MnemonicScreen):
Expand Down
6 changes: 5 additions & 1 deletion src/gui/screens/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

class Prompt(Screen):
def __init__(self, title="Are you sure?", message="Make a choice",
confirm_text="Confirm", cancel_text="Cancel"):
confirm_text="Confirm", cancel_text="Cancel", note=None):
super().__init__()
self.title = add_label(title, scr=self, style="title")
if note is not None:
self.note = add_label(note, scr=self, style="hint")
self.note.align(self.title, lv.ALIGN.OUT_BOTTOM_MID, 0, 5)
obj = self.note
self.page = lv.page(self)
self.page.set_size(480, 600)
self.message = add_label(message, scr=self.page)
Expand Down
121 changes: 70 additions & 51 deletions src/hosts/qr.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(self, path, trigger=None, uart="YA", baudrate=9600):
self.scanning = False
self.parts = None
self.raw = False
self.chunk_timeout = 0.1
self.chunk_timeout = 0.5

@property
def MASK(self):
Expand Down Expand Up @@ -254,10 +254,25 @@ def clean_uart(self):
self.uart.read()

def _stop_scanner(self):
if self.trigger is not None:
self.trigger.on() # trigger is reversed, so on means disable
else:
self.set_setting(SCAN_ADDR, 0)

def _start_scanner(self):
self.clean_uart()
if self.trigger is not None:
self.trigger.off()
else:
self.set_setting(SCAN_ADDR, 1)

async def _restart_scanner(self):
if self.trigger is not None:
self.trigger.on()
await asyncio.sleep_ms(30)
self.trigger.off()
else:
self.set_setting(SETTINGS_ADDR, self.CMD_MODE)
self.set_setting(SCAN_ADDR, 1)

def stop_scanning(self):
self.scanning = False
Expand All @@ -273,14 +288,10 @@ def abort(self):
def tmpfile(self):
return self.path+"/tmp"

async def scan(self, raw=False, chunk_timeout=0.1):
self.clean_uart()
async def scan(self, raw=True, chunk_timeout=0.5):
self.raw = raw
self.chunk_timeout = chunk_timeout
if self.trigger is not None:
self.trigger.off()
else:
self.set_setting(SETTINGS_ADDR, self.CONT_MODE)
self._start_scanner()
# clear the data
with open(self.tmpfile,"wb") as f:
pass
Expand Down Expand Up @@ -312,56 +323,63 @@ async def scan(self, raw=False, chunk_timeout=0.1):
self.f = open(self.path+"/data.txt", "rb")
return self.f

def check_animated(self, data: bytes):
try:
# should be only ascii characters
d = data.decode().strip().lower()
if d.startswith("ur:"): # ur:bytes or ur:crypto-psbt
return True
# this will raise if it's not a valid prefix
self.parse_prefix(data.split(b" ")[0])
return True
except Exception as e:
print("Exception at check animated", e)
return False
return False

async def update(self):
if not self.scanning:
self.clean_uart()
return
# read all available data
if self.uart.any() > 0:
if self.raw: # read only one QR code
if not self.animated: # read only one QR code
# let all data to come on the first QR code
await asyncio.sleep(self.chunk_timeout)
d = self.uart.read()
if d[-len(self.EOL):] == self.EOL:
d = d[:-len(self.EOL)]
self._stop_scanner()
fname = self.path + "/data.txt"
with open(fname, "wb") as fout:
fout.write(d)
self.stop_scanning()
return
# if not animated -> stop and return
if not self.check_animated(d):
if d[-len(self.EOL):] == self.EOL:
d = d[:-len(self.EOL)]
self._stop_scanner()
fname = self.path + "/data.txt"
with open(fname, "wb") as fout:
fout.write(d)
self.stop_scanning()
return
else:
# if animated - we process chunks one at a time
d = self.uart.read()
num_lines = d.count(self.EOL)
# no new lines - just write and continue
if num_lines == 0:
with open(self.tmpfile,"ab") as f:
f.write(d)
return
# slice to write
start = 0
end = len(d)
while num_lines >= 1: # last one is incomplete
end = d.index(self.EOL, start)
with open(self.tmpfile,"ab") as f:
f.write(d[start:end])
try:
if self.process_chunk():
self.stop_scanning()
break
# animated in trigger mode
elif self.trigger is not None:
self.trigger.on()
await asyncio.sleep_ms(30)
self.trigger.off()
except Exception as e:
print("QR exception", e)
self.stop_scanning()
raise e
num_lines -= 1
start = end + len(self.EOL)
# erase the content of the file
with open(self.tmpfile, "wb") as f:
pass
# no new lines - just write and continue
if d[-len(self.EOL):] != self.EOL:
with open(self.tmpfile,"ab") as f:
f.write(d)
return
# restart scan while processing data
await self._restart_scanner()
# slice to write
d = d[:-len(self.EOL)]
with open(self.tmpfile,"ab") as f:
f.write(d)
try:
if self.process_chunk():
self.stop_scanning()
except Exception as e:
self.stop_scanning()
raise e
# erase the content of the file
with open(self.tmpfile, "wb") as f:
pass

def process_chunk(self):
"""Returns true when scanning complete"""
Expand Down Expand Up @@ -532,17 +550,18 @@ def process_normal(self, f):
else:
return False

def parse_prefix(self, prefix):
def parse_prefix(self, prefix: bytes):
print(prefix)
if not prefix.startswith(b"p") or b"of" not in prefix:
raise HostError("Invalid prefix")
raise HostError("Invalid prefix, should be in pMofN format")
m, n = prefix[1:].split(b"of")
m = int(m)
n = int(n)
if n < m or m < 0 or n < 0:
raise HostError("Invalid prefix")
return m, n

async def get_data(self, raw=False, chunk_timeout=0.1):
async def get_data(self, raw=True, chunk_timeout=0.5):
delete_recursively(self.path)
if self.manager is not None:
# pass self so user can abort
Expand Down
39 changes: 37 additions & 2 deletions src/specter.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ async def import_mnemonic(self):
last=(255, None))
if host == 255:
return
stream = await host.get_data(raw=True, chunk_timeout=0.5)
stream = await host.get_data()
if not stream:
return
data = stream.read()
Expand Down Expand Up @@ -563,6 +563,35 @@ async def unlock(self):
self.GLOBAL = settings
BaseApp.GLOBAL = settings

async def maybe_import_mnemonic(self, stream, popup=False, show_fn=None):
if show_fn is None:
show_fn = self.gui.show_screen(popup)
data = stream.read(240) # one word is at most 8 chars, so total len is < 240 even if it has prefix of some kind (for future)
mnemonic_type = ""
# digital mnemonic
d = data.strip()
if len(d) >= 4*12 and len(d) <= 4*24 and len(d) % 12 == 0 and (b" " not in d):
mnemonic = " ".join([bip39.WORDLIST[int(d[4*i:4*i+4])] for i in range(len(d)//4)])
mnemonic_type = "digital"
# binary mnemonic
elif len(data) >= 16 and len(data) <= 32:
mnemonic = bip39.mnemonic_from_bytes(data)
mnemonic_type = "binary"
# text mnemonic
else:
mnemonic = data.decode()
# split on \n and \r to avoid double-scan
mnemonic = mnemonic.split("\r")[0].split("\n")[0]
if not bip39.mnemonic_is_valid(mnemonic):
raise SpecterError("Invalid data: %r" % mnemonic)
mnemonic_type = "text"
scr = MnemonicPrompt(title="Imported mnemonic:", mnemonic=mnemonic, note="Data looks like a %s mnemonic.\nDo you want to use it?" % mnemonic_type)
# confirm mnemonic
if not await show_fn(scr):
return
self.keystore.set_mnemonic(mnemonic, "")
self.init_apps()

async def process_host_request(self, stream, popup=True, appname=None, show_fn=None):
"""
This method is called whenever we got data from the host.
Expand All @@ -585,7 +614,13 @@ async def process_host_request(self, stream, popup=True, appname=None, show_fn=N
if app.can_process(stream):
matching_apps.append(app)
if len(matching_apps) == 0:
raise HostError("Can't find matching app for this request:\n\n %r" % stream.read(100))
stream.seek(0)
try:
await self.maybe_import_mnemonic(stream, popup, show_fn)
return
except Exception as e:
print(e)
raise HostError("Can't find matching app for this request:\n\n %r" % stream.read(100))
# TODO: if more than one - ask which one to use
if len(matching_apps) > 1:
raise HostError(
Expand Down

0 comments on commit d838664

Please sign in to comment.