Changeset View
Changeset View
Standalone View
Standalone View
test/functional/test_framework/mininode.py
#!/usr/bin/env python3 | #!/usr/bin/env python3 | ||||
# Copyright (c) 2010 ArtForz -- public domain half-a-node | # Copyright (c) 2010 ArtForz -- public domain half-a-node | ||||
# Copyright (c) 2012 Jeff Garzik | # Copyright (c) 2012 Jeff Garzik | ||||
# Copyright (c) 2010-2016 The Bitcoin Core developers | # Copyright (c) 2010-2016 The Bitcoin Core developers | ||||
# Distributed under the MIT software license, see the accompanying | # Distributed under the MIT software license, see the accompanying | ||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php. | # file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
"""Bitcoin P2P network half-a-node. | """Bitcoin P2P network half-a-node. | ||||
This python code was modified from ArtForz' public domain half-a-node, as | This python code was modified from ArtForz' public domain half-a-node, as | ||||
found in the mini-node branch of http://github.com/jgarzik/pynode. | found in the mini-node branch of http://github.com/jgarzik/pynode. | ||||
P2PConnection: A low-level connection object to a node's P2P interface | P2PConnection: A low-level connection object to a node's P2P interface | ||||
P2PInterface: A high-level interface object for communicating to a node over P2P | P2PInterface: A high-level interface object for communicating to a node over P2P | ||||
P2PDataStore: A p2p interface class that keeps a store of transactions and blocks | P2PDataStore: A p2p interface class that keeps a store of transactions and blocks | ||||
and can respond correctly to getdata and getheaders messages""" | and can respond correctly to getdata and getheaders messages""" | ||||
import asyncore | import asyncio | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from io import BytesIO | from io import BytesIO | ||||
import logging | import logging | ||||
import socket | |||||
import struct | import struct | ||||
import sys | import sys | ||||
import threading | import threading | ||||
from test_framework.messages import ( | from test_framework.messages import ( | ||||
CBlockHeader, | CBlockHeader, | ||||
MIN_VERSION_SUPPORTED, | MIN_VERSION_SUPPORTED, | ||||
msg_addr, | msg_addr, | ||||
Show All 17 Lines | from test_framework.messages import ( | ||||
msg_sendcmpct, | msg_sendcmpct, | ||||
msg_sendheaders, | msg_sendheaders, | ||||
msg_tx, | msg_tx, | ||||
MSG_TX, | MSG_TX, | ||||
MSG_TYPE_MASK, | MSG_TYPE_MASK, | ||||
msg_verack, | msg_verack, | ||||
msg_version, | msg_version, | ||||
NODE_NETWORK, | NODE_NETWORK, | ||||
READ_BUFFER_SIZE, | |||||
sha256, | sha256, | ||||
) | ) | ||||
from test_framework.util import wait_until | from test_framework.util import wait_until | ||||
logger = logging.getLogger("TestFramework.mininode") | logger = logging.getLogger("TestFramework.mininode") | ||||
MESSAGEMAP = { | MESSAGEMAP = { | ||||
b"addr": msg_addr, | b"addr": msg_addr, | ||||
Show All 22 Lines | |||||
MAGIC_BYTES = { | MAGIC_BYTES = { | ||||
"mainnet": b"\xe3\xe1\xf3\xe8", | "mainnet": b"\xe3\xe1\xf3\xe8", | ||||
"testnet3": b"\xf4\xe5\xf3\xf4", | "testnet3": b"\xf4\xe5\xf3\xf4", | ||||
"regtest": b"\xda\xb5\xbf\xfa", | "regtest": b"\xda\xb5\xbf\xfa", | ||||
} | } | ||||
class P2PConnection(asyncore.dispatcher): | class P2PConnection(asyncio.Protocol): | ||||
"""A low-level connection object to a node's P2P interface. | """A low-level connection object to a node's P2P interface. | ||||
This class is responsible for: | This class is responsible for: | ||||
- opening and closing the TCP connection to the node | - opening and closing the TCP connection to the node | ||||
- reading bytes from and writing bytes to the socket | - reading bytes from and writing bytes to the socket | ||||
- deserializing and serializing the P2P message header | - deserializing and serializing the P2P message header | ||||
- logging messages as they are sent and received | - logging messages as they are sent and received | ||||
This class contains no logic for handing the P2P message payloads. It must be | This class contains no logic for handing the P2P message payloads. It must be | ||||
sub-classed and the on_message() callback overridden.""" | sub-classed and the on_message() callback overridden.""" | ||||
def __init__(self): | def __init__(self): | ||||
# All P2PConnections must be created before starting the NetworkThread. | # The underlying transport of the connection. | ||||
# assert that the network thread is not running. | # Should only call methods on this from the NetworkThread, c.f. call_soon_threadsafe | ||||
assert not network_thread_running() | self._transport = None | ||||
super().__init__(map=mininode_socket_map) | |||||
self._conn_open = False | |||||
@property | @property | ||||
def is_connected(self): | def is_connected(self): | ||||
return self._conn_open | return self._transport is not None | ||||
def peer_connect(self, dstaddr, dstport, net="regtest"): | def peer_connect(self, dstaddr, dstport, net="regtest"): | ||||
assert not self.is_connected | |||||
self.dstaddr = dstaddr | self.dstaddr = dstaddr | ||||
self.dstport = dstport | self.dstport = dstport | ||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM) | # The initial message to send after the connection was made: | ||||
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | self.on_connection_send_msg = None | ||||
self.sendbuf = b"" | self.on_connection_send_msg_is_raw = False | ||||
self.recvbuf = b"" | self.recvbuf = b"" | ||||
self._asyncore_pre_connection = True | |||||
self.network = net | self.network = net | ||||
self.disconnect = False | |||||
logger.debug('Connecting to Bitcoin Node: {}:{}'.format( | logger.debug('Connecting to Bitcoin Node: {}:{}'.format( | ||||
self.dstaddr, self.dstport)) | self.dstaddr, self.dstport)) | ||||
try: | loop = NetworkThread.network_event_loop | ||||
self.connect((dstaddr, dstport)) | conn_gen_unsafe = loop.create_connection( | ||||
except: | lambda: self, host=self.dstaddr, port=self.dstport) | ||||
self.handle_close() | |||||
def conn_gen(): return loop.call_soon_threadsafe( | |||||
loop.create_task, conn_gen_unsafe) | |||||
return conn_gen | |||||
def peer_disconnect(self): | def peer_disconnect(self): | ||||
# Connection could have already been closed by other end. | # Connection could have already been closed by other end. | ||||
if self.is_connected: | NetworkThread.network_event_loop.call_soon_threadsafe( | ||||
# Signal asyncore to disconnect | lambda: self._transport and self._transport.abort()) | ||||
self.disconnect = True | |||||
# Connection and disconnection methods | # Connection and disconnection methods | ||||
def handle_connect(self): | def connection_made(self, transport): | ||||
"""asyncore callback when a connection is opened.""" | """asyncio callback when a connection is opened.""" | ||||
if not self.is_connected: | assert not self._transport | ||||
logger.debug("Connected & Listening: {}:{}".format( | logger.debug("Connected & Listening: {}:{}".format( | ||||
self.dstaddr, self.dstport)) | self.dstaddr, self.dstport)) | ||||
self._conn_open = True | self._transport = transport | ||||
self._asyncore_pre_connection = False | if self.on_connection_send_msg: | ||||
if self.on_connection_send_msg_is_raw: | |||||
self.send_raw_message(self.on_connection_send_msg) | |||||
else: | |||||
self.send_message(self.on_connection_send_msg) | |||||
# Never used again | |||||
self.on_connection_send_msg = None | |||||
self.on_open() | self.on_open() | ||||
def handle_close(self): | def connection_lost(self, exc): | ||||
"""asyncore callback when a connection is closed.""" | """asyncio callback when a connection is closed.""" | ||||
logger.debug("Closing connection to: {}:{}".format( | if exc: | ||||
logger.warning("Connection lost to {}:{} due to {}".format( | |||||
self.dstaddr, self.dstport, exc)) | |||||
else: | |||||
logger.debug("Closed connection to: {}:{}".format( | |||||
self.dstaddr, self.dstport)) | self.dstaddr, self.dstport)) | ||||
self._conn_open = False | self._transport = None | ||||
self.recvbuf = b"" | self.recvbuf = b"" | ||||
self.sendbuf = b"" | |||||
try: | |||||
self.close() | |||||
except: | |||||
pass | |||||
self.on_close() | self.on_close() | ||||
# Socket read methods | # Socket read methods | ||||
def handle_read(self): | def data_received(self, t): | ||||
"""asyncore callback when data is read from the socket.""" | """asyncio callback when data is read from the socket.""" | ||||
with mininode_lock: | with mininode_lock: | ||||
t = self.recv(READ_BUFFER_SIZE) | |||||
if len(t) > 0: | if len(t) > 0: | ||||
self.recvbuf += t | self.recvbuf += t | ||||
while True: | while True: | ||||
msg = self._on_data() | msg = self._on_data() | ||||
if msg == None: | if msg == None: | ||||
break | break | ||||
self.on_message(msg) | self.on_message(msg) | ||||
Show All 36 Lines | def _on_data(self): | ||||
raise | raise | ||||
def on_message(self, message): | def on_message(self, message): | ||||
"""Callback for processing a P2P payload. Must be overridden by derived class.""" | """Callback for processing a P2P payload. Must be overridden by derived class.""" | ||||
raise NotImplementedError | raise NotImplementedError | ||||
# Socket write methods | # Socket write methods | ||||
def writable(self): | |||||
"""asyncore method to determine whether the handle_write() callback should be called on the next loop.""" | |||||
with mininode_lock: | |||||
length = len(self.sendbuf) | |||||
return length > 0 or self._asyncore_pre_connection | |||||
def handle_write(self): | |||||
"""asyncore callback when data should be written to the socket.""" | |||||
with mininode_lock: | |||||
# asyncore does not expose socket connection, only the first read/write | |||||
# event, thus we must check connection manually here to know when we | |||||
# actually connect | |||||
if self._asyncore_pre_connection: | |||||
self.handle_connect() | |||||
if not self.writable(): | |||||
return | |||||
try: | |||||
sent = self.send(self.sendbuf) | |||||
except: | |||||
self.handle_close() | |||||
return | |||||
self.sendbuf = self.sendbuf[sent:] | |||||
def send_message(self, message): | def send_message(self, message): | ||||
"""Send a P2P message over the socket. | """Send a P2P message over the socket. | ||||
This method takes a P2P payload, builds the P2P header and adds | This method takes a P2P payload, builds the P2P header and adds | ||||
the message to the send buffer to be sent over the socket.""" | the message to the send buffer to be sent over the socket.""" | ||||
if not self.is_connected: | if not self.is_connected: | ||||
raise IOError('Not connected') | raise IOError('Not connected') | ||||
self._log_message("send", message) | self._log_message("send", message) | ||||
tmsg = self._build_message(message) | tmsg = self._build_message(message) | ||||
self.send_raw_message(tmsg) | self.send_raw_message(tmsg) | ||||
def send_raw_message(self, tmsg): | def send_raw_message(self, tmsg): | ||||
"""Send any raw message over the socket. | """Send any raw message over the socket. | ||||
This method adds a raw message to the send buffer to be sent over the | This method adds a raw message to the send buffer to be sent over the | ||||
socket.""" | socket.""" | ||||
if not self.is_connected: | if not self.is_connected: | ||||
raise IOError('Not connected') | raise IOError('Not connected') | ||||
with mininode_lock: | NetworkThread.network_event_loop.call_soon_threadsafe( | ||||
if len(self.sendbuf) == 0: | lambda: self._transport and not self._transport.is_closing() and self._transport.write(tmsg)) | ||||
try: | |||||
sent = self.send(tmsg) | |||||
self.sendbuf = tmsg[sent:] | |||||
except BlockingIOError: | |||||
self.sendbuf = tmsg | |||||
else: | |||||
self.sendbuf += tmsg | |||||
# Class utility methods | # Class utility methods | ||||
def _build_message(self, message): | def _build_message(self, message): | ||||
"""Build a serialized P2P message""" | """Build a serialized P2P message""" | ||||
command = message.command | command = message.command | ||||
data = message.serialize() | data = message.serialize() | ||||
tmsg = MAGIC_BYTES[self.network] | tmsg = MAGIC_BYTES[self.network] | ||||
Show All 39 Lines | def __init__(self): | ||||
# A count of the number of ping messages we've sent to the node | # A count of the number of ping messages we've sent to the node | ||||
self.ping_counter = 1 | self.ping_counter = 1 | ||||
# The network services received from the peer | # The network services received from the peer | ||||
self.nServices = 0 | self.nServices = 0 | ||||
def peer_connect(self, *args, services=NODE_NETWORK, send_version=True, **kwargs): | def peer_connect(self, *args, services=NODE_NETWORK, send_version=True, **kwargs): | ||||
super().peer_connect(*args, **kwargs) | create_conn = super().peer_connect(*args, **kwargs) | ||||
if send_version: | if send_version: | ||||
# Send a version msg | # Send a version msg | ||||
vt = msg_version() | vt = msg_version() | ||||
vt.nServices = services | vt.nServices = services | ||||
vt.addrTo.ip = self.dstaddr | vt.addrTo.ip = self.dstaddr | ||||
vt.addrTo.port = self.dstport | vt.addrTo.port = self.dstport | ||||
vt.addrFrom.ip = "0.0.0.0" | vt.addrFrom.ip = "0.0.0.0" | ||||
vt.addrFrom.port = 0 | vt.addrFrom.port = 0 | ||||
# Will be sent right after handle_connect | # Will be sent soon after connection_made | ||||
self.sendbuf = self._build_message(vt) | self.on_connection_send_msg = vt | ||||
return create_conn | |||||
# Message receiving methods | # Message receiving methods | ||||
def on_message(self, message): | def on_message(self, message): | ||||
"""Receive message and dispatch message to appropriate callback. | """Receive message and dispatch message to appropriate callback. | ||||
We keep a count of how many of each message type has been received | We keep a count of how many of each message type has been received | ||||
and the most recent message of each type.""" | and the most recent message of each type.""" | ||||
▲ Show 20 Lines • Show All 134 Lines • ▼ Show 20 Lines | def sync_with_ping(self, timeout=60): | ||||
def test_function(): | def test_function(): | ||||
if not self.last_message.get("pong"): | if not self.last_message.get("pong"): | ||||
return False | return False | ||||
return self.last_message["pong"].nonce == self.ping_counter | return self.last_message["pong"].nonce == self.ping_counter | ||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | wait_until(test_function, timeout=timeout, lock=mininode_lock) | ||||
self.ping_counter += 1 | self.ping_counter += 1 | ||||
# Keep our own socket map for asyncore, so that we can track disconnects | # One lock for synchronizing all data access between the network event loop (see | ||||
# ourselves (to workaround an issue with closing an asyncore socket when | |||||
# using select) | |||||
mininode_socket_map = dict() | |||||
# One lock for synchronizing all data access between the networking thread (see | |||||
# NetworkThread below) and the thread running the test logic. For simplicity, | # NetworkThread below) and the thread running the test logic. For simplicity, | ||||
# P2PConnection acquires this lock whenever delivering a message to a P2PInterface, | # P2PConnection acquires this lock whenever delivering a message to a P2PInterface. | ||||
# and whenever adding anything to the send buffer (in send_message()). This | # This lock should be acquired in the thread running the test logic to synchronize | ||||
# lock should be acquired in the thread running the test logic to synchronize | |||||
# access to any data shared with the P2PInterface or P2PConnection. | # access to any data shared with the P2PInterface or P2PConnection. | ||||
mininode_lock = threading.RLock() | mininode_lock = threading.RLock() | ||||
class NetworkThread(threading.Thread): | class NetworkThread(threading.Thread): | ||||
network_event_loop = None | |||||
def __init__(self): | def __init__(self): | ||||
super().__init__(name="NetworkThread") | super().__init__(name="NetworkThread") | ||||
# There is only one event loop and no more than one thread must be created | |||||
assert not self.network_event_loop | |||||
def run(self): | NetworkThread.network_event_loop = asyncio.new_event_loop() | ||||
while mininode_socket_map: | |||||
# We check for whether to disconnect outside of the asyncore | |||||
# loop to workaround the behavior of asyncore when using | |||||
# select | |||||
disconnected = [] | |||||
for fd, obj in mininode_socket_map.items(): | |||||
if obj.disconnect: | |||||
disconnected.append(obj) | |||||
[obj.handle_close() for obj in disconnected] | |||||
asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) | |||||
logger.debug("Network thread closing") | |||||
def network_thread_start(): | def run(self): | ||||
"""Start the network thread.""" | """Start the network thread.""" | ||||
# Only one network thread may run at a time | self.network_event_loop.run_forever() | ||||
assert not network_thread_running() | |||||
NetworkThread().start() | |||||
def network_thread_running(): | |||||
"""Return whether the network thread is running.""" | |||||
return any([thread.name == "NetworkThread" for thread in threading.enumerate()]) | |||||
def network_thread_join(timeout=10): | |||||
"""Wait timeout seconds for the network thread to terminate. | |||||
Throw if the network thread doesn't terminate in timeout seconds.""" | def close(self, timeout=10): | ||||
network_threads = [ | """Close the connections and network event loop.""" | ||||
thread for thread in threading.enumerate() if thread.name == "NetworkThread"] | self.network_event_loop.call_soon_threadsafe( | ||||
assert len(network_threads) <= 1 | self.network_event_loop.stop) | ||||
for thread in network_threads: | wait_until(lambda: not self.network_event_loop.is_running(), | ||||
thread.join(timeout) | timeout=timeout) | ||||
assert not thread.is_alive() | self.network_event_loop.close() | ||||
self.join(timeout) | |||||
class P2PDataStore(P2PInterface): | class P2PDataStore(P2PInterface): | ||||
"""A P2P data store class. | """A P2P data store class. | ||||
Keeps a block and transaction store and responds correctly to getdata and getheaders requests.""" | Keeps a block and transaction store and responds correctly to getdata and getheaders requests.""" | ||||
def __init__(self): | def __init__(self): | ||||
▲ Show 20 Lines • Show All 124 Lines • Show Last 20 Lines |