Changeset View
Changeset View
Standalone View
Standalone View
test/functional/test_framework/mininode.py
Show All 17 Lines | |||||
from io import BytesIO | from io import BytesIO | ||||
import logging | import logging | ||||
import socket | import socket | ||||
import struct | import struct | ||||
import sys | import sys | ||||
from threading import RLock, Thread | from threading import RLock, Thread | ||||
from test_framework.messages import * | from test_framework.messages import * | ||||
from test_framework.util import wait_until | |||||
logger = logging.getLogger("TestFramework.mininode") | logger = logging.getLogger("TestFramework.mininode") | ||||
MESSAGEMAP = { | MESSAGEMAP = { | ||||
b"addr": msg_addr, | b"addr": msg_addr, | ||||
b"block": msg_block, | b"block": msg_block, | ||||
b"blocktxn": msg_blocktxn, | b"blocktxn": msg_blocktxn, | ||||
b"cmpctblock": msg_cmpctblock, | b"cmpctblock": msg_cmpctblock, | ||||
Show All 19 Lines | |||||
MAGIC_BYTES = { | MAGIC_BYTES = { | ||||
"mainnet": b"\xe3\xe1\xf3\xe8", | "mainnet": b"\xe3\xe1\xf3\xe8", | ||||
"testnet3": b"\xf4\xe5\xf3\xf4", | "testnet3": b"\xf4\xe5\xf3\xf4", | ||||
"regtest": b"\xda\xb5\xbf\xfa", | "regtest": b"\xda\xb5\xbf\xfa", | ||||
} | } | ||||
class NodeConn(asyncore.dispatcher): | class NodeConn(asyncore.dispatcher): | ||||
"""The actual NodeConn class | """A low-level connection object to a node's P2P interface. | ||||
This class provides an interface for a p2p connection to a specified node.""" | This class is responsible for: | ||||
def __init__(self, dstaddr, dstport, callback, net="regtest", services=NODE_NETWORK, send_version=True): | - opening and closing the TCP connection to the node | ||||
asyncore.dispatcher.__init__(self, map=mininode_socket_map) | - reading bytes from and writing bytes to the socket | ||||
- deserializing and serializing the P2P message header | |||||
- logging messages as they are sent and received | |||||
This class contains no logic for handing the P2P message payloads. It must be | |||||
sub-classed and the on_message() callback overridden. | |||||
TODO: rename this class P2PConnection.""" | |||||
def __init__(self): | |||||
super().__init__(map=mininode_socket_map) | |||||
def peer_connect(self, dstaddr, dstport, net="regtest", services=NODE_NETWORK, send_version=True): | |||||
self.dstaddr = dstaddr | self.dstaddr = dstaddr | ||||
self.dstport = dstport | self.dstport = dstport | ||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM) | self.create_socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | ||||
self.sendbuf = b"" | self.sendbuf = b"" | ||||
self.recvbuf = b"" | self.recvbuf = b"" | ||||
self.state = "connecting" | self.state = "connecting" | ||||
self.network = net | self.network = net | ||||
self.cb = callback | |||||
self.disconnect = False | self.disconnect = False | ||||
self.nServices = 0 | |||||
if send_version: | if send_version: | ||||
# stuff version msg into sendbuf | # stuff version msg into sendbuf | ||||
vt = msg_version() | vt = msg_version() | ||||
vt.nServices = services | vt.nServices = services | ||||
vt.addrTo.ip = self.dstaddr | vt.addrTo.ip = self.dstaddr | ||||
vt.addrTo.port = self.dstport | vt.addrTo.port = self.dstport | ||||
vt.addrFrom.ip = "0.0.0.0" | vt.addrFrom.ip = "0.0.0.0" | ||||
vt.addrFrom.port = 0 | vt.addrFrom.port = 0 | ||||
self.send_message(vt, True) | self.send_message(vt, True) | ||||
logger.info('Connecting to Bitcoin Node: %s:%d' % | logger.info('Connecting to Bitcoin Node: %s:%d' % | ||||
(self.dstaddr, self.dstport)) | (self.dstaddr, self.dstport)) | ||||
try: | try: | ||||
self.connect((dstaddr, dstport)) | self.connect((dstaddr, dstport)) | ||||
except: | except: | ||||
self.handle_close() | self.handle_close() | ||||
def peer_disconnect(self): | |||||
# Connection could have already been closed by other end. | |||||
if self.state == "connected": | |||||
self.disconnect_node() | |||||
# Connection and disconnection methods | # Connection and disconnection methods | ||||
def handle_connect(self): | def handle_connect(self): | ||||
"""asyncore callback when a connection is opened.""" | """asyncore callback when a connection is opened.""" | ||||
if self.state != "connected": | if self.state != "connected": | ||||
logger.debug("Connected & Listening: %s:%d" % | logger.debug("Connected & Listening: %s:%d" % | ||||
(self.dstaddr, self.dstport)) | (self.dstaddr, self.dstport)) | ||||
self.state = "connected" | self.state = "connected" | ||||
self.cb.on_open(self) | self.on_open() | ||||
def handle_close(self): | def handle_close(self): | ||||
"""asyncore callback when a connection is closed.""" | """asyncore callback when a connection is closed.""" | ||||
logger.debug("Closing connection to: %s:%d" % | logger.debug("Closing connection to: %s:%d" % | ||||
(self.dstaddr, self.dstport)) | (self.dstaddr, self.dstport)) | ||||
self.state = "closed" | self.state = "closed" | ||||
self.recvbuf = b"" | self.recvbuf = b"" | ||||
self.sendbuf = b"" | self.sendbuf = b"" | ||||
try: | try: | ||||
self.close() | self.close() | ||||
except: | except: | ||||
pass | pass | ||||
self.cb.on_close(self) | self.on_close() | ||||
def disconnect_node(self): | def disconnect_node(self): | ||||
"""Disconnect the p2p connection. | """Disconnect the p2p connection. | ||||
Called by the test logic thread. Causes the p2p connection | Called by the test logic thread. Causes the p2p connection | ||||
to be disconnected on the next iteration of the asyncore loop.""" | to be disconnected on the next iteration of the asyncore loop.""" | ||||
self.disconnect = True | self.disconnect = True | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | def _on_data(self): | ||||
m.deserialize(f) | m.deserialize(f) | ||||
self._log_message("receive", m) | self._log_message("receive", m) | ||||
return m | return m | ||||
except Exception as e: | except Exception as e: | ||||
logger.exception('Error reading message:', repr(e)) | logger.exception('Error reading message:', repr(e)) | ||||
raise | raise | ||||
def on_message(self, message): | def on_message(self, message): | ||||
"""Callback for processing a P2P payload. Calls into NodeConnCB.""" | """Callback for processing a P2P payload. Must be overridden by derived class.""" | ||||
self.cb.on_message(self, message) | raise NotImplementedError | ||||
# Socket write methods | # Socket write methods | ||||
def writable(self): | def writable(self): | ||||
"""asyncore method to determine whether the handle_write() callback should be called on the next loop.""" | """asyncore method to determine whether the handle_write() callback should be called on the next loop.""" | ||||
with mininode_lock: | with mininode_lock: | ||||
pre_connection = self.state == "connecting" | pre_connection = self.state == "connecting" | ||||
length = len(self.sendbuf) | length = len(self.sendbuf) | ||||
▲ Show 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | def _log_message(self, direction, msg): | ||||
log_message = "Received message from " | log_message = "Received message from " | ||||
log_message += "%s:%d: %s" % (self.dstaddr, | log_message += "%s:%d: %s" % (self.dstaddr, | ||||
self.dstport, repr(msg)[:500]) | self.dstport, repr(msg)[:500]) | ||||
if len(log_message) > 500: | if len(log_message) > 500: | ||||
log_message += "... (msg truncated)" | log_message += "... (msg truncated)" | ||||
logger.debug(log_message) | logger.debug(log_message) | ||||
class NodeConnCB(): | class NodeConnCB(NodeConn): | ||||
"""Callback and helper functions for P2P connection to a bitcoind node. | """A high-level P2P interface class for communicating with a Bitcoin Cash node. | ||||
This class provides high-level callbacks for processing P2P message | |||||
payloads, as well as convenience methods for interacting with the | |||||
node over P2P. | |||||
Individual testcases should subclass this and override the on_* methods | Individual testcases should subclass this and override the on_* methods | ||||
if they want to alter message handling behaviour.""" | if they want to alter message handling behaviour. | ||||
TODO: rename this class P2PInterface""" | |||||
def __init__(self): | def __init__(self): | ||||
# Track whether we have a P2P connection open to the node | super().__init__() | ||||
self.connected = False | |||||
self.connection = None | |||||
# Track number of messages of each type received and the most recent | # Track number of messages of each type received and the most recent | ||||
# message of each type | # message of each type | ||||
self.message_count = defaultdict(int) | self.message_count = defaultdict(int) | ||||
self.last_message = {} | self.last_message = {} | ||||
# A count of the number of ping messages we've sent to the node | # A count of the number of ping messages we've sent to the node | ||||
self.ping_counter = 1 | self.ping_counter = 1 | ||||
# The network services received from the peer | |||||
self.nServices = 0 | |||||
# Message receiving methods | # Message receiving methods | ||||
def on_message(self, conn, message): | def on_message(self, message): | ||||
"""Receive message and dispatch message to appropriate callback. | """Receive message and dispatch message to appropriate callback. | ||||
We keep a count of how many of each message type has been received | We keep a count of how many of each message type has been received | ||||
and the most recent message of each type.""" | and the most recent message of each type.""" | ||||
with mininode_lock: | with mininode_lock: | ||||
try: | try: | ||||
command = message.command.decode('ascii') | command = message.command.decode('ascii') | ||||
self.message_count[command] += 1 | self.message_count[command] += 1 | ||||
self.last_message[command] = message | self.last_message[command] = message | ||||
getattr(self, 'on_' + command)(conn, message) | getattr(self, 'on_' + command)(message) | ||||
except: | except: | ||||
print("ERROR delivering %s (%s)" % (repr(message), | print("ERROR delivering %s (%s)" % (repr(message), | ||||
sys.exc_info()[0])) | sys.exc_info()[0])) | ||||
raise | raise | ||||
# Callback methods. Can be overridden by subclasses in individual test | # Callback methods. Can be overridden by subclasses in individual test | ||||
# cases to provide custom message handling behaviour. | # cases to provide custom message handling behaviour. | ||||
def on_open(self, conn): | def on_open(self): | ||||
self.connected = True | pass | ||||
def on_close(self, conn): | def on_close(self): | ||||
self.connected = False | pass | ||||
self.connection = None | |||||
def on_addr(self, conn, message): pass | def on_addr(self, message): pass | ||||
def on_block(self, conn, message): pass | def on_block(self, message): pass | ||||
def on_blocktxn(self, conn, message): pass | def on_blocktxn(self, message): pass | ||||
def on_cmpctblock(self, conn, message): pass | def on_cmpctblock(self, message): pass | ||||
def on_feefilter(self, conn, message): pass | def on_feefilter(self, message): pass | ||||
def on_getaddr(self, conn, message): pass | def on_getaddr(self, message): pass | ||||
def on_getblocks(self, conn, message): pass | def on_getblocks(self, message): pass | ||||
def on_getblocktxn(self, conn, message): pass | def on_getblocktxn(self, message): pass | ||||
def on_getdata(self, conn, message): pass | def on_getdata(self, message): pass | ||||
def on_getheaders(self, conn, message): pass | def on_getheaders(self, message): pass | ||||
def on_headers(self, conn, message): pass | def on_headers(self, message): pass | ||||
def on_mempool(self, conn): pass | def on_mempool(self, message): pass | ||||
def on_pong(self, conn, message): pass | def on_pong(self, message): pass | ||||
def on_reject(self, conn, message): pass | def on_reject(self, message): pass | ||||
def on_sendcmpct(self, conn, message): pass | def on_sendcmpct(self, message): pass | ||||
def on_sendheaders(self, conn, message): pass | def on_sendheaders(self, message): pass | ||||
def on_tx(self, conn, message): pass | def on_tx(self, message): pass | ||||
def on_inv(self, conn, message): | def on_inv(self, message): | ||||
want = msg_getdata() | want = msg_getdata() | ||||
for i in message.inv: | for i in message.inv: | ||||
if i.type != 0: | if i.type != 0: | ||||
want.inv.append(i) | want.inv.append(i) | ||||
if len(want.inv): | if len(want.inv): | ||||
conn.send_message(want) | self.send_message(want) | ||||
def on_ping(self, conn, message): | def on_ping(self, message): | ||||
conn.send_message(msg_pong(message.nonce)) | self.send_message(msg_pong(message.nonce)) | ||||
def on_verack(self, conn, message): | def on_verack(self, message): | ||||
self.verack_received = True | self.verack_received = True | ||||
def on_version(self, conn, message): | def on_version(self, message): | ||||
assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format( | assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format( | ||||
message.nVersion, MIN_VERSION_SUPPORTED) | message.nVersion, MIN_VERSION_SUPPORTED) | ||||
conn.send_message(msg_verack()) | self.send_message(msg_verack()) | ||||
conn.nServices = message.nServices | self.nServices = message.nServices | ||||
# Connection helper methods | # Connection helper methods | ||||
def add_connection(self, conn): | |||||
self.connection = conn | |||||
def wait_for_disconnect(self, timeout=60): | def wait_for_disconnect(self, timeout=60): | ||||
def test_function(): return not self.connected | def test_function(): return self.state != "connected" | ||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | wait_until(test_function, timeout=timeout, lock=mininode_lock) | ||||
# Message receiving helper methods | # Message receiving helper methods | ||||
def wait_for_block(self, blockhash, timeout=60): | def wait_for_block(self, blockhash, timeout=60): | ||||
def test_function(): return self.last_message.get( | def test_function(): return self.last_message.get( | ||||
"block") and self.last_message["block"].block.rehash() == blockhash | "block") and self.last_message["block"].block.rehash() == blockhash | ||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | wait_until(test_function, timeout=timeout, lock=mininode_lock) | ||||
Show All 18 Lines | def wait_for_inv(self, expected_inv, timeout=60): | ||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | wait_until(test_function, timeout=timeout, lock=mininode_lock) | ||||
def wait_for_verack(self, timeout=60): | def wait_for_verack(self, timeout=60): | ||||
def test_function(): return self.message_count["verack"] | def test_function(): return self.message_count["verack"] | ||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | wait_until(test_function, timeout=timeout, lock=mininode_lock) | ||||
# Message sending helper functions | # Message sending helper functions | ||||
def send_message(self, message): | |||||
if self.connection: | |||||
self.connection.send_message(message) | |||||
else: | |||||
logger.error("Cannot send message. No connection to node!") | |||||
def send_and_ping(self, message): | def send_and_ping(self, message): | ||||
self.send_message(message) | self.send_message(message) | ||||
self.sync_with_ping() | self.sync_with_ping() | ||||
# Sync up with the node | # Sync up with the node | ||||
def sync_with_ping(self, timeout=60): | def sync_with_ping(self, timeout=60): | ||||
self.send_message(msg_ping(nonce=self.ping_counter)) | self.send_message(msg_ping(nonce=self.ping_counter)) | ||||
Show All 36 Lines |