Changeset View
Changeset View
Standalone View
Standalone View
test/functional/interface_usdt_net.py
Show First 20 Lines • Show All 95 Lines • ▼ Show 20 Lines | def run_test(self): | ||||
("peer_addr", ctypes.c_char * MAX_PEER_ADDR_LENGTH), | ("peer_addr", ctypes.c_char * MAX_PEER_ADDR_LENGTH), | ||||
("peer_conn_type", ctypes.c_char * MAX_PEER_CONN_TYPE_LENGTH), | ("peer_conn_type", ctypes.c_char * MAX_PEER_CONN_TYPE_LENGTH), | ||||
("msg_type", ctypes.c_char * MAX_MSG_TYPE_LENGTH), | ("msg_type", ctypes.c_char * MAX_MSG_TYPE_LENGTH), | ||||
("msg_size", ctypes.c_uint64), | ("msg_size", ctypes.c_uint64), | ||||
("msg", ctypes.c_ubyte * MAX_MSG_DATA_LENGTH), | ("msg", ctypes.c_ubyte * MAX_MSG_DATA_LENGTH), | ||||
] | ] | ||||
def __repr__(self): | def __repr__(self): | ||||
return f"P2PMessage(peer={self.peer_id}, addr={self.peer_addr.decode('utf-8')}, conn_type={self.peer_conn_type.decode('utf-8')}, msg_type={self.msg_type.decode('utf-8')}, msg_size={self.msg_size})" | return ( | ||||
f"P2PMessage(peer={self.peer_id}," | |||||
f" addr={self.peer_addr.decode('utf-8')}," | |||||
f" conn_type={self.peer_conn_type.decode('utf-8')}," | |||||
f" msg_type={self.msg_type.decode('utf-8')}," | |||||
f" msg_size={self.msg_size})" | |||||
) | |||||
self.log.info( | self.log.info( | ||||
"hook into the net:inbound_message and net:outbound_message tracepoints") | "hook into the net:inbound_message and net:outbound_message tracepoints" | ||||
) | |||||
ctx = USDT(pid=self.nodes[0].process.pid) | ctx = USDT(pid=self.nodes[0].process.pid) | ||||
ctx.enable_probe(probe="net:inbound_message", | ctx.enable_probe(probe="net:inbound_message", fn_name="trace_inbound_message") | ||||
fn_name="trace_inbound_message") | ctx.enable_probe(probe="net:outbound_message", fn_name="trace_outbound_message") | ||||
ctx.enable_probe(probe="net:outbound_message", | |||||
fn_name="trace_outbound_message") | |||||
bpf = BPF(text=net_tracepoints_program, usdt_contexts=[ctx], debug=0) | bpf = BPF(text=net_tracepoints_program, usdt_contexts=[ctx], debug=0) | ||||
# The handle_* function is a ctypes callback function called from C. When | # The handle_* function is a ctypes callback function called from C. When | ||||
# we assert in the handle_* function, the AssertError doesn't propagate | # we assert in the handle_* function, the AssertError doesn't propagate | ||||
# back to Python. The exception is ignored. We manually count and assert | # back to Python. The exception is ignored. We manually count and assert | ||||
# that the handle_* functions succeeded. | # that the handle_* functions succeeded. | ||||
EXPECTED_INOUTBOUND_VERSION_MSG = 1 | EXPECTED_INOUTBOUND_VERSION_MSG = 1 | ||||
checked_inbound_version_msg = 0 | checked_inbound_version_msg = 0 | ||||
checked_outbound_version_msg = 0 | checked_outbound_version_msg = 0 | ||||
def check_p2p_message(event, inbound): | def check_p2p_message(event, inbound): | ||||
nonlocal checked_inbound_version_msg, checked_outbound_version_msg | nonlocal checked_inbound_version_msg, checked_outbound_version_msg | ||||
if event.msg_type.decode("utf-8") == "version": | if event.msg_type.decode("utf-8") == "version": | ||||
self.log.info( | self.log.info( | ||||
f"check_p2p_message(): {'inbound' if inbound else 'outbound'} {event}") | "check_p2p_message():" | ||||
f" {'inbound' if inbound else 'outbound'} {event}" | |||||
) | |||||
peer = self.nodes[0].getpeerinfo()[0] | peer = self.nodes[0].getpeerinfo()[0] | ||||
msg = msg_version() | msg = msg_version() | ||||
msg.deserialize(BytesIO(bytes(event.msg[:event.msg_size]))) | msg.deserialize(BytesIO(bytes(event.msg[: event.msg_size]))) | ||||
assert_equal(peer["id"], event.peer_id, peer["id"]) | assert_equal(peer["id"], event.peer_id, peer["id"]) | ||||
assert_equal(peer["addr"], event.peer_addr.decode("utf-8")) | assert_equal(peer["addr"], event.peer_addr.decode("utf-8")) | ||||
assert_equal(peer["connection_type"], | assert_equal( | ||||
event.peer_conn_type.decode("utf-8")) | peer["connection_type"], event.peer_conn_type.decode("utf-8") | ||||
) | |||||
if inbound: | if inbound: | ||||
checked_inbound_version_msg += 1 | checked_inbound_version_msg += 1 | ||||
else: | else: | ||||
checked_outbound_version_msg += 1 | checked_outbound_version_msg += 1 | ||||
def handle_inbound(_, data, __): | def handle_inbound(_, data, __): | ||||
event = ctypes.cast(data, ctypes.POINTER(P2PMessage)).contents | event = ctypes.cast(data, ctypes.POINTER(P2PMessage)).contents | ||||
check_p2p_message(event, True) | check_p2p_message(event, True) | ||||
def handle_outbound(_, data, __): | def handle_outbound(_, data, __): | ||||
event = ctypes.cast(data, ctypes.POINTER(P2PMessage)).contents | event = ctypes.cast(data, ctypes.POINTER(P2PMessage)).contents | ||||
check_p2p_message(event, False) | check_p2p_message(event, False) | ||||
bpf["inbound_messages"].open_perf_buffer(handle_inbound) | bpf["inbound_messages"].open_perf_buffer(handle_inbound) | ||||
bpf["outbound_messages"].open_perf_buffer(handle_outbound) | bpf["outbound_messages"].open_perf_buffer(handle_outbound) | ||||
self.log.info("connect a P2P test node to our bitcoind node") | self.log.info("connect a P2P test node to our bitcoind node") | ||||
test_node = P2PInterface() | test_node = P2PInterface() | ||||
self.nodes[0].add_p2p_connection(test_node) | self.nodes[0].add_p2p_connection(test_node) | ||||
bpf.perf_buffer_poll(timeout=200) | bpf.perf_buffer_poll(timeout=200) | ||||
self.log.info( | self.log.info("check that we got both an inbound and outbound version message") | ||||
"check that we got both an inbound and outbound version message") | assert_equal(EXPECTED_INOUTBOUND_VERSION_MSG, checked_inbound_version_msg) | ||||
assert_equal(EXPECTED_INOUTBOUND_VERSION_MSG, | assert_equal(EXPECTED_INOUTBOUND_VERSION_MSG, checked_outbound_version_msg) | ||||
checked_inbound_version_msg) | |||||
assert_equal(EXPECTED_INOUTBOUND_VERSION_MSG, | |||||
checked_outbound_version_msg) | |||||
bpf.cleanup() | bpf.cleanup() | ||||
if __name__ == '__main__': | if __name__ == "__main__": | ||||
NetTracepointTest().main() | NetTracepointTest().main() |