diff --git a/src/zmq/CMakeLists.txt b/src/zmq/CMakeLists.txt index 61a9871c4..fa7d13772 100644 --- a/src/zmq/CMakeLists.txt +++ b/src/zmq/CMakeLists.txt @@ -1,17 +1,18 @@ # Copyright (c) 2017 The Bitcoin developers project(zmq) add_library(zmq zmqabstractnotifier.cpp zmqnotificationinterface.cpp zmqpublishnotifier.cpp zmqrpc.cpp + zmqutil.cpp ) find_package(ZeroMQ 4.1.5 REQUIRED) target_link_libraries(zmq util ZeroMQ::zmq) if(${CMAKE_SYSTEM_NAME} MATCHES "Windows") target_compile_definitions(zmq PUBLIC ZMQ_STATIC) endif() diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 78de4afe6..ad9de1df3 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -1,20 +1,22 @@ // Copyright (c) 2015 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include +#include + const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM; CZMQAbstractNotifier::~CZMQAbstractNotifier() { assert(!psocket); } bool CZMQAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/) { return true; } bool CZMQAbstractNotifier::NotifyTransaction( const CTransaction & /*transaction*/) { return true; } diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index 81e2068f1..8dd3c9299 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -1,57 +1,57 @@ // Copyright (c) 2015 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H #define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H -#include - #include +#include class CBlockIndex; +class CTransaction; class CZMQAbstractNotifier; using CZMQNotifierFactory = std::unique_ptr (*)(); class CZMQAbstractNotifier { public: static const int DEFAULT_ZMQ_SNDHWM{1000}; CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) {} virtual ~CZMQAbstractNotifier(); template static std::unique_ptr Create() { return std::make_unique(); } std::string GetType() const { return type; } void SetType(const std::string &t) { type = t; } std::string GetAddress() const { return address; } void SetAddress(const std::string &a) { address = a; } int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; } void SetOutboundMessageHighWaterMark(const int sndhwm) { if (sndhwm >= 0) { outbound_message_high_water_mark = sndhwm; } } virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; virtual bool NotifyBlock(const CBlockIndex *pindex); virtual bool NotifyTransaction(const CTransaction &transaction); protected: void *psocket; std::string type; std::string address; int outbound_message_high_water_mark; // aka SNDHWM }; #endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H diff --git a/src/zmq/zmqconfig.h b/src/zmq/zmqconfig.h deleted file mode 100644 index 1cf5d4e68..000000000 --- a/src/zmq/zmqconfig.h +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) 2014 The Bitcoin Core developers -// Distributed under the MIT software license, see the accompanying -// file COPYING or http://www.opensource.org/licenses/mit-license.php. - -#ifndef BITCOIN_ZMQ_ZMQCONFIG_H -#define BITCOIN_ZMQ_ZMQCONFIG_H - -#if defined(HAVE_CONFIG_H) -#include -#endif - -#if ENABLE_ZMQ -#include -#endif - -#include - -void zmqError(const char *str); - -#endif // BITCOIN_ZMQ_ZMQCONFIG_H diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index 19043bf84..6300edfa6 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -1,179 +1,177 @@ // Copyright (c) 2015-2018 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include +#include + +#include #include #include -void zmqError(const char *str) { - LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, - zmq_strerror(errno)); -} - CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr) {} CZMQNotificationInterface::~CZMQNotificationInterface() { Shutdown(); } std::list CZMQNotificationInterface::GetActiveNotifiers() const { std::list result; for (const auto &n : notifiers) { result.push_back(n.get()); } return result; } CZMQNotificationInterface *CZMQNotificationInterface::Create() { std::map factories; factories["pubhashblock"] = CZMQAbstractNotifier::Create; factories["pubhashtx"] = CZMQAbstractNotifier::Create; factories["pubrawblock"] = CZMQAbstractNotifier::Create; factories["pubrawtx"] = CZMQAbstractNotifier::Create; std::list> notifiers; for (const auto &entry : factories) { std::string arg("-zmq" + entry.first); if (gArgs.IsArgSet(arg)) { const auto &factory = entry.second; const std::string address = gArgs.GetArg(arg, ""); std::unique_ptr notifier = factory(); notifier->SetType(entry.first); notifier->SetAddress(address); notifier->SetOutboundMessageHighWaterMark( static_cast(gArgs.GetArg( arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); notifiers.push_back(std::move(notifier)); } } if (!notifiers.empty()) { std::unique_ptr notificationInterface( new CZMQNotificationInterface()); notificationInterface->notifiers = std::move(notifiers); if (notificationInterface->Initialize()) { return notificationInterface.release(); } } return nullptr; } // Called at startup to conditionally set up ZMQ socket(s) bool CZMQNotificationInterface::Initialize() { int major = 0, minor = 0, patch = 0; zmq_version(&major, &minor, &patch); LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch); LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n"); assert(!pcontext); pcontext = zmq_ctx_new(); if (!pcontext) { zmqError("Unable to initialize context"); return false; } for (auto ¬ifier : notifiers) { if (notifier->Initialize(pcontext)) { LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); } else { LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); return false; } } return true; } // Called during shutdown sequence void CZMQNotificationInterface::Shutdown() { LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n"); if (pcontext) { for (auto ¬ifier : notifiers) { LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); notifier->Shutdown(); } zmq_ctx_term(pcontext); pcontext = nullptr; } } namespace { template void TryForEachAndRemoveFailed( std::list> ¬ifiers, const Function &func) { for (auto i = notifiers.begin(); i != notifiers.end();) { CZMQAbstractNotifier *notifier = i->get(); if (func(notifier)) { ++i; } else { notifier->Shutdown(); i = notifiers.erase(i); } } } } // anonymous namespace void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { // In IBD or blocks were disconnected without any new ones if (fInitialDownload || pindexNew == pindexFork) { return; } TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier *notifier) { return notifier->NotifyBlock(pindexNew); }); } void CZMQNotificationInterface::TransactionAddedToMempool( const CTransactionRef &ptx) { // Used by BlockConnected and BlockDisconnected as well, because they're all // the same external callback. const CTransaction &tx = *ptx; TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier *notifier) { return notifier->NotifyTransaction(tx); }); } void CZMQNotificationInterface::BlockConnected( const std::shared_ptr &pblock, const CBlockIndex *pindexConnected) { for (const CTransactionRef &ptx : pblock->vtx) { // Do a normal notify for each transaction added in the block TransactionAddedToMempool(ptx); } } void CZMQNotificationInterface::BlockDisconnected( const std::shared_ptr &pblock, const CBlockIndex *pindexDisconnected) { for (const CTransactionRef &ptx : pblock->vtx) { // Do a normal notify for each transaction removed in block // disconnection TransactionAddedToMempool(ptx); } } CZMQNotificationInterface *g_zmq_notification_interface = nullptr; diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index c412f9626..d90ecc6c0 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -1,222 +1,224 @@ // Copyright (c) 2015-2016 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include #include +#include + #include #include #include #include #include #include static std::multimap mapPublishNotifiers; static const char *MSG_HASHBLOCK = "hashblock"; static const char *MSG_HASHTX = "hashtx"; static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWTX = "rawtx"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void *data, size_t size, ...) { va_list args; va_start(args, size); while (1) { zmq_msg_t msg; int rc = zmq_msg_init_size(&msg, size); if (rc != 0) { zmqError("Unable to initialize ZMQ msg"); va_end(args); return -1; } void *buf = zmq_msg_data(&msg); memcpy(buf, data, size); data = va_arg(args, const void *); rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); if (rc == -1) { zmqError("Unable to send ZMQ msg"); zmq_msg_close(&msg); va_end(args); return -1; } zmq_msg_close(&msg); if (!data) { break; } size = va_arg(args, size_t); } va_end(args); return 0; } bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) { assert(!psocket); // check if address is being used by other publish notifier std::multimap::iterator i = mapPublishNotifiers.find(address); if (i == mapPublishNotifiers.end()) { psocket = zmq_socket(pcontext, ZMQ_PUB); if (!psocket) { zmqError("Failed to create socket"); return false; } LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark)); if (rc != 0) { zmqError("Failed to set outbound message high water mark"); zmq_close(psocket); return false; } rc = zmq_bind(psocket, address.c_str()); if (rc != 0) { zmqError("Failed to bind address"); zmq_close(psocket); return false; } // register this notifier for the address, so it can be reused for other // publish notifier mapPublishNotifiers.insert(std::make_pair(address, this)); return true; } else { LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address); LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); psocket = i->second->psocket; mapPublishNotifiers.insert(std::make_pair(address, this)); return true; } } void CZMQAbstractPublishNotifier::Shutdown() { // Early return if Initialize was not called if (!psocket) { return; } int count = mapPublishNotifiers.count(address); // remove this notifier from the list of publishers using this address typedef std::multimap::iterator iterator; std::pair iterpair = mapPublishNotifiers.equal_range(address); for (iterator it = iterpair.first; it != iterpair.second; ++it) { if (it->second == this) { mapPublishNotifiers.erase(it); break; } } if (count == 1) { LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address); int linger = 0; zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_close(psocket); } psocket = nullptr; } bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void *data, size_t size) { assert(psocket); /* send three parts, command & data & a LE 4byte sequence number */ uint8_t msgseq[sizeof(uint32_t)]; WriteLE32(&msgseq[0], nSequence); int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr); if (rc == -1) { return false; } /* increment memory only sequence number after sending */ nSequence++; return true; } bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { BlockHash hash = pindex->GetBlockHash(); LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex()); char data[32]; for (unsigned int i = 0; i < 32; i++) { data[31 - i] = hash.begin()[i]; } return SendMessage(MSG_HASHBLOCK, data, 32); } bool CZMQPublishHashTransactionNotifier::NotifyTransaction( const CTransaction &transaction) { TxId txid = transaction.GetId(); LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", txid.GetHex()); char data[32]; for (unsigned int i = 0; i < 32; i++) { data[31 - i] = txid.begin()[i]; } return SendMessage(MSG_HASHTX, data, 32); } bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex()); const Config &config = GetConfig(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); { LOCK(cs_main); CBlock block; if (!ReadBlockFromDisk(block, pindex, config.GetChainParams().GetConsensus())) { zmqError("Can't read block from disk"); return false; } ss << block; } return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); } bool CZMQPublishRawTransactionNotifier::NotifyTransaction( const CTransaction &transaction) { TxId txid = transaction.GetId(); LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", txid.GetHex()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); ss << transaction; return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } diff --git a/src/zmq/zmqutil.cpp b/src/zmq/zmqutil.cpp new file mode 100644 index 000000000..1fa848746 --- /dev/null +++ b/src/zmq/zmqutil.cpp @@ -0,0 +1,14 @@ +// Copyright (c) 2014-2018 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include + +#include + +void zmqError(const char *str) { + LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, + zmq_strerror(errno)); +} diff --git a/src/zmq/zmqutil.h b/src/zmq/zmqutil.h new file mode 100644 index 000000000..792cf522d --- /dev/null +++ b/src/zmq/zmqutil.h @@ -0,0 +1,10 @@ +// Copyright (c) 2014-2018 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQUTIL_H +#define BITCOIN_ZMQ_ZMQUTIL_H + +void zmqError(const char *str); + +#endif // BITCOIN_ZMQ_ZMQUTIL_H