diff --git a/test/functional/abc-cmdline.py b/test/functional/abc-cmdline.py index 9d6006667..d6a5f7815 100755 --- a/test/functional/abc-cmdline.py +++ b/test/functional/abc-cmdline.py @@ -1,71 +1,72 @@ #!/usr/bin/env python3 # Copyright (c) 2017 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Exercise the command line functions specific to ABC functionality. Currently: -excessiveblocksize= """ import re from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal from test_framework.cdefs import LEGACY_MAX_BLOCK_SIZE MAX_GENERATED_BLOCK_SIZE_ERROR = ( 'Max generated block size (blockmaxsize) cannot exceed the excessive block size (excessiveblocksize)') class ABC_CmdLine_Test (BitcoinTestFramework): def set_test_params(self): self.num_nodes = 1 self.setup_clean_chain = False def check_excessive(self, expected_value): 'Check that the excessiveBlockSize is as expected' getsize = self.nodes[0].getexcessiveblock() ebs = getsize['excessiveBlockSize'] assert_equal(ebs, expected_value) def check_subversion(self, pattern_str): 'Check that the subversion is set as expected' netinfo = self.nodes[0].getnetworkinfo() subversion = netinfo['subversion'] pattern = re.compile(pattern_str) assert(pattern.match(subversion)) def excessiveblocksize_test(self): self.log.info("Testing -excessiveblocksize") - self.log.info(" Set to twice the default, i.e. %d bytes" % - (2 * LEGACY_MAX_BLOCK_SIZE)) + self.log.info(" Set to twice the default, i.e. {} bytes".format( + 2 * LEGACY_MAX_BLOCK_SIZE)) self.stop_node(0) - self.start_node(0, ["-excessiveblocksize=%d" % - (2 * LEGACY_MAX_BLOCK_SIZE)]) + self.start_node(0, ["-excessiveblocksize={}".format( + 2 * LEGACY_MAX_BLOCK_SIZE)]) self.check_excessive(2 * LEGACY_MAX_BLOCK_SIZE) # Check for EB correctness in the subver string self.check_subversion("/Bitcoin ABC:.*\(EB2\.0; .*\)/") - self.log.info(" Attempt to set below legacy limit of 1MB - try %d bytes" % - LEGACY_MAX_BLOCK_SIZE) + self.log.info(" Attempt to set below legacy limit of 1MB - try {} bytes".format( + LEGACY_MAX_BLOCK_SIZE)) self.stop_node(0) self.assert_start_raises_init_error( - 0, ["-excessiveblocksize=%d" % LEGACY_MAX_BLOCK_SIZE], 'Error: Excessive block size must be > 1,000,000 bytes (1MB)') + 0, ["-excessiveblocksize={}".format(LEGACY_MAX_BLOCK_SIZE)], + 'Error: Excessive block size must be > 1,000,000 bytes (1MB)') self.log.info(" Attempt to set below blockmaxsize (mining limit)") self.assert_start_raises_init_error( 0, ['-blockmaxsize=1500000', '-excessiveblocksize=1300000'], 'Error: ' + MAX_GENERATED_BLOCK_SIZE_ERROR) # Make sure we leave the test with a node running as this is what thee # framework expects. self.start_node(0, []) def run_test(self): # Run tests on -excessiveblocksize option self.excessiveblocksize_test() if __name__ == '__main__': ABC_CmdLine_Test().main() diff --git a/test/functional/abc-magnetic-anomaly-mining.py b/test/functional/abc-magnetic-anomaly-mining.py index 3c0e20cdc..b558adbc1 100755 --- a/test/functional/abc-magnetic-anomaly-mining.py +++ b/test/functional/abc-magnetic-anomaly-mining.py @@ -1,119 +1,119 @@ #!/usr/bin/env python3 # Copyright (c) 2014-2016 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test that mining RPC continues to supply correct transaction metadata after the Nov 2018 protocol upgrade which engages canonical transaction ordering """ import time import random import decimal from test_framework.test_framework import BitcoinTestFramework class CTORMiningTest(BitcoinTestFramework): def set_test_params(self): # Setup two nodes so we can getblocktemplate # it errors out if it is not connected to other nodes self.num_nodes = 2 self.setup_clean_chain = True self.block_heights = {} self.tip = None self.blocks = {} self.mocktime = int(time.time()) - 600 * 100 extra_arg = ['-spendzeroconfchange=0', '-whitelist=127.0.0.1', - "-replayprotectionactivationtime=%d" % (10 * self.mocktime)] + "-replayprotectionactivationtime={}".format(10 * self.mocktime)] self.extra_args = [extra_arg, extra_arg] def run_test(self): mining_node = self.nodes[0] # Helper for updating the times def update_time(): mining_node.setmocktime(self.mocktime) self.mocktime = self.mocktime + 600 mining_node.getnewaddress() # Generate some unspent utxos and also # activate magnetic anomaly for x in range(150): update_time() mining_node.generate(1) update_time() unspent = mining_node.listunspent() transactions = {} # Spend all our coinbases while len(unspent): inputs = [] # Grab a random number of inputs for _ in range(random.randrange(1, 5)): txin = unspent.pop() inputs.append({ 'txid': txin['txid'], 'vout': 0 # This is a coinbase }) if len(unspent) == 0: break outputs = {} # Calculate a unique fee for this transaction fee = decimal.Decimal(random.randint( 1000, 2000)) / decimal.Decimal(1e8) # Spend to the same number of outputs as inputs, so we can leave # the amounts unchanged and avoid rounding errors. # # NOTE: There will be 1 sigop per output (which equals the number # of inputs now). We need this randomization to ensure the # numbers are properly following the transactions in the block # template metadata addr = "" for _ in range(len(inputs)): addr = mining_node.getnewaddress() output = { # 50 BCH per coinbase addr: decimal.Decimal(50) } outputs.update(output) # Take the fee off the last output to avoid rounding errors we # need the exact fee later for assertions outputs[addr] -= fee rawtx = mining_node.createrawtransaction(inputs, outputs) signedtx = mining_node.signrawtransaction(rawtx) txid = mining_node.sendrawtransaction(signedtx['hex']) # number of outputs is the same as the number of sigops in this # case transactions.update({txid: {'fee': fee, 'sigops': len(outputs)}}) tmpl = mining_node.getblocktemplate() assert 'proposal' in tmpl['capabilities'] assert 'coinbasetxn' not in tmpl # Check the template transaction metadata and ordering last_txid = 0 for txn in tmpl['transactions'][1:]: txid = txn['txid'] txnMetadata = transactions[txid] expectedFeeSats = int(txnMetadata['fee'] * 10**8) expectedSigOps = txnMetadata['sigops'] txid_decoded = int(txid, 16) # Assert we got the expected metadata assert(expectedFeeSats == txn['fee']) assert(expectedSigOps == txn['sigops']) # Assert transaction ids are in order assert(last_txid == 0 or last_txid < txid_decoded) last_txid = txid_decoded if __name__ == '__main__': CTORMiningTest().main() diff --git a/test/functional/abc-p2p-compactblocks.py b/test/functional/abc-p2p-compactblocks.py index 538d58d5e..f5f2cc6c3 100755 --- a/test/functional/abc-p2p-compactblocks.py +++ b/test/functional/abc-p2p-compactblocks.py @@ -1,381 +1,381 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2016 The Bitcoin Core developers # Copyright (c) 2017 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ This test checks simple acceptance of bigger blocks via p2p. It is derived from the much more complex p2p-fullblocktest. The intention is that small tests can be derived from this one, or this one can be extended, to cover the checks done for bigger blocks (e.g. sigops limits). """ from test_framework.test_framework import ComparisonTestFramework from test_framework.util import * from test_framework.comptool import TestManager, TestInstance from test_framework.blocktools import * import time from test_framework.script import * from test_framework.cdefs import ONE_MEGABYTE from collections import deque class PreviousSpendableOutput(): def __init__(self, tx=CTransaction(), n=-1): self.tx = tx self.n = n # the output we're spending # TestNode: A peer we use to send messages to bitcoind, and store responses. class TestNode(P2PInterface): def __init__(self): self.last_sendcmpct = None self.last_cmpctblock = None self.last_getheaders = None self.last_headers = None super().__init__() def on_sendcmpct(self, message): self.last_sendcmpct = message def on_cmpctblock(self, message): self.last_cmpctblock = message self.last_cmpctblock.header_and_shortids.header.calc_sha256() def on_getheaders(self, message): self.last_getheaders = message def on_headers(self, message): self.last_headers = message for x in self.last_headers.headers: x.calc_sha256() def clear_block_data(self): with mininode_lock: self.last_sendcmpct = None self.last_cmpctblock = None class FullBlockTest(ComparisonTestFramework): # Can either run this test as 1 node with expected answers, or two and compare them. # Change the "outcome" variable from each TestInstance object to only do # the comparison. def set_test_params(self): self.num_nodes = 1 self.setup_clean_chain = True self.block_heights = {} self.tip = None self.blocks = {} self.excessive_block_size = 16 * ONE_MEGABYTE self.extra_args = [['-norelaypriority', '-whitelist=127.0.0.1', '-limitancestorcount=999999', '-limitancestorsize=999999', '-limitdescendantcount=999999', '-limitdescendantsize=999999', '-maxmempool=99999', - "-excessiveblocksize=%d" % self.excessive_block_size]] + "-excessiveblocksize={}".format(self.excessive_block_size)]] def add_options(self, parser): super().add_options(parser) parser.add_argument( "--runbarelyexpensive", dest="runbarelyexpensive", default=True) def run_test(self): self.test = TestManager(self, self.options.tmpdir) self.test.add_all_connections(self.nodes) network_thread_start() # Set the blocksize to 2MB as initial condition self.nodes[0].setexcessiveblock(self.excessive_block_size) self.test.run() def add_transactions_to_block(self, block, tx_list): [tx.rehash() for tx in tx_list] block.vtx.extend(tx_list) # this is a little handier to use than the version in blocktools.py def create_tx(self, spend_tx, n, value, script=CScript([OP_TRUE])): tx = create_transaction(spend_tx, n, b"", value, script) return tx def next_block(self, number, spend=None, script=CScript([OP_TRUE]), block_size=0, extra_txns=0): if self.tip == None: base_block_hash = self.genesis_hash block_time = int(time.time()) + 1 else: base_block_hash = self.tip.sha256 block_time = self.tip.nTime + 1 # First create the coinbase height = self.block_heights[base_block_hash] + 1 coinbase = create_coinbase(height) coinbase.rehash() if spend == None: # We need to have something to spend to fill the block. assert_equal(block_size, 0) block = create_block(base_block_hash, coinbase, block_time) else: # all but one satoshi to fees coinbase.vout[0].nValue += spend.tx.vout[spend.n].nValue - 1 coinbase.rehash() block = create_block(base_block_hash, coinbase, block_time) # Make sure we have plenty enough to spend going forward. spendable_outputs = deque([spend]) def get_base_transaction(): # Create the new transaction tx = CTransaction() # Spend from one of the spendable outputs spend = spendable_outputs.popleft() tx.vin.append(CTxIn(COutPoint(spend.tx.sha256, spend.n))) # Add spendable outputs for i in range(4): tx.vout.append(CTxOut(0, CScript([OP_TRUE]))) spendable_outputs.append(PreviousSpendableOutput(tx, i)) pad_tx(tx) return tx tx = get_base_transaction() # Make it the same format as transaction added for padding and save the size. # It's missing the padding output, so we add a constant to account for it. tx.rehash() base_tx_size = len(tx.serialize()) + 18 # If a specific script is required, add it. if script != None: tx.vout.append(CTxOut(1, script)) # Put some random data into the first transaction of the chain to randomize ids. tx.vout.append( CTxOut(0, CScript([random.randint(0, 256), OP_RETURN]))) # Add the transaction to the block self.add_transactions_to_block(block, [tx]) # Add transaction until we reach the expected transaction count for _ in range(extra_txns): self.add_transactions_to_block(block, [get_base_transaction()]) # If we have a block size requirement, just fill # the block until we get there current_block_size = len(block.serialize()) overage_bytes = 0 while current_block_size < block_size: # We will add a new transaction. That means the size of # the field enumerating how many transaction go in the block # may change. current_block_size -= len(ser_compact_size(len(block.vtx))) current_block_size += len(ser_compact_size(len(block.vtx) + 1)) # Add padding to fill the block. left_to_fill = block_size - current_block_size # Don't go over the 1 mb limit for a txn if left_to_fill > 500000: # Make sure we eat up non-divisible by 100 amounts quickly # Also keep transaction less than 1 MB left_to_fill = 500000 + left_to_fill % 100 # Create the new transaction tx = get_base_transaction() pad_tx(tx, left_to_fill - overage_bytes) if len(tx.serialize()) + current_block_size > block_size: # Our padding was too big try again overage_bytes += 1 continue # Add the tx to the list of transactions to be included # in the block. self.add_transactions_to_block(block, [tx]) current_block_size += len(tx.serialize()) # Now that we added a bunch of transaction, we need to recompute # the merkle root. make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() # Check that the block size is what's expected if block_size > 0: assert_equal(len(block.serialize()), block_size) # Do PoW, which is cheap on regnet block.solve() self.tip = block self.block_heights[block.sha256] = height assert number not in self.blocks self.blocks[number] = block return block def get_tests(self): self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16) self.block_heights[self.genesis_hash] = 0 spendable_outputs = [] # save the current tip so it can be spent by a later block def save_spendable_output(): spendable_outputs.append(self.tip) # get an output that we previously marked as spendable def get_spendable_output(): return PreviousSpendableOutput(spendable_outputs.pop(0).vtx[0], 0) # returns a test case that asserts that the current tip was accepted def accepted(): return TestInstance([[self.tip, True]]) # returns a test case that asserts that the current tip was rejected def rejected(reject=None): if reject is None: return TestInstance([[self.tip, False]]) else: return TestInstance([[self.tip, reject]]) # move the tip back to a previous block def tip(number): self.tip = self.blocks[number] # shorthand for functions block = self.next_block # Create a new block block(0) save_spendable_output() yield accepted() # Now we need that block to mature so we can spend the coinbase. test = TestInstance(sync_every_block=False) for i in range(99): block(5000 + i) test.blocks_and_transactions.append([self.tip, True]) save_spendable_output() # Get to one block of the May 15, 2018 HF activation for i in range(6): block(5100 + i) test.blocks_and_transactions.append([self.tip, True]) # Send it all to the node at once. yield test # collect spendable outputs now to avoid cluttering the code later on out = [] for i in range(100): out.append(get_spendable_output()) # There can be only one network thread running at a time. # Adding a new P2P connection here will try to start the network thread # at init, which will throw an assertion because it's already running. # This requires a few steps to avoid this: # 1/ Disconnect all the TestManager nodes # 2/ Terminate the network thread # 3/ Add the new P2P connection # 4/ Reconnect all the TestManager nodes # 5/ Restart the network thread # Disconnect all the TestManager nodes [n.disconnect_node() for n in self.test.p2p_connections] self.test.wait_for_disconnections() self.test.clear_all_connections() # Wait for the network thread to terminate network_thread_join() # Add the new connection node = self.nodes[0] node.add_p2p_connection(TestNode()) # Reconnect TestManager nodes self.test.add_all_connections(self.nodes) # Restart the network thread network_thread_start() # Wait for connection to be etablished peer = node.p2p peer.wait_for_verack() # Check that compact block also work for big blocks # Wait for SENDCMPCT def received_sendcmpct(): return (peer.last_sendcmpct != None) wait_until(received_sendcmpct, timeout=30) sendcmpct = msg_sendcmpct() sendcmpct.version = 1 sendcmpct.announce = True peer.send_and_ping(sendcmpct) # Exchange headers def received_getheaders(): return (peer.last_getheaders != None) wait_until(received_getheaders, timeout=30) # Return the favor peer.send_message(peer.last_getheaders) # Wait for the header list def received_headers(): return (peer.last_headers != None) wait_until(received_headers, timeout=30) # It's like we know about the same headers ! peer.send_message(peer.last_headers) # Send a block b1 = block(1, spend=out[0], block_size=ONE_MEGABYTE + 1) yield accepted() # Checks the node to forward it via compact block def received_block(): return (peer.last_cmpctblock != None) wait_until(received_block, timeout=30) # Was it our block ? cmpctblk_header = peer.last_cmpctblock.header_and_shortids.header cmpctblk_header.calc_sha256() assert(cmpctblk_header.sha256 == b1.sha256) # Send a large block with numerous transactions. peer.clear_block_data() b2 = block(2, spend=out[1], extra_txns=70000, block_size=self.excessive_block_size - 1000) yield accepted() # Checks the node forwards it via compact block wait_until(received_block, timeout=30) # Was it our block ? cmpctblk_header = peer.last_cmpctblock.header_and_shortids.header cmpctblk_header.calc_sha256() assert(cmpctblk_header.sha256 == b2.sha256) # In order to avoid having to resend a ton of transactions, we invalidate # b2, which will send all its transactions in the mempool. node.invalidateblock(node.getbestblockhash()) # Let's send a compact block and see if the node accepts it. # Let's modify b2 and use it so that we can reuse the mempool. tx = b2.vtx[0] tx.vout.append(CTxOut(0, CScript([random.randint(0, 256), OP_RETURN]))) tx.rehash() b2.vtx[0] = tx b2.hashMerkleRoot = b2.calc_merkle_root() b2.solve() # Now we create the compact block and send it comp_block = HeaderAndShortIDs() comp_block.initialize_from_block(b2) peer.send_and_ping(msg_cmpctblock(comp_block.to_p2p())) # Check that compact block is received properly assert(int(node.getbestblockhash(), 16) == b2.sha256) if __name__ == '__main__': FullBlockTest().main() diff --git a/test/functional/abc-p2p-fullblocktest.py b/test/functional/abc-p2p-fullblocktest.py index 5cf32a69e..ced4bdf73 100755 --- a/test/functional/abc-p2p-fullblocktest.py +++ b/test/functional/abc-p2p-fullblocktest.py @@ -1,401 +1,402 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2016 The Bitcoin Core developers # Copyright (c) 2017 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ This test checks simple acceptance of bigger blocks via p2p. It is derived from the much more complex p2p-fullblocktest. The intention is that small tests can be derived from this one, or this one can be extended, to cover the checks done for bigger blocks (e.g. sigops limits). """ from test_framework.test_framework import ComparisonTestFramework from test_framework.util import assert_equal from test_framework.comptool import TestManager, TestInstance, RejectResult from test_framework.blocktools import * import time from test_framework.key import CECKey from test_framework.script import * from test_framework.cdefs import (ONE_MEGABYTE, MAX_BLOCK_SIGOPS_PER_MB, MAX_TX_SIGOPS_COUNT) from collections import deque REPLAY_PROTECTION_START_TIME = 2000000000 class PreviousSpendableOutput(): def __init__(self, tx=CTransaction(), n=-1): self.tx = tx self.n = n # the output we're spending class FullBlockTest(ComparisonTestFramework): # Can either run this test as 1 node with expected answers, or two and compare them. # Change the "outcome" variable from each TestInstance object to only do # the comparison. def set_test_params(self): self.num_nodes = 1 self.setup_clean_chain = True self.block_heights = {} self.tip = None self.blocks = {} self.excessive_block_size = 100 * ONE_MEGABYTE self.extra_args = [['-whitelist=127.0.0.1', - "-replayprotectionactivationtime=%d" % REPLAY_PROTECTION_START_TIME, - "-excessiveblocksize=%d" % self.excessive_block_size]] + "-replayprotectionactivationtime={}".format( + REPLAY_PROTECTION_START_TIME), + "-excessiveblocksize={}".format(self.excessive_block_size)]] def add_options(self, parser): super().add_options(parser) parser.add_argument( "--runbarelyexpensive", dest="runbarelyexpensive", default=True) def run_test(self): self.test = TestManager(self, self.options.tmpdir) self.test.add_all_connections(self.nodes) network_thread_start() # Set the blocksize to 2MB as initial condition self.nodes[0].setexcessiveblock(self.excessive_block_size) self.test.run() def add_transactions_to_block(self, block, tx_list): [tx.rehash() for tx in tx_list] block.vtx.extend(tx_list) # this is a little handier to use than the version in blocktools.py def create_tx(self, spend, value, script=CScript([OP_TRUE])): tx = create_transaction(spend.tx, spend.n, b"", value, script) return tx def next_block(self, number, spend=None, script=CScript([OP_TRUE]), block_size=0, extra_sigops=0): if self.tip == None: base_block_hash = self.genesis_hash block_time = int(time.time()) + 1 else: base_block_hash = self.tip.sha256 block_time = self.tip.nTime + 1 # First create the coinbase height = self.block_heights[base_block_hash] + 1 coinbase = create_coinbase(height) coinbase.rehash() if spend == None: # We need to have something to spend to fill the block. assert_equal(block_size, 0) block = create_block(base_block_hash, coinbase, block_time) else: # all but one satoshi to fees coinbase.vout[0].nValue += spend.tx.vout[spend.n].nValue - 1 coinbase.rehash() block = create_block(base_block_hash, coinbase, block_time) # Make sure we have plenty engough to spend going forward. spendable_outputs = deque([spend]) def get_base_transaction(): # Create the new transaction tx = CTransaction() # Spend from one of the spendable outputs spend = spendable_outputs.popleft() tx.vin.append(CTxIn(COutPoint(spend.tx.sha256, spend.n))) # Add spendable outputs for i in range(4): tx.vout.append(CTxOut(0, CScript([OP_TRUE]))) spendable_outputs.append(PreviousSpendableOutput(tx, i)) return tx tx = get_base_transaction() # Make it the same format as transaction added for padding and save the size. # It's missing the padding output, so we add a constant to account for it. tx.rehash() base_tx_size = len(tx.serialize()) + 18 # If a specific script is required, add it. if script != None: tx.vout.append(CTxOut(1, script)) # Put some random data into the first transaction of the chain to randomize ids. tx.vout.append( CTxOut(0, CScript([random.randint(0, 256), OP_RETURN]))) # Add the transaction to the block self.add_transactions_to_block(block, [tx]) # If we have a block size requirement, just fill # the block until we get there current_block_size = len(block.serialize()) while current_block_size < block_size: # We will add a new transaction. That means the size of # the field enumerating how many transaction go in the block # may change. current_block_size -= len(ser_compact_size(len(block.vtx))) current_block_size += len(ser_compact_size(len(block.vtx) + 1)) # Create the new transaction tx = get_base_transaction() # Add padding to fill the block. script_length = block_size - current_block_size - base_tx_size if script_length > 510000: if script_length < 1000000: # Make sure we don't find ourselves in a position where we # need to generate a transaction smaller than what we expected. script_length = script_length // 2 else: script_length = 500000 tx_sigops = min(extra_sigops, script_length, MAX_TX_SIGOPS_COUNT) extra_sigops -= tx_sigops script_pad_len = script_length - tx_sigops script_output = CScript( [b'\x00' * script_pad_len] + [OP_CHECKSIG] * tx_sigops) tx.vout.append(CTxOut(0, script_output)) # Add the tx to the list of transactions to be included # in the block. self.add_transactions_to_block(block, [tx]) current_block_size += len(tx.serialize()) # Now that we added a bunch of transaction, we need to recompute # the merkle root. make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() # Check that the block size is what's expected if block_size > 0: assert_equal(len(block.serialize()), block_size) # Do PoW, which is cheap on regnet block.solve() self.tip = block self.block_heights[block.sha256] = height assert number not in self.blocks self.blocks[number] = block return block def get_tests(self): node = self.nodes[0] self.genesis_hash = int(node.getbestblockhash(), 16) self.block_heights[self.genesis_hash] = 0 spendable_outputs = [] # save the current tip so it can be spent by a later block def save_spendable_output(): spendable_outputs.append(self.tip) # get an output that we previously marked as spendable def get_spendable_output(): return PreviousSpendableOutput(spendable_outputs.pop(0).vtx[0], 0) # returns a test case that asserts that the current tip was accepted def accepted(): return TestInstance([[self.tip, True]]) # returns a test case that asserts that the current tip was rejected def rejected(reject=None): if reject is None: return TestInstance([[self.tip, False]]) else: return TestInstance([[self.tip, reject]]) # move the tip back to a previous block def tip(number): self.tip = self.blocks[number] # adds transactions to the block and updates state def update_block(block_number, new_transactions): block = self.blocks[block_number] self.add_transactions_to_block(block, new_transactions) old_sha256 = block.sha256 make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() # Update the internal state just like in next_block self.tip = block if block.sha256 != old_sha256: self.block_heights[ block.sha256] = self.block_heights[old_sha256] del self.block_heights[old_sha256] self.blocks[block_number] = block return block # shorthand for functions block = self.next_block # Create a new block block(0) save_spendable_output() yield accepted() # Now we need that block to mature so we can spend the coinbase. test = TestInstance(sync_every_block=False) for i in range(99): block(5000 + i) test.blocks_and_transactions.append([self.tip, True]) save_spendable_output() yield test # collect spendable outputs now to avoid cluttering the code later on out = [] for i in range(100): out.append(get_spendable_output()) # Let's build some blocks and test them. for i in range(16): n = i + 1 block(n, spend=out[i], block_size=n * ONE_MEGABYTE) yield accepted() # block of maximal size block(17, spend=out[16], block_size=self.excessive_block_size) yield accepted() # Reject oversized blocks with bad-blk-length error block(18, spend=out[17], block_size=self.excessive_block_size + 1) yield rejected(RejectResult(16, b'bad-blk-length')) # Rewind bad block. tip(17) # Accept many sigops lots_of_checksigs = CScript( [OP_CHECKSIG] * MAX_BLOCK_SIGOPS_PER_MB) block(19, spend=out[17], script=lots_of_checksigs, block_size=ONE_MEGABYTE) yield accepted() block(20, spend=out[18], script=lots_of_checksigs, block_size=ONE_MEGABYTE, extra_sigops=1) yield rejected(RejectResult(16, b'bad-blk-sigops')) # Rewind bad block tip(19) # Accept 40k sigops per block > 1MB and <= 2MB block(21, spend=out[18], script=lots_of_checksigs, extra_sigops=MAX_BLOCK_SIGOPS_PER_MB, block_size=ONE_MEGABYTE + 1) yield accepted() # Accept 40k sigops per block > 1MB and <= 2MB block(22, spend=out[19], script=lots_of_checksigs, extra_sigops=MAX_BLOCK_SIGOPS_PER_MB, block_size=2 * ONE_MEGABYTE) yield accepted() # Reject more than 40k sigops per block > 1MB and <= 2MB. block(23, spend=out[20], script=lots_of_checksigs, extra_sigops=MAX_BLOCK_SIGOPS_PER_MB + 1, block_size=ONE_MEGABYTE + 1) yield rejected(RejectResult(16, b'bad-blk-sigops')) # Rewind bad block tip(22) # Reject more than 40k sigops per block > 1MB and <= 2MB. block(24, spend=out[20], script=lots_of_checksigs, extra_sigops=MAX_BLOCK_SIGOPS_PER_MB + 1, block_size=2 * ONE_MEGABYTE) yield rejected(RejectResult(16, b'bad-blk-sigops')) # Rewind bad block tip(22) # Accept 60k sigops per block > 2MB and <= 3MB block(25, spend=out[20], script=lots_of_checksigs, extra_sigops=2 * MAX_BLOCK_SIGOPS_PER_MB, block_size=2 * ONE_MEGABYTE + 1) yield accepted() # Accept 60k sigops per block > 2MB and <= 3MB block(26, spend=out[21], script=lots_of_checksigs, extra_sigops=2 * MAX_BLOCK_SIGOPS_PER_MB, block_size=3 * ONE_MEGABYTE) yield accepted() # Reject more than 40k sigops per block > 1MB and <= 2MB. block(27, spend=out[22], script=lots_of_checksigs, extra_sigops=2 * MAX_BLOCK_SIGOPS_PER_MB + 1, block_size=2 * ONE_MEGABYTE + 1) yield rejected(RejectResult(16, b'bad-blk-sigops')) # Rewind bad block tip(26) # Reject more than 40k sigops per block > 1MB and <= 2MB. block(28, spend=out[22], script=lots_of_checksigs, extra_sigops=2 * MAX_BLOCK_SIGOPS_PER_MB + 1, block_size=3 * ONE_MEGABYTE) yield rejected(RejectResult(16, b'bad-blk-sigops')) # Rewind bad block tip(26) # Too many sigops in one txn too_many_tx_checksigs = CScript( [OP_CHECKSIG] * (MAX_BLOCK_SIGOPS_PER_MB + 1)) block( 29, spend=out[22], script=too_many_tx_checksigs, block_size=ONE_MEGABYTE + 1) yield rejected(RejectResult(16, b'bad-txn-sigops')) # Rewind bad block tip(26) # Generate a key pair to test P2SH sigops count private_key = CECKey() private_key.set_secretbytes(b"fatstacks") public_key = private_key.get_pubkey() # P2SH # Build the redeem script, hash it, use hash to create the p2sh script redeem_script = CScript( [public_key] + [OP_2DUP, OP_CHECKSIGVERIFY] * 5 + [OP_CHECKSIG]) redeem_script_hash = hash160(redeem_script) p2sh_script = CScript([OP_HASH160, redeem_script_hash, OP_EQUAL]) # Create a p2sh transaction p2sh_tx = self.create_tx(out[22], 1, p2sh_script) # Add the transaction to the block block(30) update_block(30, [p2sh_tx]) yield accepted() # Creates a new transaction using the p2sh transaction included in the # last block def spend_p2sh_tx(output_script=CScript([OP_TRUE])): # Create the transaction spent_p2sh_tx = CTransaction() spent_p2sh_tx.vin.append(CTxIn(COutPoint(p2sh_tx.sha256, 0), b'')) spent_p2sh_tx.vout.append(CTxOut(1, output_script)) # Sign the transaction using the redeem script sighash = SignatureHashForkId( redeem_script, spent_p2sh_tx, 0, SIGHASH_ALL | SIGHASH_FORKID, p2sh_tx.vout[0].nValue) sig = private_key.sign(sighash) + \ bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID])) spent_p2sh_tx.vin[0].scriptSig = CScript([sig, redeem_script]) spent_p2sh_tx.rehash() return spent_p2sh_tx # Sigops p2sh limit p2sh_sigops_limit = MAX_BLOCK_SIGOPS_PER_MB - \ redeem_script.GetSigOpCount(True) # Too many sigops in one p2sh txn too_many_p2sh_sigops = CScript([OP_CHECKSIG] * (p2sh_sigops_limit + 1)) block(31, spend=out[23], block_size=ONE_MEGABYTE + 1) update_block(31, [spend_p2sh_tx(too_many_p2sh_sigops)]) yield rejected(RejectResult(16, b'bad-txn-sigops')) # Rewind bad block tip(30) # Max sigops in one p2sh txn max_p2sh_sigops = CScript([OP_CHECKSIG] * (p2sh_sigops_limit)) block(32, spend=out[23], block_size=ONE_MEGABYTE + 1) update_block(32, [spend_p2sh_tx(max_p2sh_sigops)]) yield accepted() # Submit a very large block via RPC large_block = block( 33, spend=out[24], block_size=self.excessive_block_size) node.submitblock(ToHex(large_block)) if __name__ == '__main__': FullBlockTest().main() diff --git a/test/functional/abc-parkedchain.py b/test/functional/abc-parkedchain.py index cc9f31235..41e54852d 100755 --- a/test/functional/abc-parkedchain.py +++ b/test/functional/abc-parkedchain.py @@ -1,192 +1,193 @@ #!/usr/bin/env python3 # Copyright (c) 2018 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the parckblock and unparkblock RPC calls.""" from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal, wait_until class ParkedChainTest(BitcoinTestFramework): def set_test_params(self): self.num_nodes = 2 self.extra_args = [["-noparkdeepreorg"], ["-maxreorgdepth=-1"]] # There should only be one chaintip, which is expected_tip def only_valid_tip(self, expected_tip, other_tip_status=None): node = self.nodes[0] assert_equal(node.getbestblockhash(), expected_tip) for tip in node.getchaintips(): if tip["hash"] == expected_tip: assert_equal(tip["status"], "active") else: assert_equal(tip["status"], other_tip_status) def run_test(self): def wait_for_tip(node, tip): def check_tip(): return node.getbestblockhash() == tip wait_until(check_tip) node = self.nodes[0] parking_node = self.nodes[1] self.log.info("Test chain parking...") node.generate(10) tip = node.getbestblockhash() node.generate(1) block_to_park = node.getbestblockhash() node.generate(10) parked_tip = node.getbestblockhash() # get parking_node caught up. # (probably not needed, but just in case parking can have race # condition like invalidateblock below) wait_for_tip(parking_node, parked_tip) # Let's park the chain. assert(parked_tip != tip) assert(block_to_park != tip) assert(block_to_park != parked_tip) node.parkblock(block_to_park) assert_equal(node.getbestblockhash(), tip) # When the chain is unparked, the node reorg into its original chain. node.unparkblock(parked_tip) assert_equal(node.getbestblockhash(), parked_tip) # Parking and then unparking a block should not change its validity, # and invaliding and reconsidering a block should not change its # parked state. See the following test cases: self.log.info("Test invalidate, park, unpark, reconsider...") node.generate(1) tip = node.getbestblockhash() node.generate(1) bad_tip = node.getbestblockhash() # Generate an extra block to check that children are invalidated as # expected and do not produce dangling chaintips node.generate(1) good_tip = node.getbestblockhash() # avoid race condition from parking_node requesting block when invalid wait_for_tip(parking_node, good_tip) node.invalidateblock(bad_tip) self.only_valid_tip(tip, other_tip_status="invalid") node.parkblock(bad_tip) self.only_valid_tip(tip, other_tip_status="invalid") node.unparkblock(bad_tip) self.only_valid_tip(tip, other_tip_status="invalid") node.reconsiderblock(bad_tip) self.only_valid_tip(good_tip) self.log.info("Test park, invalidate, reconsider, unpark") node.generate(1) tip = node.getbestblockhash() node.generate(1) bad_tip = node.getbestblockhash() node.generate(1) good_tip = node.getbestblockhash() # avoid race condition from parking_node requesting block when invalid wait_for_tip(parking_node, good_tip) node.parkblock(bad_tip) self.only_valid_tip(tip, other_tip_status="parked") node.invalidateblock(bad_tip) # NOTE: Intuitively, other_tip_status would be "invalid", but because # only valid (unparked) chains are walked, child blocks' statuses are # not updated, so the "parked" state remains. self.only_valid_tip(tip, other_tip_status="parked") node.reconsiderblock(bad_tip) self.only_valid_tip(tip, other_tip_status="parked") node.unparkblock(bad_tip) self.only_valid_tip(good_tip) self.log.info("Test invalidate, park, reconsider, unpark...") node.generate(1) tip = node.getbestblockhash() node.generate(1) bad_tip = node.getbestblockhash() node.generate(1) good_tip = node.getbestblockhash() # avoid race condition from parking_node requesting block when invalid wait_for_tip(parking_node, good_tip) node.invalidateblock(bad_tip) self.only_valid_tip(tip, other_tip_status="invalid") node.parkblock(bad_tip) self.only_valid_tip(tip, other_tip_status="invalid") node.reconsiderblock(bad_tip) self.only_valid_tip(tip, other_tip_status="parked") node.unparkblock(bad_tip) self.only_valid_tip(good_tip) self.log.info("Test park, invalidate, unpark, reconsider") node.generate(1) tip = node.getbestblockhash() node.generate(1) bad_tip = node.getbestblockhash() node.generate(1) good_tip = node.getbestblockhash() # avoid race condition from parking_node requesting block when invalid wait_for_tip(parking_node, good_tip) node.parkblock(bad_tip) self.only_valid_tip(tip, other_tip_status="parked") node.invalidateblock(bad_tip) # NOTE: Intuitively, other_tip_status would be "invalid", but because # only valid (unparked) chains are walked, child blocks' statuses are # not updated, so the "parked" state remains. self.only_valid_tip(tip, other_tip_status="parked") node.unparkblock(bad_tip) self.only_valid_tip(tip, other_tip_status="invalid") node.reconsiderblock(bad_tip) self.only_valid_tip(good_tip) # To get ready for next testset, make sure both nodes are in sync. wait_for_tip(parking_node, good_tip) assert_equal(node.getbestblockhash(), parking_node.getbestblockhash()) # Wait for node 1 to park the chain. def wait_for_parked_block(block): def check_block(): for tip in parking_node.getchaintips(): if tip["hash"] == block: assert(tip["status"] != "active") return tip["status"] == "parked" return False wait_until(check_block) def check_reorg_protection(depth, extra_blocks): - self.log.info("Test deep reorg parking, %d block deep" % depth) + self.log.info( + "Test deep reorg parking, {} block deep".format(depth)) # Invalidate the tip on node 0, so it doesn't follow node 1. node.invalidateblock(node.getbestblockhash()) # Mine block to create a fork of proper depth parking_node.generate(depth - 1) node.generate(depth) # extra block should now find themselves parked for i in range(extra_blocks): node.generate(1) wait_for_parked_block(node.getbestblockhash()) # If we mine one more block, the node reorgs. node.generate(1) wait_until(lambda: parking_node.getbestblockhash() == node.getbestblockhash()) check_reorg_protection(1, 0) check_reorg_protection(2, 0) check_reorg_protection(3, 1) check_reorg_protection(4, 4) check_reorg_protection(5, 5) check_reorg_protection(6, 6) check_reorg_protection(100, 100) if __name__ == '__main__': ParkedChainTest().main() diff --git a/test/functional/abc-replay-protection.py b/test/functional/abc-replay-protection.py index a6e3834f4..f2dc2e63e 100755 --- a/test/functional/abc-replay-protection.py +++ b/test/functional/abc-replay-protection.py @@ -1,308 +1,308 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2016 The Bitcoin Core developers # Copyright (c) 2017 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ This test checks activation of UAHF and the different consensus related to this activation. It is derived from the much more complex p2p-fullblocktest. """ from test_framework.test_framework import ComparisonTestFramework from test_framework.util import assert_equal, assert_raises_rpc_error from test_framework.comptool import TestManager, TestInstance, RejectResult from test_framework.blocktools import * import time from test_framework.key import CECKey from test_framework.script import * # far into the future REPLAY_PROTECTION_START_TIME = 2000000000 # Error due to invalid signature INVALID_SIGNATURE_ERROR = b'mandatory-script-verify-flag-failed (Signature must be zero for failed CHECK(MULTI)SIG operation)' RPC_INVALID_SIGNATURE_ERROR = "16: " + \ INVALID_SIGNATURE_ERROR.decode("utf-8") class PreviousSpendableOutput(object): def __init__(self, tx=CTransaction(), n=-1): self.tx = tx self.n = n class ReplayProtectionTest(ComparisonTestFramework): def set_test_params(self): self.num_nodes = 1 self.setup_clean_chain = True self.block_heights = {} self.tip = None self.blocks = {} self.extra_args = [['-whitelist=127.0.0.1', - "-replayprotectionactivationtime=%d" % REPLAY_PROTECTION_START_TIME]] + "-replayprotectionactivationtime={}".format(REPLAY_PROTECTION_START_TIME)]] def run_test(self): self.test = TestManager(self, self.options.tmpdir) self.test.add_all_connections(self.nodes) network_thread_start() self.nodes[0].setmocktime(REPLAY_PROTECTION_START_TIME) self.test.run() def next_block(self, number): if self.tip == None: base_block_hash = self.genesis_hash block_time = int(time.time()) + 1 else: base_block_hash = self.tip.sha256 block_time = self.tip.nTime + 1 # First create the coinbase height = self.block_heights[base_block_hash] + 1 coinbase = create_coinbase(height) coinbase.rehash() block = create_block(base_block_hash, coinbase, block_time) # Do PoW, which is cheap on regnet block.solve() self.tip = block self.block_heights[block.sha256] = height assert number not in self.blocks self.blocks[number] = block return block def get_tests(self): self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16) self.block_heights[self.genesis_hash] = 0 spendable_outputs = [] # save the current tip so it can be spent by a later block def save_spendable_output(): spendable_outputs.append(self.tip) # get an output that we previously marked as spendable def get_spendable_output(): return PreviousSpendableOutput(spendable_outputs.pop(0).vtx[0], 0) # returns a test case that asserts that the current tip was accepted def accepted(): return TestInstance([[self.tip, True]]) # returns a test case that asserts that the current tip was rejected def rejected(reject=None): if reject is None: return TestInstance([[self.tip, False]]) else: return TestInstance([[self.tip, reject]]) # move the tip back to a previous block def tip(number): self.tip = self.blocks[number] # adds transactions to the block and updates state def update_block(block_number, new_transactions): block = self.blocks[block_number] block.vtx.extend(new_transactions) old_sha256 = block.sha256 make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() # Update the internal state just like in next_block self.tip = block if block.sha256 != old_sha256: self.block_heights[ block.sha256] = self.block_heights[old_sha256] del self.block_heights[old_sha256] self.blocks[block_number] = block return block # shorthand block = self.next_block node = self.nodes[0] # Create a new block block(0) save_spendable_output() yield accepted() # Now we need that block to mature so we can spend the coinbase. test = TestInstance(sync_every_block=False) for i in range(99): block(5000 + i) test.blocks_and_transactions.append([self.tip, True]) save_spendable_output() yield test # collect spendable outputs now to avoid cluttering the code later on out = [] for i in range(100): out.append(get_spendable_output()) # Generate a key pair to test P2SH sigops count private_key = CECKey() private_key.set_secretbytes(b"replayprotection") public_key = private_key.get_pubkey() # This is a little handier to use than the version in blocktools.py def create_fund_and_spend_tx(spend, forkvalue=0): # Fund transaction script = CScript([public_key, OP_CHECKSIG]) txfund = create_transaction( spend.tx, spend.n, b'', 50 * COIN, script) txfund.rehash() # Spend transaction txspend = CTransaction() txspend.vout.append(CTxOut(50 * COIN - 1000, CScript([OP_TRUE]))) txspend.vin.append(CTxIn(COutPoint(txfund.sha256, 0), b'')) # Sign the transaction sighashtype = (forkvalue << 8) | SIGHASH_ALL | SIGHASH_FORKID sighash = SignatureHashForkId( script, txspend, 0, sighashtype, 50 * COIN) sig = private_key.sign(sighash) + \ bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID])) txspend.vin[0].scriptSig = CScript([sig]) txspend.rehash() return [txfund, txspend] def send_transaction_to_mempool(tx): tx_id = node.sendrawtransaction(ToHex(tx)) assert(tx_id in set(node.getrawmempool())) return tx_id # Before the fork, no replay protection required to get in the mempool. txns = create_fund_and_spend_tx(out[0]) send_transaction_to_mempool(txns[0]) send_transaction_to_mempool(txns[1]) # And txns get mined in a block properly. block(1) update_block(1, txns) yield accepted() # Replay protected transactions are rejected. replay_txns = create_fund_and_spend_tx(out[1], 0xffdead) send_transaction_to_mempool(replay_txns[0]) assert_raises_rpc_error(-26, RPC_INVALID_SIGNATURE_ERROR, node.sendrawtransaction, ToHex(replay_txns[1])) # And block containing them are rejected as well. block(2) update_block(2, replay_txns) yield rejected(RejectResult(16, b'blk-bad-inputs')) # Rewind bad block tip(1) # Create a block that would activate the replay protection. bfork = block(5555) bfork.nTime = REPLAY_PROTECTION_START_TIME - 1 update_block(5555, []) yield accepted() for i in range(5): block(5100 + i) test.blocks_and_transactions.append([self.tip, True]) yield test # Check we are just before the activation time assert_equal(node.getblockheader(node.getbestblockhash())['mediantime'], REPLAY_PROTECTION_START_TIME - 1) # We are just before the fork, replay protected txns still are rejected assert_raises_rpc_error(-26, RPC_INVALID_SIGNATURE_ERROR, node.sendrawtransaction, ToHex(replay_txns[1])) block(3) update_block(3, replay_txns) yield rejected(RejectResult(16, b'blk-bad-inputs')) # Rewind bad block tip(5104) # Send some non replay protected txns in the mempool to check # they get cleaned at activation. txns = create_fund_and_spend_tx(out[2]) send_transaction_to_mempool(txns[0]) tx_id = send_transaction_to_mempool(txns[1]) # Activate the replay protection block(5556) yield accepted() # Check we just activated the replay protection assert_equal(node.getblockheader(node.getbestblockhash())['mediantime'], REPLAY_PROTECTION_START_TIME) # Non replay protected transactions are not valid anymore, # so they should be removed from the mempool. assert(tx_id not in set(node.getrawmempool())) # Good old transactions are now invalid. send_transaction_to_mempool(txns[0]) assert_raises_rpc_error(-26, RPC_INVALID_SIGNATURE_ERROR, node.sendrawtransaction, ToHex(txns[1])) # They also cannot be mined block(4) update_block(4, txns) yield rejected(RejectResult(16, b'blk-bad-inputs')) # Rewind bad block tip(5556) # The replay protected transaction is now valid replay_tx0_id = send_transaction_to_mempool(replay_txns[0]) replay_tx1_id = send_transaction_to_mempool(replay_txns[1]) # Make sure the transaction are ready to be mined. tmpl = node.getblocktemplate() found_id0 = False found_id1 = False for txn in tmpl['transactions']: txid = txn['txid'] if txid == replay_tx0_id: found_id0 = True elif txid == replay_tx1_id: found_id1 = True assert(found_id0 and found_id1) # And the mempool is still in good shape. assert(replay_tx0_id in set(node.getrawmempool())) assert(replay_tx1_id in set(node.getrawmempool())) # They also can also be mined b5 = block(5) update_block(5, replay_txns) yield accepted() # Ok, now we check if a reorg work properly accross the activation. postforkblockid = node.getbestblockhash() node.invalidateblock(postforkblockid) assert(replay_tx0_id in set(node.getrawmempool())) assert(replay_tx1_id in set(node.getrawmempool())) # Deactivating replay protection. forkblockid = node.getbestblockhash() node.invalidateblock(forkblockid) # The funding tx is not evicted from the mempool, since it's valid in # both sides of the fork assert(replay_tx0_id in set(node.getrawmempool())) assert(replay_tx1_id not in set(node.getrawmempool())) # Check that we also do it properly on deeper reorg. node.reconsiderblock(forkblockid) node.reconsiderblock(postforkblockid) node.invalidateblock(forkblockid) assert(replay_tx0_id in set(node.getrawmempool())) assert(replay_tx1_id not in set(node.getrawmempool())) if __name__ == '__main__': ReplayProtectionTest().main() diff --git a/test/functional/abc-rpc.py b/test/functional/abc-rpc.py index a5fbab9d5..21b43622b 100755 --- a/test/functional/abc-rpc.py +++ b/test/functional/abc-rpc.py @@ -1,81 +1,83 @@ #!/usr/bin/env python3 # Copyright (c) 2017 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. # Exercise the Bitcoin ABC RPC calls. import re from test_framework.test_framework import BitcoinTestFramework from test_framework.util import (assert_equal, assert_raises_rpc_error) from test_framework.cdefs import (ONE_MEGABYTE, LEGACY_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE) class ABC_RPC_Test (BitcoinTestFramework): def set_test_params(self): self.num_nodes = 1 self.tip = None self.setup_clean_chain = True self.extra_args = [['-norelaypriority', '-whitelist=127.0.0.1']] def check_subversion(self, pattern_str): # Check that the subversion is set as expected netinfo = self.nodes[0].getnetworkinfo() subversion = netinfo['subversion'] pattern = re.compile(pattern_str) assert(pattern.match(subversion)) def test_excessiveblock(self): # Check that we start with DEFAULT_MAX_BLOCK_SIZE getsize = self.nodes[0].getexcessiveblock() ebs = getsize['excessiveBlockSize'] assert_equal(ebs, DEFAULT_MAX_BLOCK_SIZE) # Check that setting to legacy size is ok self.nodes[0].setexcessiveblock(LEGACY_MAX_BLOCK_SIZE + 1) getsize = self.nodes[0].getexcessiveblock() ebs = getsize['excessiveBlockSize'] assert_equal(ebs, LEGACY_MAX_BLOCK_SIZE + 1) # Check that going below legacy size is not accepted - assert_raises_rpc_error(-8, "Invalid parameter, excessiveblock must be larger than %d" % - LEGACY_MAX_BLOCK_SIZE, self.nodes[0].setexcessiveblock, LEGACY_MAX_BLOCK_SIZE) + assert_raises_rpc_error(-8, + "Invalid parameter, excessiveblock must be larger than {}".format( + LEGACY_MAX_BLOCK_SIZE), + self.nodes[0].setexcessiveblock, LEGACY_MAX_BLOCK_SIZE) getsize = self.nodes[0].getexcessiveblock() ebs = getsize['excessiveBlockSize'] assert_equal(ebs, LEGACY_MAX_BLOCK_SIZE + 1) # Check setting to 2MB self.nodes[0].setexcessiveblock(2 * ONE_MEGABYTE) getsize = self.nodes[0].getexcessiveblock() ebs = getsize['excessiveBlockSize'] assert_equal(ebs, 2 * ONE_MEGABYTE) # Check for EB correctness in the subver string self.check_subversion("/Bitcoin ABC:.*\(EB2\.0; .*\)/") # Check setting to 13MB self.nodes[0].setexcessiveblock(13 * ONE_MEGABYTE) getsize = self.nodes[0].getexcessiveblock() ebs = getsize['excessiveBlockSize'] assert_equal(ebs, 13 * ONE_MEGABYTE) # Check for EB correctness in the subver string self.check_subversion("/Bitcoin ABC:.*\(EB13\.0; .*\)/") # Check setting to 13.14MB self.nodes[0].setexcessiveblock(13140000) getsize = self.nodes[0].getexcessiveblock() ebs = getsize['excessiveBlockSize'] assert_equal(ebs, 13.14 * ONE_MEGABYTE) # check for EB correctness in the subver string self.check_subversion("/Bitcoin ABC:.*\(EB13\.1; .*\)/") def run_test(self): self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16) self.test_excessiveblock() if __name__ == '__main__': ABC_RPC_Test().main()