Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F13115216
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Subscribers
None
View Options
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
index f5839620ff..a9ce9c48c4 100644
--- a/src/zmq/zmqpublishnotifier.cpp
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -1,173 +1,178 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "chainparams.h"
#include "zmqpublishnotifier.h"
#include "main.h"
#include "util.h"
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
+static const char *MSG_HASHBLOCK = "hashblock";
+static const char *MSG_HASHTX = "hashtx";
+static const char *MSG_RAWBLOCK = "rawblock";
+static const char *MSG_RAWTX = "rawtx";
+
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
{
va_list args;
va_start(args, size);
while (1)
{
zmq_msg_t msg;
int rc = zmq_msg_init_size(&msg, size);
if (rc != 0)
{
zmqError("Unable to initialize ZMQ msg");
return -1;
}
void *buf = zmq_msg_data(&msg);
memcpy(buf, data, size);
data = va_arg(args, const void*);
rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
if (rc == -1)
{
zmqError("Unable to send ZMQ msg");
zmq_msg_close(&msg);
return -1;
}
zmq_msg_close(&msg);
if (!data)
break;
size = va_arg(args, size_t);
}
return 0;
}
bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
{
assert(!psocket);
// check if address is being used by other publish notifier
std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
if (i==mapPublishNotifiers.end())
{
psocket = zmq_socket(pcontext, ZMQ_PUB);
if (!psocket)
{
zmqError("Failed to create socket");
return false;
}
int rc = zmq_bind(psocket, address.c_str());
if (rc!=0)
{
zmqError("Failed to bind address");
zmq_close(psocket);
return false;
}
// register this notifier for the address, so it can be reused for other publish notifier
mapPublishNotifiers.insert(std::make_pair(address, this));
return true;
}
else
{
LogPrint("zmq", "zmq: Reusing socket for address %s\n", address);
psocket = i->second->psocket;
mapPublishNotifiers.insert(std::make_pair(address, this));
return true;
}
}
void CZMQAbstractPublishNotifier::Shutdown()
{
assert(psocket);
int count = mapPublishNotifiers.count(address);
// remove this notifier from the list of publishers using this address
typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
for (iterator it = iterpair.first; it != iterpair.second; ++it)
{
if (it->second==this)
{
mapPublishNotifiers.erase(it);
break;
}
}
if (count == 1)
{
LogPrint("zmq", "Close socket at address %s\n", address);
int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket);
}
psocket = 0;
}
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
- int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
+ int rc = zmq_send_multipart(psocket, MSG_HASHBLOCK, 9, data, 32, 0);
return rc == 0;
}
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint("zmq", "zmq: Publish hashtx %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
- int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
+ int rc = zmq_send_multipart(psocket, MSG_HASHTX, 6, data, 32, 0);
return rc == 0;
}
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
{
LOCK(cs_main);
CBlock block;
if(!ReadBlockFromDisk(block, pindex, consensusParams))
{
zmqError("Can't read block from disk");
return false;
}
ss << block;
}
- int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
+ int rc = zmq_send_multipart(psocket, MSG_RAWBLOCK, 8, &(*ss.begin()), ss.size(), 0);
return rc == 0;
}
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction;
- int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
+ int rc = zmq_send_multipart(psocket, MSG_RAWTX, 5, &(*ss.begin()), ss.size(), 0);
return rc == 0;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Mar 2, 10:20 (1 d, 7 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5181041
Default Alt Text
(5 KB)
Attached To
rSTAGING Bitcoin ABC staging
Event Timeline
Log In to Comment