diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index ff2601c6a..befe838ac 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -1,678 +1,678 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2019 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the ZMQ notification interface.""" import struct from io import BytesIO from time import sleep from test_framework.address import ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE from test_framework.blocktools import ( create_block, create_coinbase, make_conform_to_ctor, ) from test_framework.messages import CTransaction, FromHex, hash256 from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal, assert_raises_rpc_error # Test may be skipped and not have zmq installed try: import zmq except ImportError: pass def hash256_reversed(byte_str): return hash256(byte_str)[::-1] class ZMQSubscriber: def __init__(self, socket, topic): # no sequence number received yet self.sequence = None self.socket = socket self.topic = topic self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) # Receive message from publisher and verify that topic and sequence match def _receive_from_publisher_and_check(self): topic, body, seq = self.socket.recv_multipart() # Topic should match the subscriber topic. assert_equal(topic, self.topic) # Sequence should be incremental. received_seq = struct.unpack('C : Blockhash connected <32-byte hash>D : Blockhash disconnected <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool """ self.log.info("Testing 'sequence' publisher") [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) self.disconnect_nodes(0, 1) # Mempool sequence number starts at 1 seq_num = 1 # Generate 1 block in nodes[0] and receive all notifications dc_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE, sync_fun=self.no_op)[0] # Note: We are not notified of any block transactions, coinbase or # mined assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence()) # Generate 2 blocks in nodes[1] to a different address to ensure # a chain split self.generatetoaddress( self.nodes[1], 2, ADDRESS_ECREG_P2SH_OP_TRUE, sync_fun=self.no_op) # nodes[0] will reorg chain after connecting back nodes[1] self.connect_nodes(0, 1) # Then we receive all block (dis)connect notifications for the # 2 block reorg assert_equal((dc_block, "D", None), seq.receive_sequence()) block_count = self.nodes[1].getblockcount() assert_equal((self.nodes[1].getblockhash(block_count - 1), "C", None), seq.receive_sequence()) assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence()) # Rest of test requires wallet functionality if self.is_wallet_compiled(): (block_hash, txid_to_be_replaced, replacement_txid ) = self.create_conflicting_tx() self.log.info( "Testing sequence notifications with mempool sequence values") # Should receive the initially broadcasted txid. assert_equal((txid_to_be_replaced, "A", seq_num), seq.receive_sequence()) seq_num += 1 self.log.info("Testing a tx removal notification") # Next we receive a notification for the transaction removal assert_equal((txid_to_be_replaced, "R", seq_num), seq.receive_sequence()) seq_num += 1 # Then we see the block notification assert_equal((block_hash, "C", None), seq.receive_sequence()) # There is no sequence notification for the transaction that was # never in node0's mempool, but it can be found in the block. assert replacement_txid in self.nodes[0].getblock(block_hash)["tx"] self.log.info("Wait for tx from second node") payment_txid = self.nodes[1].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=5_000_000) self.sync_all() assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) seq_num += 1 # Doesn't get published when mined, make a block and tx to "flush" # the possibility though the mempool sequence number does go up by # the number of transactions removed from the mempool by the block # mining it. mempool_size = len(self.nodes[0].getrawmempool()) c_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE)[0] # Make sure the number of mined transactions matches the number of # txs out of mempool mempool_size_delta = mempool_size - \ len(self.nodes[0].getrawmempool()) assert_equal(len(self.nodes[0].getblock(c_block)["tx"]) - 1, mempool_size_delta) seq_num += mempool_size_delta payment_txid_2 = self.nodes[1].sendtoaddress( self.nodes[0].getnewaddress(), 1_000_000) self.sync_all() assert_equal((c_block, "C", None), seq.receive_sequence()) assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence()) seq_num += 1 # Spot check getrawmempool results that they only show up when # asked for assert isinstance(self.nodes[0].getrawmempool(), list) assert isinstance( self.nodes[0].getrawmempool(mempool_sequence=False), list) assert "mempool_sequence" not in self.nodes[0].getrawmempool( verbose=True) assert_raises_rpc_error( -8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True) assert_equal(self.nodes[0].getrawmempool( mempool_sequence=True)["mempool_sequence"], seq_num) self.log.info("Testing reorg notifications") # Manually invalidate the last block to test mempool re-entry # N.B. This part could be made more lenient in exact ordering # since it greatly depends on inner-workings of blocks/mempool # during "deep" re-orgs. Probably should "re-construct" # blockchain/mempool state from notifications instead. block_count = self.nodes[0].getblockcount() best_hash = self.nodes[0].getbestblockhash() self.nodes[0].invalidateblock(best_hash) # Bit of room to make sure transaction things happened sleep(2) # Make sure getrawmempool mempool_sequence results aren't "queued" # but immediately reflective of the time they were gathered. assert self.nodes[0].getrawmempool( mempool_sequence=True)["mempool_sequence"] > seq_num assert_equal((payment_txid_2, "R", seq_num), seq.receive_sequence()) seq_num += 1 assert_equal((best_hash, "D", None), seq.receive_sequence()) assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) seq_num += 1 # Other things may happen but aren't wallet-deterministic so we # don't test for them currently self.nodes[0].reconsiderblock(best_hash) self.generatetoaddress(self.nodes[1], 1, ADDRESS_ECREG_UNSPENDABLE) self.log.info("Evict mempool transaction by block conflict") orig_txid = self.nodes[0].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=1_000_000) # More to be simply mined more_tx = [] for _ in range(5): more_tx.append(self.nodes[0].sendtoaddress( self.nodes[0].getnewaddress(), 100_000)) raw_tx = self.nodes[0].getrawtransaction(orig_txid) block = create_block( int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount() + 1)) tx = FromHex(CTransaction(), raw_tx) block.vtx.append(tx) for txid in more_tx: tx = FromHex(CTransaction(), self.nodes[0].getrawtransaction(txid)) block.vtx.append(tx) make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None) tip = self.nodes[0].getbestblockhash() assert_equal(int(tip, 16), block.sha256) orig_txid_2 = self.nodes[0].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=1_000_000) # Flush old notifications until evicted tx original entry (hash_str, label, mempool_seq) = seq.receive_sequence() while hash_str != orig_txid: (hash_str, label, mempool_seq) = seq.receive_sequence() mempool_seq += 1 # Added original tx assert_equal(label, "A") # More transactions to be simply mined for i in range(len(more_tx)): assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence()) mempool_seq += 1 # Removed RBF tests mempool_seq += 1 assert_equal((tip, "C", None), seq.receive_sequence()) mempool_seq += len(more_tx) # Last tx assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence()) mempool_seq += 1 self.generatetoaddress(self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE) # want to make sure we didn't break "consensus" for other tests self.sync_all() def test_mempool_sync(self): """ Use sequence notification plus getrawmempool sequence results to "sync mempool" """ if not self.is_wallet_compiled(): self.log.info("Skipping mempool sync test") return self.log.info("Testing 'mempool sync' usage of sequence notifier") [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) # In-memory counter, should always start at 1 next_mempool_seq = self.nodes[0].getrawmempool( mempool_sequence=True)["mempool_sequence"] assert_equal(next_mempool_seq, 1) # Some transactions have been happening but we aren't consuming # zmq notifications yet or we lost a ZMQ message somehow and want # to start over txids = [] num_txs = 5 for _ in range(num_txs): txids.append(self.nodes[1].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=1_000_000)) self.sync_all() # 1) Consume backlog until we get a mempool sequence number (hash_str, label, zmq_mem_seq) = seq.receive_sequence() while zmq_mem_seq is None: (hash_str, label, zmq_mem_seq) = seq.receive_sequence() assert label == "A" assert hash_str is not None # 2) We need to "seed" our view of the mempool mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True) mempool_view = set(mempool_snapshot["txids"]) get_raw_seq = mempool_snapshot["mempool_sequence"] assert_equal(get_raw_seq, 6) # Snapshot may be too old compared to zmq message we read off latest while zmq_mem_seq >= get_raw_seq: sleep(2) mempool_snapshot = self.nodes[0].getrawmempool( mempool_sequence=True) mempool_view = set(mempool_snapshot["txids"]) get_raw_seq = mempool_snapshot["mempool_sequence"] # Things continue to happen in the "interim" while waiting for # snapshot results for _ in range(num_txs): txids.append(self.nodes[0].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=1_000_000)) self.sync_all() self.create_conflicting_tx() self.generatetoaddress(self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE) final_txid = self.nodes[0].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=100_000) # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot while True: if zmq_mem_seq == get_raw_seq - 1: break (hash_str, label, mempool_sequence) = seq.receive_sequence() if mempool_sequence is not None: zmq_mem_seq = mempool_sequence if zmq_mem_seq > get_raw_seq: raise Exception( f"We somehow jumped mempool sequence numbers! " f"zmq_mem_seq: {zmq_mem_seq} > " f"get_raw_seq: {get_raw_seq}") # 4) Moving forward, we apply the delta to our local view # remaining txs + conflict (A, R, C) + 1 block connect + 1 final tx expected_sequence = get_raw_seq for _ in range(num_txs + 3 + 1 + 1): (hash_str, label, mempool_sequence) = seq.receive_sequence() if label == "A": assert hash_str not in mempool_view mempool_view.add(hash_str) expected_sequence = mempool_sequence + 1 elif label == "R": assert hash_str in mempool_view mempool_view.remove(hash_str) expected_sequence = mempool_sequence + 1 elif label == "C": # (Attempt to) remove all txids from known block connects block_txids = self.nodes[0].getblock(hash_str)["tx"][1:] for txid in block_txids: if txid in mempool_view: expected_sequence += 1 mempool_view.remove(txid) elif label == "D": # Not useful for mempool tracking per se continue else: raise Exception("Unexpected ZMQ sequence label!") assert_equal(self.nodes[0].getrawmempool(), [final_txid]) assert_equal( self.nodes[0].getrawmempool( mempool_sequence=True)["mempool_sequence"], expected_sequence) # 5) If you miss a zmq/mempool sequence number, go back to step (2) self.generatetoaddress(self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE) def test_multiple_interfaces(self): # Set up two subscribers with different addresses # (note that after the reorg test, syncing would fail due to different # chain lengths on node0 and node1; for this test we only need node0, so # we can disable syncing blocks on the setup) subscribers = self.setup_zmq_test([ ("hashblock", "tcp://127.0.0.1:28334"), ("hashblock", "tcp://127.0.0.1:28335"), ], sync_blocks=False) # Generate 1 block in nodes[0] and receive all notifications self.generatetoaddress( self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE, sync_fun=self.no_op) # Should receive the same block hash on both subscribers assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex()) assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex()) if __name__ == '__main__': ZMQTest().main()