diff --git a/src/cryptoadvance/specter/controller.py b/src/cryptoadvance/specter/controller.py index 2e84437948..77b30401f4 100644 --- a/src/cryptoadvance/specter/controller.py +++ b/src/cryptoadvance/specter/controller.py @@ -3,7 +3,7 @@ import random, copy from collections import OrderedDict from threading import Thread - +from .key import Key from functools import wraps from flask import g, request, redirect, url_for @@ -13,10 +13,10 @@ from flask_login.config import EXEMPT_METHODS -from .helpers import normalize_xpubs, run_shell, set_loglevel, get_loglevel, get_version_info -from .descriptor import AddChecksum - -from .logic import Specter, purposes, addrtypes, get_cli, SpecterError +from .helpers import get_devices_with_keys_by_type, run_shell, set_loglevel, get_loglevel, get_version_info +from .specter import Specter +from .specter_error import SpecterError +from .wallet_manager import purposes from .rpc import RpcError from datetime import datetime import urllib @@ -40,7 +40,7 @@ def inject_debug(): @login_required def combine(wallet_alias): try: - wallet = app.specter.wallets.get_by_alias(wallet_alias) + wallet = app.specter.wallet_manager.get_by_alias(wallet_alias) except SpecterError as se: app.logger.error("SpecterError while combine: %s" % se) return render_template("base.jinja", error=se, specter=app.specter, rand=rand) @@ -65,18 +65,19 @@ def combine(wallet_alias): @login_required def broadcast(wallet_alias): try: - wallet = app.specter.wallets.get_by_alias(wallet_alias) + wallet = app.specter.wallet_manager.get_by_alias(wallet_alias) except SpecterError as se: app.logger.error("SpecterError while broadcast: %s" % se) return render_template("base.jinja", error=se, specter=app.specter, rand=rand) if request.method == 'POST': tx = request.form.get('tx') - if wallet.cli.testmempoolaccept([tx])[0]['allowed']: + res = wallet.cli.testmempoolaccept([tx])[0] + if res['allowed']: app.specter.broadcast(tx) wallet.delete_pending_psbt(wallet.cli.decoderawtransaction(tx)['txid']) return jsonify(success=True) else: - return jsonify(success=False, error="Failed to broadcast transaction: transaction is invalid") + return jsonify(success=False, error="Failed to broadcast transaction: transaction is invalid\n%s" % res["reject-reason"]) return jsonify(success=False, error="broadcast tx request must use POST") @app.route('/') @@ -84,11 +85,11 @@ def broadcast(wallet_alias): def index(): notify_upgrade() app.specter.check() - if len(app.specter.wallets) > 0: - return redirect("/wallets/%s" % app.specter.wallets[app.specter.wallets.names()[0]]["alias"]) + if len(app.specter.wallet_manager.wallets) > 0: + return redirect("/wallets/%s" % app.specter.wallet_manager.wallets[app.specter.wallet_manager.wallets_names[0]].alias) # TODO: add onboarding process - if not app.specter.devices: + if len(app.specter.device_manager.devices) == 0: # For now: can't do anything until a device is registered return redirect("/new_device/") @@ -211,7 +212,7 @@ def settings(): @app.route('/new_wallet/') @login_required -def new_wallet(): +def new_wallet_type(): app.specter.check() err = None if app.specter.chain is None: @@ -219,54 +220,116 @@ def new_wallet(): return render_template("base.jinja", error=err, specter=app.specter, rand=rand) return render_template("wallet/new_wallet/new_wallet_type.jinja", specter=app.specter, rand=rand) -@app.route('/new_wallet/simple/', methods=['GET', 'POST']) + +@app.route('/new_wallet//', methods=['GET', 'POST']) @login_required -def new_wallet_simple(): +def new_wallet(wallet_type): + wallet_types = ['simple', 'multisig'] + if wallet_type not in wallet_types: + err = "Unknown wallet type requested" + return render_template("base.jinja", specter=app.specter, rand=rand) app.specter.check() - name = "Simple" + name = wallet_type.title() wallet_name = name i = 2 err = None - while wallet_name in app.specter.wallets.names(): + while wallet_name in app.specter.wallet_manager.wallets_names: wallet_name = "%s %d" % (name, i) - i+=1 - device = None + i += 1 + + if wallet_type == "multisig": + sigs_total = len(app.specter.device_manager.devices) + if sigs_total < 2: + err = "You need more devices to do multisig" + return render_template("base.jinja", specter=app.specter, rand=rand) + sigs_required = sigs_total*2//3 + if sigs_required < 2: + sigs_required = 2 + else: + sigs_total = 1 + sigs_required = 1 + if request.method == 'POST': action = request.form['action'] wallet_name = request.form['wallet_name'] - if wallet_name in app.specter.wallets.names(): + if wallet_name in app.specter.wallet_manager.wallets_names: err = "Wallet already exists" - if "device" not in request.form: - err = "Select the device" - else: - device_name = request.form['device'] - wallet_type = request.form['type'] + address_type = request.form['type'] + pur = { + '': "General", + "wpkh": "Segwit (bech32)", + "sh-wpkh": "Nested Segwit", + "pkh": "Legacy", + "wsh": "Segwit (bech32)", + "sh-wsh": "Nested Segwit", + "sh": "Legacy", + } + sigs_total = int(request.form.get('sigs_total', 1)) + sigs_required = int(request.form.get('sigs_required', 1)) if action == 'device' and err is None: - dev = copy.deepcopy(app.specter.devices[device_name]) - prefix = "tpub" - if app.specter.chain == "main": - prefix = "xpub" - allowed_types = [None, wallet_type] - dev["keys"] = [k for k in dev["keys"] if k["xpub"].startswith(prefix) and k["type"] in allowed_types] - pur = { - None: "General", - "wpkh": "Segwit (bech32)", - "sh-wpkh": "Nested Segwit", - "pkh": "Legacy", - } - return render_template("wallet/new_wallet/new_wallet_keys.jinja", purposes=pur, wallet_type=wallet_type, wallet_name=wallet_name, device=dev, error=err, specter=app.specter, rand=rand) + cosigners = [app.specter.device_manager.get_by_alias(alias) for alias in request.form.getlist('devices')] + if len(cosigners) != sigs_total: + err = "Select the device" if sigs_total == 1 else "Select all the cosigners" + return render_template( + "wallet/new_wallet/new_wallet.jinja", + wallet_type=wallet_type, + wallet_name=wallet_name, + sigs_required=sigs_required, + sigs_total=sigs_total, + error=err, + specter=app.specter, + rand=rand + ) + devices = get_devices_with_keys_by_type(app, cosigners, address_type) + for device in devices: + if len(device.keys) == 0: + err = "Device %s doesn't have keys matching this wallet type" % device.name + break + return render_template( + "wallet/new_wallet/new_wallet_keys.jinja", + purposes=pur, + wallet_type=address_type, + wallet_name=wallet_name, + cosigners=devices, + sigs_required=sigs_required, + sigs_total=sigs_total, + error=err, + specter=app.specter, + rand=rand + ) if action == 'key' and err is None: - original_xpub = request.form['key'] - device = app.specter.devices[device_name] - key = None - for k in device["keys"]: - if k["original"] == original_xpub: - key = k - break - if key is None: - return render_template("base.jinja", error="Key not found", specter=app.specter, rand=rand) + keys = [] + cosigners = [] + devices = [] + for i in range(sigs_total): + try: + key = request.form['key%d' % i] + cosigner_name = request.form['cosigner%d' % i] + cosigner = app.specter.device_manager.get_by_alias(cosigner_name) + cosigners.append(cosigner) + for k in cosigner.keys: + if k.original == key: + keys.append(k) + break + except: + pass + if len(keys) != sigs_total or len(cosigners) != sigs_total: + devices = get_devices_with_keys_by_type(app, cosigners, address_type) + err = "Did you select enough keys?" + return render_template( + "wallet/new_wallet/new_wallet_keys.jinja", + purposes=pur, + wallet_type=address_type, + wallet_name=wallet_name, + cosigners=devices, + sigs_required=sigs_required, + sigs_total=sigs_total, + error=err, + specter=app.specter, + rand=rand + ) # create a wallet here - wallet = app.specter.wallets.create_simple(wallet_name, wallet_type, key, device) + wallet = app.specter.wallet_manager.create_wallet(wallet_name, sigs_required, address_type, keys, cosigners) app.logger.info("Created Wallet %s" % wallet_name) rescan_blockchain = 'rescanblockchain' in request.form if rescan_blockchain: @@ -284,112 +347,31 @@ def new_wallet_simple(): try: wallet.cli.rescanblockchain(startblock, timeout=1) except requests.exceptions.ReadTimeout: - # this is normal behaviour in our usecase + # this is normal behavior in our usecase pass except Exception as e: app.logger.error("Exception while rescanning blockchain: %e" % e) err = "%r" % e wallet.getdata() - return redirect("/wallets/%s/" % wallet["alias"]) - return render_template("wallet/new_wallet/new_wallet.jinja", wallet_name=wallet_name, device=device, error=err, specter=app.specter, rand=rand) - -@app.route('/new_wallet/multisig/', methods=['GET', 'POST']) -@login_required -def new_wallet_multi(): - app.specter.check() - name = "Multisig" - wallet_type = "wsh" - wallet_name = name - i = 2 - err = None - while wallet_name in app.specter.wallets.names(): - wallet_name = "%s %d" % (name, i) - i+=1 - - sigs_total = len(app.specter.devices) - if sigs_total < 2: - err = "You need more devices to do multisig" - return render_template("base.jinja", specter=app.specter, rand=rand) - sigs_required = sigs_total*2//3 - if sigs_required < 2: - sigs_required = 2 - cosigners = [] - keys = [] - - if request.method == 'POST': - action = request.form['action'] - wallet_name = request.form['wallet_name'] - sigs_required = int(request.form['sigs_required']) - sigs_total = int(request.form['sigs_total']) - if wallet_name in app.specter.wallets.names(): - err = "Wallet already exists" - wallet_type = request.form['type'] - pur = { - None: "General", - "wsh": "Segwit (bech32)", - "sh-wsh": "Nested Segwit", - "sh": "Legacy", - } - if action == 'device' and err is None: - cosigners = request.form.getlist('devices') - if len(cosigners) != sigs_total: - err = "Select all the cosigners" - else: - devs = [] - prefix = "tpub" - if app.specter.chain == "main": - prefix = "xpub" - for k in cosigners: - dev = copy.deepcopy(app.specter.devices[k]) - dev["keys"] = [k for k in dev["keys"] if k["xpub"].startswith(prefix) and (k["type"] is None or k["type"] == wallet_type)] - if len(dev["keys"]) == 0: - err = "Device %s doesn't have keys matching this wallet type" % dev["name"] - devs.append(dev) - return render_template("wallet/new_wallet/new_wallet_keys.jinja", purposes=pur, - wallet_type=wallet_type, wallet_name=wallet_name, - cosigners=devs, keys=keys, sigs_required=sigs_required, - sigs_total=sigs_total, - error=err, specter=app.specter, rand=rand) - if action == 'key' and err is None: - cosigners = [] - devs = [] - for i in range(sigs_total): - try: - key = request.form['key%d' % i] - cosigner_name = request.form['cosigner%d' % i] - cosigner = app.specter.devices[cosigner_name] - cosigners.append(cosigner) - for k in cosigner["keys"]: - if k["original"] == key: - keys.append(k) - break - except: - pass - if len(keys) != sigs_total or len(cosigners) != sigs_total: - prefix = "tpub" - if app.specter.chain == "main": - prefix = "xpub" - for k in cosigners: - dev = copy.deepcopy(k) - dev["keys"] = [k for k in dev["keys"] if k["xpub"].startswith(prefix) and (k["type"] is None or k["type"] == wallet_type)] - devs.append(dev) - err="Did you select all the keys?" - return render_template("wallet/new_wallet/new_wallet_keys.jinja", purposes=pur, - wallet_type=wallet_type, wallet_name=wallet_name, - cosigners=devs, keys=keys, sigs_required=sigs_required, - sigs_total=sigs_total, - error=err, specter=app.specter, rand=rand) - # create a wallet here - wallet = app.specter.wallets.create_multi(wallet_name, sigs_required, wallet_type, keys, cosigners) - return redirect("/wallets/%s/" % wallet["alias"]) - return render_template("wallet/new_wallet/new_wallet.jinja", cosigners=cosigners, wallet_type=wallet_type, wallet_name=wallet_name, error=err, sigs_required=sigs_required, sigs_total=sigs_total, specter=app.specter, rand=rand) + return redirect("/wallets/%s/" % wallet.alias) + + return render_template( + "wallet/new_wallet/new_wallet.jinja", + wallet_type=wallet_type, + wallet_name=wallet_name, + sigs_required=sigs_required, + sigs_total=sigs_total, + error=err, + specter=app.specter, + rand=rand + ) @app.route('/wallets//') @login_required def wallet(wallet_alias): app.specter.check() try: - wallet = app.specter.wallets.get_by_alias(wallet_alias) + wallet = app.specter.wallet_manager.get_by_alias(wallet_alias) except SpecterError as se: app.logger.error("SpecterError while wallet: %s" % se) return render_template("base.jinja", error=se, specter=app.specter, rand=rand) @@ -403,7 +385,7 @@ def wallet(wallet_alias): def wallet_tx(wallet_alias): app.specter.check() try: - wallet = app.specter.wallets.get_by_alias(wallet_alias) + wallet = app.specter.wallet_manager.get_by_alias(wallet_alias) except SpecterError as se: app.logger.error("SpecterError while wallet_tx: %s" % se) return render_template("base.jinja", error=se, specter=app.specter, rand=rand) @@ -414,7 +396,7 @@ def wallet_tx(wallet_alias): def wallet_addresses(wallet_alias): app.specter.check() try: - wallet = app.specter.wallets.get_by_alias(wallet_alias) + wallet = app.specter.wallet_manager.get_by_alias(wallet_alias) except SpecterError as se: app.logger.error("SpecterError while wallet_addresses: %s" % se) return render_template("base.jinja", error=se, specter=app.specter, rand=rand) @@ -427,7 +409,7 @@ def wallet_addresses(wallet_alias): if viewtype == 'address': wallet.setlabel(account, label) else: - for address in wallet.addressesonlabel(account): + for address in wallet.addresses_on_label(account): wallet.setlabel(address, label) wallet.getdata() alladdresses = True if request.args.get('all') != 'False' else False @@ -438,7 +420,7 @@ def wallet_addresses(wallet_alias): def wallet_receive(wallet_alias): app.specter.check() try: - wallet = app.specter.wallets.get_by_alias(wallet_alias) + wallet = app.specter.wallet_manager.get_by_alias(wallet_alias) except SpecterError as se: app.logger.error("SpecterError while wallet_receive: %s" % se) return render_template("base.jinja", error=se, specter=app.specter, rand=rand) @@ -448,8 +430,8 @@ def wallet_receive(wallet_alias): wallet.getnewaddress() elif action == "updatelabel": label = request.form['label'] - wallet.setlabel(wallet['address'], label) - if wallet.txoncurrentaddr > 0: + wallet.setlabel(wallet.address, label) + if wallet.tx_on_current_address > 0: wallet.getnewaddress() return render_template("wallet/receive/wallet_receive.jinja", wallet_alias=wallet_alias, wallet=wallet, specter=app.specter, rand=rand) @@ -464,7 +446,7 @@ def fees(blocks): def wallet_send(wallet_alias): app.specter.check() try: - wallet = app.specter.wallets.get_by_alias(wallet_alias) + wallet = app.specter.wallet_manager.get_by_alias(wallet_alias) except SpecterError as se: app.logger.error("SpecterError while wallet_send: %s" % se) return render_template("base.jinja", error=se, specter=app.specter, rand=rand) @@ -527,7 +509,7 @@ def wallet_send(wallet_alias): def wallet_sendpending(wallet_alias): app.specter.check() try: - wallet = app.specter.wallets.get_by_alias(wallet_alias) + wallet = app.specter.wallet_manager.get_by_alias(wallet_alias) except SpecterError as se: app.logger.error("SpecterError while wallet_sendpending: %s" % se) return render_template("base.jinja", error=se, specter=app.specter, rand=rand) @@ -551,7 +533,7 @@ def wallet_settings(wallet_alias): app.specter.check() error = None try: - wallet = app.specter.wallets.get_by_alias(wallet_alias) + wallet = app.specter.wallet_manager.get_by_alias(wallet_alias) except SpecterError as se: app.logger.error("SpecterError while wallet_receive: %s" % se) return render_template("base.jinja", error=se, specter=app.specter, rand=rand) @@ -575,40 +557,38 @@ def wallet_settings(wallet_alias): wallet.getdata() elif action == "keypoolrefill": delta = int(request.form['keypooladd']) - wallet.keypoolrefill(wallet["keypool"], wallet["keypool"]+delta) - wallet.keypoolrefill(wallet["change_keypool"], wallet["change_keypool"]+delta, change=True) + wallet.keypoolrefill(wallet.keypool, wallet.keypool + delta) + wallet.keypoolrefill(wallet.change_keypool, wallet.change_keypool + delta, change=True) wallet.getdata() elif action == "rebuildcache": wallet.cli.cache.rebuild_cache() elif action == "deletewallet": - app.specter.wallets.delete_wallet(wallet) + app.specter.wallet_manager.delete_wallet(wallet) response = redirect(url_for('index')) return response elif action == "rename": wallet_name = request.form['newtitle'] - if wallet_name in app.specter.wallets.names(): + if wallet_name in app.specter.wallet_manager.wallets_names: error = "Wallet already exists" else: - app.specter.wallets.rename_wallet(wallet, wallet_name) - - cc_file = None - qr_text = wallet["name"]+"&"+wallet.descriptor - if wallet.is_multisig: - cc_file = wallet.get_cc_file() - if cc_file is not None: - cc_file = urllib.parse.quote(cc_file) + app.specter.wallet_manager.rename_wallet(wallet, wallet_name) + return render_template("wallet/settings/wallet_settings.jinja", - cc_file=cc_file, - wallet_alias=wallet_alias, wallet=wallet, - specter=app.specter, rand=rand, - error=error, - qr_text=qr_text) + wallet_alias=wallet_alias, + wallet=wallet, + specter=app.specter, + rand=rand, + error=error + ) else: - return render_template("wallet/settings/wallet_settings.jinja", - wallet_alias=wallet_alias, wallet=wallet, - specter=app.specter, rand=rand, - error=error, - qr_text=qr_text) + return render_template( + "wallet/settings/wallet_settings.jinja", + wallet_alias=wallet_alias, + wallet=wallet, + specter=app.specter, + rand=rand, + error=error + ) ################# devices management ##################### @@ -625,62 +605,55 @@ def new_device(): device_name = request.form['device_name'] if not device_name: err = "Device name must not be empty" - elif device_name in app.specter.devices.names(): + elif device_name in app.specter.device_manager.devices_names: err = "Device with this name already exists" xpubs = request.form['xpubs'] if not xpubs: err = "xpubs name must not be empty" - normalized, parsed, failed = normalize_xpubs(xpubs) + keys, failed = Key.parse_xpubs(xpubs) if len(failed) > 0: err = "Failed to parse these xpubs:\n" + "\n".join(failed) if err is None: - dev = app.specter.devices.add(name=device_name, device_type=device_type, keys=normalized) - return redirect("/devices/%s/" % dev["alias"]) + device = app.specter.device_manager.add_device(name=device_name, device_type=device_type, keys=keys) + return redirect("/devices/%s/" % device.alias) return render_template("device/new_device.jinja", device_type=device_type, device_name=device_name, xpubs=xpubs, error=err, specter=app.specter, rand=rand) - -def get_key_meta(key): - k = copy.deepcopy(key) - k["chain"] = "Mainnet" if k["xpub"].startswith("xpub") else "Testnet" - k["purpose"] = purposes[k["type"]] - if k["derivation"] is not None: - k["combined"] = "[%s%s]%s" % (k["fingerprint"], k["derivation"][1:], k["xpub"]) - else: - k["combined"] = k["xpub"] - return k - @app.route('/devices//', methods=['GET', 'POST']) @login_required def device(device_alias): app.specter.check() + err = None try: - device = app.specter.devices.get_by_alias(device_alias) + device = app.specter.device_manager.get_by_alias(device_alias) except: return render_template("base.jinja", error="Device not found", specter=app.specter, rand=rand) + wallets = device.wallets(app.specter.wallet_manager) if request.method == 'POST': action = request.form['action'] if action == "forget": - app.specter.devices.remove(device) - return redirect("/") + if len(wallets) != 0: + err = "Device could not be removed since it is used in wallets: {}.\nYou must delete those wallets before you can remove this device.".format([wallet.name for wallet in wallets]) + else: + app.specter.device_manager.remove_device(device) + return redirect("/") if action == "delete_key": key = request.form['key'] - device.remove_key(key) + device.remove_key(Key.from_json({ 'original': key })) if action == "add_keys": - return render_template("device/new_device.jinja", device_alias=device_alias, device=device, device_type=device["type"], specter=app.specter, rand=rand) + return render_template("device/new_device.jinja", device=device, specter=app.specter, rand=rand) if action == "morekeys": # refactor to fn xpubs = request.form['xpubs'] - normalized, parsed, failed = normalize_xpubs(xpubs) + keys, failed = Key.parse_xpubs(xpubs) err = None if len(failed) > 0: err = "Failed to parse these xpubs:\n" + "\n".join(failed) - return render_template("device/new_device.jinja", device_alias=device_alias, device=device, xpubs=xpubs, device_type=device["type"], error=err, specter=app.specter, rand=rand) + return render_template("device/new_device.jinja", device=device, xpubs=xpubs, error=err, specter=app.specter, rand=rand) if err is None: - device.add_keys(normalized) + device.add_keys(keys) device = copy.deepcopy(device) - device["keys"] = [get_key_meta(key) for key in device["keys"]] - device["keys"].sort(key=lambda x: x["chain"]+x["purpose"], reverse=True) - return render_template("device/device.jinja", device_alias=device_alias, device=device, purposes=purposes, specter=app.specter, rand=rand) + device.keys.sort(key=lambda k: k.metadata["chain"] + k.metadata["purpose"], reverse=True) + return render_template("device/device.jinja", device=device, purposes=purposes, wallets=wallets, error=err, specter=app.specter, rand=rand) @@ -690,29 +663,11 @@ def device(device_alias): def timedatetime(s): return format(datetime.fromtimestamp(s), "%d.%m.%Y %H:%M") -@app.template_filter('derivation') -def derivation(wallet): - s = "address=m/0/{}\n".format(wallet['address_index']) - if wallet.is_multisig: - s += "type={}".format(MSIG_TYPES[wallet['address_type']]) - for k in wallet['keys']: - s += "\n{}{}".format(k['fingerprint'], k['derivation'][1:]) - else: - s += "type={}".format(SINGLE_TYPES[wallet['address_type']]) - k = wallet['key'] - s += "\n{}{}".format(k['fingerprint'], k['derivation'][1:]) - return s - -@app.template_filter('prettyjson') -def txonaddr(obj): - return json.dumps(obj, indent=4) - @app.template_filter('btcamount') def btcamount(value): value = float(value) return "{:.8f}".format(value).rstrip("0").rstrip(".") - def notify_upgrade(): ''' If a new version is available, notifies the user via flash that there is an upgrade to specter.desktop diff --git a/src/cryptoadvance/specter/device.py b/src/cryptoadvance/specter/device.py new file mode 100644 index 0000000000..6e72db1954 --- /dev/null +++ b/src/cryptoadvance/specter/device.py @@ -0,0 +1,127 @@ +import json +from .helpers import decode_base58, hash160 +from .serializations import PSBT +from .key import Key + + +class Device: + QR_CODE_TYPES = ['specter', 'other'] + SD_CARD_TYPES = ['coldcard', 'other'] + HWI_TYPES = ['keepkey', 'ledger', 'trezor', 'specter', 'coldcard'] + + def __init__(self, name, alias, device_type, keys, fullpath, manager): + self.name = name + self.alias = alias + self.device_type = device_type + self.keys = keys + self.fullpath = fullpath + self.manager = manager + + @classmethod + def from_json(cls, device_dict, manager, default_alias='', default_fullpath=''): + name = device_dict['name'] if 'name' in device_dict else '' + alias = device_dict['alias'] if 'alias' in device_dict else default_alias + device_type = device_dict['type'] if 'type' in device_dict else '' + keys = [Key.from_json(key_dict) for key_dict in device_dict['keys']] + fullpath = device_dict['fullpath'] if 'fullpath' in device_dict else default_fullpath + return cls(name, alias, device_type, keys, fullpath, manager) + + @property + def json(self): + return { + "name": self.name, + "alias": self.alias, + "type": self.device_type, + "keys": [key.json for key in self.keys], + "fullpath": self.fullpath, + } + + def _update_keys(self): + with open(self.fullpath, "r") as f: + content = json.loads(f.read()) + content['keys'] = [key.json for key in self.keys] + with open(self.fullpath, "w") as f: + f.write(json.dumps(content,indent=4)) + self.manager.update() + + def remove_key(self, key): + self.keys = [k for k in self.keys if k != key] + self._update_keys() + + def add_keys(self, keys): + for key in keys: + if key not in self.keys: + self.keys.append(key) + self._update_keys() + + def wallets(self, wallet_manager): + wallets = [] + for wallet in wallet_manager.wallets.values(): + if self in wallet.devices: + wallets.append(wallet) + return wallets + + @staticmethod + def create_sdcard_psbt(base64_psbt, keys): + sdcard_psbt = PSBT() + sdcard_psbt.deserialize(base64_psbt) + if len(keys) > 1: + for k in keys: + key = b'\x01' + decode_base58(k.xpub) + if k.fingerprint != '': + fingerprint = bytes.fromhex(k.fingerprint) + else: + fingerprint = _get_xpub_fingerprint(k.xpub) + if k.derivation != '': + der = _der_to_bytes(k.derivation) + else: + der = b'' + value = fingerprint + der + sdcard_psbt.unknown[key] = value + return sdcard_psbt.serialize() + + @staticmethod + def create_qrcode_psbt(base64_psbt, fingerprint): + qr_psbt = PSBT() + qr_psbt.deserialize(base64_psbt) + for inp in qr_psbt.inputs + qr_psbt.outputs: + inp.witness_script = b"" + inp.redeem_script = b"" + if len(inp.hd_keypaths) > 0: + k = list(inp.hd_keypaths.keys())[0] + # proprietary field - wallet derivation path + # only contains two last derivation indexes - change and index + inp.unknown[b"\xfc\xca\x01" + fingerprint] = b"".join([i.to_bytes(4, "little") for i in inp.hd_keypaths[k][-2:]]) + inp.hd_keypaths = {} + return qr_psbt.serialize() + + def __eq__(self, other): + if other is None: + return False + return self.alias == other.alias + + def __hash__(self): + return hash(self.alias) + + +def _get_xpub_fingerprint(xpub): + b = decode_base58(xpub) + return hash160(b[-33:])[:4] + +def _der_to_bytes(derivation): + items = derivation.split("/") + if len(items) == 0: + return b'' + if items[0] == 'm': + items = items[1:] + if items[-1] == '': + items = items[:-1] + res = b'' + for item in items: + index = 0 + if item[-1] == 'h' or item[-1] == "'": + index += 0x80000000 + item = item[:-1] + index += int(item) + res += index.to_bytes(4,'big') + return res diff --git a/src/cryptoadvance/specter/device_manager.py b/src/cryptoadvance/specter/device_manager.py new file mode 100644 index 0000000000..0203987c52 --- /dev/null +++ b/src/cryptoadvance/specter/device_manager.py @@ -0,0 +1,86 @@ +import os, json, logging +from .device import Device +from .devices.coldcard import ColdCard +from .devices.trezor import Trezor +from .devices.ledger import Ledger +from .devices.keepkey import Keepkey +from .devices.specter import Specter +from .helpers import alias, load_jsons + + +logger = logging.getLogger(__name__) + +device_classes = { + 'coldcard': ColdCard, + 'trezor': Trezor, + 'keepkey': Keepkey, + 'ledger': Ledger, + 'specter': Specter +} + +def get_device_class(device_type): + if device_type in device_classes: + return device_classes[device_type] + return Device + +class DeviceManager: + ''' A DeviceManager mainly manages the persistence of a device-json-structures + compliant to helper.load_jsons + ''' + # of them via json-files in an empty data folder + def __init__(self, data_folder): + if data_folder is not None: + self.data_folder = data_folder + if data_folder.startswith("~"): + data_folder = os.path.expanduser(data_folder) + # creating folders if they don't exist + if not os.path.isdir(data_folder): + os.mkdir(data_folder) + self.update() + + def update(self): + self.devices = {} + devices_files = load_jsons(self.data_folder, key="name") + for device_alias in devices_files: + fullpath = os.path.join(self.data_folder, "%s.json" % device_alias) + self.devices[devices_files[device_alias]["name"]] = get_device_class(devices_files[device_alias]["type"]).from_json( + devices_files[device_alias], + self, + default_alias=device_alias, + default_fullpath=fullpath + ) + + @property + def devices_names(self): + return sorted(self.devices.keys()) + + def add_device(self, name, device_type, keys): + device_alias = alias(name) + fullpath = os.path.join(self.data_folder, "%s.json" % device_alias) + i = 2 + while os.path.isfile(fullpath): + device_alias = alias("%s %d" % (name, i)) + fullpath = os.path.join(self.data_folder, "%s.json" % device_alias) + i += 1 + # remove duplicated keys if any exist + non_dup_keys = [] + for key in keys: + if key not in non_dup_keys: + non_dup_keys.append(key) + keys = non_dup_keys + device = get_device_class(device_type)(name, device_alias, device_type, keys, fullpath, self) + with open(fullpath, "w") as file: + file.write(json.dumps(device.json, indent=4)) + + self.update() # reload files + return device + + def get_by_alias(self, device_alias): + for device_name in self.devices: + if self.devices[device_name].alias == device_alias: + return self.devices[device_name] + logger.error("Could not find Device %s" % device_alias) + + def remove_device(self, device): + os.remove(device.fullpath) + self.update() diff --git a/src/cryptoadvance/specter/devices/coldcard.py b/src/cryptoadvance/specter/devices/coldcard.py new file mode 100644 index 0000000000..f34c770193 --- /dev/null +++ b/src/cryptoadvance/specter/devices/coldcard.py @@ -0,0 +1,53 @@ +import urllib +from .sd_card_device import SDCardDevice +from ..helpers import get_xpub_fingerprint + + +CC_TYPES = { + 'legacy': 'BIP45', + 'p2sh-segwit': 'P2WSH-P2SH', + 'bech32': 'P2WSH' +} +class ColdCard(SDCardDevice): + def __init__(self, name, alias, device_type, keys, fullpath, manager): + SDCardDevice.__init__(self, name, alias, 'coldcard', keys, fullpath, manager) + self.sd_card_support = True + self.qr_code_support = False + self.wallet_export_type = 'file' + + def create_psbts(self, base64_psbt, wallet): + psbts = SDCardDevice.create_psbts(self, base64_psbt, wallet) + return psbts + + def export_wallet(self, wallet): + CC_TYPES = { + 'legacy': 'BIP45', + 'p2sh-segwit': 'P2WSH-P2SH', + 'bech32': 'P2WSH' + } + # try to find at least one derivation + # cc assume the same derivation for all keys :( + derivation = None + for k in wallet.keys: + if k.derivation != '': + derivation = k.derivation.replace("h","'") + break + if derivation is None: + return None + cc_file = """# Coldcard Multisig setup file (created on Specter Desktop) +# +Name: {} +Policy: {} of {} +Derivation: {} +Format: {} +""".format(wallet.name, wallet.sigs_required, + len(wallet.keys), derivation, + CC_TYPES[wallet.address_type] + ) + for k in wallet.keys: + # cc assumes fingerprint is known + fingerprint = k.fingerprint + if fingerprint == '': + fingerprint = get_xpub_fingerprint(k.xpub).hex() + cc_file += "{}: {}\n".format(fingerprint.upper(), k.xpub) + return urllib.parse.quote(cc_file) diff --git a/src/cryptoadvance/specter/devices/hwi_device.py b/src/cryptoadvance/specter/devices/hwi_device.py new file mode 100644 index 0000000000..07b1346264 --- /dev/null +++ b/src/cryptoadvance/specter/devices/hwi_device.py @@ -0,0 +1,11 @@ +from ..device import Device + + +class HWIDevice(Device): + def __init__(self, name, alias, device_type, keys, fullpath, manager): + Device.__init__(self, name, alias, device_type, keys, fullpath, manager) + self.hwi_support = True + self.exportable_to_wallet = False + + def create_psbts(self, base64_psbt, wallet): + return { 'hwi': base64_psbt } diff --git a/src/cryptoadvance/specter/devices/keepkey.py b/src/cryptoadvance/specter/devices/keepkey.py new file mode 100644 index 0000000000..fd4b1996f1 --- /dev/null +++ b/src/cryptoadvance/specter/devices/keepkey.py @@ -0,0 +1,7 @@ +from .hwi_device import HWIDevice + +class Keepkey(HWIDevice): + def __init__(self, name, alias, device_type, keys, fullpath, manager): + HWIDevice.__init__(self, name, alias, 'keepkey', keys, fullpath, manager) + self.sd_card_support = False + self.qr_code_support = False diff --git a/src/cryptoadvance/specter/devices/ledger.py b/src/cryptoadvance/specter/devices/ledger.py new file mode 100644 index 0000000000..05f285efdd --- /dev/null +++ b/src/cryptoadvance/specter/devices/ledger.py @@ -0,0 +1,7 @@ +from .hwi_device import HWIDevice + +class Ledger(HWIDevice): + def __init__(self, name, alias, device_type, keys, fullpath, manager): + HWIDevice.__init__(self, name, alias, 'ledger', keys, fullpath, manager) + self.sd_card_support = False + self.qr_code_support = False diff --git a/src/cryptoadvance/specter/devices/sd_card_device.py b/src/cryptoadvance/specter/devices/sd_card_device.py new file mode 100644 index 0000000000..86b710ed8f --- /dev/null +++ b/src/cryptoadvance/specter/devices/sd_card_device.py @@ -0,0 +1,52 @@ +from .hwi_device import HWIDevice +from ..helpers import decode_base58, get_xpub_fingerprint, hash160 +from ..serializations import PSBT + + +class SDCardDevice(HWIDevice): + def __init__(self, name, alias, device_type, keys, fullpath, manager): + HWIDevice.__init__(self, name, alias, device_type, keys, fullpath, manager) + self.sd_card_support = True + self.exportable_to_wallet = True + + def create_psbts(self, base64_psbt, wallet): + psbts = HWIDevice.create_psbts(self, base64_psbt, wallet) + sdcard_psbt = PSBT() + sdcard_psbt.deserialize(base64_psbt) + if len(wallet.keys) > 1: + for k in wallet.keys: + key = b'\x01' + decode_base58(k.xpub) + if k.fingerprint != '': + fingerprint = bytes.fromhex(k.fingerprint) + else: + fingerprint = _get_xpub_fingerprint(k.xpub) + if k.derivation != '': + der = _der_to_bytes(k.derivation) + else: + der = b'' + value = fingerprint + der + sdcard_psbt.unknown[key] = value + psbts['sdcard'] = sdcard_psbt.serialize() + return psbts + +def _get_xpub_fingerprint(xpub): + b = decode_base58(xpub) + return hash160(b[-33:])[:4] + +def _der_to_bytes(derivation): + items = derivation.split("/") + if len(items) == 0: + return b'' + if items[0] == 'm': + items = items[1:] + if items[-1] == '': + items = items[:-1] + res = b'' + for item in items: + index = 0 + if item[-1] == 'h' or item[-1] == "'": + index += 0x80000000 + item = item[:-1] + index += int(item) + res += index.to_bytes(4,'big') + return res diff --git a/src/cryptoadvance/specter/devices/specter.py b/src/cryptoadvance/specter/devices/specter.py new file mode 100644 index 0000000000..e38c502d7b --- /dev/null +++ b/src/cryptoadvance/specter/devices/specter.py @@ -0,0 +1,39 @@ +import hashlib +from .sd_card_device import SDCardDevice +from ..serializations import PSBT + + +class Specter(SDCardDevice): + def __init__(self, name, alias, device_type, keys, fullpath, manager): + SDCardDevice.__init__(self, name, alias, 'specter', keys, fullpath, manager) + self.sd_card_support = False + self.qr_code_support = True + self.wallet_export_type = 'qr' + + def create_psbts(self, base64_psbt, wallet): + psbts = SDCardDevice.create_psbts(self, base64_psbt, wallet) + qr_psbt = PSBT() + qr_psbt.deserialize(base64_psbt) + for inp in qr_psbt.inputs + qr_psbt.outputs: + inp.witness_script = b"" + inp.redeem_script = b"" + if len(inp.hd_keypaths) > 0: + k = list(inp.hd_keypaths.keys())[0] + # proprietary field - wallet derivation path + # only contains two last derivation indexes - change and index + inp.unknown[b"\xfc\xca\x01" + get_wallet_fingerprint(wallet)] = b"".join([i.to_bytes(4, "little") for i in inp.hd_keypaths[k][-2:]]) + inp.hd_keypaths = {} + psbts['qrcode'] = qr_psbt.serialize() + return psbts + + def export_wallet(self, wallet): + return wallet.name + "&" + get_wallet_qr_descriptor(wallet) + +def get_wallet_qr_descriptor(wallet): + return wallet.recv_descriptor.split("#")[0].replace("/0/*", "") + +def get_wallet_fingerprint(wallet): + """ Unique fingerprint of the wallet - first 4 bytes of hash160 of its descriptor """ + h256 = hashlib.sha256(get_wallet_qr_descriptor(wallet).encode()).digest() + h160 = hashlib.new('ripemd160', h256).digest() + return h160[:4] diff --git a/src/cryptoadvance/specter/devices/trezor.py b/src/cryptoadvance/specter/devices/trezor.py new file mode 100644 index 0000000000..3eb14fd657 --- /dev/null +++ b/src/cryptoadvance/specter/devices/trezor.py @@ -0,0 +1,7 @@ +from .hwi_device import HWIDevice + +class Trezor(HWIDevice): + def __init__(self, name, alias, device_type, keys, fullpath, manager): + HWIDevice.__init__(self, name, alias, 'trezor', keys, fullpath, manager) + self.sd_card_support = False + self.qr_code_support = False diff --git a/src/cryptoadvance/specter/helpers.py b/src/cryptoadvance/specter/helpers.py index fd5f561e14..f31488692b 100644 --- a/src/cryptoadvance/specter/helpers.py +++ b/src/cryptoadvance/specter/helpers.py @@ -1,13 +1,7 @@ -import collections -import hashlib -import json -import logging -import os -import subprocess -import sys +import collections, copy, hashlib, json, logging, os, six, subprocess, sys from collections import OrderedDict +from .descriptor import AddChecksum -import six try: collectionsAbc = collections.abc @@ -15,6 +9,9 @@ collectionsAbc = collections logger = logging.getLogger(__name__) +def alias(name): + name = name.replace(" ", "_") + return "".join(x for x in name if x.isalnum() or x=="_").lower() def deep_update(d, u): for k, v in six.iteritems(u): @@ -44,23 +41,6 @@ def load_jsons(folder, key=None): BASE58_ALPHABET = b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' -VALID_PREFIXES = { - b"\x04\x35\x87\xcf": { # testnet - b"\x04\x35\x87\xcf": None, # unknown, maybe pkh - b"\x04\x4a\x52\x62": "sh-wpkh", - b"\x04\x5f\x1c\xf6": "wpkh", - b"\x02\x42\x89\xef": "sh-wsh", - b"\x02\x57\x54\x83": "wsh", - }, - b"\x04\x88\xb2\x1e": { # mainnet - b"\x04\x88\xb2\x1e": None, #unknown, maybe pkh - b"\x04\x9d\x7c\xb2": "sh-wpkh", - b"\x04\xb2\x47\x46": "wpkh", - b"\x02\x95\xb4\x3f": "sh-wsh", - b"\x02\xaa\x7e\xd3": "wsh", - } -} - def double_sha256(s): return hashlib.sha256(hashlib.sha256(s).digest()).digest() @@ -112,89 +92,6 @@ def get_xpub_fingerprint(xpub): b = decode_base58(xpub) return hash160(b[-33:])[:4] -def parse_xpub(xpub): - r = {"derivation": None} - derivation = None - arr = xpub.strip().split("]") - r["original"] = arr[-1] - if len(arr) > 1: - derivation = arr[0].replace("'","h").lower() - xpub = arr[1] - if derivation is not None: - if derivation[0]!="[": - raise Exception("Missing leading [") - arr = derivation[1:].split("/") - try: - fng = bytes.fromhex(arr[0].replace("-","")) # coldcard has hexstrings like 7c-2c-8e-1b - except: - raise Exception("Fingerprint is not hex") - if len(fng) != 4: - raise Exception("Incorrect fingerprint length") - r["fingerprint"] = arr[0] - for der in arr[1:]: - if der[-1] == "h": - der = der[:-1] - try: - i = int(der) - except: - print("index") - raise Exception("Incorrect index") - arr[0] = "m" - r["derivation"] = "/".join(arr) - # checking xpub prefix and defining key type - b = decode_base58(xpub, num_bytes=82) - prefix = b[:4] - is_valid = False - key_type = None - for k in VALID_PREFIXES: - if prefix in VALID_PREFIXES[k].keys(): - key_type = VALID_PREFIXES[k][prefix] - prefix = k - is_valid = True - break - if not is_valid: - raise Exception("Invalid xpub prefix: %s", prefix.hex()) - - # defining key type from derivation - if r["derivation"] is not None and key_type is None: - arr = r["derivation"].split("/") - purpose = arr[1] - if purpose == "44h": - key_type = "pkh" - elif purpose == "49h": - key_type = "sh-wpkh" - elif purpose == "84h": - key_type = "wpkh" - elif purpose == "45h": - key_type = "sh" - elif purpose == "48h": - if len(arr)>=5: - if arr[4] == "1h": - key_type = "sh-wsh" - elif arr[4] == "2h": - key_type = "wsh" - r["type"] = key_type - - b = prefix + b[4:] - r["xpub"] = encode_base58_checksum(b) - return r - -def normalize_xpubs(xpubs): - xpubs = xpubs - lines = [l.strip() for l in xpubs.split("\n") if len(l) > 0] - parsed = [] - failed = [] - normalized = [] - for line in lines: - try: - x = parse_xpub(line) - normalized.append(x) - parsed.append(line) - except Exception as e: - failed.append(line + "\n" + str(e)) - return (normalized, parsed, failed) - - def which(program): ''' mimics the "which" command in bash but even for stuff not on the path. Also has implicit pyinstaller support @@ -307,3 +204,72 @@ def save_hwi_bridge_config(specter, config): config['whitelisted_domains'] = whitelisted_domains with open(os.path.join(specter.data_folder, 'hwi_bridge_config.json'), "w") as f: f.write(json.dumps(config, indent=4)) + +def der_to_bytes(derivation): + items = derivation.split("/") + if len(items) == 0: + return b'' + if items[0] == 'm': + items = items[1:] + if items[-1] == '': + items = items[:-1] + res = b'' + for item in items: + index = 0 + if item[-1] == 'h' or item[-1] == "'": + index += 0x80000000 + item = item[:-1] + index += int(item) + res += index.to_bytes(4,'big') + return res + +def get_devices_with_keys_by_type(app, cosigners, wallet_type): + devices = [] + prefix = "tpub" + if app.specter.chain == "main": + prefix = "xpub" + for cosigner in cosigners: + device = copy.deepcopy(cosigner) + allowed_types = ['', wallet_type] + device.keys = [key for key in device.keys if key.xpub.startswith(prefix) and key.key_type in allowed_types] + devices.append(device) + return devices + +def sort_descriptor(cli, descriptor, index=None, change=False): + descriptor = descriptor.replace("sortedmulti", "multi") + if index is not None: + descriptor = descriptor.replace("*", f"{index}") + # remove checksum + descriptor = descriptor.split("#")[0] + # get address (should be already imported to the wallet) + address = cli.deriveaddresses(AddChecksum(descriptor), change=change)[0] + + # get pubkeys involved + address_info = cli.getaddressinfo(address) + if 'pubkeys' in address_info: + pubkeys = address_info["pubkeys"] + elif 'embedded' in address_info and 'pubkeys' in address_info['embedded']: + pubkeys = address_info["embedded"]["pubkeys"] + else: + raise Exception("Could not find 'pubkeys' in address info:\n%s" % json.dumps(address_info, indent=2)) + + # get xpubs from the descriptor + arr = descriptor.split("(multi(")[1].split(")")[0].split(",") + + # getting [wsh] or [sh, wsh] + prefix = descriptor.split("(multi(")[0].split("(") + sigs_required = arr[0] + keys = arr[1:] + + # sort them according to sortedmulti + z = sorted(zip(pubkeys,keys), key=lambda x: x[0]) + keys = [zz[1] for zz in z] + inner = f"{sigs_required},"+",".join(keys) + desc = f"multi({inner})" + + # Write from the inside out + prefix.reverse() + for p in prefix: + desc = f"{p}({desc})" + + return AddChecksum(desc) diff --git a/src/cryptoadvance/specter/hwi_rpc.py b/src/cryptoadvance/specter/hwi_rpc.py index 5d408551c9..ba1c5fc26c 100644 --- a/src/cryptoadvance/specter/hwi_rpc.py +++ b/src/cryptoadvance/specter/hwi_rpc.py @@ -1,7 +1,7 @@ from hwilib.serializations import PSBT import hwilib.commands as hwi_commands from hwilib import bech32 -from .helpers import normalize_xpubs, convert_xpub_prefix +from .helpers import convert_xpub_prefix from .specter_hwi import SpecterClient, enumerate as specter_enumerate from .json_rpc import JSONRPC diff --git a/src/cryptoadvance/specter/key.py b/src/cryptoadvance/specter/key.py new file mode 100644 index 0000000000..c2b0b9d35e --- /dev/null +++ b/src/cryptoadvance/specter/key.py @@ -0,0 +1,159 @@ +from collections import OrderedDict +from .helpers import decode_base58, encode_base58_checksum + + +purposes = OrderedDict({ + '': "General", + "wpkh": "Single (Segwit)", + "sh-wpkh": "Single (Nested)", + "pkh": "Single (Legacy)", + "wsh": "Multisig (Segwit)", + "sh-wsh": "Multisig (Nested)", + "sh": "Multisig (Legacy)", +}) + +VALID_PREFIXES = { + b"\x04\x35\x87\xcf": { # testnet + b"\x04\x35\x87\xcf": '', # unknown, maybe pkh + b"\x04\x4a\x52\x62": "sh-wpkh", + b"\x04\x5f\x1c\xf6": "wpkh", + b"\x02\x42\x89\xef": "sh-wsh", + b"\x02\x57\x54\x83": "wsh", + }, + b"\x04\x88\xb2\x1e": { # mainnet + b"\x04\x88\xb2\x1e": '', # unknown, maybe pkh + b"\x04\x9d\x7c\xb2": "sh-wpkh", + b"\x04\xb2\x47\x46": "wpkh", + b"\x02\x95\xb4\x3f": "sh-wsh", + b"\x02\xaa\x7e\xd3": "wsh", + } +} + +class Key: + def __init__(self, original, fingerprint, derivation, key_type, xpub): + if key_type not in purposes: + raise Exception('Invalid key type specified: {}.') + self.original = original + self.fingerprint = fingerprint + self.derivation = derivation + self.key_type = key_type + self.xpub = xpub + + @classmethod + def from_json(cls, key_dict): + original = key_dict['original'] if 'original' in key_dict else '' + fingerprint = key_dict['fingerprint'] if 'fingerprint' in key_dict else '' + derivation = key_dict['derivation'] if 'derivation' in key_dict else '' + key_type = key_dict['type'] if 'type' in key_dict else '' + xpub = key_dict['xpub'] if 'xpub' in key_dict else '' + return cls(original, fingerprint, derivation, key_type, xpub) + + @classmethod + def parse_xpub(cls, xpub): + derivation = '' + arr = xpub.strip().split("]") + original = arr[-1] + if len(arr) > 1: + derivation = arr[0].replace("'","h").lower() + xpub = arr[1] + + fingerprint = '' + if derivation != '': + if derivation[0] != "[": + raise Exception("Missing leading [") + derivation_path = derivation[1:].split("/") + try: + fng = bytes.fromhex(derivation_path[0].replace("-","")) # coldcard has hexstrings like 7c-2c-8e-1b + except: + raise Exception("Fingerprint is not hex") + if len(fng) != 4: + raise Exception("Incorrect fingerprint length") + fingerprint = derivation_path[0] + for path in derivation_path[1:]: + if path[-1] == "h": + path = path[:-1] + try: + i = int(path) + except: + raise Exception("Incorrect index") + derivation_path[0] = "m" + derivation = "/".join(derivation_path) + + # checking xpub prefix and defining key type + xpub_bytes = decode_base58(xpub, num_bytes=82) + prefix = xpub_bytes[:4] + is_valid = False + key_type = '' + for k in VALID_PREFIXES: + if prefix in VALID_PREFIXES[k].keys(): + key_type = VALID_PREFIXES[k][prefix] + prefix = k + is_valid = True + break + if not is_valid: + raise Exception("Invalid xpub prefix: %s", prefix.hex()) + + xpub_bytes = prefix + xpub_bytes[4:] + xpub = encode_base58_checksum(xpub_bytes) + + # defining key type from derivation + if derivation != '' and key_type == '': + derivation_path = derivation.split("/") + purpose = derivation_path[1] + if purpose == "44h": + key_type = "pkh" + elif purpose == "49h": + key_type = "sh-wpkh" + elif purpose == "84h": + key_type = "wpkh" + elif purpose == "45h": + key_type = "sh" + elif purpose == "48h": + if len(derivation_path) >= 5: + if derivation_path[4] == "1h": + key_type = "sh-wsh" + elif derivation_path[4] == "2h": + key_type = "wsh" + + return cls(original, fingerprint, derivation, key_type, xpub) + + @classmethod + def parse_xpubs(cls, xpubs): + xpubs = xpubs + lines = [l.strip() for l in xpubs.split("\n") if len(l) > 0] + failed = [] + keys = [] + for line in lines: + try: + keys.append(Key.parse_xpub(line)) + except Exception as e: + failed.append(line + "\n" + str(e)) + return keys, failed + + + @property + def metadata(self): + metadata = {} + metadata["chain"] = "Mainnet" if self.xpub.startswith("xpub") else "Testnet" + metadata["purpose"] = purposes[self.key_type] + if self.derivation is not None: + metadata["combined"] = "[%s%s]%s" % (self.fingerprint, self.derivation[1:], self.xpub) + else: + metadata["combined"] = self.xpub + return metadata + + @property + def json(self): + return { + 'original': self.original, + 'fingerprint': self.fingerprint, + 'derivation': self.derivation, + 'type': self.key_type, + 'xpub': self.xpub + } + + def __eq__(self, other): + return self.original == other.original + + def __hash__(self): + return hash(self.original) \ No newline at end of file diff --git a/src/cryptoadvance/specter/logic.py b/src/cryptoadvance/specter/logic.py deleted file mode 100644 index 1d304096ee..0000000000 --- a/src/cryptoadvance/specter/logic.py +++ /dev/null @@ -1,1179 +0,0 @@ -import base64 -import copy -import hashlib -import json -import logging -import os -import random -import shutil -from requests import ConnectionError -from collections import OrderedDict -from time import time - -from . import helpers -from .descriptor import AddChecksum -from .helpers import deep_update, get_xpub_fingerprint, load_jsons -from .rpc import RPC_PORTS, autodetect_cli_confs, get_default_datadir, RpcError -from .rpc_cache import BitcoinCLICached -from .serializations import PSBT - -logger = logging.getLogger(__name__) - -# a gap of 20 addresses is what many wallets do -WALLET_CHUNK = 20 -# we don't need to scan earlier than that -# as we don't support legacy wallets -FIRST_SEGWIT_BLOCK = 481824 - -purposes = OrderedDict({ - None: "General", - "wpkh": "Single (Segwit)", - "sh-wpkh": "Single (Nested)", - "pkh": "Single (Legacy)", - "wsh": "Multisig (Segwit)", - "sh-wsh": "Multisig (Nested)", - "sh": "Multisig (Legacy)", -}) - -addrtypes = { - "pkh": "legacy", - "sh-wpkh": "p2sh-segwit", - "wpkh": "bech32", - "sh": "legacy", - "sh-wsh": "p2sh-segwit", - "wsh": "bech32" -} - -def alias(name): - name = name.replace(" ", "_") - return "".join(x for x in name if x.isalnum() or x=="_").lower() - -def get_cli(conf): - if "user" not in conf or conf["user"]=="": - conf["autodetect"] = True - if conf["autodetect"]: - if "port" in conf: - cli_conf_arr = autodetect_cli_confs(port=conf["port"]) - else: - cli_conf_arr = autodetect_cli_confs() - if len(cli_conf_arr) > 0: - cli = BitcoinCLICached(**cli_conf_arr[0]) - else: - return None - else: - cli = BitcoinCLICached(conf["user"], conf["password"], - host=conf["host"], port=conf["port"], protocol=conf["protocol"]) - return cli - -class SpecterError(Exception): - ''' A SpecterError contains meaningfull messages which can be passed directly to the user ''' - pass - -class Specter: - ''' A central Object mostly holding app-settings ''' - CONFIG_FILE_NAME = "config.json" - def __init__(self, data_folder="./data", config={}): - if data_folder.startswith("~"): - data_folder = os.path.expanduser(data_folder) - self.data_folder = data_folder - self.cli = None - self.devices = None - self.wallets = None - - self.file_config = None # what comes from config file - self.arg_config = config # what comes from arguments - - # default config - self.config = { - "rpc": { - "autodetect": True, - "user": "", - "password": "", - "port": "", - "host": "localhost", # localhost - "protocol": "http" # https for the future - }, - "auth": "none", - "explorers": { - "main": "", - "test": "", - "regtest": "", - "signet": "" - }, - "hwi_bridge_url": "/hwi/api/", - # unique id that will be used in wallets path in Bitcoin Core - # empty by default for backward-compatibility - "uid": "", - } - - # creating folders if they don't exist - if not os.path.isdir(data_folder): - os.makedirs(data_folder) - - self._info = { "chain": None } - # health check: loads config and tests rpc - self.check() - - def check(self): - - # if config.json file exists - load from it - if os.path.isfile(os.path.join(self.data_folder, "config.json")): - with open(os.path.join(self.data_folder, "config.json"), "r") as f: - self.file_config = json.loads(f.read()) - deep_update(self.config, self.file_config) - # otherwise - create one and assign unique id - else: - if self.config["uid"] == "": - self.config["uid"] = random.randint(0,256**8).to_bytes(8,'big').hex() - self._save() - - # init arguments - deep_update(self.config, self.arg_config) # override loaded config - - self.cli = get_cli(self.config["rpc"]) - self._is_configured = (self.cli is not None) - self._is_running = False - if self._is_configured: - try: - self._info = self.cli.getblockchaininfo() - self._is_running = True - except Exception as e: - logger.error("Exception %s while specter.check()" % e) - pass - - if not self._is_running: - self._info["chain"] = None - - chain = self._info["chain"] - if self.wallets is None or chain is None: - wallets_path = "specter%s" % self.config["uid"] - self.wallets = WalletManager( - os.path.join(self.data_folder, "wallets"), - self.cli, - chain=chain, - path=wallets_path) - else: - self.wallets.update(os.path.join(self.data_folder, "wallets"), - self.cli, - chain=chain) - - if self.devices is None: - self.devices = DeviceManager(os.path.join(self.data_folder, "devices")) - else: - self.devices.update(os.path.join(self.data_folder, "devices")) - - try: - self.wallets.load_all() - except Exception as e: - logger.error("can't load wallets: %s " % e) - - def test_rpc(self, **kwargs): - conf = copy.deepcopy(self.config["rpc"]) - conf.update(kwargs) - cli = get_cli(conf) - if cli is None: - return {"out": "", "err": "autodetect failed", "code": -1} - r = {} - r['tests'] = {} - try: - r['tests']['recent_version'] = int(cli.getnetworkinfo()['version']) >= 170000 - r['tests']['connectable'] = True - r['tests']['credentials'] = True - try: - cli.listwallets() - r['tests']['wallets'] = True - except RpcError as rpce: - logger.error(rpce) - if rpce.status_code == 404: - r['tests']['wallets'] = False - else: - raise rpce - r["out"] = json.dumps(cli.getblockchaininfo(),indent=4) - r["err"] = "" - r["code"] = 0 - except ConnectionError as e: - logger.error(e) - r['tests']['connectable'] = False - r["err"] = "Failed to connect!" - r["code"] = -1 - except RpcError as rpce: - logger.error(rpce) - if rpce.status_code == 401: - r['tests']['credentials'] = False - else: - raise rpce - except Exception as e: - logger.error(e) - r["out"] = "" - if cli.r is not None and "error" in cli.r: - r["err"] = cli.r["error"] - r["code"] = cli.r.status_code - else: - r["err"] = "Failed to connect" - r["code"] = -1 - return r - - def _save(self): - with open(os.path.join(self.data_folder, self.CONFIG_FILE_NAME), "w") as f: - f.write(json.dumps(self.config, indent=4)) - - def update_rpc(self, **kwargs): - need_update = False - for k in kwargs: - if self.config["rpc"][k] != kwargs[k]: - self.config["rpc"][k] = kwargs[k] - need_update = True - if need_update: - self._save() - self.check() - - def update_auth(self, auth): - ''' simply persisting the current auth-choice ''' - if self.config["auth"] != auth: - self.config["auth"] = auth - self._save() - - def update_explorer(self, explorer): - ''' update the block explorers urls ''' - - if explorer and not explorer.endswith("/"): - # make sure the urls end with a "/" - explorer += "/" - - # update the urls in the app config - if self.config["explorers"][self.chain] != explorer: - self.config["explorers"][self.chain] = explorer - - def update_hwi_bridge_url(self, url): - ''' update the hwi bridge url to use ''' - if self.config["hwi_bridge_url"] != url: - if url and not url.endswith("/"): - # make sure the urls end with a "/" - url += "/" - self.config["hwi_bridge_url"] = url - self._save() - - @property - def info(self): - return self._info - - def combine(self, psbt_arr): - final_psbt = self.cli.combinepsbt(psbt_arr) - return final_psbt - - def finalize(self, psbt): - final_psbt = self.cli.finalizepsbt(psbt) - return final_psbt - - def broadcast(self, raw): - res = self.cli.sendrawtransaction(raw) - return res - - def estimatesmartfee(self, blocks): - return self.cli.estimatesmartfee(blocks) - - @property - def chain(self): - return self._info["chain"] - - @property - def explorer(self): - if "explorers" in self.config and self.chain in self.config["explorers"]: - return self.config["explorers"][self.chain] - else: - return "" - - -class DeviceManager: - ''' A DeviceManager mainly manages the persistence of a device-json-structures - compliant to helper.load_jsons - ''' - # of them via json-files in an empty data folder - def __init__(self, data_folder): - self.update(data_folder) - - def update(self, data_folder=None): - if data_folder is not None: - self.data_folder = data_folder - if data_folder.startswith("~"): - data_folder = os.path.expanduser(data_folder) - # creating folders if they don't exist - if not os.path.isdir(data_folder): - os.mkdir(data_folder) - self._devices = load_jsons(self.data_folder, key="name") - - def names(self): - return list(self._devices.keys()) - - def add(self, name, device_type, keys): - dev = { - "name": name, - "type": device_type, - "keys": [] - } - fname = alias(name) - i = 2 - while os.path.isfile(os.path.join(self.data_folder, "%s.json" % fname)): - fname = alias("%s %d" % (name, i)) - i+=1 - # removing duplicates - key_arr = [k["original"] for k in dev["keys"]] - for k in keys: - if k["original"] not in key_arr: - dev["keys"].append(k) - key_arr.append(k["original"]) - with open(os.path.join(self.data_folder, "%s.json" % fname), "w") as f: - f.write(json.dumps(dev,indent=4)) - self.update() # reload files - return self[name] - - def get_by_alias(self, fname): - for dev in self: - if dev["alias"] == fname: - return dev - logger.error("Could not find Device %s" % fname) - - def remove(self, device): - os.remove(device["fullpath"]) - self.update() - - def __getitem__(self, name): - return Device(self._devices[name], manager=self) - - def __iter__(self): - self._n = 0 - return self - - def __next__(self): - arr = sorted(list(self._devices.keys())) - if self._n < len(arr): - v = self._devices[arr[self._n]] - self._n += 1 - return Device(v, manager=self) - else: - raise StopIteration - - def __len__(self): - return len(self._devices.keys()) - -class Device(dict): - QR_CODE_TYPES = ['specter', 'other'] - SD_CARD_TYPES = ['coldcard', 'other'] - HWI_TYPES = ['keepkey', 'ledger', 'trezor', 'specter', 'coldcard'] - - def __init__(self, d, manager): - self.manager = manager - self.update(d) - self._dict = d - - def update_keys(self, keys): - self["keys"] = keys - with open(self["fullpath"], "r") as f: - content = json.loads(f.read()) - content["keys"] = self["keys"] - with open(self["fullpath"], "w") as f: - f.write(json.dumps(content,indent=4)) - self.manager.update() - - def remove_key(self, key): - keys = [k for k in self["keys"] if k["original"]!=key] - self.update_keys(keys) - - def add_keys(self, normalized): - key_arr = [k["original"] for k in self["keys"]] - keys = self["keys"] - for k in normalized: - if k["original"] not in key_arr: - keys.append(k) - key_arr.append(k["original"]) - self.update_keys(keys) - -class WalletManager: - # chain is required to manage wallets when bitcoin-cli is not running - def __init__(self, data_folder, cli, chain, path="specter"): - self.data_folder = data_folder - self.chain = chain - self.cli = cli - self.cli_path = path - self.update(data_folder, cli, chain) - - def update(self, data_folder=None, cli=None, chain=None): - if chain is not None: - self.chain = chain - if data_folder is not None: - self.data_folder = data_folder - if data_folder.startswith("~"): - data_folder = os.path.expanduser(data_folder) - # creating folders if they don't exist - if not os.path.isdir(data_folder): - os.mkdir(data_folder) - self.working_folder = None - if self.chain is not None and self.data_folder is not None: - self.working_folder = os.path.join(self.data_folder, self.chain) - if self.working_folder is not None and not os.path.isdir(self.working_folder): - os.mkdir(self.working_folder) - if cli is not None: - self.cli = cli - if self.working_folder is not None: - self._wallets = {} - wallets = load_jsons(self.working_folder, key="name") - existing_wallets = [w["name"] for w in self.cli.listwalletdir()["wallets"]] - for k in wallets: - if os.path.join(self.cli_path,wallets[k]["alias"]) in existing_wallets: - self._wallets[k] = wallets[k] - else: - logger.warn("Couldn't find wallet %s in core's wallets. Silently ignored!" % wallets[k]["alias"]) - else: - self._wallets = {} - - def load_all(self): - loaded_wallets = self.cli.listwallets() - loadable_wallets = [w["name"] for w in self.cli.listwalletdir()["wallets"]] - not_loaded_wallets = [w for w in loadable_wallets if w not in loaded_wallets] - if not_loaded_wallets != []: - logger.warn("not loaded wallets:%s" % not_loaded_wallets) - for k in self._wallets: - if os.path.join(self.cli_path,self._wallets[k]["alias"]) in not_loaded_wallets: - logger.debug("loading %s " % self._wallets[k]["alias"]) - self.cli.loadwallet(os.path.join(self.cli_path,self._wallets[k]["alias"])) - if "pending_psbts" in self._wallets[k] and len(self._wallets[k]["pending_psbts"]) > 0: - for psbt in self._wallets[k]["pending_psbts"]: - logger.debug("lock %s " % self._wallets[k]["alias"], self._wallets[k]["pending_psbts"][psbt]["tx"]["vin"]) - Wallet(self._wallets[k], self).cli.lockunspent(False, [utxo for utxo in self._wallets[k]["pending_psbts"][psbt]["tx"]["vin"]]) - - def get_by_alias(self, alias): - for w in self: - if w["alias"] == alias: - return w - raise SpecterError("Wallet %s does not exist!" % alias) - - def names(self): - return list(self._wallets.keys()) - - def _get_intial_wallet_dict(self, name): - walletsindir = [wallet["name"] for wallet in self.cli.listwalletdir()["wallets"]] - al = alias(name) - i = 2 - while os.path.isfile(os.path.join(self.working_folder, "%s.json" % al)) or os.path.join(self.cli_path,al) in walletsindir: - al = alias("%s %d" % (name, i)) - i+=1 - dic = { - "alias": al, - "fullpath": os.path.join(self.working_folder, "%s.json" % al), - "name": name, - "address_index": 0, - "keypool": 0, - "address": None, - "change_index": 0, - "change_address": None, - "change_keypool": 0, - "pending_psbts": {} - } - return dic - - def create_simple(self, name, key_type, key, device): - o = self._get_intial_wallet_dict(name) - arr = key_type.split("-") - desc = key["xpub"] - if key["derivation"] is not None: - desc = "[%s%s]%s" % (key["fingerprint"], key["derivation"][1:], key["xpub"]) - recv_desc = "%s/0/*" % desc - change_desc = "%s/1/*" % desc - for el in arr[::-1]: - recv_desc = "%s(%s)" % (el, recv_desc) - change_desc = "%s(%s)" % (el, change_desc) - recv_desc = AddChecksum(recv_desc) - change_desc = AddChecksum(change_desc) - o.update({ - "type": "simple", - "description": purposes[key_type], - "key": key, - "recv_descriptor": recv_desc, - "change_descriptor": change_desc, - "device": device["name"], - "device_type": device["type"], - "address_type": addrtypes[key_type], - }) - # add wallet to internal dict - self._wallets[o["alias"]] = o - # create a wallet in Bitcoin Core - r = self.cli.createwallet(os.path.join(self.cli_path,o["alias"]), True) - # save wallet file to disk - if self.working_folder is not None: - with open(o["fullpath"], "w+") as f: - f.write(json.dumps(o, indent=4)) - # update myself - loads wallet files - self.update() - # get Wallet class instance - w = Wallet(o, self) - return w - - def create_multi(self, name, sigs_required, key_type, keys, devices): - o = self._get_intial_wallet_dict(name) - # TODO: refactor, ugly - arr = key_type.split("-") - descs = [key["xpub"] for key in keys] - for i, desc in enumerate(descs): - key = keys[i] - if key["derivation"] is not None: - descs[i] = "[%s%s]%s" % (key["fingerprint"], key["derivation"][1:], key["xpub"]) - recv_descs = ["%s/0/*" % desc for desc in descs] - change_descs = ["%s/1/*" % desc for desc in descs] - recv_desc = "multi({},{})".format(sigs_required, ",".join(recv_descs)) - change_desc = "multi({},{})".format(sigs_required, ",".join(change_descs)) - for el in arr[::-1]: - recv_desc = "%s(%s)" % (el, recv_desc) - change_desc = "%s(%s)" % (el, change_desc) - recv_desc = AddChecksum(recv_desc) - change_desc = AddChecksum(change_desc) - devices_list = [] - for device in devices: - devices_list.append({ - "name": device["name"], - "type": device["type"] - }) - o.update({ - "type": "multisig", - "description": "{} of {} {}".format(sigs_required, len(keys), purposes[key_type]), - "sigs_required": sigs_required, - "keys": keys, - "recv_descriptor": recv_desc, - "change_descriptor": change_desc, - "devices": devices_list, - "address_type": addrtypes[key_type] - }) - # add wallet to internal dict - self._wallets[o["alias"]] = o - # create a wallet in Bitcoin Core - r = self.cli.createwallet(os.path.join(self.cli_path,o["alias"]), True) - # save wallet file to disk - if self.working_folder is not None: - with open(o["fullpath"], "w+") as f: - f.write(json.dumps(o, indent=4)) - # update myself - loads wallet files - self.update() - # get Wallet class instance - w = Wallet(o, self) - return w - - def delete_wallet(self, wallet): - logger.info("Deleting {}".format(wallet["alias"])) - self.cli.unloadwallet(os.path.join(self.cli_path,wallet["alias"])) - # Try deleting wallet file - if get_default_datadir() and os.path.exists(os.path.join(get_default_datadir(), os.path.join(self.cli_path,wallet["alias"]))): - shutil.rmtree(os.path.join(get_default_datadir(), os.path.join(self.cli_path,wallet["alias"]))) - # Delete JSON - if os.path.exists(wallet["fullpath"]): - os.remove(wallet["fullpath"]) - - def rename_wallet(self, wallet, name): - logger.info("Renaming {}".format(wallet["alias"])) - wallet["name"] = name - if self.working_folder is not None: - with open(wallet["fullpath"], "w+") as f: - f.write(json.dumps(wallet, indent=4)) - self.update() - - def __getitem__(self, name): - return Wallet(self._wallets[name], manager=self) - - def __iter__(self): - self._n = 0 - return self - - def __next__(self): - arr = sorted(list(self._wallets.keys())) - if self._n < len(arr): - v = self._wallets[arr[self._n]] - self._n += 1 - return Wallet(v, manager=self) - else: - raise StopIteration - - def __len__(self): - return len(self._wallets.keys()) - -class Wallet(dict): - def __init__(self, d, manager=None): - self.update(d) - self.manager = manager - self.cli_path = manager.cli_path - self.cli = manager.cli.wallet(os.path.join(self.cli_path,self["alias"])) - self._dict = d - # check if address is known and derive if not - # address derivation will also refill the keypool if necessary - if self._dict["address"] is None: - self._dict["address"] = self.get_address(0) - self.setlabel(self._dict["address"], "Address #0") - if self._dict["change_address"] is None: - self._dict["change_address"] = self.get_address(0, change=True) - self.cli.scan_addresses(self) - self.getdata() - - def _commit(self, update_manager=True): - with open(self["fullpath"], "w") as f: - f.write(json.dumps(self._dict, indent=4)) - self.update(self._dict) - if update_manager: - self.manager.update() - - def _uses_device_type(self, type_list): - if not self.is_multisig: - return self.get("device_type") in type_list - else: - for device in self.get("devices"): - if device["type"] in type_list: - return True - return False - - @property - def uses_qrcode_device(self): - return self._uses_device_type(Device.QR_CODE_TYPES) - @property - def uses_sdcard_device(self): - return self._uses_device_type(Device.SD_CARD_TYPES) - @property - def uses_hwi_device(self): - return self._uses_device_type(Device.HWI_TYPES) - - @property - def is_multisig(self): - return "sigs_required" in self - - @property - def sigs_required(self): - return 1 if not self.is_multisig else self["sigs_required"] - - @property - def pending_psbts(self): - if "pending_psbts" not in self._dict: - return {} - return self._dict["pending_psbts"] - - @property - def locked_amount(self): - amount = 0 - for psbt in self.pending_psbts: - amount += sum([utxo["witness_utxo"]["amount"] for utxo in self.pending_psbts[psbt]["inputs"]]) - return amount - - def delete_pending_psbt(self, txid): - try: - self.cli.lockunspent(True, self._dict["pending_psbts"][txid]["tx"]["vin"]) - except: - # UTXO was spent - pass - del self._dict["pending_psbts"][txid] - self._commit() - - def update_pending_psbt(self, psbt, txid, raw, device_name): - if txid in self._dict["pending_psbts"]: - self._dict["pending_psbts"][txid]["sigs_count"] += 1 - self._dict["pending_psbts"][txid]["base64"] = psbt - if device_name: - if "devices_signed" not in self._dict["pending_psbts"][txid]: - self._dict["pending_psbts"][txid]["devices_signed"] = [] - self._dict["pending_psbts"][txid]["devices_signed"].append(device_name) - if "hex" in raw: - self._dict["pending_psbts"][txid]["raw"] = raw["hex"] - self._commit() - - def _check_change(self): - addr = self["change_address"] - if addr is not None: - # check if address was used already - v = self.cli.getreceivedbyaddress(addr, 0) - # if not - just return - if v == 0: - return - self._dict["change_index"] += 1 - else: - if "change_index" not in self._dict: - self._dict["change_index"] = 0 - index = self._dict["change_index"] - self._dict["change_address"] = self.get_address(self._dict["change_index"], change=True) - self._commit() - - @property - def txlist(self): - ''' The last 1000 transactions for that wallet - filtering out change addresses transactions and duplicated transactions (except for self-transfers) - This list is used for the wallet `txs` tab to list the wallet transacions. - ''' - txidlist = [] - txlist = [] - for tx in self.transactions: - if tx["is_change"] == False and (tx["is_self"] or tx["txid"] not in txidlist): - txidlist.append(tx["txid"]) - txlist.append(tx) - return txlist - - def getdata(self): - try: - self.balance = self.getbalances() - except: - self.balance = None - try: - self.utxo = self.cli.listunspent(0) - except: - self.utxo = None - try: - self.transactions = self.cli.listtransactions("*", 1000, 0, True) - except: - self.transactions = None - try: - self.info = self.cli.getwalletinfo() - except: - self.info = None - - self._check_change() - return { - "balance": self.balance, - "transactions": self.transactions, - "utxo": self.utxo - } - - @property - def rescan_progress(self): - """Returns None if rescanblockchain is not launched, - value between 0 and 1 otherwise - """ - if self.info is None or "scanning" not in self.info or self.info["scanning"] == False: - return None - else: - return self.info["scanning"]["progress"] - - def getnewaddress(self, change=False): - label = "Change" if change else "Address" - index_type = "change_index" if change else "address_index" - address_type = "change_address" if change else "address" - self._dict[index_type] += 1 - addr = self.get_address(self._dict[index_type], change=change) - self.setlabel(addr, "{} #{}".format(label, self._dict[index_type])) - self._dict[address_type] = addr - self._commit() - return addr - - def get_address(self, index, change=False): - # FIXME: refactor wallet dict keys to get rid of this - pool, desc = ("keypool", "recv_descriptor") - if change: - pool, desc = ("change_keypool", "change_descriptor") - if self._dict[pool] < index+WALLET_CHUNK: - self.keypoolrefill(self._dict[pool], index+WALLET_CHUNK, change=change) - if self.is_multisig: - # using sortedmulti for addresses - sorted_desc = self.sort_descriptor(self[desc], index=index, change=change) - return self.cli.deriveaddresses(sorted_desc, change=change)[0] - return self.cli.deriveaddresses(self._dict[desc], [index, index+1], change=change)[0] - - def geterror(self): - if self.cli.r is not None: - try: - err = self.cli.r.json() - except: - return self.cli.r.text - if "error" in err: - if "message" in err["error"]: - return err["error"]["message"] - return err - return self.cli.r - return None - - def getbalance(self, *args, **kwargs): - ''' wrapping the cli-call: - Returns the total available balance. - The available balance is what the wallet considers currently spendable ''' - default_args = ["*", 0, True] - args = list(args) + default_args[len(args):] - try: - return self.cli.getbalance(*args, **kwargs) - except: - return None - - def getbalances(self, *args, **kwargs): - ''' 18.1 doesn't support it, so we need to build it ourselves... ''' - r = { - "trusted": 0, - "untrusted_pending": 0, - } - try: - r["trusted"] = self.getbalance() - unspent = self.cli.listunspent(0, 0) - for t in unspent: - r["untrusted_pending"] += t["amount"] - # Bitcoin Core doesn't return locked UTXO with `listunspent` command. - # `getbalance` command doesn't return balance from unconfirmed UTXO. - # So to imitate `getbalances` we need to add all unconfirmed locked UTXO balance manually with this loop. - locked_utxo = self.cli.listlockunspent() - for tx in locked_utxo: - tx_data = self.cli.gettransaction(tx["txid"]) - raw_tx = self.cli.decoderawtransaction(tx_data["hex"]) - if "confirmations" not in tx_data or tx_data["confirmations"] == 0: - r["untrusted_pending"] += raw_tx["vout"][tx["vout"]]["value"] - except: - r = { "trusted": None, "untrusted_pending": None } - self.balance = r - return r - - def getfullbalance(self): - ''' Returns sum of trusted balances AND pending transactions ''' - r = self.getbalances() - if r["trusted"] is None: - return None - return r["trusted"]+r["untrusted_pending"] - - def sort_descriptor(self, descriptor, index=None, change=False): - if index is not None: - descriptor = descriptor.replace("*", f"{index}") - # remove checksum - descriptor = descriptor.split("#")[0] - - # get address (should be already imported to the wallet) - address = self.cli.deriveaddresses(AddChecksum(descriptor), change=change)[0] - - # get pubkeys involved - address_info = self.cli.getaddressinfo(address) - if 'pubkeys' in address_info: - pubkeys = address_info["pubkeys"] - elif 'embedded' in address_info and 'pubkeys' in address_info['embedded']: - pubkeys = address_info["embedded"]["pubkeys"] - else: - raise Exception("Could not find 'pubkeys' in address info:\n%s" % json.dumps(address_info, indent=2)) - - # get xpubs from the descriptor - arr = descriptor.split("(multi(")[1].split(")")[0].split(",") - - # getting [wsh] or [sh, wsh] - prefix = descriptor.split("(multi(")[0].split("(") - sigs_required = arr[0] - keys = arr[1:] - - # sort them according to sortedmulti - z = sorted(zip(pubkeys,keys), key=lambda x: x[0]) - keys = [zz[1] for zz in z] - inner = f"{sigs_required},"+",".join(keys) - desc = f"multi({inner})" - - # Write from the inside out - prefix.reverse() - for p in prefix: - desc = f"{p}({desc})" - - return AddChecksum(desc) - - def keypoolrefill(self, start, end=None, change=False): - if end is None: - end = start + WALLET_CHUNK - desc = "recv_descriptor" if not change else "change_descriptor" - pool = "keypool" if not change else "change_keypool" - args = [ - { - "desc": self[desc], - "internal": change, - "range": [start, end], - "timestamp": "now", - "keypool": True, - "watchonly": True - } - ] - r = self.cli.importmulti(args, {"rescan": False}, timeout=120) - # bip67 requires sorted public keys for multisig addresses - if self.is_multisig: - # we do one at a time - args[0].pop("range") - for i in range(start, end): - sorted_desc = self.sort_descriptor(self[desc], index=i, change=change) - args[0]["desc"] = sorted_desc - self.cli.importmulti(args, {"rescan": False}, timeout=120) - self._dict[pool] = end - self._commit(update_manager=False) - return end - - def txonaddr(self, addr): - txlist = [tx for tx in self.transactions if tx["address"] == addr] - return len(txlist) - - def balanceonaddr(self, addr): - balancelist = [utxo["amount"] for utxo in self.utxo if utxo["address"] == addr] - return sum(balancelist) - - def txonlabel(self, label): - txlist = [tx for tx in self.transactions if self.getlabel(tx["address"]) == label] - return len(txlist) - - def balanceonlabel(self, label): - balancelist = [utxo["amount"] for utxo in self.utxo if self.getlabel(utxo["address"]) == label] - return sum(balancelist) - - def addressesonlabel(self, label): - return list(dict.fromkeys( - [tx["address"] for tx in self.transactions if self.getlabel(tx["address"]) == label] - )) - - def istxspent(self, txid): - return txid in [utxo["txid"] for utxo in self.utxo] - - def setlabel(self, addr, label): - self.cli.setlabel(addr, label) - - def getlabel(self, addr): - address_info = self.cli.getaddressinfo(addr) - return address_info["label"] if "label" in address_info and address_info["label"] != "" else addr - - def getaddressname(self, addr, addr_idx): - address_info = self.cli.getaddressinfo(addr) - if ("label" not in address_info or address_info["label"] == "") and addr_idx > -1: - self.setlabel(addr, "Address #{}".format(addr_idx)) - address_info["label"] = "Address #{}".format(addr_idx) - return addr if ("label" not in address_info or address_info["label"] == "") else address_info["label"] - - @property - def fullbalance(self): - ''' This is cached. Consider to use getfullbalance ''' - if self.balance is None: - return None - if self.balance["trusted"] is None or self.balance["untrusted_pending"] is None: - return None - return self.balance["trusted"]+self.balance["untrusted_pending"] - - @property - def availablebalance(self): - ''' This is cached.''' - if self.balance is None: - return None - if self.balance["trusted"] is None or self.balance["untrusted_pending"] is None: - return None - locked = [0] - for psbt in self.pending_psbts: - for i, txid in enumerate([tx["txid"] for tx in self.pending_psbts[psbt]["tx"]["vin"]]): - tx_data = self.cli.gettransaction(txid) - if "confirmations" in tx_data and tx_data["confirmations"] != 0: - locked.append(self.pending_psbts[psbt]["inputs"][i]["witness_utxo"]["amount"]) - return self.balance["trusted"]+self.balance["untrusted_pending"] - sum(locked) - - @property - def descriptor(self): - return self['recv_descriptor'].split("#")[0].replace("/0/*", "").replace("multi", "sortedmulti") - - @property - def fingerprint(self): - """ Unique fingerprint of the wallet - first 4 bytes of hash160 of its descriptor """ - h256 = hashlib.sha256(self.descriptor.encode()).digest() - h160 = hashlib.new('ripemd160', h256).digest() - return h160[:4] - - @property - def txoncurrentaddr(self): - addr = self["address"] - return self.txonaddr(addr) - - @property - def utxoaddresses(self): - return list(dict.fromkeys([ - utxo["address"] for utxo in - sorted( - self.utxo, - key = lambda utxo: next( - tx for tx in self.transactions if tx["txid"] == utxo["txid"] - )["time"] - ) - ])) - - @property - def utxolabels(self): - return list(dict.fromkeys([ - self.getlabel(utxo["address"]) for utxo in - sorted( - self.utxo, - key = lambda utxo: next( - tx for tx in self.transactions if tx["txid"] == utxo["txid"] - )["time"] - ) - ])) - - @property - def addresses(self): - return [self.get_address(idx) for idx in range(0,self._dict["address_index"] + 1)] - - @property - def active_addresses(self): - return list(dict.fromkeys(self.addresses + self.utxoaddresses)) - - @property - def change_addresses(self): - return [self.get_address(idx, change=True) for idx in range(0,self._dict["change_index"] + 1)] - - @property - def labels(self): - return list(dict.fromkeys([self.getlabel(addr) for addr in self.active_addresses])) - - def createpsbt(self, address:str, amount:float, subtract:bool=False, fee_rate:float=0.0, fee_unit="SAT_B", selected_coins=[]): - """ - fee_rate: in sat/B or BTC/kB. Default (None) bitcoin core sets feeRate automatically. - """ - if self.fullbalance < amount: - return None - logger.debug("fee unit: %s" % fee_unit) - if fee_unit not in ["SAT_B", "BTC_KB"]: - raise ValueError('Invalid bitcoin unit') - - extra_inputs = [] - if self.balance["trusted"] < amount: - txlist = self.cli.listunspent(0,0) - b = amount-self.balance["trusted"] - for tx in txlist: - extra_inputs.append({"txid": tx["txid"], "vout": tx["vout"]}) - b -= tx["amount"] - if b < 0: - break; - elif selected_coins != []: - still_needed = amount - for coin in selected_coins: - coin_txid = coin.split(",")[0] - coin_vout = int(coin.split(",")[1]) - coin_amount = float(coin.split(",")[2]) - extra_inputs.append({"txid": coin_txid, "vout": coin_vout}) - still_needed -= coin_amount - if still_needed < 0: - break; - if still_needed > 0: - raise SpecterError("Selected coins does not cover Full amount! Please select more coins!") - - # subtract fee from amount of this output: - # currently only one address is supported, so either - # empty array (subtract from change) or [0] - subtract_arr = [0] if subtract else [] - - options = { - "includeWatching": True, - "changeAddress": self["change_address"], - "subtractFeeFromOutputs": subtract_arr - } - - self.setlabel(self["change_address"], "Change #{}".format(self._dict["change_index"])) - - if fee_rate > 0.0 and fee_unit == "SAT_B": - # bitcoin core needs us to convert sat/B to BTC/kB - options["feeRate"] = fee_rate / 10 ** 8 * 1024 - - # Dont reuse change addresses - use getrawchangeaddress instead - r = self.cli.walletcreatefundedpsbt( - extra_inputs, # inputs - [{address: amount}], # output - 0, # locktime - options, # options - True # replaceable - ) - b64psbt = r["psbt"] - psbt = self.cli.decodepsbt(b64psbt) - psbt['base64'] = b64psbt - # adding xpub fields for coldcard - cc_psbt = PSBT() - cc_psbt.deserialize(b64psbt) - if self.is_multisig: - for k in self._dict["keys"]: - key = b'\x01'+helpers.decode_base58(k["xpub"]) - if "fingerprint" in k and k["fingerprint"] is not None: - fingerprint = bytes.fromhex(k["fingerprint"]) - else: - fingerprint = helpers.get_xpub_fingerprint(k["xpub"]) - if "derivation" in k and k["derivation"] is not None: - der = der_to_bytes(k["derivation"]) - else: - der = b'' - value = fingerprint+der - cc_psbt.unknown[key] = value - psbt["coldcard"]=cc_psbt.serialize() - - # removing unnecessary fields for specter - # to reduce size of the QR code - qr_psbt = PSBT() - qr_psbt.deserialize(b64psbt) - for inp in qr_psbt.inputs + qr_psbt.outputs: - inp.witness_script = b"" - inp.redeem_script = b"" - if len(inp.hd_keypaths) > 0: - k = list(inp.hd_keypaths.keys())[0] - # proprietary field - wallet derivation path - # only contains two last derivation indexes - change and index - inp.unknown[b"\xfc\xca\x01"+self.fingerprint] = b"".join([i.to_bytes(4, "little") for i in inp.hd_keypaths[k][-2:]]) - inp.hd_keypaths = {} - psbt["specter"]=qr_psbt.serialize() - logger.info("PSBT for Specter: %s" % psbt["specter"]) - - self.cli.lockunspent(False, psbt["tx"]["vin"]) - - if "pending_psbts" not in self._dict: - self._dict["pending_psbts"] = {} - psbt["amount"] = amount - psbt["address"] = address - psbt["time"] = time() - psbt["sigs_count"] = 0 - self._dict["pending_psbts"][psbt["tx"]["txid"]] = psbt - self._commit() - - return psbt - - def get_cc_file(self): - CC_TYPES = { - 'legacy': 'BIP45', - 'p2sh-segwit': 'P2WSH-P2SH', - 'bech32': 'P2WSH' - } - # try to find at least one derivation - # cc assume the same derivation for all keys :( - derivation = None - for k in self["keys"]: - if "derivation" in k and k["derivation"] is not None: - derivation = k["derivation"].replace("h","'") - break - if derivation is None: - return None - cc_file = """# Coldcard Multisig setup file (created on Specter Desktop) -# -Name: {} -Policy: {} of {} -Derivation: {} -Format: {} -""".format(self['name'], self['sigs_required'], - len(self['keys']), derivation, - CC_TYPES[self['address_type']] - ) - for k in self['keys']: - # cc assumes fingerprint is known - fingerprint = None - if 'fingerprint' in k: - fingerprint = k['fingerprint'] - if fingerprint is None: - fingerprint = get_xpub_fingerprint(k['xpub']).hex() - cc_file += "{}: {}\n".format(fingerprint.upper(), k['xpub']) - return cc_file - -def der_to_bytes(derivation): - items = derivation.split("/") - if len(items) == 0: - return b'' - if items[0] == 'm': - items = items[1:] - if items[-1] == '': - items = items[:-1] - res = b'' - for item in items: - index = 0 - if item[-1] == 'h' or item[-1] == "'": - index += 0x80000000 - item = item[:-1] - index += int(item) - res += index.to_bytes(4,'big') - return res - -if __name__ == '__main__': - # specter = Specter("~/_specter", config={"rpc":{"port":18332}}) - # specter = Specter(config={"rpc":{"port":18332}}) - specter = Specter("~/.specter") - w = specter.wallets['Stupid'] - print(w.getbalances()) - # print(w.getbalance("*", 0, True)) - # for v in specter.devices: - # print(v) - # print(v.is_multisig) diff --git a/src/cryptoadvance/specter/rpc.py b/src/cryptoadvance/specter/rpc.py index ba990ce844..c77df33bad 100644 --- a/src/cryptoadvance/specter/rpc.py +++ b/src/cryptoadvance/specter/rpc.py @@ -236,7 +236,7 @@ def fn(*args, **kwargs): ##### WORKING WITH WALLETS ######### - print(cli.getbalance(wallet="")) + # print(cli.getbalance(wallet="")) # or diff --git a/src/cryptoadvance/specter/rpc_cache.py b/src/cryptoadvance/specter/rpc_cache.py index ea9bed7fff..7df469d455 100644 --- a/src/cryptoadvance/specter/rpc_cache.py +++ b/src/cryptoadvance/specter/rpc_cache.py @@ -30,7 +30,7 @@ def scan_addresses(self, wallet): if "hdkeypath" in address_info: path = address_info["hdkeypath"].split('/') change = int(path[-2]) == 1 - while int(path[-1]) > wallet._dict["change_index" if change else "address_index"]: + while int(path[-1]) > (wallet.change_index if change else wallet.address_index): wallet.getnewaddress(change=change) if not scanning: self.cache.scanning_ended() @@ -64,7 +64,7 @@ def wallet(self, name=""): except RpcError as rpce: raise rpce except Exception as e: - raise e + raise e def listtransactions(self, *args, **kwargs): cli_transactions = self.cli.listtransactions(*args, **kwargs) diff --git a/src/cryptoadvance/specter/serializations.py b/src/cryptoadvance/specter/serializations.py index af10f52e93..42930039c5 100644 --- a/src/cryptoadvance/specter/serializations.py +++ b/src/cryptoadvance/specter/serializations.py @@ -787,7 +787,7 @@ def deserialize(self, psbt): # make sure that we got an unsigned tx if self.tx.is_null(): - raise PSBTSerializationError("No unsigned trasaction was provided") + raise PSBTSerializationError("No unsigned transaction was provided") # Read input data for txin in self.tx.vin: diff --git a/src/cryptoadvance/specter/server.py b/src/cryptoadvance/specter/server.py index de68869af5..ab22cd527f 100644 --- a/src/cryptoadvance/specter/server.py +++ b/src/cryptoadvance/specter/server.py @@ -7,9 +7,8 @@ from flask import Flask from flask_login import LoginManager, login_user -from .descriptor import AddChecksum from .helpers import hwi_get_config -from .logic import Specter +from .specter import Specter from .hwi_server import hwi_server logger = logging.getLogger(__name__) diff --git a/src/cryptoadvance/specter/specter.py b/src/cryptoadvance/specter/specter.py new file mode 100644 index 0000000000..ff3583678b --- /dev/null +++ b/src/cryptoadvance/specter/specter.py @@ -0,0 +1,237 @@ +import copy, json, logging, os, random +from .helpers import deep_update +from .rpc import autodetect_cli_confs, RpcError +from .rpc_cache import BitcoinCLICached +from .device_manager import DeviceManager +from .wallet_manager import WalletManager + + +logger = logging.getLogger(__name__) + +def get_cli(conf): + if "user" not in conf or conf["user"]=="": + conf["autodetect"] = True + if conf["autodetect"]: + if "port" in conf: + cli_conf_arr = autodetect_cli_confs(port=conf["port"]) + else: + cli_conf_arr = autodetect_cli_confs() + if len(cli_conf_arr) > 0: + cli = BitcoinCLICached(**cli_conf_arr[0]) + else: + return None + else: + cli = BitcoinCLICached(conf["user"], conf["password"], + host=conf["host"], port=conf["port"], protocol=conf["protocol"]) + return cli + +class Specter: + ''' A central Object mostly holding app-settings ''' + CONFIG_FILE_NAME = "config.json" + def __init__(self, data_folder="./data", config={}): + if data_folder.startswith("~"): + data_folder = os.path.expanduser(data_folder) + self.data_folder = data_folder + self.cli = None + self.device_manager = None + self.wallet_manager = None + + self.file_config = None # what comes from config file + self.arg_config = config # what comes from arguments + + # default config + self.config = { + "rpc": { + "autodetect": True, + "user": "", + "password": "", + "port": "", + "host": "localhost", # localhost + "protocol": "http" # https for the future + }, + "auth": "none", + "explorers": { + "main": "", + "test": "", + "regtest": "", + "signet": "" + }, + "hwi_bridge_url": "/hwi/api/", + # unique id that will be used in wallets path in Bitcoin Core + # empty by default for backward-compatibility + "uid": "", + } + + # creating folders if they don't exist + if not os.path.isdir(data_folder): + os.makedirs(data_folder) + + self._info = { "chain": None } + # health check: loads config and tests rpc + self.check() + + def check(self): + # if config.json file exists - load from it + if os.path.isfile(os.path.join(self.data_folder, "config.json")): + with open(os.path.join(self.data_folder, "config.json"), "r") as f: + self.file_config = json.loads(f.read()) + deep_update(self.config, self.file_config) + # otherwise - create one and assign unique id + else: + if self.config["uid"] == "": + self.config["uid"] = random.randint(0,256**8).to_bytes(8,'big').hex() + self._save() + + # init arguments + deep_update(self.config, self.arg_config) # override loaded config + + self.cli = get_cli(self.config["rpc"]) + self._is_configured = (self.cli is not None) + self._is_running = False + if self._is_configured: + try: + self._info = self.cli.getblockchaininfo() + self._is_running = True + except Exception as e: + logger.error("Exception %s while specter.check()" % e) + pass + + if not self._is_running: + self._info["chain"] = None + + chain = self._info["chain"] + if self.device_manager is None: + self.device_manager = DeviceManager(os.path.join(self.data_folder, "devices")) + else: + self.device_manager.update() + + if self.wallet_manager is None or chain is None: + wallets_path = "specter%s" % self.config["uid"] + self.wallet_manager = WalletManager( + os.path.join(self.data_folder, "wallets"), + self.cli, + chain, + self.device_manager, + path=wallets_path + ) + else: + self.wallet_manager.update( + os.path.join(self.data_folder, "wallets"), + self.cli, + chain=chain + ) + + def test_rpc(self, **kwargs): + conf = copy.deepcopy(self.config["rpc"]) + conf.update(kwargs) + cli = get_cli(conf) + if cli is None: + return {"out": "", "err": "autodetect failed", "code": -1} + r = {} + r['tests'] = {} + try: + r['tests']['recent_version'] = int(cli.getnetworkinfo()['version']) >= 170000 + r['tests']['connectable'] = True + r['tests']['credentials'] = True + try: + cli.listwallets() + r['tests']['wallets'] = True + except RpcError as rpce: + logger.error(rpce) + if rpce.status_code == 404: + r['tests']['wallets'] = False + else: + raise rpce + r["out"] = json.dumps(cli.getblockchaininfo(),indent=4) + r["err"] = "" + r["code"] = 0 + except ConnectionError as e: + logger.error(e) + r['tests']['connectable'] = False + r["err"] = "Failed to connect!" + r["code"] = -1 + except RpcError as rpce: + logger.error(rpce) + if rpce.status_code == 401: + r['tests']['credentials'] = False + else: + raise rpce + except Exception as e: + logger.error(e) + r["out"] = "" + if cli.r is not None and "error" in cli.r: + r["err"] = cli.r["error"] + r["code"] = cli.r.status_code + else: + r["err"] = "Failed to connect" + r["code"] = -1 + return r + + def _save(self): + with open(os.path.join(self.data_folder, self.CONFIG_FILE_NAME), "w") as f: + f.write(json.dumps(self.config, indent=4)) + + def update_rpc(self, **kwargs): + need_update = False + for k in kwargs: + if self.config["rpc"][k] != kwargs[k]: + self.config["rpc"][k] = kwargs[k] + need_update = True + if need_update: + self._save() + self.check() + + def update_auth(self, auth): + ''' simply persisting the current auth-choice ''' + if self.config["auth"] != auth: + self.config["auth"] = auth + self._save() + + def update_explorer(self, explorer): + ''' update the block explorers urls ''' + if explorer and not explorer.endswith("/"): + # make sure the urls end with a "/" + explorer += "/" + # update the urls in the app config + if self.config["explorers"][self.chain] != explorer: + self.config["explorers"][self.chain] = explorer + self._save() + + def update_hwi_bridge_url(self, url): + ''' update the hwi bridge url to use ''' + if self.config["hwi_bridge_url"] != url: + if url and not url.endswith("/"): + # make sure the urls end with a "/" + url += "/" + self.config["hwi_bridge_url"] = url + self._save() + + def combine(self, psbt_arr): + final_psbt = self.cli.combinepsbt(psbt_arr) + return final_psbt + + def finalize(self, psbt): + final_psbt = self.cli.finalizepsbt(psbt) + return final_psbt + + def broadcast(self, raw): + res = self.cli.sendrawtransaction(raw) + return res + + def estimatesmartfee(self, blocks): + return self.cli.estimatesmartfee(blocks) + + @property + def info(self): + return self._info + + @property + def chain(self): + return self._info["chain"] + + @property + def explorer(self): + if "explorers" in self.config and self.chain in self.config["explorers"]: + return self.config["explorers"][self.chain] + else: + return "" diff --git a/src/cryptoadvance/specter/specter_error.py b/src/cryptoadvance/specter/specter_error.py new file mode 100644 index 0000000000..a453e23f28 --- /dev/null +++ b/src/cryptoadvance/specter/specter_error.py @@ -0,0 +1,3 @@ +class SpecterError(Exception): + ''' A SpecterError contains meaningfull messages which can be passed directly to the user ''' + pass \ No newline at end of file diff --git a/src/cryptoadvance/specter/static/img/ledger_icon.svg b/src/cryptoadvance/specter/static/img/ledger_icon.svg new file mode 100644 index 0000000000..720abb7ef5 --- /dev/null +++ b/src/cryptoadvance/specter/static/img/ledger_icon.svg @@ -0,0 +1,14 @@ + + + + ledger + Created with Sketch. + + + + + + + + + \ No newline at end of file diff --git a/src/cryptoadvance/specter/static/img/trezor_icon.svg b/src/cryptoadvance/specter/static/img/trezor_icon.svg new file mode 100644 index 0000000000..4d8d8d8eec --- /dev/null +++ b/src/cryptoadvance/specter/static/img/trezor_icon.svg @@ -0,0 +1,11 @@ + + + + trezor + Created with Sketch. + + + + + + \ No newline at end of file diff --git a/src/cryptoadvance/specter/static/styles.css b/src/cryptoadvance/specter/static/styles.css index 5ed0ed3760..5ffde33066 100644 --- a/src/cryptoadvance/specter/static/styles.css +++ b/src/cryptoadvance/specter/static/styles.css @@ -776,7 +776,7 @@ table tr:hover .btn.hovering{ .page_overlay_popup { display: none; /*margin: 0 auto;*/ - max-width: 450px; + max-width: 500px; text-align: center; border-radius: 0.5em; border: 2px solid #313E50; diff --git a/src/cryptoadvance/specter/templates/device/device.jinja b/src/cryptoadvance/specter/templates/device/device.jinja index f6a357ad79..6430766062 100644 --- a/src/cryptoadvance/specter/templates/device/device.jinja +++ b/src/cryptoadvance/specter/templates/device/device.jinja @@ -9,28 +9,28 @@ - {% for key in device["keys"] %} + {% for key in device.keys %} {% from "components/bitcoin_svg.jinja" import bitcoin_svg %} - {% if key["xpub"].startswith("xpub") %} + {% if key.xpub.startswith("xpub") %} {{ bitcoin_svg('main', 16) }} Main {% else %} {{ bitcoin_svg('test', 16) }} Test {% endif %} - {{ purposes[key["type"]] }} - {{ key["derivation"] }} + {{ purposes[key.key_type] }} + {{ key.derivation }} - + - {{ key["original"] }} + {{ key.original }}
- +
@@ -48,5 +48,6 @@
+ {# TODO: Add wallets used in #} {% endblock %} diff --git a/src/cryptoadvance/specter/templates/device/new_device.jinja b/src/cryptoadvance/specter/templates/device/new_device.jinja index 048b50b84a..101a0dc661 100644 --- a/src/cryptoadvance/specter/templates/device/new_device.jinja +++ b/src/cryptoadvance/specter/templates/device/new_device.jinja @@ -2,7 +2,7 @@ {% block main %} {% include "includes/hwi/hwi.jinja" %} {% if device %} -

Adding keys to {{ device["name"] }}

+

Adding keys to {{ device.name }}

{% else %}

Setting up a new device

{% endif %} @@ -12,9 +12,19 @@
Name it     -

+ Device Type: +
+ +
{% endif %}

Scan, paste or load xpubs:

@@ -124,10 +134,6 @@ upub5En4f7k8gaG2KDHvBeEYox...rFpJRHpiZ4DE } else { txt.value += str+"\n"; } - var deviceType = document.getElementById("device_type"); - if (deviceType != null) { - deviceType.value = "coldcard"; - } } reader.readAsText(files[i]); } @@ -167,10 +173,6 @@ upub5En4f7k8gaG2KDHvBeEYox...rFpJRHpiZ4DE scanner.stop(); document.getElementById("popup").style.display = 'none'; document.getElementById("txt").value += result + "\n"; - var deviceType = document.getElementById("device_type"); - if (deviceType != null) { - deviceType.value = "specter"; - } }); document.getElementById("scanme").addEventListener("click", function(){ try{ diff --git a/src/cryptoadvance/specter/templates/includes/hwi/components/display_address.jinja b/src/cryptoadvance/specter/templates/includes/hwi/components/display_address.jinja index acd4bc4ab6..5138a5687b 100644 --- a/src/cryptoadvance/specter/templates/includes/hwi/components/display_address.jinja +++ b/src/cryptoadvance/specter/templates/includes/hwi/components/display_address.jinja @@ -3,7 +3,7 @@

Confirm Address


Please confirm address matches on your device

- Expected address: {{ wallet['address'] }} + Expected address: {{ wallet.address }}
@@ -12,29 +12,31 @@ {% endif %} \ No newline at end of file diff --git a/src/cryptoadvance/specter/templates/includes/hwi/components/sign_tx.jinja b/src/cryptoadvance/specter/templates/includes/hwi/components/sign_tx.jinja index ab3a125dab..979e98cb24 100644 --- a/src/cryptoadvance/specter/templates/includes/hwi/components/sign_tx.jinja +++ b/src/cryptoadvance/specter/templates/includes/hwi/components/sign_tx.jinja @@ -10,14 +10,13 @@
+ +
{{ device_psbts['sdcard'] }}
+ + + + + \ No newline at end of file diff --git a/src/cryptoadvance/specter/templates/includes/overlay/sd_card_sign.jinja b/src/cryptoadvance/specter/templates/includes/overlay/sd_card_sign.jinja new file mode 100644 index 0000000000..54b5fdbd05 --- /dev/null +++ b/src/cryptoadvance/specter/templates/includes/overlay/sd_card_sign.jinja @@ -0,0 +1,23 @@ + + + \ No newline at end of file diff --git a/src/cryptoadvance/specter/templates/includes/overlay/sign_tx_method.jinja b/src/cryptoadvance/specter/templates/includes/overlay/sign_tx_method.jinja new file mode 100644 index 0000000000..f8ef046e68 --- /dev/null +++ b/src/cryptoadvance/specter/templates/includes/overlay/sign_tx_method.jinja @@ -0,0 +1,54 @@ +{# + sign_tx_method - Popup to choose how to sign for a device. + Parameters: + - wallet: The wallet object used. + - device: The device object used. + - psbt: The psbt to be signed. + #} +{% macro sign_tx_method(wallet, device, psbt) -%} + {% set device_psbts = device.create_psbts(psbt['base64'], wallet) %} + +{%- endmacro %} \ No newline at end of file diff --git a/src/cryptoadvance/specter/templates/includes/sidebar/components/sidebar_device_list_item.jinja b/src/cryptoadvance/specter/templates/includes/sidebar/components/sidebar_device_list_item.jinja index 774eeeb5f3..4bb12043ce 100644 --- a/src/cryptoadvance/specter/templates/includes/sidebar/components/sidebar_device_list_item.jinja +++ b/src/cryptoadvance/specter/templates/includes/sidebar/components/sidebar_device_list_item.jinja @@ -6,11 +6,11 @@ #} {% macro sidebar_device_list_item(device, device_alias) -%} - {% if device.type == "specter" or device.type == "coldcard" %} - + {% if device.device_type in ['specter', 'coldcard', 'trezor', 'ledger'] %} + {% else %} {% endif %} -
{{device.name}}
{{ device["keys"] | length}} keys
+
{{ device.name }}
{{ device.keys | length}} keys
{%- endmacro %} diff --git a/src/cryptoadvance/specter/templates/includes/sidebar/components/sidebar_wallet_list_item.jinja b/src/cryptoadvance/specter/templates/includes/sidebar/components/sidebar_wallet_list_item.jinja index 868c8b0bff..ce086f8a9c 100644 --- a/src/cryptoadvance/specter/templates/includes/sidebar/components/sidebar_wallet_list_item.jinja +++ b/src/cryptoadvance/specter/templates/includes/sidebar/components/sidebar_wallet_list_item.jinja @@ -10,10 +10,10 @@ {% include "includes/sidebar/components/wallet_type_svg.jinja" %}
- {{wallet.name}} + {{ wallet.name }} {% if wallet.fullbalance is not none %} {{ wallet.fullbalance | btcamount}} - {%endif%} + {% endif %}
{{ wallet.description }}
diff --git a/src/cryptoadvance/specter/templates/includes/sidebar/sidebar.jinja b/src/cryptoadvance/specter/templates/includes/sidebar/sidebar.jinja index 458f113cc2..218d266a47 100644 --- a/src/cryptoadvance/specter/templates/includes/sidebar/sidebar.jinja +++ b/src/cryptoadvance/specter/templates/includes/sidebar/sidebar.jinja @@ -19,13 +19,13 @@
- {% if specter.devices and specter.chain %} + {% if (specter.device_manager.devices | length) != 0 and specter.chain %}
Wallets
{% from 'includes/sidebar/components/sidebar_wallet_list_item.jinja' import sidebar_wallet_list_item %} - {% for wallet in specter.wallets %} - {{ sidebar_wallet_list_item(wallet, wallet_alias, specter.info.chain) }} + {% for wallet in specter.wallet_manager.wallets_names %} + {{ sidebar_wallet_list_item(specter.wallet_manager.wallets[wallet], wallet_alias, specter.info.chain) }} {% endfor %} {{ sidebar_btn('new_wallet', 'Add new wallet') }} {% endif %} @@ -34,8 +34,8 @@ Devices {% from 'includes/sidebar/components/sidebar_device_list_item.jinja' import sidebar_device_list_item %} - {% for device in specter.devices %} - {{ sidebar_device_list_item(device, device_alias) }} + {% for device_name in specter.device_manager.devices_names %} + {{ sidebar_device_list_item(specter.device_manager.devices[device_name], device_alias) }} {% endfor %} {{ sidebar_btn('new_device', 'Add new device') }}