Changeset View
Changeset View
Standalone View
Standalone View
test/functional/test_framework/mininode.py
Show First 20 Lines • Show All 104 Lines • ▼ Show 20 Lines | class P2PConnection(asyncore.dispatcher): | ||||
def __init__(self): | def __init__(self): | ||||
# All P2PConnections must be created before starting the NetworkThread. | # All P2PConnections must be created before starting the NetworkThread. | ||||
# assert that the network thread is not running. | # assert that the network thread is not running. | ||||
assert not network_thread_running() | assert not network_thread_running() | ||||
super().__init__(map=mininode_socket_map) | super().__init__(map=mininode_socket_map) | ||||
self._conn_open = False | |||||
@property | |||||
def is_connected(self): | |||||
return self._conn_open | |||||
def peer_connect(self, dstaddr, dstport, net="regtest"): | def peer_connect(self, dstaddr, dstport, net="regtest"): | ||||
self.dstaddr = dstaddr | self.dstaddr = dstaddr | ||||
self.dstport = dstport | self.dstport = dstport | ||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM) | self.create_socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | ||||
self.sendbuf = b"" | self.sendbuf = b"" | ||||
self.recvbuf = b"" | self.recvbuf = b"" | ||||
self.state = "connecting" | self._asyncore_pre_connection = True | ||||
self.network = net | self.network = net | ||||
self.disconnect = False | self.disconnect = False | ||||
logger.debug('Connecting to Bitcoin Node: {}:{}'.format( | logger.debug('Connecting to Bitcoin Node: {}:{}'.format( | ||||
self.dstaddr, self.dstport)) | self.dstaddr, self.dstport)) | ||||
try: | try: | ||||
self.connect((dstaddr, dstport)) | self.connect((dstaddr, dstport)) | ||||
except: | except: | ||||
self.handle_close() | self.handle_close() | ||||
def peer_disconnect(self): | def peer_disconnect(self): | ||||
# Connection could have already been closed by other end. | # Connection could have already been closed by other end. | ||||
if self.state == "connected": | if self.is_connected: | ||||
self.disconnect_node() | # Signal asyncore to disconnect | ||||
self.disconnect = True | |||||
# Connection and disconnection methods | # Connection and disconnection methods | ||||
def handle_connect(self): | def handle_connect(self): | ||||
"""asyncore callback when a connection is opened.""" | """asyncore callback when a connection is opened.""" | ||||
if self.state != "connected": | if not self.is_connected: | ||||
logger.debug("Connected & Listening: {}:{}".format( | logger.debug("Connected & Listening: {}:{}".format( | ||||
self.dstaddr, self.dstport)) | self.dstaddr, self.dstport)) | ||||
self.state = "connected" | self._conn_open = True | ||||
self._asyncore_pre_connection = False | |||||
self.on_open() | self.on_open() | ||||
def handle_close(self): | def handle_close(self): | ||||
"""asyncore callback when a connection is closed.""" | """asyncore callback when a connection is closed.""" | ||||
logger.debug("Closing connection to: {}:{}".format( | logger.debug("Closing connection to: {}:{}".format( | ||||
self.dstaddr, self.dstport)) | self.dstaddr, self.dstport)) | ||||
self.state = "closed" | self._conn_open = False | ||||
self.recvbuf = b"" | self.recvbuf = b"" | ||||
self.sendbuf = b"" | self.sendbuf = b"" | ||||
try: | try: | ||||
self.close() | self.close() | ||||
except: | except: | ||||
pass | pass | ||||
self.on_close() | self.on_close() | ||||
def disconnect_node(self): | |||||
"""Disconnect the p2p connection. | |||||
Called by the test logic thread. Causes the p2p connection | |||||
to be disconnected on the next iteration of the asyncore loop.""" | |||||
self.disconnect = True | |||||
# Socket read methods | # Socket read methods | ||||
def handle_read(self): | def handle_read(self): | ||||
"""asyncore callback when data is read from the socket.""" | """asyncore callback when data is read from the socket.""" | ||||
with mininode_lock: | with mininode_lock: | ||||
t = self.recv(READ_BUFFER_SIZE) | t = self.recv(READ_BUFFER_SIZE) | ||||
if len(t) > 0: | if len(t) > 0: | ||||
self.recvbuf += t | self.recvbuf += t | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | def on_message(self, message): | ||||
"""Callback for processing a P2P payload. Must be overridden by derived class.""" | """Callback for processing a P2P payload. Must be overridden by derived class.""" | ||||
raise NotImplementedError | raise NotImplementedError | ||||
# Socket write methods | # Socket write methods | ||||
def writable(self): | def writable(self): | ||||
"""asyncore method to determine whether the handle_write() callback should be called on the next loop.""" | """asyncore method to determine whether the handle_write() callback should be called on the next loop.""" | ||||
with mininode_lock: | with mininode_lock: | ||||
pre_connection = self.state == "connecting" | |||||
length = len(self.sendbuf) | length = len(self.sendbuf) | ||||
return (length > 0 or pre_connection) | return length > 0 or self._asyncore_pre_connection | ||||
def handle_write(self): | def handle_write(self): | ||||
"""asyncore callback when data should be written to the socket.""" | """asyncore callback when data should be written to the socket.""" | ||||
with mininode_lock: | with mininode_lock: | ||||
# asyncore does not expose socket connection, only the first read/write | # asyncore does not expose socket connection, only the first read/write | ||||
# event, thus we must check connection manually here to know when we | # event, thus we must check connection manually here to know when we | ||||
# actually connect | # actually connect | ||||
if self.state == "connecting": | if self._asyncore_pre_connection: | ||||
self.handle_connect() | self.handle_connect() | ||||
if not self.writable(): | if not self.writable(): | ||||
return | return | ||||
try: | try: | ||||
sent = self.send(self.sendbuf) | sent = self.send(self.sendbuf) | ||||
except: | except: | ||||
self.handle_close() | self.handle_close() | ||||
return | return | ||||
self.sendbuf = self.sendbuf[sent:] | self.sendbuf = self.sendbuf[sent:] | ||||
def format_message(self, message): | def send_message(self, message): | ||||
command = message.command | |||||
data = message.serialize() | |||||
tmsg = MAGIC_BYTES[self.network] | |||||
tmsg += command | |||||
tmsg += b"\x00" * (12 - len(command)) | |||||
tmsg += struct.pack("<I", len(data)) | |||||
th = sha256(data) | |||||
h = sha256(th) | |||||
tmsg += h[:4] | |||||
tmsg += data | |||||
return tmsg | |||||
def send_message(self, message, pushbuf=False): | |||||
"""Send a P2P message over the socket. | """Send a P2P message over the socket. | ||||
This method takes a P2P payload, builds the P2P header and adds | This method takes a P2P payload, builds the P2P header and adds | ||||
the message to the send buffer to be sent over the socket.""" | the message to the send buffer to be sent over the socket.""" | ||||
if self.state != "connected" and not pushbuf: | if not self.is_connected: | ||||
raise IOError('Not connected, no pushbuf') | raise IOError('Not connected') | ||||
self._log_message("send", message) | self._log_message("send", message) | ||||
tmsg = self.format_message(message) | tmsg = self._build_message(message) | ||||
self.send_raw_message(tmsg, pushbuf) | self.send_raw_message(tmsg) | ||||
def send_raw_message(self, tmsg, pushbuf=False): | def send_raw_message(self, tmsg): | ||||
"""Send any raw message over the socket. | """Send any raw message over the socket. | ||||
This method adds a raw message to the send buffer to be sent over the | This method adds a raw message to the send buffer to be sent over the | ||||
socket.""" | socket.""" | ||||
if self.state != "connected" and not pushbuf: | if not self.is_connected: | ||||
raise IOError('Not connected, no pushbuf') | raise IOError('Not connected') | ||||
with mininode_lock: | with mininode_lock: | ||||
if (len(self.sendbuf) == 0 and not pushbuf): | if len(self.sendbuf) == 0: | ||||
try: | try: | ||||
sent = self.send(tmsg) | sent = self.send(tmsg) | ||||
self.sendbuf = tmsg[sent:] | self.sendbuf = tmsg[sent:] | ||||
except BlockingIOError: | except BlockingIOError: | ||||
self.sendbuf = tmsg | self.sendbuf = tmsg | ||||
else: | else: | ||||
self.sendbuf += tmsg | self.sendbuf += tmsg | ||||
# Class utility methods | # Class utility methods | ||||
def _build_message(self, message): | |||||
"""Build a serialized P2P message""" | |||||
command = message.command | |||||
data = message.serialize() | |||||
tmsg = MAGIC_BYTES[self.network] | |||||
tmsg += command | |||||
tmsg += b"\x00" * (12 - len(command)) | |||||
tmsg += struct.pack("<I", len(data)) | |||||
th = sha256(data) | |||||
h = sha256(th) | |||||
tmsg += h[:4] | |||||
tmsg += data | |||||
return tmsg | |||||
def _log_message(self, direction, msg): | def _log_message(self, direction, msg): | ||||
"""Logs a message being sent or received over the connection.""" | """Logs a message being sent or received over the connection.""" | ||||
if direction == "send": | if direction == "send": | ||||
log_message = "Send message to " | log_message = "Send message to " | ||||
elif direction == "receive": | elif direction == "receive": | ||||
log_message = "Received message from " | log_message = "Received message from " | ||||
log_message += "{}:{}: {}".format( | log_message += "{}:{}: {}".format( | ||||
self.dstaddr, self.dstport, repr(msg)[:500]) | self.dstaddr, self.dstport, repr(msg)[:500]) | ||||
Show All 32 Lines | def peer_connect(self, *args, services=NODE_NETWORK, send_version=True, **kwargs): | ||||
if send_version: | if send_version: | ||||
# Send a version msg | # Send a version msg | ||||
vt = msg_version() | vt = msg_version() | ||||
vt.nServices = services | vt.nServices = services | ||||
vt.addrTo.ip = self.dstaddr | vt.addrTo.ip = self.dstaddr | ||||
vt.addrTo.port = self.dstport | vt.addrTo.port = self.dstport | ||||
vt.addrFrom.ip = "0.0.0.0" | vt.addrFrom.ip = "0.0.0.0" | ||||
vt.addrFrom.port = 0 | vt.addrFrom.port = 0 | ||||
self.send_message(vt, True) | # Will be sent right after handle_connect | ||||
self.sendbuf = self._build_message(vt) | |||||
# Message receiving methods | # Message receiving methods | ||||
def on_message(self, message): | def on_message(self, message): | ||||
"""Receive message and dispatch message to appropriate callback. | """Receive message and dispatch message to appropriate callback. | ||||
We keep a count of how many of each message type has been received | We keep a count of how many of each message type has been received | ||||
and the most recent message of each type.""" | and the most recent message of each type.""" | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | def on_version(self, message): | ||||
assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format( | assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format( | ||||
message.nVersion, MIN_VERSION_SUPPORTED) | message.nVersion, MIN_VERSION_SUPPORTED) | ||||
self.send_message(msg_verack()) | self.send_message(msg_verack()) | ||||
self.nServices = message.nServices | self.nServices = message.nServices | ||||
# Connection helper methods | # Connection helper methods | ||||
def wait_for_disconnect(self, timeout=60): | def wait_for_disconnect(self, timeout=60): | ||||
def test_function(): return self.state != "connected" | def test_function(): return not self.is_connected | ||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | wait_until(test_function, timeout=timeout, lock=mininode_lock) | ||||
# Message receiving helper methods | # Message receiving helper methods | ||||
def wait_for_block(self, blockhash, timeout=60): | def wait_for_block(self, blockhash, timeout=60): | ||||
def test_function(): return self.last_message.get( | def test_function(): return self.last_message.get( | ||||
"block") and self.last_message["block"].block.rehash() == blockhash | "block") and self.last_message["block"].block.rehash() == blockhash | ||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | wait_until(test_function, timeout=timeout, lock=mininode_lock) | ||||
▲ Show 20 Lines • Show All 241 Lines • Show Last 20 Lines |