diff --git a/doc/release-notes/release-notes.md b/doc/release-notes/release-notes.md --- a/doc/release-notes/release-notes.md +++ b/doc/release-notes/release-notes.md @@ -19,3 +19,5 @@ if the transaction passes validation. - A "sequence" notifier is added to ZeroMQ notifications, enabling client-side mempool tracking. +- The same ZeroMQ notification (e.g. `-zmqpubhashtx=address`) can now be specified multiple + times to publish the same notification to different ZeroMQ sockets. diff --git a/doc/zmq.md b/doc/zmq.md --- a/doc/zmq.md +++ b/doc/zmq.md @@ -66,6 +66,7 @@ The socket type is PUB and the address must be a valid ZeroMQ socket address. The same address can be used in more than one notification. +The same notification can be specified more than once. The option to set the PUB socket's outbound message high water mark (SNDHWM) may be set individually for each notification: @@ -81,6 +82,7 @@ For instance: $ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \ + -zmqpubhashtx=tcp://192.168.1.2:28332 \ -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \ -zmqpubhashtxhwm=10000 diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -42,9 +42,8 @@ std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers; for (const auto &entry : factories) { std::string arg("-zmq" + entry.first); - if (gArgs.IsArgSet(arg)) { - const auto &factory = entry.second; - const std::string address = gArgs.GetArg(arg, ""); + const auto &factory = entry.second; + for (const std::string &address : gArgs.GetArgs(arg)) { std::unique_ptr<CZMQAbstractNotifier> notifier = factory(); notifier->SetType(entry.first); notifier->SetAddress(address); diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -185,7 +185,8 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { BlockHash hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, 
"zmq: Publish hashblock %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), + this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) { data[31 - i] = hash.begin()[i]; @@ -196,7 +197,8 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction( const CTransaction &transaction) { TxId txid = transaction.GetId(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", txid.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", txid.GetHex(), + this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) { data[31 - i] = txid.begin()[i]; @@ -205,8 +207,8 @@ } bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", - pindex->GetBlockHash().GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", + pindex->GetBlockHash().GetHex(), this->address); const Config &config = GetConfig(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); @@ -228,7 +230,8 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction( const CTransaction &transaction) { TxId txid = transaction.GetId(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", txid.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", txid.GetHex(), + this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); ss << transaction; return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); @@ -238,8 +241,8 @@ bool CZMQPublishSequenceNotifier::NotifyBlockConnect( const CBlockIndex *pindex) { BlockHash hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n", - hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", + hash.GetHex(), this->address); char data[sizeof(BlockHash) + 1]; for (unsigned int i = 0; i < sizeof(BlockHash); i++) { data[sizeof(BlockHash) - 1 - i] = hash.begin()[i]; @@ -252,8 +255,8 @@ bool 
CZMQPublishSequenceNotifier::NotifyBlockDisconnect( const CBlockIndex *pindex) { BlockHash hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n", - hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", + hash.GetHex(), this->address); char data[sizeof(BlockHash) + 1]; for (unsigned int i = 0; i < sizeof(BlockHash); i++) { data[sizeof(BlockHash) - 1 - i] = hash.begin()[i]; @@ -266,8 +269,8 @@ bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance( const CTransaction &transaction, uint64_t mempool_sequence) { TxId txid = transaction.GetId(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n", - txid.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", + txid.GetHex(), this->address); uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1]; for (unsigned int i = 0; i < sizeof(TxId); i++) { data[sizeof(TxId) - 1 - i] = txid.begin()[i]; @@ -281,8 +284,8 @@ bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval( const CTransaction &transaction, uint64_t mempool_sequence) { TxId txid = transaction.GetId(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n", - txid.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", + txid.GetHex(), this->address); uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1]; for (unsigned int i = 0; i < sizeof(TxId); i++) { data[sizeof(TxId) - 1 - i] = txid.begin()[i]; diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -87,6 +87,7 @@ self.test_sequence() self.test_mempool_sync() self.test_reorg() + self.test_multiple_interfaces() finally: # Destroy the ZMQ context. 
self.log.debug("Destroying ZMQ context") @@ -619,6 +620,34 @@ self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) + def test_multiple_interfaces(self): + # Set up two subscribers with different addresses + subscribers = [] + for i in range(2): + address = f"tcp://127.0.0.1:{28334 + i}" + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, 60000) + hashblock = ZMQSubscriber(socket, b"hashblock") + socket.connect(address) + subscribers.append({'address': address, 'hashblock': hashblock}) + + self.restart_node( + 0, + [f'-zmqpub{subscriber["hashblock"].topic.decode()}={subscriber["address"]}' + for subscriber in subscribers]) + + # Relax so that the subscriber is ready before publishing zmq messages + sleep(0.2) + + # Generate 1 block in nodes[0] and receive all notifications + self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) + + # Should receive the same block hash on both subscribers + assert_equal(self.nodes[0].getbestblockhash(), + subscribers[0]['hashblock'].receive().hex()) + assert_equal(self.nodes[0].getbestblockhash(), + subscribers[1]['hashblock'].receive().hex()) + if __name__ == '__main__': ZMQTest().main()