Changeset View
Changeset View
Standalone View
Standalone View
test/functional/feature_dbcrash.py
Show All 40 Lines | class ChainstateWriteCrashTest(BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.num_nodes = 4 | self.num_nodes = 4 | ||||
self.rpc_timeout = 480 | self.rpc_timeout = 480 | ||||
self.supports_cli = False | self.supports_cli = False | ||||
# Set -maxmempool=0 to turn off mempool memory sharing with dbcache | # Set -maxmempool=0 to turn off mempool memory sharing with dbcache | ||||
# Set -rpcservertimeout=900 to reduce socket disconnects in this | # Set -rpcservertimeout=900 to reduce socket disconnects in this | ||||
# long-running test | # long-running test | ||||
self.base_args = ["-limitdescendantsize=0", "-maxmempool=0", | self.base_args = [ | ||||
"-rpcservertimeout=900", "-dbbatchsize=200000", | "-limitdescendantsize=0", | ||||
"-noparkdeepreorg"] | "-maxmempool=0", | ||||
"-rpcservertimeout=900", | |||||
"-dbbatchsize=200000", | |||||
"-noparkdeepreorg", | |||||
] | |||||
# Set different crash ratios and cache sizes. Note that not all of | # Set different crash ratios and cache sizes. Note that not all of | ||||
# -dbcache goes to the in-memory coins cache. | # -dbcache goes to the in-memory coins cache. | ||||
self.node0_args = ["-dbcrashratio=8", "-dbcache=4"] + self.base_args | self.node0_args = ["-dbcrashratio=8", "-dbcache=4"] + self.base_args | ||||
self.node1_args = ["-dbcrashratio=16", "-dbcache=8"] + self.base_args | self.node1_args = ["-dbcrashratio=16", "-dbcache=8"] + self.base_args | ||||
self.node2_args = ["-dbcrashratio=24", "-dbcache=16"] + self.base_args | self.node2_args = ["-dbcrashratio=24", "-dbcache=16"] + self.base_args | ||||
# Node3 is a normal node with default args, except will mine full blocks | # Node3 is a normal node with default args, except will mine full blocks | ||||
# and non-standard txs (e.g. txs with "dust" outputs) | # and non-standard txs (e.g. txs with "dust" outputs) | ||||
self.node3_args = [ | self.node3_args = [ | ||||
f"-blockmaxsize={DEFAULT_MAX_BLOCK_SIZE}", | f"-blockmaxsize={DEFAULT_MAX_BLOCK_SIZE}", | ||||
"-acceptnonstdtxn"] | "-acceptnonstdtxn", | ||||
self.extra_args = [self.node0_args, self.node1_args, | ] | ||||
self.node2_args, self.node3_args] | self.extra_args = [ | ||||
self.node0_args, | |||||
self.node1_args, | |||||
self.node2_args, | |||||
self.node3_args, | |||||
] | |||||
def skip_test_if_missing_module(self): | def skip_test_if_missing_module(self): | ||||
self.skip_if_no_wallet() | self.skip_if_no_wallet() | ||||
def setup_network(self): | def setup_network(self): | ||||
self.add_nodes(self.num_nodes, extra_args=self.extra_args) | self.add_nodes(self.num_nodes, extra_args=self.extra_args) | ||||
self.start_nodes() | self.start_nodes() | ||||
self.import_deterministic_coinbase_privkeys() | self.import_deterministic_coinbase_privkeys() | ||||
# Leave them unconnected, we'll use submitblock directly in this test | # Leave them unconnected, we'll use submitblock directly in this test | ||||
def restart_node(self, node_index, expected_tip): | def restart_node(self, node_index, expected_tip): | ||||
"""Start up a given node id, wait for the tip to reach the given block hash, and calculate the utxo hash. | """Start up a given node id, wait for the tip to reach the given block hash, and calculate the utxo hash. | ||||
Exceptions on startup should indicate node crash (due to -dbcrashratio), in which case we try again. Give up | Exceptions on startup should indicate node crash (due to -dbcrashratio), in which case we try again. Give up | ||||
after 60 seconds. Returns the utxo hash of the given node.""" | after 60 seconds. Returns the utxo hash of the given node.""" | ||||
time_start = time.time() | time_start = time.time() | ||||
while time.time() - time_start < 120: | while time.time() - time_start < 120: | ||||
try: | try: | ||||
# Any of these RPC calls could throw due to node crash | # Any of these RPC calls could throw due to node crash | ||||
self.start_node(node_index) | self.start_node(node_index) | ||||
self.nodes[node_index].waitforblock(expected_tip) | self.nodes[node_index].waitforblock(expected_tip) | ||||
utxo_hash = self.nodes[node_index].gettxoutsetinfo()[ | utxo_hash = self.nodes[node_index].gettxoutsetinfo()["hash_serialized"] | ||||
'hash_serialized'] | |||||
return utxo_hash | return utxo_hash | ||||
except Exception: | except Exception: | ||||
# An exception here should mean the node is about to crash. | # An exception here should mean the node is about to crash. | ||||
# If bitcoind exits, then try again. wait_for_node_exit() | # If bitcoind exits, then try again. wait_for_node_exit() | ||||
# should raise an exception if bitcoind doesn't exit. | # should raise an exception if bitcoind doesn't exit. | ||||
self.wait_for_node_exit(node_index, timeout=15) | self.wait_for_node_exit(node_index, timeout=15) | ||||
self.crashed_on_restart += 1 | self.crashed_on_restart += 1 | ||||
time.sleep(1) | time.sleep(1) | ||||
# If we got here, bitcoind isn't coming back up on restart. Could be a | # If we got here, bitcoind isn't coming back up on restart. Could be a | ||||
# bug in bitcoind, or we've gotten unlucky with our dbcrash ratio -- | # bug in bitcoind, or we've gotten unlucky with our dbcrash ratio -- | ||||
# perhaps we generated a test case that blew up our cache? | # perhaps we generated a test case that blew up our cache? | ||||
# TODO: If this happens a lot, we should try to restart without -dbcrashratio | # TODO: If this happens a lot, we should try to restart without -dbcrashratio | ||||
# and make sure that recovery happens. | # and make sure that recovery happens. | ||||
raise AssertionError( | raise AssertionError( | ||||
f"Unable to successfully restart node {node_index} in allotted time") | f"Unable to successfully restart node {node_index} in allotted time" | ||||
) | |||||
def submit_block_catch_error(self, node_index, block): | def submit_block_catch_error(self, node_index, block): | ||||
"""Try submitting a block to the given node. | """Try submitting a block to the given node. | ||||
Catch any exceptions that indicate the node has crashed. | Catch any exceptions that indicate the node has crashed. | ||||
Returns true if the block was submitted successfully; false otherwise.""" | Returns true if the block was submitted successfully; false otherwise.""" | ||||
try: | try: | ||||
self.nodes[node_index].submitblock(block) | self.nodes[node_index].submitblock(block) | ||||
return True | return True | ||||
except (http.client.CannotSendRequest, http.client.RemoteDisconnected) as e: | except (http.client.CannotSendRequest, http.client.RemoteDisconnected) as e: | ||||
self.log.debug( | self.log.debug(f"node {node_index} submitblock raised exception: {e}") | ||||
f"node {node_index} submitblock raised exception: {e}") | |||||
return False | return False | ||||
except OSError as e: | except OSError as e: | ||||
self.log.debug( | self.log.debug( | ||||
f"node {node_index} submitblock raised OSError exception: errno={e.errno}") | f"node {node_index} submitblock raised OSError exception:" | ||||
f" errno={e.errno}" | |||||
) | |||||
if e.errno in [errno.EPIPE, errno.ECONNREFUSED, errno.ECONNRESET]: | if e.errno in [errno.EPIPE, errno.ECONNREFUSED, errno.ECONNRESET]: | ||||
# The node has likely crashed | # The node has likely crashed | ||||
return False | return False | ||||
else: | else: | ||||
# Unexpected exception, raise | # Unexpected exception, raise | ||||
raise | raise | ||||
def sync_node3blocks(self, block_hashes): | def sync_node3blocks(self, block_hashes): | ||||
"""Use submitblock to sync node3's chain with the other nodes | """Use submitblock to sync node3's chain with the other nodes | ||||
If submitblock fails, restart the node and get the new utxo hash. | If submitblock fails, restart the node and get the new utxo hash. | ||||
If any nodes crash while updating, we'll compare utxo hashes to | If any nodes crash while updating, we'll compare utxo hashes to | ||||
ensure recovery was successful.""" | ensure recovery was successful.""" | ||||
node3_utxo_hash = self.nodes[3].gettxoutsetinfo()['hash_serialized'] | node3_utxo_hash = self.nodes[3].gettxoutsetinfo()["hash_serialized"] | ||||
# Retrieve all the blocks from node3 | # Retrieve all the blocks from node3 | ||||
blocks = [] | blocks = [] | ||||
for block_hash in block_hashes: | for block_hash in block_hashes: | ||||
blocks.append( | blocks.append([block_hash, self.nodes[3].getblock(block_hash, False)]) | ||||
[block_hash, self.nodes[3].getblock(block_hash, False)]) | |||||
# Deliver each block to each other node | # Deliver each block to each other node | ||||
for i in range(3): | for i in range(3): | ||||
nodei_utxo_hash = None | nodei_utxo_hash = None | ||||
self.log.debug(f"Syncing blocks to node {i}") | self.log.debug(f"Syncing blocks to node {i}") | ||||
for (block_hash, block) in blocks: | for block_hash, block in blocks: | ||||
# Get the block from node3, and submit to node_i | # Get the block from node3, and submit to node_i | ||||
self.log.debug(f"submitting block {block_hash}") | self.log.debug(f"submitting block {block_hash}") | ||||
if not self.submit_block_catch_error(i, block): | if not self.submit_block_catch_error(i, block): | ||||
# TODO: more carefully check that the crash is due to -dbcrashratio | # TODO: more carefully check that the crash is due to -dbcrashratio | ||||
# (change the exit code perhaps, and check that here?) | # (change the exit code perhaps, and check that here?) | ||||
self.wait_for_node_exit(i, timeout=30) | self.wait_for_node_exit(i, timeout=30) | ||||
self.log.debug( | self.log.debug(f"Restarting node {i} after block hash {block_hash}") | ||||
f"Restarting node {i} after block hash {block_hash}") | |||||
nodei_utxo_hash = self.restart_node(i, block_hash) | nodei_utxo_hash = self.restart_node(i, block_hash) | ||||
assert nodei_utxo_hash is not None | assert nodei_utxo_hash is not None | ||||
self.restart_counts[i] += 1 | self.restart_counts[i] += 1 | ||||
else: | else: | ||||
# Clear it out after successful submitblock calls -- the cached | # Clear it out after successful submitblock calls -- the cached | ||||
# utxo hash will no longer be correct | # utxo hash will no longer be correct | ||||
nodei_utxo_hash = None | nodei_utxo_hash = None | ||||
# Check that the utxo hash matches node3's utxo set | # Check that the utxo hash matches node3's utxo set | ||||
# NOTE: we only check the utxo set if we had to restart the node | # NOTE: we only check the utxo set if we had to restart the node | ||||
# after the last block submitted: | # after the last block submitted: | ||||
# - checking the utxo hash causes a cache flush, which we don't | # - checking the utxo hash causes a cache flush, which we don't | ||||
# want to do every time; so | # want to do every time; so | ||||
# - we only update the utxo cache after a node restart, since flushing | # - we only update the utxo cache after a node restart, since flushing | ||||
# the cache is a no-op at that point | # the cache is a no-op at that point | ||||
if nodei_utxo_hash is not None: | if nodei_utxo_hash is not None: | ||||
self.log.debug( | self.log.debug(f"Checking txoutsetinfo matches for node {i}") | ||||
f"Checking txoutsetinfo matches for node {i}") | |||||
assert_equal(nodei_utxo_hash, node3_utxo_hash) | assert_equal(nodei_utxo_hash, node3_utxo_hash) | ||||
def verify_utxo_hash(self): | def verify_utxo_hash(self): | ||||
"""Verify that the utxo hash of each node matches node3. | """Verify that the utxo hash of each node matches node3. | ||||
Restart any nodes that crash while querying.""" | Restart any nodes that crash while querying.""" | ||||
node3_utxo_hash = self.nodes[3].gettxoutsetinfo()['hash_serialized'] | node3_utxo_hash = self.nodes[3].gettxoutsetinfo()["hash_serialized"] | ||||
self.log.info("Verifying utxo hash matches for all nodes") | self.log.info("Verifying utxo hash matches for all nodes") | ||||
for i in range(3): | for i in range(3): | ||||
try: | try: | ||||
nodei_utxo_hash = self.nodes[i].gettxoutsetinfo()[ | nodei_utxo_hash = self.nodes[i].gettxoutsetinfo()["hash_serialized"] | ||||
'hash_serialized'] | |||||
except OSError: | except OSError: | ||||
# probably a crash on db flushing | # probably a crash on db flushing | ||||
nodei_utxo_hash = self.restart_node( | nodei_utxo_hash = self.restart_node(i, self.nodes[3].getbestblockhash()) | ||||
i, self.nodes[3].getbestblockhash()) | |||||
assert_equal(nodei_utxo_hash, node3_utxo_hash) | assert_equal(nodei_utxo_hash, node3_utxo_hash) | ||||
def generate_small_transactions(self, node, count, utxo_list): | def generate_small_transactions(self, node, count, utxo_list): | ||||
FEE = 1000 # TODO: replace this with node relay fee based calculation | FEE = 1000 # TODO: replace this with node relay fee based calculation | ||||
num_transactions = 0 | num_transactions = 0 | ||||
random.shuffle(utxo_list) | random.shuffle(utxo_list) | ||||
while len(utxo_list) >= 2 and num_transactions < count: | while len(utxo_list) >= 2 and num_transactions < count: | ||||
tx = CTransaction() | tx = CTransaction() | ||||
input_amount = 0 | input_amount = 0 | ||||
for _ in range(2): | for _ in range(2): | ||||
utxo = utxo_list.pop() | utxo = utxo_list.pop() | ||||
tx.vin.append( | tx.vin.append(CTxIn(COutPoint(int(utxo["txid"], 16), utxo["vout"]))) | ||||
CTxIn(COutPoint(int(utxo['txid'], 16), utxo['vout']))) | input_amount += int(utxo["amount"] * XEC) | ||||
input_amount += int(utxo['amount'] * XEC) | |||||
output_amount = (input_amount - FEE) // 3 | output_amount = (input_amount - FEE) // 3 | ||||
if output_amount <= 0: | if output_amount <= 0: | ||||
# Sanity check -- if we chose inputs that are too small, skip | # Sanity check -- if we chose inputs that are too small, skip | ||||
continue | continue | ||||
for _ in range(3): | for _ in range(3): | ||||
tx.vout.append( | tx.vout.append( | ||||
CTxOut(output_amount, bytes.fromhex(utxo['scriptPubKey']))) | CTxOut(output_amount, bytes.fromhex(utxo["scriptPubKey"])) | ||||
) | |||||
# Sign and send the transaction to get into the mempool | # Sign and send the transaction to get into the mempool | ||||
tx_signed_hex = node.signrawtransactionwithwallet(ToHex(tx))['hex'] | tx_signed_hex = node.signrawtransactionwithwallet(ToHex(tx))["hex"] | ||||
node.sendrawtransaction(tx_signed_hex) | node.sendrawtransaction(tx_signed_hex) | ||||
num_transactions += 1 | num_transactions += 1 | ||||
def run_test(self): | def run_test(self): | ||||
# Track test coverage statistics | # Track test coverage statistics | ||||
self.restart_counts = [0, 0, 0] # Track the restarts for nodes 0-2 | self.restart_counts = [0, 0, 0] # Track the restarts for nodes 0-2 | ||||
self.crashed_on_restart = 0 # Track count of crashes during recovery | self.crashed_on_restart = 0 # Track count of crashes during recovery | ||||
# Start by creating a lot of utxos on node3 | # Start by creating a lot of utxos on node3 | ||||
initial_height = self.nodes[3].getblockcount() | initial_height = self.nodes[3].getblockcount() | ||||
utxo_list = create_confirmed_utxos( | utxo_list = create_confirmed_utxos( | ||||
self, self.nodes[3], 5000, sync_fun=self.no_op) | self, self.nodes[3], 5000, sync_fun=self.no_op | ||||
) | |||||
self.log.info(f"Prepped {len(utxo_list)} utxo entries") | self.log.info(f"Prepped {len(utxo_list)} utxo entries") | ||||
# Sync these blocks with the other nodes | # Sync these blocks with the other nodes | ||||
block_hashes_to_sync = [] | block_hashes_to_sync = [] | ||||
for height in range(initial_height + 1, | for height in range(initial_height + 1, self.nodes[3].getblockcount() + 1): | ||||
self.nodes[3].getblockcount() + 1): | |||||
block_hashes_to_sync.append(self.nodes[3].getblockhash(height)) | block_hashes_to_sync.append(self.nodes[3].getblockhash(height)) | ||||
self.log.debug( | self.log.debug(f"Syncing {len(block_hashes_to_sync)} blocks with other nodes") | ||||
f"Syncing {len(block_hashes_to_sync)} blocks with other nodes") | |||||
# Syncing the blocks could cause nodes to crash, so the test begins | # Syncing the blocks could cause nodes to crash, so the test begins | ||||
# here. | # here. | ||||
self.sync_node3blocks(block_hashes_to_sync) | self.sync_node3blocks(block_hashes_to_sync) | ||||
starting_tip_height = self.nodes[3].getblockcount() | starting_tip_height = self.nodes[3].getblockcount() | ||||
# Set mock time to the last block time. This will allow us to increase | # Set mock time to the last block time. This will allow us to increase | ||||
# the time at each loop so the block hash will always differ for the | # the time at each loop so the block hash will always differ for the | ||||
# same block height, and avoid duplication. | # same block height, and avoid duplication. | ||||
# Note that the current time can be behind the block time due to the | # Note that the current time can be behind the block time due to the | ||||
# way the miner sets the block time. | # way the miner sets the block time. | ||||
tip = self.nodes[3].getbestblockhash() | tip = self.nodes[3].getbestblockhash() | ||||
block_time = self.nodes[3].getblockheader(tip)['time'] | block_time = self.nodes[3].getblockheader(tip)["time"] | ||||
self.nodes[3].setmocktime(block_time) | self.nodes[3].setmocktime(block_time) | ||||
# Main test loop: | # Main test loop: | ||||
# each time through the loop, generate a bunch of transactions, | # each time through the loop, generate a bunch of transactions, | ||||
# and then either mine a single new block on the tip, or some-sized | # and then either mine a single new block on the tip, or some-sized | ||||
# reorg. | # reorg. | ||||
for i in range(40): | for i in range(40): | ||||
block_time += 10 | block_time += 10 | ||||
self.nodes[3].setmocktime(block_time) | self.nodes[3].setmocktime(block_time) | ||||
self.log.info( | self.log.info( | ||||
f"Iteration {i}, generating 2500 transactions {self.restart_counts}") | f"Iteration {i}, generating 2500 transactions {self.restart_counts}" | ||||
) | |||||
# Generate a bunch of small-ish transactions | # Generate a bunch of small-ish transactions | ||||
self.generate_small_transactions(self.nodes[3], 2500, utxo_list) | self.generate_small_transactions(self.nodes[3], 2500, utxo_list) | ||||
# Pick a random block between current tip, and starting tip | # Pick a random block between current tip, and starting tip | ||||
current_height = self.nodes[3].getblockcount() | current_height = self.nodes[3].getblockcount() | ||||
random_height = random.randint(starting_tip_height, current_height) | random_height = random.randint(starting_tip_height, current_height) | ||||
self.log.debug( | self.log.debug( | ||||
f"At height {current_height}, considering height {random_height}") | f"At height {current_height}, considering height {random_height}" | ||||
) | |||||
if random_height > starting_tip_height: | if random_height > starting_tip_height: | ||||
# Randomly reorg from this point with some probability (1/4 for | # Randomly reorg from this point with some probability (1/4 for | ||||
# tip, 1/5 for tip-1, ...) | # tip, 1/5 for tip-1, ...) | ||||
if random.random() < 1.0 / (current_height + 4 - random_height): | if random.random() < 1.0 / (current_height + 4 - random_height): | ||||
self.log.debug( | self.log.debug(f"Invalidating block at height {random_height}") | ||||
f"Invalidating block at height {random_height}") | |||||
self.nodes[3].invalidateblock( | self.nodes[3].invalidateblock( | ||||
self.nodes[3].getblockhash(random_height)) | self.nodes[3].getblockhash(random_height) | ||||
) | |||||
# Now generate new blocks until we pass the old tip height | # Now generate new blocks until we pass the old tip height | ||||
self.log.debug("Mining longer tip") | self.log.debug("Mining longer tip") | ||||
block_hashes = [] | block_hashes = [] | ||||
while current_height + 1 > self.nodes[3].getblockcount(): | while current_height + 1 > self.nodes[3].getblockcount(): | ||||
block_hashes.extend( | block_hashes.extend( | ||||
self.generatetoaddress( | self.generatetoaddress( | ||||
self.nodes[3], | self.nodes[3], | ||||
nblocks=min(10, current_height + 1 - | nblocks=min( | ||||
self.nodes[3].getblockcount()), | 10, current_height + 1 - self.nodes[3].getblockcount() | ||||
), | |||||
# new address to avoid mining a block that has just been | # new address to avoid mining a block that has just been | ||||
# invalidated | # invalidated | ||||
address=self.nodes[3].getnewaddress(), | address=self.nodes[3].getnewaddress(), | ||||
sync_fun=self.no_op, | sync_fun=self.no_op, | ||||
)) | ) | ||||
) | |||||
self.log.debug(f"Syncing {len(block_hashes)} new blocks...") | self.log.debug(f"Syncing {len(block_hashes)} new blocks...") | ||||
self.sync_node3blocks(block_hashes) | self.sync_node3blocks(block_hashes) | ||||
utxo_list = self.nodes[3].listunspent() | utxo_list = self.nodes[3].listunspent() | ||||
self.log.debug(f"Node3 utxo count: {len(utxo_list)}") | self.log.debug(f"Node3 utxo count: {len(utxo_list)}") | ||||
# Check that the utxo hashes agree with node3 | # Check that the utxo hashes agree with node3 | ||||
# Useful side effect: each utxo cache gets flushed here, so that we | # Useful side effect: each utxo cache gets flushed here, so that we | ||||
# won't get crashes on shutdown at the end of the test. | # won't get crashes on shutdown at the end of the test. | ||||
self.verify_utxo_hash() | self.verify_utxo_hash() | ||||
# Check the test coverage | # Check the test coverage | ||||
self.log.info( | self.log.info( | ||||
f"Restarted nodes: {self.restart_counts}; " | f"Restarted nodes: {self.restart_counts}; " | ||||
f"crashes on restart: {self.crashed_on_restart}") | f"crashes on restart: {self.crashed_on_restart}" | ||||
) | |||||
# If no nodes were restarted, we didn't test anything. | # If no nodes were restarted, we didn't test anything. | ||||
assert self.restart_counts != [0, 0, 0] | assert self.restart_counts != [0, 0, 0] | ||||
# Make sure we tested the case of crash-during-recovery. | # Make sure we tested the case of crash-during-recovery. | ||||
assert self.crashed_on_restart > 0 | assert self.crashed_on_restart > 0 | ||||
# Warn if any of the nodes escaped restart. | # Warn if any of the nodes escaped restart. | ||||
for i in range(3): | for i in range(3): | ||||
if self.restart_counts[i] == 0: | if self.restart_counts[i] == 0: | ||||
self.log.warning( | self.log.warning(f"Node {i} never crashed during utxo flush!") | ||||
f"Node {i} never crashed during utxo flush!") | |||||
if __name__ == "__main__":
    # Script entry point: run the chainstate write crash test.
    ChainstateWriteCrashTest().main()