diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -27,18 +27,35 @@ logger = logging.getLogger("TestFramework.mininode") -# Keep our own socket map for asyncore, so that we can track disconnects -# ourselves (to workaround an issue with closing an asyncore socket when -# using select) -mininode_socket_map = dict() - -# One lock for synchronizing all data access between the networking thread (see -# NetworkThread below) and the thread running the test logic. For simplicity, -# NodeConn acquires this lock whenever delivering a message to a NodeConnCB, -# and whenever adding anything to the send buffer (in send_message()). This -# lock should be acquired in the thread running the test logic to synchronize -# access to any data shared with the NodeConnCB or NodeConn. -mininode_lock = RLock() +MESSAGEMAP = { + b"addr": msg_addr, + b"block": msg_block, + b"blocktxn": msg_blocktxn, + b"cmpctblock": msg_cmpctblock, + b"feefilter": msg_feefilter, + b"getaddr": msg_getaddr, + b"getblocks": msg_getblocks, + b"getblocktxn": msg_getblocktxn, + b"getdata": msg_getdata, + b"getheaders": msg_getheaders, + b"headers": msg_headers, + b"inv": msg_inv, + b"mempool": msg_mempool, + b"ping": msg_ping, + b"pong": msg_pong, + b"reject": msg_reject, + b"sendcmpct": msg_sendcmpct, + b"sendheaders": msg_sendheaders, + b"tx": msg_tx, + b"verack": msg_verack, + b"version": msg_version, +} + +MAGIC_BYTES = { + "mainnet": b"\xe3\xe1\xf3\xe8", + "testnet3": b"\xf4\xe5\xf3\xf4", + "regtest": b"\xda\xb5\xbf\xfa", +} class NodeConnCB(): @@ -210,35 +227,6 @@ """The actual NodeConn class This class provides an interface for a p2p connection to a specified node.""" - messagemap = { - b"version": msg_version, - b"verack": msg_verack, - b"addr": msg_addr, - b"inv": msg_inv, - b"getdata": msg_getdata, - b"getblocks": msg_getblocks, - b"tx": msg_tx, - b"block": msg_block, - b"getaddr": msg_getaddr, - b"ping": msg_ping, - b"pong": msg_pong, - b"headers": msg_headers, - b"getheaders": msg_getheaders, - b"reject": msg_reject, - b"mempool": msg_mempool, - b"feefilter": msg_feefilter, - b"sendheaders": msg_sendheaders, - b"sendcmpct": msg_sendcmpct, - b"cmpctblock": msg_cmpctblock, - b"getblocktxn": msg_getblocktxn, - b"blocktxn": msg_blocktxn - } - - MAGIC_BYTES = { - "mainnet": b"\xe3\xe1\xf3\xe8", - "testnet3": b"\xf4\xe5\xf3\xf4", - "regtest": b"\xda\xb5\xbf\xfa", - } def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK, send_version=True): asyncore.dispatcher.__init__(self, map=mininode_socket_map) @@ -276,6 +264,8 @@ self.handle_close() self.rpc = rpc + # Connection and disconnection methods + def handle_connect(self): if self.state != "connected": logger.debug("Connected & Listening: %s:%d" % @@ -295,6 +285,18 @@ pass self.cb.on_close(self) + def disconnect_node(self): + """ Disconnect the p2p connection. + + Called by the test logic thread. Causes the p2p connection + to be disconnected on the next iteration of the asyncore loop.""" + self.disconnect = True + + # Socket read methods + + def readable(self): + return True + def handle_read(self): with mininode_lock: t = self.recv(READ_BUFFER_SIZE) @@ -307,38 +309,12 @@ break self.got_message(msg) - def readable(self): - return True - - def writable(self): - with mininode_lock: - pre_connection = self.state == "connecting" - length = len(self.sendbuf) - return (length > 0 or pre_connection) - - def handle_write(self): - with mininode_lock: - # asyncore does not expose socket connection, only the first read/write - # event, thus we must check connection manually here to know when we - # actually connect - if self.state == "connecting": - self.handle_connect() - if not self.writable(): - return - - try: - sent = self.send(self.sendbuf) - except: - self.handle_close() - return - self.sendbuf = self.sendbuf[sent:] - def got_data(self): try: with mininode_lock: if len(self.recvbuf) < 4: return None - if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: + if self.recvbuf[:4] != MAGIC_BYTES[self.network]: raise ValueError("got garbage %s" % repr(self.recvbuf)) if len(self.recvbuf) < 4 + 12 + 4 + 4: return @@ -352,24 +328,55 @@ if checksum != h[:4]: raise ValueError("got bad checksum " + repr(self.recvbuf)) self.recvbuf = self.recvbuf[4+12+4+4+msglen:] - if command not in self.messagemap: + if command not in MESSAGEMAP: raise ValueError("Received unknown command from %s:%d: '%s' %s" % ( self.dstaddr, self.dstport, command, repr(msg))) f = BytesIO(msg) - m = self.messagemap[command]() + m = MESSAGEMAP[command]() m.deserialize(f) return m except Exception as e: logger.exception('Error reading message:', repr(e)) raise + def got_message(self, message): + if self.last_sent + 30 * 60 < time.time(): + self.send_message(MESSAGEMAP[b'ping']()) + self._log_message("receive", message) + self.cb.deliver(self, message) + + # Socket write methods + + def writable(self): + with mininode_lock: + pre_connection = self.state == "connecting" + length = len(self.sendbuf) + return (length > 0 or pre_connection) + + def handle_write(self): + with mininode_lock: + # asyncore does not expose socket connection, only the first read/write + # event, thus we must check connection manually here to know when we + # actually connect + if self.state == "connecting": + self.handle_connect() + if not self.writable(): + return + + try: + sent = self.send(self.sendbuf) + except: + self.handle_close() + return + self.sendbuf = self.sendbuf[sent:] + def send_message(self, message, pushbuf=False): if self.state != "connected" and not pushbuf: raise IOError('Not connected, no pushbuf') self._log_message("send", message) command = message.command data = message.serialize() - tmsg = self.MAGIC_BYTES[self.network] + tmsg = MAGIC_BYTES[self.network] tmsg += command tmsg += b"\x00" * (12 - len(command)) tmsg += struct.pack("