D10306.diff
D10306: Add functional tests for zmq sequence topic and mempool sequence logic
diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py
--- a/test/functional/interface_zmq.py
+++ b/test/functional/interface_zmq.py
@@ -11,9 +11,24 @@
ADDRESS_ECREG_P2SH_OP_TRUE,
ADDRESS_ECREG_UNSPENDABLE,
)
-from test_framework.messages import CTransaction, hash256
+from test_framework.blocktools import (
+ create_block,
+ create_coinbase,
+ make_conform_to_ctor,
+)
+from test_framework.messages import CTransaction, FromHex, hash256
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import assert_equal, connect_nodes
+from test_framework.util import (
+ assert_equal,
+ assert_raises_rpc_error,
+ connect_nodes,
+)
+
+# The test may be skipped, in which case zmq may not be installed
+try:
+ import zmq
+except ImportError:
+ pass
def hash256_reversed(byte_str):
@@ -26,7 +41,6 @@
self.socket = socket
self.topic = topic
- import zmq
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
def receive(self):
@@ -38,6 +52,23 @@
self.sequence += 1
return body
+ def receive_sequence(self):
+ topic, body, seq = self.socket.recv_multipart()
+ # Topic should match the subscriber topic.
+ assert_equal(topic, self.topic)
+ # Sequence should be incremental.
+ assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
+ self.sequence += 1
+ hash = body[:32].hex()
+ label = chr(body[32])
+ mempool_sequence = None if len(
+ body) != 32 + 1 + 8 else struct.unpack("<Q", body[32 + 1:])[0]
+ if mempool_sequence is not None:
+ assert label == "A" or label == "R"
+ else:
+ assert label == "D" or label == "C"
+ return (hash, label, mempool_sequence)
+
class ZMQTest (BitcoinTestFramework):
def set_test_params(self):
@@ -48,10 +79,11 @@
self.skip_if_no_bitcoind_zmq()
def run_test(self):
- import zmq
self.ctx = zmq.Context()
try:
self.test_basic()
+ self.test_sequence()
+ self.test_mempool_sync()
self.test_reorg()
finally:
# Destroy the ZMQ context.
@@ -59,7 +91,6 @@
self.ctx.destroy(linger=None)
def test_basic(self):
- import zmq
# Invalid zmq arguments don't take down the node, see #17185.
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
@@ -152,7 +183,6 @@
self.log.info("Skipping reorg test because wallet is disabled")
return
- import zmq
address = 'tcp://127.0.0.1:28333'
services = [b"hashblock", b"hashtx"]
@@ -230,6 +260,305 @@
self.nodes[1].getblock(
connect_blocks[0])["tx"][0])
+ def test_sequence(self):
+ """
+ Sequence zmq notifications give every blockhash and txhash in order
+ of processing, regardless of IBD, re-orgs, etc.
+ Format of messages:
+ <32-byte hash>C : Blockhash connected
+ <32-byte hash>D : Blockhash disconnected
+ <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool
+ for a reason other than block inclusion
+ <32-byte hash>A<8-byte LE uint> : Transactionhash added to mempool
+ """
+ self.log.info("Testing 'sequence' publisher")
+ address = 'tcp://127.0.0.1:28333'
+ socket = self.ctx.socket(zmq.SUB)
+ socket.set(zmq.RCVTIMEO, 60000)
+ seq = ZMQSubscriber(socket, b'sequence')
+
+ self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}'])
+ socket.connect(address)
+ # Relax so that the subscriber is ready before publishing zmq messages
+ sleep(0.2)
+
+ # Mempool sequence number starts at 1
+ seq_num = 1
+
+ # Generate 1 block in nodes[0] and receive all notifications
+ dc_block = self.nodes[0].generatetoaddress(
+ 1, ADDRESS_ECREG_UNSPENDABLE)[0]
+
+ # Note: We are not notified of any block transactions, coinbase or
+ # mined
+ assert_equal((self.nodes[0].getbestblockhash(), "C", None),
+ seq.receive_sequence())
+
+ # Generate 2 blocks in nodes[1] to a different address to ensure
+ # a chain split
+ self.nodes[1].generatetoaddress(2, ADDRESS_ECREG_P2SH_OP_TRUE)
+
+ # nodes[0] will reorg chain after connecting back nodes[1]
+ connect_nodes(self.nodes[0], self.nodes[1])
+
+ # Then we receive all block (dis)connect notifications for the
+ # 2 block reorg
+ assert_equal((dc_block, "D", None), seq.receive_sequence())
+ block_count = self.nodes[1].getblockcount()
+ assert_equal((self.nodes[1].getblockhash(block_count - 1), "C", None),
+ seq.receive_sequence())
+ assert_equal((self.nodes[1].getblockhash(block_count), "C", None),
+ seq.receive_sequence())
+
+ # Rest of test requires wallet functionality
+ if self.is_wallet_compiled():
+ self.log.info("Wait for tx from second node")
+ payment_txid = self.nodes[1].sendtoaddress(
+ address=self.nodes[0].getnewaddress(), amount=5_000_000)
+ self.sync_all()
+ self.log.info(
+ "Testing sequence notifications with mempool sequence values")
+
+ # Should receive the broadcasted txid.
+ assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
+ seq_num += 1
+
+ # Removed RBF tests
+
+ # A transaction is not published again when it is mined. Make a
+ # block and a tx to "flush" out that possibility; the mempool
+ # sequence number still goes up by the number of transactions the
+ # block removed from the mempool.
+ mempool_size = len(self.nodes[0].getrawmempool())
+ c_block = self.nodes[0].generatetoaddress(
+ 1, ADDRESS_ECREG_UNSPENDABLE)[0]
+ self.sync_all()
+ # Make sure the number of mined transactions matches the number of
+ # txs out of mempool
+ mempool_size_delta = mempool_size - \
+ len(self.nodes[0].getrawmempool())
+ assert_equal(len(self.nodes[0].getblock(c_block)["tx"]) - 1,
+ mempool_size_delta)
+ seq_num += mempool_size_delta
+ payment_txid_2 = self.nodes[1].sendtoaddress(
+ self.nodes[0].getnewaddress(), 1_000_000)
+ self.sync_all()
+ assert_equal((c_block, "C", None), seq.receive_sequence())
+ assert_equal((payment_txid_2, "A", seq_num),
+ seq.receive_sequence())
+ seq_num += 1
+
+ # Spot check that getrawmempool sequence results only show up
+ # when asked for
+ assert isinstance(self.nodes[0].getrawmempool(), list)
+ assert isinstance(
+ self.nodes[0].getrawmempool(mempool_sequence=False),
+ list)
+ assert "mempool_sequence" not in self.nodes[0].getrawmempool(
+ verbose=True)
+ assert_raises_rpc_error(
+ -8, "Verbose results cannot contain mempool sequence values.",
+ self.nodes[0].getrawmempool, True, True)
+ assert_equal(self.nodes[0].getrawmempool(
+ mempool_sequence=True)["mempool_sequence"],
+ seq_num)
+
+ self.log.info("Testing reorg notifications")
+ # Manually invalidate the last block to test mempool re-entry
+ # N.B. This part could be made more lenient in exact ordering
+ # since it greatly depends on inner-workings of blocks/mempool
+ # during "deep" re-orgs. Probably should "re-construct"
+ # blockchain/mempool state from notifications instead.
+ block_count = self.nodes[0].getblockcount()
+ best_hash = self.nodes[0].getbestblockhash()
+ self.nodes[0].invalidateblock(best_hash)
+ # Allow a bit of time for transactions to re-enter the mempool
+ sleep(2)
+
+ # Make sure getrawmempool mempool_sequence results aren't "queued"
+ # but immediately reflective of the time they were gathered.
+ assert self.nodes[0].getrawmempool(
+ mempool_sequence=True)["mempool_sequence"] > seq_num
+
+ assert_equal((best_hash, "D", None), seq.receive_sequence())
+ assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
+ seq_num += 1
+
+ # Other things may happen but aren't wallet-deterministic so we
+ # don't test for them currently
+ self.nodes[0].reconsiderblock(best_hash)
+ self.nodes[1].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE)
+ self.sync_all()
+
+ self.log.info("Evict mempool transaction by block conflict")
+ orig_txid = self.nodes[0].sendtoaddress(
+ address=self.nodes[0].getnewaddress(), amount=1_000_000)
+
+ # More transactions to be simply mined
+ more_tx = []
+ for _ in range(5):
+ more_tx.append(self.nodes[0].sendtoaddress(
+ self.nodes[0].getnewaddress(), 100_000))
+
+ raw_tx = self.nodes[0].getrawtransaction(orig_txid)
+ block = create_block(
+ int(self.nodes[0].getbestblockhash(), 16),
+ create_coinbase(self.nodes[0].getblockcount() + 1))
+ tx = FromHex(CTransaction(), raw_tx)
+ block.vtx.append(tx)
+ for txid in more_tx:
+ tx = FromHex(CTransaction(),
+ self.nodes[0].getrawtransaction(txid))
+ block.vtx.append(tx)
+ make_conform_to_ctor(block)
+ block.hashMerkleRoot = block.calc_merkle_root()
+ block.solve()
+ assert_equal(self.nodes[0].submitblock(block.serialize().hex()),
+ None)
+ tip = self.nodes[0].getbestblockhash()
+ assert_equal(int(tip, 16), block.sha256)
+ orig_txid_2 = self.nodes[0].sendtoaddress(
+ address=self.nodes[0].getnewaddress(), amount=1_000_000)
+
+ # Flush old notifications until we reach the evicted tx's original entry
+ (hash_str, label, mempool_seq) = seq.receive_sequence()
+ while hash_str != orig_txid:
+ (hash_str, label, mempool_seq) = seq.receive_sequence()
+ mempool_seq += 1
+
+ # Added original tx
+ assert_equal(label, "A")
+ # More transactions to be simply mined
+ for i in range(len(more_tx)):
+ assert_equal((more_tx[i], "A", mempool_seq),
+ seq.receive_sequence())
+ mempool_seq += 1
+
+ # Removed RBF tests
+
+ mempool_seq += 1
+ assert_equal((tip, "C", None), seq.receive_sequence())
+ mempool_seq += len(more_tx)
+ # Last tx
+ assert_equal((orig_txid_2, "A", mempool_seq),
+ seq.receive_sequence())
+ mempool_seq += 1
+ self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE)
+ # Make sure we didn't break "consensus" for other tests
+ self.sync_all()
+
+ def test_mempool_sync(self):
+ """
+ Use sequence notification plus getrawmempool sequence results to
+ "sync mempool"
+ """
+ if not self.is_wallet_compiled():
+ self.log.info("Skipping mempool sync test")
+ return
+
+ self.log.info("Testing 'mempool sync' usage of sequence notifier")
+ address = 'tcp://127.0.0.1:28333'
+ socket = self.ctx.socket(zmq.SUB)
+ socket.set(zmq.RCVTIMEO, 60000)
+ seq = ZMQSubscriber(socket, b'sequence')
+
+ self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}'])
+ connect_nodes(self.nodes[0], self.nodes[1])
+ socket.connect(address)
+ # Relax so that the subscriber is ready before publishing zmq messages
+ sleep(0.2)
+
+ # In-memory counter, should always start at 1
+ next_mempool_seq = self.nodes[0].getrawmempool(
+ mempool_sequence=True)["mempool_sequence"]
+ assert_equal(next_mempool_seq, 1)
+
+ # Some transactions have been happening but we aren't consuming
+ # zmq notifications yet or we lost a ZMQ message somehow and want
+ # to start over
+ txids = []
+ num_txs = 5
+ for _ in range(num_txs):
+ txids.append(self.nodes[1].sendtoaddress(
+ address=self.nodes[0].getnewaddress(), amount=1_000_000))
+ self.sync_all()
+
+ # 1) Consume backlog until we get a mempool sequence number
+ (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
+ while zmq_mem_seq is None:
+ (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
+
+ assert label == "A"
+ assert hash_str is not None
+
+ # 2) We need to "seed" our view of the mempool
+ mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
+ mempool_view = set(mempool_snapshot["txids"])
+ get_raw_seq = mempool_snapshot["mempool_sequence"]
+ assert_equal(get_raw_seq, 6)
+ # The snapshot may be too old compared to the latest zmq message we read
+ while zmq_mem_seq >= get_raw_seq:
+ sleep(2)
+ mempool_snapshot = self.nodes[0].getrawmempool(
+ mempool_sequence=True)
+ mempool_view = set(mempool_snapshot["txids"])
+ get_raw_seq = mempool_snapshot["mempool_sequence"]
+
+ # Things continue to happen in the "interim" while waiting for
+ # snapshot results
+ for _ in range(num_txs):
+ txids.append(self.nodes[0].sendtoaddress(
+ address=self.nodes[0].getnewaddress(), amount=1_000_000))
+ self.sync_all()
+ self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE)
+ final_txid = self.nodes[0].sendtoaddress(
+ address=self.nodes[0].getnewaddress(), amount=100_000)
+
+ # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
+ while True:
+ if zmq_mem_seq == get_raw_seq - 1:
+ break
+ (hash_str, label, mempool_sequence) = seq.receive_sequence()
+ if mempool_sequence is not None:
+ zmq_mem_seq = mempool_sequence
+ if zmq_mem_seq > get_raw_seq:
+ raise Exception(
+ f"We somehow jumped mempool sequence numbers! "
+ f"zmq_mem_seq: {zmq_mem_seq} > "
+ f"get_raw_seq: {get_raw_seq}")
+
+ # 4) Moving forward, we apply the delta to our local view
+ # remaining txs(5) + 1 block connect + 1 final tx
+ expected_sequence = get_raw_seq
+ for _ in range(num_txs + 1 + 1):
+ (hash_str, label, mempool_sequence) = seq.receive_sequence()
+ if label == "A":
+ assert hash_str not in mempool_view
+ mempool_view.add(hash_str)
+ expected_sequence = mempool_sequence + 1
+ elif label == "C":
+ # (Attempt to) remove all txids from known block connects
+ block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
+ for txid in block_txids:
+ if txid in mempool_view:
+ expected_sequence += 1
+ mempool_view.remove(txid)
+ elif label == "D":
+ # Not useful for mempool tracking per se
+ continue
+ else:
+ raise Exception("Unexpected ZMQ sequence label!")
+
+ assert_equal(self.nodes[0].getrawmempool(), [final_txid])
+ assert_equal(
+ self.nodes[0].getrawmempool(
+ mempool_sequence=True)["mempool_sequence"],
+ expected_sequence)
+
+ # 5) If you miss a zmq/mempool sequence number, go back to step (2)
+
+ self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE)
+
if __name__ == '__main__':
ZMQTest().main()
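
For reference, a minimal standalone consumer of the 'sequence' topic described in the test_sequence() docstring above could look like the following sketch. It mirrors ZMQSubscriber.receive_sequence() but uses pyzmq directly; the endpoint address is an assumption, and the node would have to be started with -zmqpubsequence pointing at it.

import struct
import zmq

ctx = zmq.Context()
socket = ctx.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"sequence")
# Assumed endpoint; start the node with -zmqpubsequence=tcp://127.0.0.1:28333
socket.connect("tcp://127.0.0.1:28333")

while True:
    topic, body, seq = socket.recv_multipart()
    hash_str = body[:32].hex()  # block hash or txid
    label = chr(body[32])       # 'C', 'D', 'A' or 'R'
    if label in ("A", "R"):
        # Tx added to / removed from the mempool: an 8-byte LE mempool
        # sequence number follows the label byte
        mempool_sequence = struct.unpack("<Q", body[33:])[0]
        print(label, hash_str, mempool_sequence)
    else:
        # Block connected ('C') or disconnected ('D'): no mempool sequence
        print(label, hash_str)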
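
The mempool-sync pattern that test_mempool_sync() exercises can be condensed into the simplified sketch below: seed a local view from getrawmempool(mempool_sequence=True), then apply only those 'A'/'R' notifications whose mempool sequence number is at or beyond the snapshot's. Both parameters are assumptions: rpc stands in for any RPC client exposing getrawmempool/getblock, and receive_sequence for the subscriber sketched above.

def sync_mempool_view(rpc, receive_sequence):
    # Seed the local view from a mempool snapshot (step 2 of the test)
    snapshot = rpc.getrawmempool(mempool_sequence=True)
    mempool_view = set(snapshot["txids"])
    next_seq = snapshot["mempool_sequence"]

    while True:
        hash_str, label, mempool_seq = receive_sequence()
        if label in ("A", "R") and mempool_seq < next_seq:
            # Stale backlog from before the snapshot was taken (step 3)
            continue
        if label == "A":
            mempool_view.add(hash_str)
            next_seq = mempool_seq + 1
        elif label == "R":
            mempool_view.discard(hash_str)
            next_seq = mempool_seq + 1
        elif label == "C":
            # Block connected: drop its non-coinbase txids from the view
            for txid in rpc.getblock(hash_str)["tx"][1:]:
                mempool_view.discard(txid)
        # 'D' (block disconnected) is ignored here; the re-added
        # transactions come back as 'A' notifications of their own

As in step (5) of the test, a consumer that misses a sequence number should simply re-seed from a fresh getrawmempool snapshot.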