Changeset View
Changeset View
Standalone View
Standalone View
test/functional/example_test.py
Show First 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | def on_inv(self, message): | ||||
pass | pass | ||||
def custom_function(): | def custom_function(): | ||||
"""Do some custom behaviour | """Do some custom behaviour | ||||
If this function is more generally useful for other tests, consider | If this function is more generally useful for other tests, consider | ||||
moving it to a module in test_framework.""" | moving it to a module in test_framework.""" | ||||
# self.log.info("running custom_function") # Oops! Can't run self.log outside the BitcoinTestFramework | # self.log.info("running custom_function") # Oops! Can't run self.log | ||||
# outside the BitcoinTestFramework | |||||
pass | pass | ||||
class ExampleTest(BitcoinTestFramework): | class ExampleTest(BitcoinTestFramework): | ||||
# Each functional test is a subclass of the BitcoinTestFramework class. | # Each functional test is a subclass of the BitcoinTestFramework class. | ||||
# Override the set_test_params(), skip_test_if_missing_module(), add_options(), setup_chain(), setup_network() | # Override the set_test_params(), skip_test_if_missing_module(), add_options(), setup_chain(), setup_network() | ||||
# and setup_nodes() methods to customize the test setup as required. | # and setup_nodes() methods to customize the test setup as required. | ||||
def set_test_params(self): | def set_test_params(self): | ||||
"""Override test parameters for your individual test. | """Override test parameters for your individual test. | ||||
This method must be overridden and num_nodes must be exlicitly set.""" | This method must be overridden and num_nodes must be exlicitly set.""" | ||||
self.setup_clean_chain = True | self.setup_clean_chain = True | ||||
self.num_nodes = 3 | self.num_nodes = 3 | ||||
# Use self.extra_args to change command-line arguments for the nodes | # Use self.extra_args to change command-line arguments for the nodes | ||||
self.extra_args = [[], ["-logips"], []] | self.extra_args = [[], ["-logips"], []] | ||||
# self.log.info("I've finished set_test_params") # Oops! Can't run self.log before run_test() | # self.log.info("I've finished set_test_params") # Oops! Can't run | ||||
# self.log before run_test() | |||||
def skip_test_if_missing_module(self): | def skip_test_if_missing_module(self): | ||||
self.skip_if_no_wallet() | self.skip_if_no_wallet() | ||||
# Use add_options() to add specific command-line options for your test. | # Use add_options() to add specific command-line options for your test. | ||||
# In practice this is not used very much, since the tests are mostly written | # In practice this is not used very much, since the tests are mostly written | ||||
# to be run in automated environments without command-line options. | # to be run in automated environments without command-line options. | ||||
# def add_options() | # def add_options() | ||||
Show All 34 Lines | def custom_method(self): | ||||
If you think it's useful in general, consider moving it to the base | If you think it's useful in general, consider moving it to the base | ||||
BitcoinTestFramework class so other tests can use it.""" | BitcoinTestFramework class so other tests can use it.""" | ||||
self.log.info("Running custom_method") | self.log.info("Running custom_method") | ||||
def run_test(self): | def run_test(self): | ||||
"""Main test logic""" | """Main test logic""" | ||||
# Create P2P connections will wait for a verack to make sure the connection is fully up | # Create P2P connections will wait for a verack to make sure the | ||||
# connection is fully up | |||||
self.nodes[0].add_p2p_connection(BaseNode()) | self.nodes[0].add_p2p_connection(BaseNode()) | ||||
# Generating a block on one of the nodes will get us out of IBD | # Generating a block on one of the nodes will get us out of IBD | ||||
blocks = [int(self.nodes[0].generate(nblocks=1)[0], 16)] | blocks = [int(self.nodes[0].generate(nblocks=1)[0], 16)] | ||||
self.sync_all([self.nodes[0:1]]) | self.sync_all([self.nodes[0:1]]) | ||||
# Notice above how we called an RPC by calling a method with the same | # Notice above how we called an RPC by calling a method with the same | ||||
# name on the node object. Notice also how we used a keyword argument | # name on the node object. Notice also how we used a keyword argument | ||||
Show All 22 Lines | def run_test(self): | ||||
for i in range(10): | for i in range(10): | ||||
# Use the mininode and blocktools functionality to manually build a block | # Use the mininode and blocktools functionality to manually build a block | ||||
# Calling the generate() rpc is easier, but this allows us to exactly | # Calling the generate() rpc is easier, but this allows us to exactly | ||||
# control the blocks and transactions. | # control the blocks and transactions. | ||||
block = create_block( | block = create_block( | ||||
self.tip, create_coinbase(height), self.block_time) | self.tip, create_coinbase(height), self.block_time) | ||||
block.solve() | block.solve() | ||||
block_message = msg_block(block) | block_message = msg_block(block) | ||||
# Send message is used to send a P2P message to the node over our P2PInterface | # Send message is used to send a P2P message to the node over our | ||||
# P2PInterface | |||||
self.nodes[0].p2p.send_message(block_message) | self.nodes[0].p2p.send_message(block_message) | ||||
self.tip = block.sha256 | self.tip = block.sha256 | ||||
blocks.append(self.tip) | blocks.append(self.tip) | ||||
self.block_time += 1 | self.block_time += 1 | ||||
height += 1 | height += 1 | ||||
self.log.info( | self.log.info( | ||||
"Wait for node1 to reach current tip (height 11) using RPC") | "Wait for node1 to reach current tip (height 11) using RPC") | ||||
Show All 18 Lines | def run_test(self): | ||||
# wait_until() will loop until a predicate condition is met. Use it to test properties of the | # wait_until() will loop until a predicate condition is met. Use it to test properties of the | ||||
# P2PInterface objects. | # P2PInterface objects. | ||||
wait_until(lambda: sorted(blocks) == sorted( | wait_until(lambda: sorted(blocks) == sorted( | ||||
list(self.nodes[2].p2p.block_receive_map.keys())), timeout=5, lock=mininode_lock) | list(self.nodes[2].p2p.block_receive_map.keys())), timeout=5, lock=mininode_lock) | ||||
self.log.info("Check that each block was received only once") | self.log.info("Check that each block was received only once") | ||||
# The network thread uses a global lock on data access to the P2PConnection objects when sending and receiving | # The network thread uses a global lock on data access to the P2PConnection objects when sending and receiving | ||||
# messages. The test thread should acquire the global lock before accessing any P2PConnection data to avoid locking | # messages. The test thread should acquire the global lock before accessing any P2PConnection data to avoid locking | ||||
# and synchronization issues. Note wait_until() acquires this global lock when testing the predicate. | # and synchronization issues. Note wait_until() acquires this global | ||||
# lock when testing the predicate. | |||||
with mininode_lock: | with mininode_lock: | ||||
for block in self.nodes[2].p2p.block_receive_map.values(): | for block in self.nodes[2].p2p.block_receive_map.values(): | ||||
assert_equal(block, 1) | assert_equal(block, 1) | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
ExampleTest().main() | ExampleTest().main() |