Changeset View
Changeset View
Standalone View
Standalone View
test/functional/test_framework/mininode.py
Show All 32 Lines | |||||
import sys | import sys | ||||
import time | import time | ||||
from threading import RLock, Thread | from threading import RLock, Thread | ||||
from test_framework.siphash import siphash256 | from test_framework.siphash import siphash256 | ||||
from test_framework.cdefs import MAX_BLOCK_SIGOPS_PER_MB | from test_framework.cdefs import MAX_BLOCK_SIGOPS_PER_MB | ||||
from test_framework.util import hex_str_to_bytes, bytes_to_hex_str, wait_until | from test_framework.util import hex_str_to_bytes, bytes_to_hex_str, wait_until | ||||
BIP0031_VERSION = 60000 | MIN_VERSION_SUPPORTED = 60001 | ||||
MY_VERSION = 70014 # past bip-31 for ping/pong | MY_VERSION = 70014 # past bip-31 for ping/pong | ||||
MY_SUBVERSION = b"/python-mininode-tester:0.0.3/" | MY_SUBVERSION = b"/python-mininode-tester:0.0.3/" | ||||
# from version 70001 onwards, fRelay should be appended to version messages (BIP37) | # from version 70001 onwards, fRelay should be appended to version messages (BIP37) | ||||
MY_RELAY = 1 | MY_RELAY = 1 | ||||
MAX_INV_SZ = 50000 | MAX_INV_SZ = 50000 | ||||
COIN = 100000000 # 1 btc in satoshis | COIN = 100000000 # 1 btc in satoshis | ||||
▲ Show 20 Lines • Show All 927 Lines • ▼ Show 20 Lines | class msg_getaddr(): | ||||
def serialize(self): | def serialize(self): | ||||
return b"" | return b"" | ||||
def __repr__(self): | def __repr__(self): | ||||
return "msg_getaddr()" | return "msg_getaddr()" | ||||
class msg_ping_prebip31(): | |||||
command = b"ping" | |||||
def __init__(self): | |||||
pass | |||||
def deserialize(self, f): | |||||
pass | |||||
def serialize(self): | |||||
return b"" | |||||
def __repr__(self): | |||||
return "msg_ping() (pre-bip31)" | |||||
class msg_ping(): | class msg_ping(): | ||||
command = b"ping" | command = b"ping" | ||||
def __init__(self, nonce=0): | def __init__(self, nonce=0): | ||||
self.nonce = nonce | self.nonce = nonce | ||||
def deserialize(self, f): | def deserialize(self, f): | ||||
self.nonce = struct.unpack("<Q", f.read(8))[0] | self.nonce = struct.unpack("<Q", f.read(8))[0] | ||||
▲ Show 20 Lines • Show All 232 Lines • ▼ Show 20 Lines | class msg_blocktxn(): | ||||
def __repr__(self): | def __repr__(self): | ||||
return "msg_blocktxn(block_transactions=%s)" % (repr(self.block_transactions)) | return "msg_blocktxn(block_transactions=%s)" % (repr(self.block_transactions)) | ||||
class NodeConnCB(): | class NodeConnCB(): | ||||
"""Callback and helper functions for P2P connection to a bitcoind node. | """Callback and helper functions for P2P connection to a bitcoind node. | ||||
Individual testcases should subclass this and override the on_* methods | Individual testcases should subclass this and override the on_* methods | ||||
if they want to alter message handling behaviour. | if they want to alter message handling behaviour.""" | ||||
""" | |||||
def __init__(self): | def __init__(self): | ||||
# Track whether we have a P2P connection open to the node | # Track whether we have a P2P connection open to the node | ||||
self.connected = False | self.connected = False | ||||
self.connection = None | self.connection = None | ||||
# Track number of messages of each type received and the most recent | # Track number of messages of each type received and the most recent | ||||
# message of each type | # message of each type | ||||
self.message_count = defaultdict(int) | self.message_count = defaultdict(int) | ||||
self.last_message = {} | self.last_message = {} | ||||
# A count of the number of ping messages we've sent to the node | # A count of the number of ping messages we've sent to the node | ||||
self.ping_counter = 1 | self.ping_counter = 1 | ||||
# deliver_sleep_time is helpful for debugging race conditions in p2p | |||||
# tests; it causes message delivery to sleep for the specified time | |||||
# before acquiring the global lock and delivering the next message. | |||||
self.deliver_sleep_time = None | |||||
# Message receiving methods | # Message receiving methods | ||||
def deliver(self, conn, message): | def deliver(self, conn, message): | ||||
"""Receive message and dispatch message to appropriate callback. | """Receive message and dispatch message to appropriate callback. | ||||
We keep a count of how many of each message type has been received | We keep a count of how many of each message type has been received | ||||
and the most recent message of each type. | and the most recent message of each type.""" | ||||
Optionally waits for deliver_sleep_time before dispatching message. | |||||
""" | |||||
deliver_sleep = self.get_deliver_sleep_time() | |||||
if deliver_sleep is not None: | |||||
time.sleep(deliver_sleep) | |||||
with mininode_lock: | with mininode_lock: | ||||
try: | try: | ||||
command = message.command.decode('ascii') | command = message.command.decode('ascii') | ||||
self.message_count[command] += 1 | self.message_count[command] += 1 | ||||
self.last_message[command] = message | self.last_message[command] = message | ||||
getattr(self, 'on_' + command)(conn, message) | getattr(self, 'on_' + command)(conn, message) | ||||
except: | except: | ||||
print("ERROR delivering %s (%s)" % (repr(message), | print("ERROR delivering %s (%s)" % (repr(message), | ||||
sys.exc_info()[0])) | sys.exc_info()[0])) | ||||
raise | raise | ||||
def get_deliver_sleep_time(self): | |||||
with mininode_lock: | |||||
return self.deliver_sleep_time | |||||
# Callback methods. Can be overridden by subclasses in individual test | # Callback methods. Can be overridden by subclasses in individual test | ||||
# cases to provide custom message handling behaviour. | # cases to provide custom message handling behaviour. | ||||
def on_open(self, conn): | def on_open(self, conn): | ||||
self.connected = True | self.connected = True | ||||
def on_close(self, conn): | def on_close(self, conn): | ||||
self.connected = False | self.connected = False | ||||
Show All 37 Lines | def on_inv(self, conn, message): | ||||
want = msg_getdata() | want = msg_getdata() | ||||
for i in message.inv: | for i in message.inv: | ||||
if i.type != 0: | if i.type != 0: | ||||
want.inv.append(i) | want.inv.append(i) | ||||
if len(want.inv): | if len(want.inv): | ||||
conn.send_message(want) | conn.send_message(want) | ||||
def on_ping(self, conn, message): | def on_ping(self, conn, message): | ||||
if conn.ver_send > BIP0031_VERSION: | |||||
conn.send_message(msg_pong(message.nonce)) | conn.send_message(msg_pong(message.nonce)) | ||||
def on_verack(self, conn, message): | def on_verack(self, conn, message): | ||||
conn.ver_recv = conn.ver_send | conn.ver_recv = conn.ver_send | ||||
self.verack_received = True | self.verack_received = True | ||||
def on_version(self, conn, message): | def on_version(self, conn, message): | ||||
if message.nVersion >= 209: | assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format( | ||||
message.nVersion, MIN_VERSION_SUPPORTED) | |||||
conn.send_message(msg_verack()) | conn.send_message(msg_verack()) | ||||
conn.ver_send = min(MY_VERSION, message.nVersion) | |||||
if message.nVersion < 209: | |||||
conn.ver_recv = conn.ver_send | |||||
conn.nServices = message.nServices | conn.nServices = message.nServices | ||||
# Connection helper methods | # Connection helper methods | ||||
def add_connection(self, conn): | def add_connection(self, conn): | ||||
self.connection = conn | self.connection = conn | ||||
def wait_for_disconnect(self, timeout=60): | def wait_for_disconnect(self, timeout=60): | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | def sync_with_ping(self, timeout=60): | ||||
def test_function(): | def test_function(): | ||||
if not self.last_message.get("pong"): | if not self.last_message.get("pong"): | ||||
return False | return False | ||||
return self.last_message["pong"].nonce == self.ping_counter | return self.last_message["pong"].nonce == self.ping_counter | ||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | wait_until(test_function, timeout=timeout, lock=mininode_lock) | ||||
self.ping_counter += 1 | self.ping_counter += 1 | ||||
# The actual NodeConn class | |||||
# This class provides an interface for a p2p connection to a specified node | |||||
class NodeConn(asyncore.dispatcher): | class NodeConn(asyncore.dispatcher): | ||||
"""The actual NodeConn class | |||||
This class provides an interface for a p2p connection to a specified node.""" | |||||
messagemap = { | messagemap = { | ||||
b"version": msg_version, | b"version": msg_version, | ||||
b"verack": msg_verack, | b"verack": msg_verack, | ||||
b"addr": msg_addr, | b"addr": msg_addr, | ||||
b"inv": msg_inv, | b"inv": msg_inv, | ||||
b"getdata": msg_getdata, | b"getdata": msg_getdata, | ||||
b"getblocks": msg_getblocks, | b"getblocks": msg_getblocks, | ||||
b"tx": msg_tx, | b"tx": msg_tx, | ||||
▲ Show 20 Lines • Show All 114 Lines • ▼ Show 20 Lines | class NodeConn(asyncore.dispatcher): | ||||
def got_data(self): | def got_data(self): | ||||
try: | try: | ||||
with mininode_lock: | with mininode_lock: | ||||
if len(self.recvbuf) < 4: | if len(self.recvbuf) < 4: | ||||
return None | return None | ||||
if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: | if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: | ||||
raise ValueError("got garbage %s" % repr(self.recvbuf)) | raise ValueError("got garbage %s" % repr(self.recvbuf)) | ||||
if self.ver_recv < 209: | |||||
if len(self.recvbuf) < 4 + 12 + 4: | |||||
return None | |||||
command = self.recvbuf[4:4 + 12].split(b"\x00", 1)[0] | |||||
msglen = struct.unpack( | |||||
"<i", self.recvbuf[4 + 12:4 + 12 + 4])[0] | |||||
checksum = None | |||||
if len(self.recvbuf) < 4 + 12 + 4 + msglen: | |||||
return None | |||||
msg = self.recvbuf[4 + 12 + 4:4 + 12 + 4 + msglen] | |||||
self.recvbuf = self.recvbuf[4 + 12 + 4 + msglen:] | |||||
else: | |||||
if len(self.recvbuf) < 4 + 12 + 4 + 4: | if len(self.recvbuf) < 4 + 12 + 4 + 4: | ||||
return None | return | ||||
command = self.recvbuf[4:4 + 12].split(b"\x00", 1)[0] | command = self.recvbuf[4:4+12].split(b"\x00", 1)[0] | ||||
msglen = struct.unpack( | msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0] | ||||
"<i", self.recvbuf[4 + 12:4 + 12 + 4])[0] | |||||
checksum = self.recvbuf[4 + 12 + 4:4 + 12 + 4 + 4] | checksum = self.recvbuf[4+12+4:4+12+4+4] | ||||
if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen: | if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen: | ||||
return None | return | ||||
msg = self.recvbuf[4 + 12 + 4 + 4:4 + 12 + 4 + 4 + msglen] | msg = self.recvbuf[4+12+4+4:4+12+4+4+msglen] | ||||
h = sha256(sha256(msg)) | h = sha256(sha256(msg)) | ||||
if checksum != h[:4]: | if checksum != h[:4]: | ||||
raise ValueError( | raise ValueError("got bad checksum " + repr(self.recvbuf)) | ||||
"got bad checksum " + repr(self.recvbuf)) | |||||
self.recvbuf = self.recvbuf[4+12+4+4+msglen:] | self.recvbuf = self.recvbuf[4+12+4+4+msglen:] | ||||
if command not in self.messagemap: | if command not in self.messagemap: | ||||
logger.warning("Received unknown command from %s:%d: '%s' %s" % ( | raise ValueError("Received unknown command from %s:%d: '%s' %s" % ( | ||||
self.dstaddr, self.dstport, command, repr(msg))) | self.dstaddr, self.dstport, command, repr(msg))) | ||||
raise ValueError("Unknown command: '%s'" % (command)) | |||||
f = BytesIO(msg) | f = BytesIO(msg) | ||||
m = self.messagemap[command]() | m = self.messagemap[command]() | ||||
m.deserialize(f) | m.deserialize(f) | ||||
return m | return m | ||||
except Exception as e: | except Exception as e: | ||||
logger.exception('got_data:', repr(e)) | logger.exception('Error reading message:', repr(e)) | ||||
raise | raise | ||||
def send_message(self, message, pushbuf=False): | def send_message(self, message, pushbuf=False): | ||||
if self.state != "connected" and not pushbuf: | if self.state != "connected" and not pushbuf: | ||||
raise IOError('Not connected, no pushbuf') | raise IOError('Not connected, no pushbuf') | ||||
self._log_message("send", message) | self._log_message("send", message) | ||||
command = message.command | command = message.command | ||||
data = message.serialize() | data = message.serialize() | ||||
tmsg = self.MAGIC_BYTES[self.network] | tmsg = self.MAGIC_BYTES[self.network] | ||||
tmsg += command | tmsg += command | ||||
tmsg += b"\x00" * (12 - len(command)) | tmsg += b"\x00" * (12 - len(command)) | ||||
tmsg += struct.pack("<I", len(data)) | tmsg += struct.pack("<I", len(data)) | ||||
if self.ver_send >= 209: | |||||
th = sha256(data) | th = sha256(data) | ||||
h = sha256(th) | h = sha256(th) | ||||
tmsg += h[:4] | tmsg += h[:4] | ||||
tmsg += data | tmsg += data | ||||
with mininode_lock: | with mininode_lock: | ||||
if (len(self.sendbuf) == 0 and not pushbuf): | if (len(self.sendbuf) == 0 and not pushbuf): | ||||
try: | try: | ||||
sent = self.send(tmsg) | sent = self.send(tmsg) | ||||
self.sendbuf = tmsg[sent:] | self.sendbuf = tmsg[sent:] | ||||
except BlockingIOError: | except BlockingIOError: | ||||
self.sendbuf = tmsg | self.sendbuf = tmsg | ||||
else: | else: | ||||
self.sendbuf += tmsg | self.sendbuf += tmsg | ||||
self.last_sent = time.time() | self.last_sent = time.time() | ||||
def got_message(self, message): | def got_message(self, message): | ||||
if message.command == b"version": | |||||
if message.nVersion <= BIP0031_VERSION: | |||||
self.messagemap[b'ping'] = msg_ping_prebip31 | |||||
if self.last_sent + 30 * 60 < time.time(): | if self.last_sent + 30 * 60 < time.time(): | ||||
self.send_message(self.messagemap[b'ping']()) | self.send_message(self.messagemap[b'ping']()) | ||||
self._log_message("receive", message) | self._log_message("receive", message) | ||||
self.cb.deliver(self, message) | self.cb.deliver(self, message) | ||||
def _log_message(self, direction, msg): | def _log_message(self, direction, msg): | ||||
if direction == "send": | if direction == "send": | ||||
log_message = "Send message to " | log_message = "Send message to " | ||||
Show All 18 Lines | def run(self): | ||||
# select | # select | ||||
disconnected = [] | disconnected = [] | ||||
for fd, obj in mininode_socket_map.items(): | for fd, obj in mininode_socket_map.items(): | ||||
if obj.disconnect: | if obj.disconnect: | ||||
disconnected.append(obj) | disconnected.append(obj) | ||||
[obj.handle_close() for obj in disconnected] | [obj.handle_close() for obj in disconnected] | ||||
asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) | asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) | ||||
logger.debug("Network thread closing") | logger.debug("Network thread closing") | ||||
# An exception we can raise if we detect a potential disconnect | |||||
# (p2p or rpc) before the test is complete | |||||
class EarlyDisconnectError(Exception): | |||||
def __init__(self, value): | |||||
self.value = value | |||||
def __str__(self): | |||||
return repr(self.value) |