Changeset View
Changeset View
Standalone View
Standalone View
test/functional/interface_zmq.py
Show All 16 Lines | from test_framework.blocktools import ( | ||||
make_conform_to_ctor, | make_conform_to_ctor, | ||||
) | ) | ||||
from test_framework.messages import CTransaction, FromHex, hash256 | from test_framework.messages import CTransaction, FromHex, hash256 | ||||
from test_framework.test_framework import BitcoinTestFramework | from test_framework.test_framework import BitcoinTestFramework | ||||
from test_framework.util import ( | from test_framework.util import ( | ||||
assert_equal, | assert_equal, | ||||
assert_raises_rpc_error, | assert_raises_rpc_error, | ||||
connect_nodes, | connect_nodes, | ||||
disconnect_nodes, | |||||
) | ) | ||||
# Test may be skipped and not have zmq installed | # Test may be skipped and not have zmq installed | ||||
try: | try: | ||||
import zmq | import zmq | ||||
except ImportError: | except ImportError: | ||||
pass | pass | ||||
Show All 35 Lines | def receive_sequence(self): | ||||
else: | else: | ||||
assert label == "D" or label == "C" | assert label == "D" or label == "C" | ||||
return (hash, label, mempool_sequence) | return (hash, label, mempool_sequence) | ||||
class ZMQTest (BitcoinTestFramework): | class ZMQTest (BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.num_nodes = 2 | self.num_nodes = 2 | ||||
self.extra_args = [["-whitelist=noban@127.0.0.1"]] * self.num_nodes | |||||
def skip_test_if_missing_module(self): | def skip_test_if_missing_module(self): | ||||
self.skip_if_no_py3_zmq() | self.skip_if_no_py3_zmq() | ||||
self.skip_if_no_bitcoind_zmq() | self.skip_if_no_bitcoind_zmq() | ||||
def run_test(self): | def run_test(self): | ||||
self.ctx = zmq.Context() | self.ctx = zmq.Context() | ||||
try: | try: | ||||
Show All 21 Lines | def test_basic(self): | ||||
subs.append(ZMQSubscriber(sockets[-1], service)) | subs.append(ZMQSubscriber(sockets[-1], service)) | ||||
# Subscribe to all available topics. | # Subscribe to all available topics. | ||||
hashblock = subs[0] | hashblock = subs[0] | ||||
hashtx = subs[1] | hashtx = subs[1] | ||||
rawblock = subs[2] | rawblock = subs[2] | ||||
rawtx = subs[3] | rawtx = subs[3] | ||||
self.restart_node(0, ["-zmqpub{}={}".format(sub.topic.decode(), address) | self.restart_node( | ||||
for sub in [hashblock, hashtx, rawblock, rawtx]]) | 0, | ||||
self.extra_args[0] + [ | |||||
f"-zmqpub{sub.topic.decode()}={address}" for sub in [ | |||||
hashblock, hashtx, rawblock, rawtx]] | |||||
) | |||||
connect_nodes(self.nodes[0], self.nodes[1]) | connect_nodes(self.nodes[0], self.nodes[1]) | ||||
for socket in sockets: | for socket in sockets: | ||||
socket.connect(address) | socket.connect(address) | ||||
# Relax so that the subscriber is ready before publishing zmq messages | # Relax so that the subscriber is ready before publishing zmq messages | ||||
sleep(0.2) | sleep(0.2) | ||||
num_blocks = 5 | num_blocks = 5 | ||||
self.log.info( | self.log.info( | ||||
▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | def test_reorg(self): | ||||
subs.append(ZMQSubscriber(sockets[-1], service)) | subs.append(ZMQSubscriber(sockets[-1], service)) | ||||
# Subscribe to all available topics. | # Subscribe to all available topics. | ||||
hashblock = subs[0] | hashblock = subs[0] | ||||
hashtx = subs[1] | hashtx = subs[1] | ||||
# Should only notify the tip if a reorg occurs | # Should only notify the tip if a reorg occurs | ||||
self.restart_node( | self.restart_node( | ||||
0, ['-zmqpub{}={}'.format(sub.topic.decode(), address) for sub in [hashblock, hashtx]]) | 0, self.extra_args[0] + [f'-zmqpub{sub.topic.decode()}={address}' | ||||
for sub in [hashblock, hashtx]]) | |||||
for socket in sockets: | for socket in sockets: | ||||
socket.connect(address) | socket.connect(address) | ||||
# Relax so that the subscriber is ready before publishing zmq messages | # Relax so that the subscriber is ready before publishing zmq messages | ||||
sleep(0.2) | sleep(0.2) | ||||
# Generate 1 block in nodes[0] with 1 mempool tx and receive all | # Generate 1 block in nodes[0] with 1 mempool tx and receive all | ||||
# notifications | # notifications | ||||
payment_txid = self.nodes[0].sendtoaddress( | payment_txid = self.nodes[0].sendtoaddress( | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | def test_reorg(self): | ||||
self.nodes[1].getblock( | self.nodes[1].getblock( | ||||
connect_blocks[1])["tx"][0]) | connect_blocks[1])["tx"][0]) | ||||
# And the current tip | # And the current tip | ||||
assert_equal( | assert_equal( | ||||
hashtx.receive().hex(), | hashtx.receive().hex(), | ||||
self.nodes[1].getblock( | self.nodes[1].getblock( | ||||
connect_blocks[0])["tx"][0]) | connect_blocks[0])["tx"][0]) | ||||
def create_conflicting_tx(self): | |||||
"""Create a transaction that is initially added to node0's mempool | |||||
and is then rejected by a transaction created and included into a | |||||
block by node1.""" | |||||
utxo = self.nodes[1].listunspent()[0] | |||||
def send_conflicting_transaction(send_node): | |||||
"""Send a transaction using an identical utxo as input and | |||||
a different address as output each time the function is | |||||
called. Return the TxId.""" | |||||
address = self.nodes[1].getnewaddress() | |||||
change_address = self.nodes[1].getrawchangeaddress() | |||||
tx = self.nodes[1].signrawtransactionwithwallet( | |||||
self.nodes[1].createrawtransaction( | |||||
inputs=[{"txid": utxo["txid"], "vout": utxo["vout"]}], | |||||
outputs=[{address: 5_000_000}, | |||||
{change_address: utxo["amount"] - 5_001_000}] | |||||
) | |||||
) | |||||
return send_node.sendrawtransaction(tx["hex"]) | |||||
disconnect_nodes(self.nodes[0], self.nodes[1]) | |||||
txid_to_be_replaced = send_conflicting_transaction(self.nodes[0]) | |||||
replacement_txid = send_conflicting_transaction(self.nodes[1]) | |||||
block_hash = self.nodes[1].generatetoaddress( | |||||
1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] | |||||
connect_nodes(self.nodes[0], self.nodes[1]) | |||||
self.sync_all() | |||||
return block_hash, txid_to_be_replaced, replacement_txid | |||||
def test_sequence(self): | def test_sequence(self): | ||||
""" | """ | ||||
Sequence zmq notifications give every blockhash and txhash in order | Sequence zmq notifications give every blockhash and txhash in order | ||||
of processing, regardless of IBD, re-orgs, etc. | of processing, regardless of IBD, re-orgs, etc. | ||||
Format of messages: | Format of messages: | ||||
<32-byte hash>C : Blockhash connected | <32-byte hash>C : Blockhash connected | ||||
<32-byte hash>D : Blockhash disconnected | <32-byte hash>D : Blockhash disconnected | ||||
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool | <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool | ||||
for non-block inclusion reason | for non-block inclusion reason | ||||
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool | <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool | ||||
""" | """ | ||||
self.log.info("Testing 'sequence' publisher") | self.log.info("Testing 'sequence' publisher") | ||||
address = 'tcp://127.0.0.1:28333' | address = 'tcp://127.0.0.1:28333' | ||||
socket = self.ctx.socket(zmq.SUB) | socket = self.ctx.socket(zmq.SUB) | ||||
socket.set(zmq.RCVTIMEO, 60000) | socket.set(zmq.RCVTIMEO, 60000) | ||||
seq = ZMQSubscriber(socket, b'sequence') | seq = ZMQSubscriber(socket, b'sequence') | ||||
self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}']) | self.restart_node( | ||||
0, self.extra_args[0] + [f'-zmqpub{seq.topic.decode()}={address}']) | |||||
socket.connect(address) | socket.connect(address) | ||||
# Relax so that the subscriber is ready before publishing zmq messages | # Relax so that the subscriber is ready before publishing zmq messages | ||||
sleep(0.2) | sleep(0.2) | ||||
# Mempool sequence number starts at 1 | # Mempool sequence number starts at 1 | ||||
seq_num = 1 | seq_num = 1 | ||||
# Generate 1 block in nodes[0] and receive all notifications | # Generate 1 block in nodes[0] and receive all notifications | ||||
Show All 18 Lines | def test_sequence(self): | ||||
block_count = self.nodes[1].getblockcount() | block_count = self.nodes[1].getblockcount() | ||||
assert_equal((self.nodes[1].getblockhash(block_count - 1), "C", None), | assert_equal((self.nodes[1].getblockhash(block_count - 1), "C", None), | ||||
seq.receive_sequence()) | seq.receive_sequence()) | ||||
assert_equal((self.nodes[1].getblockhash(block_count), "C", None), | assert_equal((self.nodes[1].getblockhash(block_count), "C", None), | ||||
seq.receive_sequence()) | seq.receive_sequence()) | ||||
# Rest of test requires wallet functionality | # Rest of test requires wallet functionality | ||||
if self.is_wallet_compiled(): | if self.is_wallet_compiled(): | ||||
(block_hash, txid_to_be_replaced, replacement_txid | |||||
) = self.create_conflicting_tx() | |||||
self.log.info( | |||||
"Testing sequence notifications with mempool sequence values") | |||||
# Should receive the initially broadcasted txid. | |||||
assert_equal((txid_to_be_replaced, "A", seq_num), | |||||
seq.receive_sequence()) | |||||
seq_num += 1 | |||||
self.log.info("Testing a tx removal notification") | |||||
# Next we receive a notification for the transaction removal | |||||
assert_equal((txid_to_be_replaced, "R", seq_num), | |||||
seq.receive_sequence()) | |||||
seq_num += 1 | |||||
# Then we see the block notification | |||||
assert_equal((block_hash, "C", None), seq.receive_sequence()) | |||||
# There is no sequence notification for the transaction that was | |||||
# never in node0's mempool, but it can be found in the block. | |||||
assert replacement_txid in self.nodes[0].getblock(block_hash)["tx"] | |||||
self.log.info("Wait for tx from second node") | self.log.info("Wait for tx from second node") | ||||
payment_txid = self.nodes[1].sendtoaddress( | payment_txid = self.nodes[1].sendtoaddress( | ||||
address=self.nodes[0].getnewaddress(), amount=5_000_000) | address=self.nodes[0].getnewaddress(), amount=5_000_000) | ||||
self.sync_all() | self.sync_all() | ||||
self.log.info( | |||||
"Testing sequence notifications with mempool sequence values") | |||||
# Should receive the broadcasted txid. | |||||
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) | assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) | ||||
seq_num += 1 | seq_num += 1 | ||||
# Removed RBF tests | |||||
# Doesn't get published when mined, make a block and tx to "flush" | # Doesn't get published when mined, make a block and tx to "flush" | ||||
# the possibility though the mempool sequence number does go up by | # the possibility though the mempool sequence number does go up by | ||||
# the number of transactions removed from the mempool by the block | # the number of transactions removed from the mempool by the block | ||||
# mining it. | # mining it. | ||||
mempool_size = len(self.nodes[0].getrawmempool()) | mempool_size = len(self.nodes[0].getrawmempool()) | ||||
c_block = self.nodes[0].generatetoaddress( | c_block = self.nodes[0].generatetoaddress( | ||||
1, ADDRESS_ECREG_UNSPENDABLE)[0] | 1, ADDRESS_ECREG_UNSPENDABLE)[0] | ||||
self.sync_all() | self.sync_all() | ||||
▲ Show 20 Lines • Show All 121 Lines • ▼ Show 20 Lines | def test_mempool_sync(self): | ||||
return | return | ||||
self.log.info("Testing 'mempool sync' usage of sequence notifier") | self.log.info("Testing 'mempool sync' usage of sequence notifier") | ||||
address = 'tcp://127.0.0.1:28333' | address = 'tcp://127.0.0.1:28333' | ||||
socket = self.ctx.socket(zmq.SUB) | socket = self.ctx.socket(zmq.SUB) | ||||
socket.set(zmq.RCVTIMEO, 60000) | socket.set(zmq.RCVTIMEO, 60000) | ||||
seq = ZMQSubscriber(socket, b'sequence') | seq = ZMQSubscriber(socket, b'sequence') | ||||
self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}']) | self.restart_node( | ||||
0, self.extra_args[0] + [f'-zmqpub{seq.topic.decode()}={address}']) | |||||
connect_nodes(self.nodes[0], self.nodes[1]) | connect_nodes(self.nodes[0], self.nodes[1]) | ||||
socket.connect(address) | socket.connect(address) | ||||
# Relax so that the subscriber is ready before publishing zmq messages | # Relax so that the subscriber is ready before publishing zmq messages | ||||
sleep(0.2) | sleep(0.2) | ||||
# In-memory counter, should always start at 1 | # In-memory counter, should always start at 1 | ||||
next_mempool_seq = self.nodes[0].getrawmempool( | next_mempool_seq = self.nodes[0].getrawmempool( | ||||
mempool_sequence=True)["mempool_sequence"] | mempool_sequence=True)["mempool_sequence"] | ||||
Show All 31 Lines | def test_mempool_sync(self): | ||||
get_raw_seq = mempool_snapshot["mempool_sequence"] | get_raw_seq = mempool_snapshot["mempool_sequence"] | ||||
# Things continue to happen in the "interim" while waiting for | # Things continue to happen in the "interim" while waiting for | ||||
# snapshot results | # snapshot results | ||||
for _ in range(num_txs): | for _ in range(num_txs): | ||||
txids.append(self.nodes[0].sendtoaddress( | txids.append(self.nodes[0].sendtoaddress( | ||||
address=self.nodes[0].getnewaddress(), amount=1_000_000)) | address=self.nodes[0].getnewaddress(), amount=1_000_000)) | ||||
self.sync_all() | self.sync_all() | ||||
self.create_conflicting_tx() | |||||
self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) | self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) | ||||
final_txid = self.nodes[0].sendtoaddress( | final_txid = self.nodes[0].sendtoaddress( | ||||
address=self.nodes[0].getnewaddress(), amount=100_000) | address=self.nodes[0].getnewaddress(), amount=100_000) | ||||
# 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot | # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot | ||||
while True: | while True: | ||||
if zmq_mem_seq == get_raw_seq - 1: | if zmq_mem_seq == get_raw_seq - 1: | ||||
break | break | ||||
(hash_str, label, mempool_sequence) = seq.receive_sequence() | (hash_str, label, mempool_sequence) = seq.receive_sequence() | ||||
if mempool_sequence is not None: | if mempool_sequence is not None: | ||||
zmq_mem_seq = mempool_sequence | zmq_mem_seq = mempool_sequence | ||||
if zmq_mem_seq > get_raw_seq: | if zmq_mem_seq > get_raw_seq: | ||||
raise Exception( | raise Exception( | ||||
f"We somehow jumped mempool sequence numbers! " | f"We somehow jumped mempool sequence numbers! " | ||||
f"zmq_mem_seq: {zmq_mem_seq} > " | f"zmq_mem_seq: {zmq_mem_seq} > " | ||||
f"get_raw_seq: {get_raw_seq}") | f"get_raw_seq: {get_raw_seq}") | ||||
# 4) Moving forward, we apply the delta to our local view | # 4) Moving forward, we apply the delta to our local view | ||||
# remaining txs(5) + 1 block connect + 1 final tx | # remaining txs + conflict (A, R, C) + 1 block connect + 1 final tx | ||||
expected_sequence = get_raw_seq | expected_sequence = get_raw_seq | ||||
for _ in range(num_txs + 1 + 1): | for _ in range(num_txs + 3 + 1 + 1): | ||||
(hash_str, label, mempool_sequence) = seq.receive_sequence() | (hash_str, label, mempool_sequence) = seq.receive_sequence() | ||||
if label == "A": | if label == "A": | ||||
assert hash_str not in mempool_view | assert hash_str not in mempool_view | ||||
mempool_view.add(hash_str) | mempool_view.add(hash_str) | ||||
expected_sequence = mempool_sequence + 1 | expected_sequence = mempool_sequence + 1 | ||||
elif label == "R": | |||||
assert hash_str in mempool_view | |||||
mempool_view.remove(hash_str) | |||||
expected_sequence = mempool_sequence + 1 | |||||
elif label == "C": | elif label == "C": | ||||
# (Attempt to) remove all txids from known block connects | # (Attempt to) remove all txids from known block connects | ||||
block_txids = self.nodes[0].getblock(hash_str)["tx"][1:] | block_txids = self.nodes[0].getblock(hash_str)["tx"][1:] | ||||
for txid in block_txids: | for txid in block_txids: | ||||
if txid in mempool_view: | if txid in mempool_view: | ||||
expected_sequence += 1 | expected_sequence += 1 | ||||
mempool_view.remove(txid) | mempool_view.remove(txid) | ||||
elif label == "D": | elif label == "D": | ||||
Show All 18 Lines |