Changeset View
Changeset View
Standalone View
Standalone View
test/functional/abc-schnorrmultisig.py
Show All 39 Lines | from test_framework.script import ( | ||||
CScript, | CScript, | ||||
SignatureHashForkId, | SignatureHashForkId, | ||||
) | ) | ||||
from test_framework.test_framework import BitcoinTestFramework | from test_framework.test_framework import BitcoinTestFramework | ||||
from test_framework.util import assert_equal, assert_raises_rpc_error | from test_framework.util import assert_equal, assert_raises_rpc_error | ||||
# ECDSA checkmultisig with non-null dummy are invalid since the new mode | # ECDSA checkmultisig with non-null dummy are invalid since the new mode | ||||
# refuses ECDSA. | # refuses ECDSA. | ||||
ECDSA_NULLDUMMY_ERROR = 'mandatory-script-verify-flag-failed (Only Schnorr signatures allowed in this operation)' | ECDSA_NULLDUMMY_ERROR = ( | ||||
"mandatory-script-verify-flag-failed (Only Schnorr signatures allowed in this" | |||||
" operation)" | |||||
) | |||||
# A mandatory (bannable) error occurs when people pass Schnorr signatures into | # A mandatory (bannable) error occurs when people pass Schnorr signatures into | ||||
# legacy OP_CHECKMULTISIG. | # legacy OP_CHECKMULTISIG. | ||||
SCHNORR_LEGACY_MULTISIG_ERROR = 'mandatory-script-verify-flag-failed (Signature cannot be 65 bytes in CHECKMULTISIG)' | SCHNORR_LEGACY_MULTISIG_ERROR = ( | ||||
"mandatory-script-verify-flag-failed (Signature cannot be 65 bytes in" | |||||
" CHECKMULTISIG)" | |||||
) | |||||
# Blocks with invalid scripts give this error: | # Blocks with invalid scripts give this error: | ||||
BADINPUTS_ERROR = 'blk-bad-inputs' | BADINPUTS_ERROR = "blk-bad-inputs" | ||||
class SchnorrMultisigTest(BitcoinTestFramework): | class SchnorrMultisigTest(BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.num_nodes = 1 | self.num_nodes = 1 | ||||
self.block_heights = {} | self.block_heights = {} | ||||
self.extra_args = [["-acceptnonstdtxn=1"]] | self.extra_args = [["-acceptnonstdtxn=1"]] | ||||
def reconnect_p2p(self): | def reconnect_p2p(self): | ||||
"""Tear down and bootstrap the P2P connection to the node. | """Tear down and bootstrap the P2P connection to the node. | ||||
Show All 14 Lines | class SchnorrMultisigTest(BitcoinTestFramework): | ||||
def build_block(self, parent, transactions=(), nTime=None): | def build_block(self, parent, transactions=(), nTime=None): | ||||
"""Make a new block with an OP_1 coinbase output. | """Make a new block with an OP_1 coinbase output. | ||||
Requires parent to have its height registered.""" | Requires parent to have its height registered.""" | ||||
parent.calc_sha256() | parent.calc_sha256() | ||||
block_height = self.block_heights[parent.sha256] + 1 | block_height = self.block_heights[parent.sha256] + 1 | ||||
block_time = (parent.nTime + 1) if nTime is None else nTime | block_time = (parent.nTime + 1) if nTime is None else nTime | ||||
block = create_block( | block = create_block(parent.sha256, create_coinbase(block_height), block_time) | ||||
parent.sha256, create_coinbase(block_height), block_time) | |||||
block.vtx.extend(transactions) | block.vtx.extend(transactions) | ||||
make_conform_to_ctor(block) | make_conform_to_ctor(block) | ||||
block.hashMerkleRoot = block.calc_merkle_root() | block.hashMerkleRoot = block.calc_merkle_root() | ||||
block.solve() | block.solve() | ||||
self.block_heights[block.sha256] = block_height | self.block_heights[block.sha256] = block_height | ||||
return block | return block | ||||
def check_for_ban_on_rejected_tx(self, tx, reject_reason=None): | def check_for_ban_on_rejected_tx(self, tx, reject_reason=None): | ||||
"""Check we are disconnected when sending a txn that the node rejects. | """Check we are disconnected when sending a txn that the node rejects. | ||||
(Can't actually get banned, since bitcoind won't ban local peers.)""" | (Can't actually get banned, since bitcoind won't ban local peers.)""" | ||||
self.nodes[0].p2ps[0].send_txs_and_test( | self.nodes[0].p2ps[0].send_txs_and_test( | ||||
[tx], self.nodes[0], success=False, expect_disconnect=True, reject_reason=reject_reason) | [tx], | ||||
self.nodes[0], | |||||
success=False, | |||||
expect_disconnect=True, | |||||
reject_reason=reject_reason, | |||||
) | |||||
self.reconnect_p2p() | self.reconnect_p2p() | ||||
def check_for_ban_on_rejected_block(self, block, reject_reason=None): | def check_for_ban_on_rejected_block(self, block, reject_reason=None): | ||||
"""Check we are disconnected when sending a block that the node rejects. | """Check we are disconnected when sending a block that the node rejects. | ||||
(Can't actually get banned, since bitcoind won't ban local peers.)""" | (Can't actually get banned, since bitcoind won't ban local peers.)""" | ||||
self.nodes[0].p2ps[0].send_blocks_and_test( | self.nodes[0].p2ps[0].send_blocks_and_test( | ||||
[block], self.nodes[0], success=False, reject_reason=reject_reason, expect_disconnect=True) | [block], | ||||
self.nodes[0], | |||||
success=False, | |||||
reject_reason=reject_reason, | |||||
expect_disconnect=True, | |||||
) | |||||
self.reconnect_p2p() | self.reconnect_p2p() | ||||
def run_test(self): | def run_test(self): | ||||
node, = self.nodes | (node,) = self.nodes | ||||
self.nodes[0].add_p2p_connection(P2PDataStore()) | self.nodes[0].add_p2p_connection(P2PDataStore()) | ||||
tip = self.getbestblock(node) | tip = self.getbestblock(node) | ||||
self.log.info("Create some blocks with OP_1 coinbase for spending.") | self.log.info("Create some blocks with OP_1 coinbase for spending.") | ||||
blocks = [] | blocks = [] | ||||
for _ in range(10): | for _ in range(10): | ||||
Show All 11 Lines | def run_test(self): | ||||
fundings = [] | fundings = [] | ||||
# Generate a key pair | # Generate a key pair | ||||
private_key = ECKey() | private_key = ECKey() | ||||
private_key.set(b"Schnorr!" * 4, True) | private_key.set(b"Schnorr!" * 4, True) | ||||
# get uncompressed public key serialization | # get uncompressed public key serialization | ||||
public_key = private_key.get_pubkey().get_bytes() | public_key = private_key.get_pubkey().get_bytes() | ||||
def create_fund_and_spend_tx(dummy=OP_0, sigtype='ecdsa'): | def create_fund_and_spend_tx(dummy=OP_0, sigtype="ecdsa"): | ||||
spendfrom = spendable_outputs.pop() | spendfrom = spendable_outputs.pop() | ||||
script = CScript([OP_1, public_key, OP_1, OP_CHECKMULTISIG]) | script = CScript([OP_1, public_key, OP_1, OP_CHECKMULTISIG]) | ||||
value = spendfrom.vout[0].nValue | value = spendfrom.vout[0].nValue | ||||
# Fund transaction | # Fund transaction | ||||
txfund = create_tx_with_script( | txfund = create_tx_with_script( | ||||
spendfrom, 0, b'', amount=value, script_pub_key=script) | spendfrom, 0, b"", amount=value, script_pub_key=script | ||||
) | |||||
txfund.rehash() | txfund.rehash() | ||||
fundings.append(txfund) | fundings.append(txfund) | ||||
# Spend transaction | # Spend transaction | ||||
txspend = CTransaction() | txspend = CTransaction() | ||||
txspend.vout.append( | txspend.vout.append(CTxOut(value - 1000, CScript([OP_TRUE]))) | ||||
CTxOut(value - 1000, CScript([OP_TRUE]))) | txspend.vin.append(CTxIn(COutPoint(txfund.sha256, 0), b"")) | ||||
txspend.vin.append( | |||||
CTxIn(COutPoint(txfund.sha256, 0), b'')) | |||||
# Sign the transaction | # Sign the transaction | ||||
sighashtype = SIGHASH_ALL | SIGHASH_FORKID | sighashtype = SIGHASH_ALL | SIGHASH_FORKID | ||||
hashbyte = bytes([sighashtype & 0xff]) | hashbyte = bytes([sighashtype & 0xFF]) | ||||
sighash = SignatureHashForkId( | sighash = SignatureHashForkId(script, txspend, 0, sighashtype, value) | ||||
script, txspend, 0, sighashtype, value) | if sigtype == "schnorr": | ||||
if sigtype == 'schnorr': | |||||
txsig = private_key.sign_schnorr(sighash) + hashbyte | txsig = private_key.sign_schnorr(sighash) + hashbyte | ||||
elif sigtype == 'ecdsa': | elif sigtype == "ecdsa": | ||||
txsig = private_key.sign_ecdsa(sighash) + hashbyte | txsig = private_key.sign_ecdsa(sighash) + hashbyte | ||||
txspend.vin[0].scriptSig = CScript([dummy, txsig]) | txspend.vin[0].scriptSig = CScript([dummy, txsig]) | ||||
txspend.rehash() | txspend.rehash() | ||||
return txspend | return txspend | ||||
# This is valid. | # This is valid. | ||||
ecdsa0tx = create_fund_and_spend_tx(OP_0, 'ecdsa') | ecdsa0tx = create_fund_and_spend_tx(OP_0, "ecdsa") | ||||
# This is invalid. | # This is invalid. | ||||
ecdsa1tx = create_fund_and_spend_tx(OP_1, 'ecdsa') | ecdsa1tx = create_fund_and_spend_tx(OP_1, "ecdsa") | ||||
# This is invalid. | # This is invalid. | ||||
schnorr0tx = create_fund_and_spend_tx(OP_0, 'schnorr') | schnorr0tx = create_fund_and_spend_tx(OP_0, "schnorr") | ||||
# This is valid. | # This is valid. | ||||
schnorr1tx = create_fund_and_spend_tx(OP_1, 'schnorr') | schnorr1tx = create_fund_and_spend_tx(OP_1, "schnorr") | ||||
tip = self.build_block(tip, fundings) | tip = self.build_block(tip, fundings) | ||||
node.p2ps[0].send_blocks_and_test([tip], node) | node.p2ps[0].send_blocks_and_test([tip], node) | ||||
self.log.info("Send a legacy ECDSA multisig into mempool.") | self.log.info("Send a legacy ECDSA multisig into mempool.") | ||||
node.p2ps[0].send_txs_and_test([ecdsa0tx], node) | node.p2ps[0].send_txs_and_test([ecdsa0tx], node) | ||||
assert_equal(node.getrawmempool(), [ecdsa0tx.hash]) | assert_equal(node.getrawmempool(), [ecdsa0tx.hash]) | ||||
self.log.info("Trying to mine a non-null-dummy ECDSA.") | self.log.info("Trying to mine a non-null-dummy ECDSA.") | ||||
self.check_for_ban_on_rejected_block( | self.check_for_ban_on_rejected_block( | ||||
self.build_block(tip, [ecdsa1tx]), BADINPUTS_ERROR) | self.build_block(tip, [ecdsa1tx]), BADINPUTS_ERROR | ||||
) | |||||
self.log.info( | self.log.info( | ||||
"If we try to submit it by mempool or RPC, it is rejected and we are banned") | "If we try to submit it by mempool or RPC, it is rejected and we are banned" | ||||
assert_raises_rpc_error(-26, ECDSA_NULLDUMMY_ERROR, | ) | ||||
node.sendrawtransaction, ToHex(ecdsa1tx)) | assert_raises_rpc_error( | ||||
self.check_for_ban_on_rejected_tx( | -26, ECDSA_NULLDUMMY_ERROR, node.sendrawtransaction, ToHex(ecdsa1tx) | ||||
ecdsa1tx, ECDSA_NULLDUMMY_ERROR) | ) | ||||
self.check_for_ban_on_rejected_tx(ecdsa1tx, ECDSA_NULLDUMMY_ERROR) | |||||
self.log.info( | self.log.info("Submitting a Schnorr-multisig via net, and mining it in a block") | ||||
"Submitting a Schnorr-multisig via net, and mining it in a block") | |||||
node.p2ps[0].send_txs_and_test([schnorr1tx], node) | node.p2ps[0].send_txs_and_test([schnorr1tx], node) | ||||
assert_equal(set(node.getrawmempool()), { | assert_equal(set(node.getrawmempool()), {ecdsa0tx.hash, schnorr1tx.hash}) | ||||
ecdsa0tx.hash, schnorr1tx.hash}) | |||||
tip = self.build_block(tip, [schnorr1tx]) | tip = self.build_block(tip, [schnorr1tx]) | ||||
node.p2ps[0].send_blocks_and_test([tip], node) | node.p2ps[0].send_blocks_and_test([tip], node) | ||||
self.log.info( | self.log.info("That legacy ECDSA multisig is still in mempool, let's mine it") | ||||
"That legacy ECDSA multisig is still in mempool, let's mine it") | |||||
assert_equal(node.getrawmempool(), [ecdsa0tx.hash]) | assert_equal(node.getrawmempool(), [ecdsa0tx.hash]) | ||||
tip = self.build_block(tip, [ecdsa0tx]) | tip = self.build_block(tip, [ecdsa0tx]) | ||||
node.p2ps[0].send_blocks_and_test([tip], node) | node.p2ps[0].send_blocks_and_test([tip], node) | ||||
assert_equal(node.getrawmempool(), []) | assert_equal(node.getrawmempool(), []) | ||||
self.log.info( | self.log.info("Trying Schnorr in legacy multisig is invalid and banworthy.") | ||||
"Trying Schnorr in legacy multisig is invalid and banworthy.") | self.check_for_ban_on_rejected_tx(schnorr0tx, SCHNORR_LEGACY_MULTISIG_ERROR) | ||||
self.check_for_ban_on_rejected_tx( | |||||
schnorr0tx, SCHNORR_LEGACY_MULTISIG_ERROR) | |||||
self.check_for_ban_on_rejected_block( | self.check_for_ban_on_rejected_block( | ||||
self.build_block(tip, [schnorr0tx]), BADINPUTS_ERROR) | self.build_block(tip, [schnorr0tx]), BADINPUTS_ERROR | ||||
) | |||||
if __name__ == '__main__': | if __name__ == "__main__": | ||||
SchnorrMultisigTest().main() | SchnorrMultisigTest().main() |