Changeset View
Changeset View
Standalone View
Standalone View
test/functional/test_framework/mininode.py
#!/usr/bin/env python3 | #!/usr/bin/env python3 | ||||
# Copyright (c) 2010 ArtForz -- public domain half-a-node | # Copyright (c) 2010 ArtForz -- public domain half-a-node | ||||
# Copyright (c) 2012 Jeff Garzik | # Copyright (c) 2012 Jeff Garzik | ||||
# Copyright (c) 2010-2016 The Bitcoin Core developers | # Copyright (c) 2010-2016 The Bitcoin Core developers | ||||
# Distributed under the MIT software license, see the accompanying | # Distributed under the MIT software license, see the accompanying | ||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php. | # file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
"""Bitcoin P2P network half-a-node. | |||||
# | This python code was modified from ArtForz' public domain half-a-node, as | ||||
# mininode.py - Bitcoin P2P network half-a-node | found in the mini-node branch of http://github.com/jgarzik/pynode. | ||||
# | |||||
# This python code was modified from ArtForz' public domain half-a-node, as | |||||
# found in the mini-node branch of http://github.com/jgarzik/pynode. | |||||
# | |||||
# NodeConn: an object which manages p2p connectivity to a bitcoin node | |||||
# NodeConnCB: a base class that describes the interface for receiving | |||||
# callbacks with network messages from a NodeConn | |||||
# CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: | |||||
# data structures that should map to corresponding structures in | |||||
# bitcoin/primitives | |||||
# msg_block, msg_tx, msg_headers, etc.: | |||||
# data structures that represent network messages | |||||
# ser_*, deser_*: functions that handle serialization/deserialization | |||||
NodeConn: an object which manages p2p connectivity to a bitcoin node | |||||
NodeConnCB: a base class that describes the interface for receiving | |||||
callbacks with network messages from a NodeConn | |||||
CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: | |||||
data structures that should map to corresponding structures in | |||||
bitcoin/primitives | |||||
msg_block, msg_tx, msg_headers, etc.: | |||||
data structures that represent network messages | |||||
ser_*, deser_*: functions that handle serialization/deserialization | |||||
""" | |||||
import asyncore | import asyncore | ||||
from codecs import encode | from codecs import encode | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import copy | import copy | ||||
import hashlib | import hashlib | ||||
from io import BytesIO | from io import BytesIO | ||||
import logging | import logging | ||||
Show All 32 Lines | |||||
# Keep our own socket map for asyncore, so that we can track disconnects | # Keep our own socket map for asyncore, so that we can track disconnects | ||||
# ourselves (to workaround an issue with closing an asyncore socket when | # ourselves (to workaround an issue with closing an asyncore socket when | ||||
# using select) | # using select) | ||||
mininode_socket_map = dict() | mininode_socket_map = dict() | ||||
# One lock for synchronizing all data access between the networking thread (see | # One lock for synchronizing all data access between the networking thread (see | ||||
# NetworkThread below) and the thread running the test logic. For simplicity, | # NetworkThread below) and the thread running the test logic. For simplicity, | ||||
# NodeConn acquires this lock whenever delivering a message to to a NodeConnCB, | # NodeConn acquires this lock whenever delivering a message to a NodeConnCB, | ||||
# and whenever adding anything to the send buffer (in send_message()). This | # and whenever adding anything to the send buffer (in send_message()). This | ||||
# lock should be acquired in the thread running the test logic to synchronize | # lock should be acquired in the thread running the test logic to synchronize | ||||
# access to any data shared with the NodeConnCB or NodeConn. | # access to any data shared with the NodeConnCB or NodeConn. | ||||
mininode_lock = RLock() | mininode_lock = RLock() | ||||
# Serialization/deserialization tools | # Serialization/deserialization tools | ||||
▲ Show 20 Lines • Show All 1,292 Lines • ▼ Show 20 Lines | def deliver(self, conn, message): | ||||
try: | try: | ||||
command = message.command.decode('ascii') | command = message.command.decode('ascii') | ||||
self.message_count[command] += 1 | self.message_count[command] += 1 | ||||
self.last_message[command] = message | self.last_message[command] = message | ||||
getattr(self, 'on_' + command)(conn, message) | getattr(self, 'on_' + command)(conn, message) | ||||
except: | except: | ||||
print("ERROR delivering %s (%s)" % (repr(message), | print("ERROR delivering %s (%s)" % (repr(message), | ||||
sys.exc_info()[0])) | sys.exc_info()[0])) | ||||
raise | |||||
def set_deliver_sleep_time(self, value): | def set_deliver_sleep_time(self, value): | ||||
with mininode_lock: | with mininode_lock: | ||||
self.deliver_sleep_time = value | self.deliver_sleep_time = value | ||||
def get_deliver_sleep_time(self): | def get_deliver_sleep_time(self): | ||||
with mininode_lock: | with mininode_lock: | ||||
return self.deliver_sleep_time | return self.deliver_sleep_time | ||||
▲ Show 20 Lines • Show All 118 Lines • ▼ Show 20 Lines | class NodeConnCB(): | ||||
def send_and_ping(self, message): | def send_and_ping(self, message): | ||||
self.send_message(message) | self.send_message(message) | ||||
self.sync_with_ping() | self.sync_with_ping() | ||||
# Sync up with the node | # Sync up with the node | ||||
def sync_with_ping(self, timeout=60): | def sync_with_ping(self, timeout=60): | ||||
self.send_message(msg_ping(nonce=self.ping_counter)) | self.send_message(msg_ping(nonce=self.ping_counter)) | ||||
def test_function(): return self.last_message.get( | def test_function(): | ||||
"pong") and self.last_message["pong"].nonce == self.ping_counter | if not self.last_message.get("pong"): | ||||
return False | |||||
return self.last_message["pong"].nonce == self.ping_counter | |||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | wait_until(test_function, timeout=timeout, lock=mininode_lock) | ||||
self.ping_counter += 1 | self.ping_counter += 1 | ||||
# The actual NodeConn class | # The actual NodeConn class | ||||
# This class provides an interface for a p2p connection to a specified node | # This class provides an interface for a p2p connection to a specified node | ||||
class NodeConn(asyncore.dispatcher): | class NodeConn(asyncore.dispatcher): | ||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK, send_version=True): | ||||
try: | try: | ||||
self.connect((dstaddr, dstport)) | self.connect((dstaddr, dstport)) | ||||
except: | except: | ||||
self.handle_close() | self.handle_close() | ||||
self.rpc = rpc | self.rpc = rpc | ||||
def handle_connect(self): | def handle_connect(self): | ||||
if self.state != "connected": | if self.state != "connected": | ||||
logger.debug("Connected & Listening: \n") | logger.debug("Connected & Listening: %s:%d" % | ||||
(self.dstaddr, self.dstport)) | |||||
self.state = "connected" | self.state = "connected" | ||||
self.cb.on_open(self) | self.cb.on_open(self) | ||||
def handle_close(self): | def handle_close(self): | ||||
logger.debug("Closing Connection to %s:%d... " % | logger.debug("Closing connection to: %s:%d" % | ||||
(self.dstaddr, self.dstport)) | (self.dstaddr, self.dstport)) | ||||
self.state = "closed" | self.state = "closed" | ||||
self.recvbuf = b"" | self.recvbuf = b"" | ||||
self.sendbuf = b"" | self.sendbuf = b"" | ||||
try: | try: | ||||
self.close() | self.close() | ||||
except: | except: | ||||
pass | pass | ||||
self.cb.on_close(self) | self.cb.on_close(self) | ||||
def handle_read(self): | def handle_read(self): | ||||
try: | |||||
with mininode_lock: | with mininode_lock: | ||||
t = self.recv(READ_BUFFER_SIZE) | t = self.recv(READ_BUFFER_SIZE) | ||||
if len(t) > 0: | if len(t) > 0: | ||||
self.recvbuf += t | self.recvbuf += t | ||||
except: | |||||
pass | |||||
while True: | while True: | ||||
msg = self.got_data() | msg = self.got_data() | ||||
if msg == None: | if msg == None: | ||||
break | break | ||||
self.got_message(msg) | self.got_message(msg) | ||||
def readable(self): | def readable(self): | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | def got_data(self): | ||||
checksum = self.recvbuf[4 + 12 + 4:4 + 12 + 4 + 4] | checksum = self.recvbuf[4 + 12 + 4:4 + 12 + 4 + 4] | ||||
if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen: | if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen: | ||||
return None | return None | ||||
msg = self.recvbuf[4 + 12 + 4 + 4:4 + 12 + 4 + 4 + msglen] | msg = self.recvbuf[4 + 12 + 4 + 4:4 + 12 + 4 + 4 + msglen] | ||||
h = sha256(sha256(msg)) | h = sha256(sha256(msg)) | ||||
if checksum != h[:4]: | if checksum != h[:4]: | ||||
raise ValueError( | raise ValueError( | ||||
"got bad checksum " + repr(self.recvbuf)) | "got bad checksum " + repr(self.recvbuf)) | ||||
self.recvbuf = self.recvbuf[4 + 12 + 4 + 4 + msglen:] | self.recvbuf = self.recvbuf[4+12+4+4+msglen:] | ||||
if command not in self.messagemap: | if command not in self.messagemap: | ||||
logger.warning("Received unknown command from %s:%d: '%s' %s" % | logger.warning("Received unknown command from %s:%d: '%s' %s" % ( | ||||
(self.dstaddr, self.dstport, command, repr(msg))) | self.dstaddr, self.dstport, command, repr(msg))) | ||||
return None | raise ValueError("Unknown command: '%s'" % (command)) | ||||
f = BytesIO(msg) | f = BytesIO(msg) | ||||
m = self.messagemap[command]() | m = self.messagemap[command]() | ||||
m.deserialize(f) | m.deserialize(f) | ||||
return m | return m | ||||
except Exception as e: | except Exception as e: | ||||
logger.exception('got_data:', repr(e)) | logger.exception('got_data:', repr(e)) | ||||
raise | |||||
def send_message(self, message, pushbuf=False): | def send_message(self, message, pushbuf=False): | ||||
if self.state != "connected" and not pushbuf: | if self.state != "connected" and not pushbuf: | ||||
raise IOError('Not connected, no pushbuf') | raise IOError('Not connected, no pushbuf') | ||||
logger.debug("Send message to %s:%d: %s" % | self._log_message("send", message) | ||||
(self.dstaddr, self.dstport, repr(message))) | |||||
command = message.command | command = message.command | ||||
data = message.serialize() | data = message.serialize() | ||||
tmsg = self.MAGIC_BYTES[self.network] | tmsg = self.MAGIC_BYTES[self.network] | ||||
tmsg += command | tmsg += command | ||||
tmsg += b"\x00" * (12 - len(command)) | tmsg += b"\x00" * (12 - len(command)) | ||||
tmsg += struct.pack("<I", len(data)) | tmsg += struct.pack("<I", len(data)) | ||||
if self.ver_send >= 209: | if self.ver_send >= 209: | ||||
th = sha256(data) | th = sha256(data) | ||||
h = sha256(th) | h = sha256(th) | ||||
tmsg += h[:4] | tmsg += h[:4] | ||||
tmsg += data | tmsg += data | ||||
with mininode_lock: | with mininode_lock: | ||||
self.sendbuf += tmsg | self.sendbuf += tmsg | ||||
self.last_sent = time.time() | self.last_sent = time.time() | ||||
def got_message(self, message): | def got_message(self, message): | ||||
if message.command == b"version": | if message.command == b"version": | ||||
if message.nVersion <= BIP0031_VERSION: | if message.nVersion <= BIP0031_VERSION: | ||||
self.messagemap[b'ping'] = msg_ping_prebip31 | self.messagemap[b'ping'] = msg_ping_prebip31 | ||||
if self.last_sent + 30 * 60 < time.time(): | if self.last_sent + 30 * 60 < time.time(): | ||||
self.send_message(self.messagemap[b'ping']()) | self.send_message(self.messagemap[b'ping']()) | ||||
logger.debug("Received message from %s:%d: %s" % | self._log_message("receive", message) | ||||
(self.dstaddr, self.dstport, repr(message))) | |||||
self.cb.deliver(self, message) | self.cb.deliver(self, message) | ||||
def _log_message(self, direction, msg): | |||||
if direction == "send": | |||||
log_message = "Send message to " | |||||
elif direction == "receive": | |||||
log_message = "Received message from " | |||||
log_message += "%s:%d: %s" % (self.dstaddr, | |||||
self.dstport, repr(msg)[:500]) | |||||
if len(log_message) > 500: | |||||
log_message += "... (msg truncated)" | |||||
logger.debug(log_message) | |||||
def disconnect_node(self): | def disconnect_node(self): | ||||
self.disconnect = True | self.disconnect = True | ||||
class NetworkThread(Thread): | class NetworkThread(Thread): | ||||
def run(self): | def run(self): | ||||
while mininode_socket_map: | while mininode_socket_map: | ||||
Show All 21 Lines |