Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F14864828
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
179 KB
Subscribers
None
View Options
diff --git a/electrum/electrumabc/commands.py b/electrum/electrumabc/commands.py
index ecd9c01eb..f1c5fbae9 100644
--- a/electrum/electrumabc/commands.py
+++ b/electrum/electrumabc/commands.py
@@ -1,1582 +1,1582 @@
#
# Electrum ABC - lightweight eCash client
# Copyright (C) 2020 The Electrum ABC developers
# Copyright (C) 2011 thomasv@gitorious
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import ast
import base64
import datetime
import getpass
import json
import os
import queue
import sys
import time
from decimal import Decimal as PyDecimal # Qt 5.12 also exports Decimal
from functools import wraps
from . import alias, bitcoin, util, web
from .address import Address, AddressError
from .bitcoin import CASH, TYPE_ADDRESS
from .constants import PROJECT_NAME, SCRIPT_NAME, XEC
from .crypto import hash_160
-from .ecc import public_key_from_private_key, verify_message
+from .ecc import public_key_from_private_key, verify_message_with_address
from .json_util import json_decode
from .mnemo import MnemonicElectrum, make_bip39_words
from .paymentrequest import PR_EXPIRED, PR_PAID, PR_UNCONFIRMED, PR_UNKNOWN, PR_UNPAID
from .plugins import run_hook
from .printerror import print_error
from .simple_config import SimpleConfig
from .transaction import (
OPReturn,
Transaction,
TxInput,
TxOutput,
multisig_script,
rawtx_from_str,
)
from .util import format_satoshis, to_bytes
from .version import PACKAGE_VERSION
from .wallet import create_new_wallet, restore_wallet_from_text
known_commands = {}
def satoshis(amount):
# satoshi conversion must not be performed by the parser
return int(CASH * PyDecimal(amount)) if amount not in ["!", None] else amount
def assertOutpoint(out: str):
"""Perform some basic sanity checks on a string that represents a
transaction outpoint. Namely, 64 characters and a non-negative integer
separated by a colon."""
prevoutParts = out.split(":")
assert len(prevoutParts) == 2, "invalid outpoint"
prevout_hash, prevout_n = prevoutParts
prevout_hash = bytes.fromhex(prevout_hash)
assert len(prevout_hash) == 32, f"{prevout_hash.hex()} should be a 32-byte hash"
assert int(prevout_n) >= 0, f"invalid output index {prevout_n}"
class Command:
def __init__(self, func, s):
self.name = func.__name__
self.requires_network = "n" in s
self.requires_wallet = "w" in s
self.requires_password = "p" in s
self.description = func.__doc__
self.help = self.description.split(".")[0] if self.description else None
varnames = func.__code__.co_varnames[1 : func.__code__.co_argcount]
self.defaults = func.__defaults__
if self.defaults:
n = len(self.defaults)
self.params = list(varnames[:-n])
self.options = list(varnames[-n:])
else:
self.params = list(varnames)
self.options = []
self.defaults = []
def __repr__(self):
return "<Command {}>".format(self)
def __str__(self):
return "{}({})".format(
self.name,
", ".join(
self.params
+ [
"{}={!r}".format(name, self.defaults[i])
for i, name in enumerate(self.options)
]
),
)
def command(s):
def decorator(func):
global known_commands
name = func.__name__
known_commands[name] = Command(func, s)
@wraps(func)
def func_wrapper(*args, **kwargs):
c = known_commands[func.__name__]
wallet = args[0].wallet
network = args[0].network
password = kwargs.get("password")
if c.requires_network and network is None:
raise RuntimeError("Daemon offline") # Same wording as in daemon.py.
if c.requires_wallet and wallet is None:
raise RuntimeError(
f"Wallet not loaded. Use '{SCRIPT_NAME} daemon load_wallet'"
)
if (
c.requires_password
and password is None
and wallet.has_password()
and not kwargs.get("unsigned")
):
return {"error": "Password required"}
return func(*args, **kwargs)
return func_wrapper
return decorator
class Commands:
def __init__(self, config, wallet, network, daemon=None, callback=None):
self.config = config
self.wallet = wallet
self.daemon = daemon
self.network = network
self._callback = callback
def _run(self, method, *args, password_getter=None, **kwargs):
# this wrapper is called from the python console
cmd = known_commands[method]
if cmd.requires_password and self.wallet.has_password():
password = password_getter()
if password is None:
return
else:
password = None
f = getattr(self, method)
if cmd.requires_password:
kwargs.update(password=password)
result = f(*args, **kwargs)
if self._callback:
self._callback()
return result
@staticmethod
def _EnsureDictNamedTuplesAreJSONSafe(d):
"""Address, ScriptOutput and other objects contain bytes. They cannot be serialized
using JSON. This makes sure they get serialized properly by calling .to_ui_string() on them.
See issue #638"""
def DoChk(v):
def ChkList(l_):
for i in range(0, len(l_)):
l_[i] = DoChk(l_[i]) # recurse
return l_
def EncodeNamedTupleObject(nt):
if hasattr(nt, "to_ui_string"):
return nt.to_ui_string()
return nt
if isinstance(v, tuple):
v = EncodeNamedTupleObject(v)
elif isinstance(v, list):
v = ChkList(v) # may recurse
elif isinstance(v, dict):
v = Commands._EnsureDictNamedTuplesAreJSONSafe(v) # recurse
return v
for k in d.keys():
d[k] = DoChk(d[k])
return d
@command("")
def addressconvert(self, address):
"""Convert to/from Legacy <-> Cash Address. Address can be either
a legacy or a Cash Address and both forms will be returned as a JSON
dict."""
try:
addr = Address.from_string(address, support_arbitrary_prefix=True)
except Exception as e:
raise AddressError(f"Invalid address: {address}") from e
return {
"cashaddr": addr.to_full_string(Address.FMT_CASHADDR),
"bitcoincashaddr": addr.to_full_string(Address.FMT_CASHADDR_BCH),
"legacy": addr.to_full_string(Address.FMT_LEGACY),
}
@command("")
def commands(self):
"""List of commands"""
return " ".join(sorted(known_commands.keys()))
@command("n")
def getinfo(self):
"""network info"""
net_params = self.network.get_parameters()
response = {
"path": self.network.config.path,
"server": net_params[0],
"blockchain_height": self.network.get_local_height(),
"server_height": self.network.get_server_height(),
"spv_nodes": len(self.network.get_interfaces()),
"connected": self.network.is_connected(),
"auto_connect": net_params[4],
"version": PACKAGE_VERSION,
"default_wallet": self.config.get_wallet_path(),
"wallets": {k: w.is_up_to_date() for k, w in self.daemon.wallets.items()},
"fee_per_kb": self.config.fee_per_kb(),
}
return response
@command("n")
def stop(self):
"""Stop daemon"""
self.daemon.stop()
return "Daemon stopped"
@command("n")
def list_wallets(self):
"""List wallets open in daemon"""
return [
{"path": k, "synchronized": w.is_up_to_date()}
for k, w in self.daemon.wallets.items()
]
@command("n")
def load_wallet(self):
"""Open wallet in daemon"""
path = self.config.get_wallet_path()
wallet = self.daemon.load_wallet(path, self.config.get("password"))
if wallet is not None:
self.wallet = wallet
response = wallet is not None
return response
@command("n")
def close_wallet(self):
"""Close wallet"""
path = self.config.get_wallet_path()
if path in self.daemon.wallets:
self.daemon.stop_wallet(path)
response = True
else:
response = False
return response
@command("")
def create(
self,
passphrase=None,
password=None,
encrypt_file=True,
seed_type=None,
wallet_path=None,
):
"""Create a new wallet.
If you want to be prompted for an argument, type '?' or ':' (concealed)
"""
d = create_new_wallet(
path=wallet_path,
passphrase=passphrase,
password=password,
encrypt_file=encrypt_file,
seed_type=seed_type,
)
return {
"seed": d["seed"],
"path": d["wallet"].storage.path,
"msg": d["msg"],
}
@command("")
def restore(
self, text, passphrase=None, password=None, encrypt_file=True, wallet_path=None
):
"""Restore a wallet from text. Text can be a seed phrase, a master
public key, a master private key, a list of eCash addresses
or private keys.
If you want to be prompted for an argument, type '?' or ':' (concealed)
"""
d = restore_wallet_from_text(
text,
path=wallet_path,
passphrase=passphrase,
password=password,
encrypt_file=encrypt_file,
config=self.config,
)
return {
"path": d["wallet"].storage.path,
"msg": d["msg"],
}
@command("wp")
def password(self, password=None, new_password=None):
"""Change wallet password."""
b = self.wallet.storage.is_encrypted()
self.wallet.update_password(password, new_password, b)
self.wallet.storage.write()
return {"password": self.wallet.has_password()}
@command("w")
def get(self, key):
"""Return item from wallet storage"""
return self.wallet.storage.get(key)
@command("")
def getconfig(self, key):
"""Return a configuration variable."""
return self.config.get(key)
@classmethod
def _setconfig_normalize_value(cls, key, value):
if key not in ("rpcuser", "rpcpassword"):
value = json_decode(value)
try:
value = ast.literal_eval(value)
except Exception:
pass
return value
@command("")
def setconfig(self, key, value):
"""Set a configuration variable. 'value' may be a string or a Python expression."""
value = self._setconfig_normalize_value(key, value)
self.config.set_key(key, value)
return True
@command("")
def make_electrum_seed(self, nbits=132, entropy=1, language=None):
"""Create an Electrum seed"""
t = "electrum"
s = MnemonicElectrum(language).make_seed(t, nbits, custom_entropy=entropy)
return s
@command("")
def make_seed(self, nbits=128, language=None):
"""Create a BIP39 seed"""
s = make_bip39_words("english")
return s
@command("")
def check_electrum_seed(self, seed, entropy=1, language=None):
"""Check that an Electrum seed was generated with given entropy"""
return MnemonicElectrum(language).check_seed(seed, entropy)
@command("")
def check_seed(self, seed, entropy=1, language=None):
"""This command is deprecated and will fail, use check_electrum_seed instead."""
raise NotImplementedError(
"check_seed has been removed. Use check_electrum_seed instead."
)
@command("n")
def getaddresshistory(self, address):
"""Return the transaction history of any address. Note: This is a
walletless server query, results are not checked by SPV.
"""
sh = Address.from_string(address).to_scripthash_hex()
return self.network.synchronous_get(("blockchain.scripthash.get_history", [sh]))
@command("w")
def listunspent(self):
"""List unspent outputs. Returns the list of unspent transaction
outputs in your wallet."""
coins = self.wallet.get_utxos(exclude_frozen=False)
for coin in coins:
if coin["value"] is not None:
coin["value"] = str(PyDecimal(coin["value"]) / CASH)
coin["address"] = coin["address"].to_ui_string()
return coins
@command("n")
def getaddressunspent(self, address):
"""Returns the UTXO list of any address. Note: This
is a walletless server query, results are not checked by SPV.
"""
sh = Address.from_string(address).to_scripthash_hex()
return self.network.synchronous_get(("blockchain.scripthash.listunspent", [sh]))
@command("")
def serialize(self, jsontx):
"""Create a transaction from json inputs.
Inputs must have a redeemPubkey.
Outputs must be a list of {'address':address, 'value':satoshi_amount}.
"""
keypairs = {}
inputs = jsontx.get("inputs")
outputs = jsontx.get("outputs")
locktime = jsontx.get("locktime", 0)
for txin in inputs:
if txin.get("output"):
prevout_hash, prevout_n = txin["output"].split(":")
txin["prevout_n"] = int(prevout_n)
txin["prevout_hash"] = prevout_hash
sec = txin.get("privkey")
if sec:
txin_type, privkey, compressed = bitcoin.deserialize_privkey(sec)
pubkey = public_key_from_private_key(privkey, compressed)
keypairs[pubkey] = privkey, compressed
txin["type"] = txin_type.name
txin["x_pubkeys"] = [pubkey.hex()]
txin["signatures"] = [None]
txin["num_sig"] = 1
inputs = [TxInput.from_coin_dict(inp) for inp in inputs]
outputs = [
TxOutput(TYPE_ADDRESS, Address.from_string(x["address"]), int(x["value"]))
for x in outputs
]
tx = Transaction.from_io(
inputs,
outputs,
locktime=locktime,
sign_schnorr=self.wallet and self.wallet.is_schnorr_enabled(),
)
tx.sign(keypairs)
return tx.as_dict()
@command("wp")
def signtransaction(self, tx, privkey=None, password=None):
"""Sign a transaction. The wallet keys will be used unless a private key is provided."""
tx = Transaction(
rawtx_from_str(tx),
sign_schnorr=self.wallet and self.wallet.is_schnorr_enabled(),
)
if privkey:
txin_type, privkey2, compressed = bitcoin.deserialize_privkey(privkey)
pubkey = public_key_from_private_key(privkey2, compressed)
tx.sign({pubkey: (privkey2, compressed)})
else:
self.wallet.sign_transaction(tx, password)
return tx.as_dict()
@command("")
def deserialize(self, tx):
"""Deserialize a serialized transaction"""
tx = Transaction(rawtx_from_str(tx))
tx.deserialize()
outputs = [
{
"value": txout.value,
"type": txout.type,
"address": txout.destination,
"scriptPubKey": txout.destination.to_script().hex(),
"prevout_n": i,
}
for i, txout in enumerate(tx.outputs())
]
return self._EnsureDictNamedTuplesAreJSONSafe(
{
"version": tx.version,
"inputs": tx.inputs(),
"outputs": outputs,
"lockTime": tx.locktime,
}
)
@command("n")
def broadcast(self, tx):
"""Broadcast a transaction to the network."""
tx = Transaction(rawtx_from_str(tx))
return self.network.broadcast_transaction(tx)
@command("")
def createmultisig(self, num, pubkeys):
"""Create multisig address"""
assert isinstance(pubkeys, list), (type(num), type(pubkeys))
redeem_script = multisig_script(pubkeys, num)
address = bitcoin.hash160_to_p2sh(hash_160(redeem_script))
return {"address": address, "redeemScript": redeem_script}
@command("w")
def freeze(self, address: str):
"""Freeze address. Freeze the funds at one of your wallet\'s addresses"""
address = Address.from_string(address)
return self.wallet.set_frozen_state([address], True)
@command("w")
def unfreeze(self, address: str):
"""Unfreeze address. Unfreeze the funds at one of your wallet\'s address"""
address = Address.from_string(address)
return self.wallet.set_frozen_state([address], False)
@command("w")
def freeze_utxo(self, coin: str):
"""Freeze a UTXO so that the wallet will not spend it."""
assertOutpoint(coin)
self.wallet.set_frozen_coin_state([coin], True)
return True
@command("w")
def unfreeze_utxo(self, coin: str):
"""Unfreeze a UTXO so that the wallet might spend it."""
assertOutpoint(coin)
self.wallet.set_frozen_coin_state([coin], False)
return True
@command("wp")
def getprivatekeys(self, address, password=None):
"""Get private keys of addresses. You may pass a single wallet address, or a list of wallet addresses."""
def get_pk(address):
address = Address.from_string(address)
return self.wallet.export_private_key(address, password)
if isinstance(address, str):
return get_pk(address)
else:
return [get_pk(addr) for addr in address]
@command("w")
def ismine(self, address):
"""Check if address is in wallet. Return true if and only address is in wallet"""
address = Address.from_string(address)
return self.wallet.is_mine(address)
@command("")
def dumpprivkeys(self):
"""Deprecated."""
return (
"This command is deprecated. Use a pipe instead: "
f"'{SCRIPT_NAME} listaddresses | {SCRIPT_NAME} "
"getprivatekeys - '"
)
@command("")
def validateaddress(self, address):
"""Check that an address is valid."""
return Address.is_valid(address)
@command("w")
def getpubkeys(self, address):
"""Return the public keys for a wallet address."""
address = Address.from_string(address)
return self.wallet.get_public_keys(address)
@command("w")
def getbalance(self):
"""Return the balance of your wallet."""
c, u, x = self.wallet.get_balance()
out = {"confirmed": str(PyDecimal(c) / CASH)}
if u:
out["unconfirmed"] = str(PyDecimal(u) / CASH)
if x:
out["unmatured"] = str(PyDecimal(x) / CASH)
return out
@command("n")
def getaddressbalance(self, address):
"""Return the balance of any address. Note: This is a walletless
server query, results are not checked by SPV.
"""
sh = Address.from_string(address).to_scripthash_hex()
out = self.network.synchronous_get(("blockchain.scripthash.get_balance", [sh]))
out["confirmed"] = str(PyDecimal(out["confirmed"]) / CASH)
out["unconfirmed"] = str(PyDecimal(out["unconfirmed"]) / CASH)
return out
@command("n")
def getmerkle(self, txid, height):
"""Get Merkle branch of a transaction included in a block. Electron Cash
uses this to verify transactions (Simple Payment Verification)."""
return self.network.synchronous_get(
("blockchain.transaction.get_merkle", [txid, int(height)])
)
@command("n")
def getservers(self):
"""Return the list of available servers"""
return self.network.get_servers()
@command("")
def version(self):
"""Return the version of Electron Cash."""
from .version import PACKAGE_VERSION
return PACKAGE_VERSION
@command("w")
def getmpk(self):
"""Get master public key. Return your wallet\'s master public key"""
return self.wallet.get_master_public_key()
@command("wp")
def getmasterprivate(self, password=None):
"""Get master private key. Return your wallet\'s master private key"""
return str(self.wallet.keystore.get_master_private_key(password))
@command("wp")
def getseed(self, password=None):
"""Get seed phrase. Print the generation seed of your wallet."""
s = self.wallet.get_seed(password)
return s
@command("wp")
def importprivkey(self, privkey, password=None):
"""Import a private key."""
if not self.wallet.can_import_privkey():
return (
"Error: This type of wallet cannot import private keys. Try to create a"
" new wallet with that key."
)
try:
addr = self.wallet.import_private_key(privkey, password)
out = "Keypair imported: " + addr
except Exception as e:
out = "Error: " + str(e)
return out
def _resolver(self, x):
if x is None:
return None
out = alias.resolve(x)
if (
out.get("type") == "openalias"
and self.nocheck is False
and out.get("validated") is False
):
raise RuntimeError("cannot verify alias", x)
return out["address"]
@command("n")
def sweep(self, privkey, destination, fee=None, nocheck=False, imax=100):
"""Sweep private keys. Returns a transaction that spends UTXOs from
privkey to a destination address. The transaction is not
broadcasted."""
from .wallet import sweep
tx_fee = satoshis(fee)
privkeys = privkey.split()
self.nocheck = nocheck
addr = Address.from_string(destination)
tx = sweep(privkeys, self.network, self.config, addr, tx_fee, imax)
return tx.as_dict() if tx else None
@command("wp")
def signmessage(self, address, message, password=None):
"""Sign a message with a key. Use quotes if your message contains
whitespaces"""
address = Address.from_string(address)
sig = self.wallet.sign_message(address, message, password)
return base64.b64encode(sig).decode("ascii")
@command("")
def verifymessage(self, address, signature, message):
"""Verify a signature."""
address = Address.from_string(address)
sig = base64.b64decode(signature)
message = util.to_bytes(message)
- return verify_message(address, sig, message)
+ return verify_message_with_address(address, sig, message)
def _mktx(
self,
outputs,
fee=None,
feerate=None,
change_addr=None,
domain=None,
nocheck=False,
unsigned=False,
password=None,
locktime=None,
op_return=None,
op_return_raw=None,
addtransaction=False,
):
if fee is not None and feerate is not None:
raise ValueError(
"Cannot specify both 'fee' and 'feerate' at the same time!"
)
if op_return and op_return_raw:
raise ValueError(
"Both op_return and op_return_raw cannot be specified together!"
)
self.nocheck = nocheck
change_addr = self._resolver(change_addr)
domain = None if domain is None else map(self._resolver, domain)
final_outputs = []
if op_return:
final_outputs.append(OPReturn.output_for_stringdata(op_return))
elif op_return_raw:
try:
op_return_raw = op_return_raw.strip()
tmp = bytes.fromhex(op_return_raw).hex()
assert tmp == op_return_raw.lower()
op_return_raw = tmp
except Exception as e:
raise ValueError(
"op_return_raw must be an even number of hex digits"
) from e
final_outputs.append(OPReturn.output_for_rawhex(op_return_raw))
for address, amount in outputs:
address = self._resolver(address)
amount = satoshis(amount)
final_outputs.append(TxOutput(TYPE_ADDRESS, address, amount))
coins = self.wallet.get_spendable_coins(domain, self.config)
if feerate is not None:
fee_per_kb = 1000 * PyDecimal(feerate)
def fee_estimator(size):
return SimpleConfig.estimate_fee_for_feerate(fee_per_kb, size)
else:
fee_estimator = fee
tx = self.wallet.make_unsigned_transaction(
coins, final_outputs, self.config, fee_estimator, change_addr
)
if locktime is not None:
tx.locktime = locktime
if not unsigned:
run_hook("sign_tx", self.wallet, tx)
self.wallet.sign_transaction(tx, password)
if addtransaction:
self.wallet.add_transaction(tx.txid(), tx)
self.wallet.add_tx_to_history(tx.txid())
self.wallet.save_transactions()
return tx
@command("wp")
def payto(
self,
destination,
amount,
fee=None,
feerate=None,
from_addr=None,
change_addr=None,
nocheck=False,
unsigned=False,
password=None,
locktime=None,
op_return=None,
op_return_raw=None,
addtransaction=False,
):
"""Create a transaction."""
tx_fee = satoshis(fee)
domain = from_addr.split(",") if from_addr else None
tx = self._mktx(
[(destination, amount)],
tx_fee,
feerate,
change_addr,
domain,
nocheck,
unsigned,
password,
locktime,
op_return,
op_return_raw,
addtransaction=addtransaction,
)
return tx.as_dict()
@command("wp")
def paytomany(
self,
outputs,
fee=None,
feerate=None,
from_addr=None,
change_addr=None,
nocheck=False,
unsigned=False,
password=None,
locktime=None,
addtransaction=False,
):
"""Create a multi-output transaction."""
tx_fee = satoshis(fee)
domain = from_addr.split(",") if from_addr else None
tx = self._mktx(
outputs,
tx_fee,
feerate,
change_addr,
domain,
nocheck,
unsigned,
password,
locktime,
addtransaction=addtransaction,
)
return tx.as_dict()
@command("w")
def history(
self, year=0, show_addresses=False, show_fiat=False, use_net=False, timeout=30.0
):
"""Wallet history. Returns the transaction history of your wallet."""
t0 = time.time()
year, show_addresses, show_fiat, use_net, timeout = (
int(year),
bool(show_addresses),
bool(show_fiat),
bool(use_net),
float(timeout),
)
def time_remaining():
return max(timeout - (time.time() - t0), 0)
kwargs = {
"show_addresses": show_addresses,
"fee_calc_timeout": timeout,
"download_inputs": use_net,
}
if year:
start_date = datetime.datetime(year, 1, 1)
end_date = datetime.datetime(year + 1, 1, 1)
kwargs["from_timestamp"] = time.mktime(start_date.timetuple())
kwargs["to_timestamp"] = time.mktime(end_date.timetuple())
if show_fiat:
from .exchange_rate import FxThread
fakenet, q = None, None
if use_net and time_remaining():
class FakeNetwork:
"""This simply exists to implement trigger_callback which
is the only thing the FX thread calls if you pass it a
'network' object. We use it to get notified of when FX
history has been downloaded."""
def __init__(self, q):
self.q = q
def trigger_callback(self, *args, **kwargs):
self.q.put(True)
q = queue.Queue()
fakenet = FakeNetwork(q)
fx = FxThread(self.config, fakenet)
kwargs["fx"] = fx
# invoke the fx to grab history rates at least once, otherwise results will
# always contain "No data" (see #1671)
fx.run()
if fakenet and q and fx.is_enabled() and fx.get_history_config():
# queue.get docs aren't clean on whether 0 means block or don't
# block, so we ensure at least 1ms timeout.
# we also limit waiting for fx to 10 seconds in case it had
# errors.
try:
q.get(timeout=min(max(time_remaining() / 2.0, 0.001), 10.0))
except queue.Empty:
pass
# since we blocked above, recompute time_remaining for kwargs
kwargs["fee_calc_timeout"] = time_remaining()
return self.wallet.export_history(**kwargs)
@command("w")
def setlabel(self, key, label):
"""Assign a label to an item. Item may be a bitcoin address address or a
transaction ID"""
self.wallet.set_label(key, label)
@command("w")
def listcontacts(self):
"""Show your list of contacts"""
return self.wallet.contacts.get_all()
@command("w")
def getalias(self, key):
"""Retrieve alias. Lookup in your list of contacts, and for an OpenAlias DNS record."""
return alias.resolve(key)
@command("w")
def searchcontacts(self, query):
"""Search through contacts, return matching entries."""
results = []
for contact in self.wallet.contacts.get_all():
lquery = query.lower()
if (
lquery in contact.name.lower()
or lquery.lower() in contact.address.lower()
):
results.append(contact)
return results
@command("w")
def listaddresses(
self,
receiving=False,
change=False,
labels=False,
frozen=False,
unused=False,
funded=False,
balance=False,
):
"""List wallet addresses. Returns the list of all addresses in your wallet. Use optional arguments to filter the results."""
out = []
for addr in self.wallet.get_addresses():
if frozen and not self.wallet.is_frozen(addr):
continue
if receiving and self.wallet.is_change(addr):
continue
if change and not self.wallet.is_change(addr):
continue
if unused and self.wallet.is_used(addr):
continue
if funded and self.wallet.is_empty(addr):
continue
item = addr.to_ui_string()
if labels or balance:
item = (item,)
if balance:
item += (
format_satoshis(
sum(self.wallet.get_addr_balance(addr)), decimal_point=2
),
)
if labels:
item += (repr(self.wallet.labels.get(addr.to_storage_string(), "")),)
out.append(item)
return out
@command("n")
def gettransaction(self, txid):
"""Retrieve a transaction."""
if self.wallet and txid in self.wallet.transactions:
tx = self.wallet.transactions[txid]
else:
raw = self.network.synchronous_get(("blockchain.transaction.get", [txid]))
if raw:
tx = Transaction(bytes.fromhex(raw))
else:
raise RuntimeError("Unknown transaction")
return tx.as_dict()
@command("")
def encrypt(self, pubkey, message):
"""Encrypt a message with a public key. Use quotes if the message contains whitespaces."""
if not isinstance(pubkey, (str, bytes, bytearray)) or not isinstance(
message, (str, bytes, bytearray)
):
raise ValueError("pubkey and message text must both be strings")
message = to_bytes(message)
res = bitcoin.encrypt_message(message, bytes.fromhex(pubkey))
if isinstance(res, (bytes, bytearray)):
# prevent "JSON serializable" errors in case this came from
# cmdline. See #1270
res = res.decode("utf-8")
return res
@command("wp")
def decrypt(self, pubkey, encrypted, password=None):
"""Decrypt a message encrypted with a public key."""
if not isinstance(pubkey, str) or not isinstance(encrypted, str):
raise ValueError("pubkey and encrypted text must both be strings")
res = self.wallet.decrypt_message(pubkey, encrypted, password)
if isinstance(res, (bytes, bytearray)):
# prevent "JSON serializable" errors in case this came from
# cmdline. See #1270
res = res.decode("utf-8")
return res
def _format_request(self, out):
pr_str = {
PR_UNKNOWN: "Unknown",
PR_UNPAID: "Pending",
PR_PAID: "Paid",
PR_EXPIRED: "Expired",
PR_UNCONFIRMED: "Unconfirmed",
}
out["address"] = out.get("address").to_ui_string()
out[f"amount ({XEC.ticker})"] = format_satoshis(out.get("amount"))
out["status"] = pr_str[out.get("status", PR_UNKNOWN)]
return out
@command("w")
def getrequest(self, key):
"""Return a payment request"""
r = self.wallet.get_payment_request(Address.from_string(key), self.config)
if not r:
raise RuntimeError("Request not found")
return self._format_request(r)
# @command('w')
# def ackrequest(self, serialized):
# """<Not implemented>"""
# pass
@command("w")
def listrequests(self, pending=False, expired=False, paid=False):
"""List the payment requests you made."""
out = self.wallet.get_sorted_requests(self.config)
if pending:
f = PR_UNPAID
elif expired:
f = PR_EXPIRED
elif paid:
f = PR_PAID
else:
f = None
if f is not None:
out = list(filter(lambda x: x.get("status") == f, out))
return list(map(self._format_request, out))
@command("w")
def createnewaddress(self):
"""Create a new receiving address, beyond the gap limit of the wallet"""
return self.wallet.create_new_address(False).to_ui_string()
@command("w")
def getunusedaddress(self):
"""Returns the first unused address of the wallet, or None if all addresses are used.
An address is considered as used if it has received a transaction, or if it is used in a payment request.
"""
return self.wallet.get_unused_address().to_ui_string()
@command("w")
def addrequest(
self,
amount,
memo="",
expiration=None,
force=False,
payment_url=None,
index_url=None,
):
"""Create a payment request, using the first unused address of the wallet.
The address will be condidered as used after this operation.
If no payment is received, the address will be considered as unused if the
payment request is deleted from the wallet."""
addr = self.wallet.get_unused_address()
if addr is None:
if not self.wallet.is_deterministic():
self.wallet.print_error(
"Unable to find an unused address. Please use a deteministic "
"wallet to proceed, then run with the --force option to create "
"new addresses."
)
return False
if force:
addr = self.wallet.create_new_address(False)
else:
self.wallet.print_error(
"Unable to find an unused address. Try running with the --force "
"option to create new addresses."
)
return False
amount = satoshis(amount)
expiration = int(expiration) if expiration else None
req = self.wallet.make_payment_request(
addr, amount, memo, expiration, payment_url=payment_url, index_url=index_url
)
self.wallet.add_payment_request(req, self.config)
out = self.wallet.get_payment_request(addr, self.config)
return self._format_request(out)
@command("wp")
def signrequest(self, address, password=None):
"Sign payment request with an OpenAlias"
alias_ = self.config.get("alias")
if not alias_:
raise ValueError("No alias in your configuration")
data = alias.resolve(alias_)
alias_addr = (data and data.get("address")) or None
if not alias_addr:
raise RuntimeError("Alias could not be resolved")
self.wallet.sign_payment_request(address, alias_, alias_addr, password)
@command("w")
def rmrequest(self, address):
"""Remove a payment request"""
return self.wallet.remove_payment_request(address, self.config)
@command("w")
def clearrequests(self):
"""Remove all payment requests"""
for k in list(self.wallet.receive_requests.keys()):
self.wallet.remove_payment_request(k, self.config)
@command("n")
def notify(self, address, URL):
"""Watch an address. Everytime the address changes, a http POST is sent to the URL."""
def callback(x):
import urllib.request
headers = {"content-type": "application/json"}
data = {"address": address, "status": x.get("result")}
serialized_data = util.to_bytes(json.dumps(data))
try:
req = urllib.request.Request(URL, serialized_data, headers)
urllib.request.urlopen(req, timeout=5)
print_error("Got Response for %s" % address)
except Exception as e:
print_error(str(e))
h = Address.from_string(address).to_scripthash_hex()
self.network.send([("blockchain.scripthash.subscribe", [h])], callback)
return True
@command("wn")
def is_synchronized(self):
"""return wallet synchronization status"""
return self.wallet.is_up_to_date()
@command("n")
def getfeerate(self):
"""Return current optimal fee rate per kilobyte, according
to config settings (static/dynamic)"""
return self.config.fee_per_kb()
@command("")
def help(self):
# for the python console
return sorted(known_commands.keys())
param_descriptions = {
"wallet_path": "Wallet path(create/restore commands)",
"privkey": "Private key. Type '?' to get a prompt.",
"destination": "eCash address, contact or alias",
"address": "eCash address",
"seed": "Seed phrase",
"txid": "Transaction ID",
"pos": "Position",
"height": "Block height",
"tx": "Serialized transaction (hexadecimal)",
"key": "Variable name",
"pubkey": "Public key",
"message": "Clear text message. Use quotes if it contains spaces.",
"encrypted": "Encrypted message",
"amount": (
f"Amount to be sent (in {XEC.ticker}). Type '!' to send the maximum available."
),
"requested_amount": f"Requested amount (in {XEC.ticker}).",
"outputs": 'list of ["address", amount]',
"redeem_script": "redeem script (hexadecimal)",
}
command_options = {
"addtransaction": (
None,
(
"Whether transaction is to be used for broadcasting afterwards. Adds"
" transaction to the wallet"
),
),
"balance": ("-b", "Show the balances of listed addresses"),
"change": (None, "Show only change addresses"),
"change_addr": (
"-c",
(
"Change address. Default is a spare address, or the source address if it's"
" not in the wallet"
),
),
"domain": ("-D", "List of addresses"),
"encrypt_file": (
None,
"Whether the file on disk should be encrypted with the provided password",
),
"entropy": (None, "Custom entropy"),
"expiration": (None, "Time in seconds"),
"expired": (None, "Show only expired requests."),
"fee": ("-f", f"Transaction fee (absolute, in {XEC.ticker})"),
"feerate": (None, "Transaction fee rate (in sat/byte)"),
"force": (
None,
"Create new address beyond gap limit, if no more addresses are available.",
),
"from_addr": (
"-F",
(
"Source address (must be a wallet address; use sweep to spend from"
" non-wallet address)."
),
),
"frozen": (None, "Show only frozen addresses"),
"funded": (None, "Show only funded addresses"),
"imax": (None, "Maximum number of inputs"),
"index_url": (
None,
(
"Override the URL where you would like users to be shown the BIP70 Payment"
" Request"
),
),
"labels": ("-l", "Show the labels of listed addresses"),
"language": ("-L", "Default language for wordlist"),
"locktime": (None, "Set locktime block number"),
"memo": ("-m", "Description of the request"),
"nbits": (None, "Number of bits of entropy"),
"new_password": (None, "New Password"),
"nocheck": (None, "Do not verify aliases"),
"op_return": (
None,
"Specify string data to add to the transaction as an OP_RETURN output",
),
"op_return_raw": (
None,
(
"Specify raw hex data to add to the transaction as an OP_RETURN output"
" (0x6a aka the OP_RETURN byte will be auto-prepended for you so do not"
" include it)"
),
),
"paid": (None, "Show only paid requests."),
"passphrase": (None, "Seed extension"),
"password": ("-W", "Password"),
"payment_url": (
None,
"Optional URL where you would like users to POST the BIP70 Payment message",
),
"pending": (None, "Show only pending requests."),
"privkey": (None, "Private key. Set to '?' to get a prompt."),
"receiving": (None, "Show only receiving addresses"),
"seed_type": (
None,
(
"The type of seed to create, currently: 'electrum' and 'bip39' is"
" supported. Default 'bip39'."
),
),
"show_addresses": (None, "Show input and output addresses"),
"show_fiat": (None, "Show fiat value of transactions"),
"timeout": (
None,
(
"Timeout in seconds to wait for the overall operation to complete. Defaults"
" to 30.0."
),
),
"unsigned": ("-u", "Do not sign transaction"),
"unused": (None, "Show only unused addresses"),
"use_net": (
None,
(
"Go out to network for accurate fiat value and/or fee calculations for"
" history. If not specified only the wallet's cache is used which may lead"
" to inaccurate/missing fees and/or FX rates."
),
),
"wallet_path": (None, "Wallet path(create/restore commands)"),
"year": (None, "Show history for a given year"),
}
def json_loads(x):
return json.loads(x, parse_float=lambda y: str(PyDecimal(y)))
arg_types = {
"num": int,
"nbits": int,
"imax": int,
"year": int,
"entropy": int,
"pubkeys": json_loads,
"jsontx": json_loads,
"inputs": json_loads,
"outputs": json_loads,
"fee": lambda x: str(PyDecimal(x)) if x is not None else None,
"amount": lambda x: str(PyDecimal(x)) if x != "!" else "!",
"locktime": int,
}
config_variables = {
"addrequest": {
"requests_dir": "directory where a bip70 file will be written.",
"ssl_privkey": "Path to your SSL private key, needed to sign the request.",
"ssl_chain": (
"Chain of SSL certificates, needed for signed requests. Put your"
" certificate at the top and the root CA at the end"
),
"url_rewrite": (
"Parameters passed to str.replace(), in order to create the r= part of"
" ecash: URIs. Example:"
" \"('file:///var/www/','https://electron-cash.org/')\""
),
},
"listrequests": {
"url_rewrite": (
"Parameters passed to str.replace(), in order to create the r= part of"
" ecash: URIs. Example:"
" \"('file:///var/www/','https://electron-cash.org/')\""
),
},
}
def add_network_options(parser):
parser.add_argument(
"-1",
"--oneserver",
action="store_true",
help="connect to one server only",
)
parser.add_argument(
"-s",
"--server",
default=None,
help=(
"set server host:port:protocol, where protocol is either t (tcp) or s (ssl)"
),
)
parser.add_argument(
"-p",
"--proxy",
default=None,
help="set proxy [type:]host[:port], where type is socks4,socks5 or http",
)
parser.add_argument(
"-x",
"--disable_preferred_servers_only",
action="store_false",
dest="whitelist_servers_only",
default=None,
help=(
"Disables 'preferred servers only' for this session. This must be used in"
" conjunction with --server or --oneserver for them to work if they are"
" outside the whitelist in servers.json (or the user-specified whitelist)."
),
)
def add_global_options(parser):
group = parser.add_argument_group("global options")
group.add_argument(
"-v",
"--verbose",
action="store_true",
help="Show debugging information",
)
group.add_argument(
"-D", "--dir", dest="data_path", help=f"{PROJECT_NAME} directory"
)
group.add_argument(
"-P",
"--portable",
action="store_true",
help="Use local 'electrum_abc_data' directory",
)
group.add_argument(
"-w",
"--wallet",
dest="wallet_path",
help="wallet path",
type=os.path.abspath,
)
group.add_argument(
"-wp",
"--walletpassword",
dest="wallet_password",
default=None,
help="Supply wallet password",
)
group.add_argument(
"--forgetconfig",
action="store_true",
dest="forget_config",
help="Forget config on exit",
)
group.add_argument(
"--testnet",
action="store_true",
help="Use Testnet",
)
group.add_argument(
"--regtest",
action="store_true",
help="Use Regtest",
)
group.add_argument(
"--test-release-notification",
action="store_true",
help="fetch release notification data from current source tree",
)
group.add_argument(
"--enable-aliases",
action="store_true",
help="Enable support for eCash aliases",
)
def get_parser():
# create main parser
parser = argparse.ArgumentParser(
epilog=f"Run '{SCRIPT_NAME} help <command>' to see the help for a command"
)
add_global_options(parser)
subparsers = parser.add_subparsers(dest="cmd", metavar="<command>")
# gui
parser_gui = subparsers.add_parser(
"gui",
description=f"Run {PROJECT_NAME}'s Graphical User Interface.",
help="Run GUI (default)",
)
parser_gui.add_argument(
"url", nargs="?", default=None, help="bitcoin URI (or bip70 file)"
)
parser_gui.add_argument(
"-g",
"--gui",
help="select graphical user interface",
choices=["qt", "text", "stdio"],
)
parser_gui.add_argument(
"-o",
"--offline",
action="store_true",
help="Run offline",
)
parser_gui.add_argument(
"-m",
action="store_true",
dest="hide_gui",
help="hide GUI on startup",
)
parser_gui.add_argument(
"-L",
"--lang",
dest="language",
default=None,
help="default language used in GUI",
)
if sys.platform in ("windows", "win32"):
# Hack to support forcing QT_OPENGL env var. See #1255. This allows us
# to perhaps add a custom installer shortcut to force software rendering
parser_gui.add_argument(
"-O",
"--qt_opengl",
default=None,
help=(
"(Windows only) If using Qt gui, override the QT_OPENGL env-var with"
" this value (angle,software,desktop are possible overrides)"
),
)
if sys.platform not in ("darwin",):
# Qt High DPI scaling can not be disabled on macOS since it is never
# explicitly enabled on macOS! (see gui/qt/__init__.py)
parser_gui.add_argument(
"--qt_disable_highdpi",
action="store_true",
default=None,
help="(Linux & Windows only) If using Qt gui, disable high DPI scaling",
)
add_network_options(parser_gui)
# daemon
parser_daemon = subparsers.add_parser("daemon", help="Run Daemon")
parser_daemon.add_argument(
"subcommand",
nargs="?",
help=(
"start, stop, status, load_wallet, close_wallet. Other commands may be"
" added by plugins."
),
)
parser_daemon.add_argument(
"subargs",
nargs="*",
metavar="arg",
help="additional arguments (used by plugins)",
)
# parser_daemon.set_defaults(func=run_daemon)
add_network_options(parser_daemon)
# commands
for cmdname in sorted(known_commands.keys()):
cmd = known_commands[cmdname]
p = subparsers.add_parser(cmdname, help=cmd.help, description=cmd.description)
if cmdname == "restore":
p.add_argument(
"-o",
"--offline",
action="store_true",
help="Run offline",
)
for optname, default in zip(cmd.options, cmd.defaults):
short_option, help_ = command_options[optname]
long_option = "--" + optname
action = "store_true" if type(default) is bool else "store"
args = (short_option, long_option) if short_option else (long_option,)
if action == "store":
_type = arg_types.get(optname, str)
p.add_argument(
*args,
action=action,
default=default,
help=help_,
type=_type,
)
else:
p.add_argument(*args, action=action, default=default, help=help_)
for param in cmd.params:
h = param_descriptions.get(param, "")
_type = arg_types.get(param, str)
p.add_argument(param, help=h, type=_type)
cvh = config_variables.get(cmdname)
if cvh:
group = p.add_argument_group(
"configuration variables", "(set with setconfig/getconfig)"
)
for k, v in cvh.items():
group.add_argument(k, nargs="?", help=v)
return parser
def prompt_password(prompt: str = "Password:", confirm: bool = False) -> str:
"""Get password routine"""
password = getpass.getpass(prompt, stream=None)
if password and confirm:
password2 = getpass.getpass("Confirm: ")
if password != password2:
sys.exit("Error: Passwords do not match.")
if not password:
password = None
return password
def preprocess_cmdline_args(args):
"""Sanitize command line args before parsing them with argparse."""
# on osx, delete Process Serial Number arg generated for apps launched in Finder
args_to_be_removed = list(filter(lambda x: x.startswith("-psn"), args))
for arg in args_to_be_removed:
args.remove(arg)
# old 'help' syntax
if len(args) > 1 and args[1] == "help":
args.remove("help")
args.append("-h")
# read arguments from stdin pipe and prompt
for i, arg in enumerate(args):
if arg == "-":
if sys.stdin.isatty():
raise RuntimeError("Cannot get argument from stdin")
args[i] = sys.stdin.read()
break
if arg == "?":
args[i] = input("Enter argument:")
elif arg == ":":
args[i] = prompt_password("Enter argument (will not echo):")
# Starting the application with an URI as a first argument implies cmd="gui".
# This happens when the application is started by the OS via a mimetype association,
# e.g. a "ecash:...." BIP21 URI.
if len(args) > 1:
if any(args[1].startswith(scheme + ":") for scheme in web.parseable_schemes()):
args.insert(1, "gui")
diff --git a/electrum/electrumabc/ecc.py b/electrum/electrumabc/ecc.py
index d67e57d57..84135a783 100644
--- a/electrum/electrumabc/ecc.py
+++ b/electrum/electrumabc/ecc.py
@@ -1,440 +1,442 @@
# -*- coding: utf-8 -*-
#
# Electrum ABC - lightweight eCash client
# Copyright (C) 2018 The Electrum developers
# Copyright (C) 2024 The Electrum ABC developers
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import annotations
import base64
import hashlib
import hmac
from enum import Enum
from typing import TYPE_CHECKING, NamedTuple, Optional, Union
import ecdsa
from ecdsa.curves import SECP256k1
from ecdsa.ecdsa import curve_secp256k1, generator_secp256k1
from ecdsa.ellipticcurve import Point
from ecdsa.util import number_to_string, string_to_number
from . import networks
from .crypto import Hash, aes_decrypt_with_iv, aes_encrypt_with_iv
from .ecc_fast import do_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1
from .serialize import serialize_blob
from .util import InvalidPassword, assert_bytes, to_bytes
if TYPE_CHECKING:
from .address import Address
do_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1()
CURVE_ORDER = SECP256k1.order
def i2o_ECPublicKey(pubkey, compressed=False):
# public keys are 65 bytes long (520 bits)
# 0x04 + 32-byte X-coordinate + 32-byte Y-coordinate
# 0x00 = point at infinity, 0x02 and 0x03 = compressed, 0x04 = uncompressed
# compressed keys: <sign> <x> where <sign> is 0x02 if y is even and 0x03 if y is odd
if compressed:
if pubkey.point.y() & 1:
# explicitly convert point coordinates to int, because ecdsa
# returns mpz instead of int if gmpY is installed
key = b"\x03" + int(pubkey.point.x()).to_bytes(32, "big")
else:
key = b"\x02" + int(pubkey.point.x()).to_bytes(32, "big")
else:
key = (
b"\x04"
+ int(pubkey.point.x()).to_bytes(32, "big")
+ int(pubkey.point.y()).to_bytes(32, "big")
)
return key
def regenerate_key(pk):
assert len(pk) == 32
return ECKey(pk)
def GetPubKey(pubkey, compressed=False) -> bytes:
return i2o_ECPublicKey(pubkey, compressed)
def public_key_from_private_key(pk: bytes, compressed) -> bytes:
pkey = regenerate_key(pk)
return GetPubKey(pkey.pubkey, compressed)
class SignatureType(Enum):
ECASH = 1
BITCOIN = 2
ECASH_MSG_MAGIC = b"eCash Signed Message:\n"
BITCOIN_MSG_MAGIC = b"Bitcoin Signed Message:\n"
def msg_magic(message: bytes, sigtype: SignatureType = SignatureType.ECASH) -> bytes:
"""Prepare the preimage of the message before signing it or verifying
its signature."""
magic = ECASH_MSG_MAGIC if sigtype == SignatureType.ECASH else BITCOIN_MSG_MAGIC
return serialize_blob(magic) + serialize_blob(message)
-def verify_message(
- address: Union[str, "Address"],
- sig: bytes,
- message: bytes,
- *,
- net: Optional[networks.AbstractNet] = None,
- sigtype: SignatureType = SignatureType.ECASH,
-) -> bool:
- if net is None:
- net = networks.net
- assert_bytes(sig, message)
- # Fixme: circular import address -> ecc -> address
- from .address import Address
-
- if not isinstance(address, Address):
- address = Address.from_string(address, net=net)
-
- h = Hash(msg_magic(message, sigtype))
- public_key, compressed = pubkey_from_signature(sig, h)
- # check public key using the right address
- pubkey = point_to_ser(public_key.pubkey.point, compressed)
- addr = Address.from_pubkey(pubkey)
- if address != addr:
- return False
- # check message
- try:
- public_key.verify_digest(sig[1:], h, sigdecode=ecdsa.util.sigdecode_string)
- except Exception:
- return False
- return True
-
-
def encrypt_message(message, pubkey: bytes, magic=b"BIE1"):
return ECKey.encrypt_message(message, pubkey, magic)
def get_y_coord_from_x(x: int, odd=True) -> int:
curve = curve_secp256k1
_p = curve.p()
_a = curve.a()
_b = curve.b()
for offset in range(128):
Mx = x + offset
My2 = pow(Mx, 3, _p) + _a * pow(Mx, 2, _p) + _b % _p
My = pow(My2, (_p + 1) // 4, _p)
if curve.contains_point(Mx, My):
if odd == bool(My & 1):
return My
return _p - My
raise Exception("ECC_YfromX: No Y found")
def negative_point(P):
return Point(P.curve(), P.x(), -P.y(), P.order())
def sig_string_from_der_sig(der_sig):
r, s = ecdsa.util.sigdecode_der(der_sig, CURVE_ORDER)
return ecdsa.util.sigencode_string(r, s, CURVE_ORDER)
class EcCoordinates(NamedTuple):
x: int
y: int
def point_to_ser(
P: Union[EcCoordinates, ecdsa.ellipticcurve.Point], compressed=True
) -> bytes:
if isinstance(P, tuple):
assert len(P) == 2, f"unexpected point: {P}"
x, y = P
else:
x, y = P.x(), P.y()
if compressed:
return int(2 + (y & 1)).to_bytes(1, "big") + int(x).to_bytes(32, "big")
return b"\x04" + int(x).to_bytes(32, "big") + int(y).to_bytes(32, "big")
def ser_to_coordinates(ser: bytes) -> EcCoordinates:
if ser[0] not in (0x02, 0x03, 0x04):
raise ValueError(f"Unexpected first byte: {ser[0]}")
if ser[0] == 0x04:
return EcCoordinates(string_to_number(ser[1:33]), string_to_number(ser[33:]))
x = string_to_number(ser[1:])
return EcCoordinates(x, get_y_coord_from_x(x, ser[0] == 0x03))
def ser_to_point(ser: bytes) -> ecdsa.ellipticcurve.Point:
x, y = ser_to_coordinates(ser)
return Point(curve_secp256k1, x, y, generator_secp256k1.order())
class MyVerifyingKey(ecdsa.VerifyingKey):
@classmethod
def from_signature(klass, sig, recid, h, curve):
"""See http://www.secg.org/download/aid-780/sec1-v2.pdf, chapter 4.1.6"""
from ecdsa import numbertheory, util
from . import msqr
curveFp = curve.curve
G = curve.generator
order = G.order()
# extract r,s from signature
r, s = util.sigdecode_string(sig, order)
# 1.1
x = r + (recid // 2) * order
# 1.3
alpha = (x * x * x + curveFp.a() * x + curveFp.b()) % curveFp.p()
beta = msqr.modular_sqrt(alpha, curveFp.p())
y = beta if (beta - recid) % 2 == 0 else curveFp.p() - beta
# 1.4 the constructor checks that nR is at infinity
R = Point(curveFp, x, y, order)
# 1.5 compute e from message:
e = string_to_number(h)
minus_e = -e % order
# 1.6 compute Q = r^-1 (sR - eG)
inv_r = numbertheory.inverse_mod(r, order)
Q = inv_r * (s * R + minus_e * G)
return klass.from_public_point(Q, curve)
def pubkey_from_signature(sig, h):
if len(sig) != 65:
raise Exception("Wrong encoding")
nV = sig[0]
if nV < 27 or nV >= 35:
raise Exception("Bad encoding")
if nV >= 31:
compressed = True
nV -= 4
else:
compressed = False
recid = nV - 27
return MyVerifyingKey.from_signature(sig[1:], recid, h, curve=SECP256k1), compressed
class MySigningKey(ecdsa.SigningKey):
"""Enforce low S values in signatures"""
def sign_number(self, number, entropy=None, k=None):
curve = SECP256k1
G = curve.generator
order = G.order()
r, s = ecdsa.SigningKey.sign_number(self, number, entropy, k)
if s > order // 2:
s = order - s
return r, s
class ECPubkey(object):
def __init__(self, b: bytes):
assert_bytes(b)
point = ser_to_point(b)
self._pubkey = ecdsa.ecdsa.Public_key(generator_secp256k1, point)
@classmethod
def from_sig_string(cls, sig_string: bytes, recid: int, msg_hash: bytes):
assert_bytes(sig_string)
if len(sig_string) != 64:
raise Exception("Wrong encoding")
if not (0 <= recid <= 3):
raise ValueError(f"recid is {recid}, but should be 0 <= recid <= 3")
ecdsa_verifying_key = MyVerifyingKey.from_signature(
sig_string, recid, msg_hash, curve=SECP256k1
)
ecdsa_point = ecdsa_verifying_key.pubkey.point
return ECPubkey(point_to_ser(ecdsa_point))
@classmethod
def from_point(
cls, point: Union[EcCoordinates, ecdsa.ecdsa.Public_key]
) -> ECPubkey:
_bytes = point_to_ser(point, compressed=False) # faster than compressed
return ECPubkey(_bytes)
def get_public_key_bytes(self, compressed=True):
return point_to_ser(self.point(), compressed)
def get_public_key_hex(self, compressed=True):
return self.get_public_key_bytes(compressed).hex()
def point(self) -> EcCoordinates:
return EcCoordinates(self._pubkey.point.x(), self._pubkey.point.y())
def __mul__(self, other: int):
if not isinstance(other, int):
raise TypeError(
f"multiplication not defined for ECPubkey and {type(other)}"
)
ecdsa_point = self._pubkey.point * other
return self.from_point(ecdsa_point)
def __rmul__(self, other: int):
return self * other
def __add__(self, other):
if not isinstance(other, ECPubkey):
raise TypeError(f"addition not defined for ECPubkey and {type(other)}")
ecdsa_point = self._pubkey.point + other._pubkey.point
return self.from_point(ecdsa_point)
def __eq__(self, other):
return self.get_public_key_bytes() == other.get_public_key_bytes()
def __ne__(self, other):
return not (self == other)
def verify_message_hash(self, sig_string: bytes, msg_hash: bytes) -> bool:
assert_bytes(sig_string)
if len(sig_string) != 64:
return False
ecdsa_point = self._pubkey.point
verifying_key = MyVerifyingKey.from_public_point(ecdsa_point, curve=SECP256k1)
return verifying_key.verify_digest(
sig_string, msg_hash, sigdecode=ecdsa.util.sigdecode_string
)
@classmethod
def order(cls):
return CURVE_ORDER
+def verify_message_with_address(
+ address: Union[str, "Address"],
+ sig65: bytes,
+ message: bytes,
+ *,
+ sigtype: SignatureType = SignatureType.ECASH,
+ net: Optional[networks.AbstractNet] = None,
+) -> bool:
+ # Fixme: circular import address -> ecc -> address
+ from .address import Address
+
+ if net is None:
+ net = networks.net
+ assert_bytes(sig65, message)
+
+ if not isinstance(address, Address):
+ address = Address.from_string(address, net=net)
+
+ h = Hash(msg_magic(message, sigtype))
+ try:
+ verifying_key, compressed = pubkey_from_signature(sig65, h)
+ ecdsa_point = verifying_key.pubkey.point
+ public_key = ECPubkey(point_to_ser(ecdsa_point))
+ except Exception:
+ return False
+ # check public key using the address
+ pubkey_hex = public_key.get_public_key_bytes(compressed)
+ addr = Address.from_pubkey(pubkey_hex)
+ if address != addr:
+ return False
+ # check message
+ return public_key.verify_message_hash(sig65[1:], h)
+
+
class ECKey(object):
def __init__(self, k):
secret = string_to_number(k)
self.pubkey = ecdsa.ecdsa.Public_key(
generator_secp256k1, generator_secp256k1 * secret
)
self.privkey = ecdsa.ecdsa.Private_key(self.pubkey, secret)
self.secret = secret
def GetPubKey(self, compressed):
return GetPubKey(self.pubkey, compressed)
def get_public_key(self, compressed=True) -> bytes:
return point_to_ser(self.pubkey.point, compressed)
def sign(self, msg_hash):
private_key = MySigningKey.from_secret_exponent(self.secret, curve=SECP256k1)
public_key = private_key.get_verifying_key()
signature = private_key.sign_digest_deterministic(
msg_hash, hashfunc=hashlib.sha256, sigencode=ecdsa.util.sigencode_string
)
assert public_key.verify_digest(
signature, msg_hash, sigdecode=ecdsa.util.sigdecode_string
)
return signature
def sign_message(self, message, is_compressed, sigtype=SignatureType.ECASH):
message = to_bytes(message, "utf8")
signature = self.sign(Hash(msg_magic(message, sigtype)))
for i in range(4):
sig = bytes([27 + i + (4 if is_compressed else 0)]) + signature
try:
self.verify_message(sig, message, sigtype)
return sig
except Exception:
continue
else:
raise Exception("error: cannot sign message")
def verify_message(self, sig, message, sigtype=SignatureType.ECASH):
assert_bytes(message)
h = Hash(msg_magic(message, sigtype))
public_key, compressed = pubkey_from_signature(sig, h)
# check public key
if point_to_ser(public_key.pubkey.point, compressed) != point_to_ser(
self.pubkey.point, compressed
):
raise Exception("Bad signature")
# check message
public_key.verify_digest(sig[1:], h, sigdecode=ecdsa.util.sigdecode_string)
# ECIES encryption/decryption methods; AES-128-CBC with PKCS7 is used as the cipher; hmac-sha256 is used as the mac
@classmethod
def encrypt_message(self, message, pubkey, magic=b"BIE1"):
assert_bytes(message)
pk = ser_to_point(pubkey)
if not ecdsa.ecdsa.point_is_valid(generator_secp256k1, pk.x(), pk.y()):
raise Exception("invalid pubkey")
ephemeral_exponent = number_to_string(
ecdsa.util.randrange(pow(2, 256)), generator_secp256k1.order()
)
ephemeral = ECKey(ephemeral_exponent)
ecdh_key = point_to_ser(pk * ephemeral.privkey.secret_multiplier)
key = hashlib.sha512(ecdh_key).digest()
iv, key_e, key_m = key[0:16], key[16:32], key[32:]
ciphertext = aes_encrypt_with_iv(key_e, iv, message)
ephemeral_pubkey = ephemeral.get_public_key(compressed=True)
encrypted = magic + ephemeral_pubkey + ciphertext
mac = hmac.new(key_m, encrypted, hashlib.sha256).digest()
return base64.b64encode(encrypted + mac)
def decrypt_message(self, encrypted, magic=b"BIE1"):
encrypted = base64.b64decode(encrypted)
if len(encrypted) < 85:
raise Exception("invalid ciphertext: length")
magic_found = encrypted[:4]
ephemeral_pubkey = encrypted[4:37]
ciphertext = encrypted[37:-32]
mac = encrypted[-32:]
if magic_found != magic:
raise Exception("invalid ciphertext: invalid magic bytes")
try:
ephemeral_pubkey = ser_to_point(ephemeral_pubkey)
except AssertionError:
raise Exception("invalid ciphertext: invalid ephemeral pubkey")
if not ecdsa.ecdsa.point_is_valid(
generator_secp256k1, ephemeral_pubkey.x(), ephemeral_pubkey.y()
):
raise Exception("invalid ciphertext: invalid ephemeral pubkey")
ecdh_key = point_to_ser(ephemeral_pubkey * self.privkey.secret_multiplier)
key = hashlib.sha512(ecdh_key).digest()
iv, key_e, key_m = key[0:16], key[16:32], key[32:]
if mac != hmac.new(key_m, encrypted[:-32], hashlib.sha256).digest():
raise InvalidPassword()
return aes_decrypt_with_iv(key_e, iv, ciphertext)
def get_pubkeys_from_secret(secret):
# public key
private_key = ecdsa.SigningKey.from_string(secret, curve=SECP256k1)
public_key = private_key.get_verifying_key()
K = public_key.to_string()
K_compressed = GetPubKey(public_key.pubkey, True)
return K, K_compressed
diff --git a/electrum/electrumabc/paymentrequest.py b/electrum/electrumabc/paymentrequest.py
index 9ff23694c..1992c1cd4 100644
--- a/electrum/electrumabc/paymentrequest.py
+++ b/electrum/electrumabc/paymentrequest.py
@@ -1,1042 +1,1042 @@
#
# Electrum ABC - lightweight eCash client
# Copyright (C) 2020 The Electrum ABC developers
# Copyright (C) 2014 Thomas Voegtlin
# Copyright (C) 2019-2020 Calin Culianu <calin.culianu@gmail.com>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import hashlib
import json
import sys
import threading
import time
import traceback
import urllib.parse
import zlib
from collections import namedtuple
import dateutil.parser
import requests
try:
from . import paymentrequest_pb2 as pb2
except ImportError:
sys.exit(
"Error: could not find paymentrequest_pb2.py. Create it with 'protoc"
" --proto_path=electrumabc/ --python_out=electrumabc/"
" electrumabc/paymentrequest.proto'"
)
from . import bitcoin, rsakey, transaction, util, x509
from .address import Address, PublicKey
from .bitcoin import TYPE_ADDRESS
from .constants import PROJECT_NAME, PROJECT_NAME_NO_SPACES, XEC
from .crypto import sha256
-from .ecc import point_to_ser, regenerate_key, verify_message
+from .ecc import point_to_ser, regenerate_key, verify_message_with_address
from .printerror import PrintError, print_error
from .transaction import Transaction, TxOutput
from .util import FileImportFailed, FileImportFailedEncrypted, bfh, bh2u
from .version import PACKAGE_VERSION
def _(message):
return message
# status of payment requests
PR_UNPAID = 0
PR_EXPIRED = 1
PR_UNKNOWN = 2 # sent but not propagated
PR_PAID = 3 # send and propagated
PR_UNCONFIRMED = 7 # paid and confirmations = 0 (7 used to match Electrum)
pr_tooltips = {
PR_UNPAID: _("Pending"),
PR_UNKNOWN: _("Unknown"),
PR_PAID: _("Paid"),
PR_EXPIRED: _("Expired"),
PR_UNCONFIRMED: _("Unconfirmed"),
}
del _
# e.g. "ElectrumABC/5.1.1
USER_AGENT = f"{PROJECT_NAME_NO_SPACES}/{PACKAGE_VERSION}"
REQUEST_HEADERS = {
"Accept": "application/ecash-paymentrequest",
"User-Agent": USER_AGENT,
}
ACK_HEADERS = {
"Content-Type": "application/ecash-payment",
"Accept": "application/ecash-paymentack",
"User-Agent": USER_AGENT,
}
ca_path = requests.certs.where()
ca_list = None
ca_keyID = None
def load_ca_list():
global ca_list, ca_keyID
if ca_list is None:
ca_list, ca_keyID = x509.load_certificates(ca_path)
def get_payment_request(url):
data = error = None
try:
u = urllib.parse.urlparse(url)
except ValueError as e:
error = str(e)
else:
if u.scheme in ("https",) and u.netloc.lower().endswith("bitpay.com"):
# Use BitPay 2.0 JSON-based API -- https only
return get_payment_request_bitpay20(url)
# .. else, try regular BIP70
if u.scheme in ["http", "https"]:
try:
response = requests.request("GET", url, headers=REQUEST_HEADERS)
response.raise_for_status()
# Guard against `ecash:`-URIs with invalid payment request URLs
if (
"Content-Type" not in response.headers
or response.headers["Content-Type"]
!= "application/ecash-paymentrequest"
):
error = (
"payment URL not pointing to a ecash payment request handling"
" server"
)
else:
data = response.content
print_error("fetched payment request", url, len(response.content))
except requests.exceptions.RequestException as e:
error = str(e)
else:
error = f"unknown scheme: '{u.scheme}'"
return PaymentRequest(data, error)
class PaymentRequest:
def __init__(self, data, error=None):
self.raw = data
self.error = error
self.parse(data)
self.requestor = None # known after verify
self.tx = None
def __str__(self):
return str(self.raw)
def parse(self, r):
if self.error:
return
self.id = bh2u(sha256(r)[0:16])
try:
self.data = pb2.PaymentRequest()
self.data.ParseFromString(r)
except Exception:
self.error = "cannot parse payment request"
return
self.details = pb2.PaymentDetails()
self.details.ParseFromString(self.data.serialized_payment_details)
self.outputs = []
for o in self.details.outputs:
addr = transaction.get_address_from_output_script(o.script)[1]
self.outputs.append(TxOutput(TYPE_ADDRESS, addr, o.amount))
self.memo = self.details.memo
self.payment_url = self.details.payment_url
def is_pr(self):
return self.get_amount() != 0
def verify(self, contacts):
if self.error:
return False
if not self.raw:
self.error = "Empty request"
return False
pr = pb2.PaymentRequest()
try:
pr.ParseFromString(self.raw)
except Exception:
self.error = "Error: Cannot parse payment request"
return False
if not pr.signature:
# the address will be dispayed as requestor
self.requestor = None
return True
if pr.pki_type in ["x509+sha256", "x509+sha1"]:
return self.verify_x509(pr)
elif pr.pki_type in ["dnssec+btc", "dnssec+ecdsa"]:
return self.verify_dnssec(pr, contacts)
else:
self.error = "ERROR: Unsupported PKI Type for Message Signature"
return False
def verify_x509(self, paymntreq):
load_ca_list()
if not ca_list:
self.error = "Trusted certificate authorities list not found"
return False
cert = pb2.X509Certificates()
cert.ParseFromString(paymntreq.pki_data)
# verify the chain of certificates
try:
x, ca = verify_cert_chain(cert.certificate)
except Exception as e:
traceback.print_exc(file=sys.stderr)
self.error = str(e)
return False
# get requestor name
self.requestor = x.get_common_name()
if self.requestor.startswith("*."):
self.requestor = self.requestor[2:]
# verify the BIP70 signature
pubkey0 = rsakey.RSAKey(x.modulus, x.exponent)
sig = paymntreq.signature
paymntreq.signature = b""
s = paymntreq.SerializeToString()
sigBytes = bytearray(sig)
msgBytes = bytearray(s)
if paymntreq.pki_type == "x509+sha256":
hashBytes = bytearray(hashlib.sha256(msgBytes).digest())
verify = pubkey0.verify(sigBytes, x509.PREFIX_RSA_SHA256 + hashBytes)
elif paymntreq.pki_type == "x509+sha1":
verify = pubkey0.hashAndVerify(sigBytes, msgBytes)
if not verify:
self.error = "ERROR: Invalid Signature for Payment Request Data"
return False
# SIG Verified
self.error = "Signed by Trusted CA: " + ca.get_common_name()
return True
def verify_dnssec(self, pr, contacts):
sig = pr.signature
alias = util.to_string(pr.pki_data)
try:
info = contacts.resolve(alias)
except RuntimeWarning as e:
# Failed to resolve openalias or contact
self.error = " ".join(e.args)
return False
except Exception as e:
# misc other parse error (bad address, etc)
self.error = str(e)
return False
if info.get("validated") is not True:
self.error = "Alias verification failed (DNSSEC)"
return False
if pr.pki_type == "dnssec+btc":
self.requestor = alias
address = info.get("address")
pr.signature = b""
message = pr.SerializeToString()
- if verify_message(address, sig, message):
+ if verify_message_with_address(address, sig, message):
self.error = "Verified with DNSSEC"
return True
else:
self.error = "verify failed"
return False
else:
self.error = "unknown algo"
return False
def has_expired(self):
return bool(self.details.expires and self.details.expires < int(time.time()))
def get_expiration_date(self):
return self.details.expires
def get_amount(self):
return sum(x.value for x in self.outputs)
def get_address(self) -> str:
o = self.outputs[0]
assert o.type == TYPE_ADDRESS
return o.destination.to_ui_string()
def get_requestor(self):
return self.requestor if self.requestor else self.get_address()
def get_verify_status(self):
return self.error if self.requestor else "No Signature"
def get_memo(self):
return self.memo
def get_payment_url(self):
return self.details.payment_url
def get_dict(self):
return {
"requestor": self.get_requestor(),
"memo": self.get_memo(),
"exp": self.get_expiration_date(),
"amount": self.get_amount(),
"signature": self.get_verify_status(),
"txid": self.tx,
"outputs": self.get_outputs(),
"payment_url": self.get_payment_url(),
}
def get_id(self):
return self.id if self.requestor else self.get_address()
def get_outputs(self):
return self.outputs[:]
def send_payment(self, raw_tx, refund_addr):
pay_det = self.details
if not self.details.payment_url:
# note caller is expecting this exact string in the "no payment url
# specified" case. see main_window.py
return False, "no url"
paymnt = pb2.Payment()
paymnt.merchant_data = pay_det.merchant_data
paymnt.transactions.append(bfh(raw_tx))
ref_out = paymnt.refund_to.add()
ref_out.script = refund_addr.to_script()
paymnt.memo = f"Paid using {PROJECT_NAME}"
pm = paymnt.SerializeToString()
payurl = urllib.parse.urlparse(pay_det.payment_url)
try:
r = requests.post(
payurl.geturl(), data=pm, headers=ACK_HEADERS, verify=ca_path
)
except requests.exceptions.RequestException as e:
return False, str(e)
if r.status_code != 200:
# Propagate 'Bad request' (HTTP 400) messages to the user since they
# contain valuable information.
if r.status_code == 400:
return False, (r.reason + ": " + r.content.decode("UTF-8"))
# Some other errors might display an entire HTML document.
# Hide those and just display the name of the error code.
return False, r.reason
try:
paymntack = pb2.PaymentACK()
paymntack.ParseFromString(r.content)
except Exception:
return (
False,
(
"PaymentACK could not be processed. Payment was sent; please"
" manually verify that payment was received."
),
)
return True, paymntack.memo
def serialize(self):
"""Returns bytes"""
return self.raw or b""
@classmethod
def deserialize(cls, ser):
return cls(ser)
def export_file_data(self):
"""Returns bytes suitable to be saved to a file"""
return self.serialize()
@classmethod
def export_file_ext(cls):
return "bip70"
def make_unsigned_request(req):
from .address import Address
addr = req["address"]
time = req.get("time", 0)
exp = req.get("exp", 0)
payment_url = req.get("payment_url")
if time and not isinstance(time, int):
time = 0
if exp and not isinstance(exp, int):
exp = 0
amount = req["amount"]
if amount is None:
amount = 0
memo = req["memo"]
if not isinstance(addr, Address):
addr = Address.from_string(addr)
outputs = [(addr.to_script(), amount)]
pd = pb2.PaymentDetails()
for script, amount in outputs:
pd.outputs.add(amount=amount, script=script)
pd.time = time
pd.expires = time + exp if exp else 0
pd.memo = memo
if payment_url:
pd.payment_url = payment_url
pr = pb2.PaymentRequest()
# Note: We explicitly set this again here to 1 (default was already 1).
# The reason we need to do this is because __setattr__ for this class
# will trigger the Serialization to be 4 bytes of this field, rather than 2,
# if it was explicitly set programmatically.
#
# This works around possible bugs with google protobuf for Javascript
# seen in the field -- in particular bitcoin.com was rejecting our BIP70 files
# because payment_details_version needed to be 4 bytes, not 2.
# Forcing the encoding to 4 bytes for payment_details_version fixed the
# rejection. This workaround is likely needed due to bugs in the protobuf.js
# library.
pr.payment_details_version = int(pr.payment_details_version)
pr.serialized_payment_details = pd.SerializeToString()
pr.signature = util.to_bytes("")
return pr
def sign_request_with_alias(pr, alias, alias_privkey):
pr.pki_type = "dnssec+btc"
pr.pki_data = util.to_bytes(alias)
message = pr.SerializeToString()
_typ, raw_key, compressed = bitcoin.deserialize_privkey(alias_privkey)
ec_key = regenerate_key(raw_key)
pr.signature = ec_key.sign_message(message, compressed)
def verify_cert_chain(chain):
"""Verify a chain of certificates. The last certificate is the CA"""
load_ca_list()
# parse the chain
cert_num = len(chain)
x509_chain = []
for i in range(cert_num):
x = x509.X509(bytearray(chain[i]))
x509_chain.append(x)
if i == 0:
x.check_date()
else:
if not x.check_ca():
raise RuntimeError("ERROR: Supplied CA Certificate Error")
if not cert_num > 1:
raise RuntimeError(
"ERROR: CA Certificate Chain Not Provided by Payment Processor"
)
# if the root CA is not supplied, add it to the chain
ca = x509_chain[cert_num - 1]
if ca.getFingerprint() not in ca_list:
keyID = ca.get_issuer_keyID()
f = ca_keyID.get(keyID)
if f:
root = ca_list[f]
x509_chain.append(root)
else:
raise RuntimeError("Supplied CA Not Found in Trusted CA Store.")
# verify the chain of signatures
cert_num = len(x509_chain)
for i in range(1, cert_num):
x = x509_chain[i]
prev_x = x509_chain[i - 1]
algo, sig, data = prev_x.get_signature()
sig = bytearray(sig)
pubkey = rsakey.RSAKey(x.modulus, x.exponent)
if algo == x509.ALGO_RSA_SHA1:
verify = pubkey.hashAndVerify(sig, data)
elif algo == x509.ALGO_RSA_SHA256:
hashBytes = bytearray(hashlib.sha256(data).digest())
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA256 + hashBytes)
elif algo == x509.ALGO_RSA_SHA384:
hashBytes = bytearray(hashlib.sha384(data).digest())
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA384 + hashBytes)
elif algo == x509.ALGO_RSA_SHA512:
hashBytes = bytearray(hashlib.sha512(data).digest())
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA512 + hashBytes)
else:
raise RuntimeError("Algorithm not supported")
if not verify:
raise RuntimeError(
"Certificate not Signed by Provided CA Certificate Chain"
)
return x509_chain[0], ca
def check_ssl_config(config):
from . import pem
key_path = config.get("ssl_privkey")
cert_path = config.get("ssl_chain")
with open(key_path, "r", encoding="utf-8") as f:
params = pem.parse_private_key(f.read())
with open(cert_path, "r", encoding="utf-8") as f:
s = f.read()
bList = pem.dePemList(s, "CERTIFICATE")
# verify chain
x, ca = verify_cert_chain(bList)
assert x.modulus == params[0]
assert x.exponent == params[1]
# return requestor
requestor = x.get_common_name()
if requestor.startswith("*."):
requestor = requestor[2:]
return requestor
def sign_request_with_x509(pr, key_path, cert_path):
from . import pem
with open(key_path, "r", encoding="utf-8") as f:
params = pem.parse_private_key(f.read())
privkey = rsakey.RSAKey(*params)
with open(cert_path, "r", encoding="utf-8") as f:
s = f.read()
bList = pem.dePemList(s, "CERTIFICATE")
certificates = pb2.X509Certificates()
certificates.certificate.extend(map(bytes, bList))
pr.pki_type = "x509+sha256"
pr.pki_data = util.to_bytes(certificates.SerializeToString())
msgBytes = bytearray(pr.SerializeToString())
hashBytes = bytearray(hashlib.sha256(msgBytes).digest())
sig = privkey.sign(x509.PREFIX_RSA_SHA256 + hashBytes)
pr.signature = bytes(sig)
def serialize_request(req):
pr = make_unsigned_request(req)
signature = req.get("sig")
requestor = req.get("name")
if requestor and signature:
pr.signature = bfh(signature)
pr.pki_type = "dnssec+btc"
pr.pki_data = util.to_bytes(requestor)
return pr
def make_request(config, req):
pr = make_unsigned_request(req)
key_path = config.get("ssl_privkey")
cert_path = config.get("ssl_chain")
if key_path and cert_path:
sign_request_with_x509(pr, key_path, cert_path)
return pr
class InvoiceStore(object):
def __init__(self, storage):
self.storage = storage
self.invoices = {}
self.paid = {}
d = self.storage.get("invoices2", b"?")
if d == b"?":
# new format not found, use old format (upgrade)
d = self.storage.get("invoices", {})
self.load(d)
def set_paid(self, pr, txid):
pr.tx = txid
self.paid[txid] = pr.get_id()
def load(self, d):
for k, v in d.items():
try:
pr = None
raw = bfh(v.get("hex"))
try:
# First try BitPay 2.0 style PR -- this contains compressed raw bytes of the headers & json associated with the request; will raise if wrong format
pr = PaymentRequestBitPay20.deserialize(raw)
except Exception:
pass
if not pr:
# Lastly, try the BIP70 style PR; this won't raise if bad format
pr = PaymentRequest.deserialize(raw)
pr.tx = v.get("txid")
pr.requestor = v.get("requestor")
self.invoices[k] = pr
if pr.tx:
self.paid[pr.tx] = k
except Exception:
continue
def import_file(self, path):
try:
with open(path, "r", encoding="utf-8") as f:
d = json.loads(f.read())
self.load(d)
except json.decoder.JSONDecodeError:
traceback.print_exc(file=sys.stderr)
raise FileImportFailedEncrypted()
except Exception:
traceback.print_exc(file=sys.stdout)
raise FileImportFailed()
self.save()
def save(self):
invoices = {}
for k, pr in self.invoices.items():
invoices[k] = {
"hex": bh2u(pr.serialize()),
"requestor": pr.requestor,
"txid": pr.tx,
}
self.storage.put("invoices2", invoices)
# delete old invoice format to save space; caveat: older EC versions will not
# see invoices saved by newer versions anymore.
self.storage.put("invoices", None)
def get_status(self, key):
pr = self.get(key)
if pr is None:
print_error("[InvoiceStore] get_status() can't find pr for", key)
return
if pr.tx is not None:
return PR_PAID
if pr.has_expired():
return PR_EXPIRED
return PR_UNPAID
def add(self, pr):
key = pr.get_id()
self.invoices[key] = pr
self.save()
return key
def remove(self, key):
paid_list = self.paid.items()
for p in paid_list:
if p[1] == key:
self.paid.pop(p[0])
break
self.invoices.pop(key)
self.save()
def get(self, k):
return self.invoices.get(k)
def sorted_list(self):
# sort
return self.invoices.values()
def unpaid_invoices(self):
return [
v
for k, v in self.invoices.items()
if self.get_status(k) not in (PR_PAID, None)
]
# -----------------------------------------------------------------------------
""" BitPay 2.0 JSON-based HTTP Payment Protocol, replaces BIP70 for BitPay only.
Includes a scheme to verify the payment request using Bitcoin public keys
rather than x509 which is what BIP70 used. """
class ResponseError(Exception):
"""Contains the exact text of the bad response error message from BitPay"""
class PaymentRequestBitPay20(PaymentRequest, PrintError):
"""Work-alike to the existing BIP70 PaymentRequest class.
Wraps payment requests based on the new BitPay 2.0 JSON API."""
HEADERS = {"User-Agent": USER_AGENT}
Details = namedtuple(
"BitPay20Details",
"outputs, memo, payment_url, time, expires, network, currency,"
" required_fee_rate",
)
class Raw:
__slots__ = ("status_code", "headers", "text", "url")
ser_prefix = b"BITPAY2.0_ZCOMPRESSED_"
def __init__(self, **kwargs):
resp = kwargs.get("response")
if resp:
if not isinstance(resp, requests.Response):
raise ValueError(
"Expected a Response object in PaymentRequest_BitPay20.Raw"
" constructor"
)
self.status_code = resp.status_code
self.headers = resp.headers
self.text = resp.text
self.url = resp.url
else:
self.status_code = kwargs.get("status_code", 0)
self.headers = requests.structures.CaseInsensitiveDict(
kwargs.get("headers", {})
)
self.text = kwargs.get("text", "")
self.url = kwargs.get("url", "")
def json(self):
return json.loads(self.text)
def serialize(self):
d = self.get_dict()
return self.ser_prefix + zlib.compress(
json.dumps(d).encode("utf-8"), level=9
)
@classmethod
def deserialize(cls, ser):
if not ser.startswith(cls.ser_prefix):
raise ValueError("Invalid serialized data")
data = zlib.decompress(ser[len(cls.ser_prefix) :])
d = json.loads(data.decode("utf-8"))
if not all(s in d for s in cls.__slots__):
raise ValueError("Missing required keys in deserialized data")
return cls(**d)
def get_dict(self):
d = {}
for s in self.__slots__:
val = getattr(self, s, "")
if isinstance(val, requests.structures.CaseInsensitiveDict):
val = dict(val)
d[s] = val
return d
def __str__(self):
return json.dumps(self.get_dict())
# /class Raw
def serialize(self):
if self.raw:
return self.raw.serialize()
@classmethod
def deserialize(cls, ser):
return cls(cls.Raw.deserialize(ser))
def export_file_data(self):
"""Returns bytes suitable to be saved to a file"""
return json.dumps(self.raw.get_dict(), indent=4).encode("utf-8")
@classmethod
def export_file_ext(cls):
return "json"
def parse(self, r):
"""Overrides super. r is a self.Raw object."""
if self.error:
return
if not isinstance(r, self.Raw): # BitPay2.0 requires 'raw' be a Raw instance
self.error = (
"Argument not of the proper type (expected PaymentRequest_BitPay20.Raw"
" instance)"
)
return
(
self.data,
self.id,
self.details,
self.outputs,
self.memo,
self.payment_url,
self.headers,
) = (
None,
) * 7 # ensure attributes defined
try:
if r.status_code == 400:
# error 400, has special info in r.text
raise ResponseError(r.text)
assert r.status_code == 200, f"Bad response status: {r.status_code}"
self.headers = r.headers.copy()
self.data = j = r.json()
self.id = j["paymentId"]
self.details = self.Details(
outputs=j["outputs"],
memo=j["memo"],
payment_url=j["paymentUrl"],
time=dateutil.parser.parse(j["time"]).timestamp(),
expires=dateutil.parser.parse(j["expires"]).timestamp(),
network=j.get("network", "main"),
currency=j.get("currency", f"{XEC.ticker}"),
required_fee_rate=j.get("requiredFeeRate", 1),
)
self.outputs = []
for o in self.details.outputs:
amt, addr = o["amount"], Address.from_string(o["address"])
self.outputs.append(TxOutput(TYPE_ADDRESS, addr, amt))
self.memo = self.details.memo
self.payment_url = self.details.payment_url
except ResponseError as e:
self.error = str(e)
except (KeyError, ValueError, TypeError, AssertionError, IndexError) as e:
self.error = f"cannot parse payment request ({str(e)})"
except Exception as e:
self.print_error("Error parsing payment prequest", repr(e))
self.error = "Low-level error encountered parsing the payment request"
# super methods that work ok for us:
# def is_pr(self) -> bool
# def get_outputs(self) -> list
# def get_id(self) -> str
# def get_dict(self) -> dict
# def get_memo(self) -> str
# def get_verify_status(self) -> str
# def get_requestor(self) -> Address?
# def get_amount(self) -> int
# def get_address(self) -> Address
# def get_expiration_date(self) -> float
# def get_payment_url(self) -> str
# def has_expired(self) -> bool
def base_url(self):
r = self.raw
url = getattr(r, "url", None)
if url:
up = urllib.parse.urlparse(url)
return f"{up.scheme}://{up.netloc}"
return ""
# Cache the signing keys
_signing_keys = [0.0, "BitPay, Inc.", set()]
_signing_keys_lock = threading.Lock()
_pgp_key_data = {}
def _get_signing_keys(self, timeout=10.0):
return self._signing_keys
# NOTE: the below is turned-off for now
# We need to hear from BitPay on how best to handle this.
# It appears to be much ado about nothing since the PGP keys
# come from the web *anyway*. What's more -- we need to depend
# on Python PGP libs now, which is a rather heavy dependency. :/
if not self._pgp_key_data:
try:
pgp_key_data = requests.get(
"https://bitpay.com/pgp-keys.json", timeout=timeout, verify=True
).json()["pgpKeys"]
pgp_key_data = {
d["fingerprint"]: {"owner": d["owner"], "publicKey": d["publicKey"]}
for d in pgp_key_data
}
with self._signing_keys_lock:
self._pgp_key_data.update(pgp_key_data)
except Exception as e:
self.print_error("Failed to get PGP keys:", repr(e))
# TODO FIXME XXX: Use the PGP keys above to verify the retrieved keys below
# The problem is as follows: PGP dependencies in python, which are a bit
# heavy-handed. The URL for requesting sigs for the below would be:
#
# https://test.bitpay.com/signatures/<paymentProtocol.json_RESPONSE_SHA256_HASH>.json
#
# See: https://bitpay.com/docs/payment-protocol
ts, owner, signing_pubkeys = self._signing_keys
if (
not signing_pubkeys or abs(time.time() - ts) > 60.0 * 60.0
): # we keep the cached keys for up to 1 hour
url = self.base_url() + "/signingKeys/paymentProtocol.json"
try:
r2 = requests.get(url, timeout=timeout, verify=True)
if r2.status_code != 200:
raise RuntimeError(
f"Bad status when retrieving signing keys: {r2.status_code}"
)
with self._signing_keys_lock:
signing_pubkeys.clear()
d = r2.json()
exp = dateutil.parser.parse(d["expirationDate"]).timestamp()
if exp < time.time():
print_error(
"Warning: BitPay returned expired keys expirationDate=",
d["expirationDate"],
)
owner = d.get("owner", owner)
for k in d["publicKeys"]:
pk = PublicKey.from_string(k)
signing_pubkeys.add(pk)
except requests.RequestException as e:
self.error = "error retrieving keys: " + repr(e)
self.print_error(self.error)
raise
except Exception as e:
self.error = "error parsing signing keys: " + repr(e)
self.print_error(self.error)
raise
self._signing_keys[0] = time.time()
self._signing_keys[1] = owner
return self._signing_keys
def verify(self, contacts, *, timeout=10.0):
self.print_error("Verify")
# NB: contacts is ignored
if self.error:
return False
if not self.raw:
self.error = "Empty request"
return False
r = self.raw
try:
if r.status_code != 200:
if r.status_code == 400:
raise ValueError(r.text)
raise ValueError(f"Bad HTTP respone code: {r.status_code}")
sig = bytes.fromhex(r.headers["signature"])
digest = r.headers["digest"]
if not digest.upper().startswith("SHA-256="):
raise ValueError("Unknown digest")
digest = bytes.fromhex(digest.split("=", 1)[1])
if len(digest) != 32:
raise ValueError("Bad digest")
addr = Address.from_string(r.headers["x-identity"])
if sha256(r.text) != digest:
raise ValueError("Digest does not match payload")
msg = digest
except Exception as e:
self.error = "error processing response:" + repr(e)
self.print_error(self.error)
return False
# Grab Signing keys either from cache or from BitPay
try:
ts, owner, signing_pubkeys = self._get_signing_keys(timeout=timeout)
except Exception:
# Error retrieving signing pubkeys, try using cached values
# if that fails.. just abort.
ts, owner, signing_pubkeys = self._signing_keys
if not signing_pubkeys:
return False
self.error = None # clear error
# they don't include the nV byte so we have to try a bunch of stuff here
for nV in (27, 28, 31, 32):
pk, comp = bitcoin.pubkey_from_signature(bytes([nV]) + sig, msg)
pubkey = point_to_ser(pk.pubkey.point, comp)
sig_addr = Address.from_pubkey(pubkey)
if addr == sig_addr:
self.print_error("Signing address found and matches")
if PublicKey.from_pubkey(pubkey) in signing_pubkeys:
self.print_error("Signing pubkey is valid")
else:
# TODO: Fixme -- for now this branch will always be taken because we turned off key download in _get_signing_keys() above
self.print_error(
"Warning: Could not verify whether signing public key is"
" valid:",
pubkey.hex(),
"(PGP verification is currently disabled)",
)
self.requestor = sig_addr.to_ui_string()
break
else:
self.error = "failed to verify signature against retrieved keys"
self.print_error(self.error)
return False
# SIG Verified
# This is not ideal because we re-use self.error for a *non-error* but the
# superclass API is this way. -Calin
self.error = "Signed by: " + owner
return True
def verify_x509(self, paymntreq):
raise NotImplementedError()
def verify_dnssec(self, pr, contacts):
raise NotImplementedError()
def send_payment(self, raw_tx: str, refund_addr, *, timeout=10.0):
self.print_error("Send payment")
# NB: refund_addr is ignored
self.tx = None
# First, verify that BitPay would accept the payment by sending
# a verify-payment message via HTTP
tx = Transaction(bytes.fromhex(raw_tx))
# def from_io(klass, inputs, outputs, locktime=0, sign_schnorr=False):
unsigned_tx = Transaction.from_io(
tx.txinputs(),
tx.outputs(),
locktime=tx.locktime,
sign_schnorr=tx.is_schnorr_signed(0),
)
h = self.HEADERS.copy()
h["Content-Type"] = "application/verify-payment"
unsigned_raw = unsigned_tx.serialize(True)
body = {
"currency": self.details.currency or f"{XEC.ticker}",
"unsignedTransaction": unsigned_raw.hex(),
"weightedSize": len(unsigned_raw),
}
try:
r = requests.post(
self.raw.url, headers=h, data=json.dumps(body).encode("utf-8")
)
except requests.RequestException as e:
return False, str(e)
if r.status_code != 200:
# Propagate 'Bad request' (HTTP 400) messages to the user since they
# contain valuable information.
if r.status_code == 400:
return False, (r.reason + ": " + r.content.decode("UTF-8"))
# Some other errors might display an entire HTML document.
# Hide those and just display the name of the error code.
return False, r.reason
memo = r.json().get("memo", "?").lower()
if "valid" not in memo:
return False, f"Did not receive 'valid': {memo}"
# Ok, all is valid -- now actually send the tx
h["Content-Type"] = "application/payment"
body = {
"currency": self.details.currency or f"{XEC.ticker}",
"transactions": [raw_tx],
}
try:
r = requests.post(
self.raw.url, headers=h, data=json.dumps(body).encode("utf-8")
)
except requests.RequestException as e:
return False, str(e)
if r.status_code != 200:
# Propagate 'Bad request' (HTTP 400) messages to the user since they
# contain valuable information.
if r.status_code == 400:
return False, (r.reason + ": " + r.content.decode("UTF-8"))
# Some other errors might display an entire HTML document.
# Hide those and just display the name of the error code.
return False, r.reason
memo = r.json().get("memo", "?")
self.tx = Transaction._txid(raw_tx) # save txid
return True, memo
def get_payment_request_bitpay20(url, timeout=10.0):
"""Synchronously contacts BitPay and gets the payment request.
Returns the PaymentRequest object. Returned PaymentRequest
has .error != None on error."""
headers = PaymentRequestBitPay20.HEADERS.copy()
headers.update({"accept": "application/payment-request"})
try:
r = requests.get(url, headers=headers, timeout=timeout, verify=True)
if r.status_code == 400:
raise ResponseError(r.text)
r.raise_for_status()
return PaymentRequestBitPay20(PaymentRequestBitPay20.Raw(response=r))
except Exception as e:
print_error("[BitPay2.0] get_payment_request:", repr(e))
return PaymentRequest(None, error=str(e))
diff --git a/electrum/electrumabc/tests/test_ecc.py b/electrum/electrumabc/tests/test_ecc.py
index f8a0a5365..18e28dd57 100644
--- a/electrum/electrumabc/tests/test_ecc.py
+++ b/electrum/electrumabc/tests/test_ecc.py
@@ -1,142 +1,152 @@
import base64
import unittest
import ecdsa
from ecdsa.ecdsa import generator_secp256k1
from ecdsa.util import number_to_string
from ..bitcoin import deserialize_privkey
-from ..ecc import ECKey, SignatureType, point_to_ser, regenerate_key, verify_message
+from ..ecc import (
+ ECKey,
+ SignatureType,
+ point_to_ser,
+ regenerate_key,
+ verify_message_with_address,
+)
class TestECC(unittest.TestCase):
def test_crypto(self):
for message in [
b"Chancellor on brink of second bailout for banks",
b"\xff" * 512,
]:
self._do_test_crypto(message)
def _do_test_crypto(self, message):
G = generator_secp256k1
_r = G.order()
pvk = ecdsa.util.randrange(pow(2, 256)) % _r
Pub = pvk * G
pubkey_c = point_to_ser(Pub, True)
eck = ECKey(number_to_string(pvk, _r))
enc = ECKey.encrypt_message(message, pubkey_c)
dec = eck.decrypt_message(enc)
self.assertEqual(message, dec)
# enc2 = EC_KEY.encrypt_message(message, pubkey_u)
dec2 = eck.decrypt_message(enc)
self.assertEqual(message, dec2)
signature = eck.sign_message(message, True)
# print signature
ECKey.verify_message(eck, signature, message)
def test_msg_signing(self):
msg1 = b"Chancellor on brink of second bailout for banks"
msg2 = b"Electrum"
def sign_message_with_wif_privkey(wif_privkey, msg):
txin_type, privkey, compressed = deserialize_privkey(wif_privkey)
key = regenerate_key(privkey)
return key.sign_message(msg, compressed)
sig1 = sign_message_with_wif_privkey(
"L1TnU2zbNaAqMoVh65Cyvmcjzbrj41Gs9iTLcWbpJCMynXuap6UN", msg1
)
addr1 = "15hETetDmcXm1mM4sEf7U2KXC9hDHFMSzz"
sig2 = sign_message_with_wif_privkey(
"5Hxn5C4SQuiV6e62A1MtZmbSeQyrLFhu5uYks62pU5VBUygK2KD", msg2
)
addr2 = "1GPHVTY8UD9my6jyP4tb2TYJwUbDetyNC6"
sig1_b64 = base64.b64encode(sig1)
sig2_b64 = base64.b64encode(sig2)
# NOTE: you cannot rely on exact binary patterns of signatures
# produced by libsecp versus python ecdsa, etc. This is because nonces
# may differ. We ran into this when switching from Bitcoin Core libsecp
# to Bitcoin ABC libsecp. Amaury Sechet confirmed this to be true.
# Mark Lundeberg suggested we still do binary exact matches from a set,
# though, just to notice when nonces of the underlying lib change.
# So.. the below test is has been updated to use a set.
# self.assertEqual(sig1_b64, b'H/9jMOnj4MFbH3d7t4yCQ9i7DgZU/VZ278w3+ySv2F4yIsdqjsc5ng3kmN8OZAThgyfCZOQxZCWza9V5XzlVY0Y=')
# self.assertEqual(sig2_b64, b'G84dmJ8TKIDKMT9qBRhpX2sNmR0y5t+POcYnFFJCs66lJmAs3T8A6Sbpx7KA6yTQ9djQMabwQXRrDomOkIKGn18=')
#
# Hardcoded sigs were generated with an old version of Python ECDSA by Electrum team.
# New sigs generated with Bitcoin-ABC libsecp variant. Both verify against each other ok
# They just have different byte patterns we can expect due to different nonces used (both are to spec).
accept_b64_1 = {
# Generated by Electrum ABC 5.0.2 using python-ecdsa
b"IANkoyMsQ9aqJcqAfuPUb1iL1DVzlEzqZKQ+V2DrL6dteQvvsqTWIk5APuVthkXDcOhvZ3fvhaW90ADgxzUyd0Y=",
# Generated by Electrum ABC 5.0.2 using libsecp256k1
b"H24keoFIst9/UaMfvFejNk52pRMK3xGaC784Dz4NC/sOLCSy3y5Jpf7+Pk5BDQuKnOP6+Fr68yD0acMLt3WQXQ8=",
# Generated by Bitcoin ABC 0.23.2
b"ICbt3+l7L3C9ANJp3I2UK9h3i1AyTJeqFadIAhKQUm8gJRw2nKV+eCcaMgzm5couc12Yba6U/6YTmmNWVzaKv+A=",
}
accept_b64_2 = {
# Generated by Electrum ABC 5.0.2 using python-ecdsa
b"G0dN2iKid9zT79uz4RLfw2nDSB1AE2JsmYtzUxM4YXhvQ2iZxFvs9teeExaopgGxwPvadRPmP4oEXTZt4P3Vwic=",
# Generated by Electrum ABC 5.0.2 using libsecp256k1
b"HBF0Y4u7KECCNB8rubGysV0ZFiYMevLhdBrYck7MFgZuNSv+DODii6YQ2HyKyHYsZ7Q6ZRjkaaMXDacdCAoQ63k=",
# Generated by Bitcoin ABC 0.23.2
b"G9jE+8cxPJV9HKeqDh8xgIE+isk/8/3Jf7GNAlfEsEgGa7mdegoAQmBGTtblfQ6v+ciz+xUEubUh9HY5lm+rtZ8=",
}
# does it match with one of our hard-coded sigs in the set?
self.assertTrue(sig1_b64 in accept_b64_1)
self.assertTrue(sig2_b64 in accept_b64_2)
# can it verify its own sigs?
- self.assertTrue(verify_message(addr1, sig1, msg1))
- self.assertTrue(verify_message(addr2, sig2, msg2))
+ self.assertTrue(verify_message_with_address(addr1, sig1, msg1))
+ self.assertTrue(verify_message_with_address(addr2, sig2, msg2))
# Can we verify the hardcoded sigs (this checks that the underlying ECC
# libs basically are ok with either nonce being used)
for sig in accept_b64_1:
- self.assertTrue(verify_message(addr1, base64.b64decode(sig), msg1))
+ self.assertTrue(
+ verify_message_with_address(addr1, base64.b64decode(sig), msg1)
+ )
for sig in accept_b64_2:
- self.assertTrue(verify_message(addr2, base64.b64decode(sig), msg2))
+ self.assertTrue(
+ verify_message_with_address(addr2, base64.b64decode(sig), msg2)
+ )
- self.assertRaises(Exception, verify_message, addr1, b"wrong", msg1)
+ self.assertFalse(verify_message_with_address(addr1, b"wrong", msg1))
# test for bad sigs for a message
- self.assertFalse(verify_message(addr1, sig2, msg1))
- self.assertFalse(verify_message(addr2, sig1, msg2))
+ self.assertFalse(verify_message_with_address(addr1, sig2, msg1))
+ self.assertFalse(verify_message_with_address(addr2, sig1, msg2))
def test_legacy_msg_signing(self):
"""Test that we can use the legacy "Bitcoin Signed Message:\n" message magic."""
msg = b"Chancellor on brink of second bailout for banks"
addr = "15hETetDmcXm1mM4sEf7U2KXC9hDHFMSzz"
txin_type, privkey, compressed = deserialize_privkey(
"L1TnU2zbNaAqMoVh65Cyvmcjzbrj41Gs9iTLcWbpJCMynXuap6UN"
)
key = regenerate_key(privkey)
sig = key.sign_message(msg, compressed, sigtype=SignatureType.BITCOIN)
accepted_signatures = {
# Older core libsecp/python ecdsa nonce produces this deterministic signature
b"H/9jMOnj4MFbH3d7t4yCQ9i7DgZU/VZ278w3+ySv2F4yIsdqjsc5ng3kmN8OZAThgyfCZOQxZCWza9V5XzlVY0Y=",
# New Bitoin ABC libsecp nonce produces this deterministic signature
b"IA+oq/uGz4kKA2bNgxPcM+T216abyUiBhofMg1J8fC5BLAbbIpF2toCHaO7/LQAxhQBtu5D6ROq1JjXiRwPAASg=",
}
self.assertTrue(base64.b64encode(sig) in accepted_signatures)
for sig_ in accepted_signatures:
self.assertTrue(
- verify_message(
+ verify_message_with_address(
address=addr,
- sig=base64.b64decode(sig_),
+ sig65=base64.b64decode(sig_),
message=msg,
sigtype=SignatureType.BITCOIN,
)
)
if __name__ == "__main__":
unittest.main()
diff --git a/electrum/electrumabc_gui/qt/sign_verify_dialog.py b/electrum/electrumabc_gui/qt/sign_verify_dialog.py
index 44cb42cc1..ed341cee7 100644
--- a/electrum/electrumabc_gui/qt/sign_verify_dialog.py
+++ b/electrum/electrumabc_gui/qt/sign_verify_dialog.py
@@ -1,229 +1,231 @@
from __future__ import annotations
import base64
from functools import partial
from typing import TYPE_CHECKING, Optional
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import (
QDialog,
QFrame,
QGridLayout,
QHBoxLayout,
QLabel,
QLineEdit,
QPushButton,
QRadioButton,
QSizePolicy,
QTextEdit,
QToolButton,
QWidget,
)
from electrumabc.address import Address
from electrumabc.constants import CURRENCY, PROJECT_NAME
-from electrumabc.ecc import SignatureType, verify_message
+from electrumabc.ecc import SignatureType, verify_message_with_address
from electrumabc.i18n import _
from .password_dialog import PasswordDialog
from .util import MessageBoxMixin
if TYPE_CHECKING:
from electrumabc.wallet import AbstractWallet
class CollapsibleSection(QWidget):
def __init__(
self, title: str, content_widget: QWidget, parent: Optional[QWidget] = None
):
super().__init__(parent)
main_layout = QGridLayout(self)
main_layout.setVerticalSpacing(0)
main_layout.setContentsMargins(0, 0, 0, 0)
self.setLayout(main_layout)
self.toggleButton = QToolButton(self)
self.toggleButton.setStyleSheet("QToolButton {border: none;}") # noqa: FS003
self.toggleButton.setToolButtonStyle(Qt.ToolButtonTextBesideIcon)
self.toggleButton.setArrowType(Qt.RightArrow)
self.toggleButton.setText(title)
self.toggleButton.setCheckable(True)
self.toggleButton.setChecked(False)
self.header_line = QFrame(self)
self.header_line.setFrameShape(QFrame.HLine)
self.header_line.setFrameShadow(QFrame.Sunken)
self.header_line.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Maximum)
content_layout = QHBoxLayout()
self.contentFrame = QFrame(self)
self.contentFrame.setFrameShape(QFrame.Box)
self.contentFrame.setFrameShadow(QFrame.Sunken)
self.contentFrame.setLayout(content_layout)
content_layout.addWidget(content_widget)
self.contentFrame.setVisible(False)
main_layout.addWidget(self.toggleButton, 0, 0, 1, 1, Qt.AlignLeft)
main_layout.addWidget(self.header_line, 0, 2, 1, 1)
main_layout.addWidget(self.contentFrame, 1, 0, 1, 3)
self.toggleButton.toggled.connect(self.toggle)
def toggle(self, collapsed: bool):
if collapsed:
self.toggleButton.setArrowType(Qt.DownArrow)
self.contentFrame.setVisible(True)
else:
self.toggleButton.setArrowType(Qt.RightArrow)
self.contentFrame.setVisible(False)
class SignVerifyDialog(QDialog, MessageBoxMixin):
def __init__(
self, wallet: AbstractWallet, address: Optional[Address] = None, parent=None
):
super().__init__(parent)
self.setWindowModality(Qt.WindowModal)
self.setWindowTitle(_("Sign/verify Message"))
self.setMinimumSize(610, 290)
self.wallet = wallet
layout = QGridLayout(self)
self.setLayout(layout)
self.message_e = QTextEdit()
self.message_e.setAcceptRichText(False)
layout.addWidget(QLabel(_("Message")), 1, 0)
layout.addWidget(self.message_e, 1, 1)
layout.setRowStretch(2, 3)
self.address_e = QLineEdit()
self.address_e.setText(address.to_ui_string() if address else "")
layout.addWidget(QLabel(_("Address")), 2, 0)
layout.addWidget(self.address_e, 2, 1)
self.signature_e = QTextEdit()
self.signature_e.setAcceptRichText(False)
layout.addWidget(QLabel(_("Signature")), 3, 0)
layout.addWidget(self.signature_e, 3, 1)
layout.setRowStretch(3, 1)
sigtype_widget = QWidget()
sigtype_layout = QHBoxLayout()
sigtype_widget.setLayout(sigtype_layout)
self.ecash_magic_rb = QRadioButton("eCash signature")
self.ecash_magic_rb.setToolTip(
"New signature scheme introduced in v5.0.2 (incompatible with signatures\n"
"produced with earlier versions). The message is prefixed with 'eCash \n"
"Signed Message:\\n' prior to signing."
)
self.ecash_magic_rb.setChecked(True)
self.bicoin_magic_rb = QRadioButton("Bitcoin signature")
self.bicoin_magic_rb.setToolTip(
"Legacy signature scheme used before v5.0.2. The message is prefixed with\n"
"'Bitcoin Signed Message:\\n' prior to signing."
)
sigtype_layout.addWidget(QLabel("Signature type:"))
sigtype_layout.addWidget(self.ecash_magic_rb)
sigtype_layout.addWidget(self.bicoin_magic_rb)
collapsible_section = CollapsibleSection("Advanced settings", sigtype_widget)
layout.addWidget(collapsible_section, 4, 1)
hbox = QHBoxLayout()
b = QPushButton(_("Sign"))
b.clicked.connect(lambda: self.do_sign())
hbox.addWidget(b)
b = QPushButton(_("Verify"))
b.clicked.connect(lambda: self.do_verify())
hbox.addWidget(b)
b = QPushButton(_("Close"))
b.clicked.connect(self.accept)
hbox.addWidget(b)
layout.addLayout(hbox, 5, 1)
def _get_password(self) -> Optional[str]:
password = None
while self.wallet.has_keystore_encryption():
password = PasswordDialog(self).run()
if password is None:
return
try:
self.wallet.check_password(password)
break
except Exception as e:
self.show_error(str(e))
continue
return password
def do_sign(self):
password = self._get_password()
address = self.address_e.text().strip()
message = self.message_e.toPlainText().strip()
try:
addr = Address.from_string(address)
except Exception:
self.show_message(_(f"Invalid {CURRENCY} address."))
return
if addr.kind != addr.ADDR_P2PKH:
msg_sign = (
_(
"Signing with an address actually means signing with the"
" corresponding private key, and verifying with the corresponding"
" public key. The address you have entered does not have a unique"
" public key, so these operations cannot be performed."
)
+ "\n\n"
+ _(
"The operation is undefined. Not just in "
f"{PROJECT_NAME}, but in general."
)
)
self.show_message(
_("Cannot sign messages with this type of address.") + "\n\n" + msg_sign
)
return
if self.wallet.is_watching_only():
self.show_message(_("This is a watching-only wallet."))
return
if not self.wallet.is_mine(addr):
self.show_message(_("Address not in wallet."))
return
task = partial(
self.wallet.sign_message, addr, message, password, self.get_sigtype()
)
def show_signed_message(sig):
self.signature_e.setText(base64.b64encode(sig).decode("ascii"))
self.wallet.thread.add(task, on_success=show_signed_message)
def do_verify(self):
try:
address = Address.from_string(self.address_e.text().strip())
except Exception:
self.show_message(_(f"Invalid {CURRENCY} address."))
return
message = self.message_e.toPlainText().strip().encode("utf-8")
try:
# This can throw on invalid base64
sig = base64.b64decode(self.signature_e.toPlainText())
- verified = verify_message(address, sig, message, sigtype=self.get_sigtype())
+ verified = verify_message_with_address(
+ address, sig, message, sigtype=self.get_sigtype()
+ )
except Exception:
verified = False
if verified:
self.show_message(_("Signature verified"))
else:
self.show_error(_("Wrong signature"))
def get_sigtype(self) -> SignatureType:
return (
SignatureType.ECASH
if self.ecash_magic_rb.isChecked()
else SignatureType.BITCOIN
)
diff --git a/electrum/electrumabc_gui/qt/update_checker.py b/electrum/electrumabc_gui/qt/update_checker.py
index 81d484bf1..b15250589 100644
--- a/electrum/electrumabc_gui/qt/update_checker.py
+++ b/electrum/electrumabc_gui/qt/update_checker.py
@@ -1,462 +1,448 @@
##!/usr/bin/env python3
#
# Electrum ABC - lightweight eCash client
# Copyright (C) 2020 The Electrum ABC developers
# Copyright (C) 2015 Thomas Voegtlin
#
# Electron Cash - lightweight Bitcoin Cash Client
# Copyright (C) 2019-2020 The Electron Cash developers
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import base64
import json
import os.path
import sys
import threading
import time
import requests
from PyQt5 import QtWidgets
from PyQt5.QtCore import Qt, pyqtSignal
from electrumabc import address, version
from electrumabc.constants import PROJECT_NAME, RELEASES_JSON_URL
-from electrumabc.ecc import verify_message
+from electrumabc.ecc import verify_message_with_address
from electrumabc.i18n import _
from electrumabc.networks import MainNet
from electrumabc.printerror import PrintError, print_error
from electrumabc.simple_config import SimpleConfig
from .util import Buttons
class UpdateChecker(QtWidgets.QWidget, PrintError):
"""A window that checks for updates.
If ok, and a new version is detected, will present the hard-coded download
URL in the GUI.
If ok, and we are on the latest version, will present a message to that
effect.
If it can't verify the response or can't talk on network, will present a
generic error message.
Update data is expected to be JSON with a bunch of signed version strings.
see self._process_server_reply below for an example.
"""
# Note: it's guaranteed that every call to do_check() will either result
# in a 'checked' signal or a 'failed' signal to be emitted.
# got_new_version is only emitted if the new version is actually newer than
# our version.
checked = pyqtSignal(object)
"""emitted whenever the server gave us a (properly signed) version string.
This may or may not mean it's a new version."""
got_new_version = pyqtSignal(object)
"""emitted in tandem with 'checked' above ONLY if the server gave us a
(properly signed) version string we recognize as *newer*"""
failed = pyqtSignal()
"""emitted when there is an exception, network error, or verify error
on version check."""
_req_finished = pyqtSignal(object)
"""internal use by _Req thread"""
_dl_prog = pyqtSignal(object, int)
"""[0 -> 100] range"""
# Release URL
download_url = "https://www.bitcoinabc.org/electrum"
VERSION_ANNOUNCEMENT_SIGNING_ADDRESSES = (
# Pierre's keys
address.Address.from_string(
"ecash:qz5j83ez703wvlwpqh94j6t45f8dn2afjgtgurgua0", net=MainNet
),
)
def __init__(self, config: SimpleConfig, parent=None):
super().__init__(parent)
self.is_test_run = config.get("test_release_notification", False)
self.setWindowTitle(f"{PROJECT_NAME} - " + _("Update Checker"))
self.content = QtWidgets.QVBoxLayout()
self.content.setContentsMargins(*([10] * 4))
self.heading_label = QtWidgets.QLabel()
self.content.addWidget(self.heading_label)
self.detail_label = QtWidgets.QLabel()
self.detail_label.setTextInteractionFlags(Qt.LinksAccessibleByMouse)
self.detail_label.setOpenExternalLinks(True)
self.detail_label.setWordWrap(True)
self.content.addWidget(self.detail_label)
self.pb = QtWidgets.QProgressBar()
self.pb.setMaximum(100)
self.pb.setMinimum(0)
self.content.addWidget(self.pb)
versions = QtWidgets.QHBoxLayout()
current_version_message = (
_(f"Current version: {version.PACKAGE_VERSION}")
if not self.is_test_run
else "Testing release notification"
)
versions.addWidget(QtWidgets.QLabel(current_version_message))
self.latest_version_label = QtWidgets.QLabel(_(f"Latest version: {' '}"))
versions.addWidget(self.latest_version_label)
self.content.addLayout(versions)
close_button = QtWidgets.QPushButton(_("Close"))
close_button.clicked.connect(self.close)
self.cancel_or_check_button = QtWidgets.QPushButton(_("Cancel"))
self.cancel_or_check_button.clicked.connect(self.cancel_or_check)
self.content.addLayout(Buttons(self.cancel_or_check_button, close_button))
grid = QtWidgets.QGridLayout()
grid.addLayout(self.content, 0, 0)
self.setLayout(grid)
self._req_finished.connect(self._on_req_finished)
self._dl_prog.connect(self._on_downloading)
self.active_req = None
self.last_checked_ts = 0.0
self.resize(450, 200)
def _process_server_reply(self, signed_version_dict):
"""Returns:
- the new package version string if new version found from
server, e.g. '3.3.5', '3.3.5CS', etc
- or the current version (version.PACKAGE_VERSION) if no new
version found.
- None on failure (such as bad signature).
May also raise on error.
"""
# example signed_version_dict:
# {
# "3.9.9": {
# "bitcoincash:qphax4cg8sxuc0qnzk6sx25939ma7y877uz04s2z82": "IA+2QG3xPRn4HAIFdpu9eeaCYC7S5wS/sDxn54LJx6BdUTBpse3ibtfq8C43M7M1VfpGkD5tsdwl5C6IfpZD/gQ="
# },
# "3.9.9CS": {
# "bitcoincash:qphax4cg8sxuc0qnzk6sx25939ma7y877uz04s2z82": "IA+2QG3xPRn4HAIFdpu9eeaCYC7S5wS/sDxn54LJx6BdUTBpse3ibtfq8C43M7M1VfpGkD5tsdwl5C6IfpZD/gQ="
# },
# "3.9.9SLP": {
# "bitcoincash:qphax4cg8sxuc0qnzk6sx25939ma7y877uz04s2z82": "IA+2QG3xPRn4HAIFdpu9eeaCYC7S5wS/sDxn54LJx6BdUTBpse3ibtfq8C43M7M1VfpGkD5tsdwl5C6IfpZD/gQ="
# },
# }
# All signed messages above are signed with the address in the dict,
# and the message is the "3.9.9" or "3.9.9CS" etc string
ct_matching = 0
for version_msg, sigdict in signed_version_dict.items():
# This looks quadratic, and it is. But the expected results are small.
# We needed to do it this way to detect when there was no matching
# variant and/or no known-key match.
if self.is_matching_variant(version_msg):
for adr, sig in sigdict.items():
# may raise
adr = address.Address.from_string(adr, net=MainNet)
if adr in self.VERSION_ANNOUNCEMENT_SIGNING_ADDRESSES:
ct_matching += 1
# may raise
if not self.is_newer(version_msg):
continue
try:
- is_verified = verify_message(
+ is_verified = verify_message_with_address(
adr,
base64.b64decode(sig),
version_msg.encode("utf-8"),
net=MainNet,
)
except Exception:
- # temporary: try the legacy verification algorithm
- # because we will need to sign the old way for a
- # couple of releases, for old releases to verify
- # the message.
- # TODO: remove after two new releases past 5.0.1
- try:
- is_verified = verify_message(
- adr,
- base64.b64decode(sig),
- version_msg.encode("utf-8"),
- net=MainNet,
- legacy=True,
- )
- except Exception:
- self.print_error(
- "Exception when verifying version signature for",
- version_msg,
- ":",
- repr(sys.exc_info()[1]),
- )
- return None
+ self.print_error(
+ "Exception when verifying version signature for",
+ version_msg,
+ ":",
+ repr(sys.exc_info()[1]),
+ )
+ return None
if is_verified:
self.print_error("Got new version", version_msg)
return version_msg.strip()
else:
self.print_error(
"Got new version", version_msg, "but sigcheck failed!"
)
return None
if 0 == ct_matching:
# Hmm. None of the versions we saw matched our variant.
# And/Or, none of the keys we saw matched keys we knew about.
# This is an error condition, so return None
self.print_error(
"Error: Got a valid reply from server but none of the variants"
" match us and/or none of the signing keys are known!"
)
return None
return version.PACKAGE_VERSION
def _my_version(self):
if self.is_test_run:
# Return a lower version to always trigger the notification
return 0, 0, 0, ""
if getattr(self, "_my_version_parsed", None) is None:
self._my_version_parsed = version.parse_package_version(
version.PACKAGE_VERSION
)
return self._my_version_parsed
@classmethod
def _parse_version(cls, version_msg):
try:
return version.parse_package_version(version_msg)
except Exception:
print_error(
"[{}] Error parsing version '{}': {}".format(
cls.__name__, version_msg, repr(sys.exc_info()[1])
)
)
raise
def is_matching_variant(self, version_msg, *, return_parsed=False):
parsed_version = self._parse_version(version_msg)
me = self._my_version()
# last element of tuple is always a string, the 'variant'
# (may be '' for EC Regular)
ret = me[-1] == parsed_version[-1]
if return_parsed:
return ret, parsed_version
return ret
def is_newer(self, version_msg):
yes, parsed = self.is_matching_variant(version_msg, return_parsed=True)
# make sure it's the same variant as us eg SLP, CS, '' regular, etc..
if yes:
v_me = self._my_version()[:-1]
v_server = parsed[:-1]
return v_server > v_me
return False
def _on_downloading(self, req, prog):
if req is self.active_req:
prog = int(prog or 0)
self.print_error("Downloading progress", str(prog) + "%", "from", req.url)
self.pb.setValue(max(0, min(int(prog), 100)))
else:
self.print_error(
"Warning: on_downloading called with a req that is not 'active'!"
)
def _update_view(self, latest_version=None):
if latest_version == self._error_val:
self.heading_label.setText("<h2>" + _("Update check failed") + "</h2>")
self.detail_label.setText(
_(
"Sorry, but we were unable to check for updates. "
"Please try again later."
)
)
self.cancel_or_check_button.setText(_("Check Again"))
self.cancel_or_check_button.setEnabled(True)
self.pb.hide()
elif latest_version:
self.pb.hide()
self.cancel_or_check_button.setText(_("Check Again"))
self.latest_version_label.setText(
_(f"Latest version: {'<b>' + latest_version + '</b>'}")
)
if self.is_newer(latest_version):
self.heading_label.setText(
"<h2>" + _("There is a new update available") + "</h2>"
)
url = '<a href="{u}">{u}</a>'.format(u=UpdateChecker.download_url)
self.detail_label.setText(
_(f"You can download the new version from:<br>{url}")
)
self.cancel_or_check_button.setEnabled(False)
else:
self.heading_label.setText("<h2>" + _("Already up to date") + "</h2>")
self.detail_label.setText(
_(f"You are already on the latest version of {PROJECT_NAME}.")
)
self.cancel_or_check_button.setEnabled(True)
else:
self.pb.show()
self.pb.setValue(0)
self.cancel_or_check_button.setText(_("Cancel"))
self.cancel_or_check_button.setEnabled(True)
self.latest_version_label.setText("")
self.heading_label.setText("<h2>" + _("Checking for updates...") + "</h2>")
self.detail_label.setText(
_(f"Please wait while {PROJECT_NAME} checks for available updates.")
)
def cancel_active(self):
if self.active_req:
self.active_req.abort()
self.active_req = None
self._err_fail()
def cancel_or_check(self):
if self.active_req:
self.cancel_active()
else:
self.do_check(force=True)
# Note: calls to do_check() will either result in a 'checked' signal or
# a 'failed' signal to be emitted (and possibly also 'got_new_version')
def do_check(self, force=False):
if force:
# no-op if none active
self.cancel_active()
if not self.active_req:
self.last_checked_ts = time.time()
self._update_view()
self.active_req = _Req(self, self.is_test_run)
def did_check_recently(self, secs=10.0):
return time.time() - self.last_checked_ts < secs
_error_val = 0xDEADB33F
def _err_fail(self):
self._update_view(self._error_val)
self.failed.emit()
def _ok_good(self, newver):
# NB: below 'newver' may actually just be our version or a version
# before our version (in case we are on a development build).
# Client code should check with this class.is_newer if the emitted
# version is actually newer.
self._update_view(newver)
self.checked.emit(newver)
if self.is_newer(newver):
self.got_new_version.emit(newver)
def _got_reply(self, req):
newver = None
if not req.aborted and req.json:
try:
newver = self._process_server_reply(req.json)
except Exception:
import traceback
self.print_error(traceback.format_exc())
if newver is not None:
self._ok_good(newver)
else:
self._err_fail()
def _on_req_finished(self, req):
adjective = ""
if req is self.active_req:
self._got_reply(req)
self.active_req = None
adjective = "Active"
if req.aborted:
adjective = "Aborted"
self.print_error(f"{adjective}", req.diagnostic_name(), "finished")
class _Req(threading.Thread, PrintError):
"""Thread to get the list of releases from a JSON file on the github
repository.
"""
url = RELEASES_JSON_URL
local_path = os.path.join(
os.path.dirname(__file__),
"..",
"..",
"contrib",
"update_checker",
"releases.json",
)
def __init__(self, checker, is_test_run: bool):
super().__init__(daemon=True)
self.checker = checker
self.aborted = False
self.json = None
self.is_test_run = is_test_run
try:
self.start()
except RuntimeError:
# If the user hits a system limitation on the number of threads,
# ignore the error.
self.aborted = True
self.checker._req_finished.emit(self)
def abort(self):
self.aborted = True
def diagnostic_name(self):
return f"{__class__.__name__}@{id(self) & 0xffff}"
def run(self):
self.checker._dl_prog.emit(self, 10)
try:
source = self.url if not self.is_test_run else self.local_path
self.print_error("Requesting from", source, "...")
self.json, self.url = self._do_request(self.url)
self.checker._dl_prog.emit(self, 100)
except Exception:
self.checker._dl_prog.emit(self, 25)
import traceback
self.print_error(traceback.format_exc())
finally:
self.checker._req_finished.emit(self)
def _do_request(self, url):
if self.is_test_run:
# Fetch the data in the local repository to test the signature
if not os.path.isfile(self.local_path):
raise RuntimeError(
f"{self.local_path} file not found. Did you not run the application"
" from sources?"
)
with open(self.local_path, "r", encoding="utf-8") as f:
json_data = json.loads(f.read())
self.print_msg(json_data)
return json_data, self.url
# will raise requests.exceptions.Timeout on timeout
response = requests.get(url, allow_redirects=True, timeout=30.0)
if response.status_code != 200:
raise RuntimeError(response.status_code, response.text)
self.print_error(f"got response {len(response.text)} bytes")
return response.json(), response.url
diff --git a/electrum/electrumabc_plugins/digitalbitbox/digitalbitbox.py b/electrum/electrumabc_plugins/digitalbitbox/digitalbitbox.py
index 7e2d24b5f..4c6eb0d4d 100644
--- a/electrum/electrumabc_plugins/digitalbitbox/digitalbitbox.py
+++ b/electrum/electrumabc_plugins/digitalbitbox/digitalbitbox.py
@@ -1,890 +1,890 @@
# ----------------------------------------------------------------------------------
# Electrum plugin for the Digital Bitbox hardware wallet by Shift Devices AG
# digitalbitbox.com
#
try:
import base64
import binascii
import hashlib
import hmac
import json
import math
import os
import re
import struct
import sys
import time
import hid
import requests
from ecdsa.ecdsa import generator_secp256k1
from ecdsa.util import sigencode_der
from electrumabc.base_wizard import HWD_SETUP_NEW_WALLET
from electrumabc.bitcoin import (
ScriptType,
hmac_oneshot,
public_key_to_p2pkh,
push_script,
)
from electrumabc.crypto import (
DecodeAES_bytes,
EncodeAES_base64,
EncodeAES_bytes,
Hash,
)
from electrumabc.ecc import (
ECPubkey,
SignatureType,
msg_magic,
point_to_ser,
pubkey_from_signature,
- verify_message,
+ verify_message_with_address,
)
from electrumabc.i18n import _
from electrumabc.keystore import HardwareKeyStore
from electrumabc.printerror import print_error
from electrumabc.transaction import Transaction
from electrumabc.util import UserCancelled, to_string
from ..hw_wallet import HardwareClientBase, HWPluginBase
DIGIBOX = True
except ImportError:
DIGIBOX = False
# ----------------------------------------------------------------------------------
# USB HID interface
#
def to_hexstr(s):
return binascii.hexlify(s).decode("ascii")
def derive_keys(x):
h = Hash(x)
h = hashlib.sha512(h).digest()
return (h[:32], h[32:])
MIN_MAJOR_VERSION = 5
class DigitalBitboxClient(HardwareClientBase):
def __init__(self, plugin, hidDevice):
HardwareClientBase.__init__(self, plugin=plugin)
self.dbb_hid = hidDevice
self.opened = True
self.password = None
self.isInitialized = False
self.setupRunning = False
self.usbReportSize = 64 # firmware > v2.0.0
def close(self):
if self.opened:
with self.device_manager().hid_lock:
try:
self.dbb_hid.close()
except Exception:
pass
self.opened = False
def timeout(self, cutoff):
pass
def is_pairable(self):
return True
def is_initialized(self):
return self.dbb_has_password()
def is_paired(self):
return self.password is not None
def has_usable_connection_with_device(self):
try:
self.dbb_has_password()
except Exception:
return False
return True
def _get_xpub(self, bip32_path):
if self.check_device_dialog():
return self.hid_send_encrypt(b'{"xpub": "%s"}' % bip32_path.encode("utf8"))
def get_xpub(self, bip32_path, xtype):
assert xtype == "standard"
reply = self._get_xpub(bip32_path)
if reply:
xpub = reply["xpub"]
return xpub
else:
raise RuntimeError("no reply")
def dbb_has_password(self):
reply = self.hid_send_plain(b'{"ping":""}')
if "ping" not in reply:
raise Exception(
_(
"Device communication error. Please unplug and replug your Digital"
" Bitbox."
)
)
if reply["ping"] == "password":
return True
return False
def stretch_key(self, key):
return binascii.hexlify(
hashlib.pbkdf2_hmac(
"sha512", key.encode("utf-8"), b"Digital Bitbox", iterations=20480
)
)
def backup_password_dialog(self):
msg = _("Enter the password used when the backup was created:")
while True:
password = self.handler.get_passphrase(msg, False)
if password is None:
return None
if len(password) < 4:
msg = (
_("Password must have at least 4 characters.")
+ "\n\n"
+ _("Enter password:")
)
elif len(password) > 64:
msg = (
_("Password must have less than 64 characters.")
+ "\n\n"
+ _("Enter password:")
)
else:
return password.encode("utf8")
def password_dialog(self, msg):
while True:
password = self.handler.get_passphrase(msg, False)
if password is None:
return False
if len(password) < 4:
msg = (
_("Password must have at least 4 characters.")
+ "\n\n"
+ _("Enter password:")
)
elif len(password) > 64:
msg = (
_("Password must have less than 64 characters.")
+ "\n\n"
+ _("Enter password:")
)
else:
self.password = password.encode("utf8")
return True
def check_device_dialog(self):
# Check device firmware version
match = re.search(
r"v([0-9])+\.[0-9]+\.[0-9]+", self.dbb_hid.get_serial_number_string()
)
if match is None:
raise Exception("error detecting firmware version")
major_version = int(match.group(1))
if major_version < MIN_MAJOR_VERSION:
raise Exception(
"Please upgrade to the newest firmware using the BitBox Desktop app:"
" https://shiftcrypto.ch/start"
)
# Set password if fresh device
if self.password is None and not self.dbb_has_password():
if not self.setupRunning:
return False # A fresh device cannot connect to an existing wallet
msg = (
_("An uninitialized Digital Bitbox is detected.")
+ " "
+ _("Enter a new password below.")
+ "\n\n"
+ _("REMEMBER THE PASSWORD!")
+ "\n\n"
+ _("You cannot access your coins or a backup without the password.")
+ "\n"
+ _("A backup is saved automatically when generating a new wallet.")
)
if self.password_dialog(msg):
reply = self.hid_send_plain(b'{"password":"' + self.password + b'"}')
else:
return False
# Get password from user if not yet set
msg = _("Enter your Digital Bitbox password:")
while self.password is None:
if not self.password_dialog(msg):
return False
reply = self.hid_send_encrypt(b'{"led":"blink"}')
if "error" in reply:
self.password = None
if reply["error"]["code"] == 109:
msg = (
_("Incorrect password entered.")
+ "\n\n"
+ reply["error"]["message"]
+ "\n\n"
+ _("Enter your Digital Bitbox password:")
)
else:
# Should never occur
msg = (
_("Unexpected error occurred.")
+ "\n\n"
+ reply["error"]["message"]
+ "\n\n"
+ _("Enter your Digital Bitbox password:")
)
# Initialize device if not yet initialized
if not self.setupRunning:
self.isInitialized = True # Wallet exists. Electrum code later checks if the device matches the wallet
elif not self.isInitialized:
reply = self.hid_send_encrypt(b'{"device":"info"}')
if reply["device"]["id"] != "":
self.recover_or_erase_dialog() # Already seeded
else:
self.seed_device_dialog() # Seed if not initialized
self.mobile_pairing_dialog()
return self.isInitialized
def recover_or_erase_dialog(self):
msg = _("The Digital Bitbox is already seeded. Choose an option:") + "\n"
choices = [
(_("Create a wallet using the current seed")),
(
_(
"Load a wallet from the micro SD card (the current seed is"
" overwritten)"
)
),
(_("Erase the Digital Bitbox")),
]
try:
reply = self.handler.win.query_choice(msg, choices)
except Exception:
return # Back button pushed
if reply == 2:
self.dbb_erase()
elif reply == 1:
if not self.dbb_load_backup():
return
else:
if self.hid_send_encrypt(b'{"device":"info"}')["device"]["lock"]:
raise Exception(_("Full 2FA enabled. This is not supported yet."))
# Use existing seed
self.isInitialized = True
def seed_device_dialog(self):
msg = _("Choose how to initialize your Digital Bitbox:") + "\n"
choices = [
(_("Generate a new random wallet")),
(_("Load a wallet from the micro SD card")),
]
try:
reply = self.handler.win.query_choice(msg, choices)
except Exception:
return # Back button pushed
if reply == 0:
self.dbb_generate_wallet()
else:
if not self.dbb_load_backup(show_msg=False):
return
self.isInitialized = True
def mobile_pairing_dialog(self):
dbb_user_dir = None
if sys.platform == "darwin":
dbb_user_dir = os.path.join(
os.environ.get("HOME", ""), "Library", "Application Support", "DBB"
)
elif sys.platform == "win32":
dbb_user_dir = os.path.join(os.environ["APPDATA"], "DBB")
else:
dbb_user_dir = os.path.join(os.environ["HOME"], ".dbb")
if not dbb_user_dir:
return
try:
with open(os.path.join(dbb_user_dir, "config.dat"), encoding="utf-8") as f:
dbb_config = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return
if (
"encryptionprivkey" not in dbb_config
or "comserverchannelid" not in dbb_config
):
return
choices = [
_("Do not pair"),
_("Import pairing from the Digital Bitbox desktop app"),
]
try:
reply = self.handler.win.query_choice(_("Mobile pairing options"), choices)
except Exception:
return # Back button pushed
if reply == 0:
if self.plugin.is_mobile_paired():
del self.plugin.digitalbitbox_config["encryptionprivkey"]
del self.plugin.digitalbitbox_config["comserverchannelid"]
elif reply == 1:
# import pairing from dbb app
self.plugin.digitalbitbox_config["encryptionprivkey"] = dbb_config[
"encryptionprivkey"
]
self.plugin.digitalbitbox_config["comserverchannelid"] = dbb_config[
"comserverchannelid"
]
self.plugin.config.set_key("digitalbitbox", self.plugin.digitalbitbox_config)
def dbb_generate_wallet(self):
key = self.stretch_key(self.password)
filename = ("Electrum-" + time.strftime("%Y-%m-%d-%H-%M-%S") + ".pdf").encode(
"utf8"
)
msg = (
b'{"seed":{"source": "create", "key": "%s", "filename": "%s", "entropy":'
b' "%s"}}' % (key, filename, to_hexstr(os.urandom(32)))
)
reply = self.hid_send_encrypt(msg)
if "error" in reply:
raise Exception(reply["error"]["message"])
def dbb_erase(self):
self.handler.show_message(
_("Are you sure you want to erase the Digital Bitbox?")
+ "\n\n"
+ _("To continue, touch the Digital Bitbox's light for 3 seconds.")
+ "\n\n"
+ _("To cancel, briefly touch the light or wait for the timeout.")
)
hid_reply = self.hid_send_encrypt(b'{"reset":"__ERASE__"}')
self.handler.finished()
if "error" in hid_reply:
raise Exception(hid_reply["error"]["message"])
else:
self.password = None
raise Exception("Device erased")
def dbb_load_backup(self, show_msg=True):
backups = self.hid_send_encrypt(b'{"backup":"list"}')
if "error" in backups:
raise Exception(backups["error"]["message"])
try:
f = self.handler.win.query_choice(
_("Choose a backup file:"), backups["backup"]
)
except Exception:
return False # Back button pushed
key = self.backup_password_dialog()
if key is None:
raise Exception("Canceled by user")
key = self.stretch_key(key)
if show_msg:
self.handler.show_message(
_("Loading backup...")
+ "\n\n"
+ _("To continue, touch the Digital Bitbox's light for 3 seconds.")
+ "\n\n"
+ _("To cancel, briefly touch the light or wait for the timeout.")
)
msg = (
'{"seed":{"source": "backup", "key": "%s", "filename": "%s"}}'
% (key, backups["backup"][f])
).encode("utf8")
hid_reply = self.hid_send_encrypt(msg)
self.handler.finished()
if "error" in hid_reply:
raise Exception(hid_reply["error"]["message"])
return True
def hid_send_frame(self, data):
HWW_CID = 0xFF000000
HWW_CMD = 0x80 + 0x40 + 0x01
data_len = len(data)
seq = 0
idx = 0
write = []
while idx < data_len:
if idx == 0:
# INIT frame
write = data[idx : idx + min(data_len, self.usbReportSize - 7)]
self.dbb_hid.write(
b"\0"
+ struct.pack(">IBH", HWW_CID, HWW_CMD, data_len & 0xFFFF)
+ write
+ b"\xee" * (self.usbReportSize - 7 - len(write))
)
else:
# CONT frame
write = data[idx : idx + min(data_len, self.usbReportSize - 5)]
self.dbb_hid.write(
b"\0"
+ struct.pack(">IB", HWW_CID, seq)
+ write
+ b"\xee" * (self.usbReportSize - 5 - len(write))
)
seq += 1
idx += len(write)
def hid_read_frame(self):
# INIT response
read = bytearray(self.dbb_hid.read(self.usbReportSize))
# cid = ((read[0] * 256 + read[1]) * 256 + read[2]) * 256 + read[3]
# cmd = read[4]
data_len = read[5] * 256 + read[6]
data = read[7:]
idx = len(read) - 7
while idx < data_len:
# CONT response
read = bytearray(self.dbb_hid.read(self.usbReportSize))
data += read[5:]
idx += len(read) - 5
return data
def hid_send_plain(self, msg):
reply = ""
try:
serial_number = self.dbb_hid.get_serial_number_string()
if "v2.0." in serial_number or "v1." in serial_number:
hidBufSize = 4096
self.dbb_hid.write("\0" + msg + "\0" * (hidBufSize - len(msg)))
r = bytearray()
while len(r) < hidBufSize:
r += bytearray(self.dbb_hid.read(hidBufSize))
else:
self.hid_send_frame(msg)
r = self.hid_read_frame()
r = r.rstrip(b" \t\r\n\0")
r = r.replace(b"\0", b"")
r = to_string(r, "utf8")
reply = json.loads(r)
except Exception as e:
print_error("Exception caught " + str(e))
return reply
def hid_send_encrypt(self, msg):
sha256_byte_len = 32
reply = ""
try:
encryption_key, authentication_key = derive_keys(self.password)
msg = EncodeAES_bytes(encryption_key, msg)
hmac_digest = hmac_oneshot(authentication_key, msg, hashlib.sha256)
authenticated_msg = base64.b64encode(msg + hmac_digest)
reply = self.hid_send_plain(authenticated_msg)
if "ciphertext" in reply:
b64_unencoded = bytes(base64.b64decode("".join(reply["ciphertext"])))
reply_hmac = b64_unencoded[-sha256_byte_len:]
hmac_calculated = hmac_oneshot(
authentication_key, b64_unencoded[:-sha256_byte_len], hashlib.sha256
)
if not hmac.compare_digest(reply_hmac, hmac_calculated):
raise Exception("Failed to validate HMAC")
reply = DecodeAES_bytes(
encryption_key, b64_unencoded[:-sha256_byte_len]
)
reply = to_string(reply, "utf8")
reply = json.loads(reply)
if "error" in reply:
self.password = None
except Exception as e:
print_error("Exception caught " + str(e))
return reply
# ----------------------------------------------------------------------------------
#
#
class DigitalBitboxKeyStore(HardwareKeyStore):
hw_type = "digitalbitbox"
device = "DigitalBitbox"
def __init__(self, d):
HardwareKeyStore.__init__(self, d)
self.force_watching_only = False
self.maxInputs = 14 # maximum inputs per single sign command
def get_derivation(self):
return str(self.derivation)
def is_p2pkh(self):
return self.derivation.startswith("m/44'/")
def give_error(self, message, clear_client=False):
if clear_client:
self.client = None
raise Exception(message)
def decrypt_message(self, pubkey, message, password):
raise RuntimeError(
_("Encryption and decryption are currently not supported for {}").format(
self.device
)
)
def sign_message(self, sequence, message, password, sigtype=SignatureType.BITCOIN):
if sigtype == SignatureType.ECASH:
raise RuntimeError(
_("eCash message signing is not available for {}").format(self.device)
)
sig = None
try:
message = message.encode("utf8")
inputPath = self.get_derivation() + "/%d/%d" % sequence
msg_hash = Hash(msg_magic(message))
inputHash = to_hexstr(msg_hash)
hasharray = []
hasharray.append({"hash": inputHash, "keypath": inputPath})
hasharray = json.dumps(hasharray)
msg = b'{"sign":{"meta":"sign message", "data":%s}}' % hasharray.encode(
"utf8"
)
dbb_client = self.plugin.get_client(self)
if not dbb_client.is_paired():
raise Exception(_("Could not sign message."))
reply = dbb_client.hid_send_encrypt(msg)
self.handler.show_message(
_("Signing message ...")
+ "\n\n"
+ _(
"To continue, touch the Digital Bitbox's blinking light for 3"
" seconds."
)
+ "\n\n"
+ _(
"To cancel, briefly touch the blinking light or wait for the"
" timeout."
)
)
reply = dbb_client.hid_send_encrypt(
msg
) # Send twice, first returns an echo for smart verification (not implemented)
self.handler.finished()
if "error" in reply:
raise Exception(reply["error"]["message"])
if "sign" not in reply:
raise Exception(_("Could not sign message."))
if "recid" in reply["sign"][0]:
# firmware > v2.1.1
sig = bytes(
[27 + int(reply["sign"][0]["recid"], 16) + 4]
) + binascii.unhexlify(reply["sign"][0]["sig"])
pk, compressed = pubkey_from_signature(sig, msg_hash)
pk = point_to_ser(pk.pubkey.point, compressed)
addr = public_key_to_p2pkh(pk)
- if verify_message(addr, sig, message) is False:
+ if verify_message_with_address(addr, sig, message) is False:
raise Exception(_("Could not sign message"))
elif "pubkey" in reply["sign"][0]:
# firmware <= v2.1.1
for i in range(4):
sig = bytes([27 + i + 4]) + binascii.unhexlify(
reply["sign"][0]["sig"]
)
try:
addr = public_key_to_p2pkh(
binascii.unhexlify(reply["sign"][0]["pubkey"])
)
- if verify_message(addr, sig, message):
+ if verify_message_with_address(addr, sig, message):
break
except Exception:
continue
else:
raise Exception(_("Could not sign message"))
except Exception as e:
self.give_error(e)
return sig
def sign_transaction(self, tx, password, *, use_cache=False):
if tx.is_complete():
return
try:
p2pkhTransaction = True
derivations = self.get_tx_derivations(tx)
inputhasharray = []
hasharray = []
pubkeyarray = []
# Build hasharray from inputs
for i, txin in enumerate(tx.txinputs()):
if txin.type == ScriptType.coinbase:
# should never happen
self.give_error("Coinbase not supported")
if txin.type != ScriptType.p2pkh:
p2pkhTransaction = False
for x_pubkey in txin.x_pubkeys:
if x_pubkey in derivations:
index = derivations.get(x_pubkey)
inputPath = "%s/%d/%d" % (
self.get_derivation(),
index[0],
index[1],
)
inputHash = Hash(tx.serialize_preimage(i))
hasharray_i = {
"hash": to_hexstr(inputHash),
"keypath": inputPath,
}
hasharray.append(hasharray_i)
inputhasharray.append(inputHash)
break
else:
self.give_error(
"No matching x_key for sign_transaction"
) # should never happen
# Build pubkeyarray from outputs
for o in tx.outputs():
info = tx.output_info.get(o.destination)
if info is not None:
index, xpubs, m, script_type = info
changePath = self.get_derivation() + "/%d/%d" % index
changePubkey = self.derive_pubkey(index[0], index[1])
pubkeyarray_i = {"pubkey": changePubkey, "keypath": changePath}
pubkeyarray.append(pubkeyarray_i)
# Special serialization of the unsigned transaction for
# the mobile verification app.
# At the moment, verification only works for p2pkh transactions.
if p2pkhTransaction:
class CustomTXSerialization(Transaction):
# fixme: Transaction.serialize no longer calls input_script,
# so this overloading does not affect
# CustomTXSerialization.serialize()
@classmethod
def input_script(
self, txin, estimate_size=False, sign_schnorr=False
):
if txin["type"] == "p2pkh":
return Transaction.get_preimage_script(txin)
if txin["type"] == "p2sh":
# Multisig verification has partial support, but is disabled. This is the
# expected serialization though, so we leave it here until we activate it.
return "00" + push_script(
Transaction.get_preimage_script(txin)
)
raise Exception("unsupported type %s" % txin["type"])
tx_dbb_serialized = (
CustomTXSerialization(tx.serialize()).serialize().hex()
)
else:
# We only need this for the signing echo / verification.
tx_dbb_serialized = None
# Build sign command
dbb_signatures = []
steps = math.ceil(1.0 * len(hasharray) / self.maxInputs)
for step in range(int(steps)):
hashes = hasharray[step * self.maxInputs : (step + 1) * self.maxInputs]
msg = {
"sign": {
"data": hashes,
"checkpub": pubkeyarray,
},
}
if tx_dbb_serialized is not None:
msg["sign"]["meta"] = to_hexstr(Hash(tx_dbb_serialized))
msg = json.dumps(msg).encode("ascii")
dbb_client = self.plugin.get_client(self)
if not dbb_client.is_paired():
raise Exception("Could not sign transaction.")
reply = dbb_client.hid_send_encrypt(msg)
if "error" in reply:
raise Exception(reply["error"]["message"])
if "echo" not in reply:
raise Exception("Could not sign transaction.")
if self.plugin.is_mobile_paired() and tx_dbb_serialized is not None:
reply["tx"] = tx_dbb_serialized.hex()
self.plugin.comserver_post_notification(reply)
if steps > 1:
self.handler.show_message(
_("Signing large transaction. Please be patient ...")
+ "\n\n"
+ _(
"To continue, touch the Digital Bitbox's blinking light for"
" 3 seconds."
)
+ " "
+ _("(Touch {} of {})").format((step + 1), steps)
+ "\n\n"
+ _(
"To cancel, briefly touch the blinking light or wait for"
" the timeout."
)
+ "\n\n"
)
else:
self.handler.show_message(
_("Signing transaction...")
+ "\n\n"
+ _(
"To continue, touch the Digital Bitbox's blinking light for"
" 3 seconds."
)
+ "\n\n"
+ _(
"To cancel, briefly touch the blinking light or wait for"
" the timeout."
)
)
# Send twice, first returns an echo for smart verification
reply = dbb_client.hid_send_encrypt(msg)
self.handler.finished()
if "error" in reply:
if reply["error"].get("code") in (600, 601):
# aborted via LED short touch or timeout
raise UserCancelled()
raise Exception(reply["error"]["message"])
if "sign" not in reply:
raise Exception("Could not sign transaction.")
dbb_signatures.extend(reply["sign"])
# Fill signatures
if len(dbb_signatures) != len(tx.txinputs()):
raise Exception(
"Incorrect number of transactions signed."
) # Should never occur
for i, txin in enumerate(tx.txinputs()):
if txin.is_complete():
break
pubkeys, x_pubkeys = txin.get_sorted_pubkeys()
for ii, pubkey in enumerate(pubkeys):
signed = dbb_signatures[i]
if "recid" in signed:
# firmware > v2.1.1
recid = int(signed["recid"], 16)
s = binascii.unhexlify(signed["sig"])
h = inputhasharray[i]
pk = ECPubkey.from_sig_string(s, recid, h)
pk = pk.get_public_key_hex(compressed=True)
elif "pubkey" in signed:
# firmware <= v2.1.1
pk = signed["pubkey"]
if pk != pubkey:
continue
sig_r = int(signed["sig"][:64], 16)
sig_s = int(signed["sig"][64:], 16)
sig = sigencode_der(sig_r, sig_s, generator_secp256k1.order())
txin.update_signature(sig + b"\x41", ii)
tx.update_input(i, txin)
except UserCancelled:
raise
except Exception as e:
self.give_error(e, True)
else:
print_error("Transaction is_complete", tx.is_complete())
tx.raw = tx.serialize()
class DigitalBitboxPlugin(HWPluginBase):
libraries_available = DIGIBOX
keystore_class = DigitalBitboxKeyStore
client = None
DEVICE_IDS = [(0x03EB, 0x2402)] # Digital Bitbox
def __init__(self, parent, config, name):
HWPluginBase.__init__(self, parent, config, name)
if self.libraries_available:
self.device_manager().register_devices(self.DEVICE_IDS, plugin=self)
self.digitalbitbox_config = self.config.get("digitalbitbox", {})
def get_dbb_device(self, device):
with self.device_manager().hid_lock:
dev = hid.device()
dev.open_path(device.path)
return dev
def create_client(self, device, handler):
if device.interface_number == 0 or device.usage_page == 0xFFFF:
self.handler = handler
client = self.get_dbb_device(device)
if client is not None:
client = DigitalBitboxClient(self, client)
return client
else:
return None
def setup_device(self, device_info, wizard, purpose):
device_id = device_info.device.id_
client = self.scan_and_create_client_for_device(
device_id=device_id, wizard=wizard
)
if purpose == HWD_SETUP_NEW_WALLET:
client.setupRunning = True
wizard.run_task_without_blocking_gui(
task=lambda: client.get_xpub("m/44'/0'", "standard")
)
return client
def is_mobile_paired(self):
return "encryptionprivkey" in self.digitalbitbox_config
def comserver_post_notification(self, payload):
assert self.is_mobile_paired(), "unexpected mobile pairing error"
url = "https://digitalbitbox.com/smartverification/index.php"
key_s = base64.b64decode(self.digitalbitbox_config["encryptionprivkey"])
args = "c=data&s=0&dt=0&uuid=%s&pl=%s" % (
self.digitalbitbox_config["comserverchannelid"],
EncodeAES_base64(key_s, json.dumps(payload).encode("ascii")).decode(
"ascii"
),
)
try:
requests.post(url, args)
except Exception as e:
self.handler.show_error(str(e))
def get_xpub(self, device_id, derivation, xtype, wizard):
client = self.scan_and_create_client_for_device(
device_id=device_id, wizard=wizard
)
client.check_device_dialog()
xpub = client.get_xpub(derivation, xtype)
return xpub
def get_client(self, keystore, force_pair=True):
devmgr = self.device_manager()
handler = keystore.handler
client = devmgr.client_for_keystore(self, handler, keystore, force_pair)
if client is not None:
client.check_device_dialog()
return client
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, May 21, 22:39 (23 h, 11 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5866080
Default Alt Text
(179 KB)
Attached To
rABC Bitcoin ABC
Event Timeline
Log In to Comment