Changeset View
Changeset View
Standalone View
Standalone View
test/functional/test_framework/mininode.py
Show All 21 Lines | |||||
import sys | import sys | ||||
import time | import time | ||||
from threading import RLock, Thread | from threading import RLock, Thread | ||||
from test_framework.messages import * | from test_framework.messages import * | ||||
logger = logging.getLogger("TestFramework.mininode") | logger = logging.getLogger("TestFramework.mininode") | ||||
# Keep our own socket map for asyncore, so that we can track disconnects | MESSAGEMAP = { | ||||
# ourselves (to workaround an issue with closing an asyncore socket when | b"addr": msg_addr, | ||||
# using select) | b"block": msg_block, | ||||
mininode_socket_map = dict() | b"blocktxn": msg_blocktxn, | ||||
b"cmpctblock": msg_cmpctblock, | |||||
b"feefilter": msg_feefilter, | |||||
b"getaddr": msg_getaddr, | |||||
b"getblocks": msg_getblocks, | |||||
b"getblocktxn": msg_getblocktxn, | |||||
b"getdata": msg_getdata, | |||||
b"getheaders": msg_getheaders, | |||||
b"headers": msg_headers, | |||||
b"inv": msg_inv, | |||||
b"mempool": msg_mempool, | |||||
b"ping": msg_ping, | |||||
b"pong": msg_pong, | |||||
b"reject": msg_reject, | |||||
b"sendcmpct": msg_sendcmpct, | |||||
b"sendheaders": msg_sendheaders, | |||||
b"tx": msg_tx, | |||||
b"verack": msg_verack, | |||||
b"version": msg_version, | |||||
} | |||||
# One lock for synchronizing all data access between the networking thread (see | MAGIC_BYTES = { | ||||
# NetworkThread below) and the thread running the test logic. For simplicity, | "mainnet": b"\xe3\xe1\xf3\xe8", | ||||
# NodeConn acquires this lock whenever delivering a message to a NodeConnCB, | "testnet3": b"\xf4\xe5\xf3\xf4", | ||||
# and whenever adding anything to the send buffer (in send_message()). This | "regtest": b"\xda\xb5\xbf\xfa", | ||||
# lock should be acquired in the thread running the test logic to synchronize | } | ||||
# access to any data shared with the NodeConnCB or NodeConn. | |||||
mininode_lock = RLock() | |||||
class NodeConnCB(): | class NodeConnCB(): | ||||
"""Callback and helper functions for P2P connection to a bitcoind node. | """Callback and helper functions for P2P connection to a bitcoind node. | ||||
Individual testcases should subclass this and override the on_* methods | Individual testcases should subclass this and override the on_* methods | ||||
if they want to alter message handling behaviour.""" | if they want to alter message handling behaviour.""" | ||||
▲ Show 20 Lines • Show All 155 Lines • ▼ Show 20 Lines | def sync_with_ping(self, timeout=60): | ||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | wait_until(test_function, timeout=timeout, lock=mininode_lock) | ||||
self.ping_counter += 1 | self.ping_counter += 1 | ||||
class NodeConn(asyncore.dispatcher): | class NodeConn(asyncore.dispatcher): | ||||
"""The actual NodeConn class | """The actual NodeConn class | ||||
This class provides an interface for a p2p connection to a specified node.""" | This class provides an interface for a p2p connection to a specified node.""" | ||||
messagemap = { | |||||
b"version": msg_version, | |||||
b"verack": msg_verack, | |||||
b"addr": msg_addr, | |||||
b"inv": msg_inv, | |||||
b"getdata": msg_getdata, | |||||
b"getblocks": msg_getblocks, | |||||
b"tx": msg_tx, | |||||
b"block": msg_block, | |||||
b"getaddr": msg_getaddr, | |||||
b"ping": msg_ping, | |||||
b"pong": msg_pong, | |||||
b"headers": msg_headers, | |||||
b"getheaders": msg_getheaders, | |||||
b"reject": msg_reject, | |||||
b"mempool": msg_mempool, | |||||
b"feefilter": msg_feefilter, | |||||
b"sendheaders": msg_sendheaders, | |||||
b"sendcmpct": msg_sendcmpct, | |||||
b"cmpctblock": msg_cmpctblock, | |||||
b"getblocktxn": msg_getblocktxn, | |||||
b"blocktxn": msg_blocktxn | |||||
} | |||||
MAGIC_BYTES = { | |||||
"mainnet": b"\xe3\xe1\xf3\xe8", | |||||
"testnet3": b"\xf4\xe5\xf3\xf4", | |||||
"regtest": b"\xda\xb5\xbf\xfa", | |||||
} | |||||
def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK, send_version=True): | def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK, send_version=True): | ||||
asyncore.dispatcher.__init__(self, map=mininode_socket_map) | asyncore.dispatcher.__init__(self, map=mininode_socket_map) | ||||
self.dstaddr = dstaddr | self.dstaddr = dstaddr | ||||
self.dstport = dstport | self.dstport = dstport | ||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM) | self.create_socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | ||||
self.sendbuf = b"" | self.sendbuf = b"" | ||||
Show All 21 Lines | def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK, send_version=True): | ||||
(self.dstaddr, self.dstport)) | (self.dstaddr, self.dstport)) | ||||
try: | try: | ||||
self.connect((dstaddr, dstport)) | self.connect((dstaddr, dstport)) | ||||
except: | except: | ||||
self.handle_close() | self.handle_close() | ||||
self.rpc = rpc | self.rpc = rpc | ||||
# Connection and disconnection methods | |||||
def handle_connect(self): | def handle_connect(self): | ||||
if self.state != "connected": | if self.state != "connected": | ||||
logger.debug("Connected & Listening: %s:%d" % | logger.debug("Connected & Listening: %s:%d" % | ||||
(self.dstaddr, self.dstport)) | (self.dstaddr, self.dstport)) | ||||
self.state = "connected" | self.state = "connected" | ||||
self.cb.on_open(self) | self.cb.on_open(self) | ||||
def handle_close(self): | def handle_close(self): | ||||
logger.debug("Closing connection to: %s:%d" % | logger.debug("Closing connection to: %s:%d" % | ||||
(self.dstaddr, self.dstport)) | (self.dstaddr, self.dstport)) | ||||
self.state = "closed" | self.state = "closed" | ||||
self.recvbuf = b"" | self.recvbuf = b"" | ||||
self.sendbuf = b"" | self.sendbuf = b"" | ||||
try: | try: | ||||
self.close() | self.close() | ||||
except: | except: | ||||
pass | pass | ||||
self.cb.on_close(self) | self.cb.on_close(self) | ||||
def disconnect_node(self): | |||||
""" Disconnect the p2p connection. | |||||
Called by the test logic thread. Causes the p2p connection | |||||
to be disconnected on the next iteration of the asyncore loop.""" | |||||
self.disconnect = True | |||||
# Socket read methods | |||||
def readable(self): | |||||
return True | |||||
def handle_read(self): | def handle_read(self): | ||||
with mininode_lock: | with mininode_lock: | ||||
t = self.recv(READ_BUFFER_SIZE) | t = self.recv(READ_BUFFER_SIZE) | ||||
if len(t) > 0: | if len(t) > 0: | ||||
self.recvbuf += t | self.recvbuf += t | ||||
while True: | while True: | ||||
msg = self.got_data() | msg = self.got_data() | ||||
if msg == None: | if msg == None: | ||||
break | break | ||||
self.got_message(msg) | self.got_message(msg) | ||||
def readable(self): | |||||
return True | |||||
def writable(self): | |||||
with mininode_lock: | |||||
pre_connection = self.state == "connecting" | |||||
length = len(self.sendbuf) | |||||
return (length > 0 or pre_connection) | |||||
def handle_write(self): | |||||
with mininode_lock: | |||||
# asyncore does not expose socket connection, only the first read/write | |||||
# event, thus we must check connection manually here to know when we | |||||
# actually connect | |||||
if self.state == "connecting": | |||||
self.handle_connect() | |||||
if not self.writable(): | |||||
return | |||||
try: | |||||
sent = self.send(self.sendbuf) | |||||
except: | |||||
self.handle_close() | |||||
return | |||||
self.sendbuf = self.sendbuf[sent:] | |||||
def got_data(self): | def got_data(self): | ||||
try: | try: | ||||
with mininode_lock: | with mininode_lock: | ||||
if len(self.recvbuf) < 4: | if len(self.recvbuf) < 4: | ||||
return None | return None | ||||
if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: | if self.recvbuf[:4] != MAGIC_BYTES[self.network]: | ||||
raise ValueError("got garbage %s" % repr(self.recvbuf)) | raise ValueError("got garbage %s" % repr(self.recvbuf)) | ||||
if len(self.recvbuf) < 4 + 12 + 4 + 4: | if len(self.recvbuf) < 4 + 12 + 4 + 4: | ||||
return | return | ||||
command = self.recvbuf[4:4+12].split(b"\x00", 1)[0] | command = self.recvbuf[4:4+12].split(b"\x00", 1)[0] | ||||
msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0] | msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0] | ||||
checksum = self.recvbuf[4+12+4:4+12+4+4] | checksum = self.recvbuf[4+12+4:4+12+4+4] | ||||
if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen: | if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen: | ||||
return | return | ||||
msg = self.recvbuf[4+12+4+4:4+12+4+4+msglen] | msg = self.recvbuf[4+12+4+4:4+12+4+4+msglen] | ||||
h = sha256(sha256(msg)) | h = sha256(sha256(msg)) | ||||
if checksum != h[:4]: | if checksum != h[:4]: | ||||
raise ValueError("got bad checksum " + repr(self.recvbuf)) | raise ValueError("got bad checksum " + repr(self.recvbuf)) | ||||
self.recvbuf = self.recvbuf[4+12+4+4+msglen:] | self.recvbuf = self.recvbuf[4+12+4+4+msglen:] | ||||
if command not in self.messagemap: | if command not in MESSAGEMAP: | ||||
raise ValueError("Received unknown command from %s:%d: '%s' %s" % ( | raise ValueError("Received unknown command from %s:%d: '%s' %s" % ( | ||||
self.dstaddr, self.dstport, command, repr(msg))) | self.dstaddr, self.dstport, command, repr(msg))) | ||||
f = BytesIO(msg) | f = BytesIO(msg) | ||||
m = self.messagemap[command]() | m = MESSAGEMAP[command]() | ||||
m.deserialize(f) | m.deserialize(f) | ||||
return m | return m | ||||
except Exception as e: | except Exception as e: | ||||
logger.exception('Error reading message:', repr(e)) | logger.exception('Error reading message:', repr(e)) | ||||
raise | raise | ||||
def got_message(self, message): | |||||
if self.last_sent + 30 * 60 < time.time(): | |||||
self.send_message(MESSAGEMAP[b'ping']()) | |||||
self._log_message("receive", message) | |||||
self.cb.deliver(self, message) | |||||
# Socket write methods | |||||
def writable(self): | |||||
with mininode_lock: | |||||
pre_connection = self.state == "connecting" | |||||
length = len(self.sendbuf) | |||||
return (length > 0 or pre_connection) | |||||
def handle_write(self): | |||||
with mininode_lock: | |||||
# asyncore does not expose socket connection, only the first read/write | |||||
# event, thus we must check connection manually here to know when we | |||||
# actually connect | |||||
if self.state == "connecting": | |||||
self.handle_connect() | |||||
if not self.writable(): | |||||
return | |||||
try: | |||||
sent = self.send(self.sendbuf) | |||||
except: | |||||
self.handle_close() | |||||
return | |||||
self.sendbuf = self.sendbuf[sent:] | |||||
def send_message(self, message, pushbuf=False): | def send_message(self, message, pushbuf=False): | ||||
if self.state != "connected" and not pushbuf: | if self.state != "connected" and not pushbuf: | ||||
raise IOError('Not connected, no pushbuf') | raise IOError('Not connected, no pushbuf') | ||||
self._log_message("send", message) | self._log_message("send", message) | ||||
command = message.command | command = message.command | ||||
data = message.serialize() | data = message.serialize() | ||||
tmsg = self.MAGIC_BYTES[self.network] | tmsg = MAGIC_BYTES[self.network] | ||||
tmsg += command | tmsg += command | ||||
tmsg += b"\x00" * (12 - len(command)) | tmsg += b"\x00" * (12 - len(command)) | ||||
tmsg += struct.pack("<I", len(data)) | tmsg += struct.pack("<I", len(data)) | ||||
th = sha256(data) | th = sha256(data) | ||||
h = sha256(th) | h = sha256(th) | ||||
tmsg += h[:4] | tmsg += h[:4] | ||||
tmsg += data | tmsg += data | ||||
with mininode_lock: | with mininode_lock: | ||||
if (len(self.sendbuf) == 0 and not pushbuf): | if (len(self.sendbuf) == 0 and not pushbuf): | ||||
try: | try: | ||||
sent = self.send(tmsg) | sent = self.send(tmsg) | ||||
self.sendbuf = tmsg[sent:] | self.sendbuf = tmsg[sent:] | ||||
except BlockingIOError: | except BlockingIOError: | ||||
self.sendbuf = tmsg | self.sendbuf = tmsg | ||||
else: | else: | ||||
self.sendbuf += tmsg | self.sendbuf += tmsg | ||||
self.last_sent = time.time() | self.last_sent = time.time() | ||||
def got_message(self, message): | # Class utility methods | ||||
if self.last_sent + 30 * 60 < time.time(): | |||||
self.send_message(self.messagemap[b'ping']()) | |||||
self._log_message("receive", message) | |||||
self.cb.deliver(self, message) | |||||
def _log_message(self, direction, msg): | def _log_message(self, direction, msg): | ||||
if direction == "send": | if direction == "send": | ||||
log_message = "Send message to " | log_message = "Send message to " | ||||
elif direction == "receive": | elif direction == "receive": | ||||
log_message = "Received message from " | log_message = "Received message from " | ||||
log_message += "%s:%d: %s" % (self.dstaddr, | log_message += "%s:%d: %s" % (self.dstaddr, | ||||
self.dstport, repr(msg)[:500]) | self.dstport, repr(msg)[:500]) | ||||
if len(log_message) > 500: | if len(log_message) > 500: | ||||
log_message += "... (msg truncated)" | log_message += "... (msg truncated)" | ||||
logger.debug(log_message) | logger.debug(log_message) | ||||
def disconnect_node(self): | |||||
self.disconnect = True | # Keep our own socket map for asyncore, so that we can track disconnects | ||||
# ourselves (to workaround an issue with closing an asyncore socket when | |||||
# using select) | |||||
mininode_socket_map = dict() | |||||
# One lock for synchronizing all data access between the networking thread (see | |||||
# NetworkThread below) and the thread running the test logic. For simplicity, | |||||
# NodeConn acquires this lock whenever delivering a message to a NodeConnCB, | |||||
# and whenever adding anything to the send buffer (in send_message()). This | |||||
# lock should be acquired in the thread running the test logic to synchronize | |||||
# access to any data shared with the NodeConnCB or NodeConn. | |||||
mininode_lock = RLock() | |||||
class NetworkThread(Thread): | class NetworkThread(Thread): | ||||
def run(self): | def run(self): | ||||
while mininode_socket_map: | while mininode_socket_map: | ||||
# We check for whether to disconnect outside of the asyncore | # We check for whether to disconnect outside of the asyncore | ||||
# loop to workaround the behavior of asyncore when using | # loop to workaround the behavior of asyncore when using | ||||
# select | # select | ||||
disconnected = [] | disconnected = [] | ||||
for fd, obj in mininode_socket_map.items(): | for fd, obj in mininode_socket_map.items(): | ||||
if obj.disconnect: | if obj.disconnect: | ||||
disconnected.append(obj) | disconnected.append(obj) | ||||
[obj.handle_close() for obj in disconnected] | [obj.handle_close() for obj in disconnected] | ||||
asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) | asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) | ||||
logger.debug("Network thread closing") | logger.debug("Network thread closing") |