diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index 421ef452a..81e2068f1 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -1,54 +1,57 @@ // Copyright (c) 2015 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H #define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H #include +#include + class CBlockIndex; class CZMQAbstractNotifier; -typedef CZMQAbstractNotifier *(*CZMQNotifierFactory)(); +using CZMQNotifierFactory = std::unique_ptr (*)(); class CZMQAbstractNotifier { public: static const int DEFAULT_ZMQ_SNDHWM{1000}; CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) {} virtual ~CZMQAbstractNotifier(); - template static CZMQAbstractNotifier *Create() { - return new T(); + template + static std::unique_ptr Create() { + return std::make_unique(); } std::string GetType() const { return type; } void SetType(const std::string &t) { type = t; } std::string GetAddress() const { return address; } void SetAddress(const std::string &a) { address = a; } int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; } void SetOutboundMessageHighWaterMark(const int sndhwm) { if (sndhwm >= 0) { outbound_message_high_water_mark = sndhwm; } } virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; virtual bool NotifyBlock(const CBlockIndex *pindex); virtual bool NotifyTransaction(const CTransaction &transaction); protected: void *psocket; std::string type; std::string address; int outbound_message_high_water_mark; // aka SNDHWM }; #endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index c50dd9c41..19043bf84 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -1,179 +1,179 @@ // Copyright (c) 2015-2018 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include void zmqError(const char *str) { LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno)); } CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr) {} CZMQNotificationInterface::~CZMQNotificationInterface() { Shutdown(); } std::list CZMQNotificationInterface::GetActiveNotifiers() const { std::list result; for (const auto &n : notifiers) { result.push_back(n.get()); } return result; } CZMQNotificationInterface *CZMQNotificationInterface::Create() { std::map factories; factories["pubhashblock"] = CZMQAbstractNotifier::Create; factories["pubhashtx"] = CZMQAbstractNotifier::Create; factories["pubrawblock"] = CZMQAbstractNotifier::Create; factories["pubrawtx"] = CZMQAbstractNotifier::Create; std::list> notifiers; for (const auto &entry : factories) { std::string arg("-zmq" + entry.first); if (gArgs.IsArgSet(arg)) { - CZMQNotifierFactory factory = entry.second; - std::string address = gArgs.GetArg(arg, ""); - CZMQAbstractNotifier *notifier = factory(); + const auto &factory = entry.second; + const std::string address = gArgs.GetArg(arg, ""); + std::unique_ptr notifier = factory(); notifier->SetType(entry.first); notifier->SetAddress(address); notifier->SetOutboundMessageHighWaterMark( static_cast(gArgs.GetArg( arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); - notifiers.emplace_back(notifier); + notifiers.push_back(std::move(notifier)); } } if (!notifiers.empty()) { std::unique_ptr notificationInterface( new CZMQNotificationInterface()); notificationInterface->notifiers = std::move(notifiers); if (notificationInterface->Initialize()) { return notificationInterface.release(); } } return nullptr; } // Called at startup to conditionally set up ZMQ socket(s) bool CZMQNotificationInterface::Initialize() { int major = 0, minor = 0, patch = 0; zmq_version(&major, &minor, &patch); LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch); LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n"); assert(!pcontext); pcontext = zmq_ctx_new(); if (!pcontext) { zmqError("Unable to initialize context"); return false; } for (auto ¬ifier : notifiers) { if (notifier->Initialize(pcontext)) { LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); } else { LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); return false; } } return true; } // Called during shutdown sequence void CZMQNotificationInterface::Shutdown() { LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n"); if (pcontext) { for (auto ¬ifier : notifiers) { LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); notifier->Shutdown(); } zmq_ctx_term(pcontext); pcontext = nullptr; } } namespace { template void TryForEachAndRemoveFailed( std::list> ¬ifiers, const Function &func) { for (auto i = notifiers.begin(); i != notifiers.end();) { CZMQAbstractNotifier *notifier = i->get(); if (func(notifier)) { ++i; } else { notifier->Shutdown(); i = notifiers.erase(i); } } } } // anonymous namespace void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { // In IBD or blocks were disconnected without any new ones if (fInitialDownload || pindexNew == pindexFork) { return; } TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier *notifier) { return notifier->NotifyBlock(pindexNew); }); } void CZMQNotificationInterface::TransactionAddedToMempool( const CTransactionRef &ptx) { // Used by BlockConnected and BlockDisconnected as well, because they're all // the same external callback. const CTransaction &tx = *ptx; TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier *notifier) { return notifier->NotifyTransaction(tx); }); } void CZMQNotificationInterface::BlockConnected( const std::shared_ptr &pblock, const CBlockIndex *pindexConnected) { for (const CTransactionRef &ptx : pblock->vtx) { // Do a normal notify for each transaction added in the block TransactionAddedToMempool(ptx); } } void CZMQNotificationInterface::BlockDisconnected( const std::shared_ptr &pblock, const CBlockIndex *pindexDisconnected) { for (const CTransactionRef &ptx : pblock->vtx) { // Do a normal notify for each transaction removed in block // disconnection TransactionAddedToMempool(ptx); } } CZMQNotificationInterface *g_zmq_notification_interface = nullptr;