Changeset View
Changeset View
Standalone View
Standalone View
test/functional/test_framework/messages.py
- This file was copied from test/functional/test_framework/mininode.py.
Property | Old Value | New Value |
---|---|---|
File Mode | 100755 | 100644 |
#!/usr/bin/env python3 | #!/usr/bin/env python3 | ||||
# Copyright (c) 2010 ArtForz -- public domain half-a-node | # Copyright (c) 2010 ArtForz -- public domain half-a-node | ||||
# Copyright (c) 2012 Jeff Garzik | # Copyright (c) 2012 Jeff Garzik | ||||
# Copyright (c) 2010-2016 The Bitcoin Core developers | # Copyright (c) 2010-2017 The Bitcoin Core developers | ||||
# Distributed under the MIT software license, see the accompanying | # Distributed under the MIT software license, see the accompanying | ||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php. | # file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
"""Bitcoin P2P network half-a-node. | """Bitcoin test framework primitive and message strcutures | ||||
This python code was modified from ArtForz' public domain half-a-node, as | |||||
found in the mini-node branch of http://github.com/jgarzik/pynode. | |||||
NodeConn: an object which manages p2p connectivity to a bitcoin node | |||||
NodeConnCB: a base class that describes the interface for receiving | |||||
callbacks with network messages from a NodeConn | |||||
CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: | CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: | ||||
data structures that should map to corresponding structures in | data structures that should map to corresponding structures in | ||||
bitcoin/primitives | bitcoin/primitives | ||||
msg_block, msg_tx, msg_headers, etc.: | msg_block, msg_tx, msg_headers, etc.: | ||||
data structures that represent network messages | data structures that represent network messages | ||||
ser_*, deser_*: functions that handle serialization/deserialization | |||||
""" | |||||
import asyncore | ser_*, deser_*: functions that handle serialization/deserialization.""" | ||||
from codecs import encode | from codecs import encode | ||||
from collections import defaultdict | |||||
import copy | import copy | ||||
import hashlib | import hashlib | ||||
from io import BytesIO | from io import BytesIO | ||||
import logging | |||||
import random | import random | ||||
import socket | import socket | ||||
import struct | import struct | ||||
import sys | |||||
import time | import time | ||||
from threading import RLock, Thread | |||||
from test_framework.siphash import siphash256 | from test_framework.siphash import siphash256 | ||||
from test_framework.cdefs import MAX_BLOCK_SIGOPS_PER_MB | |||||
from test_framework.util import hex_str_to_bytes, bytes_to_hex_str, wait_until | from test_framework.util import hex_str_to_bytes, bytes_to_hex_str, wait_until | ||||
MIN_VERSION_SUPPORTED = 60001 | MIN_VERSION_SUPPORTED = 60001 | ||||
MY_VERSION = 70014 # past bip-31 for ping/pong | # past bip-31 for ping/pong | ||||
MY_VERSION = 70014 | |||||
MY_SUBVERSION = b"/python-mininode-tester:0.0.3/" | MY_SUBVERSION = b"/python-mininode-tester:0.0.3/" | ||||
# from version 70001 onwards, fRelay should be appended to version messages (BIP37) | # from version 70001 onwards, fRelay should be appended to version messages (BIP37) | ||||
MY_RELAY = 1 | MY_RELAY = 1 | ||||
MAX_INV_SZ = 50000 | MAX_INV_SZ = 50000 | ||||
MAX_BLOCK_BASE_SIZE = 1000000 | |||||
COIN = 100000000 # 1 btc in satoshis | # 1 BCH in satoshis | ||||
COIN = 100000000 | |||||
NODE_NETWORK = (1 << 0) | NODE_NETWORK = (1 << 0) | ||||
# NODE_GETUTXO = (1 << 1) | # NODE_GETUTXO = (1 << 1) | ||||
# NODE_BLOOM = (1 << 2) | # NODE_BLOOM = (1 << 2) | ||||
NODE_WITNESS = (1 << 3) | NODE_WITNESS = (1 << 3) | ||||
NODE_XTHIN = (1 << 4) | NODE_XTHIN = (1 << 4) | ||||
NODE_BITCOIN_CASH = (1 << 5) | NODE_BITCOIN_CASH = (1 << 5) | ||||
# Howmuch data will be read from the network at once | # Howmuch data will be read from the network at once | ||||
READ_BUFFER_SIZE = 8192 | READ_BUFFER_SIZE = 8192 | ||||
logger = logging.getLogger("TestFramework.mininode") | |||||
# Keep our own socket map for asyncore, so that we can track disconnects | |||||
# ourselves (to workaround an issue with closing an asyncore socket when | |||||
# using select) | |||||
mininode_socket_map = dict() | |||||
# One lock for synchronizing all data access between the networking thread (see | |||||
# NetworkThread below) and the thread running the test logic. For simplicity, | |||||
# NodeConn acquires this lock whenever delivering a message to a NodeConnCB, | |||||
# and whenever adding anything to the send buffer (in send_message()). This | |||||
# lock should be acquired in the thread running the test logic to synchronize | |||||
# access to any data shared with the NodeConnCB or NodeConn. | |||||
mininode_lock = RLock() | |||||
# Serialization/deserialization tools | # Serialization/deserialization tools | ||||
def sha256(s): | def sha256(s): | ||||
return hashlib.new('sha256', s).digest() | return hashlib.new('sha256', s).digest() | ||||
def ripemd160(s): | def ripemd160(s): | ||||
▲ Show 20 Lines • Show All 425 Lines • ▼ Show 20 Lines | class CBlockHeader(): | ||||
def __repr__(self): | def __repr__(self): | ||||
return "CBlockHeader(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x)" \ | return "CBlockHeader(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x)" \ | ||||
% (self.nVersion, self.hashPrevBlock, self.hashMerkleRoot, | % (self.nVersion, self.hashPrevBlock, self.hashMerkleRoot, | ||||
time.ctime(self.nTime), self.nBits, self.nNonce) | time.ctime(self.nTime), self.nBits, self.nNonce) | ||||
class CBlock(CBlockHeader): | class CBlock(CBlockHeader): | ||||
def __init__(self, header=None): | def __init__(self, header=None): | ||||
super(CBlock, self).__init__(header) | super(CBlock, self).__init__(header) | ||||
self.vtx = [] | self.vtx = [] | ||||
def deserialize(self, f): | def deserialize(self, f): | ||||
super(CBlock, self).deserialize(f) | super(CBlock, self).deserialize(f) | ||||
self.vtx = deser_vector(f, CTransaction) | self.vtx = deser_vector(f, CTransaction) | ||||
▲ Show 20 Lines • Show All 693 Lines • ▼ Show 20 Lines | class msg_blocktxn(): | ||||
def serialize(self): | def serialize(self): | ||||
r = b"" | r = b"" | ||||
r += self.block_transactions.serialize() | r += self.block_transactions.serialize() | ||||
return r | return r | ||||
def __repr__(self): | def __repr__(self): | ||||
return "msg_blocktxn(block_transactions=%s)" % (repr(self.block_transactions)) | return "msg_blocktxn(block_transactions=%s)" % (repr(self.block_transactions)) | ||||
class NodeConnCB(): | |||||
"""Callback and helper functions for P2P connection to a bitcoind node. | |||||
Individual testcases should subclass this and override the on_* methods | |||||
if they want to alter message handling behaviour.""" | |||||
def __init__(self): | |||||
# Track whether we have a P2P connection open to the node | |||||
self.connected = False | |||||
self.connection = None | |||||
# Track number of messages of each type received and the most recent | |||||
# message of each type | |||||
self.message_count = defaultdict(int) | |||||
self.last_message = {} | |||||
# A count of the number of ping messages we've sent to the node | |||||
self.ping_counter = 1 | |||||
# Message receiving methods | |||||
def deliver(self, conn, message): | |||||
"""Receive message and dispatch message to appropriate callback. | |||||
We keep a count of how many of each message type has been received | |||||
and the most recent message of each type.""" | |||||
with mininode_lock: | |||||
try: | |||||
command = message.command.decode('ascii') | |||||
self.message_count[command] += 1 | |||||
self.last_message[command] = message | |||||
getattr(self, 'on_' + command)(conn, message) | |||||
except: | |||||
print("ERROR delivering %s (%s)" % (repr(message), | |||||
sys.exc_info()[0])) | |||||
raise | |||||
# Callback methods. Can be overridden by subclasses in individual test | |||||
# cases to provide custom message handling behaviour. | |||||
def on_open(self, conn): | |||||
self.connected = True | |||||
def on_close(self, conn): | |||||
self.connected = False | |||||
self.connection = None | |||||
def on_addr(self, conn, message): pass | |||||
def on_block(self, conn, message): pass | |||||
def on_blocktxn(self, conn, message): pass | |||||
def on_cmpctblock(self, conn, message): pass | |||||
def on_feefilter(self, conn, message): pass | |||||
def on_getaddr(self, conn, message): pass | |||||
def on_getblocks(self, conn, message): pass | |||||
def on_getblocktxn(self, conn, message): pass | |||||
def on_getdata(self, conn, message): pass | |||||
def on_getheaders(self, conn, message): pass | |||||
def on_headers(self, conn, message): pass | |||||
def on_mempool(self, conn): pass | |||||
def on_pong(self, conn, message): pass | |||||
def on_reject(self, conn, message): pass | |||||
def on_sendcmpct(self, conn, message): pass | |||||
def on_sendheaders(self, conn, message): pass | |||||
def on_tx(self, conn, message): pass | |||||
def on_inv(self, conn, message): | |||||
want = msg_getdata() | |||||
for i in message.inv: | |||||
if i.type != 0: | |||||
want.inv.append(i) | |||||
if len(want.inv): | |||||
conn.send_message(want) | |||||
def on_ping(self, conn, message): | |||||
conn.send_message(msg_pong(message.nonce)) | |||||
def on_verack(self, conn, message): | |||||
conn.ver_recv = conn.ver_send | |||||
self.verack_received = True | |||||
def on_version(self, conn, message): | |||||
assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format( | |||||
message.nVersion, MIN_VERSION_SUPPORTED) | |||||
conn.send_message(msg_verack()) | |||||
conn.nServices = message.nServices | |||||
# Connection helper methods | |||||
def add_connection(self, conn): | |||||
self.connection = conn | |||||
def wait_for_disconnect(self, timeout=60): | |||||
def test_function(): return not self.connected | |||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | |||||
# Message receiving helper methods | |||||
def wait_for_block(self, blockhash, timeout=60): | |||||
def test_function(): return self.last_message.get( | |||||
"block") and self.last_message["block"].block.rehash() == blockhash | |||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | |||||
def wait_for_getdata(self, timeout=60): | |||||
def test_function(): return self.last_message.get("getdata") | |||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | |||||
def wait_for_getheaders(self, timeout=60): | |||||
def test_function(): return self.last_message.get("getheaders") | |||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | |||||
def wait_for_inv(self, expected_inv, timeout=60): | |||||
"""Waits for an INV message and checks that the first inv object in the message was as expected.""" | |||||
if len(expected_inv) > 1: | |||||
raise NotImplementedError( | |||||
"wait_for_inv() will only verify the first inv object") | |||||
def test_function(): return self.last_message.get("inv") and \ | |||||
self.last_message["inv"].inv[0].type == expected_inv[0].type and \ | |||||
self.last_message["inv"].inv[0].hash == expected_inv[0].hash | |||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | |||||
def wait_for_verack(self, timeout=60): | |||||
def test_function(): return self.message_count["verack"] | |||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | |||||
# Message sending helper functions | |||||
def send_message(self, message): | |||||
if self.connection: | |||||
self.connection.send_message(message) | |||||
else: | |||||
logger.error("Cannot send message. No connection to node!") | |||||
def send_and_ping(self, message): | |||||
self.send_message(message) | |||||
self.sync_with_ping() | |||||
# Sync up with the node | |||||
def sync_with_ping(self, timeout=60): | |||||
self.send_message(msg_ping(nonce=self.ping_counter)) | |||||
def test_function(): | |||||
if not self.last_message.get("pong"): | |||||
return False | |||||
return self.last_message["pong"].nonce == self.ping_counter | |||||
wait_until(test_function, timeout=timeout, lock=mininode_lock) | |||||
self.ping_counter += 1 | |||||
class NodeConn(asyncore.dispatcher): | |||||
"""The actual NodeConn class | |||||
This class provides an interface for a p2p connection to a specified node.""" | |||||
messagemap = { | |||||
b"version": msg_version, | |||||
b"verack": msg_verack, | |||||
b"addr": msg_addr, | |||||
b"inv": msg_inv, | |||||
b"getdata": msg_getdata, | |||||
b"getblocks": msg_getblocks, | |||||
b"tx": msg_tx, | |||||
b"block": msg_block, | |||||
b"getaddr": msg_getaddr, | |||||
b"ping": msg_ping, | |||||
b"pong": msg_pong, | |||||
b"headers": msg_headers, | |||||
b"getheaders": msg_getheaders, | |||||
b"reject": msg_reject, | |||||
b"mempool": msg_mempool, | |||||
b"feefilter": msg_feefilter, | |||||
b"sendheaders": msg_sendheaders, | |||||
b"sendcmpct": msg_sendcmpct, | |||||
b"cmpctblock": msg_cmpctblock, | |||||
b"getblocktxn": msg_getblocktxn, | |||||
b"blocktxn": msg_blocktxn | |||||
} | |||||
MAGIC_BYTES = { | |||||
"mainnet": b"\xe3\xe1\xf3\xe8", | |||||
"testnet3": b"\xf4\xe5\xf3\xf4", | |||||
"regtest": b"\xda\xb5\xbf\xfa", | |||||
} | |||||
def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK, send_version=True): | |||||
asyncore.dispatcher.__init__(self, map=mininode_socket_map) | |||||
self.dstaddr = dstaddr | |||||
self.dstport = dstport | |||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM) | |||||
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | |||||
self.sendbuf = b"" | |||||
self.recvbuf = b"" | |||||
self.ver_send = 209 | |||||
self.ver_recv = 209 | |||||
self.last_sent = 0 | |||||
self.state = "connecting" | |||||
self.network = net | |||||
self.cb = callback | |||||
self.disconnect = False | |||||
self.nServices = 0 | |||||
if send_version: | |||||
# stuff version msg into sendbuf | |||||
vt = msg_version() | |||||
vt.nServices = services | |||||
vt.addrTo.ip = self.dstaddr | |||||
vt.addrTo.port = self.dstport | |||||
vt.addrFrom.ip = "0.0.0.0" | |||||
vt.addrFrom.port = 0 | |||||
self.send_message(vt, True) | |||||
logger.info('Connecting to Bitcoin Node: %s:%d' % | |||||
(self.dstaddr, self.dstport)) | |||||
try: | |||||
self.connect((dstaddr, dstport)) | |||||
except: | |||||
self.handle_close() | |||||
self.rpc = rpc | |||||
def handle_connect(self): | |||||
if self.state != "connected": | |||||
logger.debug("Connected & Listening: %s:%d" % | |||||
(self.dstaddr, self.dstport)) | |||||
self.state = "connected" | |||||
self.cb.on_open(self) | |||||
def handle_close(self): | |||||
logger.debug("Closing connection to: %s:%d" % | |||||
(self.dstaddr, self.dstport)) | |||||
self.state = "closed" | |||||
self.recvbuf = b"" | |||||
self.sendbuf = b"" | |||||
try: | |||||
self.close() | |||||
except: | |||||
pass | |||||
self.cb.on_close(self) | |||||
def handle_read(self): | |||||
with mininode_lock: | |||||
t = self.recv(READ_BUFFER_SIZE) | |||||
if len(t) > 0: | |||||
self.recvbuf += t | |||||
while True: | |||||
msg = self.got_data() | |||||
if msg == None: | |||||
break | |||||
self.got_message(msg) | |||||
def readable(self): | |||||
return True | |||||
def writable(self): | |||||
with mininode_lock: | |||||
pre_connection = self.state == "connecting" | |||||
length = len(self.sendbuf) | |||||
return (length > 0 or pre_connection) | |||||
def handle_write(self): | |||||
with mininode_lock: | |||||
# asyncore does not expose socket connection, only the first read/write | |||||
# event, thus we must check connection manually here to know when we | |||||
# actually connect | |||||
if self.state == "connecting": | |||||
self.handle_connect() | |||||
if not self.writable(): | |||||
return | |||||
try: | |||||
sent = self.send(self.sendbuf) | |||||
except: | |||||
self.handle_close() | |||||
return | |||||
self.sendbuf = self.sendbuf[sent:] | |||||
def got_data(self): | |||||
try: | |||||
with mininode_lock: | |||||
if len(self.recvbuf) < 4: | |||||
return None | |||||
if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: | |||||
raise ValueError("got garbage %s" % repr(self.recvbuf)) | |||||
if len(self.recvbuf) < 4 + 12 + 4 + 4: | |||||
return | |||||
command = self.recvbuf[4:4+12].split(b"\x00", 1)[0] | |||||
msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0] | |||||
checksum = self.recvbuf[4+12+4:4+12+4+4] | |||||
if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen: | |||||
return | |||||
msg = self.recvbuf[4+12+4+4:4+12+4+4+msglen] | |||||
h = sha256(sha256(msg)) | |||||
if checksum != h[:4]: | |||||
raise ValueError("got bad checksum " + repr(self.recvbuf)) | |||||
self.recvbuf = self.recvbuf[4+12+4+4+msglen:] | |||||
if command not in self.messagemap: | |||||
raise ValueError("Received unknown command from %s:%d: '%s' %s" % ( | |||||
self.dstaddr, self.dstport, command, repr(msg))) | |||||
f = BytesIO(msg) | |||||
m = self.messagemap[command]() | |||||
m.deserialize(f) | |||||
return m | |||||
except Exception as e: | |||||
logger.exception('Error reading message:', repr(e)) | |||||
raise | |||||
def send_message(self, message, pushbuf=False): | |||||
if self.state != "connected" and not pushbuf: | |||||
raise IOError('Not connected, no pushbuf') | |||||
self._log_message("send", message) | |||||
command = message.command | |||||
data = message.serialize() | |||||
tmsg = self.MAGIC_BYTES[self.network] | |||||
tmsg += command | |||||
tmsg += b"\x00" * (12 - len(command)) | |||||
tmsg += struct.pack("<I", len(data)) | |||||
th = sha256(data) | |||||
h = sha256(th) | |||||
tmsg += h[:4] | |||||
tmsg += data | |||||
with mininode_lock: | |||||
if (len(self.sendbuf) == 0 and not pushbuf): | |||||
try: | |||||
sent = self.send(tmsg) | |||||
self.sendbuf = tmsg[sent:] | |||||
except BlockingIOError: | |||||
self.sendbuf = tmsg | |||||
else: | |||||
self.sendbuf += tmsg | |||||
self.last_sent = time.time() | |||||
def got_message(self, message): | |||||
if self.last_sent + 30 * 60 < time.time(): | |||||
self.send_message(self.messagemap[b'ping']()) | |||||
self._log_message("receive", message) | |||||
self.cb.deliver(self, message) | |||||
def _log_message(self, direction, msg): | |||||
if direction == "send": | |||||
log_message = "Send message to " | |||||
elif direction == "receive": | |||||
log_message = "Received message from " | |||||
log_message += "%s:%d: %s" % (self.dstaddr, | |||||
self.dstport, repr(msg)[:500]) | |||||
if len(log_message) > 500: | |||||
log_message += "... (msg truncated)" | |||||
logger.debug(log_message) | |||||
def disconnect_node(self): | |||||
self.disconnect = True | |||||
class NetworkThread(Thread): | |||||
def run(self): | |||||
while mininode_socket_map: | |||||
# We check for whether to disconnect outside of the asyncore | |||||
# loop to workaround the behavior of asyncore when using | |||||
# select | |||||
disconnected = [] | |||||
for fd, obj in mininode_socket_map.items(): | |||||
if obj.disconnect: | |||||
disconnected.append(obj) | |||||
[obj.handle_close() for obj in disconnected] | |||||
asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) | |||||
logger.debug("Network thread closing") |