diff --git a/src/init.cpp b/src/init.cpp
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -840,6 +840,9 @@
argsman.AddArg("-zmqpubrawtx=
",
"Enable publish raw transaction in ",
ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
+ argsman.AddArg("-zmqpubsequence=",
+ "Enable publish hash block and tx sequence in ",
+ ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg(
"-zmqpubhashblockhwm=",
strprintf("Set publish hash block outbound message high water "
@@ -864,15 +867,22 @@
"water mark (default: %d)",
CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM),
false, OptionsCategory::ZMQ);
+ argsman.AddArg("-zmqpubsequencehwm=",
+ strprintf("Set publish hash sequence message high water mark"
+ " (default: %d)",
+ CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM),
+ ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=");
hidden_args.emplace_back("-zmqpubhashtx=");
hidden_args.emplace_back("-zmqpubrawblock=");
hidden_args.emplace_back("-zmqpubrawtx=");
+ hidden_args.emplace_back("-zmqpubsequence=");
hidden_args.emplace_back("-zmqpubhashblockhwm=");
hidden_args.emplace_back("-zmqpubhashtxhwm=");
hidden_args.emplace_back("-zmqpubrawblockhwm=");
hidden_args.emplace_back("-zmqpubrawtxhwm=");
+ hidden_args.emplace_back("-zmqpubsequencehwm=");
#endif
argsman.AddArg(
diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h
--- a/src/interfaces/chain.h
+++ b/src/interfaces/chain.h
@@ -273,10 +273,11 @@
class Notifications {
public:
virtual ~Notifications() {}
- virtual void transactionAddedToMempool(const CTransactionRef &tx) {}
- virtual void
- transactionRemovedFromMempool(const CTransactionRef &ptx,
- MemPoolRemovalReason reason) {}
+ virtual void transactionAddedToMempool(const CTransactionRef &tx,
+ uint64_t mempool_sequence) {}
+ virtual void transactionRemovedFromMempool(const CTransactionRef &ptx,
+ MemPoolRemovalReason reason,
+ uint64_t mempool_sequence) {}
virtual void blockConnected(const CBlock &block, int height) {}
virtual void blockDisconnected(const CBlock &block, int height) {}
virtual void updatedBlockTip() {}
diff --git a/src/interfaces/chain.cpp b/src/interfaces/chain.cpp
--- a/src/interfaces/chain.cpp
+++ b/src/interfaces/chain.cpp
@@ -72,13 +72,15 @@
std::shared_ptr notifications)
: m_notifications(std::move(notifications)) {}
virtual ~NotificationsProxy() = default;
- void TransactionAddedToMempool(const CTransactionRef &tx) override {
- m_notifications->transactionAddedToMempool(tx);
+ void TransactionAddedToMempool(const CTransactionRef &tx,
+ uint64_t mempool_sequence) override {
+ m_notifications->transactionAddedToMempool(tx, mempool_sequence);
}
- void
- TransactionRemovedFromMempool(const CTransactionRef &tx,
- MemPoolRemovalReason reason) override {
- m_notifications->transactionRemovedFromMempool(tx, reason);
+ void TransactionRemovedFromMempool(const CTransactionRef &tx,
+ MemPoolRemovalReason reason,
+ uint64_t mempool_sequence) override {
+ m_notifications->transactionRemovedFromMempool(tx, reason,
+ mempool_sequence);
}
void BlockConnected(const std::shared_ptr &block,
const CBlockIndex *index) override {
@@ -475,7 +477,8 @@
}
LOCK2(::cs_main, m_node.mempool->cs);
for (const CTxMemPoolEntry &entry : m_node.mempool->mapTx) {
- notifications.transactionAddedToMempool(entry.GetSharedTx());
+ notifications.transactionAddedToMempool(
+ entry.GetSharedTx(), 0 /* mempool_sequence */);
}
}
const CChainParams ¶ms() const override { return m_params; }
diff --git a/src/rpc/blockchain.h b/src/rpc/blockchain.h
--- a/src/rpc/blockchain.h
+++ b/src/rpc/blockchain.h
@@ -43,7 +43,8 @@
UniValue MempoolInfoToJSON(const CTxMemPool &pool);
/** Mempool to JSON */
-UniValue MempoolToJSON(const CTxMemPool &pool, bool verbose = false);
+UniValue MempoolToJSON(const CTxMemPool &pool, bool verbose = false,
+ bool include_mempool_sequence = false);
/** Block header to JSON */
UniValue blockheaderToJSON(const CBlockIndex *tip,
diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp
--- a/src/rpc/blockchain.cpp
+++ b/src/rpc/blockchain.cpp
@@ -586,8 +586,14 @@
info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetId()));
}
-UniValue MempoolToJSON(const CTxMemPool &pool, bool verbose) {
+UniValue MempoolToJSON(const CTxMemPool &pool, bool verbose,
+ bool include_mempool_sequence) {
if (verbose) {
+ if (include_mempool_sequence) {
+ throw JSONRPCError(
+ RPC_INVALID_PARAMETER,
+ "Verbose results cannot contain mempool sequence values.");
+ }
LOCK(pool.cs);
UniValue o(UniValue::VOBJ);
for (const CTxMemPoolEntry &e : pool.mapTx) {
@@ -601,15 +607,26 @@
}
return o;
} else {
+ uint64_t mempool_sequence;
std::vector vtxids;
- pool.queryHashes(vtxids);
-
+ {
+ LOCK(pool.cs);
+ pool.queryHashes(vtxids);
+ mempool_sequence = pool.GetSequence();
+ }
UniValue a(UniValue::VARR);
for (const uint256 &txid : vtxids) {
a.push_back(txid.ToString());
}
- return a;
+ if (!include_mempool_sequence) {
+ return a;
+ } else {
+ UniValue o(UniValue::VOBJ);
+ o.pushKV("txids", a);
+ o.pushKV("mempool_sequence", mempool_sequence);
+ return o;
+ }
}
}
@@ -623,6 +640,9 @@
{
{"verbose", RPCArg::Type::BOOL, /* default */ "false",
"True for a json object, false for array of transaction ids"},
+ {"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false",
+ "If verbose=false, returns a json object with transaction list "
+ "and mempool sequence number attached."},
},
{
RPCResult{"for verbose = false",
@@ -640,6 +660,21 @@
{RPCResult::Type::OBJ, "transactionid", "",
MempoolEntryDescription()},
}},
+ RPCResult{
+ "for verbose = false and mempool_sequence = true",
+ RPCResult::Type::OBJ,
+ "",
+ "",
+ {
+ {RPCResult::Type::ARR,
+ "txids",
+ "",
+ {
+ {RPCResult::Type::STR_HEX, "", "The transaction id"},
+ }},
+ {RPCResult::Type::NUM, "mempool_sequence",
+ "The mempool sequence value."},
+ }},
},
RPCExamples{HelpExampleCli("getrawmempool", "true") +
HelpExampleRpc("getrawmempool", "true")},
@@ -650,7 +685,13 @@
fVerbose = request.params[0].get_bool();
}
- return MempoolToJSON(EnsureMemPool(request.context), fVerbose);
+ bool include_mempool_sequence = false;
+ if (!request.params[1].isNull()) {
+ include_mempool_sequence = request.params[1].get_bool();
+ }
+
+ return MempoolToJSON(EnsureMemPool(request.context), fVerbose,
+ include_mempool_sequence);
},
};
}
@@ -3035,7 +3076,7 @@
{ "blockchain", "getmempooldescendants", getmempooldescendants, {"txid","verbose"} },
{ "blockchain", "getmempoolentry", getmempoolentry, {"txid"} },
{ "blockchain", "getmempoolinfo", getmempoolinfo, {} },
- { "blockchain", "getrawmempool", getrawmempool, {"verbose"} },
+ { "blockchain", "getrawmempool", getrawmempool, {"verbose", "mempool_sequence"} },
{ "blockchain", "gettxout", gettxout, {"txid","n","include_mempool"} },
{ "blockchain", "gettxoutsetinfo", gettxoutsetinfo, {"hash_type"} },
{ "blockchain", "pruneblockchain", pruneblockchain, {"height"} },
diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp
--- a/src/rpc/client.cpp
+++ b/src/rpc/client.cpp
@@ -129,6 +129,7 @@
{"pruneblockchain", 0, "height"},
{"keypoolrefill", 0, "newsize"},
{"getrawmempool", 0, "verbose"},
+ {"getrawmempool", 1, "mempool_sequence"},
{"prioritisetransaction", 1, "dummy"},
{"prioritisetransaction", 2, "fee_delta"},
{"setban", 2, "bantime"},
diff --git a/src/txmempool.h b/src/txmempool.h
--- a/src/txmempool.h
+++ b/src/txmempool.h
@@ -512,6 +512,11 @@
mutable uint64_t m_epoch;
mutable bool m_has_epoch_guard;
+ // In-memory counter for external mempool tracking purposes.
+ // This number is incremented once every time a transaction
+ // is added or removed from the mempool for any reason.
+ mutable uint64_t m_sequence_number{1};
+
void trackPackageRemoved(const CFeeRate &rate) EXCLUSIVE_LOCKS_REQUIRED(cs);
bool m_is_loaded GUARDED_BY(cs){false};
@@ -820,6 +825,15 @@
return (m_unbroadcast_txids.count(txid) != 0);
}
+ /** Guards this internal counter for external reporting */
+ uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
+ return m_sequence_number++;
+ }
+
+ uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
+ return m_sequence_number;
+ }
+
private:
/**
* UpdateForDescendants is used by UpdateTransactionsFromBlock to update the
diff --git a/src/txmempool.cpp b/src/txmempool.cpp
--- a/src/txmempool.cpp
+++ b/src/txmempool.cpp
@@ -473,13 +473,17 @@
}
void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) {
+ // We increment mempool sequence value no matter removal reason
+ // even if not directly reported below.
+ uint64_t mempool_sequence = GetAndIncrementSequence();
+
if (reason != MemPoolRemovalReason::BLOCK) {
// Notify clients that a transaction has been removed from the mempool
// for any reason except being included in a block. Clients interested
// in transactions included in blocks can subscribe to the
// BlockConnected notification.
- GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(),
- reason);
+ GetMainSignals().TransactionRemovedFromMempool(
+ it->GetSharedTx(), reason, mempool_sequence);
}
for (const CTxIn &txin : it->GetTx().vin) {
diff --git a/src/validation.cpp b/src/validation.cpp
--- a/src/validation.cpp
+++ b/src/validation.cpp
@@ -741,7 +741,9 @@
return false;
}
- GetMainSignals().TransactionAddedToMempool(ptx);
+ GetMainSignals().TransactionAddedToMempool(
+ ptx, m_pool.GetAndIncrementSequence());
+
return true;
}
diff --git a/src/validationinterface.h b/src/validationinterface.h
--- a/src/validationinterface.h
+++ b/src/validationinterface.h
@@ -105,7 +105,9 @@
*
* Called on a background thread.
*/
- virtual void TransactionAddedToMempool(const CTransactionRef &tx) {}
+ virtual void TransactionAddedToMempool(const CTransactionRef &tx,
+ uint64_t mempool_sequence) {}
+
/**
* Notifies listeners of a transaction leaving mempool.
*
@@ -139,7 +141,8 @@
* Called on a background thread.
*/
virtual void TransactionRemovedFromMempool(const CTransactionRef &tx,
- MemPoolRemovalReason reason) {}
+ MemPoolRemovalReason reason,
+ uint64_t mempool_sequence) {}
/**
* Notifies listeners of a block being connected.
* Provides a vector of transactions evicted from the mempool as a result.
@@ -219,9 +222,11 @@
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *,
bool fInitialDownload);
- void TransactionAddedToMempool(const CTransactionRef &);
+ void TransactionAddedToMempool(const CTransactionRef &,
+ uint64_t mempool_sequence);
void TransactionRemovedFromMempool(const CTransactionRef &,
- MemPoolRemovalReason);
+ MemPoolRemovalReason,
+ uint64_t mempool_sequence);
void BlockConnected(const std::shared_ptr &,
const CBlockIndex *pindex);
void BlockDisconnected(const std::shared_ptr &,
diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp
--- a/src/validationinterface.cpp
+++ b/src/validationinterface.cpp
@@ -203,10 +203,11 @@
fInitialDownload);
}
-void CMainSignals::TransactionAddedToMempool(const CTransactionRef &tx) {
- auto event = [tx, this] {
+void CMainSignals::TransactionAddedToMempool(const CTransactionRef &tx,
+ uint64_t mempool_sequence) {
+ auto event = [tx, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface &callbacks) {
- callbacks.TransactionAddedToMempool(tx);
+ callbacks.TransactionAddedToMempool(tx, mempool_sequence);
});
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__,
@@ -214,10 +215,12 @@
}
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef &tx,
- MemPoolRemovalReason reason) {
- auto event = [tx, reason, this] {
+ MemPoolRemovalReason reason,
+ uint64_t mempool_sequence) {
+ auto event = [tx, reason, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface &callbacks) {
- callbacks.TransactionRemovedFromMempool(tx, reason);
+ callbacks.TransactionRemovedFromMempool(tx, reason,
+ mempool_sequence);
});
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__,
diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h
--- a/src/wallet/wallet.h
+++ b/src/wallet/wallet.h
@@ -1059,7 +1059,8 @@
bool fFlushOnClose = true);
bool LoadToWallet(const TxId &txid, const UpdateWalletTxFn &fill_wtx)
EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
- void transactionAddedToMempool(const CTransactionRef &tx) override;
+ void transactionAddedToMempool(const CTransactionRef &tx,
+ uint64_t mempool_sequence) override;
void blockConnected(const CBlock &block, int height) override;
void blockDisconnected(const CBlock &block, int height) override;
void updatedBlockTip() override;
@@ -1087,7 +1088,8 @@
const WalletRescanReserver &reserver,
bool fUpdate);
void transactionRemovedFromMempool(const CTransactionRef &tx,
- MemPoolRemovalReason reason) override;
+ MemPoolRemovalReason reason,
+ uint64_t mempool_sequence) override;
void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void ResendWalletTransactions();
struct Balance {
diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp
--- a/src/wallet/wallet.cpp
+++ b/src/wallet/wallet.cpp
@@ -1293,7 +1293,8 @@
MarkInputsDirty(ptx);
}
-void CWallet::transactionAddedToMempool(const CTransactionRef &tx) {
+void CWallet::transactionAddedToMempool(const CTransactionRef &tx,
+ uint64_t mempool_sequence) {
LOCK(cs_wallet);
SyncTransaction(tx, {CWalletTx::Status::UNCONFIRMED, /* block_height */ 0,
@@ -1306,7 +1307,8 @@
}
void CWallet::transactionRemovedFromMempool(const CTransactionRef &tx,
- MemPoolRemovalReason reason) {
+ MemPoolRemovalReason reason,
+ uint64_t mempool_sequence) {
LOCK(cs_wallet);
auto it = mapWallet.find(tx->GetId());
if (it != mapWallet.end()) {
@@ -1356,7 +1358,8 @@
SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height,
block_hash, int(index)});
transactionRemovedFromMempool(block.vtx[index],
- MemPoolRemovalReason::BLOCK);
+ MemPoolRemovalReason::BLOCK,
+ 0 /* mempool_sequence */);
}
}
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h
--- a/src/zmq/zmqabstractnotifier.h
+++ b/src/zmq/zmqabstractnotifier.h
@@ -44,7 +44,19 @@
virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0;
+ // Notifies of ConnectTip result, i.e., new active tip only
virtual bool NotifyBlock(const CBlockIndex *pindex);
+ // Notifies of every block connection
+ virtual bool NotifyBlockConnect(const CBlockIndex *pindex);
+ // Notifies of every block disconnection
+ virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex);
+ // Notifies of every mempool acceptance
+ virtual bool NotifyTransactionAcceptance(const CTransaction &transaction,
+ uint64_t mempool_sequence);
+ // Notifies of every mempool removal, except inclusion in blocks
+ virtual bool NotifyTransactionRemoval(const CTransaction &transaction,
+ uint64_t mempool_sequence);
+ // Notifies of transactions added to mempool or appearing in blocks
virtual bool NotifyTransaction(const CTransaction &transaction);
protected:
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp
--- a/src/zmq/zmqabstractnotifier.cpp
+++ b/src/zmq/zmqabstractnotifier.cpp
@@ -20,3 +20,23 @@
const CTransaction & /*transaction*/) {
return true;
}
+
+bool CZMQAbstractNotifier::NotifyBlockConnect(
+ const CBlockIndex * /*CBlockIndex*/) {
+ return true;
+}
+
+bool CZMQAbstractNotifier::NotifyBlockDisconnect(
+ const CBlockIndex * /*CBlockIndex*/) {
+ return true;
+}
+
+bool CZMQAbstractNotifier::NotifyTransactionAcceptance(
+ const CTransaction & /*transaction*/, uint64_t mempool_sequence) {
+ return true;
+}
+
+bool CZMQAbstractNotifier::NotifyTransactionRemoval(
+ const CTransaction & /*transaction*/, uint64_t mempool_sequence) {
+ return true;
+}
diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h
--- a/src/zmq/zmqnotificationinterface.h
+++ b/src/zmq/zmqnotificationinterface.h
@@ -26,7 +26,11 @@
void Shutdown();
// CValidationInterface
- void TransactionAddedToMempool(const CTransactionRef &tx) override;
+ void TransactionAddedToMempool(const CTransactionRef &tx,
+ uint64_t mempool_sequence) override;
+ void TransactionRemovedFromMempool(const CTransactionRef &tx,
+ MemPoolRemovalReason reason,
+ uint64_t mempool_sequence) override;
void BlockConnected(const std::shared_ptr &pblock,
const CBlockIndex *pindexConnected) override;
void BlockDisconnected(const std::shared_ptr &pblock,
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
--- a/src/zmq/zmqnotificationinterface.cpp
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -36,6 +36,8 @@
CZMQAbstractNotifier::Create;
factories["pubrawtx"] =
CZMQAbstractNotifier::Create;
+ factories["pubsequence"] =
+ CZMQAbstractNotifier::Create;
std::list> notifiers;
for (const auto &entry : factories) {
@@ -145,33 +147,62 @@
}
void CZMQNotificationInterface::TransactionAddedToMempool(
- const CTransactionRef &ptx) {
- // Used by BlockConnected and BlockDisconnected as well, because they're all
- // the same external callback.
+ const CTransactionRef &ptx, uint64_t mempool_sequence) {
const CTransaction &tx = *ptx;
- TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier *notifier) {
- return notifier->NotifyTransaction(tx);
- });
+ TryForEachAndRemoveFailed(
+ notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) {
+ return notifier->NotifyTransaction(tx) &&
+ notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
+ });
+}
+
+void CZMQNotificationInterface::TransactionRemovedFromMempool(
+ const CTransactionRef &ptx, MemPoolRemovalReason reason,
+ uint64_t mempool_sequence) {
+ // Called for all non-block inclusion reasons
+ const CTransaction &tx = *ptx;
+
+ TryForEachAndRemoveFailed(
+ notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) {
+ return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
+ });
}
void CZMQNotificationInterface::BlockConnected(
const std::shared_ptr &pblock,
const CBlockIndex *pindexConnected) {
for (const CTransactionRef &ptx : pblock->vtx) {
- // Do a normal notify for each transaction added in the block
- TransactionAddedToMempool(ptx);
+ const CTransaction &tx = *ptx;
+ TryForEachAndRemoveFailed(notifiers,
+ [&tx](CZMQAbstractNotifier *notifier) {
+ return notifier->NotifyTransaction(tx);
+ });
}
+
+ // Next we notify BlockConnect listeners for *all* blocks
+ TryForEachAndRemoveFailed(
+ notifiers, [pindexConnected](CZMQAbstractNotifier *notifier) {
+ return notifier->NotifyBlockConnect(pindexConnected);
+ });
}
void CZMQNotificationInterface::BlockDisconnected(
const std::shared_ptr &pblock,
const CBlockIndex *pindexDisconnected) {
for (const CTransactionRef &ptx : pblock->vtx) {
- // Do a normal notify for each transaction removed in block
- // disconnection
- TransactionAddedToMempool(ptx);
+ const CTransaction &tx = *ptx;
+ TryForEachAndRemoveFailed(notifiers,
+ [&tx](CZMQAbstractNotifier *notifier) {
+ return notifier->NotifyTransaction(tx);
+ });
}
+
+ // Next we notify BlockDisconnect listeners for *all* blocks
+ TryForEachAndRemoveFailed(
+ notifiers, [pindexDisconnected](CZMQAbstractNotifier *notifier) {
+ return notifier->NotifyBlockDisconnect(pindexDisconnected);
+ });
}
CZMQNotificationInterface *g_zmq_notification_interface = nullptr;
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
--- a/src/zmq/zmqpublishnotifier.h
+++ b/src/zmq/zmqpublishnotifier.h
@@ -47,4 +47,14 @@
bool NotifyTransaction(const CTransaction &transaction) override;
};
+class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier {
+public:
+ bool NotifyBlockConnect(const CBlockIndex *pindex) override;
+ bool NotifyBlockDisconnect(const CBlockIndex *pindex) override;
+ bool NotifyTransactionAcceptance(const CTransaction &transaction,
+ uint64_t mempool_sequence) override;
+ bool NotifyTransactionRemoval(const CTransaction &transaction,
+ uint64_t mempool_sequence) override;
+};
+
#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
--- a/src/zmq/zmqpublishnotifier.cpp
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -30,6 +30,7 @@
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
+static const char *MSG_SEQUENCE = "sequence";
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void *data, size_t size, ...) {
@@ -232,3 +233,62 @@
ss << transaction;
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}
+
+// TODO: Dedup this code to take label char, log string
+bool CZMQPublishSequenceNotifier::NotifyBlockConnect(
+ const CBlockIndex *pindex) {
+ BlockHash hash = pindex->GetBlockHash();
+ LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n",
+ hash.GetHex());
+ char data[sizeof(BlockHash) + 1];
+ for (unsigned int i = 0; i < sizeof(BlockHash); i++) {
+ data[sizeof(BlockHash) - 1 - i] = hash.begin()[i];
+ }
+ // Block (C)onnect
+ data[sizeof(data) - 1] = 'C';
+ return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
+}
+
+bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(
+ const CBlockIndex *pindex) {
+ BlockHash hash = pindex->GetBlockHash();
+ LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n",
+ hash.GetHex());
+ char data[sizeof(BlockHash) + 1];
+ for (unsigned int i = 0; i < sizeof(BlockHash); i++) {
+ data[sizeof(BlockHash) - 1 - i] = hash.begin()[i];
+ }
+ // Block (D)isconnect
+ data[sizeof(data) - 1] = 'D';
+ return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
+}
+
+bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(
+ const CTransaction &transaction, uint64_t mempool_sequence) {
+ TxId txid = transaction.GetId();
+ LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n",
+ txid.GetHex());
+ uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1];
+ for (unsigned int i = 0; i < sizeof(TxId); i++) {
+ data[sizeof(TxId) - 1 - i] = txid.begin()[i];
+ }
+ // Mempool (A)cceptance
+ data[sizeof(TxId)] = 'A';
+ WriteLE64(data + sizeof(TxId) + 1, mempool_sequence);
+ return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
+}
+
+bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(
+ const CTransaction &transaction, uint64_t mempool_sequence) {
+ TxId txid = transaction.GetId();
+ LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n",
+ txid.GetHex());
+ uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1];
+ for (unsigned int i = 0; i < sizeof(TxId); i++) {
+ data[sizeof(TxId) - 1 - i] = txid.begin()[i];
+ }
+ // Mempool (R)emoval
+ data[sizeof(TxId)] = 'R';
+ WriteLE64(data + sizeof(TxId) + 1, mempool_sequence);
+ return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
+}
diff --git a/test/lint/check-doc.py b/test/lint/check-doc.py
--- a/test/lint/check-doc.py
+++ b/test/lint/check-doc.py
@@ -37,6 +37,8 @@
'-zmqpubhashtxhwm',
'-zmqpubrawblockhwm',
'-zmqpubrawtxhwm',
+ '-zmqpubsequence',
+ '-zmqpubsequencehwm',
])
# list false positive undocumented arguments