diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -11,9 +11,24 @@ ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, ) -from test_framework.messages import CTransaction, hash256 +from test_framework.blocktools import ( + create_block, + create_coinbase, + make_conform_to_ctor, +) +from test_framework.messages import CTransaction, FromHex, hash256 from test_framework.test_framework import BitcoinTestFramework -from test_framework.util import assert_equal, connect_nodes +from test_framework.util import ( + assert_equal, + assert_raises_rpc_error, + connect_nodes, +) + +# Test may be skipped and not have zmq installed +try: + import zmq +except ImportError: + pass def hash256_reversed(byte_str): @@ -26,7 +41,6 @@ self.socket = socket self.topic = topic - import zmq self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) def receive(self): @@ -38,6 +52,23 @@ self.sequence += 1 return body + def receive_sequence(self): + topic, body, seq = self.socket.recv_multipart() + # Topic should match the subscriber topic. + assert_equal(topic, self.topic) + # Sequence should be incremental. + assert_equal(struct.unpack('C : Blockhash connected + <32-byte hash>D : Blockhash disconnected + <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool + for non-block inclusion reason + <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool + """ + self.log.info("Testing 'sequence' publisher") + address = 'tcp://127.0.0.1:28333' + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, 60000) + seq = ZMQSubscriber(socket, b'sequence') + + self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}']) + socket.connect(address) + # Relax so that the subscriber is ready before publishing zmq messages + sleep(0.2) + + # Mempool sequence number starts at 1 + seq_num = 1 + + # Generate 1 block in nodes[0] and receive all notifications + dc_block = self.nodes[0].generatetoaddress( + 1, ADDRESS_ECREG_UNSPENDABLE)[0] + + # Note: We are not notified of any block transactions, coinbase or + # mined + assert_equal((self.nodes[0].getbestblockhash(), "C", None), + seq.receive_sequence()) + + # Generate 2 blocks in nodes[1] to a different address to ensure + # a chain split + self.nodes[1].generatetoaddress(2, ADDRESS_ECREG_P2SH_OP_TRUE) + + # nodes[0] will reorg chain after connecting back nodes[1] + connect_nodes(self.nodes[0], self.nodes[1]) + + # Then we receive all block (dis)connect notifications for the + # 2 block reorg + assert_equal((dc_block, "D", None), seq.receive_sequence()) + block_count = self.nodes[1].getblockcount() + assert_equal((self.nodes[1].getblockhash(block_count - 1), "C", None), + seq.receive_sequence()) + assert_equal((self.nodes[1].getblockhash(block_count), "C", None), + seq.receive_sequence()) + + # Rest of test requires wallet functionality + if self.is_wallet_compiled(): + self.log.info("Wait for tx from second node") + payment_txid = self.nodes[1].sendtoaddress( + address=self.nodes[0].getnewaddress(), amount=5_000_000) + self.sync_all() + self.log.info( + "Testing sequence notifications with mempool sequence values") + + # Should receive the broadcasted txid. + assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) + seq_num += 1 + + # Removed RBF tests + + # Doesn't get published when mined, make a block and tx to "flush" + # the possibility though the mempool sequence number does go up by + # the number of transactions removed from the mempool by the block + # mining it. + mempool_size = len(self.nodes[0].getrawmempool()) + c_block = self.nodes[0].generatetoaddress( + 1, ADDRESS_ECREG_UNSPENDABLE)[0] + self.sync_all() + # Make sure the number of mined transactions matches the number of + # txs out of mempool + mempool_size_delta = mempool_size - \ + len(self.nodes[0].getrawmempool()) + assert_equal(len(self.nodes[0].getblock(c_block)["tx"]) - 1, + mempool_size_delta) + seq_num += mempool_size_delta + payment_txid_2 = self.nodes[1].sendtoaddress( + self.nodes[0].getnewaddress(), 1_000_000) + self.sync_all() + assert_equal((c_block, "C", None), seq.receive_sequence()) + assert_equal((payment_txid_2, "A", seq_num), + seq.receive_sequence()) + seq_num += 1 + + # Spot check getrawmempool results that they only show up when + # asked for + assert isinstance(self.nodes[0].getrawmempool(), list) + assert isinstance( + self.nodes[0].getrawmempool(mempool_sequence=False), + list) + assert "mempool_sequence" not in self.nodes[0].getrawmempool( + verbose=True) + assert_raises_rpc_error( + -8, "Verbose results cannot contain mempool sequence values.", + self.nodes[0].getrawmempool, True, True) + assert_equal(self.nodes[0].getrawmempool( + mempool_sequence=True)["mempool_sequence"], + seq_num) + + self.log.info("Testing reorg notifications") + # Manually invalidate the last block to test mempool re-entry + # N.B. This part could be made more lenient in exact ordering + # since it greatly depends on inner-workings of blocks/mempool + # during "deep" re-orgs. Probably should "re-construct" + # blockchain/mempool state from notifications instead. + block_count = self.nodes[0].getblockcount() + best_hash = self.nodes[0].getbestblockhash() + self.nodes[0].invalidateblock(best_hash) + # Bit of room to make sure transaction things happened + sleep(2) + + # Make sure getrawmempool mempool_sequence results aren't "queued" + # but immediately reflective of the time they were gathered. + assert self.nodes[0].getrawmempool( + mempool_sequence=True)["mempool_sequence"] > seq_num + + assert_equal((best_hash, "D", None), seq.receive_sequence()) + assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) + seq_num += 1 + + # Other things may happen but aren't wallet-deterministic so we + # don't test for them currently + self.nodes[0].reconsiderblock(best_hash) + self.nodes[1].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) + self.sync_all() + + self.log.info("Evict mempool transaction by block conflict") + orig_txid = self.nodes[0].sendtoaddress( + address=self.nodes[0].getnewaddress(), amount=1_000_000) + + # More to be simply mined + more_tx = [] + for _ in range(5): + more_tx.append(self.nodes[0].sendtoaddress( + self.nodes[0].getnewaddress(), 100_000)) + + raw_tx = self.nodes[0].getrawtransaction(orig_txid) + block = create_block( + int(self.nodes[0].getbestblockhash(), 16), + create_coinbase(self.nodes[0].getblockcount() + 1)) + tx = FromHex(CTransaction(), raw_tx) + block.vtx.append(tx) + for txid in more_tx: + tx = FromHex(CTransaction(), + self.nodes[0].getrawtransaction(txid)) + block.vtx.append(tx) + make_conform_to_ctor(block) + block.hashMerkleRoot = block.calc_merkle_root() + block.solve() + assert_equal(self.nodes[0].submitblock(block.serialize().hex()), + None) + tip = self.nodes[0].getbestblockhash() + assert_equal(int(tip, 16), block.sha256) + orig_txid_2 = self.nodes[0].sendtoaddress( + address=self.nodes[0].getnewaddress(), amount=1_000_000) + + # Flush old notifications until evicted tx original entry + (hash_str, label, mempool_seq) = seq.receive_sequence() + while hash_str != orig_txid: + (hash_str, label, mempool_seq) = seq.receive_sequence() + mempool_seq += 1 + + # Added original tx + assert_equal(label, "A") + # More transactions to be simply mined + for i in range(len(more_tx)): + assert_equal((more_tx[i], "A", mempool_seq), + seq.receive_sequence()) + mempool_seq += 1 + + # Removed RBF tests + + mempool_seq += 1 + assert_equal((tip, "C", None), seq.receive_sequence()) + mempool_seq += len(more_tx) + # Last tx + assert_equal((orig_txid_2, "A", mempool_seq), + seq.receive_sequence()) + mempool_seq += 1 + self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) + # want to make sure we didn't break "consensus" for other tests + self.sync_all() + + def test_mempool_sync(self): + """ + Use sequence notification plus getrawmempool sequence results to + "sync mempool" + """ + if not self.is_wallet_compiled(): + self.log.info("Skipping mempool sync test") + return + + self.log.info("Testing 'mempool sync' usage of sequence notifier") + address = 'tcp://127.0.0.1:28333' + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, 60000) + seq = ZMQSubscriber(socket, b'sequence') + + self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}']) + connect_nodes(self.nodes[0], self.nodes[1]) + socket.connect(address) + # Relax so that the subscriber is ready before publishing zmq messages + sleep(0.2) + + # In-memory counter, should always start at 1 + next_mempool_seq = self.nodes[0].getrawmempool( + mempool_sequence=True)["mempool_sequence"] + assert_equal(next_mempool_seq, 1) + + # Some transactions have been happening but we aren't consuming + # zmq notifications yet or we lost a ZMQ message somehow and want + # to start over + txids = [] + num_txs = 5 + for _ in range(num_txs): + txids.append(self.nodes[1].sendtoaddress( + address=self.nodes[0].getnewaddress(), amount=1_000_000)) + self.sync_all() + + # 1) Consume backlog until we get a mempool sequence number + (hash_str, label, zmq_mem_seq) = seq.receive_sequence() + while zmq_mem_seq is None: + (hash_str, label, zmq_mem_seq) = seq.receive_sequence() + + assert label == "A" + assert hash_str is not None + + # 2) We need to "seed" our view of the mempool + mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True) + mempool_view = set(mempool_snapshot["txids"]) + get_raw_seq = mempool_snapshot["mempool_sequence"] + assert_equal(get_raw_seq, 6) + # Snapshot may be too old compared to zmq message we read off latest + while zmq_mem_seq >= get_raw_seq: + sleep(2) + mempool_snapshot = self.nodes[0].getrawmempool( + mempool_sequence=True) + mempool_view = set(mempool_snapshot["txids"]) + get_raw_seq = mempool_snapshot["mempool_sequence"] + + # Things continue to happen in the "interim" while waiting for + # snapshot results + for _ in range(num_txs): + txids.append(self.nodes[0].sendtoaddress( + address=self.nodes[0].getnewaddress(), amount=1_000_000)) + self.sync_all() + self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) + final_txid = self.nodes[0].sendtoaddress( + address=self.nodes[0].getnewaddress(), amount=100_000) + + # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot + while True: + if zmq_mem_seq == get_raw_seq - 1: + break + (hash_str, label, mempool_sequence) = seq.receive_sequence() + if mempool_sequence is not None: + zmq_mem_seq = mempool_sequence + if zmq_mem_seq > get_raw_seq: + raise Exception( + f"We somehow jumped mempool sequence numbers! " + f"zmq_mem_seq: {zmq_mem_seq} > " + f"get_raw_seq: {get_raw_seq}") + + # 4) Moving forward, we apply the delta to our local view + # remaining txs(5) + 1 block connect + 1 final tx + expected_sequence = get_raw_seq + for _ in range(num_txs + 1 + 1): + (hash_str, label, mempool_sequence) = seq.receive_sequence() + if label == "A": + assert hash_str not in mempool_view + mempool_view.add(hash_str) + expected_sequence = mempool_sequence + 1 + elif label == "C": + # (Attempt to) remove all txids from known block connects + block_txids = self.nodes[0].getblock(hash_str)["tx"][1:] + for txid in block_txids: + if txid in mempool_view: + expected_sequence += 1 + mempool_view.remove(txid) + elif label == "D": + # Not useful for mempool tracking per se + continue + else: + raise Exception("Unexpected ZMQ sequence label!") + + assert_equal(self.nodes[0].getrawmempool(), [final_txid]) + assert_equal( + self.nodes[0].getrawmempool( + mempool_sequence=True)["mempool_sequence"], + expected_sequence) + + # 5) If you miss a zmq/mempool sequence number, go back to step (2) + + self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) + if __name__ == '__main__': ZMQTest().main()