diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py
index 7f47e792a..111571fa6 100644
--- a/test/functional/interface_zmq.py
+++ b/test/functional/interface_zmq.py
@@ -1,697 +1,707 @@
# Copyright (c) 2015-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the ZMQ notification interface."""
import struct
from io import BytesIO
from time import sleep
from test_framework.address import ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE
from test_framework.blocktools import (
create_block,
create_coinbase,
make_conform_to_ctor,
)
from test_framework.messages import CTransaction, FromHex, hash256
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_raises_rpc_error
# The test may be skipped, in which case zmq may not be installed
try:
import zmq
except ImportError:
pass
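# ZMQ delivers block and transaction hashes as raw double-SHA256 bytes in
# internal (little-endian) byte order; reversing them gives the big-endian hex
# form returned by the RPC interface, which is what the assertions below
# compare against.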
def hash256_reversed(byte_str):
return hash256(byte_str)[::-1]
class ZMQSubscriber:
def __init__(self, socket, topic):
# no sequence number received yet
self.sequence = None
self.socket = socket
self.topic = topic
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
# Receive message from publisher and verify that topic and sequence match
def _receive_from_publisher_and_check(self):
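# Each notification is a three-part message: the topic, the payload body,
# and a 4-byte little-endian publisher sequence number.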
topic, body, seq = self.socket.recv_multipart()
# Topic should match the subscriber topic.
assert_equal(topic, self.topic)
# Sequence should be incremental.
received_seq = struct.unpack("<I", seq)[-1]
if self.sequence is None:
self.sequence = received_seq
else:
assert_equal(received_seq, self.sequence)
self.sequence += 1
return body
def receive(self):
return self._receive_from_publisher_and_check()
def receive_sequence(self):
body = self._receive_from_publisher_and_check()
hash_str = body[:32].hex()
label = chr(body[32])
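# 'A'/'R' (mempool add/remove) notifications carry an 8-byte little-endian
# mempool sequence number after the label; 'C'/'D' (block connect/disconnect)
# notifications do not.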
mempool_sequence = (
None if len(body) != 32 + 1 + 8 else struct.unpack("<Q", body[32 + 1 :])[0]
)
if mempool_sequence is not None:
assert label == "A" or label == "R"
else:
assert label == "D" or label == "C"
return (hash_str, label, mempool_sequence)
class ZMQTestSetupBlock:
"""Helper class for setting up a ZMQ test via the "sync up" procedure.
Generates a block on the specified node on instantiation and provides a
method to check whether a ZMQ notification matches, i.e. the event was
caused by this generated block. Assumes that a notification contains either
the generated block's hash, its (coinbase) transaction id, the raw block, or
the raw transaction data.
"""
def __init__(self, test_framework, node):
self.block_hash = test_framework.generate(
node, 1, sync_fun=test_framework.no_op
)[0]
coinbase = node.getblock(self.block_hash, 2)["tx"][0]
self.tx_hash = coinbase["txid"]
self.raw_tx = coinbase["hex"]
self.raw_block = node.getblock(self.block_hash, 0)
def caused_notification(self, notification):
return (
self.block_hash in notification
or self.tx_hash in notification
or self.raw_block in notification
or self.raw_tx in notification
)
class ZMQTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.extra_args = [["-whitelist=noban@127.0.0.1"]] * self.num_nodes
def skip_test_if_missing_module(self):
self.skip_if_no_py3_zmq()
self.skip_if_no_bitcoind_zmq()
def run_test(self):
self.ctx = zmq.Context()
try:
self.test_basic()
self.test_sequence()
self.test_mempool_sync()
self.test_reorg()
self.test_multiple_interfaces()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
self.ctx.destroy(linger=None)
# Restart node with the specified zmq notifications enabled, subscribe to
# all of them and return the corresponding ZMQSubscriber objects.
def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True):
subscribers = []
for topic, address in services:
socket = self.ctx.socket(zmq.SUB)
subscribers.append(ZMQSubscriber(socket, topic.encode()))
self.restart_node(
0,
self.extra_args[0]
+ [f"-zmqpub{topic}={address}" for topic, address in services],
)
for i, sub in enumerate(subscribers):
sub.socket.connect(services[i][1])
# Ensure that all zmq publisher notification interfaces are ready by
# running the following "sync up" procedure:
# 1. Generate a block on the node
# 2. Try to receive the corresponding notification on all subscribers
# 3. If all subscribers get the message within the timeout (1 second),
# we are done, otherwise repeat starting from step 1
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, 1000)
while True:
test_block = ZMQTestSetupBlock(self, self.nodes[0])
recv_failed = False
for sub in subscribers:
try:
while not test_block.caused_notification(sub.receive().hex()):
self.log.debug(
"Ignoring sync-up notification for previously generated"
" block."
)
except zmq.error.Again:
self.log.debug("Didn't receive sync-up notification, trying again.")
recv_failed = True
if not recv_failed:
self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
break
# set subscriber's desired timeout for the test
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, recv_timeout * 1000)
self.connect_nodes(0, 1)
if sync_blocks:
self.sync_blocks()
return subscribers
def test_basic(self):
# Invalid zmq arguments don't take down the node, see #17185.
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
address = "tcp://127.0.0.1:28332"
subs = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]]
)
hashblock = subs[0]
hashtx = subs[1]
rawblock = subs[2]
rawtx = subs[3]
num_blocks = 5
self.log.info("Generate {0} blocks (and {0} coinbase txes)".format(num_blocks))
genhashes = self.generatetoaddress(
self.nodes[0], num_blocks, ADDRESS_ECREG_UNSPENDABLE
)
for x in range(num_blocks):
# Should receive the coinbase txid.
txid = hashtx.receive()
# Should receive the coinbase raw transaction.
tx_hex = rawtx.receive()
tx = CTransaction()
tx.deserialize(BytesIO(tx_hex))
tx.calc_sha256()
assert_equal(tx.hash, txid.hex())
# Should receive the generated raw block.
block = rawblock.receive()
assert_equal(genhashes[x], hash256_reversed(block[:80]).hex())
# Should receive the generated block hash.
blockhash = hashblock.receive().hex()
assert_equal(genhashes[x], blockhash)
# The block should only have the coinbase txid.
assert_equal([txid.hex()], self.nodes[1].getblock(blockhash)["tx"])
if self.is_wallet_compiled():
self.log.info("Wait for tx from second node")
payment_txid = self.nodes[1].sendtoaddress(
self.nodes[0].getnewaddress(), 1000000
)
self.sync_all()
# Should receive the broadcasted txid.
txid = hashtx.receive()
assert_equal(payment_txid, txid.hex())
# Should receive the broadcasted raw transaction.
tx_hex = rawtx.receive()
assert_equal(payment_txid, hash256_reversed(tx_hex).hex())
# Mining the block with this tx should result in a second notification
# after the coinbase tx notification
self.generatetoaddress(self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE)
hashtx.receive()
txid = hashtx.receive()
assert_equal(payment_txid, txid.hex())
self.log.info("Test the getzmqnotifications RPC")
assert_equal(
self.nodes[0].getzmqnotifications(),
[
{"type": "pubhashblock", "address": address, "hwm": 1000},
{"type": "pubhashtx", "address": address, "hwm": 1000},
{"type": "pubrawblock", "address": address, "hwm": 1000},
{"type": "pubrawtx", "address": address, "hwm": 1000},
],
)
assert_equal(self.nodes[1].getzmqnotifications(), [])
def test_reorg(self):
if not self.is_wallet_compiled():
self.log.info("Skipping reorg test because wallet is disabled")
return
address = "tcp://127.0.0.1:28333"
# Should only notify the tip if a reorg occurs
hashblock, hashtx = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx"]],
# 2 second timeout to check end of notifications
recv_timeout=2,
)
self.disconnect_nodes(0, 1)
# Generate 1 block in nodes[0] with 1 mempool tx and receive all
# notifications
payment_txid = self.nodes[0].sendtoaddress(
self.nodes[0].getnewaddress(), 1000000
)
+ # -> TransactionAddedToMempool
+ assert_equal(hashtx.receive().hex(), payment_txid)
+
disconnect_block = self.generatetoaddress(
self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE, sync_fun=self.no_op
)[0]
disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0]
assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
- assert_equal(hashtx.receive().hex(), payment_txid)
+ # -> BlockConnected
assert_equal(hashtx.receive().hex(), disconnect_cb)
+ assert_equal(hashtx.receive().hex(), payment_txid)
# Generate 2 blocks in nodes[1] to a different address to ensure split
connect_blocks = self.generatetoaddress(
self.nodes[1], 2, ADDRESS_ECREG_P2SH_OP_TRUE, sync_fun=self.no_op
)
# nodes[0] will reorg chain after connecting back nodes[1]
self.connect_nodes(0, 1)
# tx in mempool valid but not advertised
self.sync_blocks()
# Should receive nodes[1] tip
+ # -> UpdatedBlockTip
assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())
# During reorg:
# Get old payment transaction notification from disconnect and
# disconnected cb
- assert_equal(hashtx.receive().hex(), payment_txid)
+ # -> BlockDisconnected
assert_equal(hashtx.receive().hex(), disconnect_cb)
- # And the payment transaction again due to mempool entry
assert_equal(hashtx.receive().hex(), payment_txid)
+ # And the payment transaction again due to mempool entry
+ # -> TransactionAddedToMempool
assert_equal(hashtx.receive().hex(), payment_txid)
# And the new connected coinbases
+ # -> BlockConnected
for i in [0, 1]:
assert_equal(
hashtx.receive().hex(),
self.nodes[1].getblock(connect_blocks[i])["tx"][0],
)
# If we do a simple invalidate we announce the disconnected coinbase
self.nodes[0].invalidateblock(connect_blocks[1])
+ # -> BlockDisconnected
assert_equal(
hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0]
)
# And the payment transaction again due to mempool entry (it was removed
# from the mempool due to the reorg)
+ # -> TransactionRemovedFromMempool
assert_equal(hashtx.receive().hex(), payment_txid)
# And the current tip
+ # -> UpdatedBlockTip
assert_equal(
hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0]
)
def create_conflicting_tx(self):
"""Create a transaction that is initially added to node0's mempool
and is then rejected by a transaction created and included into a
block by node1."""
utxo = self.nodes[1].listunspent()[0]
def send_conflicting_transaction(send_node):
"""Send a transaction using an identical utxo as input and
a different address as output each time the function is
called. Return the TxId."""
address = self.nodes[1].getnewaddress()
change_address = self.nodes[1].getrawchangeaddress()
tx = self.nodes[1].signrawtransactionwithwallet(
self.nodes[1].createrawtransaction(
inputs=[{"txid": utxo["txid"], "vout": utxo["vout"]}],
outputs=[
{address: 5_000_000},
{change_address: utxo["amount"] - 5_001_000},
],
)
)
return send_node.sendrawtransaction(tx["hex"])
self.disconnect_nodes(0, 1)
txid_to_be_replaced = send_conflicting_transaction(self.nodes[0])
replacement_txid = send_conflicting_transaction(self.nodes[1])
block_hash = self.generatetoaddress(
self.nodes[1], 1, ADDRESS_ECREG_P2SH_OP_TRUE, sync_fun=self.no_op
)[0]
self.connect_nodes(0, 1)
self.sync_all()
return block_hash, txid_to_be_replaced, replacement_txid
def test_sequence(self):
"""
Sequence zmq notifications give every blockhash and txhash in order
of processing, regardless of IBD, re-orgs, etc.
Format of messages:
<32-byte hash>C : Block hash connected
<32-byte hash>D : Block hash disconnected
<32-byte hash>R<8-byte LE uint> : Transaction hash removed from mempool
for a non-block-inclusion reason
<32-byte hash>A<8-byte LE uint> : Transaction hash added to mempool
"""
self.log.info("Testing 'sequence' publisher")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
self.disconnect_nodes(0, 1)
# Mempool sequence number starts at 1
seq_num = 1
# Generate 1 block in nodes[0] and receive all notifications
dc_block = self.generatetoaddress(
self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE, sync_fun=self.no_op
)[0]
# Note: We are not notified of any of the block's transactions, whether
# coinbase or mined from the mempool
assert_equal(
(self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence()
)
# Generate 2 blocks in nodes[1] to a different address to ensure
# a chain split
self.generatetoaddress(
self.nodes[1], 2, ADDRESS_ECREG_P2SH_OP_TRUE, sync_fun=self.no_op
)
# nodes[0] will reorg chain after connecting back nodes[1]
self.connect_nodes(0, 1)
# Then we receive all block (dis)connect notifications for the
# 2 block reorg
assert_equal((dc_block, "D", None), seq.receive_sequence())
block_count = self.nodes[1].getblockcount()
assert_equal(
(self.nodes[1].getblockhash(block_count - 1), "C", None),
seq.receive_sequence(),
)
assert_equal(
(self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence()
)
# Rest of test requires wallet functionality
if self.is_wallet_compiled():
(
block_hash,
txid_to_be_replaced,
replacement_txid,
) = self.create_conflicting_tx()
self.log.info("Testing sequence notifications with mempool sequence values")
# Should receive the initially broadcasted txid.
assert_equal((txid_to_be_replaced, "A", seq_num), seq.receive_sequence())
seq_num += 1
self.log.info("Testing a tx removal notification")
# Next we receive a notification for the transaction removal
assert_equal((txid_to_be_replaced, "R", seq_num), seq.receive_sequence())
seq_num += 1
# Then we see the block notification
assert_equal((block_hash, "C", None), seq.receive_sequence())
# There is no sequence notification for the transaction that was
# never in node0's mempool, but it can be found in the block.
assert replacement_txid in self.nodes[0].getblock(block_hash)["tx"]
self.log.info("Wait for tx from second node")
payment_txid = self.nodes[1].sendtoaddress(
address=self.nodes[0].getnewaddress(), amount=5_000_000
)
self.sync_all()
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
seq_num += 1
# A transaction is not published again when it is mined; make a block
# and a tx to "flush out" that possibility. The mempool sequence number
# does, however, go up by the number of transactions that the mined
# block removed from the mempool.
mempool_size = len(self.nodes[0].getrawmempool())
c_block = self.generatetoaddress(
self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE
)[0]
# Make sure the number of mined transactions matches the number of
# txs out of mempool
mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
assert_equal(
len(self.nodes[0].getblock(c_block)["tx"]) - 1, mempool_size_delta
)
seq_num += mempool_size_delta
payment_txid_2 = self.nodes[1].sendtoaddress(
self.nodes[0].getnewaddress(), 1_000_000
)
self.sync_all()
assert_equal((c_block, "C", None), seq.receive_sequence())
assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Spot check getrawmempool results that they only show up when
# asked for
assert isinstance(self.nodes[0].getrawmempool(), list)
assert isinstance(self.nodes[0].getrawmempool(mempool_sequence=False), list)
assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
assert_raises_rpc_error(
-8,
"Verbose results cannot contain mempool sequence values.",
self.nodes[0].getrawmempool,
True,
True,
)
assert_equal(
self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"],
seq_num,
)
self.log.info("Testing reorg notifications")
# Manually invalidate the last block to test mempool re-entry
# N.B. This part could be made more lenient in exact ordering
# since it greatly depends on inner-workings of blocks/mempool
# during "deep" re-orgs. Probably should "re-construct"
# blockchain/mempool state from notifications instead.
block_count = self.nodes[0].getblockcount()
best_hash = self.nodes[0].getbestblockhash()
self.nodes[0].invalidateblock(best_hash)
# Leave a bit of room to make sure the mempool transactions have been processed
sleep(2)
# Make sure getrawmempool mempool_sequence results aren't "queued"
# but immediately reflective of the time they were gathered.
assert (
self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
> seq_num
)
assert_equal((payment_txid_2, "R", seq_num), seq.receive_sequence())
seq_num += 1
assert_equal((best_hash, "D", None), seq.receive_sequence())
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Other things may happen but aren't wallet-deterministic so we
# don't test for them currently
self.nodes[0].reconsiderblock(best_hash)
self.generatetoaddress(self.nodes[1], 1, ADDRESS_ECREG_UNSPENDABLE)
self.log.info("Evict mempool transaction by block conflict")
orig_txid = self.nodes[0].sendtoaddress(
address=self.nodes[0].getnewaddress(), amount=1_000_000
)
# More to be simply mined
more_tx = []
for _ in range(5):
more_tx.append(
self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 100_000)
)
raw_tx = self.nodes[0].getrawtransaction(orig_txid)
block = create_block(
int(self.nodes[0].getbestblockhash(), 16),
create_coinbase(self.nodes[0].getblockcount() + 1),
)
tx = FromHex(CTransaction(), raw_tx)
block.vtx.append(tx)
for txid in more_tx:
tx = FromHex(CTransaction(), self.nodes[0].getrawtransaction(txid))
block.vtx.append(tx)
make_conform_to_ctor(block)
block.hashMerkleRoot = block.calc_merkle_root()
block.solve()
assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
tip = self.nodes[0].getbestblockhash()
assert_equal(int(tip, 16), block.sha256)
orig_txid_2 = self.nodes[0].sendtoaddress(
address=self.nodes[0].getnewaddress(), amount=1_000_000
)
# Flush old notifications until we reach the evicted tx's original mempool entry
(hash_str, label, mempool_seq) = seq.receive_sequence()
while hash_str != orig_txid:
(hash_str, label, mempool_seq) = seq.receive_sequence()
mempool_seq += 1
# Added original tx
assert_equal(label, "A")
# More transactions to be simply mined
for i in range(len(more_tx)):
assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
# Removed RBF tests
mempool_seq += 1
assert_equal((tip, "C", None), seq.receive_sequence())
mempool_seq += len(more_tx)
# Last tx
assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
self.generatetoaddress(self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE)
# want to make sure we didn't break "consensus" for other tests
self.sync_all()
def test_mempool_sync(self):
"""
Use sequence notification plus getrawmempool sequence results to
"sync mempool"
"""
if not self.is_wallet_compiled():
self.log.info("Skipping mempool sync test")
return
self.log.info("Testing 'mempool sync' usage of sequence notifier")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
# In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)[
"mempool_sequence"
]
assert_equal(next_mempool_seq, 1)
# Some transactions have been happening but we aren't consuming
# zmq notifications yet or we lost a ZMQ message somehow and want
# to start over
txids = []
num_txs = 5
for _ in range(num_txs):
txids.append(
self.nodes[1].sendtoaddress(
address=self.nodes[0].getnewaddress(), amount=1_000_000
)
)
self.sync_all()
# 1) Consume backlog until we get a mempool sequence number
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
while zmq_mem_seq is None:
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
assert label == "A"
assert hash_str is not None
# 2) We need to "seed" our view of the mempool
mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
mempool_view = set(mempool_snapshot["txids"])
get_raw_seq = mempool_snapshot["mempool_sequence"]
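# The mempool sequence counter started at 1 and each of the num_txs sends
# above bumped it by one, so the snapshot should now read num_txs + 1 = 6.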
assert_equal(get_raw_seq, 6)
# The snapshot may be too old compared to the latest zmq message we read off
while zmq_mem_seq >= get_raw_seq:
sleep(2)
mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
mempool_view = set(mempool_snapshot["txids"])
get_raw_seq = mempool_snapshot["mempool_sequence"]
# Things continue to happen in the "interim" while waiting for
# snapshot results
for _ in range(num_txs):
txids.append(
self.nodes[0].sendtoaddress(
address=self.nodes[0].getnewaddress(), amount=1_000_000
)
)
self.sync_all()
self.create_conflicting_tx()
self.generatetoaddress(self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE)
final_txid = self.nodes[0].sendtoaddress(
address=self.nodes[0].getnewaddress(), amount=100_000
)
# 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
while True:
if zmq_mem_seq == get_raw_seq - 1:
break
(hash_str, label, mempool_sequence) = seq.receive_sequence()
if mempool_sequence is not None:
zmq_mem_seq = mempool_sequence
if zmq_mem_seq > get_raw_seq:
raise Exception(
"We somehow jumped mempool sequence numbers! "
f"zmq_mem_seq: {zmq_mem_seq} > "
f"get_raw_seq: {get_raw_seq}"
)
# 4) Moving forward, we apply the delta to our local view
# remaining txs + conflict (A, R, C) + 1 block connect + 1 final tx
expected_sequence = get_raw_seq
for _ in range(num_txs + 3 + 1 + 1):
(hash_str, label, mempool_sequence) = seq.receive_sequence()
if label == "A":
assert hash_str not in mempool_view
mempool_view.add(hash_str)
expected_sequence = mempool_sequence + 1
elif label == "R":
assert hash_str in mempool_view
mempool_view.remove(hash_str)
expected_sequence = mempool_sequence + 1
elif label == "C":
# (Attempt to) remove all txids from known block connects
block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
for txid in block_txids:
if txid in mempool_view:
expected_sequence += 1
mempool_view.remove(txid)
elif label == "D":
# Not useful for mempool tracking per se
continue
else:
raise Exception("Unexpected ZMQ sequence label!")
assert_equal(self.nodes[0].getrawmempool(), [final_txid])
assert_equal(
self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"],
expected_sequence,
)
# 5) If you miss a zmq/mempool sequence number, go back to step (2)
self.generatetoaddress(self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE)
def test_multiple_interfaces(self):
# Set up two subscribers with different addresses
# (note that after the reorg test, syncing would fail due to different
# chain lengths on node0 and node1; for this test we only need node0, so
# we can disable syncing blocks on the setup)
subscribers = self.setup_zmq_test(
[
("hashblock", "tcp://127.0.0.1:28334"),
("hashblock", "tcp://127.0.0.1:28335"),
],
sync_blocks=False,
)
# Generate 1 block in nodes[0] and receive all notifications
self.generatetoaddress(
self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE, sync_fun=self.no_op
)
# Should receive the same block hash on both subscribers
assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex())
if __name__ == "__main__":
ZMQTest().main()