Changeset View
Standalone View
test/functional/interface_zmq.py
Show All 16 Lines | from test_framework.blocktools import ( | ||||
make_conform_to_ctor, | make_conform_to_ctor, | ||||
) | ) | ||||
from test_framework.messages import CTransaction, FromHex, hash256 | from test_framework.messages import CTransaction, FromHex, hash256 | ||||
from test_framework.test_framework import BitcoinTestFramework | from test_framework.test_framework import BitcoinTestFramework | ||||
from test_framework.util import ( | from test_framework.util import ( | ||||
assert_equal, | assert_equal, | ||||
assert_raises_rpc_error, | assert_raises_rpc_error, | ||||
connect_nodes, | connect_nodes, | ||||
disconnect_nodes, | |||||
) | ) | ||||
# Test may be skipped and not have zmq installed | # Test may be skipped and not have zmq installed | ||||
try: | try: | ||||
import zmq | import zmq | ||||
except ImportError: | except ImportError: | ||||
pass | pass | ||||
Show All 35 Lines | def receive_sequence(self): | ||||
else: | else: | ||||
assert label == "D" or label == "C" | assert label == "D" or label == "C" | ||||
return (hash, label, mempool_sequence) | return (hash, label, mempool_sequence) | ||||
class ZMQTest (BitcoinTestFramework): | class ZMQTest (BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.num_nodes = 2 | self.num_nodes = 2 | ||||
self.whitelist_arg = "-whitelist=noban@127.0.0.1" | |||||
self.extra_args = [[self.whitelist_arg]] * self.num_nodes | |||||
def skip_test_if_missing_module(self): | def skip_test_if_missing_module(self): | ||||
self.skip_if_no_py3_zmq() | self.skip_if_no_py3_zmq() | ||||
self.skip_if_no_bitcoind_zmq() | self.skip_if_no_bitcoind_zmq() | ||||
def run_test(self): | def run_test(self): | ||||
self.ctx = zmq.Context() | self.ctx = zmq.Context() | ||||
try: | try: | ||||
Show All 21 Lines | def test_basic(self): | ||||
subs.append(ZMQSubscriber(sockets[-1], service)) | subs.append(ZMQSubscriber(sockets[-1], service)) | ||||
# Subscribe to all available topics. | # Subscribe to all available topics. | ||||
hashblock = subs[0] | hashblock = subs[0] | ||||
hashtx = subs[1] | hashtx = subs[1] | ||||
rawblock = subs[2] | rawblock = subs[2] | ||||
rawtx = subs[3] | rawtx = subs[3] | ||||
self.restart_node(0, ["-zmqpub{}={}".format(sub.topic.decode(), address) | self.restart_node( | ||||
for sub in [hashblock, hashtx, rawblock, rawtx]]) | 0, | ||||
[f"-zmqpub{sub.topic.decode()}={address}" for sub in [ | |||||
hashblock, hashtx, rawblock, rawtx]] + | |||||
[self.whitelist_arg]) | |||||
Fabien: just use self.extra_args here, so you have a single spot to update | |||||
connect_nodes(self.nodes[0], self.nodes[1]) | connect_nodes(self.nodes[0], self.nodes[1]) | ||||
for socket in sockets: | for socket in sockets: | ||||
socket.connect(address) | socket.connect(address) | ||||
# Relax so that the subscriber is ready before publishing zmq messages | # Relax so that the subscriber is ready before publishing zmq messages | ||||
sleep(0.2) | sleep(0.2) | ||||
num_blocks = 5 | num_blocks = 5 | ||||
self.log.info( | self.log.info( | ||||
▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | def test_reorg(self): | ||||
subs.append(ZMQSubscriber(sockets[-1], service)) | subs.append(ZMQSubscriber(sockets[-1], service)) | ||||
# Subscribe to all available topics. | # Subscribe to all available topics. | ||||
hashblock = subs[0] | hashblock = subs[0] | ||||
hashtx = subs[1] | hashtx = subs[1] | ||||
# Should only notify the tip if a reorg occurs | # Should only notify the tip if a reorg occurs | ||||
self.restart_node( | self.restart_node( | ||||
0, ['-zmqpub{}={}'.format(sub.topic.decode(), address) for sub in [hashblock, hashtx]]) | 0, [f'-zmqpub{sub.topic.decode()}={address}' | ||||
for sub in [hashblock, hashtx]] + [self.whitelist_arg]) | |||||
FabienUnsubmitted Not Done Inline Actionsdito Fabien: dito | |||||
for socket in sockets: | for socket in sockets: | ||||
socket.connect(address) | socket.connect(address) | ||||
# Relax so that the subscriber is ready before publishing zmq messages | # Relax so that the subscriber is ready before publishing zmq messages | ||||
sleep(0.2) | sleep(0.2) | ||||
# Generate 1 block in nodes[0] with 1 mempool tx and receive all | # Generate 1 block in nodes[0] with 1 mempool tx and receive all | ||||
# notifications | # notifications | ||||
payment_txid = self.nodes[0].sendtoaddress( | payment_txid = self.nodes[0].sendtoaddress( | ||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | def test_sequence(self): | ||||
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool | <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool | ||||
""" | """ | ||||
self.log.info("Testing 'sequence' publisher") | self.log.info("Testing 'sequence' publisher") | ||||
address = 'tcp://127.0.0.1:28333' | address = 'tcp://127.0.0.1:28333' | ||||
socket = self.ctx.socket(zmq.SUB) | socket = self.ctx.socket(zmq.SUB) | ||||
socket.set(zmq.RCVTIMEO, 60000) | socket.set(zmq.RCVTIMEO, 60000) | ||||
seq = ZMQSubscriber(socket, b'sequence') | seq = ZMQSubscriber(socket, b'sequence') | ||||
self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}']) | self.restart_node( | ||||
0, [f'-zmqpub{seq.topic.decode()}={address}', self.whitelist_arg]) | |||||
FabienUnsubmitted Not Done Inline Actionsdito Fabien: dito | |||||
socket.connect(address) | socket.connect(address) | ||||
# Relax so that the subscriber is ready before publishing zmq messages | # Relax so that the subscriber is ready before publishing zmq messages | ||||
sleep(0.2) | sleep(0.2) | ||||
# Mempool sequence number starts at 1 | # Mempool sequence number starts at 1 | ||||
seq_num = 1 | seq_num = 1 | ||||
# Generate 1 block in nodes[0] and receive all notifications | # Generate 1 block in nodes[0] and receive all notifications | ||||
Show All 19 Lines | def test_sequence(self): | ||||
assert_equal((self.nodes[1].getblockhash(block_count - 1), "C", None), | assert_equal((self.nodes[1].getblockhash(block_count - 1), "C", None), | ||||
seq.receive_sequence()) | seq.receive_sequence()) | ||||
assert_equal((self.nodes[1].getblockhash(block_count), "C", None), | assert_equal((self.nodes[1].getblockhash(block_count), "C", None), | ||||
seq.receive_sequence()) | seq.receive_sequence()) | ||||
# Rest of test requires wallet functionality | # Rest of test requires wallet functionality | ||||
if self.is_wallet_compiled(): | if self.is_wallet_compiled(): | ||||
self.log.info("Wait for tx from second node") | self.log.info("Wait for tx from second node") | ||||
payment_txid = self.nodes[1].sendtoaddress( | |||||
address=self.nodes[0].getnewaddress(), amount=5_000_000) | # Prepare data needed to generate conflicting transactions | ||||
self.sync_all() | utxo = self.nodes[1].listunspent()[0] | ||||
address1 = self.nodes[1].getnewaddress() | |||||
address2 = self.nodes[1].getnewaddress() | |||||
change_address = self.nodes[1].getrawchangeaddress() | |||||
# Produce a transaction that node1 will not receive. | |||||
disconnect_nodes(self.nodes[0], self.nodes[1]) | |||||
tx1 = self.nodes[1].signrawtransactionwithwallet( | |||||
self.nodes[1].createrawtransaction( | |||||
inputs=[{"txid": utxo["txid"], "vout": utxo["vout"]}], | |||||
outputs=[{address1: 5_000_000}, | |||||
{change_address: utxo["amount"] - 5_001_000}] | |||||
) | |||||
) | |||||
payment_txid = self.nodes[0].sendrawtransaction(tx1["hex"]) | |||||
self.log.info( | self.log.info( | ||||
"Testing sequence notifications with mempool sequence values") | "Testing sequence notifications with mempool sequence values") | ||||
# Should receive the broadcasted txid. | # Should receive the broadcasted txid. | ||||
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) | assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) | ||||
seq_num += 1 | seq_num += 1 | ||||
# Removed RBF tests | self.log.info("Testing a tx removal notification") | ||||
tx2 = self.nodes[1].signrawtransactionwithwallet( | |||||
self.nodes[1].createrawtransaction( | |||||
inputs=[{"txid": utxo["txid"], "vout": utxo["vout"]}], | |||||
outputs=[{address2: 5_000_000}, | |||||
{change_address: utxo["amount"] - 5_001_000}] | |||||
) | |||||
) | |||||
self.nodes[1].sendrawtransaction(tx2["hex"]) | |||||
FabienUnsubmitted Not Done Inline ActionsYou may want to wrap this into a function to avoid having to search for where the difference is from the previous tx. Also you avoid tx1 and tx2 vars which are not needed Fabien: You may want to wrap this into a function to avoid having to search for where the difference is… | |||||
# Mine it before reconnection, so that will get ejected from | |||||
FabienUnsubmitted Not Done Inline ActionsThis comment is wrong, you got it the opposite way. Fabien: This comment is wrong, you got it the opposite way.
Also it's a good idea to make sure tx2 is… | |||||
# from node0's mempool | |||||
self.nodes[1].generatetoaddress(1, ADDRESS_ECREG_P2SH_OP_TRUE) | |||||
connect_nodes(self.nodes[0], self.nodes[1]) | |||||
self.sync_all() | |||||
best_block = self.nodes[1].getblockhash( | |||||
FabienUnsubmitted Not Done Inline Actionsyou could have used getbestblockhash but it's not even needed, as you can look at the output from generatetoaddress above Fabien: you could have used `getbestblockhash` but it's not even needed, as you can look at the output… | |||||
self.nodes[1].getblockcount()) | |||||
assert_equal((payment_txid, "R", seq_num), seq.receive_sequence()) | |||||
seq_num += 1 | |||||
assert_equal((best_block, "C", None), seq.receive_sequence()) | |||||
# We don't see a notification for the replacement tx, because it | |||||
# was never in node0's mempool. | |||||
# Add another transaction, because mining that last block in order | |||||
# to cause a tx rejection put us out of sync with Core's test | |||||
payment_txid = self.nodes[1].sendtoaddress( | |||||
FabienUnsubmitted Not Done Inline Actionsdon't reuse the same variable, this is confusing. You should change the name of the previous one to something like txid_to_be_replaced Fabien: don't reuse the same variable, this is confusing. You should change the name of the previous… | |||||
address=self.nodes[0].getnewaddress(), amount=5_000_000) | |||||
self.sync_all() | |||||
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) | |||||
seq_num += 1 | |||||
# Doesn't get published when mined, make a block and tx to "flush" | # Doesn't get published when mined, make a block and tx to "flush" | ||||
# the possibility though the mempool sequence number does go up by | # the possibility though the mempool sequence number does go up by | ||||
# the number of transactions removed from the mempool by the block | # the number of transactions removed from the mempool by the block | ||||
# mining it. | # mining it. | ||||
mempool_size = len(self.nodes[0].getrawmempool()) | mempool_size = len(self.nodes[0].getrawmempool()) | ||||
c_block = self.nodes[0].generatetoaddress( | c_block = self.nodes[0].generatetoaddress( | ||||
1, ADDRESS_ECREG_UNSPENDABLE)[0] | 1, ADDRESS_ECREG_UNSPENDABLE)[0] | ||||
self.sync_all() | self.sync_all() | ||||
# Make sure the number of mined transactions matches the number of | # Make sure the number of mined transactions matches the number of | ||||
# txs out of mempool | # txs out of mempool | ||||
mempool_size_delta = mempool_size - \ | mempool_size_delta = ( | ||||
FabienUnsubmitted Not Done Inline ActionsDon't mix these random refactors with the actual changes Fabien: Don't mix these random refactors with the actual changes | |||||
len(self.nodes[0].getrawmempool()) | mempool_size - len(self.nodes[0].getrawmempool())) | ||||
assert_equal(len(self.nodes[0].getblock(c_block)["tx"]) - 1, | assert_equal(len(self.nodes[0].getblock(c_block)["tx"]) - 1, | ||||
mempool_size_delta) | mempool_size_delta) | ||||
seq_num += mempool_size_delta | seq_num += mempool_size_delta | ||||
payment_txid_2 = self.nodes[1].sendtoaddress( | payment_txid_2 = self.nodes[1].sendtoaddress( | ||||
self.nodes[0].getnewaddress(), 1_000_000) | self.nodes[0].getnewaddress(), 1_000_000) | ||||
self.sync_all() | self.sync_all() | ||||
assert_equal((c_block, "C", None), seq.receive_sequence()) | assert_equal((c_block, "C", None), seq.receive_sequence()) | ||||
assert_equal((payment_txid_2, "A", seq_num), | assert_equal((payment_txid_2, "A", seq_num), | ||||
▲ Show 20 Lines • Show All 83 Lines • ▼ Show 20 Lines | def test_sequence(self): | ||||
# Added original tx | # Added original tx | ||||
assert_equal(label, "A") | assert_equal(label, "A") | ||||
# More transactions to be simply mined | # More transactions to be simply mined | ||||
for i in range(len(more_tx)): | for i in range(len(more_tx)): | ||||
assert_equal((more_tx[i], "A", mempool_seq), | assert_equal((more_tx[i], "A", mempool_seq), | ||||
seq.receive_sequence()) | seq.receive_sequence()) | ||||
mempool_seq += 1 | mempool_seq += 1 | ||||
# Removed RBF tests | # Removed RBF tests | ||||
FabienUnsubmitted Not Done Inline Actionswhat is supposed to be tested here ? Fabien: what is supposed to be tested here ? | |||||
PiRKAuthorUnsubmitted Done Inline ActionsIt is the same replacement sequence as tested in this diff, but more elaborate thanks to RBF: a replaceable tx is added (A), then bumped (R, A), then the original pre-bump tx is mined into a block (conflict R, block connected C). PiRK: It is the same replacement sequence as tested in this diff, but more elaborate thanks to RBF: a… | |||||
mempool_seq += 1 | mempool_seq += 1 | ||||
assert_equal((tip, "C", None), seq.receive_sequence()) | assert_equal((tip, "C", None), seq.receive_sequence()) | ||||
mempool_seq += len(more_tx) | mempool_seq += len(more_tx) | ||||
# Last tx | # Last tx | ||||
assert_equal((orig_txid_2, "A", mempool_seq), | assert_equal((orig_txid_2, "A", mempool_seq), | ||||
seq.receive_sequence()) | seq.receive_sequence()) | ||||
mempool_seq += 1 | mempool_seq += 1 | ||||
Show All 11 Lines | def test_mempool_sync(self): | ||||
return | return | ||||
self.log.info("Testing 'mempool sync' usage of sequence notifier") | self.log.info("Testing 'mempool sync' usage of sequence notifier") | ||||
address = 'tcp://127.0.0.1:28333' | address = 'tcp://127.0.0.1:28333' | ||||
socket = self.ctx.socket(zmq.SUB) | socket = self.ctx.socket(zmq.SUB) | ||||
socket.set(zmq.RCVTIMEO, 60000) | socket.set(zmq.RCVTIMEO, 60000) | ||||
seq = ZMQSubscriber(socket, b'sequence') | seq = ZMQSubscriber(socket, b'sequence') | ||||
self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}']) | self.restart_node( | ||||
0, [f'-zmqpub{seq.topic.decode()}={address}', self.whitelist_arg]) | |||||
FabienUnsubmitted Not Done Inline Actionsdito Fabien: dito | |||||
connect_nodes(self.nodes[0], self.nodes[1]) | connect_nodes(self.nodes[0], self.nodes[1]) | ||||
socket.connect(address) | socket.connect(address) | ||||
# Relax so that the subscriber is ready before publishing zmq messages | # Relax so that the subscriber is ready before publishing zmq messages | ||||
sleep(0.2) | sleep(0.2) | ||||
# In-memory counter, should always start at 1 | # In-memory counter, should always start at 1 | ||||
next_mempool_seq = self.nodes[0].getrawmempool( | next_mempool_seq = self.nodes[0].getrawmempool( | ||||
mempool_sequence=True)["mempool_sequence"] | mempool_sequence=True)["mempool_sequence"] | ||||
Show All 27 Lines | def test_mempool_sync(self): | ||||
sleep(2) | sleep(2) | ||||
mempool_snapshot = self.nodes[0].getrawmempool( | mempool_snapshot = self.nodes[0].getrawmempool( | ||||
mempool_sequence=True) | mempool_sequence=True) | ||||
mempool_view = set(mempool_snapshot["txids"]) | mempool_view = set(mempool_snapshot["txids"]) | ||||
get_raw_seq = mempool_snapshot["mempool_sequence"] | get_raw_seq = mempool_snapshot["mempool_sequence"] | ||||
# Things continue to happen in the "interim" while waiting for | # Things continue to happen in the "interim" while waiting for | ||||
# snapshot results | # snapshot results | ||||
# We have node 0 do all these to avoid p2p races with RBF announcements | |||||
FabienUnsubmitted Not Done Inline ActionsThat belongs to D10306 Fabien: That belongs to D10306 | |||||
for _ in range(num_txs): | for _ in range(num_txs): | ||||
txids.append(self.nodes[0].sendtoaddress( | txids.append(self.nodes[0].sendtoaddress( | ||||
address=self.nodes[0].getnewaddress(), amount=1_000_000)) | address=self.nodes[0].getnewaddress(), amount=1_000_000)) | ||||
self.sync_all() | self.sync_all() | ||||
self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) | self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) | ||||
final_txid = self.nodes[0].sendtoaddress( | final_txid = self.nodes[0].sendtoaddress( | ||||
address=self.nodes[0].getnewaddress(), amount=100_000) | address=self.nodes[0].getnewaddress(), amount=100_000) | ||||
▲ Show 20 Lines • Show All 48 Lines • Show Last 20 Lines |
just use self.extra_args here, so you have a single spot to update