diff --git a/test/functional/abc-mempool-accept-txn.py b/test/functional/abc-mempool-accept-txn.py --- a/test/functional/abc-mempool-accept-txn.py +++ b/test/functional/abc-mempool-accept-txn.py @@ -16,7 +16,7 @@ create_tx_with_script, ) from test_framework.cdefs import MAX_STANDARD_TX_SIGOPS -from test_framework.key import CECKey +from test_framework.key import ECKey from test_framework.messages import ( COutPoint, CTransaction, @@ -65,9 +65,10 @@ self.num_nodes = 1 self.setup_clean_chain = True self.block_heights = {} - self.coinbase_key = CECKey() - self.coinbase_key.set_secretbytes(b"horsebattery") - self.coinbase_pubkey = self.coinbase_key.get_pubkey() + self.coinbase_key = ECKey() + # The test expects uncompressed keys + self.coinbase_key.generate(compressed=False) + self.coinbase_pubkey = self.coinbase_key.get_pubkey().get_bytes() self.tip = None self.blocks = {} self.extra_args = [ @@ -101,7 +102,7 @@ sighash = SignatureHashForkId( spend_tx.vout[n].scriptPubKey, tx, 0, SIGHASH_ALL | SIGHASH_FORKID, spend_tx.vout[n].nValue) tx.vin[0].scriptSig = CScript( - [self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID]))]) + [self.coinbase_key.sign_ecdsa(sighash) + bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID]))]) def create_and_sign_transaction( self, spend_tx, n, value, script=CScript([OP_TRUE])): @@ -223,7 +224,7 @@ # Sign the transaction using the redeem script sighash = SignatureHashForkId( redeem_script, spent_p2sh_tx, 0, SIGHASH_ALL | SIGHASH_FORKID, p2sh_tx_to_spend.vout[0].nValue) - sig = self.coinbase_key.sign( + sig = self.coinbase_key.sign_ecdsa( sighash) + bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID])) spent_p2sh_tx.vin[0].scriptSig = CScript([sig, redeem_script]) assert len( diff --git a/test/functional/abc-mempool-coherence-on-activations.py b/test/functional/abc-mempool-coherence-on-activations.py --- a/test/functional/abc-mempool-coherence-on-activations.py +++ b/test/functional/abc-mempool-coherence-on-activations.py @@ 
-25,7 +25,7 @@ create_tx_with_script, make_conform_to_ctor, ) -from test_framework.key import CECKey +from test_framework.key import ECKey from test_framework.messages import ( COIN, COutPoint, @@ -72,9 +72,9 @@ # create transactions that are only valid before or after the fork. # Generate a key pair to test - private_key = CECKey() - private_key.set_secretbytes(b"replayprotection") - public_key = private_key.get_pubkey() + private_key = ECKey() + private_key.generate() + public_key = private_key.get_pubkey().get_bytes() # Fund transaction script = CScript([public_key, OP_CHECKSIG]) @@ -94,7 +94,7 @@ sighashtype = (forkvalue << 8) | SIGHASH_ALL | SIGHASH_FORKID sighash = SignatureHashForkId( script, txspend, 0, sighashtype, 50 * COIN) - sig = private_key.sign(sighash) + \ + sig = private_key.sign_ecdsa(sighash) + \ bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID])) txspend.vin[0].scriptSig = CScript([sig]) txspend.rehash() diff --git a/test/functional/abc-p2p-fullblocktest-sigops.py b/test/functional/abc-p2p-fullblocktest-sigops.py --- a/test/functional/abc-p2p-fullblocktest-sigops.py +++ b/test/functional/abc-p2p-fullblocktest-sigops.py @@ -26,7 +26,7 @@ MAX_TX_SIGOPS_COUNT, ONE_MEGABYTE, ) -from test_framework.key import CECKey +from test_framework.key import ECKey from test_framework.messages import ( COutPoint, CTransaction, @@ -346,9 +346,9 @@ tip(26) # Generate a key pair to test P2SH sigops count - private_key = CECKey() - private_key.set_secretbytes(b"fatstacks") - public_key = private_key.get_pubkey() + private_key = ECKey() + private_key.generate() + public_key = private_key.get_pubkey().get_bytes() # P2SH # Build the redeem script, hash it, use hash to create the p2sh script @@ -375,7 +375,7 @@ # Sign the transaction using the redeem script sighash = SignatureHashForkId( redeem_script, spent_p2sh_tx, 0, SIGHASH_ALL | SIGHASH_FORKID, p2sh_tx.vout[0].nValue) - sig = private_key.sign(sighash) + \ + sig = private_key.sign_ecdsa(sighash) + \ 
bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID])) spent_p2sh_tx.vin[0].scriptSig = CScript([sig, redeem_script]) spent_p2sh_tx.rehash() diff --git a/test/functional/abc-replay-protection.py b/test/functional/abc-replay-protection.py --- a/test/functional/abc-replay-protection.py +++ b/test/functional/abc-replay-protection.py @@ -17,7 +17,7 @@ create_tx_with_script, make_conform_to_ctor, ) -from test_framework.key import CECKey +from test_framework.key import ECKey from test_framework.messages import ( COIN, COutPoint, @@ -146,9 +146,9 @@ out.append(get_spendable_output()) # Generate a key pair to test P2SH sigops count - private_key = CECKey() - private_key.set_secretbytes(b"replayprotection") - public_key = private_key.get_pubkey() + private_key = ECKey() + private_key.generate() + public_key = private_key.get_pubkey().get_bytes() # This is a little handier to use than the version in blocktools.py def create_fund_and_spend_tx(spend, forkvalue=0): @@ -167,7 +167,7 @@ sighashtype = (forkvalue << 8) | SIGHASH_ALL | SIGHASH_FORKID sighash = SignatureHashForkId( script, txspend, 0, sighashtype, 50 * COIN - 1000) - sig = private_key.sign(sighash) + \ + sig = private_key.sign_ecdsa(sighash) + \ bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID])) txspend.vin[0].scriptSig = CScript([sig]) txspend.rehash() diff --git a/test/functional/abc-schnorr.py b/test/functional/abc-schnorr.py --- a/test/functional/abc-schnorr.py +++ b/test/functional/abc-schnorr.py @@ -16,7 +16,7 @@ create_tx_with_script, make_conform_to_ctor, ) -from test_framework.key import CECKey +from test_framework.key import ECKey from test_framework.messages import ( CBlock, COutPoint, @@ -151,10 +151,10 @@ # Generate a key pair privkeybytes = b"Schnorr!" 
* 4 - private_key = CECKey() - private_key.set_secretbytes(privkeybytes) + private_key = ECKey() + private_key.set(privkeybytes, True) - # get uncompressed public key serialization + # get compressed public key serialization - public_key = private_key.get_pubkey() + public_key = private_key.get_pubkey().get_bytes() def create_fund_and_spend_tx(multi=False, sig='schnorr'): spendfrom = spendable_outputs.pop() @@ -186,7 +186,7 @@ if sig == 'schnorr': txsig = schnorr.sign(privkeybytes, sighash) + hashbyte elif sig == 'ecdsa': - txsig = private_key.sign(sighash) + hashbyte + txsig = private_key.sign_ecdsa(sighash) + hashbyte elif isinstance(sig, bytes): txsig = sig + hashbyte if multi: diff --git a/test/functional/abc-schnorrmultisig.py b/test/functional/abc-schnorrmultisig.py --- a/test/functional/abc-schnorrmultisig.py +++ b/test/functional/abc-schnorrmultisig.py @@ -19,7 +19,7 @@ create_tx_with_script, make_conform_to_ctor, ) -from test_framework.key import CECKey +from test_framework.key import ECKey from test_framework.messages import ( CBlock, COutPoint, @@ -154,10 +154,10 @@ # Generate a key pair privkeybytes = b"Schnorr!" 
* 4 - private_key = CECKey() - private_key.set_secretbytes(privkeybytes) + private_key = ECKey() + private_key.set(privkeybytes, True) - # get uncompressed public key serialization + # get compressed public key serialization - public_key = private_key.get_pubkey() + public_key = private_key.get_pubkey().get_bytes() def create_fund_and_spend_tx(dummy=OP_0, sigtype='ecdsa'): spendfrom = spendable_outputs.pop() @@ -186,7 +186,7 @@ if sigtype == 'schnorr': txsig = schnorr.sign(privkeybytes, sighash) + hashbyte elif sigtype == 'ecdsa': - txsig = private_key.sign(sighash) + hashbyte + txsig = private_key.sign_ecdsa(sighash) + hashbyte txspend.vin[0].scriptSig = CScript([dummy, txsig]) txspend.rehash() diff --git a/test/functional/feature_assumevalid.py b/test/functional/feature_assumevalid.py --- a/test/functional/feature_assumevalid.py +++ b/test/functional/feature_assumevalid.py @@ -32,7 +32,7 @@ import time from test_framework.blocktools import (create_block, create_coinbase) -from test_framework.key import CECKey +from test_framework.key import ECKey from test_framework.messages import ( CBlockHeader, COutPoint, @@ -111,9 +111,9 @@ self.blocks = [] # Get a pubkey for the coinbase TXO - coinbase_key = CECKey() - coinbase_key.set_secretbytes(b"horsebattery") - coinbase_pubkey = coinbase_key.get_pubkey() + coinbase_key = ECKey() + coinbase_key.generate() + coinbase_pubkey = coinbase_key.get_pubkey().get_bytes() # Create the first block with a coinbase output to our key height = 1 diff --git a/test/functional/feature_block.py b/test/functional/feature_block.py --- a/test/functional/feature_block.py +++ b/test/functional/feature_block.py @@ -15,7 +15,7 @@ make_conform_to_ctor, ) from test_framework.cdefs import LEGACY_MAX_BLOCK_SIZE -from test_framework.key import CECKey +from test_framework.key import ECKey from test_framework.messages import ( CBlock, COIN, @@ -80,9 +80,9 @@ self.bootstrap_p2p() # Add one p2p connection to the node self.block_heights = {} - self.coinbase_key = CECKey() - 
self.coinbase_key.set_secretbytes(b"horsebattery") - self.coinbase_pubkey = self.coinbase_key.get_pubkey() + self.coinbase_key = ECKey() + self.coinbase_key.generate() + self.coinbase_pubkey = self.coinbase_key.get_pubkey().get_bytes() self.tip = None self.blocks = {} self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16) @@ -1053,7 +1053,7 @@ sighash = SignatureHashForkId( spend_tx.vout[0].scriptPubKey, tx, 0, SIGHASH_ALL | SIGHASH_FORKID, spend_tx.vout[0].nValue) tx.vin[0].scriptSig = CScript( - [self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID]))]) + [self.coinbase_key.sign_ecdsa(sighash) + bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID]))]) def create_and_sign_transaction( self, spend_tx, value, script=CScript([OP_TRUE])): diff --git a/test/functional/feature_block_sigops.py b/test/functional/feature_block_sigops.py --- a/test/functional/feature_block_sigops.py +++ b/test/functional/feature_block_sigops.py @@ -18,7 +18,7 @@ make_conform_to_ctor, ) from test_framework.cdefs import LEGACY_MAX_BLOCK_SIZE, MAX_BLOCK_SIGOPS_PER_MB -from test_framework.key import CECKey +from test_framework.key import ECKey from test_framework.messages import ( COutPoint, CTransaction, @@ -61,9 +61,9 @@ self.bootstrap_p2p() # Add one p2p connection to the node self.block_heights = {} - self.coinbase_key = CECKey() - self.coinbase_key.set_secretbytes(b"horsebattery") - self.coinbase_pubkey = self.coinbase_key.get_pubkey() + self.coinbase_key = ECKey() + self.coinbase_key.generate() + self.coinbase_pubkey = self.coinbase_key.get_pubkey().get_bytes() self.tip = None self.blocks = {} self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16) @@ -263,7 +263,7 @@ sighash = SignatureHashForkId( redeem_script, tx, 1, SIGHASH_ALL | SIGHASH_FORKID, lastAmount) - sig = self.coinbase_key.sign( + sig = self.coinbase_key.sign_ecdsa( sighash) + bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID])) scriptSig = CScript([sig, redeem_script]) @@ -420,7 +420,7 @@ 
sighash = SignatureHashForkId( spend_tx.vout[0].scriptPubKey, tx, 0, SIGHASH_ALL | SIGHASH_FORKID, spend_tx.vout[0].nValue) tx.vin[0].scriptSig = CScript( - [self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID]))]) + [self.coinbase_key.sign_ecdsa(sighash) + bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID]))]) def create_and_sign_transaction( self, spend_tx, value, script=CScript([OP_TRUE])): diff --git a/test/functional/test_framework/key.py b/test/functional/test_framework/key.py --- a/test/functional/test_framework/key.py +++ b/test/functional/test_framework/key.py @@ -1,249 +1,369 @@ #!/usr/bin/env python3 -# Copyright (c) 2011 Sam Rushing -"""ECC secp256k1 OpenSSL wrapper. +# Copyright (c) 2019 Pieter Wuille -WARNING: This module does not mlock() secrets; your private keys may end up on -disk in swap! Use with caution! +"""Test-only secp256k1 elliptic curve implementation -This file is modified from python-bitcoinlib. +WARNING: This code is slow, uses bad randomness, does not properly protect +keys, and is trivially vulnerable to side channel attacks. Do not use for +anything but tests. 
""" -import ctypes -import ctypes.util -import hashlib +import random -ssl = ctypes.cdll.LoadLibrary(ctypes.util.find_library('ssl') or 'libeay32') -ssl.BN_new.restype = ctypes.c_void_p -ssl.BN_new.argtypes = [] - -ssl.BN_free.restype = None -ssl.BN_free.argtypes = [ctypes.c_void_p] - -ssl.BN_bin2bn.restype = ctypes.c_void_p -ssl.BN_bin2bn.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p] - -ssl.BN_CTX_free.restype = None -ssl.BN_CTX_free.argtypes = [ctypes.c_void_p] - -ssl.BN_CTX_new.restype = ctypes.c_void_p -ssl.BN_CTX_new.argtypes = [] - -ssl.ECDH_compute_key.restype = ctypes.c_int -ssl.ECDH_compute_key.argtypes = [ctypes.c_void_p, - ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p] - -ssl.ECDSA_sign.restype = ctypes.c_int -ssl.ECDSA_sign.argtypes = [ctypes.c_int, ctypes.c_void_p, - ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] - -ssl.ECDSA_verify.restype = ctypes.c_int -ssl.ECDSA_verify.argtypes = [ctypes.c_int, ctypes.c_void_p, - ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p] - -ssl.EC_KEY_free.restype = None -ssl.EC_KEY_free.argtypes = [ctypes.c_void_p] - -ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p -ssl.EC_KEY_new_by_curve_name.argtypes = [ctypes.c_int] - -ssl.EC_KEY_get0_group.restype = ctypes.c_void_p -ssl.EC_KEY_get0_group.argtypes = [ctypes.c_void_p] - -ssl.EC_KEY_get0_public_key.restype = ctypes.c_void_p -ssl.EC_KEY_get0_public_key.argtypes = [ctypes.c_void_p] - -ssl.EC_KEY_set_private_key.restype = ctypes.c_int -ssl.EC_KEY_set_private_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p] - -ssl.EC_KEY_set_conv_form.restype = None -ssl.EC_KEY_set_conv_form.argtypes = [ctypes.c_void_p, ctypes.c_int] - -ssl.EC_KEY_set_public_key.restype = ctypes.c_int -ssl.EC_KEY_set_public_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p] - -ssl.i2o_ECPublicKey.restype = ctypes.c_void_p -ssl.i2o_ECPublicKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p] - -ssl.EC_POINT_new.restype = ctypes.c_void_p 
-ssl.EC_POINT_new.argtypes = [ctypes.c_void_p] - -ssl.EC_POINT_free.restype = None -ssl.EC_POINT_free.argtypes = [ctypes.c_void_p] - -ssl.EC_POINT_mul.restype = ctypes.c_int -ssl.EC_POINT_mul.argtypes = [ctypes.c_void_p, ctypes.c_void_p, - ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] - -# this specifies the curve used with ECDSA. -NID_secp256k1 = 714 # from openssl/obj_mac.h +def modinv(a, n): + """Compute the modular inverse of a modulo n + See https://en.wikipedia.org/wiki/Extended_Euclidean_algorithm#Modular_integers + """ + t1, t2 = 0, 1 + r1, r2 = n, a + while r2 != 0: + q = r1 // r2 + t1, t2 = t2, t1 - q * t2 + r1, r2 = r2, r1 - q * r2 + if r1 > 1: + return None + if t1 < 0: + t1 += n + return t1 + + +def jacobi_symbol(n, k): + """Compute the Jacobi symbol of n modulo k + + See http://en.wikipedia.org/wiki/Jacobi_symbol + """ + assert k > 0 and k & 1 + n %= k + t = 0 + while n != 0: + while n & 1 == 0: + n >>= 1 + r = k & 7 + t ^= (r == 3 or r == 5) + n, k = k, n + t ^= (n & k & 3 == 3) + n = n % k + if k == 1: + return -1 if t else 1 + return 0 + + +def modsqrt(a, p): + """Compute the square root of a modulo p + + For p = 3 mod 4, if a square root exists, it is equal to a**((p+1)/4) mod p. 
+ """ + assert(p % 4 == 3) # Only p = 3 mod 4 is implemented + sqrt = pow(a, (p + 1) // 4, p) + if pow(sqrt, 2, p) == a % p: + return sqrt + return None + + +class EllipticCurve: + def __init__(self, p, a, b): + """Initialize elliptic curve y^2 = x^3 + a*x + b over GF(p).""" + self.p = p + self.a = a % p + self.b = b % p + + def affine(self, p1): + """Convert a Jacobian point tuple p1 to affine form, or None if at infinity.""" + x1, y1, z1 = p1 + if z1 == 0: + return None + inv = modinv(z1, self.p) + inv_2 = (inv**2) % self.p + inv_3 = (inv_2 * inv) % self.p + return ((inv_2 * x1) % self.p, (inv_3 * y1) % self.p, 1) + + def negate(self, p1): + """Negate a Jacobian point tuple p1.""" + x1, y1, z1 = p1 + return (x1, (self.p - y1) % self.p, z1) + + def on_curve(self, p1): + """Determine whether a Jacobian tuple p is on the curve (and not infinity)""" + x1, y1, z1 = p1 + z2 = pow(z1, 2, self.p) + z4 = pow(z2, 2, self.p) + return z1 != 0 and (pow(x1, 3, self.p) + self.a * x1 * + z4 + self.b * z2 * z4 - pow(y1, 2, self.p)) % self.p == 0 + + def is_x_coord(self, x): + """Test whether x is a valid X coordinate on the curve.""" + x_3 = pow(x, 3, self.p) + return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1 + + def lift_x(self, x): + """Given an X coordinate on the curve, return a corresponding affine point.""" + x_3 = pow(x, 3, self.p) + v = x_3 + self.a * x + self.b + y = modsqrt(v, self.p) + if y is None: + return None + return (x, y, 1) + + def double(self, p1): + """Double a Jacobian tuple p1""" + x1, y1, z1 = p1 + if z1 == 0: + return (0, 1, 0) + y1_2 = (y1**2) % self.p + y1_4 = (y1_2**2) % self.p + x1_2 = (x1**2) % self.p + s = (4 * x1 * y1_2) % self.p + m = 3 * x1_2 + if self.a: + m += self.a * pow(z1, 4, self.p) + m = m % self.p + x2 = (m**2 - 2 * s) % self.p + y2 = (m * (s - x2) - 8 * y1_4) % self.p + z2 = (2 * y1 * z1) % self.p + return (x2, y2, z2) + + def add_mixed(self, p1, p2): + """Add a Jacobian tuple p1 and an affine tuple p2""" + x1, y1, z1 = p1 
+ x2, y2, z2 = p2 + assert(z2 == 1) + if z1 == 0: + return p2 + z1_2 = (z1**2) % self.p + z1_3 = (z1_2 * z1) % self.p + u2 = (x2 * z1_2) % self.p + s2 = (y2 * z1_3) % self.p + if x1 == u2: + if (y1 != s2): + return (0, 1, 0) + return self.double(p1) + h = u2 - x1 + r = s2 - y1 + h_2 = (h**2) % self.p + h_3 = (h_2 * h) % self.p + u1_h_2 = (x1 * h_2) % self.p + x3 = (r**2 - h_3 - 2 * u1_h_2) % self.p + y3 = (r * (u1_h_2 - x3) - y1 * h_3) % self.p + z3 = (h * z1) % self.p + return (x3, y3, z3) + + def add(self, p1, p2): + """Add two Jacobian tuples p1 and p2""" + x1, y1, z1 = p1 + x2, y2, z2 = p2 + if z1 == 0: + return p2 + if z2 == 0: + return p1 + if z1 == 1: + return self.add_mixed(p2, p1) + if z2 == 1: + return self.add_mixed(p1, p2) + z1_2 = (z1**2) % self.p + z1_3 = (z1_2 * z1) % self.p + z2_2 = (z2**2) % self.p + z2_3 = (z2_2 * z2) % self.p + u1 = (x1 * z2_2) % self.p + u2 = (x2 * z1_2) % self.p + s1 = (y1 * z2_3) % self.p + s2 = (y2 * z1_3) % self.p + if u1 == u2: + if (s1 != s2): + return (0, 1, 0) + return self.double(p1) + h = u2 - u1 + r = s2 - s1 + h_2 = (h**2) % self.p + h_3 = (h_2 * h) % self.p + u1_h_2 = (u1 * h_2) % self.p + x3 = (r**2 - h_3 - 2 * u1_h_2) % self.p + y3 = (r * (u1_h_2 - x3) - s1 * h_3) % self.p + z3 = (h * z1 * z2) % self.p + return (x3, y3, z3) + + def mul(self, ps): + """Compute a (multi) point multiplication + + ps is a list of (Jacobian tuple, scalar) pairs. + """ + r = (0, 1, 0) + for i in range(255, -1, -1): + r = self.double(r) + for (p, n) in ps: + if ((n >> i) & 1): + r = self.add(r, p) + return r + + +SECP256K1 = EllipticCurve(2**256 - 2**32 - 977, 0, 7) +SECP256K1_G = ( + 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, + 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, + 1) SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2 -# Thx to Sam Devlin for the ctypes magic 64-bit fix. 
- -def _check_result(val, func, args): - if val == 0: - raise ValueError - else: - return ctypes.c_void_p(val) - - -ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p -ssl.EC_KEY_new_by_curve_name.errcheck = _check_result - - -class CECKey(): - """Wrapper around OpenSSL's EC_KEY""" - - POINT_CONVERSION_COMPRESSED = 2 - POINT_CONVERSION_UNCOMPRESSED = 4 +class ECPubKey(): + """A secp256k1 public key""" def __init__(self): - self.k = ssl.EC_KEY_new_by_curve_name(NID_secp256k1) - - def __del__(self): - if ssl: - ssl.EC_KEY_free(self.k) - self.k = None - - def set_secretbytes(self, secret): - priv_key = ssl.BN_bin2bn(secret, 32, ssl.BN_new()) - group = ssl.EC_KEY_get0_group(self.k) - pub_key = ssl.EC_POINT_new(group) - ctx = ssl.BN_CTX_new() - if not ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx): - raise ValueError( - "Could not derive public key from the supplied secret.") - ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx) - ssl.EC_KEY_set_private_key(self.k, priv_key) - ssl.EC_KEY_set_public_key(self.k, pub_key) - ssl.BN_free(priv_key) - ssl.EC_POINT_free(pub_key) - ssl.BN_CTX_free(ctx) - return self.k - - def set_privkey(self, key): - self.mb = ctypes.create_string_buffer(key) - return ssl.d2i_ECPrivateKey(ctypes.byref( - self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key)) - - def set_pubkey(self, key): - self.mb = ctypes.create_string_buffer(key) - return ssl.o2i_ECPublicKey(ctypes.byref( - self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key)) - - def get_privkey(self): - size = ssl.i2d_ECPrivateKey(self.k, 0) - mb_pri = ctypes.create_string_buffer(size) - ssl.i2d_ECPrivateKey(self.k, ctypes.byref(ctypes.pointer(mb_pri))) - return mb_pri.raw - - def get_pubkey(self): - size = ssl.i2o_ECPublicKey(self.k, 0) - mb = ctypes.create_string_buffer(size) - ssl.i2o_ECPublicKey(self.k, ctypes.byref(ctypes.pointer(mb))) - return mb.raw - - def get_raw_ecdh_key(self, other_pubkey): - ecdh_keybuffer = ctypes.create_string_buffer(32) - r = 
ssl.ECDH_compute_key(ctypes.pointer(ecdh_keybuffer), 32, - ssl.EC_KEY_get0_public_key(other_pubkey.k), - self.k, 0) - if r != 32: - raise Exception('CKey.get_ecdh_key(): ECDH_compute_key() failed') - return ecdh_keybuffer.raw - - def get_ecdh_key(self, other_pubkey, - kdf=lambda k: hashlib.sha256(k).digest()): - # FIXME: be warned it's not clear what the kdf should be as a default - r = self.get_raw_ecdh_key(other_pubkey) - return kdf(r) - - def sign(self, hash, low_s=True): - # FIXME: need unit tests for below cases - if not isinstance(hash, bytes): - raise TypeError( - 'Hash must be bytes instance; got {!r}'.format(hash.__class__)) - if len(hash) != 32: - raise ValueError('Hash must be exactly 32 bytes long') - - sig_size0 = ctypes.c_uint32() - sig_size0.value = ssl.ECDSA_size(self.k) - mb_sig = ctypes.create_string_buffer(sig_size0.value) - result = ssl.ECDSA_sign(0, hash, len( - hash), mb_sig, ctypes.byref(sig_size0), self.k) - assert 1 == result - assert mb_sig.raw[0] == 0x30 - assert mb_sig.raw[1] == sig_size0.value - 2 - total_size = mb_sig.raw[1] - assert mb_sig.raw[2] == 2 - r_size = mb_sig.raw[3] - assert mb_sig.raw[4 + r_size] == 2 - s_size = mb_sig.raw[5 + r_size] - s_value = int.from_bytes( - mb_sig.raw[6 + r_size:6 + r_size + s_size], byteorder='big') - if (not low_s) or s_value <= SECP256K1_ORDER_HALF: - return mb_sig.raw[:sig_size0.value] - else: - low_s_value = SECP256K1_ORDER - s_value - low_s_bytes = (low_s_value).to_bytes(33, byteorder='big') - while len( - low_s_bytes) > 1 and low_s_bytes[0] == 0 and low_s_bytes[1] < 0x80: - low_s_bytes = low_s_bytes[1:] - new_s_size = len(low_s_bytes) - new_total_size_byte = ( - total_size + new_s_size - s_size).to_bytes(1, byteorder='big') - new_s_size_byte = (new_s_size).to_bytes(1, byteorder='big') - return b'\x30' + new_total_size_byte + \ - mb_sig.raw[2:5 + r_size] + new_s_size_byte + low_s_bytes - - def verify(self, hash, sig): - """Verify a DER signature""" - return ssl.ECDSA_verify(0, hash, len(hash), 
sig, len(sig), self.k) == 1 - - def set_compressed(self, compressed): - if compressed: - form = self.POINT_CONVERSION_COMPRESSED + """Construct an uninitialized public key""" + self.valid = False + + def set(self, data): + """Construct a public key from a serialization in compressed or uncompressed format""" + if (len(data) == 65 and data[0] == 0x04): + p = (int.from_bytes(data[1:33], 'big'), + int.from_bytes(data[33:65], 'big'), 1) + self.valid = SECP256K1.on_curve(p) + if self.valid: + self.p = p + self.compressed = False + elif (len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03)): + x = int.from_bytes(data[1:33], 'big') + if SECP256K1.is_x_coord(x): + p = SECP256K1.lift_x(x) + if (p[1] & 1) != (data[0] & 1): + p = SECP256K1.negate(p) + self.p = p + self.valid = True + self.compressed = True + else: + self.valid = False else: - form = self.POINT_CONVERSION_UNCOMPRESSED - ssl.EC_KEY_set_conv_form(self.k, form) - - -class CPubKey(bytes): - """An encapsulated public key + self.valid = False - Attributes: + @property + def is_compressed(self): + return self.compressed - is_valid - Corresponds to CPubKey.IsValid() - is_fullyvalid - Corresponds to CPubKey.IsFullyValid() - is_compressed - Corresponds to CPubKey.IsCompressed() - """ + @property + def is_valid(self): + return self.valid + + def get_bytes(self): + assert(self.valid) + p = SECP256K1.affine(self.p) + if p is None: + return None + if self.compressed: + return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big') + else: + return bytes([0x04]) + p[0].to_bytes(32, 'big') + \ + p[1].to_bytes(32, 'big') + + def verify_ecdsa(self, sig, msg, low_s=True): + """Verify a strictly DER-encoded ECDSA signature against this pubkey.""" + assert(self.valid) + if (sig[1] + 2 != len(sig)): + return False + if (len(sig) < 4): + return False + if (sig[0] != 0x30): + return False + if (sig[2] != 0x02): + return False + rlen = sig[3] + if (len(sig) < 6 + rlen): + return False + if rlen < 1 or rlen > 33: + return False + if 
sig[4] >= 0x80: + return False + if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)): + return False + r = int.from_bytes(sig[4:4 + rlen], 'big') + if (sig[4 + rlen] != 0x02): + return False + slen = sig[5 + rlen] + if slen < 1 or slen > 33: + return False + if (len(sig) != 6 + rlen + slen): + return False + if sig[6 + rlen] >= 0x80: + return False + if (slen > 1 and (sig[6 + rlen] == 0) and not (sig[7 + rlen] & 0x80)): + return False + s = int.from_bytes(sig[6 + rlen:6 + rlen + slen], 'big') + if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER: + return False + if low_s and s >= SECP256K1_ORDER_HALF: + return False + z = int.from_bytes(msg, 'big') + w = modinv(s, SECP256K1_ORDER) + u1 = z * w % SECP256K1_ORDER + u2 = r * w % SECP256K1_ORDER + R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)])) + if R is None or R[0] != r: + return False + return True + + +class ECKey(): + """A secp256k1 private key""" - def __new__(cls, buf, _cec_key=None): - self = super(CPubKey, cls).__new__(cls, buf) - if _cec_key is None: - _cec_key = CECKey() - self._cec_key = _cec_key - self.is_fullyvalid = _cec_key.set_pubkey(self) != 0 - return self + def __init__(self): + self.valid = False + + def set(self, secret, compressed): + """Construct a private key object with given 32-byte secret and compressed flag.""" + assert(len(secret) == 32) + secret = int.from_bytes(secret, 'big') + self.valid = (secret > 0 and secret < SECP256K1_ORDER) + if self.valid: + self.secret = secret + self.compressed = compressed + + def generate(self, compressed=True): + """Generate a random private key (compressed or uncompressed).""" + self.set( + random.randrange( + 1, + SECP256K1_ORDER).to_bytes( + 32, + 'big'), + compressed) + + def get_bytes(self): + """Retrieve the 32-byte representation of this key.""" + assert(self.valid) + return self.secret.to_bytes(32, 'big') @property def is_valid(self): - return len(self) > 0 + return self.valid @property def 
is_compressed(self): - return len(self) == 33 - - def verify(self, hash, sig): - return self._cec_key.verify(hash, sig) + return self.compressed - def __str__(self): - return repr(self) - - def __repr__(self): - return '{}({})'.format(self.__class__.__name__, - super(CPubKey, self).__repr__()) + def get_pubkey(self): + """Compute an ECPubKey object for this secret key.""" + assert(self.valid) + ret = ECPubKey() + p = SECP256K1.mul([(SECP256K1_G, self.secret)]) + ret.p = p + ret.valid = True + ret.compressed = self.compressed + return ret + + def sign_ecdsa(self, msg, low_s=True): + """Construct a DER-encoded ECDSA signature with this key.""" + assert(self.valid) + z = int.from_bytes(msg, 'big') + # Note: no RFC6979, but a simple random nonce (some tests rely on + # distinct transactions for the same operation) + k = random.randrange(1, SECP256K1_ORDER) + R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)])) + r = R[0] % SECP256K1_ORDER + s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER + if low_s and s > SECP256K1_ORDER_HALF: + s = SECP256K1_ORDER - s + rb = r.to_bytes((r.bit_length() + 8) // 8, 'big') + sb = s.to_bytes((s.bit_length() + 8) // 8, 'big') + return b'\x30' + \ + bytes([4 + len(rb) + len(sb), 2, len(rb)]) + \ + rb + bytes([2, len(sb)]) + sb