Changeset View
Changeset View
Standalone View
Standalone View
test/functional/test_framework/mininode.py
Show First 20 Lines • Show All 192 Lines • ▼ Show 20 Lines | def connection_lost(self, exc): | ||||
self._transport = None | self._transport = None | ||||
self.recvbuf = b"" | self.recvbuf = b"" | ||||
self.on_close() | self.on_close() | ||||
# Socket read methods | # Socket read methods | ||||
def data_received(self, t): | def data_received(self, t): | ||||
"""asyncio callback when data is read from the socket.""" | """asyncio callback when data is read from the socket.""" | ||||
with mininode_lock: | with p2p_lock: | ||||
if len(t) > 0: | if len(t) > 0: | ||||
self.recvbuf += t | self.recvbuf += t | ||||
while True: | while True: | ||||
msg = self._on_data() | msg = self._on_data() | ||||
if msg is None: | if msg is None: | ||||
break | break | ||||
self.on_message(msg) | self.on_message(msg) | ||||
def _on_data(self): | def _on_data(self): | ||||
"""Try to read P2P messages from the recv buffer. | """Try to read P2P messages from the recv buffer. | ||||
This method reads data from the buffer in a loop. It deserializes, | This method reads data from the buffer in a loop. It deserializes, | ||||
parses and verifies the P2P header, then passes the P2P payload to | parses and verifies the P2P header, then passes the P2P payload to | ||||
the on_message callback for processing.""" | the on_message callback for processing.""" | ||||
try: | try: | ||||
with mininode_lock: | with p2p_lock: | ||||
if len(self.recvbuf) < 4: | if len(self.recvbuf) < 4: | ||||
return None | return None | ||||
if self.recvbuf[:4] != self.magic_bytes: | if self.recvbuf[:4] != self.magic_bytes: | ||||
raise ValueError( | raise ValueError( | ||||
"magic bytes mismatch: {} != {}".format( | "magic bytes mismatch: {} != {}".format( | ||||
repr( | repr( | ||||
self.magic_bytes), repr( | self.magic_bytes), repr( | ||||
self.recvbuf))) | self.recvbuf))) | ||||
▲ Show 20 Lines • Show All 130 Lines • ▼ Show 20 Lines | class P2PInterface(P2PConnection): | ||||
# Message receiving methods | # Message receiving methods | ||||
def on_message(self, message): | def on_message(self, message): | ||||
"""Receive message and dispatch message to appropriate callback. | """Receive message and dispatch message to appropriate callback. | ||||
We keep a count of how many of each message type has been received | We keep a count of how many of each message type has been received | ||||
and the most recent message of each type.""" | and the most recent message of each type.""" | ||||
with mininode_lock: | with p2p_lock: | ||||
try: | try: | ||||
msgtype = message.msgtype.decode('ascii') | msgtype = message.msgtype.decode('ascii') | ||||
self.message_count[msgtype] += 1 | self.message_count[msgtype] += 1 | ||||
self.last_message[msgtype] = message | self.last_message[msgtype] = message | ||||
getattr(self, 'on_' + msgtype)(message) | getattr(self, 'on_' + msgtype)(message) | ||||
except Exception: | except Exception: | ||||
print("ERROR delivering {} ({})".format( | print("ERROR delivering {} ({})".format( | ||||
repr(message), sys.exc_info()[0])) | repr(message), sys.exc_info()[0])) | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | class P2PInterface(P2PConnection): | ||||
def wait_until(self, test_function_in, *, timeout=60, | def wait_until(self, test_function_in, *, timeout=60, | ||||
check_connected=True): | check_connected=True): | ||||
def test_function(): | def test_function(): | ||||
if check_connected: | if check_connected: | ||||
assert self.is_connected | assert self.is_connected | ||||
return test_function_in() | return test_function_in() | ||||
wait_until(test_function, timeout=timeout, lock=mininode_lock, | wait_until(test_function, timeout=timeout, lock=p2p_lock, | ||||
timeout_factor=self.timeout_factor) | timeout_factor=self.timeout_factor) | ||||
def wait_for_disconnect(self, timeout=60): | def wait_for_disconnect(self, timeout=60): | ||||
def test_function(): return not self.is_connected | def test_function(): return not self.is_connected | ||||
self.wait_until(test_function, timeout=timeout, check_connected=False) | self.wait_until(test_function, timeout=timeout, check_connected=False) | ||||
# Message receiving helper methods | # Message receiving helper methods | ||||
▲ Show 20 Lines • Show All 91 Lines • ▼ Show 20 Lines | def sync_with_ping(self, timeout=60): | ||||
self.ping_counter += 1 | self.ping_counter += 1 | ||||
# One lock for synchronizing all data access between the networking thread (see | # One lock for synchronizing all data access between the networking thread (see | ||||
# NetworkThread below) and the thread running the test logic. For simplicity, | # NetworkThread below) and the thread running the test logic. For simplicity, | ||||
# P2PConnection acquires this lock whenever delivering a message to a P2PInterface. | # P2PConnection acquires this lock whenever delivering a message to a P2PInterface. | ||||
# This lock should be acquired in the thread running the test logic to synchronize | # This lock should be acquired in the thread running the test logic to synchronize | ||||
# access to any data shared with the P2PInterface or P2PConnection. | # access to any data shared with the P2PInterface or P2PConnection. | ||||
mininode_lock = threading.Lock() | p2p_lock = threading.Lock() | ||||
class NetworkThread(threading.Thread): | class NetworkThread(threading.Thread): | ||||
network_event_loop = None | network_event_loop = None | ||||
def __init__(self): | def __init__(self): | ||||
super().__init__(name="NetworkThread") | super().__init__(name="NetworkThread") | ||||
# There is only one event loop and no more than one thread must be | # There is only one event loop and no more than one thread must be | ||||
▲ Show 20 Lines • Show All 85 Lines • ▼ Show 20 Lines | def send_blocks_and_test(self, blocks, node, *, success=True, force_send=False, | ||||
- send a headers message for the final block | - send a headers message for the final block | ||||
- the on_getheaders handler will ensure that any getheaders are responded to | - the on_getheaders handler will ensure that any getheaders are responded to | ||||
- if force_send is False: wait for getdata for each of the blocks. The on_getdata handler will | - if force_send is False: wait for getdata for each of the blocks. The on_getdata handler will | ||||
ensure that any getdata messages are responded to. Otherwise send the full block unsolicited. | ensure that any getdata messages are responded to. Otherwise send the full block unsolicited. | ||||
- if success is True: assert that the node's tip advances to the most recent block | - if success is True: assert that the node's tip advances to the most recent block | ||||
- if success is False: assert that the node's tip doesn't advance | - if success is False: assert that the node's tip doesn't advance | ||||
- if reject_reason is set: assert that the correct reject message is logged""" | - if reject_reason is set: assert that the correct reject message is logged""" | ||||
with mininode_lock: | with p2p_lock: | ||||
for block in blocks: | for block in blocks: | ||||
self.block_store[block.sha256] = block | self.block_store[block.sha256] = block | ||||
self.last_block_hash = block.sha256 | self.last_block_hash = block.sha256 | ||||
def test(): | def test(): | ||||
if force_send: | if force_send: | ||||
for b in blocks: | for b in blocks: | ||||
self.send_message(msg_block(block=b)) | self.send_message(msg_block(block=b)) | ||||
Show All 29 Lines | def send_txs_and_test(self, txs, node, *, success=True, | ||||
"""Send txs to test node and test whether they're accepted to the mempool. | """Send txs to test node and test whether they're accepted to the mempool. | ||||
- add all txs to our tx_store | - add all txs to our tx_store | ||||
- send tx messages for all txs | - send tx messages for all txs | ||||
- if success is True/False: assert that the txs are/are not accepted to the mempool | - if success is True/False: assert that the txs are/are not accepted to the mempool | ||||
- if expect_disconnect is True: Skip the sync with ping | - if expect_disconnect is True: Skip the sync with ping | ||||
- if reject_reason is set: assert that the correct reject message is logged.""" | - if reject_reason is set: assert that the correct reject message is logged.""" | ||||
with mininode_lock: | with p2p_lock: | ||||
for tx in txs: | for tx in txs: | ||||
self.tx_store[tx.sha256] = tx | self.tx_store[tx.sha256] = tx | ||||
def test(): | def test(): | ||||
for tx in txs: | for tx in txs: | ||||
self.send_message(msg_tx(tx)) | self.send_message(msg_tx(tx)) | ||||
if expect_disconnect: | if expect_disconnect: | ||||
Show All 32 Lines | def on_inv(self, message): | ||||
super().on_inv(message) | super().on_inv(message) | ||||
# Store how many times invs have been received for each tx. | # Store how many times invs have been received for each tx. | ||||
for i in message.inv: | for i in message.inv: | ||||
if i.type == MSG_TX: | if i.type == MSG_TX: | ||||
# save txid | # save txid | ||||
self.tx_invs_received[i.hash] += 1 | self.tx_invs_received[i.hash] += 1 | ||||
def get_invs(self): | def get_invs(self): | ||||
with mininode_lock: | with p2p_lock: | ||||
return list(self.tx_invs_received.keys()) | return list(self.tx_invs_received.keys()) | ||||
def wait_for_broadcast(self, txns, timeout=60): | def wait_for_broadcast(self, txns, timeout=60): | ||||
"""Waits for the txns (list of txids) to complete initial broadcast. | """Waits for the txns (list of txids) to complete initial broadcast. | ||||
The mempool should mark unbroadcast=False for these transactions. | The mempool should mark unbroadcast=False for these transactions. | ||||
""" | """ | ||||
# Wait until invs have been received (and getdatas sent) for each txid. | # Wait until invs have been received (and getdatas sent) for each txid. | ||||
self.wait_until(lambda: set(self.tx_invs_received.keys()) == set( | self.wait_until(lambda: set(self.tx_invs_received.keys()) == set( | ||||
[int(tx, 16) for tx in txns]), timeout=timeout) | [int(tx, 16) for tx in txns]), timeout=timeout) | ||||
# Flush messages and wait for the getdatas to be processed | # Flush messages and wait for the getdatas to be processed | ||||
self.sync_with_ping() | self.sync_with_ping() |