diff --git a/doc/zmq.md b/doc/zmq.md index de22aadc6..8638a6990 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -1,121 +1,135 @@ # Block and Transaction Broadcasting with ZeroMQ [ZeroMQ](https://zeromq.org/) is a lightweight wrapper around TCP connections, inter-process communication, and shared-memory, providing various message-oriented semantics such as publish/subscribe, request/reply, and push/pull. The Bitcoin ABC daemon can be configured to act as a trusted "border router", implementing the bitcoin wire protocol and relay, making consensus decisions, maintaining the local blockchain database, broadcasting locally generated transactions into the network, and providing a queryable RPC interface to interact on a polled basis for requesting blockchain related data. However, there exists only a limited service to notify external software of events like the arrival of new blocks or transactions. The ZeroMQ facility implements a notification interface through a set of specific notifiers. Currently there are notifiers that publish blocks and transactions. This read-only facility requires only the connection of a corresponding ZeroMQ subscriber port in receiving software; it is not authenticated nor is there any two-way protocol involvement. Therefore, subscribers should validate the received data since it may be out of date, incomplete or even invalid. ZeroMQ sockets are self-connecting and self-healing; that is, connections made between two endpoints will be automatically restored after an outage, and either end may be freely started or stopped in any order. Because ZeroMQ is message oriented, subscribers receive transactions and blocks all-at-once and do not need to implement any sort of buffering or reassembly. ## Prerequisites The ZeroMQ feature in Bitcoin ABC requires the ZeroMQ API >= 4.1.5 [libzmq](https://github.com/zeromq/libzmq/releases). For version information, see [dependencies.md](dependencies.md). Typically, it is packaged by distributions as something like *libzmq3-dev*. The C++ wrapper for ZeroMQ is *not* needed. In order to run the example Python client scripts in the `contrib/zmq/` directory, one must also install [PyZMQ](https://github.com/zeromq/pyzmq) (generally with `pip install pyzmq`), though this is not necessary for daemon operation. ## Enabling By default, the ZeroMQ feature is automatically compiled. To disable, use -DBUILD_BITCOIN_ZMQ=OFF to `cmake` when building bitcoind: $ cmake -GNinja .. -DBUILD_BITCOIN_ZMQ=OFF [...] To actually enable operation, one must set the appropriate options on the command line or in the configuration file. ## Usage Currently, the following notifications are supported: -zmqpubhashtx=address -zmqpubhashblock=address -zmqpubrawblock=address -zmqpubrawtx=address The socket type is PUB and the address must be a valid ZeroMQ socket address. The same address can be used in more than one notification. The option to set the PUB socket's outbound message high water mark (SNDHWM) may be set individually for each notification: -zmqpubhashtxhwm=n -zmqpubhashblockhwm=n -zmqpubrawblockhwm=n -zmqpubrawtxhwm=n The high water mark value must be an integer greater than or equal to 0. For instance: $ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \ -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \ -zmqpubhashtxhwm=10000 Each PUB notification has a topic and body, where the header corresponds to the notification type. For instance, for the notification `-zmqpubhashtx` the topic is `hashtx` (no null terminator) and the body is the transaction hash (32 bytes). These options can also be provided in bitcoin.conf. ZeroMQ endpoint specifiers for TCP (and others) are documented in the [ZeroMQ API](http://api.zeromq.org/4-0:_start). Client side, then, the ZeroMQ subscriber socket must have the ZMQ_SUBSCRIBE option set to one or either of these prefixes (for instance, just `hash`); without doing so will result in no messages arriving. Please see [`contrib/zmq/zmq_sub.py`](/contrib/zmq/zmq_sub.py) for a working example. +The ZMQ_PUB socket's ZMQ_TCP_KEEPALIVE option is enabled. This means that +the underlying SO_KEEPALIVE option is enabled when using a TCP transport. +The effective TCP keepalive values are managed through the underlying +operating system configuration and must be configured prior to connection establishment. + +For example, when running on GNU/Linux, one might use the following +to lower the keepalive setting to 10 minutes: + +sudo sysctl -w net.ipv4.tcp_keepalive_time=600 + +Setting the keepalive values appropriately for your operating environment may +improve connectivity in situations where long-lived connections are silently +dropped by network middle boxes. + ## Remarks From the perspective of bitcoind, the ZeroMQ socket is write-only; PUB sockets don't even have a read function. Thus, there is no state introduced into bitcoind directly. Furthermore, no information is broadcast that wasn't already received from the public P2P network. No authentication or authorization is done on connecting clients; it is assumed that the ZeroMQ port is exposed only to trusted entities, using other means such as firewalling. Note that when the block chain tip changes, a reorganisation may occur and just the tip will be notified. It is up to the subscriber to retrieve the chain from the last known block to the new tip. Also note that no notification occurs if the tip was in the active chain - this is the case after calling invalidateblock RPC. There are several possibilities that ZMQ notification can get lost during transmission depending on the communication type you are using. Bitcoind appends an up-counting sequence number to each notification which allows listeners to detect lost notifications. diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index c2fe8172b..3b715d7ff 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -1,225 +1,234 @@ // Copyright (c) 2015-2016 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include static std::multimap mapPublishNotifiers; static const char *MSG_HASHBLOCK = "hashblock"; static const char *MSG_HASHTX = "hashtx"; static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWTX = "rawtx"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void *data, size_t size, ...) { va_list args; va_start(args, size); while (1) { zmq_msg_t msg; int rc = zmq_msg_init_size(&msg, size); if (rc != 0) { zmqError("Unable to initialize ZMQ msg"); va_end(args); return -1; } void *buf = zmq_msg_data(&msg); memcpy(buf, data, size); data = va_arg(args, const void *); rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); if (rc == -1) { zmqError("Unable to send ZMQ msg"); zmq_msg_close(&msg); va_end(args); return -1; } zmq_msg_close(&msg); if (!data) { break; } size = va_arg(args, size_t); } va_end(args); return 0; } bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) { assert(!psocket); // check if address is being used by other publish notifier std::multimap::iterator i = mapPublishNotifiers.find(address); if (i == mapPublishNotifiers.end()) { psocket = zmq_socket(pcontext, ZMQ_PUB); if (!psocket) { zmqError("Failed to create socket"); return false; } LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark)); if (rc != 0) { zmqError("Failed to set outbound message high water mark"); zmq_close(psocket); return false; } + const int so_keepalive_option{1}; + rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, + sizeof(so_keepalive_option)); + if (rc != 0) { + zmqError("Failed to set SO_KEEPALIVE"); + zmq_close(psocket); + return false; + } + rc = zmq_bind(psocket, address.c_str()); if (rc != 0) { zmqError("Failed to bind address"); zmq_close(psocket); return false; } // register this notifier for the address, so it can be reused for other // publish notifier mapPublishNotifiers.insert(std::make_pair(address, this)); return true; } else { LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address); LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); psocket = i->second->psocket; mapPublishNotifiers.insert(std::make_pair(address, this)); return true; } } void CZMQAbstractPublishNotifier::Shutdown() { // Early return if Initialize was not called if (!psocket) { return; } int count = mapPublishNotifiers.count(address); // remove this notifier from the list of publishers using this address typedef std::multimap::iterator iterator; std::pair iterpair = mapPublishNotifiers.equal_range(address); for (iterator it = iterpair.first; it != iterpair.second; ++it) { if (it->second == this) { mapPublishNotifiers.erase(it); break; } } if (count == 1) { LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address); int linger = 0; zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_close(psocket); } psocket = nullptr; } bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void *data, size_t size) { assert(psocket); /* send three parts, command & data & a LE 4byte sequence number */ uint8_t msgseq[sizeof(uint32_t)]; WriteLE32(&msgseq[0], nSequence); int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr); if (rc == -1) { return false; } /* increment memory only sequence number after sending */ nSequence++; return true; } bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { BlockHash hash = pindex->GetBlockHash(); LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex()); char data[32]; for (unsigned int i = 0; i < 32; i++) { data[31 - i] = hash.begin()[i]; } return SendZmqMessage(MSG_HASHBLOCK, data, 32); } bool CZMQPublishHashTransactionNotifier::NotifyTransaction( const CTransaction &transaction) { TxId txid = transaction.GetId(); LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", txid.GetHex()); char data[32]; for (unsigned int i = 0; i < 32; i++) { data[31 - i] = txid.begin()[i]; } return SendZmqMessage(MSG_HASHTX, data, 32); } bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex()); const Config &config = GetConfig(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); { LOCK(cs_main); CBlock block; if (!ReadBlockFromDisk(block, pindex, config.GetChainParams().GetConsensus())) { zmqError("Can't read block from disk"); return false; } ss << block; } return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); } bool CZMQPublishRawTransactionNotifier::NotifyTransaction( const CTransaction &transaction) { TxId txid = transaction.GetId(); LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", txid.GetHex()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); ss << transaction; return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); }