Changeset View
Changeset View
Standalone View
Standalone View
test/functional/test_framework/key.py
Show All 21 Lines | def jacobi_symbol(n, k): | ||||
""" | """ | ||||
assert k > 0 and k & 1 | assert k > 0 and k & 1 | ||||
n %= k | n %= k | ||||
t = 0 | t = 0 | ||||
while n != 0: | while n != 0: | ||||
while n & 1 == 0: | while n & 1 == 0: | ||||
n >>= 1 | n >>= 1 | ||||
r = k & 7 | r = k & 7 | ||||
t ^= (r == 3 or r == 5) | t ^= r == 3 or r == 5 | ||||
n, k = k, n | n, k = k, n | ||||
t ^= (n & k & 3 == 3) | t ^= n & k & 3 == 3 | ||||
n = n % k | n = n % k | ||||
if k == 1: | if k == 1: | ||||
return -1 if t else 1 | return -1 if t else 1 | ||||
return 0 | return 0 | ||||
def modsqrt(a, p): | def modsqrt(a, p): | ||||
"""Compute the square root of a modulo p | """Compute the square root of a modulo p | ||||
Show All 29 Lines | def negate(self, p1): | ||||
x1, y1, z1 = p1 | x1, y1, z1 = p1 | ||||
return (x1, (self.p - y1) % self.p, z1) | return (x1, (self.p - y1) % self.p, z1) | ||||
def on_curve(self, p1): | def on_curve(self, p1): | ||||
"""Determine whether a Jacobian tuple p is on the curve (and not infinity)""" | """Determine whether a Jacobian tuple p is on the curve (and not infinity)""" | ||||
x1, y1, z1 = p1 | x1, y1, z1 = p1 | ||||
z2 = pow(z1, 2, self.p) | z2 = pow(z1, 2, self.p) | ||||
z4 = pow(z2, 2, self.p) | z4 = pow(z2, 2, self.p) | ||||
return z1 != 0 and (pow(x1, 3, self.p) + self.a * x1 * | return ( | ||||
z4 + self.b * z2 * z4 - pow(y1, 2, self.p)) % self.p == 0 | z1 != 0 | ||||
and ( | |||||
pow(x1, 3, self.p) | |||||
+ self.a * x1 * z4 | |||||
+ self.b * z2 * z4 | |||||
- pow(y1, 2, self.p) | |||||
) | |||||
% self.p | |||||
== 0 | |||||
) | |||||
def is_x_coord(self, x): | def is_x_coord(self, x): | ||||
"""Test whether x is a valid X coordinate on the curve.""" | """Test whether x is a valid X coordinate on the curve.""" | ||||
x_3 = pow(x, 3, self.p) | x_3 = pow(x, 3, self.p) | ||||
return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1 | return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1 | ||||
def lift_x(self, x): | def lift_x(self, x): | ||||
"""Given an X coordinate on the curve, return a corresponding affine point.""" | """Given an X coordinate on the curve, return a corresponding affine point.""" | ||||
Show All 29 Lines | def add_mixed(self, p1, p2): | ||||
assert z2 == 1 | assert z2 == 1 | ||||
if z1 == 0: | if z1 == 0: | ||||
return p2 | return p2 | ||||
z1_2 = (z1**2) % self.p | z1_2 = (z1**2) % self.p | ||||
z1_3 = (z1_2 * z1) % self.p | z1_3 = (z1_2 * z1) % self.p | ||||
u2 = (x2 * z1_2) % self.p | u2 = (x2 * z1_2) % self.p | ||||
s2 = (y2 * z1_3) % self.p | s2 = (y2 * z1_3) % self.p | ||||
if x1 == u2: | if x1 == u2: | ||||
if (y1 != s2): | if y1 != s2: | ||||
return (0, 1, 0) | return (0, 1, 0) | ||||
return self.double(p1) | return self.double(p1) | ||||
h = u2 - x1 | h = u2 - x1 | ||||
r = s2 - y1 | r = s2 - y1 | ||||
h_2 = (h**2) % self.p | h_2 = (h**2) % self.p | ||||
h_3 = (h_2 * h) % self.p | h_3 = (h_2 * h) % self.p | ||||
u1_h_2 = (x1 * h_2) % self.p | u1_h_2 = (x1 * h_2) % self.p | ||||
x3 = (r**2 - h_3 - 2 * u1_h_2) % self.p | x3 = (r**2 - h_3 - 2 * u1_h_2) % self.p | ||||
Show All 17 Lines | def add(self, p1, p2): | ||||
z1_3 = (z1_2 * z1) % self.p | z1_3 = (z1_2 * z1) % self.p | ||||
z2_2 = (z2**2) % self.p | z2_2 = (z2**2) % self.p | ||||
z2_3 = (z2_2 * z2) % self.p | z2_3 = (z2_2 * z2) % self.p | ||||
u1 = (x1 * z2_2) % self.p | u1 = (x1 * z2_2) % self.p | ||||
u2 = (x2 * z1_2) % self.p | u2 = (x2 * z1_2) % self.p | ||||
s1 = (y1 * z2_3) % self.p | s1 = (y1 * z2_3) % self.p | ||||
s2 = (y2 * z1_3) % self.p | s2 = (y2 * z1_3) % self.p | ||||
if u1 == u2: | if u1 == u2: | ||||
if (s1 != s2): | if s1 != s2: | ||||
return (0, 1, 0) | return (0, 1, 0) | ||||
return self.double(p1) | return self.double(p1) | ||||
h = u2 - u1 | h = u2 - u1 | ||||
r = s2 - s1 | r = s2 - s1 | ||||
h_2 = (h**2) % self.p | h_2 = (h**2) % self.p | ||||
h_3 = (h_2 * h) % self.p | h_3 = (h_2 * h) % self.p | ||||
u1_h_2 = (u1 * h_2) % self.p | u1_h_2 = (u1 * h_2) % self.p | ||||
x3 = (r**2 - h_3 - 2 * u1_h_2) % self.p | x3 = (r**2 - h_3 - 2 * u1_h_2) % self.p | ||||
y3 = (r * (u1_h_2 - x3) - s1 * h_3) % self.p | y3 = (r * (u1_h_2 - x3) - s1 * h_3) % self.p | ||||
z3 = (h * z1 * z2) % self.p | z3 = (h * z1 * z2) % self.p | ||||
return (x3, y3, z3) | return (x3, y3, z3) | ||||
def mul(self, ps): | def mul(self, ps): | ||||
"""Compute a (multi) point multiplication | """Compute a (multi) point multiplication | ||||
ps is a list of (Jacobian tuple, scalar) pairs. | ps is a list of (Jacobian tuple, scalar) pairs. | ||||
""" | """ | ||||
r = (0, 1, 0) | r = (0, 1, 0) | ||||
for i in range(255, -1, -1): | for i in range(255, -1, -1): | ||||
r = self.double(r) | r = self.double(r) | ||||
for (p, n) in ps: | for p, n in ps: | ||||
if ((n >> i) & 1): | if (n >> i) & 1: | ||||
r = self.add(r, p) | r = self.add(r, p) | ||||
return r | return r | ||||
SECP256K1 = EllipticCurve(2**256 - 2**32 - 977, 0, 7) | SECP256K1 = EllipticCurve(2**256 - 2**32 - 977, 0, 7) | ||||
SECP256K1_G = ( | SECP256K1_G = ( | ||||
0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, | 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, | ||||
0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, | 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, | ||||
1) | 1, | ||||
) | |||||
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 | SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 | ||||
SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2 | SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2 | ||||
class ECPubKey: | class ECPubKey: | ||||
"""A secp256k1 public key""" | """A secp256k1 public key""" | ||||
def __init__(self): | def __init__(self): | ||||
"""Construct an uninitialized public key""" | """Construct an uninitialized public key""" | ||||
self.valid = False | self.valid = False | ||||
def set(self, data): | def set(self, data): | ||||
"""Construct a public key from a serialization in compressed or uncompressed format""" | """Construct a public key from a serialization in compressed or uncompressed format""" | ||||
if (len(data) == 65 and data[0] == 0x04): | if len(data) == 65 and data[0] == 0x04: | ||||
p = (int.from_bytes(data[1:33], 'big'), | p = ( | ||||
int.from_bytes(data[33:65], 'big'), 1) | int.from_bytes(data[1:33], "big"), | ||||
int.from_bytes(data[33:65], "big"), | |||||
1, | |||||
) | |||||
self.valid = SECP256K1.on_curve(p) | self.valid = SECP256K1.on_curve(p) | ||||
if self.valid: | if self.valid: | ||||
self.p = p | self.p = p | ||||
self.compressed = False | self.compressed = False | ||||
elif (len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03)): | elif len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03): | ||||
x = int.from_bytes(data[1:33], 'big') | x = int.from_bytes(data[1:33], "big") | ||||
if SECP256K1.is_x_coord(x): | if SECP256K1.is_x_coord(x): | ||||
p = SECP256K1.lift_x(x) | p = SECP256K1.lift_x(x) | ||||
if (p[1] & 1) != (data[0] & 1): | if (p[1] & 1) != (data[0] & 1): | ||||
p = SECP256K1.negate(p) | p = SECP256K1.negate(p) | ||||
self.p = p | self.p = p | ||||
self.valid = True | self.valid = True | ||||
self.compressed = True | self.compressed = True | ||||
else: | else: | ||||
Show All 10 Lines | def is_valid(self): | ||||
return self.valid | return self.valid | ||||
def get_bytes(self): | def get_bytes(self): | ||||
assert self.valid | assert self.valid | ||||
p = SECP256K1.affine(self.p) | p = SECP256K1.affine(self.p) | ||||
if p is None: | if p is None: | ||||
return None | return None | ||||
if self.compressed: | if self.compressed: | ||||
return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big') | return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, "big") | ||||
else: | else: | ||||
return bytes([0x04]) + p[0].to_bytes(32, 'big') + \ | return bytes([0x04]) + p[0].to_bytes(32, "big") + p[1].to_bytes(32, "big") | ||||
p[1].to_bytes(32, 'big') | |||||
def verify_ecdsa(self, sig, msg, low_s=True): | def verify_ecdsa(self, sig, msg, low_s=True): | ||||
"""Verify a strictly DER-encoded ECDSA signature against this pubkey.""" | """Verify a strictly DER-encoded ECDSA signature against this pubkey.""" | ||||
assert self.valid | assert self.valid | ||||
if (sig[1] + 2 != len(sig)): | if sig[1] + 2 != len(sig): | ||||
return False | return False | ||||
if (len(sig) < 4): | if len(sig) < 4: | ||||
return False | return False | ||||
if (sig[0] != 0x30): | if sig[0] != 0x30: | ||||
return False | return False | ||||
if (sig[2] != 0x02): | if sig[2] != 0x02: | ||||
return False | return False | ||||
rlen = sig[3] | rlen = sig[3] | ||||
if (len(sig) < 6 + rlen): | if len(sig) < 6 + rlen: | ||||
return False | return False | ||||
if rlen < 1 or rlen > 33: | if rlen < 1 or rlen > 33: | ||||
return False | return False | ||||
if sig[4] >= 0x80: | if sig[4] >= 0x80: | ||||
return False | return False | ||||
if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)): | if rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80): | ||||
return False | return False | ||||
r = int.from_bytes(sig[4:4 + rlen], 'big') | r = int.from_bytes(sig[4 : 4 + rlen], "big") | ||||
if (sig[4 + rlen] != 0x02): | if sig[4 + rlen] != 0x02: | ||||
return False | return False | ||||
slen = sig[5 + rlen] | slen = sig[5 + rlen] | ||||
if slen < 1 or slen > 33: | if slen < 1 or slen > 33: | ||||
return False | return False | ||||
if (len(sig) != 6 + rlen + slen): | if len(sig) != 6 + rlen + slen: | ||||
return False | return False | ||||
if sig[6 + rlen] >= 0x80: | if sig[6 + rlen] >= 0x80: | ||||
return False | return False | ||||
if (slen > 1 and (sig[6 + rlen] == 0) and not (sig[7 + rlen] & 0x80)): | if slen > 1 and (sig[6 + rlen] == 0) and not (sig[7 + rlen] & 0x80): | ||||
return False | return False | ||||
s = int.from_bytes(sig[6 + rlen:6 + rlen + slen], 'big') | s = int.from_bytes(sig[6 + rlen : 6 + rlen + slen], "big") | ||||
if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER: | if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER: | ||||
return False | return False | ||||
if low_s and s >= SECP256K1_ORDER_HALF: | if low_s and s >= SECP256K1_ORDER_HALF: | ||||
return False | return False | ||||
z = int.from_bytes(msg, 'big') | z = int.from_bytes(msg, "big") | ||||
w = modinv(s, SECP256K1_ORDER) | w = modinv(s, SECP256K1_ORDER) | ||||
u1 = z * w % SECP256K1_ORDER | u1 = z * w % SECP256K1_ORDER | ||||
u2 = r * w % SECP256K1_ORDER | u2 = r * w % SECP256K1_ORDER | ||||
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)])) | R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)])) | ||||
if R is None or R[0] != r: | if R is None or R[0] != r: | ||||
return False | return False | ||||
return True | return True | ||||
def verify_schnorr(self, sig, msg32): | def verify_schnorr(self, sig, msg32): | ||||
assert self.is_valid | assert self.is_valid | ||||
assert len(sig) == 64 | assert len(sig) == 64 | ||||
assert len(msg32) == 32 | assert len(msg32) == 32 | ||||
Rx = sig[:32] | Rx = sig[:32] | ||||
s = int.from_bytes(sig[32:], 'big') | s = int.from_bytes(sig[32:], "big") | ||||
e = int.from_bytes( | e = int.from_bytes( | ||||
hashlib.sha256( | hashlib.sha256(Rx + self.get_bytes() + msg32).digest(), "big" | ||||
Rx + | ) | ||||
self.get_bytes() + | |||||
msg32).digest(), | |||||
'big') | |||||
nege = SECP256K1_ORDER - e | nege = SECP256K1_ORDER - e | ||||
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, s), (self.p, nege)])) | R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, s), (self.p, nege)])) | ||||
if R is None: | if R is None: | ||||
return False | return False | ||||
if jacobi_symbol(R[1], SECP256K1.p) == -1: | if jacobi_symbol(R[1], SECP256K1.p) == -1: | ||||
return False | return False | ||||
return R[0] == int.from_bytes(Rx, 'big') | return R[0] == int.from_bytes(Rx, "big") | ||||
class ECKey: | class ECKey: | ||||
"""A secp256k1 private key""" | """A secp256k1 private key""" | ||||
def __init__(self): | def __init__(self): | ||||
self.valid = False | self.valid = False | ||||
def set(self, secret, compressed): | def set(self, secret, compressed): | ||||
"""Construct a private key object with given 32-byte secret and compressed flag.""" | """Construct a private key object with given 32-byte secret and compressed flag.""" | ||||
assert len(secret) == 32 | assert len(secret) == 32 | ||||
secret = int.from_bytes(secret, 'big') | secret = int.from_bytes(secret, "big") | ||||
self.valid = (secret > 0 and secret < SECP256K1_ORDER) | self.valid = secret > 0 and secret < SECP256K1_ORDER | ||||
if self.valid: | if self.valid: | ||||
self.secret = secret | self.secret = secret | ||||
self.compressed = compressed | self.compressed = compressed | ||||
def generate(self, compressed=True): | def generate(self, compressed=True): | ||||
"""Generate a random private key (compressed or uncompressed).""" | """Generate a random private key (compressed or uncompressed).""" | ||||
self.set( | self.set(random.randrange(1, SECP256K1_ORDER).to_bytes(32, "big"), compressed) | ||||
random.randrange( | |||||
1, | |||||
SECP256K1_ORDER).to_bytes( | |||||
32, | |||||
'big'), | |||||
compressed) | |||||
def get_bytes(self): | def get_bytes(self): | ||||
"""Retrieve the 32-byte representation of this key.""" | """Retrieve the 32-byte representation of this key.""" | ||||
assert self.valid | assert self.valid | ||||
return self.secret.to_bytes(32, 'big') | return self.secret.to_bytes(32, "big") | ||||
@property | @property | ||||
def is_valid(self): | def is_valid(self): | ||||
return self.valid | return self.valid | ||||
@property | @property | ||||
def is_compressed(self): | def is_compressed(self): | ||||
return self.compressed | return self.compressed | ||||
def get_pubkey(self): | def get_pubkey(self): | ||||
"""Compute an ECPubKey object for this secret key.""" | """Compute an ECPubKey object for this secret key.""" | ||||
assert self.valid | assert self.valid | ||||
ret = ECPubKey() | ret = ECPubKey() | ||||
p = SECP256K1.mul([(SECP256K1_G, self.secret)]) | p = SECP256K1.mul([(SECP256K1_G, self.secret)]) | ||||
ret.p = p | ret.p = p | ||||
ret.valid = True | ret.valid = True | ||||
ret.compressed = self.compressed | ret.compressed = self.compressed | ||||
return ret | return ret | ||||
def sign_ecdsa(self, msg, low_s=True): | def sign_ecdsa(self, msg, low_s=True): | ||||
"""Construct a DER-encoded ECDSA signature with this key.""" | """Construct a DER-encoded ECDSA signature with this key.""" | ||||
assert self.valid | assert self.valid | ||||
z = int.from_bytes(msg, 'big') | z = int.from_bytes(msg, "big") | ||||
# Note: no RFC6979, but a simple random nonce (some tests rely on | # Note: no RFC6979, but a simple random nonce (some tests rely on | ||||
# distinct transactions for the same operation) | # distinct transactions for the same operation) | ||||
k = random.randrange(1, SECP256K1_ORDER) | k = random.randrange(1, SECP256K1_ORDER) | ||||
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)])) | R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)])) | ||||
r = R[0] % SECP256K1_ORDER | r = R[0] % SECP256K1_ORDER | ||||
s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER | s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER | ||||
if low_s and s > SECP256K1_ORDER_HALF: | if low_s and s > SECP256K1_ORDER_HALF: | ||||
s = SECP256K1_ORDER - s | s = SECP256K1_ORDER - s | ||||
rb = r.to_bytes((r.bit_length() + 8) // 8, 'big') | rb = r.to_bytes((r.bit_length() + 8) // 8, "big") | ||||
sb = s.to_bytes((s.bit_length() + 8) // 8, 'big') | sb = s.to_bytes((s.bit_length() + 8) // 8, "big") | ||||
return b'\x30' + \ | return ( | ||||
bytes([4 + len(rb) + len(sb), 2, len(rb)]) + \ | b"\x30" | ||||
rb + bytes([2, len(sb)]) + sb | + bytes([4 + len(rb) + len(sb), 2, len(rb)]) | ||||
+ rb | |||||
+ bytes([2, len(sb)]) | |||||
+ sb | |||||
) | |||||
def sign_schnorr(self, msg32): | def sign_schnorr(self, msg32): | ||||
"""Create Schnorr signature (BIP-Schnorr convention).""" | """Create Schnorr signature (BIP-Schnorr convention).""" | ||||
assert self.valid | assert self.valid | ||||
assert len(msg32) == 32 | assert len(msg32) == 32 | ||||
pubkey = self.get_pubkey() | pubkey = self.get_pubkey() | ||||
assert pubkey.is_valid | assert pubkey.is_valid | ||||
k = random.randrange(1, SECP256K1_ORDER) | k = random.randrange(1, SECP256K1_ORDER) | ||||
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)])) | R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)])) | ||||
if jacobi_symbol(R[1], SECP256K1.p) == -1: | if jacobi_symbol(R[1], SECP256K1.p) == -1: | ||||
k = SECP256K1_ORDER - k | k = SECP256K1_ORDER - k | ||||
Rx = R[0].to_bytes(32, 'big') | Rx = R[0].to_bytes(32, "big") | ||||
e = int.from_bytes( | e = int.from_bytes( | ||||
hashlib.sha256( | hashlib.sha256(Rx + pubkey.get_bytes() + msg32).digest(), "big" | ||||
Rx + | ) | ||||
pubkey.get_bytes() + | s = (k + e * int.from_bytes(self.get_bytes(), "big")) % SECP256K1_ORDER | ||||
msg32).digest(), | sig = Rx + s.to_bytes(32, "big") | ||||
'big') | |||||
s = (k + e * int.from_bytes(self.get_bytes(), 'big')) % SECP256K1_ORDER | |||||
sig = Rx + s.to_bytes(32, 'big') | |||||
assert pubkey.verify_schnorr(sig, msg32) | assert pubkey.verify_schnorr(sig, msg32) | ||||
return sig | return sig |