diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -44,6 +44,7 @@ self.zmqContext = zmq.asyncio.Context() self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) + self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") diff --git a/doc/zmq.md b/doc/zmq.md --- a/doc/zmq.md +++ b/doc/zmq.md @@ -65,10 +65,21 @@ The socket type is PUB and the address must be a valid ZeroMQ socket address. The same address can be used in more than one notification. +The PUB socket's outbound message high water mark (SNDHWM) may be set +individually for each notification: + + -zmqpubhashtxhwm=n + -zmqpubhashblockhwm=n + -zmqpubrawblockhwm=n + -zmqpubrawtxhwm=n + +The high water mark value must be an integer greater than or equal to 0. + For instance: $ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \ - -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw + -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \ + -zmqpubhashtxhwm=10000 Each PUB notification has a topic and body, where the header corresponds to the notification type. For instance, for the diff --git a/src/init.cpp b/src/init.cpp --- a/src/init.cpp +++ b/src/init.cpp @@ -66,6 +66,7 @@ #include #if ENABLE_ZMQ +#include #include #include #endif @@ -765,11 +766,35 @@ gArgs.AddArg("-zmqpubrawtx=
", "Enable publish raw transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashblockhwm=", + strprintf("Set publish hash block outbound message high water " + "mark (default: %d)", + CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), + false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashtxhwm=", + strprintf("Set publish hash transaction outbound message high " + "water mark (default: %d)", + CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), + false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawblockhwm=", + strprintf("Set publish raw block outbound message high water " + "mark (default: %d)", + CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), + false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawtxhwm=", + strprintf("Set publish raw transaction outbound message high " + "water mark (default: %d)", + CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), + false, OptionsCategory::ZMQ); #else hidden_args.emplace_back("-zmqpubhashblock=
"); hidden_args.emplace_back("-zmqpubhashtx=
"); hidden_args.emplace_back("-zmqpubrawblock=
"); hidden_args.emplace_back("-zmqpubrawtx=
"); + hidden_args.emplace_back("-zmqpubhashblockhwm="); + hidden_args.emplace_back("-zmqpubhashtxhwm="); + hidden_args.emplace_back("-zmqpubrawblockhwm="); + hidden_args.emplace_back("-zmqpubrawtxhwm="); #endif gArgs.AddArg( diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -14,7 +14,11 @@ class CZMQAbstractNotifier { public: - CZMQAbstractNotifier() : psocket(nullptr) {} + static const int DEFAULT_ZMQ_SNDHWM{1000}; + + CZMQAbstractNotifier() + : psocket(nullptr), + outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) {} virtual ~CZMQAbstractNotifier(); template static CZMQAbstractNotifier *Create() { @@ -25,6 +29,14 @@ void SetType(const std::string &t) { type = t; } std::string GetAddress() const { return address; } void SetAddress(const std::string &a) { address = a; } + int GetOutboundMessageHighWaterMark() const { + return outbound_message_high_water_mark; + } + void SetOutboundMessageHighWaterMark(const int sndhwm) { + if (sndhwm >= 0) { + outbound_message_high_water_mark = sndhwm; + } + } virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; @@ -36,6 +48,7 @@ void *psocket; std::string type; std::string address; + int outbound_message_high_water_mark; // aka SNDHWM }; #endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -4,6 +4,8 @@ #include +const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM; + CZMQAbstractNotifier::~CZMQAbstractNotifier() { assert(!psocket); } diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -56,6 +56,9 @@ CZMQAbstractNotifier *notifier = factory(); notifier->SetType(entry.first); notifier->SetAddress(address); + 
notifier->SetOutboundMessageHighWaterMark( + static_cast(gArgs.GetArg( + arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); notifiers.push_back(notifier); } } @@ -93,10 +96,10 @@ for (; i != notifiers.end(); ++i) { CZMQAbstractNotifier *notifier = *i; if (notifier->Initialize(pcontext)) { - LogPrint(BCLog::ZMQ, " Notifier %s ready (address = %s)\n", + LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); } else { - LogPrint(BCLog::ZMQ, " Notifier %s failed (address = %s)\n", + LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); break; } @@ -116,7 +119,7 @@ for (std::list::iterator i = notifiers.begin(); i != notifiers.end(); ++i) { CZMQAbstractNotifier *notifier = *i; - LogPrint(BCLog::ZMQ, " Shutdown notifier %s at %s\n", + LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); notifier->Shutdown(); } diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -12,7 +12,7 @@ class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier { private: //! 
upcounting per message sequence number - uint32_t nSequence; + uint32_t nSequence{0U}; public: /* send zmq multipart message diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -78,7 +78,20 @@ return false; } - int rc = zmq_bind(psocket, address.c_str()); + LogPrint(BCLog::ZMQ, + "zmq: Outbound message high water mark for %s at %s is %d\n", + type, address, outbound_message_high_water_mark); + + int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, + &outbound_message_high_water_mark, + sizeof(outbound_message_high_water_mark)); + if (rc != 0) { + zmqError("Failed to set outbound message high water mark"); + zmq_close(psocket); + return false; + } + + rc = zmq_bind(psocket, address.c_str()); if (rc != 0) { zmqError("Failed to bind address"); zmq_close(psocket); @@ -118,7 +131,7 @@ } if (count == 1) { - LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address); + LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address); int linger = 0; zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_close(psocket); diff --git a/src/zmq/zmqrpc.cpp b/src/zmq/zmqrpc.cpp --- a/src/zmq/zmqrpc.cpp +++ b/src/zmq/zmqrpc.cpp @@ -27,6 +27,8 @@ " \"type\": \"pubhashtx\", (string) Type of notification\n" " \"address\": \"...\" (string) Address of the " "publisher\n" + " \"hwm\": n (numeric) Outbound message high " + "water mark\n" " },\n" " ...\n" "]\n"}, @@ -43,6 +45,7 @@ UniValue obj(UniValue::VOBJ); obj.pushKV("type", n->GetType()); obj.pushKV("address", n->GetAddress()); + obj.pushKV("hwm", n->GetOutboundMessageHighWaterMark()); result.push_back(obj); } } diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -125,10 +125,10 @@ self.log.info("Test the getzmqnotifications RPC") assert_equal(self.nodes[0].getzmqnotifications(), [ - {"type": "pubhashblock", 
"address": ADDRESS}, - {"type": "pubhashtx", "address": ADDRESS}, - {"type": "pubrawblock", "address": ADDRESS}, - {"type": "pubrawtx", "address": ADDRESS}, + {"type": "pubhashblock", "address": ADDRESS, "hwm": 1000}, + {"type": "pubhashtx", "address": ADDRESS, "hwm": 1000}, + {"type": "pubrawblock", "address": ADDRESS, "hwm": 1000}, + {"type": "pubrawtx", "address": ADDRESS, "hwm": 1000}, ]) assert_equal(self.nodes[1].getzmqnotifications(), []) diff --git a/test/lint/check-doc.py b/test/lint/check-doc.py --- a/test/lint/check-doc.py +++ b/test/lint/check-doc.py @@ -33,6 +33,11 @@ '-zmqpubhashtx', '-zmqpubrawblock', '-zmqpubrawtx', + '-zmqpubhashblockhwm', + '-zmqpubhashtxhwm', + '-zmqpubrawblockhwm', + '-zmqpubrawtxhwm', + ]) # list false positive undocumented arguments