Page MenuHomePhabricator

No OneTemporary

diff --git a/electrum/electrumabc/ecc.py b/electrum/electrumabc/ecc.py
index b80b40644..a87650950 100644
--- a/electrum/electrumabc/ecc.py
+++ b/electrum/electrumabc/ecc.py
@@ -1,453 +1,452 @@
# -*- coding: utf-8 -*-
#
# Electrum ABC - lightweight eCash client
# Copyright (C) 2018 The Electrum developers
# Copyright (C) 2024 The Electrum ABC developers
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import annotations
import base64
import hashlib
import hmac
from enum import Enum
from typing import TYPE_CHECKING, NamedTuple, Optional, Union
import ecdsa
from ecdsa.curves import SECP256k1
from ecdsa.ecdsa import curve_secp256k1, generator_secp256k1
from ecdsa.ellipticcurve import Point
from . import networks
from .crypto import Hash, aes_decrypt_with_iv, aes_encrypt_with_iv
from .ecc_fast import do_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1
from .serialize import serialize_blob
from .util import InvalidPassword, assert_bytes, randrange, to_bytes
if TYPE_CHECKING:
from .address import Address
do_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1()
+# 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141
CURVE_ORDER = SECP256k1.order
PRIVATE_KEY_BYTECOUNT = 32
def be_bytes_to_number(b: bytes) -> int:
return int.from_bytes(b, byteorder="big", signed=False)
def i2o_ECPublicKey(pubkey, compressed=False):
# public keys are 65 bytes long (520 bits)
# 0x04 + 32-byte X-coordinate + 32-byte Y-coordinate
# 0x00 = point at infinity, 0x02 and 0x03 = compressed, 0x04 = uncompressed
# compressed keys: <sign> <x> where <sign> is 0x02 if y is even and 0x03 if y is odd
if compressed:
if pubkey.point.y() & 1:
# explicitly convert point coordinates to int, because ecdsa
# returns mpz instead of int if gmpY is installed
key = b"\x03" + int(pubkey.point.x()).to_bytes(32, "big")
else:
key = b"\x02" + int(pubkey.point.x()).to_bytes(32, "big")
else:
key = (
b"\x04"
+ int(pubkey.point.x()).to_bytes(32, "big")
+ int(pubkey.point.y()).to_bytes(32, "big")
)
return key
def regenerate_key(pk):
assert len(pk) == 32
return ECKey(pk)
def GetPubKey(pubkey, compressed=False) -> bytes:
return i2o_ECPublicKey(pubkey, compressed)
def public_key_from_private_key(pk: bytes, compressed) -> bytes:
pkey = regenerate_key(pk)
return GetPubKey(pkey.pubkey, compressed)
class SignatureType(Enum):
ECASH = 1
BITCOIN = 2
ECASH_MSG_MAGIC = b"eCash Signed Message:\n"
BITCOIN_MSG_MAGIC = b"Bitcoin Signed Message:\n"
def msg_magic(message: bytes, sigtype: SignatureType = SignatureType.ECASH) -> bytes:
"""Prepare the preimage of the message before signing it or verifying
its signature."""
magic = ECASH_MSG_MAGIC if sigtype == SignatureType.ECASH else BITCOIN_MSG_MAGIC
return serialize_blob(magic) + serialize_blob(message)
def get_y_coord_from_x(x: int, odd=True) -> int:
curve = curve_secp256k1
_p = curve.p()
_a = curve.a()
_b = curve.b()
for offset in range(128):
Mx = x + offset
My2 = pow(Mx, 3, _p) + _a * pow(Mx, 2, _p) + _b % _p
My = pow(My2, (_p + 1) // 4, _p)
if curve.contains_point(Mx, My):
if odd == bool(My & 1):
return My
return _p - My
raise Exception("ECC_YfromX: No Y found")
def negative_point(P):
return Point(P.curve(), P.x(), -P.y(), P.order())
def sig_string_from_der_sig(der_sig):
r, s = ecdsa.util.sigdecode_der(der_sig, CURVE_ORDER)
return ecdsa.util.sigencode_string(r, s, CURVE_ORDER)
class EcCoordinates(NamedTuple):
x: int
y: int
def point_to_ser(
P: Union[EcCoordinates, ecdsa.ellipticcurve.Point], compressed=True
) -> bytes:
if isinstance(P, tuple):
assert len(P) == 2, f"unexpected point: {P}"
x, y = P
else:
x, y = P.x(), P.y()
if compressed:
return int(2 + (y & 1)).to_bytes(1, "big") + int(x).to_bytes(32, "big")
return b"\x04" + int(x).to_bytes(32, "big") + int(y).to_bytes(32, "big")
def ser_to_coordinates(ser: bytes) -> EcCoordinates:
if ser[0] not in (0x02, 0x03, 0x04):
raise ValueError(f"Unexpected first byte: {ser[0]}")
if ser[0] == 0x04:
return EcCoordinates(
be_bytes_to_number(ser[1:33]), be_bytes_to_number(ser[33:])
)
x = be_bytes_to_number(ser[1:])
return EcCoordinates(x, get_y_coord_from_x(x, ser[0] == 0x03))
def ser_to_point(ser: bytes) -> ecdsa.ellipticcurve.Point:
x, y = ser_to_coordinates(ser)
return Point(curve_secp256k1, x, y, generator_secp256k1.order())
class MyVerifyingKey(ecdsa.VerifyingKey):
@classmethod
def from_signature(klass, sig, recid, h, curve):
"""See http://www.secg.org/download/aid-780/sec1-v2.pdf, chapter 4.1.6"""
from ecdsa import numbertheory, util
from . import msqr
curveFp = curve.curve
G = curve.generator
order = G.order()
# extract r,s from signature
r, s = util.sigdecode_string(sig, order)
# 1.1
x = r + (recid // 2) * order
# 1.3
alpha = (x * x * x + curveFp.a() * x + curveFp.b()) % curveFp.p()
beta = msqr.modular_sqrt(alpha, curveFp.p())
y = beta if (beta - recid) % 2 == 0 else curveFp.p() - beta
# 1.4 the constructor checks that nR is at infinity
R = Point(curveFp, x, y, order)
# 1.5 compute e from message:
e = be_bytes_to_number(h)
minus_e = -e % order
# 1.6 compute Q = r^-1 (sR - eG)
inv_r = numbertheory.inverse_mod(r, order)
Q = inv_r * (s * R + minus_e * G)
return klass.from_public_point(Q, curve)
class MySigningKey(ecdsa.SigningKey):
"""Enforce low S values in signatures"""
def sign_number(self, number, entropy=None, k=None):
curve = SECP256k1
G = curve.generator
order = G.order()
r, s = ecdsa.SigningKey.sign_number(self, number, entropy, k)
if s > order // 2:
s = order - s
return r, s
class ECPubkey(object):
def __init__(self, b: bytes):
assert_bytes(b)
point = ser_to_point(b)
self._pubkey = ecdsa.ecdsa.Public_key(generator_secp256k1, point)
@classmethod
def from_sig_string(cls, sig_string: bytes, recid: int, msg_hash: bytes):
assert_bytes(sig_string)
if len(sig_string) != 64:
raise Exception("Wrong encoding")
if not (0 <= recid <= 3):
raise ValueError(f"recid is {recid}, but should be 0 <= recid <= 3")
ecdsa_verifying_key = MyVerifyingKey.from_signature(
sig_string, recid, msg_hash, curve=SECP256k1
)
ecdsa_point = ecdsa_verifying_key.pubkey.point
return ECPubkey(point_to_ser(ecdsa_point))
@classmethod
def from_signature65(cls, sig: bytes, msg_hash: bytes):
if len(sig) != 65:
raise Exception("Wrong encoding")
nV = sig[0]
if not (27 <= nV <= 34):
raise Exception("Bad encoding")
if nV >= 31:
compressed = True
nV -= 4
else:
compressed = False
recid = nV - 27
return cls.from_sig_string(sig[1:], recid, msg_hash), compressed
@classmethod
def from_point(
cls, point: Union[EcCoordinates, ecdsa.ecdsa.Public_key]
) -> ECPubkey:
_bytes = point_to_ser(point, compressed=False) # faster than compressed
return ECPubkey(_bytes)
def get_public_key_bytes(self, compressed=True):
return point_to_ser(self.point(), compressed)
def get_public_key_hex(self, compressed=True):
return self.get_public_key_bytes(compressed).hex()
def point(self) -> EcCoordinates:
return EcCoordinates(self._pubkey.point.x(), self._pubkey.point.y())
def __mul__(self, other: int):
if not isinstance(other, int):
raise TypeError(
f"multiplication not defined for ECPubkey and {type(other)}"
)
ecdsa_point = self._pubkey.point * other
return self.from_point(ecdsa_point)
def __rmul__(self, other: int):
return self * other
def __add__(self, other):
if not isinstance(other, ECPubkey):
raise TypeError(f"addition not defined for ECPubkey and {type(other)}")
ecdsa_point = self._pubkey.point + other._pubkey.point
return self.from_point(ecdsa_point)
def __eq__(self, other):
return self.get_public_key_bytes() == other.get_public_key_bytes()
def __ne__(self, other):
return not (self == other)
def verify_message(
self,
sig65: bytes,
message: bytes,
*,
sigtype: SignatureType = SignatureType.ECASH,
) -> bool:
assert_bytes(message)
h = Hash(msg_magic(message, sigtype))
try:
public_key, compressed = ECPubkey.from_signature65(sig65, h)
except Exception:
return False
# check public key
if public_key != self:
return False
# check message
return self.verify_message_hash(sig65[1:], h)
def verify_message_hash(self, sig_string: bytes, msg_hash: bytes) -> bool:
assert_bytes(sig_string)
if len(sig_string) != 64:
return False
ecdsa_point = self._pubkey.point
verifying_key = MyVerifyingKey.from_public_point(ecdsa_point, curve=SECP256k1)
return verifying_key.verify_digest(
sig_string, msg_hash, sigdecode=ecdsa.util.sigdecode_string
)
def encrypt_message(self, message: bytes, magic=b"BIE1"):
"""
ECIES encryption/decryption methods; AES-128-CBC with PKCS7 is used as the
cipher; hmac-sha256 is used as the mac
"""
assert_bytes(message)
pk = self._pubkey.point
- if not ecdsa.ecdsa.point_is_valid(generator_secp256k1, pk.x(), pk.y()):
- raise Exception("invalid pubkey")
ephemeral_exponent = int.to_bytes(
- randrange(pow(2, 256)),
+ randrange(CURVE_ORDER),
length=PRIVATE_KEY_BYTECOUNT,
byteorder="big",
signed=False,
)
ephemeral = ECKey(ephemeral_exponent)
ecdh_key = point_to_ser(pk * ephemeral.privkey.secret_multiplier)
key = hashlib.sha512(ecdh_key).digest()
iv, key_e, key_m = key[0:16], key[16:32], key[32:]
ciphertext = aes_encrypt_with_iv(key_e, iv, message)
ephemeral_pubkey = ephemeral.get_public_key(compressed=True)
encrypted = magic + ephemeral_pubkey + ciphertext
mac = hmac.new(key_m, encrypted, hashlib.sha256).digest()
return base64.b64encode(encrypted + mac)
@classmethod
def order(cls):
return CURVE_ORDER
def verify_message_with_address(
address: Union[str, "Address"],
sig65: bytes,
message: bytes,
*,
sigtype: SignatureType = SignatureType.ECASH,
net: Optional[networks.AbstractNet] = None,
) -> bool:
# Fixme: circular import address -> ecc -> address
from .address import Address
if net is None:
net = networks.net
assert_bytes(sig65, message)
if not isinstance(address, Address):
address = Address.from_string(address, net=net)
h = Hash(msg_magic(message, sigtype))
try:
public_key, compressed = ECPubkey.from_signature65(sig65, h)
except Exception:
return False
# check public key using the address
pubkey_hex = public_key.get_public_key_bytes(compressed)
addr = Address.from_pubkey(pubkey_hex)
if address != addr:
return False
# check message
return public_key.verify_message_hash(sig65[1:], h)
class ECKey(object):
def __init__(self, k):
secret = be_bytes_to_number(k)
self.pubkey = ecdsa.ecdsa.Public_key(
generator_secp256k1, generator_secp256k1 * secret
)
self.privkey = ecdsa.ecdsa.Private_key(self.pubkey, secret)
self.secret = secret
def GetPubKey(self, compressed):
return GetPubKey(self.pubkey, compressed)
def get_public_key(self, compressed=True) -> bytes:
return point_to_ser(self.pubkey.point, compressed)
def sign(self, msg_hash):
private_key = MySigningKey.from_secret_exponent(self.secret, curve=SECP256k1)
public_key = private_key.get_verifying_key()
signature = private_key.sign_digest_deterministic(
msg_hash, hashfunc=hashlib.sha256, sigencode=ecdsa.util.sigencode_string
)
assert public_key.verify_digest(
signature, msg_hash, sigdecode=ecdsa.util.sigdecode_string
)
return signature
def sign_message(self, message, is_compressed, sigtype=SignatureType.ECASH):
message = to_bytes(message, "utf8")
signature = self.sign(Hash(msg_magic(message, sigtype)))
for i in range(4):
sig = bytes([27 + i + (4 if is_compressed else 0)]) + signature
pubkey = ECPubkey.from_point(self.pubkey.point)
if pubkey.verify_message(sig, message, sigtype=sigtype):
return sig
continue
else:
raise Exception("error: cannot sign message")
def decrypt_message(self, encrypted, magic=b"BIE1"):
encrypted = base64.b64decode(encrypted)
if len(encrypted) < 85:
raise Exception("invalid ciphertext: length")
magic_found = encrypted[:4]
ephemeral_pubkey = encrypted[4:37]
ciphertext = encrypted[37:-32]
mac = encrypted[-32:]
if magic_found != magic:
raise Exception("invalid ciphertext: invalid magic bytes")
try:
ephemeral_pubkey = ser_to_point(ephemeral_pubkey)
except AssertionError:
raise Exception("invalid ciphertext: invalid ephemeral pubkey")
if not ecdsa.ecdsa.point_is_valid(
generator_secp256k1, ephemeral_pubkey.x(), ephemeral_pubkey.y()
):
raise Exception("invalid ciphertext: invalid ephemeral pubkey")
ecdh_key = point_to_ser(ephemeral_pubkey * self.privkey.secret_multiplier)
key = hashlib.sha512(ecdh_key).digest()
iv, key_e, key_m = key[0:16], key[16:32], key[32:]
if mac != hmac.new(key_m, encrypted[:-32], hashlib.sha256).digest():
raise InvalidPassword()
return aes_decrypt_with_iv(key_e, iv, ciphertext)
def get_pubkeys_from_secret(secret):
# public key
private_key = ecdsa.SigningKey.from_string(secret, curve=SECP256k1)
public_key = private_key.get_verifying_key()
K = public_key.to_string()
K_compressed = GetPubKey(public_key.pubkey, True)
return K, K_compressed

File Metadata

Mime Type
text/x-diff
Expires
Wed, May 21, 20:57 (1 d, 14 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5865927
Default Alt Text
(15 KB)

Event Timeline