Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F14864114
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
110 KB
Subscribers
None
View Options
diff --git a/electrum/electrumabc_plugins/digitalbitbox/digitalbitbox.py b/electrum/electrumabc_plugins/digitalbitbox/digitalbitbox.py
index 6b8d177e8..6e2b39045 100644
--- a/electrum/electrumabc_plugins/digitalbitbox/digitalbitbox.py
+++ b/electrum/electrumabc_plugins/digitalbitbox/digitalbitbox.py
@@ -1,887 +1,887 @@
# ----------------------------------------------------------------------------------
# Electrum plugin for the Digital Bitbox hardware wallet by Shift Devices AG
# digitalbitbox.com
#
try:
import base64
import binascii
import hashlib
import hmac
import json
import math
import os
import re
import struct
import sys
import time
import hid
import requests
from ecdsa.curves import SECP256k1
from ecdsa.ecdsa import generator_secp256k1
from ecdsa.util import sigencode_der
from electrumabc.base_wizard import HWD_SETUP_NEW_WALLET
from electrumabc.bitcoin import (
DecodeAES_bytes,
EncodeAES_base64,
EncodeAES_bytes,
Hash,
MyVerifyingKey,
+ ScriptType,
SignatureType,
hmac_oneshot,
msg_magic,
point_to_ser,
pubkey_from_signature,
public_key_to_p2pkh,
push_script,
verify_message,
)
from electrumabc.i18n import _
from electrumabc.keystore import HardwareKeyStore
from electrumabc.printerror import print_error
- from electrumabc.transaction import Transaction, TxInput
+ from electrumabc.transaction import Transaction
from electrumabc.util import UserCancelled, to_string
from ..hw_wallet import HardwareClientBase, HWPluginBase
DIGIBOX = True
except ImportError:
DIGIBOX = False
# ----------------------------------------------------------------------------------
# USB HID interface
#
def to_hexstr(s):
    """Return the hexadecimal representation of the bytes ``s`` as text."""
    hex_bytes = binascii.hexlify(s)
    return str(hex_bytes, "ascii")
def derive_keys(x):
    """Derive a pair of 32-byte keys from ``x``.

    ``x`` is run through ``Hash`` and the result through SHA-512; the two
    halves of the 64-byte digest are returned as a ``(first, second)`` tuple.
    """
    digest = hashlib.sha512(Hash(x)).digest()
    first_half, second_half = digest[:32], digest[32:]
    return (first_half, second_half)
MIN_MAJOR_VERSION = 5
class DigitalBitboxClient(HardwareClientBase):
def __init__(self, plugin, hidDevice):
    """Wrap the HID handle ``hidDevice`` on behalf of ``plugin``."""
    HardwareClientBase.__init__(self, plugin=plugin)
    # HID transport state.
    self.dbb_hid = hidDevice
    self.usbReportSize = 64  # firmware > v2.0.0
    self.opened = True
    # Session state: no password known yet, nothing initialized.
    self.password = None
    self.setupRunning = False
    self.isInitialized = False
def close(self):
    """Close the HID handle if it is open and mark the client as closed."""
    if not self.opened:
        return
    with self.device_manager().hid_lock:
        try:
            self.dbb_hid.close()
        except Exception:
            # Closing is best effort; errors from the HID layer are ignored.
            pass
    self.opened = False
def timeout(self, cutoff):
    """Do nothing; ``cutoff`` is ignored by this client."""
    return None
def is_pairable(self):
    """Report that this client can always be paired."""
    pairable = True
    return pairable
def is_initialized(self):
    """Return the result of ``dbb_has_password()``."""
    has_password = self.dbb_has_password()
    return has_password
def is_paired(self):
    """Return True once a password has been stored on this client."""
    no_password = self.password is None
    return not no_password
def has_usable_connection_with_device(self):
    """Return True if a ping to the device succeeds, False on any error."""
    try:
        self.dbb_has_password()
    except Exception:
        return False
    else:
        return True
def _get_xpub(self, bip32_path):
    """Request the xpub for ``bip32_path`` from the device.

    Returns the reply of the encrypted request, or None when
    ``check_device_dialog()`` returns a falsy value.
    """
    if not self.check_device_dialog():
        return None
    request = b'{"xpub": "%s"}' % bip32_path.encode("utf8")
    return self.hid_send_encrypt(request)
def get_xpub(self, bip32_path, xtype):
    """Return the xpub the device reports for ``bip32_path``.

    Only ``xtype == "standard"`` is accepted. Raises RuntimeError when the
    device gives no (or an empty) reply.
    """
    assert xtype == "standard"
    reply = self._get_xpub(bip32_path)
    if not reply:
        raise RuntimeError("no reply")
    return reply["xpub"]
def dbb_has_password(self):
    """Ping the device and report whether it answers ``"password"``.

    Raises Exception when the reply carries no ``"ping"`` entry.
    """
    reply = self.hid_send_plain(b'{"ping":""}')
    if "ping" in reply:
        return reply["ping"] == "password"
    raise Exception(
        _(
            "Device communication error. Please unplug and replug your Digital"
            " Bitbox."
        )
    )
def stretch_key(self, key):
    """Stretch ``key`` with PBKDF2-HMAC-SHA512 and return it hex-encoded.

    ``key`` may be text or bytes. The callers in this class pass the
    already UTF-8 encoded password, which the previous unconditional
    ``key.encode(...)`` rejected with AttributeError.

    Returns the hex digest as ``bytes``.
    """
    if isinstance(key, str):
        key = key.encode("utf-8")
    stretched = hashlib.pbkdf2_hmac(
        "sha512", key, b"Digital Bitbox", iterations=20480
    )
    return binascii.hexlify(stretched)
def backup_password_dialog(self):
    """Ask for the backup password until it has between 4 and 64 characters.

    Returns the UTF-8 encoded password, or None if the prompt yields None.
    """
    prompt = _("Enter the password used when the backup was created:")
    while True:
        entered = self.handler.get_passphrase(prompt, False)
        if entered is None:
            return None
        length = len(entered)
        if 4 <= length <= 64:
            return entered.encode("utf8")
        if length < 4:
            complaint = _("Password must have at least 4 characters.")
        else:
            complaint = _("Password must have less than 64 characters.")
        prompt = complaint + "\n\n" + _("Enter password:")
def password_dialog(self, msg):
    """Prompt with ``msg`` until a password of 4 to 64 characters is entered.

    On success the UTF-8 encoded password is stored in ``self.password``
    and True is returned. Returns False if the prompt yields None.
    """
    while True:
        entered = self.handler.get_passphrase(msg, False)
        if entered is None:
            return False
        length = len(entered)
        if 4 <= length <= 64:
            self.password = entered.encode("utf8")
            return True
        if length < 4:
            complaint = _("Password must have at least 4 characters.")
        else:
            complaint = _("Password must have less than 64 characters.")
        msg = complaint + "\n\n" + _("Enter password:")
def check_device_dialog(self):
    """Run the firmware, password and initialization checks for the device.

    Steps, in order:
      1. Parse the firmware version from the HID serial number string and
         reject major versions below ``MIN_MAJOR_VERSION``.
      2. If no password is known and the device reports none, let the user
         set one (only while a wallet setup is running).
      3. Ask for the password until an encrypted request is accepted.
      4. While a setup is running and the device is not yet marked
         initialized, run the seeding / recovery and mobile pairing dialogs.

    Returns ``self.isInitialized``, or False if the user cancels a prompt or
    a fresh device is used outside of wallet setup.

    Raises Exception when the firmware version cannot be detected or is too
    old.
    """
    # Check device firmware version.
    # The whole major number must be inside the group: with "([0-9])+" only
    # the last repetition is kept, so "v10.0.0" would be read as major 0.
    match = re.search(
        r"v([0-9]+)\.[0-9]+\.[0-9]+", self.dbb_hid.get_serial_number_string()
    )
    if match is None:
        raise Exception("error detecting firmware version")
    major_version = int(match.group(1))
    if major_version < MIN_MAJOR_VERSION:
        raise Exception(
            "Please upgrade to the newest firmware using the BitBox Desktop app:"
            " https://shiftcrypto.ch/start"
        )

    # Set password if fresh device
    if self.password is None and not self.dbb_has_password():
        if not self.setupRunning:
            return False  # A fresh device cannot connect to an existing wallet
        msg = (
            _("An uninitialized Digital Bitbox is detected.")
            + " "
            + _("Enter a new password below.")
            + "\n\n"
            + _("REMEMBER THE PASSWORD!")
            + "\n\n"
            + _("You cannot access your coins or a backup without the password.")
            + "\n"
            + _("A backup is saved automatically when generating a new wallet.")
        )
        if not self.password_dialog(msg):
            return False
        self.hid_send_plain(b'{"password":"' + self.password + b'"}')

    # Get password from user if not yet set
    msg = _("Enter your Digital Bitbox password:")
    while self.password is None:
        if not self.password_dialog(msg):
            return False
        reply = self.hid_send_encrypt(b'{"led":"blink"}')
        if "error" in reply:
            # Forget the rejected password so the loop prompts again.
            self.password = None
            if reply["error"]["code"] == 109:
                msg = (
                    _("Incorrect password entered.")
                    + "\n\n"
                    + reply["error"]["message"]
                    + "\n\n"
                    + _("Enter your Digital Bitbox password:")
                )
            else:
                # Should never occur
                msg = (
                    _("Unexpected error occurred.")
                    + "\n\n"
                    + reply["error"]["message"]
                    + "\n\n"
                    + _("Enter your Digital Bitbox password:")
                )

    # Initialize device if not yet initialized
    if not self.setupRunning:
        # Wallet exists. Electrum code later checks if the device matches
        # the wallet.
        self.isInitialized = True
    elif not self.isInitialized:
        reply = self.hid_send_encrypt(b'{"device":"info"}')
        if reply["device"]["id"] != "":
            self.recover_or_erase_dialog()  # Already seeded
        else:
            self.seed_device_dialog()  # Seed if not initialized
        self.mobile_pairing_dialog()
    return self.isInitialized
def recover_or_erase_dialog(self):
    """Let the user decide what to do with an already seeded device.

    Choices: keep the current seed, load a backup from the micro SD card,
    or erase the device. ``self.isInitialized`` is set unless the dialog
    is dismissed, the backup load is abandoned, or an exception is raised.
    """
    prompt = _("The Digital Bitbox is already seeded. Choose an option:") + "\n"
    options = [
        _("Create a wallet using the current seed"),
        _(
            "Load a wallet from the micro SD card (the current seed is"
            " overwritten)"
        ),
        _("Erase the Digital Bitbox"),
    ]
    try:
        selected = self.handler.win.query_choice(prompt, options)
    except Exception:
        return  # Back button pushed
    if selected == 2:
        self.dbb_erase()
    elif selected == 1:
        if not self.dbb_load_backup():
            return
    elif self.hid_send_encrypt(b'{"device":"info"}')["device"]["lock"]:
        raise Exception(_("Full 2FA enabled. This is not supported yet."))
    # Use existing seed
    self.isInitialized = True
def seed_device_dialog(self):
    """Ask how to seed an unseeded device and perform the chosen action.

    ``self.isInitialized`` is set unless the dialog is dismissed or the
    backup load is abandoned.
    """
    prompt = _("Choose how to initialize your Digital Bitbox:") + "\n"
    options = [
        _("Generate a new random wallet"),
        _("Load a wallet from the micro SD card"),
    ]
    try:
        selected = self.handler.win.query_choice(prompt, options)
    except Exception:
        return  # Back button pushed
    if selected == 0:
        self.dbb_generate_wallet()
    elif not self.dbb_load_backup(show_msg=False):
        return
    self.isInitialized = True
def mobile_pairing_dialog(self):
    """Offer to import the mobile pairing of the Digital Bitbox desktop app.

    Reads ``config.dat`` from the desktop app's per-platform user directory.
    Returns silently when that directory cannot be determined, when the file
    is missing, unreadable or not a JSON object, or when it lacks the pairing
    keys. Otherwise the user chooses between removing the stored pairing and
    importing the desktop one, and the plugin configuration is saved.
    """
    # A missing environment variable means there is nothing to import; it
    # must not abort the surrounding dialog with a KeyError.
    if sys.platform == "darwin":
        base_dir = os.environ.get("HOME", "")
        sub_dirs = ("Library", "Application Support", "DBB")
    elif sys.platform == "win32":
        base_dir = os.environ.get("APPDATA")
        sub_dirs = ("DBB",)
    else:
        base_dir = os.environ.get("HOME")
        sub_dirs = (".dbb",)
    if base_dir is None:
        return
    dbb_user_dir = os.path.join(base_dir, *sub_dirs)
    if not dbb_user_dir:
        return

    try:
        with open(os.path.join(dbb_user_dir, "config.dat"), encoding="utf-8") as f:
            dbb_config = json.load(f)
    except (OSError, ValueError):
        # OSError: missing or unreadable file.
        # ValueError: invalid JSON or invalid UTF-8.
        return

    if not isinstance(dbb_config, dict):
        return
    if (
        "encryptionprivkey" not in dbb_config
        or "comserverchannelid" not in dbb_config
    ):
        return

    choices = [
        _("Do not pair"),
        _("Import pairing from the Digital Bitbox desktop app"),
    ]
    try:
        reply = self.handler.win.query_choice(_("Mobile pairing options"), choices)
    except Exception:
        return  # Back button pushed

    plugin_config = self.plugin.digitalbitbox_config
    if reply == 0:
        if self.plugin.is_mobile_paired():
            # pop() tolerates a configuration holding only one of the keys.
            plugin_config.pop("encryptionprivkey", None)
            plugin_config.pop("comserverchannelid", None)
    elif reply == 1:
        # import pairing from dbb app
        plugin_config["encryptionprivkey"] = dbb_config["encryptionprivkey"]
        plugin_config["comserverchannelid"] = dbb_config["comserverchannelid"]
    self.plugin.config.set_key("digitalbitbox", self.plugin.digitalbitbox_config)
def dbb_generate_wallet(self):
    """Ask the device to create a new seed, mixing in 32 bytes of host entropy.

    The seed command carries the stretched device password as backup key
    and a time-stamped PDF backup filename.

    Raises Exception with the device's message if the reply has an error.
    """
    password = self.password
    # self.password is stored UTF-8 encoded; hand stretch_key the text form.
    if isinstance(password, bytes):
        password = password.decode("utf8")
    key = self.stretch_key(password)
    if isinstance(key, str):
        key = key.encode("ascii")
    filename = ("Electrum-" + time.strftime("%Y-%m-%d-%H-%M-%S") + ".pdf").encode(
        "utf8"
    )
    # bytes %-formatting only accepts bytes-like arguments, so the entropy
    # must stay bytes rather than being converted to a str.
    entropy = binascii.hexlify(os.urandom(32))
    msg = (
        b'{"seed":{"source": "create", "key": "%s", "filename": "%s", "entropy":'
        b' "%s"}}' % (key, filename, entropy)
    )
    reply = self.hid_send_encrypt(msg)
    if "error" in reply:
        raise Exception(reply["error"]["message"])
def dbb_erase(self):
    """Send the erase command to the device after showing a confirmation.

    This method always raises: the device's error message if the reset
    fails, otherwise ``"Device erased"`` after clearing ``self.password``.
    """
    notice = "\n\n".join(
        [
            _("Are you sure you want to erase the Digital Bitbox?"),
            _("To continue, touch the Digital Bitbox's light for 3 seconds."),
            _("To cancel, briefly touch the light or wait for the timeout."),
        ]
    )
    self.handler.show_message(notice)
    hid_reply = self.hid_send_encrypt(b'{"reset":"__ERASE__"}')
    self.handler.finished()
    if "error" in hid_reply:
        raise Exception(hid_reply["error"]["message"])
    self.password = None
    raise Exception("Device erased")
def dbb_load_backup(self, show_msg=True):
    """Restore the device seed from a backup file listed by the device.

    The user picks one of the listed backups and enters the password the
    backup was created with.

    :param show_msg: show the "touch the light" notice before sending the
        restore command.
    :return: True on success, False if the file selection is dismissed.
    :raises Exception: on a device error or when the password prompt is
        cancelled.
    """
    backups = self.hid_send_encrypt(b'{"backup":"list"}')
    if "error" in backups:
        raise Exception(backups["error"]["message"])
    try:
        f = self.handler.win.query_choice(
            _("Choose a backup file:"), backups["backup"]
        )
    except Exception:
        return False  # Back button pushed
    password = self.backup_password_dialog()
    if password is None:
        raise Exception("Canceled by user")
    # The dialog returns the password UTF-8 encoded; hand stretch_key text.
    if isinstance(password, bytes):
        password = password.decode("utf8")
    key = self.stretch_key(password)
    # stretch_key yields hex as bytes; formatting bytes into a str template
    # would embed "b'...'" in the JSON, so convert it to text first.
    if isinstance(key, bytes):
        key = key.decode("ascii")
    if show_msg:
        self.handler.show_message(
            _("Loading backup...")
            + "\n\n"
            + _("To continue, touch the Digital Bitbox's light for 3 seconds.")
            + "\n\n"
            + _("To cancel, briefly touch the light or wait for the timeout.")
        )
    msg = (
        '{"seed":{"source": "backup", "key": "%s", "filename": "%s"}}'
        % (key, backups["backup"][f])
    ).encode("utf8")
    hid_reply = self.hid_send_encrypt(msg)
    self.handler.finished()
    if "error" in hid_reply:
        raise Exception(hid_reply["error"]["message"])
    return True
def hid_send_frame(self, data):
    """Write ``data`` to the device as one INIT frame plus CONT frames.

    Every write is a zero byte followed by a report of ``usbReportSize``
    bytes; unused payload space is padded with 0xEE. Nothing is written
    when ``data`` is empty.
    """
    channel_id = 0xFF000000
    command = 0x80 + 0x40 + 0x01
    total = len(data)
    init_capacity = self.usbReportSize - 7  # after cid(4) + cmd(1) + len(2)
    cont_capacity = self.usbReportSize - 5  # after cid(4) + seq(1)
    sequence = 0
    offset = 0
    while offset < total:
        if offset == 0:
            # INIT frame
            header = struct.pack(">IBH", channel_id, command, total & 0xFFFF)
            capacity = init_capacity
        else:
            # CONT frame
            header = struct.pack(">IB", channel_id, sequence)
            capacity = cont_capacity
            sequence += 1
        chunk = data[offset : offset + capacity]
        padding = b"\xee" * (capacity - len(chunk))
        self.dbb_hid.write(b"\0" + header + chunk + padding)
        offset += len(chunk)
def hid_read_frame(self):
    """Read one framed response and return its payload as a bytearray.

    The declared length is taken from bytes 5 and 6 of the first report;
    reports are read until at least that many payload bytes have arrived.
    The returned buffer is not truncated to the declared length.
    """
    report_size = self.usbReportSize
    # INIT response: payload starts after the 7-byte header.
    report = bytearray(self.dbb_hid.read(report_size))
    expected = (report[5] << 8) | report[6]
    payload = report[7:]
    received = len(report) - 7
    while received < expected:
        # CONT response: payload starts after the 5-byte header.
        report = bytearray(self.dbb_hid.read(report_size))
        payload += report[5:]
        received += len(report) - 5
    return payload
def hid_send_plain(self, msg):
    """Send the bytes ``msg`` unencrypted and return the decoded JSON reply.

    Devices whose serial number string contains "v2.0." or "v1." use a
    single 4096-byte buffer; all others use the framed protocol.

    Any exception is logged with ``print_error`` and an empty string is
    returned in place of the reply.
    """
    reply = ""
    try:
        serial_number = self.dbb_hid.get_serial_number_string()
        if "v2.0." in serial_number or "v1." in serial_number:
            hidBufSize = 4096
            # msg is bytes, so prefix and padding must be bytes as well;
            # mixing in str literals raised TypeError on this path.
            self.dbb_hid.write(b"\0" + msg + b"\0" * (hidBufSize - len(msg)))
            r = bytearray()
            while len(r) < hidBufSize:
                r += bytearray(self.dbb_hid.read(hidBufSize))
        else:
            self.hid_send_frame(msg)
            r = self.hid_read_frame()
        # Drop trailing whitespace/NULs and any embedded NULs before parsing.
        r = r.rstrip(b" \t\r\n\0")
        r = r.replace(b"\0", b"")
        r = to_string(r, "utf8")
        reply = json.loads(r)
    except Exception as e:
        print_error("Exception caught " + str(e))
    return reply
def hid_send_encrypt(self, msg):
    """Send ``msg`` encrypted and authenticated; return the decoded reply.

    Keys come from ``derive_keys(self.password)``. The request is
    AES-encoded, suffixed with an HMAC-SHA256 and base64-encoded. A reply
    with a ``"ciphertext"`` entry is HMAC-checked, decrypted and parsed as
    JSON. ``self.password`` is cleared when the reply contains ``"error"``.

    Any exception is logged with ``print_error``; whatever ``reply`` holds
    at that point is returned.
    """
    mac_len = 32  # SHA-256 digest size in bytes
    reply = ""
    try:
        encryption_key, authentication_key = derive_keys(self.password)
        encrypted = EncodeAES_bytes(encryption_key, msg)
        request_mac = hmac_oneshot(authentication_key, encrypted, hashlib.sha256)
        reply = self.hid_send_plain(base64.b64encode(encrypted + request_mac))
        if "ciphertext" in reply:
            raw = bytes(base64.b64decode("".join(reply["ciphertext"])))
            ciphertext, received_mac = raw[:-mac_len], raw[-mac_len:]
            expected_mac = hmac_oneshot(
                authentication_key, ciphertext, hashlib.sha256
            )
            if not hmac.compare_digest(received_mac, expected_mac):
                raise Exception("Failed to validate HMAC")
            plaintext = DecodeAES_bytes(encryption_key, ciphertext)
            reply = json.loads(to_string(plaintext, "utf8"))
        if "error" in reply:
            self.password = None
    except Exception as e:
        print_error("Exception caught " + str(e))
    return reply
# ----------------------------------------------------------------------------------
#
#
class DigitalBitboxKeyStore(HardwareKeyStore):
hw_type = "digitalbitbox"
device = "DigitalBitbox"
def __init__(self, d):
    """Initialize the keystore from the stored dict ``d``."""
    HardwareKeyStore.__init__(self, d)
    self.maxInputs = 14  # maximum inputs per single sign command
    self.force_watching_only = False
def get_derivation(self):
    """Return ``self.derivation`` converted to ``str``."""
    derivation = self.derivation
    return str(derivation)
def is_p2pkh(self):
    """Return True when the derivation path starts with ``m/44'/``."""
    bip44_prefix = "m/44'/"
    return self.derivation.startswith(bip44_prefix)
def give_error(self, message, clear_client=False):
    """Raise ``Exception(message)``, dropping ``self.client`` first if asked."""
    if clear_client:
        self.client = None
    error = Exception(message)
    raise error
def decrypt_message(self, pubkey, message, password):
    """Always raise RuntimeError: decryption is not supported for this device."""
    template = _("Encryption and decryption are currently not supported for {}")
    raise RuntimeError(template.format(self.device))
def sign_message(self, sequence, message, password, sigtype=SignatureType.BITCOIN):
# Sign `message` with the key at <derivation>/<sequence[0]>/<sequence[1]> and
# return the signature bytes (one header byte followed by the raw signature).
# `password` is not used in this method. Any failure is re-raised through
# give_error(), so callers see a plain Exception.
# eCash-style signatures are rejected up front.
if sigtype == SignatureType.ECASH:
raise RuntimeError(
_("eCash message signing is not available for {}").format(self.device)
)
sig = None
try:
message = message.encode("utf8")
inputPath = self.get_derivation() + "/%d/%d" % sequence
msg_hash = Hash(msg_magic(message))
inputHash = to_hexstr(msg_hash)
# The device signs a list of {hash, keypath} entries; here exactly one.
hasharray = []
hasharray.append({"hash": inputHash, "keypath": inputPath})
hasharray = json.dumps(hasharray)
msg = b'{"sign":{"meta":"sign message", "data":%s}}' % hasharray.encode(
"utf8"
)
dbb_client = self.plugin.get_client(self)
if not dbb_client.is_paired():
raise Exception(_("Could not sign message."))
# First send: its reply is overwritten by the second send below.
reply = dbb_client.hid_send_encrypt(msg)
self.handler.show_message(
_("Signing message ...")
+ "\n\n"
+ _(
"To continue, touch the Digital Bitbox's blinking light for 3"
" seconds."
)
+ "\n\n"
+ _(
"To cancel, briefly touch the blinking light or wait for the"
" timeout."
)
)
reply = dbb_client.hid_send_encrypt(
msg
) # Send twice, first returns an echo for smart verification (not implemented)
self.handler.finished()
if "error" in reply:
raise Exception(reply["error"]["message"])
if "sign" not in reply:
raise Exception(_("Could not sign message."))
if "recid" in reply["sign"][0]:
# firmware > v2.1.1
# Header byte is 27 + recid + 4, then the signature is checked against
# the address recovered from it.
sig = bytes(
[27 + int(reply["sign"][0]["recid"], 16) + 4]
) + binascii.unhexlify(reply["sign"][0]["sig"])
pk, compressed = pubkey_from_signature(sig, msg_hash)
pk = point_to_ser(pk.pubkey.point, compressed)
addr = public_key_to_p2pkh(pk)
if verify_message(addr, sig, message) is False:
raise Exception(_("Could not sign message"))
elif "pubkey" in reply["sign"][0]:
# firmware <= v2.1.1
# No recid in the reply: try the four candidates until one verifies
# against the address of the returned pubkey.
for i in range(4):
sig = bytes([27 + i + 4]) + binascii.unhexlify(
reply["sign"][0]["sig"]
)
try:
addr = public_key_to_p2pkh(
binascii.unhexlify(reply["sign"][0]["pubkey"])
)
if verify_message(addr, sig, message):
break
except Exception:
continue
else:
# NOTE(review): presumably the else of the for loop (no candidate
# verified); the indentation is lost in this view - confirm.
raise Exception(_("Could not sign message"))
except Exception as e:
self.give_error(e)
return sig
def sign_transaction(self, tx, password, *, use_cache=False):
if tx.is_complete():
return
try:
p2pkhTransaction = True
derivations = self.get_tx_derivations(tx)
inputhasharray = []
hasharray = []
pubkeyarray = []
# Build hasharray from inputs
- for i, txin in enumerate(tx.inputs()):
- if txin["type"] == "coinbase":
- self.give_error("Coinbase not supported") # should never happen
+ for i, txin in enumerate(tx.txinputs()):
+ if txin.type == ScriptType.coinbase:
+ # should never happen
+ self.give_error("Coinbase not supported")
- if txin["type"] != "p2pkh":
+ if txin.type != ScriptType.p2pkh:
p2pkhTransaction = False
- for x_pubkey in txin["x_pubkeys"]:
+ for x_pubkey in txin.x_pubkeys:
if x_pubkey in derivations:
index = derivations.get(x_pubkey)
inputPath = "%s/%d/%d" % (
self.get_derivation(),
index[0],
index[1],
)
inputHash = Hash(tx.serialize_preimage(i))
hasharray_i = {
"hash": to_hexstr(inputHash),
"keypath": inputPath,
}
hasharray.append(hasharray_i)
inputhasharray.append(inputHash)
break
else:
self.give_error(
"No matching x_key for sign_transaction"
) # should never happen
# Build pubkeyarray from outputs
for o in tx.outputs():
info = tx.output_info.get(o.destination)
if info is not None:
index, xpubs, m, script_type = info
changePath = self.get_derivation() + "/%d/%d" % index
changePubkey = self.derive_pubkey(index[0], index[1])
pubkeyarray_i = {"pubkey": changePubkey, "keypath": changePath}
pubkeyarray.append(pubkeyarray_i)
# Special serialization of the unsigned transaction for
# the mobile verification app.
# At the moment, verification only works for p2pkh transactions.
if p2pkhTransaction:
class CustomTXSerialization(Transaction):
# fixme: Transaction.serialize no longer calls input_script,
# so this overloading does not affect
# CustomTXSerialization.serialize()
@classmethod
def input_script(
self, txin, estimate_size=False, sign_schnorr=False
):
if txin["type"] == "p2pkh":
return Transaction.get_preimage_script(txin)
if txin["type"] == "p2sh":
# Multisig verification has partial support, but is disabled. This is the
# expected serialization though, so we leave it here until we activate it.
return "00" + push_script(
Transaction.get_preimage_script(txin)
)
raise Exception("unsupported type %s" % txin["type"])
tx_dbb_serialized = (
CustomTXSerialization(tx.serialize()).serialize().hex()
)
else:
# We only need this for the signing echo / verification.
tx_dbb_serialized = None
# Build sign command
dbb_signatures = []
steps = math.ceil(1.0 * len(hasharray) / self.maxInputs)
for step in range(int(steps)):
hashes = hasharray[step * self.maxInputs : (step + 1) * self.maxInputs]
msg = {
"sign": {
"data": hashes,
"checkpub": pubkeyarray,
},
}
if tx_dbb_serialized is not None:
msg["sign"]["meta"] = to_hexstr(Hash(tx_dbb_serialized))
msg = json.dumps(msg).encode("ascii")
dbb_client = self.plugin.get_client(self)
if not dbb_client.is_paired():
raise Exception("Could not sign transaction.")
reply = dbb_client.hid_send_encrypt(msg)
if "error" in reply:
raise Exception(reply["error"]["message"])
if "echo" not in reply:
raise Exception("Could not sign transaction.")
if self.plugin.is_mobile_paired() and tx_dbb_serialized is not None:
reply["tx"] = tx_dbb_serialized.hex()
self.plugin.comserver_post_notification(reply)
if steps > 1:
self.handler.show_message(
_("Signing large transaction. Please be patient ...")
+ "\n\n"
+ _(
"To continue, touch the Digital Bitbox's blinking light for"
" 3 seconds."
)
+ " "
+ _("(Touch {} of {})").format((step + 1), steps)
+ "\n\n"
+ _(
"To cancel, briefly touch the blinking light or wait for"
" the timeout."
)
+ "\n\n"
)
else:
self.handler.show_message(
_("Signing transaction...")
+ "\n\n"
+ _(
"To continue, touch the Digital Bitbox's blinking light for"
" 3 seconds."
)
+ "\n\n"
+ _(
"To cancel, briefly touch the blinking light or wait for"
" the timeout."
)
)
# Send twice, first returns an echo for smart verification
reply = dbb_client.hid_send_encrypt(msg)
self.handler.finished()
if "error" in reply:
if reply["error"].get("code") in (600, 601):
# aborted via LED short touch or timeout
raise UserCancelled()
raise Exception(reply["error"]["message"])
if "sign" not in reply:
raise Exception("Could not sign transaction.")
dbb_signatures.extend(reply["sign"])
# Fill signatures
if len(dbb_signatures) != len(tx.inputs()):
raise Exception(
"Incorrect number of transactions signed."
) # Should never occur
- for i, txin in enumerate(tx.inputs()):
- num = txin["num_sig"]
- for pubkey in txin["pubkeys"]:
- signatures = list(filter(None, txin["signatures"]))
- if len(signatures) == num:
- break # txin is complete
- ii = txin["pubkeys"].index(pubkey)
+ for i, txin in enumerate(tx.txinputs()):
+ if txin.is_complete():
+ break
+ pubkeys, x_pubkeys = txin.get_sorted_pubkeys()
+ for ii, pubkey in enumerate(pubkeys):
signed = dbb_signatures[i]
if "recid" in signed:
# firmware > v2.1.1
recid = int(signed["recid"], 16)
s = binascii.unhexlify(signed["sig"])
h = inputhasharray[i]
pk = MyVerifyingKey.from_signature(s, recid, h, curve=SECP256k1)
pk = to_hexstr(point_to_ser(pk.pubkey.point, True))
elif "pubkey" in signed:
# firmware <= v2.1.1
pk = signed["pubkey"]
if pk != pubkey:
continue
sig_r = int(signed["sig"][:64], 16)
sig_s = int(signed["sig"][64:], 16)
sig = sigencode_der(sig_r, sig_s, generator_secp256k1.order())
- txin["signatures"][ii] = to_hexstr(sig) + "41"
- tx.update_input(i, TxInput.from_coin_dict(txin))
+ txin.update_signature(sig + b"\x41", ii)
+ tx.update_input(i, txin)
except UserCancelled:
raise
except Exception as e:
self.give_error(e, True)
else:
print_error("Transaction is_complete", tx.is_complete())
tx.raw = tx.serialize()
class DigitalBitboxPlugin(HWPluginBase):
libraries_available = DIGIBOX
keystore_class = DigitalBitboxKeyStore
client = None
DEVICE_IDS = [(0x03EB, 0x2402)] # Digital Bitbox
def __init__(self, parent, config, name):
    """Set up the plugin and load its stored ``digitalbitbox`` configuration.

    The supported USB ids are registered with the device manager only when
    the optional libraries were imported successfully.
    """
    HWPluginBase.__init__(self, parent, config, name)
    if self.libraries_available:
        devmgr = self.device_manager()
        devmgr.register_devices(self.DEVICE_IDS, plugin=self)
    self.digitalbitbox_config = self.config.get("digitalbitbox", {})
def get_dbb_device(self, device):
    """Open ``device.path`` as a HID device under the HID lock and return it."""
    hid_lock = self.device_manager().hid_lock
    with hid_lock:
        handle = hid.device()
        handle.open_path(device.path)
    return handle
def create_client(self, device, handler):
    """Create a client for ``device`` if it exposes the expected interface.

    Returns None for a non-matching interface, or when no HID handle could
    be obtained. ``handler`` is remembered on the plugin for matching
    devices.
    """
    matches = device.interface_number == 0 or device.usage_page == 0xFFFF
    if not matches:
        return None
    self.handler = handler
    hid_handle = self.get_dbb_device(device)
    if hid_handle is None:
        return hid_handle
    return DigitalBitboxClient(self, hid_handle)
def setup_device(self, device_info, wizard, purpose):
    """Create the client for the selected device and run its xpub request.

    ``setupRunning`` is switched on for new-wallet setups. The request for
    the ``m/44'/0'`` xpub is handed to the wizard to run without blocking
    the GUI. Returns the client.
    """
    client = self.scan_and_create_client_for_device(
        device_id=device_info.device.id_, wizard=wizard
    )
    if purpose == HWD_SETUP_NEW_WALLET:
        client.setupRunning = True

    def fetch_xpub():
        return client.get_xpub("m/44'/0'", "standard")

    wizard.run_task_without_blocking_gui(task=fetch_xpub)
    return client
def is_mobile_paired(self):
    """Return True if the stored config has an ``encryptionprivkey`` entry."""
    stored = self.digitalbitbox_config
    return "encryptionprivkey" in stored
def comserver_post_notification(self, payload):
    """POST the encrypted ``payload`` to the smart verification server.

    ``payload`` is JSON-encoded, AES-encrypted with the stored pairing key
    and sent together with the stored channel id. Request failures
    (including timeouts) are shown through the handler instead of being
    raised.
    """
    assert self.is_mobile_paired(), "unexpected mobile pairing error"
    url = "https://digitalbitbox.com/smartverification/index.php"
    key_s = base64.b64decode(self.digitalbitbox_config["encryptionprivkey"])
    args = "c=data&s=0&dt=0&uuid=%s&pl=%s" % (
        self.digitalbitbox_config["comserverchannelid"],
        EncodeAES_base64(key_s, json.dumps(payload).encode("ascii")).decode(
            "ascii"
        ),
    )
    try:
        # Without a timeout an unresponsive server would block the caller
        # indefinitely; with one, the failure is reported below.
        requests.post(url, args, timeout=30)
    except Exception as e:
        self.handler.show_error(str(e))
def get_xpub(self, device_id, derivation, xtype, wizard):
    """Return the xpub for ``derivation`` from the device ``device_id``.

    The device checks are run on the client before the xpub is requested.
    """
    client = self.scan_and_create_client_for_device(
        device_id=device_id, wizard=wizard
    )
    client.check_device_dialog()
    return client.get_xpub(derivation, xtype)
def get_client(self, keystore, force_pair=True):
    """Return the client for ``keystore``, running its device checks first.

    Returns None when the device manager has no client for the keystore.
    """
    client = self.device_manager().client_for_keystore(
        self, keystore.handler, keystore, force_pair
    )
    if client is None:
        return client
    client.check_device_dialog()
    return client
diff --git a/electrum/electrumabc_plugins/ledger/ledger.py b/electrum/electrumabc_plugins/ledger/ledger.py
index 0d430afbf..487c57278 100644
--- a/electrum/electrumabc_plugins/ledger/ledger.py
+++ b/electrum/electrumabc_plugins/ledger/ledger.py
@@ -1,875 +1,875 @@
from __future__ import annotations
import hashlib
import inspect
import sys
import traceback
from struct import pack, unpack
from typing import Optional, Tuple
from electrumabc import bitcoin
from electrumabc.address import Address
-from electrumabc.bitcoin import TYPE_ADDRESS, TYPE_SCRIPT, SignatureType
-from electrumabc.constants import DEFAULT_TXIN_SEQUENCE
+from electrumabc.bitcoin import TYPE_ADDRESS, TYPE_SCRIPT, ScriptType, SignatureType
from electrumabc.i18n import _
from electrumabc.keystore import HardwareKeyStore
from electrumabc.plugins import Device
from electrumabc.printerror import is_verbose, print_error
from electrumabc.serialize import serialize_sequence
from electrumabc.transaction import Transaction
-from electrumabc.util import bfh, bh2u, versiontuple
+from electrumabc.util import bfh, versiontuple
from ..hw_wallet import HardwareClientBase, HWPluginBase
from ..hw_wallet.plugin import (
is_any_tx_output_on_change_branch,
validate_op_return_output_and_get_data,
)
try:
import hid
from btchip.bitcoinTransaction import bitcoinTransaction
from btchip.btchip import btchip
from btchip.btchipComm import HIDDongleHIDAPI
from btchip.btchipException import BTChipException
from btchip.btchipFirmwareWizard import checkFirmware
from btchip.btchipUtils import compress_public_key
BTCHIP = True
BTCHIP_DEBUG = is_verbose
except ImportError:
BTCHIP = False
MSG_NEEDS_FW_UPDATE_CASHADDR = _(
'Firmware version (or "Bitcoin Cash" app) too old for CashAddr support. '
) + _("Please update at https://www.ledgerwallet.com")
MSG_NEEDS_SW_UPDATE_CASHADDR = _("python-btchip is too old for CashAddr support. ") + _(
"Please update to v0.1.27 or greater"
)
BITCOIN_CASH_SUPPORT_HW1 = (1, 0, 4)
BITCOIN_CASH_SUPPORT = (1, 1, 8)
CASHADDR_SUPPORT = (1, 2, 5)
MULTI_OUTPUT_SUPPORT = (1, 1, 4)
TRUSTED_INPUTS_REQUIRED = (1, 4, 0)
def test_pin_unlocked(func):
    """Function decorator to test the Ledger for being unlocked, and if not,
    raise a human-readable exception.

    A ``BTChipException`` with status word 0x6982 or 0x6F04 is turned into a
    plain ``Exception`` carrying an "unlock your device" message; every other
    exception propagates unchanged.
    """
    # Imported locally so the decorator does not depend on a module-level
    # import being present.
    import functools

    # functools.wraps keeps the wrapped method's name and docstring.
    @functools.wraps(func)
    def catch_exception(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except BTChipException as e:
            if e.sw in (0x6982, 0x6F04):
                raise Exception(
                    _("Your {} is locked. Please unlock it.").format(self.device)
                    + "\n\n"
                    + _(
                        "After unlocking, may also need to re-open this wallet window"
                        " as well."
                    )
                ) from e
            else:
                raise

    return catch_exception
class LedgerClient(HardwareClientBase):
def __init__(
    self, hidDevice, *, product_key: Tuple[int, int], plugin: HWPluginBase
):
    """Wrap ``hidDevice`` in a ``btchip`` object for the given plugin."""
    HardwareClientBase.__init__(self, plugin=plugin)
    self._product_key = product_key
    self.device = plugin.device
    self.dongleObject = btchip(hidDevice)
    # Set by checkDevice() once the preflight checks have passed.
    self.preflightDone = False
def is_pairable(self):
    """Report that this client can always be paired."""
    pairable = True
    return pairable
def close(self):
    """Close the underlying dongle while holding the HID lock."""
    hid_lock = self.device_manager().hid_lock
    with hid_lock:
        dongle = self.dongleObject.dongle
        dongle.close()
def timeout(self, cutoff):
    """Do nothing; ``cutoff`` is ignored by this client."""
    return None
def is_initialized(self):
    """Always report the device as initialized."""
    initialized = True
    return initialized
def is_hw1(self):
    """Return True when the first element of the product key is 0x2581."""
    first_id = self._product_key[0]
    return first_id == 0x2581
def device_model_name(self):
    """Return the name ``LedgerPlugin`` associates with this product key."""
    product_key = self._product_key
    return LedgerPlugin.device_name_from_product_key(product_key)
def i4b(self, x):
    """Pack ``x`` as a 4-byte big-endian unsigned integer."""
    packed = pack(">I", x)
    return packed
def has_usable_connection_with_device(self):
    """Return True if the device answers a firmware version request."""
    try:
        self.dongleObject.getFirmwareVersion()
    except BTChipException as e:
        # When Ledger is in the app selection menu, getting the firmware
        # version results in 0x6700 being returned. Getting an error code
        # back means we can actually communicate with the device, so that
        # status still counts as a usable connection.
        return e.sw == 0x6700
    except Exception:
        return False
    else:
        return True
@test_pin_unlocked
def get_xpub(self, bip32_path, xtype):
    """Build the serialized xpub for ``bip32_path`` from device public keys.

    A leading ``m/`` is accepted and stripped. The parent fingerprint is
    computed from the parent node's compressed public key when the path has
    more than one element; otherwise it is 0.
    """
    self.checkDevice()
    # bip32_path is of the form 44'/0'/1'
    # S-L-O-W - we don't handle the fingerprint directly, so compute
    # it manually from the previous node
    # This only happens once so it's bearable
    path_parts = bip32_path.split("/")
    if path_parts[0] == "m":
        path_parts = path_parts[1:]
        bip32_path = bip32_path[2:]
    fingerprint = 0
    if len(path_parts) > 1:
        parent_path = "/".join(path_parts[0 : len(path_parts) - 1])
        parent_node = self.dongleObject.getWalletPublicKey(parent_path)
        parent_pubkey = compress_public_key(parent_node["publicKey"])
        parent_hash = bitcoin.hash_160(parent_pubkey)
        fingerprint = unpack(">I", parent_hash[0:4])[0]
    node = self.dongleObject.getWalletPublicKey(bip32_path)
    pubkey = compress_public_key(node["publicKey"])
    depth = len(path_parts)
    # A trailing apostrophe marks a hardened child: set the top bit.
    last_child = path_parts[len(path_parts) - 1].split("'")
    if len(last_child) == 1:
        childnum = int(last_child[0])
    else:
        childnum = 0x80000000 | int(last_child[0])
    return bitcoin.serialize_xpub(
        xtype,
        node["chainCode"],
        pubkey,
        depth,
        self.i4b(fingerprint),
        self.i4b(childnum),
    )
def has_detached_pin_support(self, client):
    """Return True if ``client`` answers a remaining-PIN-attempts query.

    Status word 0x6D00 yields False; any other ``BTChipException`` is
    re-raised.
    """
    try:
        client.getVerifyPinRemainingAttempts()
    except BTChipException as e:
        if e.sw == 0x6D00:
            return False
        raise e
    else:
        return True
def is_pin_validated(self, client):
    """Probe the PIN status by sending a deliberately invalid command.

    Returns False for status word 0x6982 and True for 0x6A80; any other
    ``BTChipException`` is re-raised. If the exchange raises nothing, the
    method falls through and returns None.
    """
    # Invalid SET OPERATION MODE to verify the PIN status
    probe = bytearray([0xE0, 0x26, 0x00, 0x00, 0x01, 0xAB])
    try:
        client.dongle.exchange(probe)
    except BTChipException as e:
        status = e.sw
        if status == 0x6982:
            return False
        if status == 0x6A80:
            return True
        raise e
def supports_bitcoin_cash(self):
    """Return the stored ``bitcoinCashSupported`` flag."""
    supported = self.bitcoinCashSupported
    return supported
def fw_supports_cashaddr(self):
    """Return the stored ``cashaddrFWSupported`` flag."""
    supported = self.cashaddrFWSupported
    return supported
def sw_supports_cashaddr(self):
    """Return the stored ``cashaddrSWSupported`` flag."""
    supported = self.cashaddrSWSupported
    return supported
def supports_cashaddr(self):
    """Return whether both firmware and software support CashAddr."""
    firmware_ok = self.fw_supports_cashaddr()
    return firmware_ok and self.sw_supports_cashaddr()
def supports_multi_output(self):
    """Return the stored ``multiOutputSupported`` flag."""
    supported = self.multiOutputSupported
    return supported
def requires_trusted_inputs(self):
    """Return the stored ``trustedInputsRequired`` flag."""
    required = self.trustedInputsRequired
    return required
def perform_hw1_preflight(self):
# Query the firmware, record the capability flags on self, verify the PIN if
# needed, and translate known BTChipException status words into Exceptions
# with user-readable messages. Unrecognized BTChipExceptions are re-raised.
try:
firmwareInfo = self.dongleObject.getFirmwareVersion()
firmwareVersion = versiontuple(firmwareInfo["version"])
# "and" binds tighter than "or": HW1 devices use the lower HW1 threshold.
self.bitcoinCashSupported = (
firmwareVersion >= BITCOIN_CASH_SUPPORT
or self.is_hw1()
and firmwareVersion >= BITCOIN_CASH_SUPPORT_HW1
)
self.cashaddrFWSupported = firmwareVersion >= CASHADDR_SUPPORT
self.multiOutputSupported = firmwareVersion >= MULTI_OUTPUT_SUPPORT
self.trustedInputsRequired = firmwareVersion >= TRUSTED_INPUTS_REQUIRED
if not checkFirmware(firmwareInfo) or not self.supports_bitcoin_cash():
self.close()
raise Exception(
_(
"{} firmware version too old. Please update at"
" https://www.ledgerwallet.com"
).format(self.device)
)
try:
self.dongleObject.getOperationMode()
except BTChipException as e:
# Status 0x6985 closes the client and hands over to the handler's setup.
if e.sw == 0x6985:
self.close()
self.handler.get_setup()
# Acquire the new client on the next run
else:
raise e
# Ask for the PIN only if the device supports detached PIN entry, the PIN
# is not validated yet, and a handler exists to show the prompt.
if (
self.has_detached_pin_support(self.dongleObject)
and not self.is_pin_validated(self.dongleObject)
and (self.handler is not None)
):
remaining_attempts = self.dongleObject.getVerifyPinRemainingAttempts()
if remaining_attempts != 1:
msg = _("Enter your {} PIN - remaining attempts: {}").format(
self.device, remaining_attempts
)
else:
msg = _(
"Enter your {} PIN - WARNING: LAST ATTEMPT. If the PIN is not"
" correct, the {} will be wiped."
).format(self.device, self.device)
confirmed, p, pin = self.password_dialog(msg)
if not confirmed:
raise Exception(
_(
"Aborted by user - please unplug the {hw_device_name} and"
" plug it in again before retrying"
).format(hw_device_name=self.device)
)
pin = pin.encode()
self.dongleObject.verifyPin(pin)
# The installed btchip library supports CashAddr if getWalletPublicKey
# accepts a "cashAddr" argument.
gwpkArgSpecs = inspect.getfullargspec(self.dongleObject.getWalletPublicKey)
self.cashaddrSWSupported = "cashAddr" in gwpkArgSpecs.args
except BTChipException as e:
# Map known status words to messages; anything else is re-raised below.
if e.sw == 0x6FAA:
raise Exception(
_(
"{hw_device_name} is temporarily locked - please unplug and"
" plug it in again.\n\nIf this problem persists please exit and"
" restart the Bitcoin Cash application running on the"
" device.\n\nYou may also need to re-open this wallet window as"
" well."
).format(hw_device_name=self.device)
) from e
if (e.sw & 0xFFF0) == 0x63C0:
raise Exception(
_(
"Invalid PIN - please unplug the {hw_device_name} and plug it"
" in again before retrying"
).format(hw_device_name=self.device)
) from e
if e.sw == 0x6F00 and e.message == "Invalid channel":
# based on docs 0x6f00 might be a more general error, hence we also compare message to be sure
raise Exception(
_("Invalid channel.")
+ "\n"
+ _(
"Please make sure that 'Browser support' is disabled on"
" your {}."
).format(self.device)
) from e
if e.sw == 0x6702:
# This happens with firmware/BTC/BCH apps > 2.0.2 when the user didn't
# open the BCH app
raise Exception(
_("Open the BCH app")
+ "\n"
+ _("Please make sure that the BCH app is open on your device.")
) from e
raise e
def checkDevice(self):
    """Run the HW.1 preflight once and remember that it succeeded.

    Raises:
        RuntimeError: the preflight failed with status word 0x6D00 or 0x6700.
        BTChipException: any other device error, re-raised unchanged.
    """
    if self.preflightDone:
        return
    try:
        self.perform_hw1_preflight()
    except BTChipException as exc:
        if exc.sw in (0x6D00, 0x6700):
            wrong_mode = _("{} not in Bitcoin Cash mode").format(self.device)
            raise RuntimeError(wrong_mode) from exc
        raise exc
    self.preflightDone = True
def password_dialog(self, msg=None):
    """Ask the handler for a word and report whether one was entered.

    Returns ``(False, None, None)`` when the handler returns ``None``,
    otherwise ``(True, word, word)``.
    """
    entered = self.handler.get_word(msg)
    if entered is not None:
        return True, entered, entered
    return False, None, None
class LedgerKeyStore(HardwareKeyStore):
hw_type = "ledger"
device = "Ledger"
def __init__(self, d):
    """Build the keystore from its serialized form ``d``.

    ``d`` is forwarded to ``HardwareKeyStore.__init__``. Its optional
    ``"cfg"`` entry is kept on the instance and defaults to ``{"mode": 0}``.
    """
    HardwareKeyStore.__init__(self, d)
    # The wallet's handler deals with errors and every other user
    # interaction. It belongs to a window and survives device reconnects.
    self.cfg = d.get("cfg", {"mode": 0})
    self.signing = False
    self.force_watching_only = False
def dump(self):
    """Return the base keystore dump extended with this keystore's ``cfg``."""
    serialized = HardwareKeyStore.dump(self)
    serialized["cfg"] = self.cfg
    return serialized
def get_derivation(self):
    """Return the derivation path stored on this keystore."""
    derivation_path = self.derivation
    return derivation_path
def get_client(self):
    """Return the ``dongleObject`` of the client the plugin gives this keystore."""
    electrum_client = self.plugin.get_client(self)
    return electrum_client.dongleObject
def get_client_electrum(self):
    """Return the client object the plugin gives this keystore."""
    plugin = self.plugin
    return plugin.get_client(self)
def give_error(self, message, clear_client=False):
    """Log ``message`` and always raise ``Exception(message)``.

    The handler shows the error only when no signing operation is flagged
    as running; otherwise the ``signing`` flag is cleared instead. With
    ``clear_client`` set, ``self.client`` is reset to ``None`` before raising.
    """
    print_error(message)
    if self.signing:
        self.signing = False
    else:
        self.handler.show_error(message)
    if clear_client:
        self.client = None
    raise Exception(message)
def set_and_unset_signing(func):
    """Function decorator to set and unset self.signing.

    ``self.signing`` is set to True while ``func`` runs and is reset to
    False when ``func`` returns or raises. The wrapper keeps ``func``'s
    name and docstring.
    """
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            self.signing = True
            return func(self, *args, **kwargs)
        finally:
            self.signing = False

    return wrapper
def address_id_stripped(self, address):
    """Return the derivation path of ``address`` without its first two characters.

    The path is the keystore derivation followed by the change and index
    values reported by ``get_address_index``.
    """
    change, index = self.get_address_index(address)
    full_path = "{:s}/{:d}/{:d}".format(self.derivation, change, index)
    # Strip the leading "m/"
    return full_path[2:]
def decrypt_message(self, pubkey, message, password):
    """Always raise ``RuntimeError``: decryption is not supported here."""
    template = _("Encryption and decryption are currently not supported for {}")
    raise RuntimeError(template.format(self.device))
@test_pin_unlocked
@set_and_unset_signing
def sign_message(self, sequence, message, password, sigtype=SignatureType.BITCOIN):
    """Sign ``message`` on the device for the key at ``sequence``.

    Returns one header byte followed by ``r`` and ``s``, each left-padded
    with zero bytes to 32 bytes, or ``b""`` when the user cancels.

    Raises:
        RuntimeError: ``sigtype`` is ``SignatureType.ECASH``.
        Exception: raised through ``give_error`` for other failures.
    """
    if sigtype == SignatureType.ECASH:
        raise RuntimeError(
            _("eCash message signing is not available for {}").format(self.device)
        )

    message = message.encode("utf8")
    digest_hex = hashlib.sha256(message).hexdigest().upper()
    # prompt for the PIN before displaying the dialog if necessary
    self.get_client()
    address_path = self.get_derivation()[2:] + "/{:d}/{:d}".format(*sequence)
    self.handler.show_message(
        _("Signing message...") + "\n" + _("Message hash: {}").format(digest_hex)
    )

    try:
        info = self.get_client().signMessagePrepare(address_path, message)
        pin = ""
        if info["confirmationNeeded"]:
            # does the authenticate dialog and returns pin
            pin = self.handler.get_auth(info)
            if not pin:
                raise UserWarning(_("Cancelled by user"))
            pin = str(pin).encode()
        signature = self.get_client().signMessageSign(pin)
    except BTChipException as e:
        if e.sw == 0x6985:  # cancelled by user
            return b""
        if e.sw == 0x6982:
            raise  # pin lock. decorator will catch it
        if e.sw == 0x6A80:
            self.give_error(
                _(
                    "Unfortunately, this message cannot be signed by the {}. Only"
                    " alphanumerical messages shorter than 140 characters are"
                    " supported. Please remove any extra characters (tab, carriage"
                    " return) and retry."
                ).format(self.device)
            )
        else:
            self.give_error(e, True)
    except UserWarning:
        self.handler.show_error(_("Cancelled by user"))
        return b""
    except Exception as e:
        self.give_error(e, True)
    finally:
        self.handler.finished()

    # Parse the ASN.1 signature
    r_len = signature[3]
    s_header = 4 + r_len
    r = signature[4:s_header]
    s_len = signature[s_header + 1]
    s = signature[s_header + 2 :]
    # A 33-byte integer carries one extra leading byte; drop it.
    if r_len == 33:
        r = r[1:]
    if s_len == 33:
        s = s[1:]

    # And convert it
    # Pad r and s points with 0x00 bytes when the point is small to get valid signature.
    zero = bytes([0x00])
    header = bytes([27 + 4 + (signature[0] & 0x01)])
    return header + (zero * (32 - len(r)) + r) + (zero * (32 - len(s)) + s)
@test_pin_unlocked
@set_and_unset_signing
def sign_transaction(self, tx: Transaction, password, *, use_cache=False):
# Sign every input of `tx` with the Ledger device and store the result in `tx.raw`.
# Does nothing when the transaction is already complete.
# NOTE(review): lines starting with "-" / "+" below are diff markers from the
# patch this text was extracted from ("-" = removed line, "+" = added line).
if tx.is_complete():
return
inputs = []
inputsPaths = []
pubKeys = []
chipInputs = []
redeemScripts = []
signatures = []
changePath = ""
output = None
p2shTransaction = False
pin = ""
# prompt for the PIN before displaying the dialog if necessary
client_ledger = self.get_client()
client_electrum = self.get_client_electrum()
assert client_electrum
# Fetch inputs of the transaction to sign
derivations = self.get_tx_derivations(tx)
- for txin in tx.inputs():
- if txin["type"] == "coinbase":
- self.give_error(_("Coinbase not supported")) # should never happen
+ for txin in tx.txinputs():
+ if txin.type == ScriptType.coinbase:
+ # should never happen
+ self.give_error(_("Coinbase not supported"))
- if txin["type"] in ["p2sh"]:
+ if txin.type == ScriptType.p2sh:
p2shTransaction = True
- pubkeys, x_pubkeys = tx.get_sorted_pubkeys(txin)
+ pubkeys, x_pubkeys = txin.get_sorted_pubkeys()
# Find the first x_pubkey with a known derivation; its position is the signing slot.
for i, x_pubkey in enumerate(x_pubkeys):
if x_pubkey in derivations:
signingPos = i
s = derivations.get(x_pubkey)
hwAddress = "{:s}/{:d}/{:d}".format(
self.get_derivation()[2:], s[0], s[1]
)
break
else:
self.give_error(
_("No matching x_key for sign_transaction")
) # should never happen
- redeemScript = Transaction.get_preimage_script(txin)
+ redeemScript = txin.get_preimage_script()
# Per-input record, read back by index further down:
# [0] previous tx (hex), [1] output index, [2] redeem script (hex),
# [3] previous txid, [4] signing position, [5] sequence number.
inputs.append(
[
- txin["prev_tx"].raw.hex(),
- txin["prevout_n"],
- redeemScript,
- txin["prevout_hash"],
+ txin.get_prev_tx().raw.hex(),
+ txin.outpoint.n,
+ redeemScript.hex(),
+ txin.outpoint.txid.to_string(),
signingPos,
- txin.get("sequence", DEFAULT_TXIN_SEQUENCE),
+ txin.sequence,
]
)
inputsPaths.append(hwAddress)
pubKeys.append(pubkeys)
# Sanity check
if p2shTransaction:
- for txin in tx.inputs():
- if txin["type"] != "p2sh":
+ for txin in tx.txinputs():
+ if txin.type != ScriptType.p2sh:
self.give_error(
_(
"P2SH / regular input mixed in same transaction not"
" supported"
)
) # should never happen
txOutput = serialize_sequence(tx.outputs())
if not client_electrum.supports_multi_output():
if len(tx.outputs()) > 2:
self.give_error(
_(
"Transaction with more than 2 outputs not supported by {}"
).format(self.device)
)
# Reject output types the connected device cannot handle.
for o in tx.outputs():
if client_electrum.is_hw1():
if not o.type == TYPE_ADDRESS:
self.give_error(
_("Only address outputs are supported by {}").format(
self.device
)
)
else:
if o.type not in [TYPE_ADDRESS, TYPE_SCRIPT]:
self.give_error(
_("Only address and script outputs are supported by {}").format(
self.device
)
)
if o.type == TYPE_SCRIPT:
try:
# Ledger has a maximum output size of 200 bytes:
# https://github.com/LedgerHQ/ledger-app-btc/commit/3a78dee9c0484821df58975803e40d58fbfc2c38#diff-c61ccd96a6d8b54d48f54a3bc4dfa7e2R26
# which gives us a maximum OP_RETURN payload size of
# 187 bytes. It also apparently has no limit on
# max_pushes, so we specify max_pushes=None so as
# to bypass that check.
validate_op_return_output_and_get_data(
o.destination, max_size=187, max_pushes=None
)
except RuntimeError as e:
self.give_error("{}: {}".format(self.device, str(e)))
# - only one output and one change is authorized (for hw.1 and nano)
# - at most one output can bypass confirmation (~change) (for all)
if not p2shTransaction:
# NOTE(review): this repeats the "more than 2 outputs" check made above - confirm whether both are needed.
if not client_electrum.supports_multi_output():
if len(tx.outputs()) > 2:
self.give_error(
_(
"Transaction with more than 2 outputs not supported by {}"
).format(self.device)
)
has_change = False
any_output_on_change_branch = is_any_tx_output_on_change_branch(tx)
for o in tx.outputs():
info = tx.output_info.get(o.destination)
if (info is not None) and len(tx.outputs()) > 1 and not has_change:
index, xpubs, m, script_type = info
on_change_branch = index[0] == 1
# prioritise hiding outputs on the 'change' branch from user
# because no more than one change address allowed
if on_change_branch == any_output_on_change_branch:
changePath = self.get_derivation()[2:] + "/{:d}/{:d}".format(
*index
)
has_change = True
else:
output = o.destination
else:
output = o.destination
try:
# Get trusted inputs from the original transactions
for input_idx, utxo in enumerate(inputs):
self.handler.show_message(
_("Preparing transaction inputs...")
+ f" (phase1, {input_idx}/{len(inputs)})"
)
sequence = utxo[5].to_bytes(4, "little").hex()
if not client_electrum.requires_trusted_inputs():
# Build the input by hand: reversed txid bytes, output index, then the amount.
txtmp = bitcoinTransaction(bfh(utxo[0]))
tmp = bfh(utxo[3])[::-1]
tmp += utxo[1].to_bytes(4, "little")
tmp += txtmp.outputs[utxo[1]].amount
chipInputs.append(
{"value": tmp, "witness": True, "sequence": sequence}
)
redeemScripts.append(bfh(utxo[2]))
else:
txtmp = bitcoinTransaction(bfh(utxo[0]))
trustedInput = client_ledger.getTrustedInput(txtmp, utxo[1])
trustedInput["sequence"] = sequence
trustedInput["witness"] = True
chipInputs.append(trustedInput)
if p2shTransaction:
redeemScripts.append(bfh(utxo[2]))
else:
redeemScripts.append(txtmp.outputs[utxo[1]].script)
self.handler.show_message(_("Confirm Transaction on your Ledger device..."))
# Sign all inputs
inputIndex = 0
client_ledger.enableAlternate2fa(False)
cashaddr = Address.FMT_UI == Address.FMT_CASHADDR
if cashaddr and client_electrum.supports_cashaddr():
# For now the Ledger will show a bitcoincash: CashAddr
client_ledger.startUntrustedTransaction(
True,
inputIndex,
chipInputs,
redeemScripts[inputIndex],
version=tx.version,
cashAddr=True,
)
else:
client_ledger.startUntrustedTransaction(
True,
inputIndex,
chipInputs,
redeemScripts[inputIndex],
version=tx.version,
)
# we don't set meaningful outputAddress, amount and fees
# as we only care about the alternateEncoding==True branch
outputData = client_ledger.finalizeInput(
b"", 0, 0, changePath, tx.serialize(True)
)
outputData["outputData"] = txOutput
if outputData["confirmationNeeded"]:
outputData["address"] = output
self.handler.finished()
pin = self.handler.get_auth(
outputData
) # does the authenticate dialog and returns pin
if not pin:
raise UserWarning()
self.handler.show_message(_("Confirmed. Signing Transaction..."))
# Phase 2: start a single-input transaction per input and collect its signature.
while inputIndex < len(inputs):
self.handler.show_message(
_("Signing transaction...")
+ f" (phase2, {inputIndex}/{len(inputs)})"
)
singleInput = [chipInputs[inputIndex]]
if cashaddr and client_electrum.supports_cashaddr():
client_ledger.startUntrustedTransaction(
False,
0,
singleInput,
redeemScripts[inputIndex],
version=tx.version,
cashAddr=True,
)
else:
client_ledger.startUntrustedTransaction(
False,
0,
singleInput,
redeemScripts[inputIndex],
version=tx.version,
)
inputSignature = client_ledger.untrustedHashSign(
inputsPaths[inputIndex], pin, lockTime=tx.locktime, sighashType=0x41
)
inputSignature[0] = 0x30 # force for 1.4.9+
signatures.append(inputSignature)
inputIndex = inputIndex + 1
except UserWarning:
self.handler.show_error(_("Cancelled by user"))
return
except BTChipException as e:
if e.sw in (0x6985, 0x6D00): # cancelled by user
return
elif e.sw == 0x6982:
raise # pin lock. decorator will catch it
else:
traceback.print_exc(file=sys.stderr)
self.give_error(e, True)
except Exception as e:
# NOTE(review): this traceback goes to sys.stdout while the handler above uses sys.stderr - confirm intent.
traceback.print_exc(file=sys.stdout)
self.give_error(e, True)
finally:
self.handler.finished()
# Store each collected signature at its input's signing position, then re-serialize.
- for i, txin in enumerate(tx.inputs()):
+ for i, txin in enumerate(tx.txinputs()):
signingPos = inputs[i][4]
- txin["signatures"][signingPos] = bh2u(signatures[i])
+ txin.update_signature(signatures[i], signingPos)
tx.raw = tx.serialize()
@test_pin_unlocked
@set_and_unset_signing
def show_address(self, sequence):
    """Ask the device to display the address at ``sequence`` on its screen.

    A user cancellation (0x6985) is ignored and a PIN lock (0x6982) is
    re-raised. Every other error is shown through the handler.
    """
    client = self.get_client()
    # prompt for the PIN before displaying the dialog if necessary
    address_path = self.get_derivation()[2:] + "/{:d}/{:d}".format(*sequence)
    self.handler.show_message(_("Showing address on {}...").format(self.device))
    try:
        display_options = {"showOnScreen": True}
        if (
            Address.FMT_UI == Address.FMT_CASHADDR
            and self.get_client_electrum().supports_cashaddr()
        ):
            # For now the Ledger will show a bitcoincash: CashAddr
            display_options["cashAddr"] = True
        client.getWalletPublicKey(address_path, **display_options)
    except BTChipException as e:
        if e.sw == 0x6982:
            raise  # pin lock. decorator will catch it
        if e.sw == 0x6B00:  # hw.1 raises this
            headline = _("Error showing address") + ":"
            hint = _("Your {} might not have support for this functionality.").format(
                self.device
            )
            self.handler.show_error("{}\n{}\n{}".format(headline, e, hint))
        elif e.sw != 0x6985:  # 0x6985 means cancelled by user: nothing to do
            traceback.print_exc(file=sys.stderr)
            self.handler.show_error(e)
    except Exception as e:
        traceback.print_exc(file=sys.stderr)
        self.handler.show_error(e)
    finally:
        self.handler.finished()
class LedgerPlugin(HWPluginBase):
libraries_available = BTCHIP
keystore_class = LedgerKeyStore
client = None
DEVICE_IDS = [
(0x2581, 0x1807), # HW.1 legacy btchip
(0x2581, 0x2B7C), # HW.1 transitional production
(0x2581, 0x3B7C), # HW.1 ledger production
(0x2581, 0x4B7C), # HW.1 ledger test
(0x2C97, 0x0000), # Blue
(0x2C97, 0x0011), # Blue app-bitcoin >= 1.5.1
(0x2C97, 0x0015), # Blue app-bitcoin >= 1.5.1
(0x2C97, 0x0001), # Nano-S
(0x2C97, 0x1011), # Nano-S app-bitcoin >= 1.5.1
(0x2C97, 0x1015), # Nano-S app-bitcoin >= 1.5.1
(0x2C97, 0x0004), # Nano-X
(0x2C97, 0x4011), # Nano-X app-bitcoin >= 1.5.1
(0x2C97, 0x4015), # Nano-X app-bitcoin >= 1.5.1
(0x2C97, 0x0005), # Nano-S-Plus
(0x2C97, 0x5011), # Nano-S-Plus app-bitcoin >= 1.5.1
(0x2C97, 0x5015), # Nano-S-Plus app-bitcoin >= 1.5.1
]
VENDOR_IDS = (0x2C97,)
LEDGER_MODEL_IDS = {
0x10: "Ledger Nano S",
0x40: "Ledger Nano X",
0x50: "Ledger Nano S Plus",
}
def __init__(self, parent, config, name):
    """Initialise the plugin and register the device ids it handles.

    Nothing is registered when ``libraries_available`` is false.
    """
    HWPluginBase.__init__(self, parent, config, name)
    if self.libraries_available:
        # to support legacy devices and legacy firmwares
        self.device_manager().register_devices(self.DEVICE_IDS, plugin=self)
        # to support modern firmware
        self.device_manager().register_vendor_ids(self.VENDOR_IDS, plugin=self)
@classmethod
def _recognize_device(cls, product_key) -> Tuple[bool, Optional[str]]:
"""Returns (can_recognize, model_name) tuple."""
# legacy product_keys
if product_key in cls.DEVICE_IDS:
if product_key[0] == 0x2581:
return True, "Ledger HW.1"
if product_key == (0x2C97, 0x0000):
return True, "Ledger Blue"
if product_key == (0x2C97, 0x0001):
return True, "Ledger Nano S"
if product_key == (0x2C97, 0x0004):
return True, "Ledger Nano X"
if product_key == (0x2C97, 0x0005):
return True, "Ledger Nano S Plus"
return True, None
# modern product_keys
if product_key[0] == 0x2C97:
product_id = product_key[1]
model_id = product_id >> 8
if model_id in cls.LEDGER_MODEL_IDS:
model_name = cls.LEDGER_MODEL_IDS[model_id]
return True, model_name
# give up
return False, None
def can_recognize_device(self, device: Device) -> bool:
    """Return the first element of ``_recognize_device`` for ``device.product_key``."""
    verdict = self._recognize_device(device.product_key)
    return verdict[0]
@classmethod
def device_name_from_product_key(cls, product_key) -> Optional[str]:
    """Return the model name ``_recognize_device`` reports for ``product_key``."""
    verdict = cls._recognize_device(product_key)
    return verdict[1]
def create_device_from_hid_enumeration(self, d, *, product_key):
    """Return the device built by the base class, or ``None`` if it is not recognised."""
    device = super().create_device_from_hid_enumeration(d, product_key=product_key)
    return device if self.can_recognize_device(device) else None
def get_btchip_device(self, device):
# Open the HID device at `device.path` and wrap it in a HIDDongleHIDAPI.
# Returns None for a vendor-0x2C97 device whose interface does not match.
# `ledger` is passed on to HIDDongleHIDAPI; it becomes True for the product keys tested below.
ledger = False
if device.product_key[0] == 0x2581 and device.product_key[1] == 0x3B7C:
ledger = True
if device.product_key[0] == 0x2581 and device.product_key[1] == 0x4B7C:
ledger = True
if device.product_key[0] == 0x2C97:
if device.interface_number == 0 or device.usage_page == 0xFFA0:
ledger = True
else:
return None # non-compatible interface of a nano s or blue
# NOTE(review): the indentation of this text was lost, so it is unclear which of the statements below run while hid_lock is held - confirm against the real file.
with self.device_manager().hid_lock:
dev = hid.device()
dev.open_path(device.path)
dev.set_nonblocking(True)
return HIDDongleHIDAPI(dev, ledger, BTCHIP_DEBUG)
def create_client(self, device, handler):
    """Remember ``handler`` and build a ``LedgerClient`` for ``device``.

    Returns ``None`` when ``get_btchip_device`` yields no dongle.
    """
    self.handler = handler
    dongle = self.get_btchip_device(device)
    if dongle is None:
        return None
    return LedgerClient(dongle, product_key=device.product_key, plugin=self)
def setup_device(self, device_info, wizard, purpose):
    """Create the client for ``device_info`` and request its m/44'/0' xpub.

    The xpub request is handed to the wizard's
    ``run_task_without_blocking_gui``. Returns the client.
    """
    client = self.scan_and_create_client_for_device(
        device_id=device_info.device.id_, wizard=wizard
    )

    def fetch_xpub():
        return client.get_xpub("m/44'/0'", "standard")

    # TODO replace by direct derivation once Nano S > 1.1
    wizard.run_task_without_blocking_gui(task=fetch_xpub)
    return client
def get_xpub(self, device_id, derivation, xtype, wizard):
    """Return the xpub for ``derivation`` from the client of ``device_id``.

    ``checkDevice`` is called on the client before the xpub is requested.
    """
    client = self.scan_and_create_client_for_device(
        device_id=device_id, wizard=wizard
    )
    client.checkDevice()
    return client.get_xpub(derivation, xtype)
def get_client(self, keystore, force_pair=True):
    """Return the device manager's client for ``keystore``, or ``None``.

    ``checkDevice`` is called on the client before it is returned.
    """
    # All client interaction should not be in the main GUI thread
    client = self.device_manager().client_for_keystore(
        self, keystore.handler, keystore, force_pair
    )
    # returns the client for a given keystore. can use xpub
    if client is None:
        return None
    client.checkDevice()
    return client
def show_address(self, wallet, address):
    """Have the wallet's keystore show ``address`` via its address index."""
    address_index = wallet.get_address_index(address)
    keystore = wallet.get_keystore()
    keystore.show_address(address_index)
diff --git a/electrum/electrumabc_plugins/satochip/satochip.py b/electrum/electrumabc_plugins/satochip/satochip.py
index fabc7bfc7..bfba1b5b2 100644
--- a/electrum/electrumabc_plugins/satochip/satochip.py
+++ b/electrum/electrumabc_plugins/satochip/satochip.py
@@ -1,989 +1,995 @@
import hashlib
import logging
import traceback
from os import urandom
from struct import pack
import mnemonic
# electrumabc
from electrumabc import networks
-from electrumabc.bitcoin import Hash, SignatureType, hash_160, serialize_xpub
+from electrumabc.bitcoin import (
+ Hash,
+ ScriptType,
+ SignatureType,
+ hash_160,
+ serialize_xpub,
+)
from electrumabc.constants import PROJECT_NAME
from electrumabc.i18n import _
from electrumabc.keystore import HardwareKeyStore
from electrumabc.mnemo import (
MnemonicElectrum,
bip39_mnemonic_to_seed,
is_seed,
seed_type_name,
)
from electrumabc.plugins import Device
from electrumabc.printerror import PrintError, is_verbose, print_error
from electrumabc.serialize import serialize_sequence
from electrumabc.transaction import Transaction
from electrumabc.wallet import StandardWallet
from electrumabc_gui.qt.qrcodewidget import QRDialog
from ..hw_wallet.plugin import HardwareClientBase, HWPluginBase
# Import the optional smartcard dependencies. On failure the problem is
# logged and the ImportError is re-raised.
try:
    # pysatochip
    from pysatochip.CardConnector import CardConnector, UninitializedSeedError, logger
    from pysatochip.ecc import (
        CURVE_ORDER,
        ECPubkey,
        der_sig_from_r_and_s,
        get_r_and_s_from_der_sig,
    )
    from pysatochip.Satochip2FA import Satochip2FA
    from pysatochip.version import (
        SATOCHIP_PROTOCOL_MAJOR_VERSION,
        SATOCHIP_PROTOCOL_MINOR_VERSION,
        SATOCHIP_PROTOCOL_VERSION,
    )
    from smartcard.CardRequest import CardRequest
    from smartcard.CardType import AnyCardType
    from smartcard.Exceptions import CardRequestTimeoutException

    LIBS_AVAILABLE = True
except ImportError:
    LIBS_AVAILABLE = False
    print_error(
        "[satochip] failed to import requisite libraries, please install the 'pyscard'"
        " and 'pysatochip' libraries"
    )
    # Fixed the duplicated word in this message ("will not not be available").
    print_error("[satochip] satochip will not be available")
    raise
# Configure the root logger when this module is imported: DEBUG level, with the
# level name, module and function name in front of each message.
logging.basicConfig(
level=logging.DEBUG, format="%(levelname)s [%(module)s] %(funcName)s | %(message)s"
)
# debug: smartcard reader ids
SATOCHIP_VID = 0x096E
SATOCHIP_PID = 0x0503
# Translated prompt asking whether to enable 2FA.
MSG_USE_2FA = _(
"Do you want to use 2-Factor-Authentication (2FA)?\n\nWith 2FA, "
"any transaction must be confirmed on a second device such as your "
"smartphone. First you have to install the Satochip-2FA android app on "
"google play. Then you have to pair your 2FA device with your Satochip "
"by scanning the qr-code on the next screen.\n\nWarning: be sure to backup "
"a copy of the qr-code in a safe place, in case you have to reinstall the app!"
)
def bip32path2bytes(bip32path: str) -> (int, bytes):
    """Encode a BIP32 path string as ``(depth, packed_indices)``.

    Empty components and a leading ``"m"`` are ignored. Each remaining
    component is packed as a big-endian unsigned 32-bit integer; a trailing
    ``'`` adds 0x80000000 to the index.
    """
    components = [part for part in bip32path.split("/") if part]
    if components and components[0] == "m":
        del components[0]
    packed = []
    for component in components:
        value = int(component.rstrip("'"))
        if component.endswith("'"):
            value += 0x80000000
        packed.append(pack(">I", value))
    return len(components), b"".join(packed)
class SatochipClient(HardwareClientBase, PrintError):
    """Hardware client that talks to a Satochip card through a ``CardConnector``."""

    def __init__(self, plugin, handler):
        """Create the card connector and set the pysatochip logger level.

        Only the base-class initialisation happens when the smartcard
        libraries are not available.
        """
        HardwareClientBase.__init__(self, plugin=plugin)
        if not LIBS_AVAILABLE:
            self.print_error("** No libraries available")
            return
        self.print_error("__init__()")
        self.device = plugin.device
        self.handler = handler
        self.cc = CardConnector(self)
        logger.setLevel(logging.DEBUG if is_verbose else logging.INFO)

    def __repr__(self):
        return "<SatochipClient TODO>"

    def is_pairable(self):
        """Pairable whenever the smartcard libraries were imported."""
        return LIBS_AVAILABLE

    def close(self):
        """Disconnect from the card and remove the card observer."""
        self.print_error("close()")
        self.cc.card_disconnect()
        self.cc.cardmonitor.deleteObserver(self.cc.cardobserver)

    def timeout(self, cutoff):
        """No-op."""
        pass

    def is_initialized(self):
        """Reported as initialized whenever the smartcard libraries were imported."""
        return LIBS_AVAILABLE

    def has_usable_connection_with_device(self):
        """Return True when the card's ATR can be read, False on any error."""
        self.print_error("has_usable_connection_with_device()")
        try:
            atr = self.cc.card_get_ATR()
            self.print_error("Card ATR: " + bytes(atr).hex())
        except Exception as e:
            self.print_error(
                f"Exception in has_usable_connection_with_device: {e!s}"
            )
            return False
        else:
            return True

    def get_xpub(self, bip32_path, xtype):
        """Return the serialized xpub for ``bip32_path``, or ``None`` on any error.

        Verifies the PIN on the card first. ``xtype`` must be one of
        ``SatochipPlugin.SUPPORTED_XTYPES``.
        """
        assert xtype in SatochipPlugin.SUPPORTED_XTYPES

        # Best effort: hand a stored authentikey, if any, to the card parser.
        try:
            hex_authentikey = self.handler.win.wallet.storage.get("authentikey")
            self.print_error(
                "get_xpub(): self.handler.win.wallet.storage.authentikey:",
                hex_authentikey,
            )
            if hex_authentikey is not None:
                stored_key = ECPubkey(bytes.fromhex(hex_authentikey))
                self.cc.parser.authentikey_from_storage = stored_key
        except Exception as e:  # attributeError?
            self.print_error(
                "get_xpub(): exception when getting authentikey from"
                " self.handler.win.wallet.storage:",
                str(e),
            )

        try:
            # needs PIN
            self.cc.card_verify_PIN()
            # bip32_path is of the form 44'/0'/1'
            self.print_error("[get_xpub(): bip32_path = ", bip32_path)
            depth, bytepath = bip32path2bytes(bip32_path)
            childkey, childchaincode = self.cc.card_bip32_get_extendedkey(bytepath)
            if depth != 0:
                # get parent info
                parentkey, parentchaincode = self.cc.card_bip32_get_extendedkey(
                    bytepath[0:-4]
                )
                parent_pubkey = parentkey.get_public_key_bytes(compressed=True)
                fingerprint = hash_160(parent_pubkey)[0:4]
                child_number = bytepath[-4:]
            else:
                # masterkey
                fingerprint = bytes([0, 0, 0, 0])
                child_number = bytes([0, 0, 0, 0])
            xpub = serialize_xpub(
                xtype,
                childchaincode,
                childkey.get_public_key_bytes(compressed=True),
                depth,
                fingerprint,
                child_number,
            )
            self.print_error("SatochipClient: get_xpub(): xpub=", xpub)
            return xpub
        except Exception as e:
            self.print_error(repr(e))
            return None

    def ping_check(self):
        """Check that the connection is working; currently only prints a marker."""
        try:
            print("ping_check")  # debug
        except Exception as e:
            self.print_error(repr(e))
            raise RuntimeError("Communication issue with Satochip")

    def request(self, request_type, *args):
        """Forward a request to the handler and return its reply.

        Supported types are ``"update_status"``, ``"show_error"`` and
        ``"show_message"``. Any other type is reported through the handler's
        ``show_error``. Returns ``None`` when there is no handler.
        """
        self.print_error("client request: " + str(request_type))
        if self.handler is None:
            self.print_error("self.handler is None! ")
            return None
        if request_type == "update_status":
            return self.handler.update_status(*args)
        if request_type == "show_error":
            return self.handler.show_error(*args)
        if request_type == "show_message":
            return self.handler.show_message(*args)
        return self.handler.show_error("Unknown request: " + str(request_type))

    def PIN_dialog(self, msg):
        """Prompt until a PIN of 4 to 16 characters is entered.

        Returns ``(True, pin_bytes)``, or ``(False, None)`` when the handler
        returns ``None``.
        """
        while True:
            password = self.handler.get_passphrase(msg, False)
            if password is None:
                return False, None
            length = len(password)
            if 4 <= length <= 16:
                return True, password.encode("utf8")
            if length < 4:
                complaint = _("PIN must have at least 4 characters.")
            else:
                complaint = _("PIN must have less than 16 characters.")
            msg = complaint + "\n\n" + _("Enter PIN:")

    def PIN_setup_dialog(self, msg, msg_confirm, msg_error):
        """Ask for a new PIN twice until both entries match.

        Raises:
            RuntimeError: either prompt was dismissed.
        """
        while True:
            is_PIN, pin = self.PIN_dialog(msg)
            if not is_PIN:
                raise RuntimeError("A PIN code is required to initialize the Satochip!")
            is_PIN, pin_confirm = self.PIN_dialog(msg_confirm)
            if not is_PIN:
                raise RuntimeError(
                    "A PIN confirmation is required to initialize the Satochip!"
                )
            if pin == pin_confirm:
                return (is_PIN, pin)
            self.request("show_error", msg_error)

    def PIN_change_dialog(
        self, msg_oldpin, msg_newpin, msg_confirm, msg_error, msg_cancel
    ):
        """Ask for the old PIN, then for a new PIN entered twice.

        Returns ``(True, oldpin, newpin)``, or ``(False, None, None)`` after
        showing ``msg_cancel`` when any prompt is dismissed.
        """
        # old pin
        entered, oldpin = self.PIN_dialog(msg_oldpin)
        if not entered:
            self.request("show_message", msg_cancel)
            return (False, None, None)
        # new pin
        while True:
            entered, newpin = self.PIN_dialog(msg_newpin)
            if not entered:
                self.request("show_message", msg_cancel)
                return (False, None, None)
            entered, pin_confirm = self.PIN_dialog(msg_confirm)
            if not entered:
                self.request("show_message", msg_cancel)
                return (False, None, None)
            if newpin == pin_confirm:
                return (True, oldpin, newpin)
            self.request("show_error", msg_error)
class SatochipKeyStore(HardwareKeyStore):
hw_type = "satochip"
device = "Satochip"
def __init__(self, d):
    """Build the keystore from its serialized form ``d`` and reset its flags."""
    HardwareKeyStore.__init__(self, d)
    self.ux_busy = False
    self.force_watching_only = False
def dump(self):
    """Return the base keystore dump unchanged."""
    # our additions to the stored data about keystore -- only during creation?
    return HardwareKeyStore.dump(self)
def get_derivation(self):
    """Return the derivation path stored on this keystore."""
    derivation_path = self.derivation
    return derivation_path
def get_client(self):
    """Return the client the plugin gives this keystore.

    Called when the user views an address or signs something, not during
    probing or setup.
    """
    return self.plugin.get_client(self)
def give_error(self, message, clear_client=False):
    """Log ``message`` and always raise ``Exception(message)``.

    The handler shows the error only when ``ux_busy`` is not set; otherwise
    the flag is cleared instead. With ``clear_client`` set, ``self.client``
    is reset to ``None`` before raising.
    """
    self.print_error(message)
    if self.ux_busy:
        self.ux_busy = False
    else:
        self.handler.show_error(message)
    if clear_client:
        self.client = None
    raise Exception(message)
def decrypt_message(self, pubkey, message, password):
    """Always raise ``RuntimeError``: decryption is not supported here."""
    template = _("Encryption and decryption are currently not supported for {}")
    raise RuntimeError(template.format(self.device))
def sign_message(self, sequence, message, password, sigtype=SignatureType.ECASH):
# Sign `message` with the card key at <derivation>/<sequence> and return the
# signature (`compsig`) reported by the card connector.
# When the card requires 2FA, a challenge-response with the second device
# is performed first and its result is passed to the card as `hmac`.
message_byte = message.encode("utf8")
# Upper-case hex SHA-256 of the UTF-8 message; only shown to the user below.
message_hash = hashlib.sha256(message_byte).hexdigest().upper()
client = self.get_client()
address_path = self.get_derivation()[2:] + "/%d/%d" % sequence
self.print_error("debug: sign_message: path: " + address_path)
self.handler.show_message(
"Signing message ...\r\nMessage hash: " + message_hash
)
# check if 2FA is required
# NOTE(review): the local name `hmac` would shadow a module of the same name if this file imports one - confirm.
hmac = b""
# presumably card_get_status() refreshes client.cc.needs_2FA - verify in pysatochip
if client.cc.needs_2FA is None:
(response, sw1, sw2, d) = client.cc.card_get_status()
if client.cc.needs_2FA:
# challenge based on sha256(btcheader+msg)
# format & encrypt msg
import json
msg = {"action": "sign_msg", "msg": message}
msg = json.dumps(msg)
(id_2FA, msg_out) = client.cc.card_crypt_transaction_2FA(msg, True)
d = {}
d["msg_encrypt"] = msg_out
d["id_2FA"] = id_2FA
# print_error("encrypted message: "+msg_out)
self.print_error("id_2FA: " + id_2FA)
# do challenge-response with 2FA device...
self.handler.show_message(
"2FA request sent! Approve or reject request on your second device."
)
Satochip2FA.do_challenge_response(d)
# decrypt and parse reply to extract challenge response
try:
reply_encrypt = d["reply_encrypt"]
except Exception:
# Note the below give_error call will itself raise Message. :/
self.give_error(
"No response received from 2FA.\nPlease ensure that the"
" Satochip-2FA plugin is enabled in Tools>Optional Features",
True,
)
return
reply_decrypt = client.cc.card_crypt_transaction_2FA(reply_encrypt, False)
self.print_error("challenge:response= " + reply_decrypt)
# The decrypted reply is split on ":"; the second field is the hex challenge response.
reply_decrypt = reply_decrypt.split(":")
chalresponse = reply_decrypt[1]
hmac = bytes.fromhex(chalresponse)
try:
# path= self.get_derivation() + ("/%d/%d" % sequence)
keynbr = 0xFF # for extended key
(depth, bytepath) = bip32path2bytes(address_path)
(pubkey, chaincode) = client.cc.card_bip32_get_extendedkey(bytepath)
# (response2, sw1, sw2) = client.cc.card_sign_message(keynbr, message_byte, hmac)
# if (sw1!=0x90 or sw2!=0x00):
# _logger.info("[satochip] SatochipPlugin: error during sign_message(): sw12="+hex(sw1)+" "+hex(sw2))#debugSatochip
# compsig=b''
# self.handler.show_error(_("Wrong signature!\nThe 2FA device may have rejected the action."))
# else:
# compsig=client.parser.parse_message_signature(response2, message_byte, pubkey)
# "eCash" is passed as `altcoin` only for SignatureType.ECASH; otherwise None.
altcoin = "eCash" if sigtype == SignatureType.ECASH else None
(response2, sw1, sw2, compsig) = client.cc.card_sign_message(
keynbr, pubkey, message_byte, hmac, altcoin=altcoin
)
# An empty signature is reported to the user but still returned below.
if compsig == b"":
self.handler.show_error(
_("Wrong signature!\nThe 2FA device may have rejected the action.")
)
except Exception as e:
self.give_error(e, True)
finally:
self.handler.finished()
return compsig
def sign_transaction(self, tx: Transaction, password, *, use_cache=False):
self.print_error("sign_transaction(): tx: " + str(tx))
client = self.get_client()
# outputs
txOutputs = serialize_sequence(tx.outputs()).hex()
self.print_error("sign_transaction(): outputs= ", txOutputs)
# Fetch inputs of the transaction to sign
derivations = self.get_tx_derivations(tx)
- for i, txin in enumerate(tx.inputs()):
+ for i, txin in enumerate(tx.txinputs()):
self.print_error("sign_transaction(): input =", i)
- self.print_error("sign_transaction(): input[type]:", txin["type"])
- if txin["type"] == "coinbase":
- self.give_error("Coinbase not supported") # should never happen
+ self.print_error("sign_transaction(): input[type]:", txin.type)
+ if txin.type == ScriptType.coinbase:
+ # should never happen
+ self.give_error("Coinbase not supported")
- pubkeys, x_pubkeys = tx.get_sorted_pubkeys(txin)
+ pubkeys, x_pubkeys = txin.get_sorted_pubkeys()
for j, x_pubkey in enumerate(x_pubkeys):
self.print_error("sign_transaction(): forforloop: j=", j)
- if tx.is_txin_complete(txin):
+ if txin.is_complete():
break
if x_pubkey in derivations:
s = derivations.get(x_pubkey)
address_path = "%s/%d/%d" % (self.get_derivation()[2:], s[0], s[1])
# get corresponing extended key
(depth, bytepath) = bip32path2bytes(address_path)
(key, chaincode) = client.cc.card_bip32_get_extendedkey(bytepath)
# parse tx
pre_tx = tx.serialize_preimage(i)
pre_tx_hex = pre_tx.hex()
- pre_hash = Hash(pre_tx_hex)
+ pre_hash = Hash(pre_tx)
pre_hash_hex = pre_hash.hex()
self.print_error("sign_transaction(): pre_tx_hex=", pre_tx_hex)
self.print_error("sign_transaction(): pre_hash=", pre_hash_hex)
# (response, sw1, sw2) = client.cc.card_parse_transaction(pre_tx, True) # use 'True' since BCH use BIP143 as in Segwit...
# print_error('[satochip] sign_transaction(): response= '+str(response)) #debugSatochip
# (tx_hash, needs_2fa) = client.parser.parse_parse_transaction(response)
(
response,
sw1,
sw2,
tx_hash,
needs_2fa,
) = client.cc.card_parse_transaction(
pre_tx, True
) # use 'True' since BCH use BIP143 as in Segwit...
tx_hash_hex = bytearray(tx_hash).hex()
if pre_hash_hex != tx_hash_hex:
raise RuntimeError(
"[Satochip_KeyStore] Tx preimage mismatch:"
f" {pre_hash_hex} vs {tx_hash_hex}"
)
# sign tx
keynbr = 0xFF # for extended key
if needs_2fa:
# format & encrypt msg
import json
coin_type = 145 # see https://github.com/satoshilabs/slips/blob/master/slip-0044.md
test_net = networks.net.TESTNET
msg = {
"action": "sign_tx",
"tx": pre_tx_hex,
"ct": coin_type,
"sw": True,
"tn": test_net,
"txo": txOutputs,
- "ty": txin["type"],
+ "ty": txin.type.name,
}
msg = json.dumps(msg)
(id_2FA, msg_out) = client.cc.card_crypt_transaction_2FA(
msg, True
)
d = {}
d["msg_encrypt"] = msg_out
d["id_2FA"] = id_2FA
# self.print_error("encrypted message: "+msg_out)
self.print_error("id_2FA:", id_2FA)
# do challenge-response with 2FA device...
client.handler.show_message(
"2FA request sent! Approve or reject request on your second"
" device."
)
Satochip2FA.do_challenge_response(d)
# decrypt and parse reply to extract challenge response
try:
reply_encrypt = None # init it in case of exc below
reply_encrypt = d["reply_encrypt"]
except Exception:
# Note: give_error here will raise again.. :/
self.give_error(
"No response received from 2FA.\nPlease ensure that"
" the Satochip-2FA plugin is enabled in"
" Tools>Optional Features",
True,
)
break
if reply_encrypt is None:
# todo: abort tx
break
reply_decrypt = client.cc.card_crypt_transaction_2FA(
reply_encrypt, False
)
self.print_error("challenge:response=", reply_decrypt)
reply_decrypt = reply_decrypt.split(":")
rep_pre_hash_hex = reply_decrypt[0][0:64]
if rep_pre_hash_hex != pre_hash_hex:
# todo: abort tx or retry?
self.print_error(
"Abort transaction: tx mismatch:",
rep_pre_hash_hex,
"!=",
pre_hash_hex,
)
break
chalresponse = reply_decrypt[1]
if chalresponse == "00" * 20:
# todo: abort tx?
self.print_error("Abort transaction: rejected by 2FA!")
break
chalresponse = list(bytes.fromhex(chalresponse))
else:
chalresponse = None
(tx_sig, sw1, sw2) = client.cc.card_sign_transaction(
keynbr, tx_hash, chalresponse
)
# self.print_error('sign_transaction(): sig=', bytes(tx_sig).hex()) #debugSatochip
# todo: check sw1sw2 for error (0x9c0b if wrong challenge-response)
# enforce low-S signature (BIP 62)
tx_sig = bytearray(tx_sig)
r, s = get_r_and_s_from_der_sig(tx_sig)
if s > CURVE_ORDER // 2:
s = CURVE_ORDER - s
tx_sig = der_sig_from_r_and_s(r, s)
# update tx with signature
- tx_sig = tx_sig.hex() + "41"
- # tx.add_signature_to_txin(i,j,tx_sig)
- txin["signatures"][j] = tx_sig
+ tx_sig = tx_sig + b"\x41"
+ txin.update_signature(tx_sig, j)
break
else:
self.give_error(
"No matching x_key for sign_transaction"
) # should never happen
self.print_error("is_complete", tx.is_complete())
tx.raw = tx.serialize()
return
def show_address(self, sequence, txin_type):
    """Placeholder for displaying an address on the device.

    Not implemented yet: the method only logs a "todo" message and ignores
    both ``sequence`` and ``txin_type``.
    """
    todo_message = "show_address(): todo!"
    self.print_error(todo_message)
class SatochipPlugin(HWPluginBase):
    """Hardware-wallet plugin class for Satochip smartcards.

    The class-level attributes below configure the plugin: the keystore class
    it pairs with, the (vendor id, product id) pair reported for detected
    cards, and the extended-key types it accepts.
    """

    SUPPORTS_XEC_BIP44_DERIVATION = True
    libraries_available = LIBS_AVAILABLE
    minimum_library = (0, 0, 0)
    keystore_class = SatochipKeyStore
    DEVICE_IDS = [(SATOCHIP_VID, SATOCHIP_PID)]
    # Must be a real one-element tuple: with a bare string, a membership test
    # such as ``xtype in SUPPORTED_XTYPES`` silently becomes a substring
    # match (e.g. "" or "stand" would be accepted).
    SUPPORTED_XTYPES = ("standard",)
def __init__(self, parent, config, name):
    """Initialise the plugin and hook card detection into the device manager.

    The base class is always initialised. The enumerate callback
    (``detect_smartcard_reader``) is registered only when ``LIBS_AVAILABLE``
    is truthy; otherwise nothing else happens.
    """
    HWPluginBase.__init__(self, parent, config, name)
    if LIBS_AVAILABLE:
        self.print_error("init()")
        # Cards are discovered through an enumerate callback rather than by
        # registering the static DEVICE_IDS list.
        manager = self.device_manager()
        manager.register_enumerate_func(self.detect_smartcard_reader)
def get_library_version(self):
    """Return the fixed library version string reported by this plugin."""
    library_version = "0.0.1"
    return library_version
def detect_smartcard_reader(self):
    """Enumerate callback: report a Satochip device if a card shows up.

    Waits briefly (0.1 s timeout) for any card. Returns a one-element list
    holding a ``Device`` entry when a card is found, and an empty list on
    time-out or on any other error (which is logged with its traceback).
    """
    self.print_error("detect_smartcard_reader")
    self.cardtype = AnyCardType()
    detected = []
    try:
        request = CardRequest(timeout=0.1, cardType=self.cardtype)
        request.waitforcard()
        self.print_error("detect_smartcard_reader: found card!")
        detected.append(
            Device(
                path="/satochip",
                interface_number=-1,
                id_="/satochip",
                product_key=(SATOCHIP_VID, SATOCHIP_PID),
                usage_page=0,
            )
        )
    except CardRequestTimeoutException:
        self.print_error("time-out: no card found")
    except Exception as exc:
        self.print_error(
            "Error during connection:", repr(exc), f"\n{traceback.format_exc()}"
        )
    return detected
def create_client(self, device, handler):
    """Build a ``SatochipClient`` for this plugin.

    A truthy ``handler`` is remembered on ``self.handler`` first. Any
    exception raised while constructing the client is logged and ``None`` is
    returned instead. ``device`` is not used here.
    """
    self.print_error("create_client()")
    if handler:
        self.handler = handler
    new_client = None
    try:
        new_client = SatochipClient(self, handler)
    except Exception as exc:
        self.print_error("create_client(): exception:" + str(exc))
    return new_client
def setup_device(self, device_info, wizard, purpose):
"""Prepare the card behind ``device_info`` for use and return its client.

Steps visible in this method: create a client for the device, check that
the card is a Satochip, run the applet setup (PIN, PUK, ACL parameters) when
``client.cc.setup_done`` is falsy, verify the PIN, then obtain the
authentikey -- importing a seed chosen through ``wizard`` when the card
raises ``UninitializedSeedError``. The compressed authentikey (hex) is
stored in ``wizard.data["authentikey"]``.

NOTE(review): this listing has lost its indentation, so the exact nesting of
the loops and branches should be confirmed against the real file.

:param device_info: object exposing ``device.id_``
:param wizard: wizard passed to client creation and ``choose_seed``; its
    ``data`` mapping receives the authentikey
:param purpose: not referenced in this method
:return: the client; the bare ``return`` in the 2FA QR-dialog error handler
    yields ``None``
:raises RuntimeError: if ``LIBS_AVAILABLE`` is falsy, or if ``card_setup`` /
    ``card_set_2FA_key`` answer with a status word other than 0x90 0x00
:raises Exception: if the inserted card is not a Satochip
"""
self.print_error("setup_device()")
if not LIBS_AVAILABLE:
raise RuntimeError("No libraries available")
device_id = device_info.device.id_
client = self.scan_and_create_client_for_device(
device_id=device_id, wizard=wizard
)
client.cc.parser.authentikey_from_storage = None # https://github.com/simpleledger/Electron-Cash-SLP/pull/101#issuecomment-561238614
# check setup
while client.cc.card_present:
# check that card is indeed a Satochip
if client.cc.card_type != "Satochip":
raise Exception(
_("Failed to create a client for this device.")
+ "\n"
+ _("Inserted card is not a Satochip!")
)
# d is indexed below for protocol_version / protocol_major_version /
# protocol_minor_version
(response, sw1, sw2, d) = client.cc.card_get_status()
# check version
if client.cc.setup_done:
v_supported = SATOCHIP_PROTOCOL_VERSION
v_applet = d["protocol_version"]
self.print_error(
"[SatochipPlugin] setup_device(): Satochip"
f" version={hex(v_applet)} Electrum supported version="
f" {hex(v_supported)}"
)
# an applet newer than the supported protocol only triggers an error
# dialog; no exception is raised here
if v_supported < v_applet:
msg = (
_(
"The version of your Satochip is higher than "
f"supported by {PROJECT_NAME}. You should update"
f" {PROJECT_NAME} to ensure correct functioning!"
)
+ "\n"
+ " Satochip version: "
f"{d['protocol_major_version']}.{d['protocol_minor_version']}"
+ "\n"
+ " Supported version: "
f"{SATOCHIP_PROTOCOL_MAJOR_VERSION}.{SATOCHIP_PROTOCOL_MINOR_VERSION}"
)
client.handler.show_error(msg)
if client.cc.needs_secure_channel:
client.cc.card_initiate_secure_channel()
break
# setup device (done only once)
else:
# PIN dialog
msg = _("Enter a new PIN for your Satochip:")
msg_confirm = _("Please confirm the PIN code for your Satochip:")
msg_error = _("The PIN values do not match! Please type PIN again!")
# NOTE(review): is_PIN is never checked -- confirm what PIN_setup_dialog
# returns when the user cancels, since pin_0 is passed to list() next
(is_PIN, pin_0) = client.PIN_setup_dialog(msg, msg_confirm, msg_error)
pin_0 = list(pin_0)
client.cc.set_pin(0, pin_0) # cache PIN value in client
pin_tries_0 = 0x05
# PUK code can be used when PIN is unknown and the card is locked
# We use a random value as the PUK is not used currently in the electrum GUI
ublk_tries_0 = 0x01
ublk_0 = list(urandom(16))
pin_tries_1 = 0x01
ublk_tries_1 = 0x01
pin_1 = list(urandom(16))
# the second pin is not used currently
ublk_1 = list(urandom(16))
secmemsize = 32 # number of slot reserved in memory cache
memsize = 0x0000 # RFU
create_object_ACL = 0x01 # RFU
create_key_ACL = 0x01 # RFU
create_pin_ACL = 0x01 # RFU
# setup
(response, sw1, sw2) = client.cc.card_setup(
pin_tries_0,
ublk_tries_0,
pin_0,
ublk_0,
pin_tries_1,
ublk_tries_1,
pin_1,
ublk_1,
secmemsize,
memsize,
create_object_ACL,
create_key_ACL,
create_pin_ACL,
)
# any status word other than 0x90 0x00 is treated as a setup failure
if sw1 != 0x90 or sw2 != 0x00:
self.print_error(
"setup_device(): unable to set up applet! sw12="
+ hex(sw1)
+ " "
+ hex(sw2)
)
raise RuntimeError(
"Unable to setup the device with error code:"
+ hex(sw1)
+ " "
+ hex(sw2)
)
# verify pin:
client.cc.card_verify_PIN()
# get authentikey
while client.cc.card_present:
try:
authentikey = client.cc.card_bip32_get_authentikey()
except UninitializedSeedError:
# Option: setup 2-Factor-Authentication (2FA)
if not client.cc.needs_2FA:
# hard-coded to False, so the `if use_2FA:` test right below never passes
use_2FA = False # we put 2FA activation in advanced options as it confuses some users
if use_2FA:
secret_2FA = urandom(20)
# secret_2FA=b'\0'*20 #for debug purpose
secret_2FA_hex = secret_2FA.hex()
# the secret must be shared with the second factor app (eg on a smartphone)
try:
help_txt = (
"Scan the QR-code with your Satochip-2FA app and make a"
" backup of the following secret: " + secret_2FA_hex
)
d = QRDialog(
secret_2FA_hex,
None,
"Scan secret 2FA and save a copy",
show_text=False,
help_text=help_txt,
)
d.exec_()
except Exception as e:
self.print_error("setup_device(): setup 2FA: " + str(e))
# bare return: the caller receives None instead of the client on this path
return
# further communications will require an id and an encryption key (for privacy).
# Both are derived from the secret_2FA using a one-way function inside the Satochip
amount_limit = 0 # i.e. always use
(response, sw1, sw2) = client.cc.card_set_2FA_key(
secret_2FA, amount_limit
)
if sw1 != 0x90 or sw2 != 0x00:
self.print_error(
"setup_device(): unable to set 2FA! "
f" sw12={hex(sw1)},{hex(sw2)}"
)
raise RuntimeError(
"Unable to setup 2FA with error code:"
f" {hex(sw1)} {hex(sw2)}"
)
# seed dialog...
self.print_error("setup_device(): import seed:")
self.choose_seed(wizard)
# self.bip32_seed is assigned by derive_bip32_seed / derive_bip39_seed;
# presumably the wizard callbacks started by choose_seed have run by now -- TODO confirm
seed = list(self.bip32_seed)
authentikey = client.cc.card_bip32_import_seed(seed)
hex_authentikey = authentikey.get_public_key_hex(compressed=True)
self.print_error("setup_device(): authentikey=", hex_authentikey)
wizard.data["authentikey"] = hex_authentikey
self.print_error(
"setup_device(): authentikey from storage=", wizard.data["authentikey"]
)
break
return client
def get_xpub(self, device_id, derivation, xtype, wizard):
    """Return the xpub for ``derivation`` / ``xtype`` from the given device.

    A client is created for ``device_id`` (using ``wizard``), pinged via
    ``ping_check()``, and then asked for the extended public key. ``xtype``
    is forwarded as-is; it is not validated against ``SUPPORTED_XTYPES``.
    """
    self.print_error("get_xpub()")
    card_client = self.scan_and_create_client_for_device(
        device_id=device_id, wizard=wizard
    )
    card_client.ping_check()
    return card_client.get_xpub(derivation, xtype)
def get_client(self, keystore, force_pair=True):
    """Return the device client paired with ``keystore``, or ``None``.

    The lookup goes through the device manager using the keystore's handler.
    A client that is found is pinged with ``ping_check()`` before being
    returned. All client interaction should stay off the main GUI thread.
    """
    paired = self.device_manager().client_for_keystore(
        self, keystore.handler, keystore, force_pair
    )
    if paired is None:
        return None
    paired.ping_check()
    return paired
def show_address(self, wallet, address, keystore=None):
    """Ask the keystore to show ``address``, for standard wallets only.

    ``keystore`` defaults to ``wallet.get_keystore()``. Nothing happens when
    ``show_address_helper`` returns a falsy value. Wallets whose exact type
    is not ``StandardWallet`` get an error dialog instead.
    """
    if keystore is None:
        keystore = wallet.get_keystore()
    if not self.show_address_helper(wallet, address, keystore):
        return
    # Exact type match on purpose: subclasses of StandardWallet are rejected too.
    if type(wallet) is StandardWallet:
        address_index = wallet.get_address_index(address)
        script_type = wallet.get_txin_type(address)
        keystore.show_address(address_index, script_type)
        return
    keystore.handler.show_error(
        _(
            "This function is only available for standard wallets when"
            " using {}."
        ).format(self.device)
    )
# create/restore seed during satochip initialization
def choose_seed(self, wizard):
    """Show a choice dialog: create a new BIP39 seed or restore an existing one.

    The selected action name (``"create_seed"`` or ``"restore_from_seed"``)
    is handed to ``wizard.run`` as the dialog's ``run_next`` callback.
    """
    dialog_title = _("Create or restore")
    dialog_message = _(
        "Do you want to create a new seed, or to restore a wallet using an existing"
        " seed?"
    )
    create_option = ("create_seed", _("Create a new BIP39 seed"))
    restore_option = ("restore_from_seed", _("I already have a BIP39 seed"))
    wizard.choice_dialog(
        title=dialog_title,
        message=dialog_message,
        choices=[create_option, restore_option],
        run_next=wizard.run,
    )
# create seed
def create_seed(self, wizard):
    """Generate a mnemonic with strength 128 and show it in the seed dialog.

    Marks the wizard as using BIP39, then continues with
    ``request_passphrase`` once the seed dialog completes.
    """
    wizard.seed_type = "bip39"
    wizard.opt_bip39 = True
    new_seed = self.to_bip39_mnemonic(128)

    def after_seed_shown(opt_passphrase):
        return self.request_passphrase(wizard, new_seed, opt_passphrase)

    wizard.show_seed_dialog(run_next=after_seed_shown, seed_text=new_seed)
def request_passphrase(self, wizard, seed, opt_passphrase):
    """Ask for a passphrase if requested, then move on to seed confirmation.

    With a falsy ``opt_passphrase`` the wizard runs ``"confirm_seed"``
    directly with an empty passphrase; otherwise a passphrase dialog is
    opened and its result is forwarded to ``confirm_seed``.
    """
    if not opt_passphrase:
        wizard.run("confirm_seed", seed, "")
        return

    def on_passphrase(passphrase):
        return self.confirm_seed(wizard, seed, passphrase)

    wizard.passphrase_dialog(run_next=on_passphrase)
def confirm_seed(self, wizard, seed, passphrase):
    """Make the user re-enter the seed, then continue to passphrase confirmation.

    The dialog accepts only input equal to ``seed``. Its result is ignored;
    ``confirm_passphrase`` is called with the original seed and passphrase.
    """

    def matches_seed(candidate):
        return candidate == seed

    def proceed(_dialog_result):
        return self.confirm_passphrase(wizard, seed, passphrase)

    wizard.confirm_seed_dialog(run_next=proceed, test=matches_seed)
def confirm_passphrase(self, wizard, seed, passphrase):
    """Have the user retype the seed extension, then derive the BIP39 seed.

    With an empty ``passphrase`` the seed is derived immediately using "".
    Otherwise a line dialog is shown that accepts only text equal to
    ``passphrase``, and the seed is derived from what was typed.
    """
    if not passphrase:
        self.derive_bip39_seed(seed, "")
        return

    dialog_title = _("Confirm Seed Extension")
    prompt_lines = [
        _("Your seed extension must be saved together with your seed."),
        _("Please type it here."),
    ]
    dialog_message = "\n".join(prompt_lines)

    def matches_passphrase(typed):
        return typed == passphrase

    def derive_from_typed(typed):
        return self.derive_bip39_seed(seed, typed)

    wizard.line_dialog(
        run_next=derive_from_typed,
        title=dialog_title,
        message=dialog_message,
        default="",
        test=matches_passphrase,
    )
# restore from seed
def restore_from_seed(self, wizard):
    """Open the restore-seed dialog with the BIP39 and extension options on.

    Input is validated with ``is_seed``; the dialog result is forwarded to
    ``on_restore_seed``.
    """
    wizard.opt_bip39 = True
    wizard.opt_ext = True
    seed_validator = is_seed

    def handle_restored(seed, is_bip39, is_ext):
        self.on_restore_seed(wizard, seed, is_bip39, is_ext)

    wizard.restore_seed_dialog(run_next=handle_restored, test=seed_validator)
def on_restore_seed(self, wizard, seed, is_bip39, is_ext):
    """Derive the seed from a restored mnemonic, by seed type.

    ``wizard.seed_type`` is set to ``"bip39"`` when ``is_bip39`` is truthy,
    else to ``seed_type_name(seed)``. BIP39 seeds go through
    ``derive_bip39_seed``; "standard"/"electrum" seeds trigger a warning
    dialog and go through ``derive_bip32_seed``. With ``is_ext`` the
    passphrase is requested through a dialog first; otherwise "" is used.

    Raises ``Exception`` for the "old" seed type and for any unknown type.
    """
    wizard.seed_type = "bip39" if is_bip39 else seed_type_name(seed)

    if wizard.seed_type == "bip39":

        def derive(passphrase):
            self.derive_bip39_seed(seed, passphrase)

    elif wizard.seed_type in ["standard", "electrum"]:
        # warning message as Electrum seed on hardware is not standard and
        # incompatible with other hw
        warning_parts = [
            _(
                "You are trying to import an Electrum seed to a Satochip"
                " hardware wallet."
            ),
            _(
                "\n\nElectrum seeds are not compatible with the BIP39 seeds"
                " typically used in hardware wallets."
            ),
            _(
                "This means you may have difficulty to import this seed in"
                " another wallet in the future."
            ),
            _(
                "\n\nProceed with caution! If you are not sure, click on"
                " 'Back', enable BIP39 in 'Options' and introduce a BIP39 seed"
                " instead."
            ),
            _(
                "You can also generate a new random BIP39 seed by clicking on"
                " 'Back' twice."
            ),
        ]
        warning_text = " ".join(warning_parts)
        wizard.confirm_dialog("Warning", warning_text, run_next=lambda x: None)

        def derive(passphrase):
            self.derive_bip32_seed(seed, passphrase)

    elif wizard.seed_type == "old":
        raise Exception("Unsupported seed type", wizard.seed_type)
    else:
        raise Exception("Unknown seed type", wizard.seed_type)

    if is_ext:
        wizard.passphrase_dialog(run_next=derive)
    else:
        derive("")
def derive_bip32_seed(self, seed, passphrase):
    """Derive a seed from an Electrum mnemonic and store it on ``self.bip32_seed``.

    Uses ``MnemonicElectrum("en").mnemonic_to_seed`` with the given
    ``seed`` and ``passphrase``.
    """
    electrum_mnemonic = MnemonicElectrum("en")
    self.bip32_seed = electrum_mnemonic.mnemonic_to_seed(seed, passphrase)
def derive_bip39_seed(self, seed, passphrase):
    """Derive a seed from a BIP39 mnemonic and store it on ``self.bip32_seed``.

    Delegates to ``bip39_mnemonic_to_seed(seed, passphrase)``.
    """
    derived_seed = bip39_mnemonic_to_seed(seed, passphrase)
    self.bip32_seed = derived_seed
def to_bip39_mnemonic(self, strength: int) -> str:
    """Generate a new English mnemonic with the requested ``strength``.

    ``strength`` is passed straight to ``mnemonic.Mnemonic.generate``.
    """
    english_generator = mnemonic.Mnemonic("english")
    return english_generator.generate(strength=strength)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, May 21, 17:02 (33 m, 53 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5865567
Default Alt Text
(110 KB)
Attached To
rABC Bitcoin ABC
Event Timeline
Log In to Comment