diff --git a/test/functional/abc-schnorr.py b/test/functional/abc-schnorr.py index d662e85f4..e8eb93e16 100755 --- a/test/functional/abc-schnorr.py +++ b/test/functional/abc-schnorr.py @@ -1,248 +1,246 @@ #!/usr/bin/env python3 # Copyright (c) 2019 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ This tests the treatment of Schnorr transaction signatures: - acceptance both in mempool and blocks. - check banning for peers who send txns with 64 byte ECDSA DER sigs. Derived from a variety of functional tests. """ from test_framework.blocktools import ( create_block, create_coinbase, create_tx_with_script, make_conform_to_ctor, ) from test_framework.key import ECKey from test_framework.messages import ( CBlock, COutPoint, CTransaction, CTxIn, CTxOut, FromHex, ToHex, ) from test_framework.mininode import ( P2PDataStore, ) -from test_framework import schnorr from test_framework.script import ( CScript, OP_1, OP_CHECKMULTISIG, OP_CHECKSIG, OP_TRUE, SIGHASH_ALL, SIGHASH_FORKID, SignatureHashForkId, ) from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_raises_rpc_error # A mandatory (bannable) error occurs when people pass Schnorr signatures # into OP_CHECKMULTISIG. SCHNORR_MULTISIG_ERROR = 'mandatory-script-verify-flag-failed (Signature cannot be 65 bytes in CHECKMULTISIG)' # A mandatory (bannable) error occurs when people send invalid Schnorr # sigs into OP_CHECKSIG. NULLFAIL_ERROR = 'mandatory-script-verify-flag-failed (Signature must be zero for failed CHECK(MULTI)SIG operation)' # Blocks with invalid scripts give this error: BADINPUTS_ERROR = 'blk-bad-inputs' # This 64-byte signature is used to test exclusion & banning according to # the above error messages. # Tests of real 64 byte ECDSA signatures can be found in script_tests. sig64 = b'\0' * 64 class SchnorrTest(BitcoinTestFramework): def set_test_params(self): self.num_nodes = 1 self.block_heights = {} self.extra_args = [[ "-acceptnonstdtxn=1"]] def bootstrap_p2p(self, *, num_connections=1): """Add a P2P connection to the node. Helper to connect and wait for version handshake.""" for _ in range(num_connections): self.nodes[0].add_p2p_connection(P2PDataStore()) def reconnect_p2p(self, **kwargs): """Tear down and bootstrap the P2P connection to the node. The node gets disconnected several times in this test. This helper method reconnects the p2p and restarts the network thread.""" self.nodes[0].disconnect_p2ps() self.bootstrap_p2p(**kwargs) def getbestblock(self, node): """Get the best block. Register its height so we can use build_block.""" block_height = node.getblockcount() blockhash = node.getblockhash(block_height) block = FromHex(CBlock(), node.getblock(blockhash, 0)) block.calc_sha256() self.block_heights[block.sha256] = block_height return block def build_block(self, parent, transactions=(), nTime=None): """Make a new block with an OP_1 coinbase output. Requires parent to have its height registered.""" parent.calc_sha256() block_height = self.block_heights[parent.sha256] + 1 block_time = (parent.nTime + 1) if nTime is None else nTime block = create_block( parent.sha256, create_coinbase(block_height), block_time) block.vtx.extend(transactions) make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() self.block_heights[block.sha256] = block_height return block def check_for_ban_on_rejected_tx(self, tx, reject_reason=None): """Check we are disconnected when sending a txn that the node rejects. (Can't actually get banned, since bitcoind won't ban local peers.)""" self.nodes[0].p2p.send_txs_and_test( [tx], self.nodes[0], success=False, reject_reason=reject_reason, expect_disconnect=True) self.reconnect_p2p() def check_for_ban_on_rejected_block(self, block, reject_reason=None): """Check we are disconnected when sending a block that the node rejects. (Can't actually get banned, since bitcoind won't ban local peers.)""" self.nodes[0].p2p.send_blocks_and_test( [block], self.nodes[0], success=False, reject_reason=reject_reason, expect_disconnect=True) self.reconnect_p2p() def run_test(self): node, = self.nodes self.bootstrap_p2p() tip = self.getbestblock(node) self.log.info("Create some blocks with OP_1 coinbase for spending.") blocks = [] for _ in range(10): tip = self.build_block(tip) blocks.append(tip) node.p2p.send_blocks_and_test(blocks, node, success=True) spendable_outputs = [block.vtx[0] for block in blocks] self.log.info("Mature the blocks and get out of IBD.") node.generatetoaddress(100, node.get_deterministic_priv_key().address) tip = self.getbestblock(node) self.log.info("Setting up spends to test and mining the fundings.") fundings = [] # Generate a key pair - privkeybytes = b"Schnorr!" * 4 private_key = ECKey() - private_key.set(privkeybytes, True) + private_key.set(b"Schnorr!" * 4, True) # get uncompressed public key serialization public_key = private_key.get_pubkey().get_bytes() def create_fund_and_spend_tx(multi=False, sig='schnorr'): spendfrom = spendable_outputs.pop() if multi: script = CScript([OP_1, public_key, OP_1, OP_CHECKMULTISIG]) else: script = CScript([public_key, OP_CHECKSIG]) value = spendfrom.vout[0].nValue # Fund transaction txfund = create_tx_with_script(spendfrom, 0, b'', value, script) txfund.rehash() fundings.append(txfund) # Spend transaction txspend = CTransaction() txspend.vout.append( CTxOut(value - 1000, CScript([OP_TRUE]))) txspend.vin.append( CTxIn(COutPoint(txfund.sha256, 0), b'')) # Sign the transaction sighashtype = SIGHASH_ALL | SIGHASH_FORKID hashbyte = bytes([sighashtype & 0xff]) sighash = SignatureHashForkId( script, txspend, 0, sighashtype, value) if sig == 'schnorr': - txsig = schnorr.sign(privkeybytes, sighash) + hashbyte + txsig = private_key.sign_schnorr(sighash) + hashbyte elif sig == 'ecdsa': txsig = private_key.sign_ecdsa(sighash) + hashbyte elif isinstance(sig, bytes): txsig = sig + hashbyte if multi: txspend.vin[0].scriptSig = CScript([b'', txsig]) else: txspend.vin[0].scriptSig = CScript([txsig]) txspend.rehash() return txspend schnorrchecksigtx = create_fund_and_spend_tx() schnorrmultisigtx = create_fund_and_spend_tx(multi=True) ecdsachecksigtx = create_fund_and_spend_tx(sig='ecdsa') sig64checksigtx = create_fund_and_spend_tx(sig=sig64) sig64multisigtx = create_fund_and_spend_tx(multi=True, sig=sig64) tip = self.build_block(tip, fundings) node.p2p.send_blocks_and_test([tip], node) self.log.info("Typical ECDSA and Schnorr CHECKSIG are valid.") node.p2p.send_txs_and_test([schnorrchecksigtx, ecdsachecksigtx], node) # They get mined as usual. node.generatetoaddress(1, node.get_deterministic_priv_key().address) tip = self.getbestblock(node) # Make sure they are in the block, and mempool is now empty. txhashes = set([schnorrchecksigtx.hash, ecdsachecksigtx.hash]) assert txhashes.issubset(tx.rehash() for tx in tip.vtx) assert not node.getrawmempool() self.log.info("Schnorr in multisig is rejected with mandatory error.") assert_raises_rpc_error(-26, SCHNORR_MULTISIG_ERROR, node.sendrawtransaction, ToHex(schnorrmultisigtx)) # And it is banworthy. self.check_for_ban_on_rejected_tx( schnorrmultisigtx, SCHNORR_MULTISIG_ERROR) # And it can't be mined self.check_for_ban_on_rejected_block( self.build_block(tip, [schnorrmultisigtx]), BADINPUTS_ERROR) self.log.info("Bad 64-byte sig is rejected with mandatory error.") # In CHECKSIG it's invalid Schnorr and hence NULLFAIL. assert_raises_rpc_error(-26, NULLFAIL_ERROR, node.sendrawtransaction, ToHex(sig64checksigtx)) # In CHECKMULTISIG it's invalid length and hence BAD_LENGTH. assert_raises_rpc_error(-26, SCHNORR_MULTISIG_ERROR, node.sendrawtransaction, ToHex(sig64multisigtx)) # Sending these transactions is banworthy. self.check_for_ban_on_rejected_tx(sig64checksigtx, NULLFAIL_ERROR) self.check_for_ban_on_rejected_tx( sig64multisigtx, SCHNORR_MULTISIG_ERROR) # And they can't be mined either... self.check_for_ban_on_rejected_block( self.build_block(tip, [sig64checksigtx]), BADINPUTS_ERROR) self.check_for_ban_on_rejected_block( self.build_block(tip, [sig64multisigtx]), BADINPUTS_ERROR) if __name__ == '__main__': SchnorrTest().main() diff --git a/test/functional/abc-schnorrmultisig.py b/test/functional/abc-schnorrmultisig.py index f16d3eaf7..b39bce233 100755 --- a/test/functional/abc-schnorrmultisig.py +++ b/test/functional/abc-schnorrmultisig.py @@ -1,248 +1,246 @@ #!/usr/bin/env python3 # Copyright (c) 2019 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ This tests the CHECKMULTISIG mode that uses Schnorr transaction signatures and repurposes the dummy element to indicate which signatures are being checked. - acceptance both in mempool and blocks. - check non-banning for peers who send invalid txns that would have been valid on the other side of the upgrade. - check banning of peers for some fully-invalid transactions. Derived from abc-schnorr.py """ from test_framework.blocktools import ( create_block, create_coinbase, create_tx_with_script, make_conform_to_ctor, ) from test_framework.key import ECKey from test_framework.messages import ( CBlock, COutPoint, CTransaction, CTxIn, CTxOut, FromHex, ToHex, ) from test_framework.mininode import ( P2PDataStore, ) -from test_framework import schnorr from test_framework.script import ( CScript, OP_0, OP_1, OP_CHECKMULTISIG, OP_TRUE, SIGHASH_ALL, SIGHASH_FORKID, SignatureHashForkId, ) from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal, assert_raises_rpc_error # ECDSA checkmultisig with non-null dummy are invalid since the new mode # refuses ECDSA. ECDSA_NULLDUMMY_ERROR = 'mandatory-script-verify-flag-failed (Only Schnorr signatures allowed in this operation)' # A mandatory (bannable) error occurs when people pass Schnorr signatures into # legacy OP_CHECKMULTISIG. SCHNORR_LEGACY_MULTISIG_ERROR = 'mandatory-script-verify-flag-failed (Signature cannot be 65 bytes in CHECKMULTISIG)' # Blocks with invalid scripts give this error: BADINPUTS_ERROR = 'blk-bad-inputs' # This 64-byte signature is used to test exclusion & banning according to # the above error messages. # Tests of real 64 byte ECDSA signatures can be found in script_tests. sig64 = b'\0' * 64 class SchnorrMultisigTest(BitcoinTestFramework): def set_test_params(self): self.num_nodes = 1 self.block_heights = {} self.extra_args = [["-acceptnonstdtxn=1"]] def bootstrap_p2p(self, *, num_connections=1): """Add a P2P connection to the node. Helper to connect and wait for version handshake.""" for _ in range(num_connections): self.nodes[0].add_p2p_connection(P2PDataStore()) def reconnect_p2p(self, **kwargs): """Tear down and bootstrap the P2P connection to the node. The node gets disconnected several times in this test. This helper method reconnects the p2p and restarts the network thread.""" self.nodes[0].disconnect_p2ps() self.bootstrap_p2p(**kwargs) def getbestblock(self, node): """Get the best block. Register its height so we can use build_block.""" block_height = node.getblockcount() blockhash = node.getblockhash(block_height) block = FromHex(CBlock(), node.getblock(blockhash, 0)) block.calc_sha256() self.block_heights[block.sha256] = block_height return block def build_block(self, parent, transactions=(), nTime=None): """Make a new block with an OP_1 coinbase output. Requires parent to have its height registered.""" parent.calc_sha256() block_height = self.block_heights[parent.sha256] + 1 block_time = (parent.nTime + 1) if nTime is None else nTime block = create_block( parent.sha256, create_coinbase(block_height), block_time) block.vtx.extend(transactions) make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() self.block_heights[block.sha256] = block_height return block def check_for_ban_on_rejected_tx(self, tx, reject_reason=None): """Check we are disconnected when sending a txn that the node rejects. (Can't actually get banned, since bitcoind won't ban local peers.)""" self.nodes[0].p2p.send_txs_and_test( [tx], self.nodes[0], success=False, expect_disconnect=True, reject_reason=reject_reason) self.reconnect_p2p() def check_for_ban_on_rejected_block(self, block, reject_reason=None): """Check we are disconnected when sending a block that the node rejects. (Can't actually get banned, since bitcoind won't ban local peers.)""" self.nodes[0].p2p.send_blocks_and_test( [block], self.nodes[0], success=False, reject_reason=reject_reason, expect_disconnect=True) self.reconnect_p2p() def run_test(self): node, = self.nodes self.bootstrap_p2p() tip = self.getbestblock(node) self.log.info("Create some blocks with OP_1 coinbase for spending.") blocks = [] for _ in range(10): tip = self.build_block(tip) blocks.append(tip) node.p2p.send_blocks_and_test(blocks, node, success=True) spendable_outputs = [block.vtx[0] for block in blocks] self.log.info("Mature the blocks and get out of IBD.") node.generatetoaddress(100, node.get_deterministic_priv_key().address) tip = self.getbestblock(node) self.log.info("Setting up spends to test and mining the fundings.") fundings = [] # Generate a key pair - privkeybytes = b"Schnorr!" * 4 private_key = ECKey() - private_key.set(privkeybytes, True) + private_key.set(b"Schnorr!" * 4, True) # get uncompressed public key serialization public_key = private_key.get_pubkey().get_bytes() def create_fund_and_spend_tx(dummy=OP_0, sigtype='ecdsa'): spendfrom = spendable_outputs.pop() script = CScript([OP_1, public_key, OP_1, OP_CHECKMULTISIG]) value = spendfrom.vout[0].nValue # Fund transaction txfund = create_tx_with_script(spendfrom, 0, b'', value, script) txfund.rehash() fundings.append(txfund) # Spend transaction txspend = CTransaction() txspend.vout.append( CTxOut(value - 1000, CScript([OP_TRUE]))) txspend.vin.append( CTxIn(COutPoint(txfund.sha256, 0), b'')) # Sign the transaction sighashtype = SIGHASH_ALL | SIGHASH_FORKID hashbyte = bytes([sighashtype & 0xff]) sighash = SignatureHashForkId( script, txspend, 0, sighashtype, value) if sigtype == 'schnorr': - txsig = schnorr.sign(privkeybytes, sighash) + hashbyte + txsig = private_key.sign_schnorr(sighash) + hashbyte elif sigtype == 'ecdsa': txsig = private_key.sign_ecdsa(sighash) + hashbyte txspend.vin[0].scriptSig = CScript([dummy, txsig]) txspend.rehash() return txspend # This is valid. ecdsa0tx = create_fund_and_spend_tx(OP_0, 'ecdsa') # This is invalid. ecdsa1tx = create_fund_and_spend_tx(OP_1, 'ecdsa') # This is invalid. schnorr0tx = create_fund_and_spend_tx(OP_0, 'schnorr') # This is valid. schnorr1tx = create_fund_and_spend_tx(OP_1, 'schnorr') tip = self.build_block(tip, fundings) node.p2p.send_blocks_and_test([tip], node) self.log.info("Send a legacy ECDSA multisig into mempool.") node.p2p.send_txs_and_test([ecdsa0tx], node) assert_equal(node.getrawmempool(), [ecdsa0tx.hash]) self.log.info("Trying to mine a non-null-dummy ECDSA.") self.check_for_ban_on_rejected_block( self.build_block(tip, [ecdsa1tx]), BADINPUTS_ERROR) self.log.info( "If we try to submit it by mempool or RPC, it is rejected and we are banned") assert_raises_rpc_error(-26, ECDSA_NULLDUMMY_ERROR, node.sendrawtransaction, ToHex(ecdsa1tx)) self.check_for_ban_on_rejected_tx( ecdsa1tx, ECDSA_NULLDUMMY_ERROR) self.log.info( "Submitting a Schnorr-multisig via net, and mining it in a block") node.p2p.send_txs_and_test([schnorr1tx], node) assert_equal(set(node.getrawmempool()), { ecdsa0tx.hash, schnorr1tx.hash}) tip = self.build_block(tip, [schnorr1tx]) node.p2p.send_blocks_and_test([tip], node) self.log.info( "That legacy ECDSA multisig is still in mempool, let's mine it") assert_equal(node.getrawmempool(), [ecdsa0tx.hash]) tip = self.build_block(tip, [ecdsa0tx]) node.p2p.send_blocks_and_test([tip], node) assert_equal(node.getrawmempool(), []) self.log.info( "Trying Schnorr in legacy multisig is invalid and banworthy.") self.check_for_ban_on_rejected_tx( schnorr0tx, SCHNORR_LEGACY_MULTISIG_ERROR) self.check_for_ban_on_rejected_block( self.build_block(tip, [schnorr0tx]), BADINPUTS_ERROR) if __name__ == '__main__': SchnorrMultisigTest().main() diff --git a/test/functional/abc_p2p_avalanche.py b/test/functional/abc_p2p_avalanche.py index b22839291..49d229d07 100755 --- a/test/functional/abc_p2p_avalanche.py +++ b/test/functional/abc_p2p_avalanche.py @@ -1,279 +1,285 @@ #!/usr/bin/env python3 # Copyright (c) 2018 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the resolution of forks via avalanche.""" import random +from test_framework.key import ( + ECKey, + ECPubKey, +) from test_framework.mininode import P2PInterface, mininode_lock from test_framework.messages import ( AvalancheResponse, AvalancheVote, CInv, msg_avapoll, msg_tcpavaresponse, TCPAvalancheResponse, ) from test_framework.test_framework import BitcoinTestFramework from test_framework.util import ( assert_equal, wait_until, ) -from test_framework import schnorr BLOCK_ACCEPTED = 0 BLOCK_REJECTED = 1 BLOCK_UNKNOWN = -1 class TestNode(P2PInterface): def __init__(self): self.round = 0 self.avaresponses = [] self.avapolls = [] super().__init__() def on_avaresponse(self, message): with mininode_lock: self.avaresponses.append(message.response) def on_avapoll(self, message): with mininode_lock: self.avapolls.append(message.poll) def send_avaresponse(self, round, votes, privkey): response = AvalancheResponse(round, 0, votes) - sig = schnorr.sign(privkey, response.get_hash()) + sig = privkey.sign_schnorr(response.get_hash()) msg = msg_tcpavaresponse() msg.response = TCPAvalancheResponse(response, sig) self.send_message(msg) def send_poll(self, hashes): msg = msg_avapoll() msg.poll.round = self.round self.round += 1 for h in hashes: msg.poll.invs.append(CInv(2, h)) self.send_message(msg) def wait_for_avaresponse(self, timeout=5): wait_until( lambda: len(self.avaresponses) > 0, timeout=timeout, lock=mininode_lock) with mininode_lock: return self.avaresponses.pop(0) def get_avapoll_if_available(self): with mininode_lock: return self.avapolls.pop(0) if len(self.avapolls) > 0 else None class AvalancheTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 2 self.extra_args = [ ['-enableavalanche=1', '-avacooldown=0', '-automaticunparking=0'], ['-enableavalanche=1', '-avacooldown=0', '-noparkdeepreorg', '-maxreorgdepth=-1']] def run_test(self): node = self.nodes[0] # Build a fake quorum of nodes. quorum = [] for i in range(0, 16): n = TestNode() quorum.append(n) node.add_p2p_connection(n) n.wait_for_verack() # Get our own node id so we can use it later. n.nodeid = node.getpeerinfo()[-1]['id'] # Pick on node from the quorum for polling. poll_node = quorum[0] # Generate many block and poll for them. address = node.get_deterministic_priv_key().address node.generatetoaddress(100, address) fork_node = self.nodes[1] # Make sure the fork node has synced the blocks self.sync_blocks([node, fork_node]) # Get the key so we can verify signatures. - avakey = bytes.fromhex(node.getavalanchekey()) + avakey = ECPubKey() + avakey.set(bytes.fromhex(node.getavalanchekey())) self.log.info("Poll for the chain tip...") best_block_hash = int(node.getbestblockhash(), 16) poll_node.send_poll([best_block_hash]) def assert_response(expected): response = poll_node.wait_for_avaresponse() r = response.response assert_equal(r.cooldown, 0) # Verify signature. - assert schnorr.verify(response.sig, avakey, r.get_hash()) + assert avakey.verify_schnorr(response.sig, r.get_hash()) votes = r.votes assert_equal(len(votes), len(expected)) for i in range(0, len(votes)): assert_equal(repr(votes[i]), repr(expected[i])) assert_response([AvalancheVote(BLOCK_ACCEPTED, best_block_hash)]) self.log.info("Poll for a selection of blocks...") various_block_hashes = [ int(node.getblockhash(0), 16), int(node.getblockhash(1), 16), int(node.getblockhash(10), 16), int(node.getblockhash(25), 16), int(node.getblockhash(42), 16), int(node.getblockhash(96), 16), int(node.getblockhash(99), 16), int(node.getblockhash(100), 16), ] poll_node.send_poll(various_block_hashes) assert_response([AvalancheVote(BLOCK_ACCEPTED, h) for h in various_block_hashes]) self.log.info( "Poll for a selection of blocks, but some are now invalid...") invalidated_block = node.getblockhash(76) node.invalidateblock(invalidated_block) # We need to send the coin to a new address in order to make sure we do # not regenerate the same block. node.generatetoaddress( 26, 'bchreg:pqv2r67sgz3qumufap3h2uuj0zfmnzuv8v7ej0fffv') node.reconsiderblock(invalidated_block) poll_node.send_poll(various_block_hashes) assert_response([AvalancheVote(BLOCK_ACCEPTED, h) for h in various_block_hashes[:5]] + [AvalancheVote(BLOCK_REJECTED, h) for h in various_block_hashes[-3:]]) self.log.info("Poll for unknown blocks...") various_block_hashes = [ int(node.getblockhash(0), 16), int(node.getblockhash(25), 16), int(node.getblockhash(42), 16), various_block_hashes[5], various_block_hashes[6], various_block_hashes[7], random.randrange(1 << 255, (1 << 256) - 1), random.randrange(1 << 255, (1 << 256) - 1), random.randrange(1 << 255, (1 << 256) - 1), ] poll_node.send_poll(various_block_hashes) assert_response([AvalancheVote(BLOCK_ACCEPTED, h) for h in various_block_hashes[:3]] + [AvalancheVote(BLOCK_REJECTED, h) for h in various_block_hashes[3:6]] + [AvalancheVote(BLOCK_UNKNOWN, h) for h in various_block_hashes[-3:]]) self.log.info("Trigger polling from the node...") # duplicate the deterministic sig test from src/test/key_tests.cpp - privkey = bytes.fromhex( - "12b004fff7f4b69ef8650e767f18f11ede158148b425660723b9f9a66e61f747") - pubkey = schnorr.getpubkey(privkey, compressed=True) + privkey = ECKey() + privkey.set(bytes.fromhex( + "12b004fff7f4b69ef8650e767f18f11ede158148b425660723b9f9a66e61f747"), True) + pubkey = privkey.get_pubkey() privatekey = node.get_deterministic_priv_key().key - proof = node.buildavalancheproof(11, 12, pubkey.hex(), [{ + proof = node.buildavalancheproof(11, 12, pubkey.get_bytes().hex(), [{ 'txid': "12b004fff7f4b69ef8650e767f18f11ede158148b425660723b9f9a66e61f747", 'vout': 0, 'amount': 10, 'height': 100, 'privatekey': privatekey, }]) # Activate the quorum. for n in quorum: - success = node.addavalanchenode(n.nodeid, pubkey.hex(), proof) + success = node.addavalanchenode( + n.nodeid, pubkey.get_bytes().hex(), proof) assert success is True def can_find_block_in_poll(hash, resp=BLOCK_ACCEPTED): found_hash = False for n in quorum: poll = n.get_avapoll_if_available() # That node has not received a poll if poll is None: continue # We got a poll, check for the hash and repond votes = [] for inv in poll.invs: # Vote yes to everything r = BLOCK_ACCEPTED # Look for what we expect if inv.hash == hash: r = resp found_hash = True votes.append(AvalancheVote(r, inv.hash)) n.send_avaresponse(poll.round, votes, privkey) return found_hash # Now that we have a peer, we should start polling for the tip. hash_tip = int(node.getbestblockhash(), 16) wait_until(lambda: can_find_block_in_poll(hash_tip), timeout=5) # Make sure the fork node has synced the blocks self.sync_blocks([node, fork_node]) # Create a fork 2 blocks deep. This should trigger polling. fork_node.invalidateblock(fork_node.getblockhash(100)) fork_address = fork_node.get_deterministic_priv_key().address fork_node.generatetoaddress(2, fork_address) # Because the new tip is a deep reorg, the node will not accept it # right away, but poll for it. def parked_block(blockhash): for tip in node.getchaintips(): if tip["hash"] == blockhash: assert tip["status"] != "active" return tip["status"] == "parked" return False fork_tip = fork_node.getbestblockhash() wait_until(lambda: parked_block(fork_tip)) self.log.info("Answer all polls to finalize...") hash_to_find = int(fork_tip, 16) def has_accepted_new_tip(): can_find_block_in_poll(hash_to_find) return node.getbestblockhash() == fork_tip # Because everybody answers yes, the node will accept that block. wait_until(has_accepted_new_tip, timeout=15) assert_equal(node.getbestblockhash(), fork_tip) self.log.info("Answer all polls to park...") node.generate(1) tip_to_park = node.getbestblockhash() self.log.info(tip_to_park) hash_to_find = int(tip_to_park, 16) assert(tip_to_park != fork_tip) def has_parked_new_tip(): can_find_block_in_poll(hash_to_find, BLOCK_REJECTED) return node.getbestblockhash() == fork_tip # Because everybody answers no, the node will park that block. wait_until(has_parked_new_tip, timeout=15) assert_equal(node.getbestblockhash(), fork_tip) if __name__ == '__main__': AvalancheTest().main() diff --git a/test/functional/test_framework/key.py b/test/functional/test_framework/key.py index f181b9b18..8c0467c8b 100755 --- a/test/functional/test_framework/key.py +++ b/test/functional/test_framework/key.py @@ -1,369 +1,423 @@ #!/usr/bin/env python3 # Copyright (c) 2019 Pieter Wuille +# Copyright (c) 2019-2020 The Bitcoin developers """Test-only secp256k1 elliptic curve implementation WARNING: This code is slow, uses bad randomness, does not properly protect keys, and is trivially vulnerable to side channel attacks. Do not use for anything but tests. """ +import hashlib import random def modinv(a, n): """Compute the modular inverse of a modulo n See https://en.wikipedia.org/wiki/Extended_Euclidean_algorithm#Modular_integers """ t1, t2 = 0, 1 r1, r2 = n, a while r2 != 0: q = r1 // r2 t1, t2 = t2, t1 - q * t2 r1, r2 = r2, r1 - q * r2 if r1 > 1: return None if t1 < 0: t1 += n return t1 def jacobi_symbol(n, k): """Compute the Jacobi symbol of n modulo k See http://en.wikipedia.org/wiki/Jacobi_symbol """ assert k > 0 and k & 1 n %= k t = 0 while n != 0: while n & 1 == 0: n >>= 1 r = k & 7 t ^= (r == 3 or r == 5) n, k = k, n t ^= (n & k & 3 == 3) n = n % k if k == 1: return -1 if t else 1 return 0 def modsqrt(a, p): """Compute the square root of a modulo p For p = 3 mod 4, if a square root exists, it is equal to a**((p+1)/4) mod p. """ assert(p % 4 == 3) # Only p = 3 mod 4 is implemented sqrt = pow(a, (p + 1) // 4, p) if pow(sqrt, 2, p) == a % p: return sqrt return None class EllipticCurve: def __init__(self, p, a, b): """Initialize elliptic curve y^2 = x^3 + a*x + b over GF(p).""" self.p = p self.a = a % p self.b = b % p def affine(self, p1): """Convert a Jacobian point tuple p1 to affine form, or None if at infinity.""" x1, y1, z1 = p1 if z1 == 0: return None inv = modinv(z1, self.p) inv_2 = (inv**2) % self.p inv_3 = (inv_2 * inv) % self.p return ((inv_2 * x1) % self.p, (inv_3 * y1) % self.p, 1) def negate(self, p1): """Negate a Jacobian point tuple p1.""" x1, y1, z1 = p1 return (x1, (self.p - y1) % self.p, z1) def on_curve(self, p1): """Determine whether a Jacobian tuple p is on the curve (and not infinity)""" x1, y1, z1 = p1 z2 = pow(z1, 2, self.p) z4 = pow(z2, 2, self.p) return z1 != 0 and (pow(x1, 3, self.p) + self.a * x1 * z4 + self.b * z2 * z4 - pow(y1, 2, self.p)) % self.p == 0 def is_x_coord(self, x): """Test whether x is a valid X coordinate on the curve.""" x_3 = pow(x, 3, self.p) return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1 def lift_x(self, x): """Given an X coordinate on the curve, return a corresponding affine point.""" x_3 = pow(x, 3, self.p) v = x_3 + self.a * x + self.b y = modsqrt(v, self.p) if y is None: return None return (x, y, 1) def double(self, p1): """Double a Jacobian tuple p1""" x1, y1, z1 = p1 if z1 == 0: return (0, 1, 0) y1_2 = (y1**2) % self.p y1_4 = (y1_2**2) % self.p x1_2 = (x1**2) % self.p s = (4 * x1 * y1_2) % self.p m = 3 * x1_2 if self.a: m += self.a * pow(z1, 4, self.p) m = m % self.p x2 = (m**2 - 2 * s) % self.p y2 = (m * (s - x2) - 8 * y1_4) % self.p z2 = (2 * y1 * z1) % self.p return (x2, y2, z2) def add_mixed(self, p1, p2): """Add a Jacobian tuple p1 and an affine tuple p2""" x1, y1, z1 = p1 x2, y2, z2 = p2 assert(z2 == 1) if z1 == 0: return p2 z1_2 = (z1**2) % self.p z1_3 = (z1_2 * z1) % self.p u2 = (x2 * z1_2) % self.p s2 = (y2 * z1_3) % self.p if x1 == u2: if (y1 != s2): return (0, 1, 0) return self.double(p1) h = u2 - x1 r = s2 - y1 h_2 = (h**2) % self.p h_3 = (h_2 * h) % self.p u1_h_2 = (x1 * h_2) % self.p x3 = (r**2 - h_3 - 2 * u1_h_2) % self.p y3 = (r * (u1_h_2 - x3) - y1 * h_3) % self.p z3 = (h * z1) % self.p return (x3, y3, z3) def add(self, p1, p2): """Add two Jacobian tuples p1 and p2""" x1, y1, z1 = p1 x2, y2, z2 = p2 if z1 == 0: return p2 if z2 == 0: return p1 if z1 == 1: return self.add_mixed(p2, p1) if z2 == 1: return self.add_mixed(p1, p2) z1_2 = (z1**2) % self.p z1_3 = (z1_2 * z1) % self.p z2_2 = (z2**2) % self.p z2_3 = (z2_2 * z2) % self.p u1 = (x1 * z2_2) % self.p u2 = (x2 * z1_2) % self.p s1 = (y1 * z2_3) % self.p s2 = (y2 * z1_3) % self.p if u1 == u2: if (s1 != s2): return (0, 1, 0) return self.double(p1) h = u2 - u1 r = s2 - s1 h_2 = (h**2) % self.p h_3 = (h_2 * h) % self.p u1_h_2 = (u1 * h_2) % self.p x3 = (r**2 - h_3 - 2 * u1_h_2) % self.p y3 = (r * (u1_h_2 - x3) - s1 * h_3) % self.p z3 = (h * z1 * z2) % self.p return (x3, y3, z3) def mul(self, ps): """Compute a (multi) point multiplication ps is a list of (Jacobian tuple, scalar) pairs. """ r = (0, 1, 0) for i in range(255, -1, -1): r = self.double(r) for (p, n) in ps: if ((n >> i) & 1): r = self.add(r, p) return r SECP256K1 = EllipticCurve(2**256 - 2**32 - 977, 0, 7) SECP256K1_G = ( 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, 1) SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2 class ECPubKey(): """A secp256k1 public key""" def __init__(self): """Construct an uninitialized public key""" self.valid = False def set(self, data): """Construct a public key from a serialization in compressed or uncompressed format""" if (len(data) == 65 and data[0] == 0x04): p = (int.from_bytes(data[1:33], 'big'), int.from_bytes(data[33:65], 'big'), 1) self.valid = SECP256K1.on_curve(p) if self.valid: self.p = p self.compressed = False elif (len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03)): x = int.from_bytes(data[1:33], 'big') if SECP256K1.is_x_coord(x): p = SECP256K1.lift_x(x) if (p[1] & 1) != (data[0] & 1): p = SECP256K1.negate(p) self.p = p self.valid = True self.compressed = True else: self.valid = False else: self.valid = False @property def is_compressed(self): return self.compressed @property def is_valid(self): return self.valid def get_bytes(self): assert(self.valid) p = SECP256K1.affine(self.p) if p is None: return None if self.compressed: return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big') else: return bytes([0x04]) + p[0].to_bytes(32, 'big') + \ p[1].to_bytes(32, 'big') def verify_ecdsa(self, sig, msg, low_s=True): """Verify a strictly DER-encoded ECDSA signature against this pubkey.""" assert(self.valid) if (sig[1] + 2 != len(sig)): return False if (len(sig) < 4): return False if (sig[0] != 0x30): return False if (sig[2] != 0x02): return False rlen = sig[3] if (len(sig) < 6 + rlen): return False if rlen < 1 or rlen > 33: return False if sig[4] >= 0x80: return False if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)): return False r = int.from_bytes(sig[4:4 + rlen], 'big') if (sig[4 + rlen] != 0x02): return False slen = sig[5 + rlen] if slen < 1 or slen > 33: return False if (len(sig) != 6 + rlen + slen): return False if sig[6 + rlen] >= 0x80: return False if (slen > 1 and (sig[6 + rlen] == 0) and not (sig[7 + rlen] & 0x80)): return False s = int.from_bytes(sig[6 + rlen:6 + rlen + slen], 'big') if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER: return False if low_s and s >= SECP256K1_ORDER_HALF: return False z = int.from_bytes(msg, 'big') w = modinv(s, SECP256K1_ORDER) u1 = z * w % SECP256K1_ORDER u2 = r * w % SECP256K1_ORDER R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)])) if R is None or R[0] != r: return False return True + def verify_schnorr(self, sig, msg32): + assert self.is_valid + assert len(sig) == 64 + assert len(msg32) == 32 + + Rx = sig[:32] + s = int.from_bytes(sig[32:], 'big') + e = int.from_bytes( + hashlib.sha256( + Rx + + self.get_bytes() + + msg32).digest(), + 'big') + nege = SECP256K1_ORDER - e + + R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, s), (self.p, nege)])) + + if R is None: + return False + if jacobi_symbol(R[1], SECP256K1.p) == -1: + return False + + return R[0] == int.from_bytes(Rx, 'big') + class ECKey(): """A secp256k1 private key""" def __init__(self): self.valid = False def set(self, secret, compressed): """Construct a private key object with given 32-byte secret and compressed flag.""" assert(len(secret) == 32) secret = int.from_bytes(secret, 'big') self.valid = (secret > 0 and secret < SECP256K1_ORDER) if self.valid: self.secret = secret self.compressed = compressed def generate(self, compressed=True): """Generate a random private key (compressed or uncompressed).""" self.set( random.randrange( 1, SECP256K1_ORDER).to_bytes( 32, 'big'), compressed) def get_bytes(self): """Retrieve the 32-byte representation of this key.""" assert(self.valid) return self.secret.to_bytes(32, 'big') @property def is_valid(self): return self.valid @property def is_compressed(self): return self.compressed def get_pubkey(self): """Compute an ECPubKey object for this secret key.""" assert(self.valid) ret = ECPubKey() p = SECP256K1.mul([(SECP256K1_G, self.secret)]) ret.p = p ret.valid = True ret.compressed = self.compressed return ret def sign_ecdsa(self, msg, low_s=True): """Construct a DER-encoded ECDSA signature with this key.""" assert(self.valid) z = int.from_bytes(msg, 'big') # Note: no RFC6979, but a simple random nonce (some tests rely on # distinct transactions for the same operation) k = random.randrange(1, SECP256K1_ORDER) R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)])) r = R[0] % SECP256K1_ORDER s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER if low_s and s > SECP256K1_ORDER_HALF: s = SECP256K1_ORDER - s rb = r.to_bytes((r.bit_length() + 8) // 8, 'big') sb = s.to_bytes((s.bit_length() + 8) // 8, 'big') return b'\x30' + \ bytes([4 + len(rb) + len(sb), 2, len(rb)]) + \ rb + bytes([2, len(sb)]) + sb + + def sign_schnorr(self, msg32): + """Create Schnorr signature (BIP-Schnorr convention).""" + assert self.valid + assert len(msg32) == 32 + + pubkey = self.get_pubkey() + assert pubkey.is_valid + + k = random.randrange(1, SECP256K1_ORDER) + + R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)])) + + if jacobi_symbol(R[1], SECP256K1.p) == -1: + k = SECP256K1_ORDER - k + + Rx = R[0].to_bytes(32, 'big') + e = int.from_bytes( + hashlib.sha256( + Rx + + pubkey.get_bytes() + + msg32).digest(), + 'big') + s = (k + e * int.from_bytes(self.get_bytes(), 'big')) % SECP256K1_ORDER + sig = Rx + s.to_bytes(32, 'big') + + assert pubkey.verify_schnorr(sig, msg32) + return sig diff --git a/test/functional/test_framework/schnorr.py b/test/functional/test_framework/schnorr.py deleted file mode 100755 index e7810d952..000000000 --- a/test/functional/test_framework/schnorr.py +++ /dev/null @@ -1,337 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2019 The Bitcoin Developers -"""Schnorr secp256k1 using OpenSSL - -WARNING: This module does not mlock() secrets; your private keys may end up on -disk in swap! Also, operations are not constant time. Use with caution! - -Inspired by key.py from python-bitcoinlib. -""" - -import ctypes -import ctypes.util -import hashlib -import hmac -import threading - -ssl = ctypes.cdll.LoadLibrary(ctypes.util.find_library('ssl') or 'libeay32') - -ssl.BN_new.restype = ctypes.c_void_p -ssl.BN_new.argtypes = [] - -ssl.BN_free.restype = None -ssl.BN_free.argtypes = [ctypes.c_void_p] - -ssl.BN_bin2bn.restype = ctypes.c_void_p -ssl.BN_bin2bn.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p] - -ssl.BN_CTX_new.restype = ctypes.c_void_p -ssl.BN_CTX_new.argtypes = [] - -ssl.BN_CTX_free.restype = None -ssl.BN_CTX_free.argtypes = [ctypes.c_void_p] - -ssl.EC_GROUP_new_by_curve_name.restype = ctypes.c_void_p -ssl.EC_GROUP_new_by_curve_name.argtypes = [ctypes.c_int] - -ssl.EC_POINT_new.restype = ctypes.c_void_p -ssl.EC_POINT_new.argtypes = [ctypes.c_void_p] - -ssl.EC_POINT_free.restype = None -ssl.EC_POINT_free.argtypes = [ctypes.c_void_p] - -ssl.EC_POINT_mul.restype = ctypes.c_int -ssl.EC_POINT_mul.argtypes = [ - ctypes.c_void_p, - ctypes.c_void_p, - ctypes.c_void_p, - ctypes.c_void_p, - ctypes.c_void_p, - ctypes.c_void_p] - -ssl.EC_POINT_is_at_infinity.restype = ctypes.c_int -ssl.EC_POINT_is_at_infinity.argtypes = [ctypes.c_void_p, ctypes.c_void_p] - -ssl.EC_POINT_point2oct.restype = ctypes.c_size_t -ssl.EC_POINT_point2oct.argtypes = [ - ctypes.c_void_p, - ctypes.c_void_p, - ctypes.c_int, - ctypes.c_void_p, - ctypes.c_size_t, - ctypes.c_void_p] - -ssl.EC_POINT_oct2point.restype = ctypes.c_int -ssl.EC_POINT_oct2point.argtypes = [ - ctypes.c_void_p, - ctypes.c_void_p, - ctypes.c_void_p, - ctypes.c_size_t, - ctypes.c_void_p] - -# point encodings for EC_POINT_point2oct -POINT_CONVERSION_COMPRESSED = 2 -POINT_CONVERSION_UNCOMPRESSED = 4 - -SECP256K1_FIELDSIZE = 0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffefffffc2f -SECP256K1_ORDER = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141 -SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2 - -# this specifies the curve used -NID_secp256k1 = 714 # from openssl/obj_mac.h - -group = ssl.EC_GROUP_new_by_curve_name(NID_secp256k1) -if not group: - raise RuntimeError("Cannot get secp256k1 group!") - - -class CTX: - """Wrapper for a bignum context""" - - def __init__(self): - self.ptr = ssl.BN_CTX_new() - assert self.ptr - - def __del__(self): - ssl.BN_CTX_free(self.ptr) - - _threadlocal = threading.local() - - @classmethod - def ptr_for_this_thread(cls): - """grab a pointer to per-thread ctx""" - try: - self = cls._threadlocal.ctxwrapper - except AttributeError: - self = cls() - cls._threadlocal.ctxwrapper = self - return self.ptr - - -def jacobi(a, n): - """Jacobi symbol""" - - # Based on the Handbook of Applied Cryptography (HAC), algorithm 2.149. - - # This function has been tested by comparison with a small - # table printed in HAC, and by extensive use in calculating - # modular square roots. - - # Borrowed from python ecdsa package (function originally from Peter Pearson) - # ... modified to use bitwise arithmetic when possible, for speed. - - assert n >= 3 - assert n & 1 == 1 - a = a % n - if a == 0: - return 0 - if a == 1: - return 1 - a1, e = a, 0 - while a1 & 1 == 0: - a1, e = a1 >> 1, e + 1 - if e & 1 == 0 or n & 7 == 1 or n & 7 == 7: - s = 1 - else: - s = -1 - if a1 == 1: - return s - if n & 3 == 3 and a1 & 3 == 3: - s = -s - return s * jacobi(n % a1, a1) - - -def nonce_function_rfc6979(privkeybytes, msg32, algo16=b'', ndata=b''): - # RFC6979 deterministic nonce generation, done in libsecp256k1 style. - # see nonce_function_rfc6979() in secp256k1.c; and details in hash_impl.h - assert len(privkeybytes) == 32 - assert len(msg32) == 32 - assert len(algo16) in (0, 16) - assert len(ndata) in (0, 32) - - V = b'\x01' * 32 - K = b'\x00' * 32 - blob = bytes(privkeybytes) + msg32 + ndata + algo16 - # initialize - K = hmac.HMAC(K, V + b'\x00' + blob, 'sha256').digest() - V = hmac.HMAC(K, V, 'sha256').digest() - K = hmac.HMAC(K, V + b'\x01' + blob, 'sha256').digest() - V = hmac.HMAC(K, V, 'sha256').digest() - # loop forever until an in-range k is found - k = 0 - while True: - # see RFC6979 3.2.h.2 : we take a shortcut and don't build T in - # multiple steps since the first step is always the right size for - # our purpose. - V = hmac.HMAC(K, V, 'sha256').digest() - T = V - assert len(T) >= 32 - k = int.from_bytes(T, 'big') - if k > 0 and k < SECP256K1_ORDER: - break - K = hmac.HMAC(K, V + b'\x00', 'sha256').digest() - V = hmac.HMAC(K, V, 'sha256').digest() - return k - - -def sign(privkeybytes, msg32): - """Create Schnorr signature (BIP-Schnorr convention).""" - assert len(privkeybytes) == 32 - assert len(msg32) == 32 - - k = nonce_function_rfc6979( - privkeybytes, msg32, algo16=b"Schnorr+SHA256 ") - - ctx = CTX.ptr_for_this_thread() - - # calculate R point and P point, and get them in - # uncompressed/compressed formats respectively. - R = ssl.EC_POINT_new(group) - assert R - P = ssl.EC_POINT_new(group) - assert P - kbn = ssl.BN_bin2bn(k.to_bytes(32, 'big'), 32, None) - assert kbn - privbn = ssl.BN_bin2bn(privkeybytes, 32, None) - assert privbn - assert ssl.EC_POINT_mul(group, R, kbn, None, None, ctx) - assert ssl.EC_POINT_mul(group, P, privbn, None, None, ctx) - # buffer for uncompressed R coord - Rbuf = ctypes.create_string_buffer(65) - assert 65 == ssl.EC_POINT_point2oct( - group, R, POINT_CONVERSION_UNCOMPRESSED, Rbuf, 65, ctx) - # buffer for compressed P - pubkeybuf = ctypes.create_string_buffer(33) - assert 33 == ssl.EC_POINT_point2oct( - group, P, POINT_CONVERSION_COMPRESSED, pubkeybuf, 33, ctx) - ssl.BN_free(kbn) - ssl.BN_free(privbn) - ssl.EC_POINT_free(P) - ssl.EC_POINT_free(R) - - # y coord - Ry = int.from_bytes(Rbuf[33:65], 'big') - - if jacobi(Ry, SECP256K1_FIELDSIZE) == -1: - k = SECP256K1_ORDER - k - - # x coord big-endian - Rx = Rbuf[1:33] - - e = int.from_bytes(hashlib.sha256( - Rx + pubkeybuf + msg32).digest(), 'big') - - privkey = int.from_bytes(privkeybytes, 'big') - s = (k + e * privkey) % SECP256K1_ORDER - - sig = Rx + s.to_bytes(32, 'big') - assert verifypriv(sig, privkeybytes, msg32) - return sig - - -def verifypriv(sig, privkeybytes, msg32): - ctx = CTX.ptr_for_this_thread() - - P = ssl.EC_POINT_new(group) - assert P - privbn = ssl.BN_bin2bn(privkeybytes, 32, None) - assert privbn - assert ssl.EC_POINT_mul(group, P, privbn, None, None, ctx) - ssl.BN_free(privbn) - - # buffer for compressed P - pubkeybuf = ctypes.create_string_buffer(33) - assert 33 == ssl.EC_POINT_point2oct( - group, P, POINT_CONVERSION_COMPRESSED, pubkeybuf, 33, ctx) - - ssl.EC_POINT_free(P) - return verify(sig, pubkeybuf[:], msg32) - - -def verify(sig, pubkeybuf, msg32): - assert len(sig) == 64 - assert len(pubkeybuf) == 33 - assert len(msg32) == 32 - - Rx = sig[:32] - s = int.from_bytes(sig[32:], 'big') - e = int.from_bytes(hashlib.sha256(Rx + pubkeybuf + msg32).digest(), 'big') - nege = SECP256K1_ORDER - e - - ctx = CTX.ptr_for_this_thread() - - R = ssl.EC_POINT_new(group) - assert R - - P = ssl.EC_POINT_new(group) - assert P - assert ssl.EC_POINT_oct2point(group, P, pubkeybuf, 33, ctx) - - sbn = ssl.BN_bin2bn(s.to_bytes(32, 'big'), 32, None) - assert sbn - negebn = ssl.BN_bin2bn(nege.to_bytes(32, 'big'), 32, None) - assert negebn - assert ssl.EC_POINT_mul(group, R, sbn, P, negebn, ctx) - ssl.BN_free(negebn) - ssl.BN_free(sbn) - ssl.EC_POINT_free(P) - - # buffer for uncompressed R coord - Rbuf = ctypes.create_string_buffer(65) - assert 65 == ssl.EC_POINT_point2oct( - group, R, POINT_CONVERSION_UNCOMPRESSED, Rbuf, 65, ctx) - ssl.EC_POINT_free(R) - - if Rbuf[1:33] != Rx: - return False - - Ry = int.from_bytes(Rbuf[33:65], 'big') - return jacobi(Ry, SECP256K1_FIELDSIZE) == 1 - - -def getpubkey(privkeybytes, compressed=True): - assert len(privkeybytes) == 32 - encoding = POINT_CONVERSION_COMPRESSED if compressed else POINT_CONVERSION_UNCOMPRESSED - - ctx = CTX.ptr_for_this_thread() - - pubkey = ssl.EC_POINT_new(group) - assert pubkey - privbn = ssl.BN_bin2bn(privkeybytes, 32, None) - assert privbn - assert ssl.EC_POINT_mul(group, pubkey, privbn, None, None, ctx) - assert not ssl.EC_POINT_is_at_infinity(group, pubkey) - # first call (with nullptr for buffer) gets us the size - size = ssl.EC_POINT_point2oct(group, pubkey, encoding, None, 0, ctx) - pubkeybuf = ctypes.create_string_buffer(size) - ret = ssl.EC_POINT_point2oct(group, pubkey, encoding, pubkeybuf, size, ctx) - assert ret == size - ssl.BN_free(privbn) - ssl.EC_POINT_free(pubkey) - return bytes(pubkeybuf) - - -if __name__ == '__main__': - # Test Schnorr implementation. - # duplicate the deterministic sig test from src/test/key_tests.cpp - private_key = bytes.fromhex( - "12b004fff7f4b69ef8650e767f18f11ede158148b425660723b9f9a66e61f747") - - pubkey = getpubkey(private_key, compressed=True) - assert pubkey == bytes.fromhex( - "030b4c866585dd868a9d62348a9cd008d6a312937048fff31670e7e920cfc7a744") - - def sha(b): - return hashlib.sha256(b).digest() - msg = b"Very deterministic message" - msghash = sha(sha(msg)) - assert msghash == bytes.fromhex( - "5255683da567900bfd3e786ed8836a4e7763c221bf1ac20ece2a5171b9199e8a") - - sig = sign(private_key, msghash) - assert sig == bytes.fromhex( - "2c56731ac2f7a7e7f11518fc7722a166b02438924ca9d8b4d1113" - "47b81d0717571846de67ad3d913a8fdf9d8f3f73161a4c48ae81c" - "b183b214765feb86e255ce") - - print("ok")