diff --git a/src/init.cpp b/src/init.cpp --- a/src/init.cpp +++ b/src/init.cpp @@ -840,6 +840,9 @@ argsman.AddArg("-zmqpubrawtx=
", "Enable publish raw transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubsequence=
", + "Enable publish hash block and tx sequence in
", + ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg( "-zmqpubhashblockhwm=", strprintf("Set publish hash block outbound message high water " @@ -864,15 +867,22 @@ "water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubsequencehwm=", + strprintf("Set publish hash sequence message high water mark" + " (default: %d)", + CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), + ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); #else hidden_args.emplace_back("-zmqpubhashblock=
"); hidden_args.emplace_back("-zmqpubhashtx=
"); hidden_args.emplace_back("-zmqpubrawblock=
"); hidden_args.emplace_back("-zmqpubrawtx=
"); + hidden_args.emplace_back("-zmqpubsequence="); hidden_args.emplace_back("-zmqpubhashblockhwm="); hidden_args.emplace_back("-zmqpubhashtxhwm="); hidden_args.emplace_back("-zmqpubrawblockhwm="); hidden_args.emplace_back("-zmqpubrawtxhwm="); + hidden_args.emplace_back("-zmqpubsequencehwm="); #endif argsman.AddArg( diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h --- a/src/interfaces/chain.h +++ b/src/interfaces/chain.h @@ -273,10 +273,11 @@ class Notifications { public: virtual ~Notifications() {} - virtual void transactionAddedToMempool(const CTransactionRef &tx) {} - virtual void - transactionRemovedFromMempool(const CTransactionRef &ptx, - MemPoolRemovalReason reason) {} + virtual void transactionAddedToMempool(const CTransactionRef &tx, + uint64_t mempool_sequence) {} + virtual void transactionRemovedFromMempool(const CTransactionRef &ptx, + MemPoolRemovalReason reason, + uint64_t mempool_sequence) {} virtual void blockConnected(const CBlock &block, int height) {} virtual void blockDisconnected(const CBlock &block, int height) {} virtual void updatedBlockTip() {} diff --git a/src/interfaces/chain.cpp b/src/interfaces/chain.cpp --- a/src/interfaces/chain.cpp +++ b/src/interfaces/chain.cpp @@ -72,13 +72,15 @@ std::shared_ptr notifications) : m_notifications(std::move(notifications)) {} virtual ~NotificationsProxy() = default; - void TransactionAddedToMempool(const CTransactionRef &tx) override { - m_notifications->transactionAddedToMempool(tx); + void TransactionAddedToMempool(const CTransactionRef &tx, + uint64_t mempool_sequence) override { + m_notifications->transactionAddedToMempool(tx, mempool_sequence); } - void - TransactionRemovedFromMempool(const CTransactionRef &tx, - MemPoolRemovalReason reason) override { - m_notifications->transactionRemovedFromMempool(tx, reason); + void TransactionRemovedFromMempool(const CTransactionRef &tx, + MemPoolRemovalReason reason, + uint64_t mempool_sequence) override { + 
m_notifications->transactionRemovedFromMempool(tx, reason, + mempool_sequence); } void BlockConnected(const std::shared_ptr &block, const CBlockIndex *index) override { @@ -475,7 +477,8 @@ } LOCK2(::cs_main, m_node.mempool->cs); for (const CTxMemPoolEntry &entry : m_node.mempool->mapTx) { - notifications.transactionAddedToMempool(entry.GetSharedTx()); + notifications.transactionAddedToMempool( + entry.GetSharedTx(), 0 /* mempool_sequence */); } } const CChainParams &params() const override { return m_params; } diff --git a/src/rpc/blockchain.h b/src/rpc/blockchain.h --- a/src/rpc/blockchain.h +++ b/src/rpc/blockchain.h @@ -43,7 +43,8 @@ UniValue MempoolInfoToJSON(const CTxMemPool &pool); /** Mempool to JSON */ -UniValue MempoolToJSON(const CTxMemPool &pool, bool verbose = false); +UniValue MempoolToJSON(const CTxMemPool &pool, bool verbose = false, + bool include_mempool_sequence = false); /** Block header to JSON */ UniValue blockheaderToJSON(const CBlockIndex *tip, diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -586,8 +586,14 @@ info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetId())); } -UniValue MempoolToJSON(const CTxMemPool &pool, bool verbose) { +UniValue MempoolToJSON(const CTxMemPool &pool, bool verbose, + bool include_mempool_sequence) { if (verbose) { + if (include_mempool_sequence) { + throw JSONRPCError( + RPC_INVALID_PARAMETER, + "Verbose results cannot contain mempool sequence values."); + } LOCK(pool.cs); UniValue o(UniValue::VOBJ); for (const CTxMemPoolEntry &e : pool.mapTx) { @@ -601,15 +607,26 @@ } return o; } else { + uint64_t mempool_sequence; std::vector vtxids; - pool.queryHashes(vtxids); - + { + LOCK(pool.cs); + pool.queryHashes(vtxids); + mempool_sequence = pool.GetSequence(); + } UniValue a(UniValue::VARR); for (const uint256 &txid : vtxids) { a.push_back(txid.ToString()); } - return a; + if (!include_mempool_sequence) { + return a; + } else { + UniValue 
o(UniValue::VOBJ); + o.pushKV("txids", a); + o.pushKV("mempool_sequence", mempool_sequence); + return o; + } } } @@ -623,6 +640,9 @@ { {"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"}, + {"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", + "If verbose=false, returns a json object with transaction list " + "and mempool sequence number attached."}, }, { RPCResult{"for verbose = false", @@ -640,6 +660,21 @@ {RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()}, }}, + RPCResult{ + "for verbose = false and mempool_sequence = true", + RPCResult::Type::OBJ, + "", + "", + { + {RPCResult::Type::ARR, + "txids", + "", + { + {RPCResult::Type::STR_HEX, "", "The transaction id"}, + }}, + {RPCResult::Type::NUM, "mempool_sequence", + "The mempool sequence value."}, + }}, }, RPCExamples{HelpExampleCli("getrawmempool", "true") + HelpExampleRpc("getrawmempool", "true")}, @@ -650,7 +685,13 @@ fVerbose = request.params[0].get_bool(); } - return MempoolToJSON(EnsureMemPool(request.context), fVerbose); + bool include_mempool_sequence = false; + if (!request.params[1].isNull()) { + include_mempool_sequence = request.params[1].get_bool(); + } + + return MempoolToJSON(EnsureMemPool(request.context), fVerbose, + include_mempool_sequence); }, }; } @@ -3035,7 +3076,7 @@ { "blockchain", "getmempooldescendants", getmempooldescendants, {"txid","verbose"} }, { "blockchain", "getmempoolentry", getmempoolentry, {"txid"} }, { "blockchain", "getmempoolinfo", getmempoolinfo, {} }, - { "blockchain", "getrawmempool", getrawmempool, {"verbose"} }, + { "blockchain", "getrawmempool", getrawmempool, {"verbose", "mempool_sequence"} }, { "blockchain", "gettxout", gettxout, {"txid","n","include_mempool"} }, { "blockchain", "gettxoutsetinfo", gettxoutsetinfo, {"hash_type"} }, { "blockchain", "pruneblockchain", pruneblockchain, {"height"} }, diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp --- 
a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -129,6 +129,7 @@ {"pruneblockchain", 0, "height"}, {"keypoolrefill", 0, "newsize"}, {"getrawmempool", 0, "verbose"}, + {"getrawmempool", 1, "mempool_sequence"}, {"prioritisetransaction", 1, "dummy"}, {"prioritisetransaction", 2, "fee_delta"}, {"setban", 2, "bantime"}, diff --git a/src/txmempool.h b/src/txmempool.h --- a/src/txmempool.h +++ b/src/txmempool.h @@ -512,6 +512,11 @@ mutable uint64_t m_epoch; mutable bool m_has_epoch_guard; + // In-memory counter for external mempool tracking purposes. + // This number is incremented once every time a transaction + // is added or removed from the mempool for any reason. + mutable uint64_t m_sequence_number{1}; + void trackPackageRemoved(const CFeeRate &rate) EXCLUSIVE_LOCKS_REQUIRED(cs); bool m_is_loaded GUARDED_BY(cs){false}; @@ -820,6 +825,15 @@ return (m_unbroadcast_txids.count(txid) != 0); } + /** Guards this internal counter for external reporting */ + uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) { + return m_sequence_number++; + } + + uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) { + return m_sequence_number; + } + private: /** * UpdateForDescendants is used by UpdateTransactionsFromBlock to update the diff --git a/src/txmempool.cpp b/src/txmempool.cpp --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -473,13 +473,17 @@ } void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) { + // We increment mempool sequence value no matter removal reason + // even if not directly reported below. + uint64_t mempool_sequence = GetAndIncrementSequence(); + if (reason != MemPoolRemovalReason::BLOCK) { // Notify clients that a transaction has been removed from the mempool // for any reason except being included in a block. Clients interested // in transactions included in blocks can subscribe to the // BlockConnected notification. 
- GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), - reason); + GetMainSignals().TransactionRemovedFromMempool( + it->GetSharedTx(), reason, mempool_sequence); } for (const CTxIn &txin : it->GetTx().vin) { diff --git a/src/validation.cpp b/src/validation.cpp --- a/src/validation.cpp +++ b/src/validation.cpp @@ -741,7 +741,9 @@ return false; } - GetMainSignals().TransactionAddedToMempool(ptx); + GetMainSignals().TransactionAddedToMempool( + ptx, m_pool.GetAndIncrementSequence()); + return true; } diff --git a/src/validationinterface.h b/src/validationinterface.h --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -105,7 +105,9 @@ * * Called on a background thread. */ - virtual void TransactionAddedToMempool(const CTransactionRef &tx) {} + virtual void TransactionAddedToMempool(const CTransactionRef &tx, + uint64_t mempool_sequence) {} + /** * Notifies listeners of a transaction leaving mempool. * @@ -139,7 +141,8 @@ * Called on a background thread. */ virtual void TransactionRemovedFromMempool(const CTransactionRef &tx, - MemPoolRemovalReason reason) {} + MemPoolRemovalReason reason, + uint64_t mempool_sequence) {} /** * Notifies listeners of a block being connected. * Provides a vector of transactions evicted from the mempool as a result. 
@@ -219,9 +222,11 @@ void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); - void TransactionAddedToMempool(const CTransactionRef &); + void TransactionAddedToMempool(const CTransactionRef &, + uint64_t mempool_sequence); void TransactionRemovedFromMempool(const CTransactionRef &, - MemPoolRemovalReason); + MemPoolRemovalReason, + uint64_t mempool_sequence); void BlockConnected(const std::shared_ptr &, const CBlockIndex *pindex); void BlockDisconnected(const std::shared_ptr &, diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -203,10 +203,11 @@ fInitialDownload); } -void CMainSignals::TransactionAddedToMempool(const CTransactionRef &tx) { - auto event = [tx, this] { +void CMainSignals::TransactionAddedToMempool(const CTransactionRef &tx, + uint64_t mempool_sequence) { + auto event = [tx, mempool_sequence, this] { m_internals->Iterate([&](CValidationInterface &callbacks) { - callbacks.TransactionAddedToMempool(tx); + callbacks.TransactionAddedToMempool(tx, mempool_sequence); }); }; ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__, @@ -214,10 +215,12 @@ } void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef &tx, - MemPoolRemovalReason reason) { - auto event = [tx, reason, this] { + MemPoolRemovalReason reason, + uint64_t mempool_sequence) { + auto event = [tx, reason, mempool_sequence, this] { m_internals->Iterate([&](CValidationInterface &callbacks) { - callbacks.TransactionRemovedFromMempool(tx, reason); + callbacks.TransactionRemovedFromMempool(tx, reason, + mempool_sequence); }); }; ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__, diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h --- a/src/wallet/wallet.h +++ b/src/wallet/wallet.h @@ -1059,7 +1059,8 @@ bool fFlushOnClose = true); bool LoadToWallet(const TxId &txid, const UpdateWalletTxFn &fill_wtx) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); - void 
transactionAddedToMempool(const CTransactionRef &tx) override; + void transactionAddedToMempool(const CTransactionRef &tx, + uint64_t mempool_sequence) override; void blockConnected(const CBlock &block, int height) override; void blockDisconnected(const CBlock &block, int height) override; void updatedBlockTip() override; @@ -1087,7 +1088,8 @@ const WalletRescanReserver &reserver, bool fUpdate); void transactionRemovedFromMempool(const CTransactionRef &tx, - MemPoolRemovalReason reason) override; + MemPoolRemovalReason reason, + uint64_t mempool_sequence) override; void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); void ResendWalletTransactions(); struct Balance { diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1293,7 +1293,8 @@ MarkInputsDirty(ptx); } -void CWallet::transactionAddedToMempool(const CTransactionRef &tx) { +void CWallet::transactionAddedToMempool(const CTransactionRef &tx, + uint64_t mempool_sequence) { LOCK(cs_wallet); SyncTransaction(tx, {CWalletTx::Status::UNCONFIRMED, /* block_height */ 0, @@ -1306,7 +1307,8 @@ } void CWallet::transactionRemovedFromMempool(const CTransactionRef &tx, - MemPoolRemovalReason reason) { + MemPoolRemovalReason reason, + uint64_t mempool_sequence) { LOCK(cs_wallet); auto it = mapWallet.find(tx->GetId()); if (it != mapWallet.end()) { @@ -1356,7 +1358,8 @@ SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height, block_hash, int(index)}); transactionRemovedFromMempool(block.vtx[index], - MemPoolRemovalReason::BLOCK); + MemPoolRemovalReason::BLOCK, + 0 /* mempool_sequence */); } } diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -44,7 +44,19 @@ virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; + // Notifies of ConnectTip result, i.e., new active tip only virtual bool NotifyBlock(const CBlockIndex 
*pindex); + // Notifies of every block connection + virtual bool NotifyBlockConnect(const CBlockIndex *pindex); + // Notifies of every block disconnection + virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex); + // Notifies of every mempool acceptance + virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, + uint64_t mempool_sequence); + // Notifies of every mempool removal, except inclusion in blocks + virtual bool NotifyTransactionRemoval(const CTransaction &transaction, + uint64_t mempool_sequence); + // Notifies of transactions added to mempool or appearing in blocks virtual bool NotifyTransaction(const CTransaction &transaction); protected: diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -20,3 +20,23 @@ const CTransaction & /*transaction*/) { return true; } + +bool CZMQAbstractNotifier::NotifyBlockConnect( + const CBlockIndex * /*CBlockIndex*/) { + return true; +} + +bool CZMQAbstractNotifier::NotifyBlockDisconnect( + const CBlockIndex * /*CBlockIndex*/) { + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionAcceptance( + const CTransaction & /*transaction*/, uint64_t mempool_sequence) { + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionRemoval( + const CTransaction & /*transaction*/, uint64_t mempool_sequence) { + return true; +} diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -26,7 +26,11 @@ void Shutdown(); // CValidationInterface - void TransactionAddedToMempool(const CTransactionRef &tx) override; + void TransactionAddedToMempool(const CTransactionRef &tx, + uint64_t mempool_sequence) override; + void TransactionRemovedFromMempool(const CTransactionRef &tx, + MemPoolRemovalReason reason, + uint64_t mempool_sequence) override; void BlockConnected(const std::shared_ptr 
&pblock, const CBlockIndex *pindexConnected) override; void BlockDisconnected(const std::shared_ptr &pblock, diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -36,6 +36,8 @@ CZMQAbstractNotifier::Create; factories["pubrawtx"] = CZMQAbstractNotifier::Create; + factories["pubsequence"] = + CZMQAbstractNotifier::Create; std::list> notifiers; for (const auto &entry : factories) { @@ -145,33 +147,62 @@ } void CZMQNotificationInterface::TransactionAddedToMempool( - const CTransactionRef &ptx) { - // Used by BlockConnected and BlockDisconnected as well, because they're all - // the same external callback. + const CTransactionRef &ptx, uint64_t mempool_sequence) { const CTransaction &tx = *ptx; - TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier *notifier) { - return notifier->NotifyTransaction(tx); - }); + TryForEachAndRemoveFailed( + notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) { + return notifier->NotifyTransaction(tx) && + notifier->NotifyTransactionAcceptance(tx, mempool_sequence); + }); +} + +void CZMQNotificationInterface::TransactionRemovedFromMempool( + const CTransactionRef &ptx, MemPoolRemovalReason reason, + uint64_t mempool_sequence) { + // Called for all non-block inclusion reasons + const CTransaction &tx = *ptx; + + TryForEachAndRemoveFailed( + notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) { + return notifier->NotifyTransactionRemoval(tx, mempool_sequence); + }); } void CZMQNotificationInterface::BlockConnected( const std::shared_ptr &pblock, const CBlockIndex *pindexConnected) { for (const CTransactionRef &ptx : pblock->vtx) { - // Do a normal notify for each transaction added in the block - TransactionAddedToMempool(ptx); + const CTransaction &tx = *ptx; + TryForEachAndRemoveFailed(notifiers, + [&tx](CZMQAbstractNotifier *notifier) { + return 
notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockConnect listeners for *all* blocks + TryForEachAndRemoveFailed( + notifiers, [pindexConnected](CZMQAbstractNotifier *notifier) { + return notifier->NotifyBlockConnect(pindexConnected); + }); } void CZMQNotificationInterface::BlockDisconnected( const std::shared_ptr &pblock, const CBlockIndex *pindexDisconnected) { for (const CTransactionRef &ptx : pblock->vtx) { - // Do a normal notify for each transaction removed in block - // disconnection - TransactionAddedToMempool(ptx); + const CTransaction &tx = *ptx; + TryForEachAndRemoveFailed(notifiers, + [&tx](CZMQAbstractNotifier *notifier) { + return notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockDisconnect listeners for *all* blocks + TryForEachAndRemoveFailed( + notifiers, [pindexDisconnected](CZMQAbstractNotifier *notifier) { + return notifier->NotifyBlockDisconnect(pindexDisconnected); + }); } CZMQNotificationInterface *g_zmq_notification_interface = nullptr; diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -47,4 +47,14 @@ bool NotifyTransaction(const CTransaction &transaction) override; }; +class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier { +public: + bool NotifyBlockConnect(const CBlockIndex *pindex) override; + bool NotifyBlockDisconnect(const CBlockIndex *pindex) override; + bool NotifyTransactionAcceptance(const CTransaction &transaction, + uint64_t mempool_sequence) override; + bool NotifyTransactionRemoval(const CTransaction &transaction, + uint64_t mempool_sequence) override; +}; + #endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -30,6 +30,7 @@ static const char *MSG_HASHTX = "hashtx"; static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWTX 
= "rawtx"; +static const char *MSG_SEQUENCE = "sequence"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void *data, size_t size, ...) { @@ -232,3 +233,62 @@ ss << transaction; return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } + +// TODO: Dedup this code to take label char, log string +bool CZMQPublishSequenceNotifier::NotifyBlockConnect( + const CBlockIndex *pindex) { + BlockHash hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n", + hash.GetHex()); + char data[sizeof(BlockHash) + 1]; + for (unsigned int i = 0; i < sizeof(BlockHash); i++) { + data[sizeof(BlockHash) - 1 - i] = hash.begin()[i]; + } + // Block (C)onnect + data[sizeof(data) - 1] = 'C'; + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect( + const CBlockIndex *pindex) { + BlockHash hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n", + hash.GetHex()); + char data[sizeof(BlockHash) + 1]; + for (unsigned int i = 0; i < sizeof(BlockHash); i++) { + data[sizeof(BlockHash) - 1 - i] = hash.begin()[i]; + } + // Block (D)isconnect + data[sizeof(data) - 1] = 'D'; + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance( + const CTransaction &transaction, uint64_t mempool_sequence) { + TxId txid = transaction.GetId(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n", + txid.GetHex()); + uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1]; + for (unsigned int i = 0; i < sizeof(TxId); i++) { + data[sizeof(TxId) - 1 - i] = txid.begin()[i]; + } + // Mempool (A)cceptance + data[sizeof(TxId)] = 'A'; + WriteLE64(data + sizeof(TxId) + 1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval( + const CTransaction 
&transaction, uint64_t mempool_sequence) { + TxId txid = transaction.GetId(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n", + txid.GetHex()); + uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1]; + for (unsigned int i = 0; i < sizeof(TxId); i++) { + data[sizeof(TxId) - 1 - i] = txid.begin()[i]; + } + // Mempool (R)emoval + data[sizeof(TxId)] = 'R'; + WriteLE64(data + sizeof(TxId) + 1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} diff --git a/test/lint/check-doc.py b/test/lint/check-doc.py --- a/test/lint/check-doc.py +++ b/test/lint/check-doc.py @@ -37,6 +37,8 @@ '-zmqpubhashtxhwm', '-zmqpubrawblockhwm', '-zmqpubrawtxhwm', + '-zmqpubsequence', + '-zmqpubsequencehwm', ]) # list false positive undocumented arguments