diff --git a/doc/release-notes/release-notes.md b/doc/release-notes/release-notes.md index abcd5087d..27eca228b 100644 --- a/doc/release-notes/release-notes.md +++ b/doc/release-notes/release-notes.md @@ -1,21 +1,23 @@ # Bitcoin ABC 0.24.4 Release Notes Bitcoin ABC version 0.24.4 is now available from: This release includes the following features and fixes: - Bitcoin ABC will no longer create an unnamed `""` wallet by default when no wallet is specified on the command line or in the configuration files. For backwards compatibility, if an unnamed `""` wallet already exists and would have been loaded previously, then it will still be loaded. Users without an unnamed `""` wallet and without any other wallets to be loaded on startup will be prompted to either choose a wallet to load, or to create a new wallet. - A new `send` RPC with similar syntax to `walletcreatefundedpsbt`, including support for coin selection and a custom fee rate. Using the new `send` method is encouraged: `sendmany` and `sendtoaddress` may be deprecated in a future release. - The `testmempoolaccept` RPC returns `size` and a `fee` object with the `base` fee if the transaction passes validation. - A "sequence" notifier is added to ZeroMQ notifications, enabling client-side mempool tracking. +- The same ZeroMQ notification (e.g. `-zmqpubhashtx=address`) can now be specified multiple + times to publish the same notification to different ZeroMQ sockets. diff --git a/doc/zmq.md b/doc/zmq.md index 025c85160..9ee13619a 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -1,152 +1,154 @@ # Block and Transaction Broadcasting with ZeroMQ [ZeroMQ](https://zeromq.org/) is a lightweight wrapper around TCP connections, inter-process communication, and shared-memory, providing various message-oriented semantics such as publish/subscribe, request/reply, and push/pull. The Bitcoin ABC daemon can be configured to act as a trusted "border router", implementing the bitcoin wire protocol and relay, making consensus decisions, maintaining the local blockchain database, broadcasting locally generated transactions into the network, and providing a queryable RPC interface to interact on a polled basis for requesting blockchain related data. However, there exists only a limited service to notify external software of events like the arrival of new blocks or transactions. The ZeroMQ facility implements a notification interface through a set of specific notifiers. Currently there are notifiers that publish blocks and transactions. This read-only facility requires only the connection of a corresponding ZeroMQ subscriber port in receiving software; it is not authenticated nor is there any two-way protocol involvement. Therefore, subscribers should validate the received data since it may be out of date, incomplete or even invalid. ZeroMQ sockets are self-connecting and self-healing; that is, connections made between two endpoints will be automatically restored after an outage, and either end may be freely started or stopped in any order. Because ZeroMQ is message oriented, subscribers receive transactions and blocks all-at-once and do not need to implement any sort of buffering or reassembly. ## Prerequisites The ZeroMQ feature in Bitcoin ABC requires the ZeroMQ API >= 4.1.5 [libzmq](https://github.com/zeromq/libzmq/releases). For version information, see [dependencies.md](dependencies.md). Typically, it is packaged by distributions as something like *libzmq3-dev*. The C++ wrapper for ZeroMQ is *not* needed. In order to run the example Python client scripts in the `contrib/zmq/` directory, one must also install [PyZMQ](https://github.com/zeromq/pyzmq) (generally with `pip install pyzmq`), though this is not necessary for daemon operation. ## Enabling By default, the ZeroMQ feature is automatically compiled. To disable, use -DBUILD_BITCOIN_ZMQ=OFF to `cmake` when building bitcoind: $ cmake -GNinja .. -DBUILD_BITCOIN_ZMQ=OFF [...] To actually enable operation, one must set the appropriate options on the command line or in the configuration file. ## Usage Currently, the following notifications are supported: -zmqpubhashtx=address -zmqpubhashblock=address -zmqpubrawblock=address -zmqpubrawtx=address -zmqpubsequence=address The socket type is PUB and the address must be a valid ZeroMQ socket address. The same address can be used in more than one notification. +The same notification can be specified more than once. The option to set the PUB socket's outbound message high water mark (SNDHWM) may be set individually for each notification: -zmqpubhashtxhwm=n -zmqpubhashblockhwm=n -zmqpubrawblockhwm=n -zmqpubrawtxhwm=n -zmqpubsequencehwm=address The high water mark value must be an integer greater than or equal to 0. For instance: $ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \ + -zmqpubhashtx=tcp://192.168.1.2:28332 \ -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \ -zmqpubhashtxhwm=10000 Each PUB notification has a topic and body, where the header corresponds to the notification type. For instance, for the notification `-zmqpubhashtx` the topic is `hashtx` (no null terminator) and the body is the transaction hash (32 bytes) for all but `sequence` topic. For `sequence`, the body is structured as the following based on the type of message: <32-byte hash>C : Blockhash connected <32-byte hash>D : Blockhash disconnected <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool Where the 8-byte uints correspond to the mempool sequence number. These options can also be provided in bitcoin.conf. ZeroMQ endpoint specifiers for TCP (and others) are documented in the [ZeroMQ API](http://api.zeromq.org/4-0:_start). Client side, then, the ZeroMQ subscriber socket must have the ZMQ_SUBSCRIBE option set to one or either of these prefixes (for instance, just `hash`); without doing so will result in no messages arriving. Please see [`contrib/zmq/zmq_sub.py`](/contrib/zmq/zmq_sub.py) for a working example. The ZMQ_PUB socket's ZMQ_TCP_KEEPALIVE option is enabled. This means that the underlying SO_KEEPALIVE option is enabled when using a TCP transport. The effective TCP keepalive values are managed through the underlying operating system configuration and must be configured prior to connection establishment. For example, when running on GNU/Linux, one might use the following to lower the keepalive setting to 10 minutes: sudo sysctl -w net.ipv4.tcp_keepalive_time=600 Setting the keepalive values appropriately for your operating environment may improve connectivity in situations where long-lived connections are silently dropped by network middle boxes. ## Remarks From the perspective of bitcoind, the ZeroMQ socket is write-only; PUB sockets don't even have a read function. Thus, there is no state introduced into bitcoind directly. Furthermore, no information is broadcast that wasn't already received from the public P2P network. No authentication or authorization is done on connecting clients; it is assumed that the ZeroMQ port is exposed only to trusted entities, using other means such as firewalling. Note that for `*block` topics, when the block chain tip changes, a reorganisation may occur and just the tip will be notified. It is up to the subscriber to retrieve the chain from the last known block to the new tip. Also note that no notification will occur if the tip was in the active chain--as would be the case after calling invalidateblock RPC. In contrast, the `sequence` topic publishes all block connections and disconnections. There are several possibilities that ZMQ notification can get lost during transmission depending on the communication type you are using. Bitcoind appends an up-counting sequence number to each notification which allows listeners to detect lost notifications. The `sequence` topic refers specifically to the mempool sequence number, which is also published along with all mempool events. This is a different sequence value than in ZMQ itself in order to allow a total ordering of mempool events to be constructed. diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index 9296ea3a6..c96399bcc 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -1,208 +1,207 @@ // Copyright (c) 2015-2018 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr) {} CZMQNotificationInterface::~CZMQNotificationInterface() { Shutdown(); } std::list CZMQNotificationInterface::GetActiveNotifiers() const { std::list result; for (const auto &n : notifiers) { result.push_back(n.get()); } return result; } CZMQNotificationInterface *CZMQNotificationInterface::Create() { std::map factories; factories["pubhashblock"] = CZMQAbstractNotifier::Create; factories["pubhashtx"] = CZMQAbstractNotifier::Create; factories["pubrawblock"] = CZMQAbstractNotifier::Create; factories["pubrawtx"] = CZMQAbstractNotifier::Create; factories["pubsequence"] = CZMQAbstractNotifier::Create; std::list> notifiers; for (const auto &entry : factories) { std::string arg("-zmq" + entry.first); - if (gArgs.IsArgSet(arg)) { - const auto &factory = entry.second; - const std::string address = gArgs.GetArg(arg, ""); + const auto &factory = entry.second; + for (const std::string &address : gArgs.GetArgs(arg)) { std::unique_ptr notifier = factory(); notifier->SetType(entry.first); notifier->SetAddress(address); notifier->SetOutboundMessageHighWaterMark( static_cast(gArgs.GetArg( arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); notifiers.push_back(std::move(notifier)); } } if (!notifiers.empty()) { std::unique_ptr notificationInterface( new CZMQNotificationInterface()); notificationInterface->notifiers = std::move(notifiers); if (notificationInterface->Initialize()) { return notificationInterface.release(); } } return nullptr; } // Called at startup to conditionally set up ZMQ socket(s) bool CZMQNotificationInterface::Initialize() { int major = 0, minor = 0, patch = 0; zmq_version(&major, &minor, &patch); LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch); LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n"); assert(!pcontext); pcontext = zmq_ctx_new(); if (!pcontext) { zmqError("Unable to initialize context"); return false; } for (auto ¬ifier : notifiers) { if (notifier->Initialize(pcontext)) { LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); } else { LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); return false; } } return true; } // Called during shutdown sequence void CZMQNotificationInterface::Shutdown() { LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n"); if (pcontext) { for (auto ¬ifier : notifiers) { LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); notifier->Shutdown(); } zmq_ctx_term(pcontext); pcontext = nullptr; } } namespace { template void TryForEachAndRemoveFailed( std::list> ¬ifiers, const Function &func) { for (auto i = notifiers.begin(); i != notifiers.end();) { CZMQAbstractNotifier *notifier = i->get(); if (func(notifier)) { ++i; } else { notifier->Shutdown(); i = notifiers.erase(i); } } } } // anonymous namespace void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { // In IBD or blocks were disconnected without any new ones if (fInitialDownload || pindexNew == pindexFork) { return; } TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier *notifier) { return notifier->NotifyBlock(pindexNew); }); } void CZMQNotificationInterface::TransactionAddedToMempool( const CTransactionRef &ptx, uint64_t mempool_sequence) { const CTransaction &tx = *ptx; TryForEachAndRemoveFailed( notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) { return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence); }); } void CZMQNotificationInterface::TransactionRemovedFromMempool( const CTransactionRef &ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { // Called for all non-block inclusion reasons const CTransaction &tx = *ptx; TryForEachAndRemoveFailed( notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) { return notifier->NotifyTransactionRemoval(tx, mempool_sequence); }); } void CZMQNotificationInterface::BlockConnected( const std::shared_ptr &pblock, const CBlockIndex *pindexConnected) { for (const CTransactionRef &ptx : pblock->vtx) { const CTransaction &tx = *ptx; TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier *notifier) { return notifier->NotifyTransaction(tx); }); } // Next we notify BlockConnect listeners for *all* blocks TryForEachAndRemoveFailed( notifiers, [pindexConnected](CZMQAbstractNotifier *notifier) { return notifier->NotifyBlockConnect(pindexConnected); }); } void CZMQNotificationInterface::BlockDisconnected( const std::shared_ptr &pblock, const CBlockIndex *pindexDisconnected) { for (const CTransactionRef &ptx : pblock->vtx) { const CTransaction &tx = *ptx; TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier *notifier) { return notifier->NotifyTransaction(tx); }); } // Next we notify BlockDisconnect listeners for *all* blocks TryForEachAndRemoveFailed( notifiers, [pindexDisconnected](CZMQAbstractNotifier *notifier) { return notifier->NotifyBlockDisconnect(pindexDisconnected); }); } CZMQNotificationInterface *g_zmq_notification_interface = nullptr; diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 644c7c6fc..4ed85f7fb 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -1,294 +1,297 @@ // Copyright (c) 2015-2016 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include static std::multimap mapPublishNotifiers; static const char *MSG_HASHBLOCK = "hashblock"; static const char *MSG_HASHTX = "hashtx"; static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWTX = "rawtx"; static const char *MSG_SEQUENCE = "sequence"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void *data, size_t size, ...) { va_list args; va_start(args, size); while (1) { zmq_msg_t msg; int rc = zmq_msg_init_size(&msg, size); if (rc != 0) { zmqError("Unable to initialize ZMQ msg"); va_end(args); return -1; } void *buf = zmq_msg_data(&msg); memcpy(buf, data, size); data = va_arg(args, const void *); rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); if (rc == -1) { zmqError("Unable to send ZMQ msg"); zmq_msg_close(&msg); va_end(args); return -1; } zmq_msg_close(&msg); if (!data) { break; } size = va_arg(args, size_t); } va_end(args); return 0; } bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) { assert(!psocket); // check if address is being used by other publish notifier std::multimap::iterator i = mapPublishNotifiers.find(address); if (i == mapPublishNotifiers.end()) { psocket = zmq_socket(pcontext, ZMQ_PUB); if (!psocket) { zmqError("Failed to create socket"); return false; } LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark)); if (rc != 0) { zmqError("Failed to set outbound message high water mark"); zmq_close(psocket); return false; } const int so_keepalive_option{1}; rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option)); if (rc != 0) { zmqError("Failed to set SO_KEEPALIVE"); zmq_close(psocket); return false; } rc = zmq_bind(psocket, address.c_str()); if (rc != 0) { zmqError("Failed to bind address"); zmq_close(psocket); return false; } // register this notifier for the address, so it can be reused for other // publish notifier mapPublishNotifiers.insert(std::make_pair(address, this)); return true; } else { LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address); LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); psocket = i->second->psocket; mapPublishNotifiers.insert(std::make_pair(address, this)); return true; } } void CZMQAbstractPublishNotifier::Shutdown() { // Early return if Initialize was not called if (!psocket) { return; } int count = mapPublishNotifiers.count(address); // remove this notifier from the list of publishers using this address typedef std::multimap::iterator iterator; std::pair iterpair = mapPublishNotifiers.equal_range(address); for (iterator it = iterpair.first; it != iterpair.second; ++it) { if (it->second == this) { mapPublishNotifiers.erase(it); break; } } if (count == 1) { LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address); int linger = 0; zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_close(psocket); } psocket = nullptr; } bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void *data, size_t size) { assert(psocket); /* send three parts, command & data & a LE 4byte sequence number */ uint8_t msgseq[sizeof(uint32_t)]; WriteLE32(&msgseq[0], nSequence); int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr); if (rc == -1) { return false; } /* increment memory only sequence number after sending */ nSequence++; return true; } bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { BlockHash hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), + this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) { data[31 - i] = hash.begin()[i]; } return SendZmqMessage(MSG_HASHBLOCK, data, 32); } bool CZMQPublishHashTransactionNotifier::NotifyTransaction( const CTransaction &transaction) { TxId txid = transaction.GetId(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", txid.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", txid.GetHex(), + this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) { data[31 - i] = txid.begin()[i]; } return SendZmqMessage(MSG_HASHTX, data, 32); } bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", - pindex->GetBlockHash().GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", + pindex->GetBlockHash().GetHex(), this->address); const Config &config = GetConfig(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); { LOCK(cs_main); CBlock block; if (!ReadBlockFromDisk(block, pindex, config.GetChainParams().GetConsensus())) { zmqError("Can't read block from disk"); return false; } ss << block; } return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); } bool CZMQPublishRawTransactionNotifier::NotifyTransaction( const CTransaction &transaction) { TxId txid = transaction.GetId(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", txid.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", txid.GetHex(), + this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); ss << transaction; return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } // TODO: Dedup this code to take label char, log string bool CZMQPublishSequenceNotifier::NotifyBlockConnect( const CBlockIndex *pindex) { BlockHash hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n", - hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", + hash.GetHex(), this->address); char data[sizeof(BlockHash) + 1]; for (unsigned int i = 0; i < sizeof(BlockHash); i++) { data[sizeof(BlockHash) - 1 - i] = hash.begin()[i]; } // Block (C)onnect data[sizeof(data) - 1] = 'C'; return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); } bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect( const CBlockIndex *pindex) { BlockHash hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n", - hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", + hash.GetHex(), this->address); char data[sizeof(BlockHash) + 1]; for (unsigned int i = 0; i < sizeof(BlockHash); i++) { data[sizeof(BlockHash) - 1 - i] = hash.begin()[i]; } // Block (D)isconnect data[sizeof(data) - 1] = 'D'; return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); } bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance( const CTransaction &transaction, uint64_t mempool_sequence) { TxId txid = transaction.GetId(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n", - txid.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", + txid.GetHex(), this->address); uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1]; for (unsigned int i = 0; i < sizeof(TxId); i++) { data[sizeof(TxId) - 1 - i] = txid.begin()[i]; } // Mempool (A)cceptance data[sizeof(TxId)] = 'A'; WriteLE64(data + sizeof(TxId) + 1, mempool_sequence); return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); } bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval( const CTransaction &transaction, uint64_t mempool_sequence) { TxId txid = transaction.GetId(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n", - txid.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", + txid.GetHex(), this->address); uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1]; for (unsigned int i = 0; i < sizeof(TxId); i++) { data[sizeof(TxId) - 1 - i] = txid.begin()[i]; } // Mempool (R)emoval data[sizeof(TxId)] = 'R'; WriteLE64(data + sizeof(TxId) + 1, mempool_sequence); return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); } diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index e529b3a41..add6fddfe 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -1,624 +1,653 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2019 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the ZMQ notification interface.""" import struct from io import BytesIO from time import sleep from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, ) from test_framework.blocktools import ( create_block, create_coinbase, make_conform_to_ctor, ) from test_framework.messages import CTransaction, FromHex, hash256 from test_framework.test_framework import BitcoinTestFramework from test_framework.util import ( assert_equal, assert_raises_rpc_error, connect_nodes, disconnect_nodes, ) # Test may be skipped and not have zmq installed try: import zmq except ImportError: pass def hash256_reversed(byte_str): return hash256(byte_str)[::-1] class ZMQSubscriber: def __init__(self, socket, topic): self.sequence = 0 self.socket = socket self.topic = topic self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) def receive(self): topic, body, seq = self.socket.recv_multipart() # Topic should match the subscriber topic. assert_equal(topic, self.topic) # Sequence should be incremental. assert_equal(struct.unpack('C : Blockhash connected <32-byte hash>D : Blockhash disconnected <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool """ self.log.info("Testing 'sequence' publisher") address = 'tcp://127.0.0.1:28333' socket = self.ctx.socket(zmq.SUB) socket.set(zmq.RCVTIMEO, 60000) seq = ZMQSubscriber(socket, b'sequence') self.restart_node( 0, self.extra_args[0] + [f'-zmqpub{seq.topic.decode()}={address}']) socket.connect(address) # Relax so that the subscriber is ready before publishing zmq messages sleep(0.2) # Mempool sequence number starts at 1 seq_num = 1 # Generate 1 block in nodes[0] and receive all notifications dc_block = self.nodes[0].generatetoaddress( 1, ADDRESS_ECREG_UNSPENDABLE)[0] # Note: We are not notified of any block transactions, coinbase or # mined assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence()) # Generate 2 blocks in nodes[1] to a different address to ensure # a chain split self.nodes[1].generatetoaddress(2, ADDRESS_ECREG_P2SH_OP_TRUE) # nodes[0] will reorg chain after connecting back nodes[1] connect_nodes(self.nodes[0], self.nodes[1]) # Then we receive all block (dis)connect notifications for the # 2 block reorg assert_equal((dc_block, "D", None), seq.receive_sequence()) block_count = self.nodes[1].getblockcount() assert_equal((self.nodes[1].getblockhash(block_count - 1), "C", None), seq.receive_sequence()) assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence()) # Rest of test requires wallet functionality if self.is_wallet_compiled(): (block_hash, txid_to_be_replaced, replacement_txid ) = self.create_conflicting_tx() self.log.info( "Testing sequence notifications with mempool sequence values") # Should receive the initially broadcasted txid. assert_equal((txid_to_be_replaced, "A", seq_num), seq.receive_sequence()) seq_num += 1 self.log.info("Testing a tx removal notification") # Next we receive a notification for the transaction removal assert_equal((txid_to_be_replaced, "R", seq_num), seq.receive_sequence()) seq_num += 1 # Then we see the block notification assert_equal((block_hash, "C", None), seq.receive_sequence()) # There is no sequence notification for the transaction that was # never in node0's mempool, but it can be found in the block. assert replacement_txid in self.nodes[0].getblock(block_hash)["tx"] self.log.info("Wait for tx from second node") payment_txid = self.nodes[1].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=5_000_000) self.sync_all() assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) seq_num += 1 # Doesn't get published when mined, make a block and tx to "flush" # the possibility though the mempool sequence number does go up by # the number of transactions removed from the mempool by the block # mining it. mempool_size = len(self.nodes[0].getrawmempool()) c_block = self.nodes[0].generatetoaddress( 1, ADDRESS_ECREG_UNSPENDABLE)[0] self.sync_all() # Make sure the number of mined transactions matches the number of # txs out of mempool mempool_size_delta = mempool_size - \ len(self.nodes[0].getrawmempool()) assert_equal(len(self.nodes[0].getblock(c_block)["tx"]) - 1, mempool_size_delta) seq_num += mempool_size_delta payment_txid_2 = self.nodes[1].sendtoaddress( self.nodes[0].getnewaddress(), 1_000_000) self.sync_all() assert_equal((c_block, "C", None), seq.receive_sequence()) assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence()) seq_num += 1 # Spot check getrawmempool results that they only show up when # asked for assert isinstance(self.nodes[0].getrawmempool(), list) assert isinstance( self.nodes[0].getrawmempool(mempool_sequence=False), list) assert "mempool_sequence" not in self.nodes[0].getrawmempool( verbose=True) assert_raises_rpc_error( -8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True) assert_equal(self.nodes[0].getrawmempool( mempool_sequence=True)["mempool_sequence"], seq_num) self.log.info("Testing reorg notifications") # Manually invalidate the last block to test mempool re-entry # N.B. This part could be made more lenient in exact ordering # since it greatly depends on inner-workings of blocks/mempool # during "deep" re-orgs. Probably should "re-construct" # blockchain/mempool state from notifications instead. block_count = self.nodes[0].getblockcount() best_hash = self.nodes[0].getbestblockhash() self.nodes[0].invalidateblock(best_hash) # Bit of room to make sure transaction things happened sleep(2) # Make sure getrawmempool mempool_sequence results aren't "queued" # but immediately reflective of the time they were gathered. assert self.nodes[0].getrawmempool( mempool_sequence=True)["mempool_sequence"] > seq_num assert_equal((best_hash, "D", None), seq.receive_sequence()) assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) seq_num += 1 # Other things may happen but aren't wallet-deterministic so we # don't test for them currently self.nodes[0].reconsiderblock(best_hash) self.nodes[1].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) self.sync_all() self.log.info("Evict mempool transaction by block conflict") orig_txid = self.nodes[0].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=1_000_000) # More to be simply mined more_tx = [] for _ in range(5): more_tx.append(self.nodes[0].sendtoaddress( self.nodes[0].getnewaddress(), 100_000)) raw_tx = self.nodes[0].getrawtransaction(orig_txid) block = create_block( int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount() + 1)) tx = FromHex(CTransaction(), raw_tx) block.vtx.append(tx) for txid in more_tx: tx = FromHex(CTransaction(), self.nodes[0].getrawtransaction(txid)) block.vtx.append(tx) make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None) tip = self.nodes[0].getbestblockhash() assert_equal(int(tip, 16), block.sha256) orig_txid_2 = self.nodes[0].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=1_000_000) # Flush old notifications until evicted tx original entry (hash_str, label, mempool_seq) = seq.receive_sequence() while hash_str != orig_txid: (hash_str, label, mempool_seq) = seq.receive_sequence() mempool_seq += 1 # Added original tx assert_equal(label, "A") # More transactions to be simply mined for i in range(len(more_tx)): assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence()) mempool_seq += 1 # Removed RBF tests mempool_seq += 1 assert_equal((tip, "C", None), seq.receive_sequence()) mempool_seq += len(more_tx) # Last tx assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence()) mempool_seq += 1 self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) # want to make sure we didn't break "consensus" for other tests self.sync_all() def test_mempool_sync(self): """ Use sequence notification plus getrawmempool sequence results to "sync mempool" """ if not self.is_wallet_compiled(): self.log.info("Skipping mempool sync test") return self.log.info("Testing 'mempool sync' usage of sequence notifier") address = 'tcp://127.0.0.1:28333' socket = self.ctx.socket(zmq.SUB) socket.set(zmq.RCVTIMEO, 60000) seq = ZMQSubscriber(socket, b'sequence') self.restart_node( 0, self.extra_args[0] + [f'-zmqpub{seq.topic.decode()}={address}']) connect_nodes(self.nodes[0], self.nodes[1]) socket.connect(address) # Relax so that the subscriber is ready before publishing zmq messages sleep(0.2) # In-memory counter, should always start at 1 next_mempool_seq = self.nodes[0].getrawmempool( mempool_sequence=True)["mempool_sequence"] assert_equal(next_mempool_seq, 1) # Some transactions have been happening but we aren't consuming # zmq notifications yet or we lost a ZMQ message somehow and want # to start over txids = [] num_txs = 5 for _ in range(num_txs): txids.append(self.nodes[1].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=1_000_000)) self.sync_all() # 1) Consume backlog until we get a mempool sequence number (hash_str, label, zmq_mem_seq) = seq.receive_sequence() while zmq_mem_seq is None: (hash_str, label, zmq_mem_seq) = seq.receive_sequence() assert label == "A" assert hash_str is not None # 2) We need to "seed" our view of the mempool mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True) mempool_view = set(mempool_snapshot["txids"]) get_raw_seq = mempool_snapshot["mempool_sequence"] assert_equal(get_raw_seq, 6) # Snapshot may be too old compared to zmq message we read off latest while zmq_mem_seq >= get_raw_seq: sleep(2) mempool_snapshot = self.nodes[0].getrawmempool( mempool_sequence=True) mempool_view = set(mempool_snapshot["txids"]) get_raw_seq = mempool_snapshot["mempool_sequence"] # Things continue to happen in the "interim" while waiting for # snapshot results for _ in range(num_txs): txids.append(self.nodes[0].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=1_000_000)) self.sync_all() self.create_conflicting_tx() self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) final_txid = self.nodes[0].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=100_000) # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot while True: if zmq_mem_seq == get_raw_seq - 1: break (hash_str, label, mempool_sequence) = seq.receive_sequence() if mempool_sequence is not None: zmq_mem_seq = mempool_sequence if zmq_mem_seq > get_raw_seq: raise Exception( f"We somehow jumped mempool sequence numbers! " f"zmq_mem_seq: {zmq_mem_seq} > " f"get_raw_seq: {get_raw_seq}") # 4) Moving forward, we apply the delta to our local view # remaining txs + conflict (A, R, C) + 1 block connect + 1 final tx expected_sequence = get_raw_seq for _ in range(num_txs + 3 + 1 + 1): (hash_str, label, mempool_sequence) = seq.receive_sequence() if label == "A": assert hash_str not in mempool_view mempool_view.add(hash_str) expected_sequence = mempool_sequence + 1 elif label == "R": assert hash_str in mempool_view mempool_view.remove(hash_str) expected_sequence = mempool_sequence + 1 elif label == "C": # (Attempt to) remove all txids from known block connects block_txids = self.nodes[0].getblock(hash_str)["tx"][1:] for txid in block_txids: if txid in mempool_view: expected_sequence += 1 mempool_view.remove(txid) elif label == "D": # Not useful for mempool tracking per se continue else: raise Exception("Unexpected ZMQ sequence label!") assert_equal(self.nodes[0].getrawmempool(), [final_txid]) assert_equal( self.nodes[0].getrawmempool( mempool_sequence=True)["mempool_sequence"], expected_sequence) # 5) If you miss a zmq/mempool sequence number, go back to step (2) self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) + def test_multiple_interfaces(self): + # Set up two subscribers with different addresses + subscribers = [] + for i in range(2): + address = f"tcp://127.0.0.1:{28334 + i}" + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, 60000) + hashblock = ZMQSubscriber(socket, b"hashblock") + socket.connect(address) + subscribers.append({'address': address, 'hashblock': hashblock}) + + self.restart_node( + 0, + [f'-zmqpub{subscriber["hashblock"].topic.decode()}={subscriber["address"]}' + for subscriber in subscribers]) + + # Relax so that the subscriber is ready before publishing zmq messages + sleep(0.2) + + # Generate 1 block in nodes[0] and receive all notifications + self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) + + # Should receive the same block hash on both subscribers + assert_equal(self.nodes[0].getbestblockhash(), + subscribers[0]['hashblock'].receive().hex()) + assert_equal(self.nodes[0].getbestblockhash(), + subscribers[1]['hashblock'].receive().hex()) + if __name__ == '__main__': ZMQTest().main()