diff --git a/test/functional/p2p_filter.py b/test/functional/p2p_filter.py new file mode 100755 index 000000000..1e1e53258 --- /dev/null +++ b/test/functional/p2p_filter.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# Copyright (c) 2020 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +""" +Test BIP 37 +""" + +from test_framework.messages import ( + MSG_BLOCK, + MSG_FILTERED_BLOCK, + msg_getdata, + msg_filterload, +) +from test_framework.mininode import ( + P2PInterface, + mininode_lock, +) +from test_framework.test_framework import BitcoinTestFramework + + +class FilterNode(P2PInterface): + # This is a P2SH watch-only wallet + watch_script_pubkey = 'a914ffffffffffffffffffffffffffffffffffffffff87' + # The initial filter (n=10, fp=0.000001) with just the above scriptPubKey + # added + watch_filter_init = msg_filterload( + data=b'@\x00\x08\x00\x80\x00\x00 \x00\xc0\x00 \x04\x00\x08$\x00\x04\x80\x00\x00 \x00\x00\x00\x00\x80\x00\x00@\x00\x02@ \x00', + nHashFuncs=19, + nTweak=0, + nFlags=1, + ) + + def on_inv(self, message): + want = msg_getdata() + for i in message.inv: + # inv messages can only contain TX or BLOCK, so translate BLOCK to + # FILTERED_BLOCK + if i.type == MSG_BLOCK: + i.type = MSG_FILTERED_BLOCK + want.inv.append(i) + if len(want.inv): + self.send_message(want) + + def on_merkleblock(self, message): + self.merkleblock_received = True + + def on_tx(self, message): + self.tx_received = True + + +class FilterTest(BitcoinTestFramework): + def set_test_params(self): + self.setup_clean_chain = False + self.num_nodes = 1 + self.extra_args = [[ + '-peerbloomfilters', + '-whitelist=noban@127.0.0.1', # immediate tx relay + ]] + + def skip_test_if_missing_module(self): + self.skip_if_no_wallet() + + def run_test(self): + self.log.info('Add filtered P2P connection to the node') + filter_node = self.nodes[0].add_p2p_connection(FilterNode()) + filter_node.send_message(filter_node.watch_filter_init) + filter_node.sync_with_ping() + filter_address = self.nodes[0].decodescript( + filter_node.watch_script_pubkey)['addresses'][0] + + self.log.info( + 'Check that we receive merkleblock and tx if the filter matches a tx in a block') + filter_node.merkleblock_received = False + block_hash = self.nodes[0].generatetoaddress(1, filter_address)[0] + txid = self.nodes[0].getblock(block_hash)['tx'][0] + filter_node.wait_for_tx(txid) + assert filter_node.merkleblock_received + + self.log.info( + 'Check that we only receive a merkleblock if the filter does not match a tx in a block') + with mininode_lock: + filter_node.last_message.pop("merkleblock", None) + filter_node.tx_received = False + self.nodes[0].generatetoaddress(1, self.nodes[0].getnewaddress()) + filter_node.wait_for_merkleblock() + assert not filter_node.tx_received + + self.log.info( + 'Check that we not receive a tx if the filter does not match a mempool tx') + filter_node.merkleblock_received = False + filter_node.tx_received = False + self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 90) + filter_node.sync_with_ping() + filter_node.sync_with_ping() + assert not filter_node.merkleblock_received + assert not filter_node.tx_received + + self.log.info( + 'Check that we receive a tx in reply to a mempool msg if the filter matches a mempool tx') + filter_node.merkleblock_received = False + txid = self.nodes[0].sendtoaddress(filter_address, 90) + filter_node.wait_for_tx(txid) + assert not filter_node.merkleblock_received + + +if __name__ == '__main__': + FilterTest().main() diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py index 4bd0c1954..8d9bbbdd7 100755 --- a/test/functional/test_framework/messages.py +++ b/test/functional/test_framework/messages.py @@ -1,1476 +1,1514 @@ #!/usr/bin/env python3 # Copyright (c) 2010 ArtForz -- public domain half-a-node # Copyright (c) 2012 Jeff Garzik # Copyright (c) 2010-2019 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Bitcoin test framework primitive and message structures CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: data structures that should map to corresponding structures in bitcoin/primitives msg_block, msg_tx, msg_headers, etc.: data structures that represent network messages ser_*, deser_*: functions that handle serialization/deserialization. Classes use __slots__ to ensure extraneous attributes aren't accidentally added by tests, compromising their intended effect. """ from codecs import encode import copy import hashlib from io import BytesIO import random import socket import struct import time from test_framework.siphash import siphash256 from test_framework.util import hex_str_to_bytes MIN_VERSION_SUPPORTED = 60001 # past bip-31 for ping/pong MY_VERSION = 70014 MY_SUBVERSION = b"/python-mininode-tester:0.0.3/" # from version 70001 onwards, fRelay should be appended to version # messages (BIP37) MY_RELAY = 1 MAX_INV_SZ = 50000 MAX_LOCATOR_SZ = 101 MAX_BLOCK_BASE_SIZE = 1000000 # 1 BCH in satoshis COIN = 100000000 NODE_NETWORK = (1 << 0) # NODE_GETUTXO = (1 << 1) NODE_BLOOM = (1 << 2) # NODE_WITNESS = (1 << 3) NODE_XTHIN = (1 << 4) NODE_BITCOIN_CASH = (1 << 5) NODE_NETWORK_LIMITED = (1 << 10) NODE_AVALANCHE = (1 << 24) MSG_TX = 1 MSG_BLOCK = 2 +MSG_FILTERED_BLOCK = 3 MSG_CMPCTBLOCK = 4 MSG_TYPE_MASK = 0xffffffff >> 2 # Serialization/deserialization tools def sha256(s): return hashlib.new('sha256', s).digest() def ripemd160(s): return hashlib.new('ripemd160', s).digest() def hash256(s): return sha256(sha256(s)) def ser_compact_size(size): r = b"" if size < 253: r = struct.pack("B", size) elif size < 0x10000: r = struct.pack(">= 32 return rs def uint256_from_str(s): r = 0 t = struct.unpack("> 24) & 0xFF v = (c & 0xFFFFFF) << (8 * (nbytes - 3)) return v def deser_vector(f, c): nit = deser_compact_size(f) r = [] for i in range(nit): t = c() t.deserialize(f) r.append(t) return r # ser_function_name: Allow for an alternate serialization function on the # entries in the vector. def ser_vector(v, ser_function_name=None): r = ser_compact_size(len(v)) for i in v: if ser_function_name: r += getattr(i, ser_function_name)() else: r += i.serialize() return r def deser_uint256_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_uint256(f) r.append(t) return r def ser_uint256_vector(v): r = ser_compact_size(len(v)) for i in v: r += ser_uint256(i) return r def deser_string_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_string(f) r.append(t) return r def ser_string_vector(v): r = ser_compact_size(len(v)) for sv in v: r += ser_string(sv) return r # Deserialize from a hex string representation (eg from RPC) def FromHex(obj, hex_string): obj.deserialize(BytesIO(hex_str_to_bytes(hex_string))) return obj # Convert a binary-serializable object to hex (eg for submission via RPC) def ToHex(obj): return obj.serialize().hex() # Objects that map to bitcoind objects, which can be serialized/deserialized class CAddress: __slots__ = ("ip", "nServices", "pchReserved", "port", "time") def __init__(self): self.time = 0 self.nServices = 1 self.pchReserved = b"\x00" * 10 + b"\xff" * 2 self.ip = "0.0.0.0" self.port = 0 def deserialize(self, f, with_time=True): if with_time: self.time = struct.unpack("H", f.read(2))[0] def serialize(self, with_time=True): r = b"" if with_time: r += struct.pack("H", self.port) return r def __repr__(self): return "CAddress(nServices={} ip={} port={})".format( self.nServices, self.ip, self.port) class CInv: __slots__ = ("hash", "type") typemap = { 0: "Error", - 1: "TX", - 2: "Block", - 4: "CompactBlock" + MSG_TX: "TX", + MSG_BLOCK: "Block", + MSG_FILTERED_BLOCK: "filtered Block", + MSG_CMPCTBLOCK: "CompactBlock" } def __init__(self, t=0, h=0): self.type = t self.hash = h def deserialize(self, f): self.type = struct.unpack(" 21000000 * COIN: return False return True def __repr__(self): return "CTransaction(nVersion={} vin={} vout={} nLockTime={})".format( self.nVersion, repr(self.vin), repr(self.vout), self.nLockTime) class CBlockHeader: __slots__ = ("hash", "hashMerkleRoot", "hashPrevBlock", "nBits", "nNonce", "nTime", "nVersion", "sha256") def __init__(self, header=None): if header is None: self.set_null() else: self.nVersion = header.nVersion self.hashPrevBlock = header.hashPrevBlock self.hashMerkleRoot = header.hashMerkleRoot self.nTime = header.nTime self.nBits = header.nBits self.nNonce = header.nNonce self.sha256 = header.sha256 self.hash = header.hash self.calc_sha256() def set_null(self): self.nVersion = 1 self.hashPrevBlock = 0 self.hashMerkleRoot = 0 self.nTime = 0 self.nBits = 0 self.nNonce = 0 self.sha256 = None self.hash = None def deserialize(self, f): self.nVersion = struct.unpack(" 1: newhashes = [] for i in range(0, len(hashes), 2): i2 = min(i + 1, len(hashes) - 1) newhashes.append(hash256(hashes[i] + hashes[i2])) hashes = newhashes return uint256_from_str(hashes[0]) def calc_merkle_root(self): hashes = [] for tx in self.vtx: tx.calc_sha256() hashes.append(ser_uint256(tx.sha256)) return self.get_merkle_root(hashes) def is_valid(self): self.calc_sha256() target = uint256_from_compact(self.nBits) if self.sha256 > target: return False for tx in self.vtx: if not tx.is_valid(): return False if self.calc_merkle_root() != self.hashMerkleRoot: return False return True def solve(self): self.rehash() target = uint256_from_compact(self.nBits) while self.sha256 > target: self.nNonce += 1 self.rehash() def __repr__(self): return "CBlock(nVersion={} hashPrevBlock={:064x} hashMerkleRoot={:064x} nTime={} nBits={:08x} nNonce={:08x} vtx={})".format( self.nVersion, self.hashPrevBlock, self.hashMerkleRoot, self.nTime, self.nBits, self.nNonce, repr(self.vtx)) class PrefilledTransaction: __slots__ = ("index", "tx") def __init__(self, index=0, tx=None): self.index = index self.tx = tx def deserialize(self, f): self.index = deser_compact_size(f) self.tx = CTransaction() self.tx.deserialize(f) def serialize(self): r = b"" r += ser_compact_size(self.index) r += self.tx.serialize() return r def __repr__(self): return "PrefilledTransaction(index={}, tx={})".format( self.index, repr(self.tx)) # This is what we send on the wire, in a cmpctblock message. class P2PHeaderAndShortIDs: __slots__ = ("header", "nonce", "prefilled_txn", "prefilled_txn_length", "shortids", "shortids_length") def __init__(self): self.header = CBlockHeader() self.nonce = 0 self.shortids_length = 0 self.shortids = [] self.prefilled_txn_length = 0 self.prefilled_txn = [] def deserialize(self, f): self.header.deserialize(f) self.nonce = struct.unpack("= 70001: # Relay field is optional for version 70001 onwards try: self.nRelay = struct.unpack(" class msg_headers: __slots__ = ("headers",) command = b"headers" def __init__(self, headers=None): self.headers = headers if headers is not None else [] def deserialize(self, f): # comment in bitcoind indicates these should be deserialized as blocks blocks = deser_vector(f, CBlock) for x in blocks: self.headers.append(CBlockHeader(x)) def serialize(self): blocks = [CBlock(x) for x in self.headers] return ser_vector(blocks) def __repr__(self): return "msg_headers(headers={})".format(repr(self.headers)) class msg_reject: __slots__ = ("code", "data", "message", "reason") command = b"reject" REJECT_MALFORMED = 1 def __init__(self): self.message = b"" self.code = 0 self.reason = b"" self.data = 0 def deserialize(self, f): self.message = deser_string(f) self.code = struct.unpack(" 0: self.recvbuf += t while True: msg = self._on_data() if msg is None: break self.on_message(msg) def _on_data(self): """Try to read P2P messages from the recv buffer. This method reads data from the buffer in a loop. It deserializes, parses and verifies the P2P header, then passes the P2P payload to the on_message callback for processing.""" try: with mininode_lock: if len(self.recvbuf) < 4: return None if self.recvbuf[:4] != self.magic_bytes: raise ValueError( "got garbage {}".format(repr(self.recvbuf))) if len(self.recvbuf) < 4 + 12 + 4 + 4: return None command = self.recvbuf[4:4 + 12].split(b"\x00", 1)[0] msglen = struct.unpack( " 500: log_message += "... (msg truncated)" logger.debug(log_message) class P2PInterface(P2PConnection): """A high-level P2P interface class for communicating with a Bitcoin Cash node. This class provides high-level callbacks for processing P2P message payloads, as well as convenience methods for interacting with the node over P2P. Individual testcases should subclass this and override the on_* methods if they want to alter message handling behaviour.""" def __init__(self): super().__init__() # Track number of messages of each type received and the most recent # message of each type self.message_count = defaultdict(int) self.last_message = {} # A count of the number of ping messages we've sent to the node self.ping_counter = 1 # The network services received from the peer self.nServices = 0 def peer_connect(self, *args, services=NODE_NETWORK, send_version=True, **kwargs): create_conn = super().peer_connect(*args, **kwargs) if send_version: # Send a version msg vt = msg_version() vt.nServices = services vt.addrTo.ip = self.dstaddr vt.addrTo.port = self.dstport vt.addrFrom.ip = "0.0.0.0" vt.addrFrom.port = 0 # Will be sent soon after connection_made self.on_connection_send_msg = vt return create_conn # Message receiving methods def on_message(self, message): """Receive message and dispatch message to appropriate callback. We keep a count of how many of each message type has been received and the most recent message of each type.""" with mininode_lock: try: command = message.command.decode('ascii') self.message_count[command] += 1 self.last_message[command] = message getattr(self, 'on_' + command)(message) except Exception: print("ERROR delivering {} ({})".format( repr(message), sys.exc_info()[0])) raise # Callback methods. Can be overridden by subclasses in individual test # cases to provide custom message handling behaviour. def on_open(self): pass def on_close(self): pass def on_addr(self, message): pass def on_avapoll(self, message): pass def on_avaresponse(self, message): pass def on_block(self, message): pass def on_blocktxn(self, message): pass def on_cmpctblock(self, message): pass def on_feefilter(self, message): pass + def on_filterload(self, message): pass + def on_getaddr(self, message): pass def on_getblocks(self, message): pass def on_getblocktxn(self, message): pass def on_getdata(self, message): pass def on_getheaders(self, message): pass def on_headers(self, message): pass def on_mempool(self, message): pass + def on_merkleblock(self, message): pass + def on_notfound(self, message): pass def on_pong(self, message): pass def on_reject(self, message): pass def on_sendcmpct(self, message): pass def on_sendheaders(self, message): pass def on_tx(self, message): pass def on_inv(self, message): want = msg_getdata() for i in message.inv: if i.type != 0: want.inv.append(i) if len(want.inv): self.send_message(want) def on_ping(self, message): self.send_message(msg_pong(message.nonce)) def on_verack(self, message): self.verack_received = True def on_version(self, message): assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format( message.nVersion, MIN_VERSION_SUPPORTED) self.send_message(msg_verack()) self.nServices = message.nServices # Connection helper methods def wait_for_disconnect(self, timeout=60): def test_function(): return not self.is_connected wait_until(test_function, timeout=timeout, lock=mininode_lock) # Message receiving helper methods def wait_for_tx(self, txid, timeout=60): def test_function(): if not self.last_message.get('tx'): return False return self.last_message['tx'].tx.rehash() == txid wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_block(self, blockhash, timeout=60): def test_function(): return self.last_message.get( "block") and self.last_message["block"].block.rehash() == blockhash wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_header(self, blockhash, timeout=60): def test_function(): last_headers = self.last_message.get('headers') if not last_headers: return False return last_headers.headers[0].rehash() == blockhash wait_until(test_function, timeout=timeout, lock=mininode_lock) + def wait_for_merkleblock(self, timeout=60): + def test_function(): + assert self.is_connected + last_filtered_block = self.last_message.get('merkleblock') + if not last_filtered_block: + return False + # TODO change this method to take a hash value and only return true + # if the correct block has been received + return True + + wait_until(test_function, timeout=timeout, lock=mininode_lock) + def wait_for_getdata(self, timeout=60): """Waits for a getdata message. Receiving any getdata message will satisfy the predicate. the last_message["getdata"] value must be explicitly cleared before calling this method, or this will return immediately with success. TODO: change this method to take a hash value and only return true if the correct block/tx has been requested.""" def test_function(): return self.last_message.get("getdata") wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_getheaders(self, timeout=60): """Waits for a getheaders message. Receiving any getheaders message will satisfy the predicate. the last_message["getheaders"] value must be explicitly cleared before calling this method, or this will return immediately with success. TODO: change this method to take a hash value and only return true if the correct block header has been requested.""" def test_function(): return self.last_message.get("getheaders") wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_inv(self, expected_inv, timeout=60): """Waits for an INV message and checks that the first inv object in the message was as expected.""" if len(expected_inv) > 1: raise NotImplementedError( "wait_for_inv() will only verify the first inv object") def test_function(): return self.last_message.get("inv") and \ self.last_message["inv"].inv[0].type == expected_inv[0].type and \ self.last_message["inv"].inv[0].hash == expected_inv[0].hash wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_verack(self, timeout=60): def test_function(): return self.message_count["verack"] wait_until(test_function, timeout=timeout, lock=mininode_lock) # Message sending helper functions def send_and_ping(self, message, timeout=60): self.send_message(message) self.sync_with_ping(timeout=timeout) # Sync up with the node def sync_with_ping(self, timeout=60): self.send_message(msg_ping(nonce=self.ping_counter)) def test_function(): if not self.last_message.get("pong"): return False return self.last_message["pong"].nonce == self.ping_counter wait_until(test_function, timeout=timeout, lock=mininode_lock) self.ping_counter += 1 # One lock for synchronizing all data access between the networking thread (see # NetworkThread below) and the thread running the test logic. For simplicity, # P2PConnection acquires this lock whenever delivering a message to a P2PInterface. # This lock should be acquired in the thread running the test logic to synchronize # access to any data shared with the P2PInterface or P2PConnection. mininode_lock = threading.RLock() class NetworkThread(threading.Thread): network_event_loop = None def __init__(self): super().__init__(name="NetworkThread") # There is only one event loop and no more than one thread must be # created assert not self.network_event_loop NetworkThread.network_event_loop = asyncio.new_event_loop() def run(self): """Start the network thread.""" self.network_event_loop.run_forever() def close(self, timeout=10): """Close the connections and network event loop.""" self.network_event_loop.call_soon_threadsafe( self.network_event_loop.stop) wait_until(lambda: not self.network_event_loop.is_running(), timeout=timeout) self.network_event_loop.close() self.join(timeout) class P2PDataStore(P2PInterface): """A P2P data store class. Keeps a block and transaction store and responds correctly to getdata and getheaders requests.""" def __init__(self): super().__init__() # store of blocks. key is block hash, value is a CBlock object self.block_store = {} self.last_block_hash = '' # store of txs. key is txid, value is a CTransaction object self.tx_store = {} self.getdata_requests = [] def on_getdata(self, message): """Check for the tx/block in our stores and if found, reply with an inv message.""" for inv in message.inv: self.getdata_requests.append(inv.hash) if (inv.type & MSG_TYPE_MASK) == MSG_TX and inv.hash in self.tx_store.keys(): self.send_message(msg_tx(self.tx_store[inv.hash])) elif (inv.type & MSG_TYPE_MASK) == MSG_BLOCK and inv.hash in self.block_store.keys(): self.send_message(msg_block(self.block_store[inv.hash])) else: logger.debug( 'getdata message type {} received.'.format(hex(inv.type))) def on_getheaders(self, message): """Search back through our block store for the locator, and reply with a headers message if found.""" locator, hash_stop = message.locator, message.hashstop # Assume that the most recent block added is the tip if not self.block_store: return headers_list = [self.block_store[self.last_block_hash]] maxheaders = 2000 while headers_list[-1].sha256 not in locator.vHave: # Walk back through the block store, adding headers to headers_list # as we go. prev_block_hash = headers_list[-1].hashPrevBlock if prev_block_hash in self.block_store: prev_block_header = CBlockHeader( self.block_store[prev_block_hash]) headers_list.append(prev_block_header) if prev_block_header.sha256 == hash_stop: # if this is the hashstop header, stop here break else: logger.debug('block hash {} not found in block store'.format( hex(prev_block_hash))) break # Truncate the list if there are too many headers headers_list = headers_list[:-maxheaders - 1:-1] response = msg_headers(headers_list) if response is not None: self.send_message(response) def send_blocks_and_test(self, blocks, node, *, success=True, force_send=False, reject_reason=None, expect_disconnect=False, timeout=60): """Send blocks to test node and test whether the tip advances. - add all blocks to our block_store - send a headers message for the final block - the on_getheaders handler will ensure that any getheaders are responded to - if force_send is False: wait for getdata for each of the blocks. The on_getdata handler will ensure that any getdata messages are responded to. Otherwise send the full block unsolicited. - if success is True: assert that the node's tip advances to the most recent block - if success is False: assert that the node's tip doesn't advance - if reject_reason is set: assert that the correct reject message is logged""" with mininode_lock: for block in blocks: self.block_store[block.sha256] = block self.last_block_hash = block.sha256 def test(): if force_send: for b in blocks: self.send_message(msg_block(block=b)) else: self.send_message(msg_headers([CBlockHeader(blocks[-1])])) wait_until( lambda: blocks[-1].sha256 in self.getdata_requests, timeout=timeout, lock=mininode_lock) if expect_disconnect: self.wait_for_disconnect(timeout=timeout) else: self.sync_with_ping(timeout=timeout) if success: wait_until(lambda: node.getbestblockhash() == blocks[-1].hash, timeout=timeout) else: assert node.getbestblockhash() != blocks[-1].hash if reject_reason: with node.assert_debug_log(expected_msgs=[reject_reason]): test() else: test() def send_txs_and_test(self, txs, node, *, success=True, expect_disconnect=False, reject_reason=None): """Send txs to test node and test whether they're accepted to the mempool. - add all txs to our tx_store - send tx messages for all txs - if success is True/False: assert that the txs are/are not accepted to the mempool - if expect_disconnect is True: Skip the sync with ping - if reject_reason is set: assert that the correct reject message is logged.""" with mininode_lock: for tx in txs: self.tx_store[tx.sha256] = tx def test(): for tx in txs: self.send_message(msg_tx(tx)) if expect_disconnect: self.wait_for_disconnect() else: self.sync_with_ping() raw_mempool = node.getrawmempool() if success: # Check that all txs are now in the mempool for tx in txs: assert tx.hash in raw_mempool, "{} not found in mempool".format( tx.hash) else: # Check that none of the txs are now in the mempool for tx in txs: assert tx.hash not in raw_mempool, "{} tx found in mempool".format( tx.hash) if reject_reason: with node.assert_debug_log(expected_msgs=[reject_reason]): test() else: test()