Changeset View
Changeset View
Standalone View
Standalone View
src/zmq/zmqnotificationinterface.cpp
Show All 11 Lines | void zmqError(const char *str) { | ||||
LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, | LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, | ||||
zmq_strerror(errno)); | zmq_strerror(errno)); | ||||
} | } | ||||
CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr) {} | CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr) {} | ||||
CZMQNotificationInterface::~CZMQNotificationInterface() { | CZMQNotificationInterface::~CZMQNotificationInterface() { | ||||
Shutdown(); | Shutdown(); | ||||
for (std::list<CZMQAbstractNotifier *>::iterator i = notifiers.begin(); | |||||
i != notifiers.end(); ++i) { | |||||
delete *i; | |||||
} | |||||
} | } | ||||
std::list<const CZMQAbstractNotifier *> | std::list<const CZMQAbstractNotifier *> | ||||
CZMQNotificationInterface::GetActiveNotifiers() const { | CZMQNotificationInterface::GetActiveNotifiers() const { | ||||
std::list<const CZMQAbstractNotifier *> result; | std::list<const CZMQAbstractNotifier *> result; | ||||
for (const auto *n : notifiers) { | for (const auto &n : notifiers) { | ||||
result.push_back(n); | result.push_back(n.get()); | ||||
} | } | ||||
return result; | return result; | ||||
} | } | ||||
CZMQNotificationInterface *CZMQNotificationInterface::Create() { | CZMQNotificationInterface *CZMQNotificationInterface::Create() { | ||||
CZMQNotificationInterface *notificationInterface = nullptr; | |||||
std::map<std::string, CZMQNotifierFactory> factories; | std::map<std::string, CZMQNotifierFactory> factories; | ||||
std::list<CZMQAbstractNotifier *> notifiers; | |||||
factories["pubhashblock"] = | factories["pubhashblock"] = | ||||
CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>; | CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>; | ||||
factories["pubhashtx"] = | factories["pubhashtx"] = | ||||
CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; | CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; | ||||
factories["pubrawblock"] = | factories["pubrawblock"] = | ||||
CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; | CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; | ||||
factories["pubrawtx"] = | factories["pubrawtx"] = | ||||
CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; | CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; | ||||
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers; | |||||
for (const auto &entry : factories) { | for (const auto &entry : factories) { | ||||
std::string arg("-zmq" + entry.first); | std::string arg("-zmq" + entry.first); | ||||
if (gArgs.IsArgSet(arg)) { | if (gArgs.IsArgSet(arg)) { | ||||
CZMQNotifierFactory factory = entry.second; | CZMQNotifierFactory factory = entry.second; | ||||
std::string address = gArgs.GetArg(arg, ""); | std::string address = gArgs.GetArg(arg, ""); | ||||
CZMQAbstractNotifier *notifier = factory(); | CZMQAbstractNotifier *notifier = factory(); | ||||
notifier->SetType(entry.first); | notifier->SetType(entry.first); | ||||
notifier->SetAddress(address); | notifier->SetAddress(address); | ||||
notifier->SetOutboundMessageHighWaterMark( | notifier->SetOutboundMessageHighWaterMark( | ||||
static_cast<int>(gArgs.GetArg( | static_cast<int>(gArgs.GetArg( | ||||
arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); | arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); | ||||
notifiers.push_back(notifier); | notifiers.emplace_back(notifier); | ||||
} | } | ||||
} | } | ||||
if (!notifiers.empty()) { | if (!notifiers.empty()) { | ||||
notificationInterface = new CZMQNotificationInterface(); | std::unique_ptr<CZMQNotificationInterface> notificationInterface( | ||||
notificationInterface->notifiers = notifiers; | new CZMQNotificationInterface()); | ||||
notificationInterface->notifiers = std::move(notifiers); | |||||
if (!notificationInterface->Initialize()) { | if (notificationInterface->Initialize()) { | ||||
delete notificationInterface; | return notificationInterface.release(); | ||||
notificationInterface = nullptr; | |||||
} | } | ||||
} | } | ||||
return notificationInterface; | return nullptr; | ||||
} | } | ||||
// Called at startup to conditionally set up ZMQ socket(s) | // Called at startup to conditionally set up ZMQ socket(s) | ||||
bool CZMQNotificationInterface::Initialize() { | bool CZMQNotificationInterface::Initialize() { | ||||
int major = 0, minor = 0, patch = 0; | int major = 0, minor = 0, patch = 0; | ||||
zmq_version(&major, &minor, &patch); | zmq_version(&major, &minor, &patch); | ||||
LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch); | LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch); | ||||
LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n"); | LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n"); | ||||
assert(!pcontext); | assert(!pcontext); | ||||
pcontext = zmq_ctx_new(); | pcontext = zmq_ctx_new(); | ||||
if (!pcontext) { | if (!pcontext) { | ||||
zmqError("Unable to initialize context"); | zmqError("Unable to initialize context"); | ||||
return false; | return false; | ||||
} | } | ||||
std::list<CZMQAbstractNotifier *>::iterator i = notifiers.begin(); | for (auto ¬ifier : notifiers) { | ||||
for (; i != notifiers.end(); ++i) { | |||||
CZMQAbstractNotifier *notifier = *i; | |||||
if (notifier->Initialize(pcontext)) { | if (notifier->Initialize(pcontext)) { | ||||
LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", | LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", | ||||
notifier->GetType(), notifier->GetAddress()); | notifier->GetType(), notifier->GetAddress()); | ||||
} else { | } else { | ||||
LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", | LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", | ||||
notifier->GetType(), notifier->GetAddress()); | notifier->GetType(), notifier->GetAddress()); | ||||
break; | |||||
} | |||||
} | |||||
if (i != notifiers.end()) { | |||||
return false; | return false; | ||||
} | } | ||||
} | |||||
return true; | return true; | ||||
} | } | ||||
// Called during shutdown sequence | // Called during shutdown sequence | ||||
void CZMQNotificationInterface::Shutdown() { | void CZMQNotificationInterface::Shutdown() { | ||||
LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n"); | LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n"); | ||||
if (pcontext) { | if (pcontext) { | ||||
for (std::list<CZMQAbstractNotifier *>::iterator i = notifiers.begin(); | for (auto ¬ifier : notifiers) { | ||||
i != notifiers.end(); ++i) { | |||||
CZMQAbstractNotifier *notifier = *i; | |||||
LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", | LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", | ||||
notifier->GetType(), notifier->GetAddress()); | notifier->GetType(), notifier->GetAddress()); | ||||
notifier->Shutdown(); | notifier->Shutdown(); | ||||
} | } | ||||
zmq_ctx_term(pcontext); | zmq_ctx_term(pcontext); | ||||
pcontext = nullptr; | pcontext = nullptr; | ||||
} | } | ||||
} | } | ||||
void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, | void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, | ||||
const CBlockIndex *pindexFork, | const CBlockIndex *pindexFork, | ||||
bool fInitialDownload) { | bool fInitialDownload) { | ||||
// In IBD or blocks were disconnected without any new ones | // In IBD or blocks were disconnected without any new ones | ||||
if (fInitialDownload || pindexNew == pindexFork) { | if (fInitialDownload || pindexNew == pindexFork) { | ||||
return; | return; | ||||
} | } | ||||
for (std::list<CZMQAbstractNotifier *>::iterator i = notifiers.begin(); | for (auto i = notifiers.begin(); i != notifiers.end();) { | ||||
i != notifiers.end();) { | CZMQAbstractNotifier *notifier = i->get(); | ||||
CZMQAbstractNotifier *notifier = *i; | |||||
if (notifier->NotifyBlock(pindexNew)) { | if (notifier->NotifyBlock(pindexNew)) { | ||||
i++; | i++; | ||||
} else { | } else { | ||||
notifier->Shutdown(); | notifier->Shutdown(); | ||||
i = notifiers.erase(i); | i = notifiers.erase(i); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
void CZMQNotificationInterface::TransactionAddedToMempool( | void CZMQNotificationInterface::TransactionAddedToMempool( | ||||
const CTransactionRef &ptx) { | const CTransactionRef &ptx) { | ||||
// Used by BlockConnected and BlockDisconnected as well, because they're all | // Used by BlockConnected and BlockDisconnected as well, because they're all | ||||
// the same external callback. | // the same external callback. | ||||
const CTransaction &tx = *ptx; | const CTransaction &tx = *ptx; | ||||
for (std::list<CZMQAbstractNotifier *>::iterator i = notifiers.begin(); | for (auto i = notifiers.begin(); i != notifiers.end();) { | ||||
i != notifiers.end();) { | CZMQAbstractNotifier *notifier = i->get(); | ||||
CZMQAbstractNotifier *notifier = *i; | |||||
if (notifier->NotifyTransaction(tx)) { | if (notifier->NotifyTransaction(tx)) { | ||||
i++; | i++; | ||||
} else { | } else { | ||||
notifier->Shutdown(); | notifier->Shutdown(); | ||||
i = notifiers.erase(i); | i = notifiers.erase(i); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
Show All 21 Lines |