diff --git a/src/net_processing.h b/src/net_processing.h index 4849a8f8e..88669e270 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -1,229 +1,229 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto // Copyright (c) 2009-2016 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_NET_PROCESSING_H #define BITCOIN_NET_PROCESSING_H #include #include #include #include #include extern RecursiveMutex cs_main; extern RecursiveMutex g_cs_orphans; class BlockTransactionsRequest; class BlockValidationState; class CBlockHeader; class CTxMemPool; class ChainstateManager; class Config; class TxValidationState; /** * Default for -maxorphantx, maximum number of orphan transactions kept in * memory. */ static const unsigned int DEFAULT_MAX_ORPHAN_TRANSACTIONS = 100; /** * Default number of orphan+recently-replaced txn to keep around for block * reconstruction. */ static const unsigned int DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN = 100; static const bool DEFAULT_PEERBLOCKFILTERS = false; /** Threshold for marking a node to be discouraged, e.g. disconnected and added * to the discouragement filter. */ static const int DISCOURAGEMENT_THRESHOLD{100}; class PeerManager final : public CValidationInterface, public NetEventsInterface { public: PeerManager(const CChainParams &chainparams, CConnman &connman, BanMan *banman, CScheduler &scheduler, ChainstateManager &chainman, CTxMemPool &pool); /** * Overridden from CValidationInterface. */ void BlockConnected(const std::shared_ptr &pblock, const CBlockIndex *pindexConnected) override; void BlockDisconnected(const std::shared_ptr &block, const CBlockIndex *pindex) override; /** * Overridden from CValidationInterface. */ void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; /** * Overridden from CValidationInterface. */ void BlockChecked(const CBlock &block, const BlockValidationState &state) override; /** * Overridden from CValidationInterface. */ void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr &pblock) override; /** * Initialize a peer by adding it to mapNodeState and pushing a message * requesting its version. */ void InitializeNode(const Config &config, CNode *pnode) override; /** * Handle removal of a peer by updating various state and removing it from * mapNodeState. */ void FinalizeNode(const Config &config, NodeId nodeid, bool &fUpdateConnectionTime) override; /** * Process protocol messages received from a given node. */ bool ProcessMessages(const Config &config, CNode *pfrom, std::atomic &interrupt) override; /** * Send queued protocol messages to be sent to a give node. * * @param[in] pto The node which we are sending messages to. * @param[in] interrupt Interrupt condition for processing threads * @return True if there is more work to be done */ bool SendMessages(const Config &config, CNode *pto, std::atomic &interrupt) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing); /** * Consider evicting an outbound peer based on the amount of time they've * been behind our tip. */ void ConsiderEviction(CNode &pto, int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); /** * Evict extra outbound peers. If we think our tip may be stale, connect to * an extra outbound. */ void CheckForStaleTipAndEvictPeers(const Consensus::Params &consensusParams); /** * If we have extra outbound peers, try to disconnect the one with the * oldest block announcement. */ void EvictExtraOutboundPeers(int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); /** Process a single message from a peer. Public for fuzz testing */ void ProcessMessage(const Config &config, CNode &pfrom, const std::string &msg_type, CDataStream &vRecv, const std::chrono::microseconds time_received, const std::atomic &interruptMsgProc); /** * Increment peer's misbehavior score. If the new value >= * DISCOURAGEMENT_THRESHOLD, mark the node to be discouraged, meaning the * peer might be disconnected and added to the discouragement filter. Public * for unit testing. */ void Misbehaving(const NodeId pnode, const int howmuch, const std::string &message); /** * Retrieve unbroadcast transactions from the mempool and reattempt * sending to peers */ void ReattemptInitialBroadcast(CScheduler &scheduler) const; private: // overloaded variant of above to operate on CNode*s void Misbehaving(const CNode &node, int howmuch, const std::string &message) { Misbehaving(node.GetId(), howmuch, message); } /** * Potentially mark a node discouraged based on the contents of a * BlockValidationState object * * @param[in] via_compact_block this bool is passed in because * net_processing should punish peers differently depending on whether the * data was provided in a compact block message or not. If the compact block * had a valid header, but contained invalid txs, the peer should not be * punished. See BIP 152. * * @return Returns true if the peer was punished (probably disconnected) */ bool MaybePunishNodeForBlock(NodeId nodeid, const BlockValidationState &state, bool via_compact_block, const std::string &message = ""); /** * Potentially disconnect and discourage a node based on the contents of a * TxValidationState object * * @return Returns true if the peer was punished (probably disconnected) */ bool MaybePunishNodeForTx(NodeId nodeid, const TxValidationState &state, const std::string &message = ""); /** * Maybe disconnect a peer and discourage future connections from its * address. * * @param[in] pnode The node to check. * @return True if the peer was marked for disconnection in * this function */ bool MaybeDiscourageAndDisconnect(CNode &pnode); void ProcessOrphanTx(const Config &config, std::set &orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans); /** Process a single headers message from a peer. */ void ProcessHeadersMessage(const Config &config, CNode &pfrom, const std::vector &headers, bool via_compact_block); void SendBlockTransactions(CNode &pfrom, const CBlock &block, const BlockTransactionsRequest &req); /** * Register with TxRequestTracker that an INV has been received from a * peer. The announcement parameters are decided in PeerManager and then * passed to TxRequestTracker. */ void AddTxAnnouncement(const CNode &node, const TxId &txid, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(::cs_main); const CChainParams &m_chainparams; CConnman &m_connman; /** * Pointer to this node's banman. May be nullptr - check existence before * dereferencing. */ BanMan *const m_banman; ChainstateManager &m_chainman; CTxMemPool &m_mempool; - TxRequestTracker m_txrequest GUARDED_BY(::cs_main); + TxRequestTracker m_txrequest GUARDED_BY(::cs_main); //! Next time to check for stale tip int64_t m_stale_tip_check_time; }; struct CNodeStateStats { int m_misbehavior_score = 0; int nSyncHeight = -1; int nCommonHeight = -1; std::vector vHeightInFlight; }; /** Get statistics from node state */ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats); /** Relay transaction to every node */ void RelayTransaction(const TxId &txid, const CConnman &connman); #endif // BITCOIN_NET_PROCESSING_H diff --git a/src/test/fuzz/txrequest.cpp b/src/test/fuzz/txrequest.cpp index 48dfb0401..029880d36 100644 --- a/src/test/fuzz/txrequest.cpp +++ b/src/test/fuzz/txrequest.cpp @@ -1,406 +1,406 @@ // Copyright (c) 2020 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include #include namespace { constexpr int MAX_TXIDS = 16; constexpr int MAX_PEERS = 16; //! Randomly generated TxIds used in this test (length is MAX_TXIDS). TxId TXIDS[MAX_TXIDS]; //! Precomputed random durations (positive and negative, each ~exponentially //! distributed). std::chrono::microseconds DELAYS[256]; struct Initializer { Initializer() { for (uint8_t txid = 0; txid < MAX_TXIDS; txid += 1) { CSHA256().Write(&txid, 1).Finalize(TXIDS[txid].begin()); } int i = 0; // DELAYS[N] for N=0..15 is just N microseconds. for (; i < 16; ++i) { DELAYS[i] = std::chrono::microseconds{i}; } // DELAYS[N] for N=16..127 has randomly-looking but roughly // exponentially increasing values up to 198.416453 seconds. for (; i < 128; ++i) { int diff_bits = ((i - 10) * 2) / 9; uint64_t diff = 1 + (CSipHasher(0, 0).Write(i).Finalize() >> (64 - diff_bits)); DELAYS[i] = DELAYS[i - 1] + std::chrono::microseconds{diff}; } // DELAYS[N] for N=128..255 are negative delays with the same magnitude // as N=0..127. for (; i < 256; ++i) { DELAYS[i] = -DELAYS[255 - i]; } } } g_initializer; /** * Tester class for TxRequestTracker * * It includes a naive reimplementation of its behavior, for a limited set * of MAX_TXIDS distinct txids, and MAX_PEERS peer identifiers. * * All of the public member functions perform the same operation on * an actual TxRequestTracker and on the state of the reimplementation. * The output of GetRequestable is compared with the expected value * as well. * * Check() calls the TxRequestTracker's sanity check, plus compares the * output of the constant accessors (Size(), CountLoad(), CountTracked()) * with expected values. */ class Tester { //! TxRequestTracker object being tested. - TxRequestTracker m_tracker; + TxRequestTracker m_tracker; //! States for txid/peer combinations in the naive data structure. enum class State { //! Absence of this txid/peer combination NOTHING, // Note that this implementation does not distinguish between // DELAYED/READY/BEST variants of CANDIDATE. CANDIDATE, REQUESTED, COMPLETED, }; //! Sequence numbers, incremented whenever a new CANDIDATE is added. uint64_t m_current_sequence{0}; //! List of future 'events' (all inserted reqtimes/exptimes). This is used //! to implement AdvanceToEvent. std::priority_queue, std::greater> m_events; //! Information about a txid/peer combination. struct Announcement { std::chrono::microseconds m_time; uint64_t m_sequence; State m_state{State::NOTHING}; bool m_preferred; //! Precomputed priority. uint64_t m_priority; }; //! Information about all txid/peer combination. Announcement m_announcements[MAX_TXIDS][MAX_PEERS]; //! The current time; can move forward and backward. std::chrono::microseconds m_now{244466666}; //! Delete txids whose only announcements are COMPLETED. void Cleanup(int txid) { bool all_nothing = true; for (int peer = 0; peer < MAX_PEERS; ++peer) { const Announcement &ann = m_announcements[txid][peer]; if (ann.m_state != State::NOTHING) { if (ann.m_state != State::COMPLETED) { return; } all_nothing = false; } } if (all_nothing) { return; } for (int peer = 0; peer < MAX_PEERS; ++peer) { m_announcements[txid][peer].m_state = State::NOTHING; } } //! Find the current best peer to request from for a txid (or -1 if none). int GetSelected(int txhash) const { int ret = -1; uint64_t ret_priority = 0; for (int peer = 0; peer < MAX_PEERS; ++peer) { const Announcement &ann = m_announcements[txhash][peer]; // Return -1 if there already is a (non-expired) in-flight request. if (ann.m_state == State::REQUESTED) { return -1; } // If it's a viable candidate, see if it has lower priority than the // best one so far. if (ann.m_state == State::CANDIDATE && ann.m_time <= m_now) { if (ret == -1 || ann.m_priority > ret_priority) { std::tie(ret, ret_priority) = std::tie(peer, ann.m_priority); } } } return ret; } public: Tester() : m_tracker(true) {} std::chrono::microseconds Now() const { return m_now; } void AdvanceTime(std::chrono::microseconds offset) { m_now += offset; while (!m_events.empty() && m_events.top() <= m_now) { m_events.pop(); } } void AdvanceToEvent() { while (!m_events.empty() && m_events.top() <= m_now) { m_events.pop(); } if (!m_events.empty()) { m_now = m_events.top(); m_events.pop(); } } void DisconnectedPeer(int peer) { // Apply to naive structure: all announcements for that peer are wiped. for (int txid = 0; txid < MAX_TXIDS; ++txid) { if (m_announcements[txid][peer].m_state != State::NOTHING) { m_announcements[txid][peer].m_state = State::NOTHING; Cleanup(txid); } } // Call TxRequestTracker's implementation. m_tracker.DisconnectedPeer(peer); } void ForgetTxId(int txid) { // Apply to naive structure: all announcements for that txid are wiped. for (int peer = 0; peer < MAX_PEERS; ++peer) { m_announcements[txid][peer].m_state = State::NOTHING; } Cleanup(txid); // Call TxRequestTracker's implementation. m_tracker.ForgetTxId(TXIDS[txid]); } void ReceivedInv(int peer, int txid, bool is_wtxid, bool preferred, std::chrono::microseconds reqtime) { // Apply to naive structure: if no announcement for txidnum/peer // combination already, create a new CANDIDATE; otherwise do nothing. Announcement &ann = m_announcements[txid][peer]; if (ann.m_state == State::NOTHING) { ann.m_preferred = preferred; ann.m_state = State::CANDIDATE; ann.m_time = reqtime; ann.m_sequence = m_current_sequence++; ann.m_priority = m_tracker.ComputePriority(TXIDS[txid], peer, ann.m_preferred); // Add event so that AdvanceToEvent can quickly jump to the point // where its reqtime passes. if (reqtime > m_now) { m_events.push(reqtime); } } // Call TxRequestTracker's implementation. m_tracker.ReceivedInv(peer, TXIDS[txid], preferred, reqtime); } void RequestedTx(int peer, int txid, std::chrono::microseconds exptime) { // Apply to naive structure: if a CANDIDATE announcement exists for // peer/txid, convert it to REQUESTED, and change any existing REQUESTED // announcement for the same txid to COMPLETED. if (m_announcements[txid][peer].m_state == State::CANDIDATE) { for (int peer2 = 0; peer2 < MAX_PEERS; ++peer2) { if (m_announcements[txid][peer2].m_state == State::REQUESTED) { m_announcements[txid][peer2].m_state = State::COMPLETED; } } m_announcements[txid][peer].m_state = State::REQUESTED; m_announcements[txid][peer].m_time = exptime; } // Add event so that AdvanceToEvent can quickly jump to the point where // its exptime passes. if (exptime > m_now) { m_events.push(exptime); } // Call TxRequestTracker's implementation. m_tracker.RequestedTx(peer, TXIDS[txid], exptime); } void ReceivedResponse(int peer, int txid) { // Apply to naive structure: convert anything to COMPLETED. if (m_announcements[txid][peer].m_state != State::NOTHING) { m_announcements[txid][peer].m_state = State::COMPLETED; Cleanup(txid); } // Call TxRequestTracker's implementation. m_tracker.ReceivedResponse(peer, TXIDS[txid]); } void GetRequestable(int peer) { // Implement using naive structure: //! list of (sequence number, txid) tuples. std::vector> result; std::vector> expected_expired; for (int txid = 0; txid < MAX_TXIDS; ++txid) { // Mark any expired REQUESTED announcements as COMPLETED. for (int peer2 = 0; peer2 < MAX_PEERS; ++peer2) { Announcement &ann2 = m_announcements[txid][peer2]; if (ann2.m_state == State::REQUESTED && ann2.m_time <= m_now) { expected_expired.emplace_back(peer2, TXIDS[txid]); ann2.m_state = State::COMPLETED; break; } } // And delete txids with only COMPLETED announcements left. Cleanup(txid); // CANDIDATEs for which this announcement has the highest priority // get returned. const Announcement &ann = m_announcements[txid][peer]; if (ann.m_state == State::CANDIDATE && GetSelected(txid) == peer) { result.emplace_back(ann.m_sequence, txid); } } // Sort the results by sequence number. std::sort(result.begin(), result.end()); std::sort(expected_expired.begin(), expected_expired.end()); // Compare with TxRequestTracker's implementation. std::vector> expired; const auto actual = m_tracker.GetRequestable(peer, m_now, &expired); std::sort(expired.begin(), expired.end()); assert(expired == expected_expired); m_tracker.PostGetRequestableSanityCheck(m_now); assert(result.size() == actual.size()); for (size_t pos = 0; pos < actual.size(); ++pos) { assert(TXIDS[std::get<1>(result[pos])] == actual[pos]); } } void Check() { // Compare CountTracked and CountLoad with naive structure. size_t total = 0; for (int peer = 0; peer < MAX_PEERS; ++peer) { size_t tracked = 0; size_t inflight = 0; size_t candidates = 0; for (int txid = 0; txid < MAX_TXIDS; ++txid) { tracked += m_announcements[txid][peer].m_state != State::NOTHING; inflight += m_announcements[txid][peer].m_state == State::REQUESTED; candidates += m_announcements[txid][peer].m_state == State::CANDIDATE; } assert(m_tracker.Count(peer) == tracked); assert(m_tracker.CountInFlight(peer) == inflight); assert(m_tracker.CountCandidates(peer) == candidates); total += tracked; } // Compare Size. assert(m_tracker.Size() == total); // Invoke internal consistency check of TxRequestTracker object. m_tracker.SanityCheck(); } }; } // namespace void test_one_input(const std::vector &buffer) { // Tester object (which encapsulates a TxRequestTracker). Tester tester; // Decode the input as a sequence of instructions with parameters auto it = buffer.begin(); while (it != buffer.end()) { int cmd = *(it++) % 11; int peer, txidnum, delaynum; switch (cmd) { case 0: // Make time jump to the next event (m_time of CANDIDATE or // REQUESTED) tester.AdvanceToEvent(); break; case 1: // Change time delaynum = it == buffer.end() ? 0 : *(it++); tester.AdvanceTime(DELAYS[delaynum]); break; case 2: // Query for requestable txs peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS; tester.GetRequestable(peer); break; case 3: // Peer went offline peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS; tester.DisconnectedPeer(peer); break; case 4: // No longer need tx txidnum = it == buffer.end() ? 0 : *(it++); tester.ForgetTxId(txidnum % MAX_TXIDS); break; case 5: // Received immediate preferred inv case 6: // Same, but non-preferred. peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS; txidnum = it == buffer.end() ? 0 : *(it++); tester.ReceivedInv(peer, txidnum % MAX_TXIDS, (txidnum / MAX_TXIDS) & 1, cmd & 1, std::chrono::microseconds::min()); break; case 7: // Received delayed preferred inv case 8: // Same, but non-preferred. peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS; txidnum = it == buffer.end() ? 0 : *(it++); delaynum = it == buffer.end() ? 0 : *(it++); tester.ReceivedInv(peer, txidnum % MAX_TXIDS, (txidnum / MAX_TXIDS) & 1, cmd & 1, tester.Now() + DELAYS[delaynum]); break; case 9: // Requested tx from peer peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS; txidnum = it == buffer.end() ? 0 : *(it++); delaynum = it == buffer.end() ? 0 : *(it++); tester.RequestedTx(peer, txidnum % MAX_TXIDS, tester.Now() + DELAYS[delaynum]); break; case 10: // Received response peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS; txidnum = it == buffer.end() ? 0 : *(it++); tester.ReceivedResponse(peer, txidnum % MAX_TXIDS); break; default: assert(false); } } tester.Check(); } diff --git a/src/test/txrequest_tests.cpp b/src/test/txrequest_tests.cpp index 579054572..3da4b5bcc 100644 --- a/src/test/txrequest_tests.cpp +++ b/src/test/txrequest_tests.cpp @@ -1,766 +1,766 @@ // Copyright (c) 2020 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include BOOST_FIXTURE_TEST_SUITE(txrequest_tests, BasicTestingSetup) namespace { constexpr std::chrono::microseconds MIN_TIME = std::chrono::microseconds::min(); constexpr std::chrono::microseconds MAX_TIME = std::chrono::microseconds::max(); constexpr std::chrono::microseconds MICROSECOND = std::chrono::microseconds{1}; constexpr std::chrono::microseconds NO_TIME = std::chrono::microseconds{0}; /** An Action is a function to call at a particular (simulated) timestamp. */ using Action = std::pair>; /** * Object that stores actions from multiple interleaved scenarios, and data * shared across them. * * The Scenario below is used to fill this. */ struct Runner { /** The TxRequestTracker being tested. */ - TxRequestTracker txrequest; + TxRequestTracker txrequest; /** List of actions to be executed (in order of increasing timestamp). */ std::vector actions; /** Which node ids have been assigned already (to prevent reuse). */ std::set peerset; /** Which txids have been assigned already (to prevent reuse). */ std::set txidset; /** * Which (peer, txid) combinations are known to be expired. These need to be * accumulated here instead of checked directly in the GetRequestable return * value to avoid introducing a dependency between the various parallel * tests. */ std::multiset> expired; }; std::chrono::microseconds RandomTime8s() { return std::chrono::microseconds{1 + InsecureRandBits(23)}; } std::chrono::microseconds RandomTime1y() { return std::chrono::microseconds{1 + InsecureRandBits(45)}; } /** * A proxy for a Runner that helps build a sequence of consecutive test actions * on a TxRequestTracker. * * Each Scenario is a proxy through which actions for the (sequential) execution * of various tests are added to a Runner. The actions from multiple scenarios * are then run concurrently, resulting in these tests being performed against a * TxRequestTracker in parallel. Every test has its own unique txids and * NodeIds which are not reused in other tests, and thus they should be * independent from each other. Running them in parallel however means that we * verify the behavior (w.r.t. one test's txids and NodeIds) even when the * state of the data structure is more complicated due to the presence of other * tests. */ class Scenario { Runner &m_runner; std::chrono::microseconds m_now; std::string m_testname; public: Scenario(Runner &runner, std::chrono::microseconds starttime) : m_runner(runner), m_now(starttime) {} /** Set a name for the current test, to give more clear error messages. */ void SetTestName(std::string testname) { m_testname = std::move(testname); } /** * Advance this Scenario's time; this affects the timestamps newly * scheduled events get. */ void AdvanceTime(std::chrono::microseconds amount) { assert(amount.count() >= 0); m_now += amount; } /** Schedule a ForgetTxId call at the Scheduler's current time. */ void ForgetTxId(const TxId &txid) { auto &runner = m_runner; runner.actions.emplace_back(m_now, [=, &runner]() { runner.txrequest.ForgetTxId(txid); runner.txrequest.SanityCheck(); }); } /** Schedule a ReceivedInv call at the Scheduler's current time. */ void ReceivedInv(NodeId peer, const TxId &txid, bool pref, std::chrono::microseconds reqtime) { auto &runner = m_runner; runner.actions.emplace_back(m_now, [=, &runner]() { runner.txrequest.ReceivedInv(peer, txid, pref, reqtime); runner.txrequest.SanityCheck(); }); } /** Schedule a DisconnectedPeer call at the Scheduler's current time. */ void DisconnectedPeer(NodeId peer) { auto &runner = m_runner; runner.actions.emplace_back(m_now, [=, &runner]() { runner.txrequest.DisconnectedPeer(peer); runner.txrequest.SanityCheck(); }); } /** Schedule a RequestedTx call at the Scheduler's current time. */ void RequestedTx(NodeId peer, const TxId &txid, std::chrono::microseconds exptime) { auto &runner = m_runner; runner.actions.emplace_back(m_now, [=, &runner]() { runner.txrequest.RequestedTx(peer, txid, exptime); runner.txrequest.SanityCheck(); }); } /** Schedule a ReceivedResponse call at the Scheduler's current time. */ void ReceivedResponse(NodeId peer, const TxId &txid) { auto &runner = m_runner; runner.actions.emplace_back(m_now, [=, &runner]() { runner.txrequest.ReceivedResponse(peer, txid); runner.txrequest.SanityCheck(); }); } /** * Schedule calls to verify the TxRequestTracker's state at the Scheduler's * current time. * * @param peer The peer whose state will be inspected. * @param expected The expected return value for GetRequestable(peer) * @param candidates The expected return value CountCandidates(peer) * @param inflight The expected return value CountInFlight(peer) * @param completed The expected return value of Count(peer), minus * candidates and inflight. * @param checkname An arbitrary string to include in error messages, for * test identificatrion. * @param offset Offset with the current time to use (must be <= 0). * This allows simulations of time going backwards (but note that the * ordering of this event only follows the scenario's m_now. */ void Check(NodeId peer, const std::vector &expected, size_t candidates, size_t inflight, size_t completed, const std::string &checkname, std::chrono::microseconds offset = std::chrono::microseconds{0}) { const auto comment = m_testname + " " + checkname; auto &runner = m_runner; const auto now = m_now; assert(offset.count() <= 0); runner.actions.emplace_back(m_now, [=, &runner]() { std::vector> expired_now; auto ret = runner.txrequest.GetRequestable(peer, now + offset, &expired_now); for (const auto &entry : expired_now) { runner.expired.insert(entry); } runner.txrequest.SanityCheck(); runner.txrequest.PostGetRequestableSanityCheck(now + offset); size_t total = candidates + inflight + completed; size_t real_total = runner.txrequest.Count(peer); size_t real_candidates = runner.txrequest.CountCandidates(peer); size_t real_inflight = runner.txrequest.CountInFlight(peer); BOOST_CHECK_MESSAGE( real_total == total, strprintf("[" + comment + "] total %i (%i expected)", real_total, total)); BOOST_CHECK_MESSAGE( real_inflight == inflight, strprintf("[" + comment + "] inflight %i (%i expected)", real_inflight, inflight)); BOOST_CHECK_MESSAGE( real_candidates == candidates, strprintf("[" + comment + "] candidates %i (%i expected)", real_candidates, candidates)); BOOST_CHECK_MESSAGE(ret == expected, "[" + comment + "] mismatching requestables"); }); } /** * Verify that an announcement for txid by peer has expired some time before * this check is scheduled. * * Every expected expiration should be accounted for through exactly one * call to this function. */ void CheckExpired(NodeId peer, TxId txid) { const auto &testname = m_testname; auto &runner = m_runner; runner.actions.emplace_back(m_now, [=, &runner]() { auto it = runner.expired.find(std::pair{peer, txid}); BOOST_CHECK_MESSAGE(it != runner.expired.end(), "[" + testname + "] missing expiration"); if (it != runner.expired.end()) { runner.expired.erase(it); } }); } /** * Generate a random txid, whose priorities for certain peers are * constrained. * * For example, NewTxId({{p1,p2,p3},{p2,p4,p5}}) will generate a txid T * such that both: * - priority(p1,T) > priority(p2,T) > priority(p3,T) * - priority(p2,T) > priority(p4,T) > priority(p5,T) * where priority is the predicted internal TxRequestTracker's priority, * assuming all announcements are within the same preferredness class. */ TxId NewTxId(const std::vector> &orders = {}) { TxId ret; bool ok; do { ret = TxId(InsecureRand256()); ok = true; for (const auto &order : orders) { for (size_t pos = 1; pos < order.size(); ++pos) { uint64_t prio_prev = m_runner.txrequest.ComputePriority( ret, order[pos - 1], true); uint64_t prio_cur = m_runner.txrequest.ComputePriority( ret, order[pos], true); if (prio_prev <= prio_cur) { ok = false; break; } } if (!ok) { break; } } if (ok) { ok = m_runner.txidset.insert(ret).second; } } while (!ok); return ret; } /** * Generate a new random NodeId to use as peer. The same NodeId is never * returned twice (across all Scenarios combined). */ NodeId NewPeer() { bool ok; NodeId ret; do { ret = InsecureRandBits(63); ok = m_runner.peerset.insert(ret).second; } while (!ok); return ret; } std::chrono::microseconds Now() const { return m_now; } }; /** * Add to scenario a test with a single tx announced by a single peer. * * config is an integer in [0, 32), which controls which variant of the test is * used. */ void BuildSingleTest(Scenario &scenario, int config) { auto peer = scenario.NewPeer(); auto txid = scenario.NewTxId(); bool immediate = config & 1; bool preferred = config & 2; auto delay = immediate ? NO_TIME : RandomTime8s(); scenario.SetTestName(strprintf("Single(config=%i)", config)); // Receive an announcement, either immediately requestable or delayed. scenario.ReceivedInv(peer, txid, preferred, immediate ? MIN_TIME : scenario.Now() + delay); if (immediate) { scenario.Check(peer, {txid}, 1, 0, 0, "s1"); } else { scenario.Check(peer, {}, 1, 0, 0, "s2"); scenario.AdvanceTime(delay - MICROSECOND); scenario.Check(peer, {}, 1, 0, 0, "s3"); scenario.AdvanceTime(MICROSECOND); scenario.Check(peer, {txid}, 1, 0, 0, "s4"); } if (config >> 3) { // We'll request the transaction scenario.AdvanceTime(RandomTime8s()); auto expiry = RandomTime8s(); scenario.Check(peer, {txid}, 1, 0, 0, "s5"); scenario.RequestedTx(peer, txid, scenario.Now() + expiry); scenario.Check(peer, {}, 0, 1, 0, "s6"); if ((config >> 3) == 1) { // The request will time out scenario.AdvanceTime(expiry - MICROSECOND); scenario.Check(peer, {}, 0, 1, 0, "s7"); scenario.AdvanceTime(MICROSECOND); scenario.Check(peer, {}, 0, 0, 0, "s8"); scenario.CheckExpired(peer, txid); return; } else { scenario.AdvanceTime( std::chrono::microseconds{InsecureRandRange(expiry.count())}); scenario.Check(peer, {}, 0, 1, 0, "s9"); if ((config >> 3) == 3) { // A response will arrive for the transaction scenario.ReceivedResponse(peer, txid); scenario.Check(peer, {}, 0, 0, 0, "s10"); return; } } } if (config & 4) { // The peer will go offline scenario.DisconnectedPeer(peer); } else { // The transaction is no longer needed scenario.ForgetTxId(txid); } scenario.Check(peer, {}, 0, 0, 0, "s11"); } /** * Add to scenario a test with a single tx announced by two peers, to verify * the right peer is selected for requests. * * config is an integer in [0, 32), which controls which variant of the test is * used. */ void BuildPriorityTest(Scenario &scenario, int config) { scenario.SetTestName(strprintf("Priority(config=%i)", config)); // Two peers. They will announce in order {peer1, peer2}. auto peer1 = scenario.NewPeer(), peer2 = scenario.NewPeer(); // Construct a transaction that under random rules would be preferred by // peer2 or peer1, depending on configuration. bool prio1 = config & 1; auto txid = prio1 ? scenario.NewTxId({{peer1, peer2}}) : scenario.NewTxId({{peer2, peer1}}); bool pref1 = config & 2, pref2 = config & 4; scenario.ReceivedInv(peer1, txid, pref1, MIN_TIME); scenario.Check(peer1, {txid}, 1, 0, 0, "p1"); if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); scenario.Check(peer1, {txid}, 1, 0, 0, "p2"); } scenario.ReceivedInv(peer2, txid, pref2, MIN_TIME); bool stage2_prio = // At this point, peer2 will be given priority if: // - It is preferred and peer1 is not (pref2 && !pref1) || // - They're in the same preference class, // and the randomized priority favors peer2 over peer1. (pref1 == pref2 && !prio1); NodeId priopeer = stage2_prio ? peer2 : peer1, otherpeer = stage2_prio ? peer1 : peer2; scenario.Check(otherpeer, {}, 1, 0, 0, "p3"); scenario.Check(priopeer, {txid}, 1, 0, 0, "p4"); if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } scenario.Check(otherpeer, {}, 1, 0, 0, "p5"); scenario.Check(priopeer, {txid}, 1, 0, 0, "p6"); // We possibly request from the selected peer. if (config & 8) { scenario.RequestedTx(priopeer, txid, MAX_TIME); scenario.Check(priopeer, {}, 0, 1, 0, "p7"); scenario.Check(otherpeer, {}, 1, 0, 0, "p8"); if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } } // The peer which was selected (or requested from) now goes offline, or a // NOTFOUND is received from them. if (config & 16) { scenario.DisconnectedPeer(priopeer); } else { scenario.ReceivedResponse(priopeer, txid); } if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } scenario.Check(priopeer, {}, 0, 0, !(config & 16), "p8"); scenario.Check(otherpeer, {txid}, 1, 0, 0, "p9"); if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } // Now the other peer goes offline. scenario.DisconnectedPeer(otherpeer); if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } scenario.Check(peer1, {}, 0, 0, 0, "p10"); scenario.Check(peer2, {}, 0, 0, 0, "p11"); } /** * Add to scenario a randomized test in which N peers announce the same * transaction, to verify the order in which they are requested. */ void BuildBigPriorityTest(Scenario &scenario, int peers) { scenario.SetTestName(strprintf("BigPriority(peers=%i)", peers)); // We will have N peers announce the same transaction. std::map preferred; std::vector pref_peers, npref_peers; // Some preferred, ... int num_pref = InsecureRandRange(peers + 1); // some not preferred. int num_npref = peers - num_pref; for (int i = 0; i < num_pref; ++i) { pref_peers.push_back(scenario.NewPeer()); preferred[pref_peers.back()] = true; } for (int i = 0; i < num_npref; ++i) { npref_peers.push_back(scenario.NewPeer()); preferred[npref_peers.back()] = false; } // Make a list of all peers, in order of intended request order // (concatenation of pref_peers and npref_peers). std::vector request_order; for (int i = 0; i < num_pref; ++i) { request_order.push_back(pref_peers[i]); } for (int i = 0; i < num_npref; ++i) { request_order.push_back(npref_peers[i]); } // Determine the announcement order randomly. std::vector announce_order = request_order; Shuffle(announce_order.begin(), announce_order.end(), g_insecure_rand_ctx); // Find a txid whose prioritization is consistent with the required // ordering within pref_peers and within npref_peers. auto txid = scenario.NewTxId({pref_peers, npref_peers}); // Decide reqtimes in opposite order of the expected request order. This // means that as time passes we expect the to-be-requested-from-peer will // change every time a subsequent reqtime is passed. std::map reqtimes; auto reqtime = scenario.Now(); for (int i = peers - 1; i >= 0; --i) { reqtime += RandomTime8s(); reqtimes[request_order[i]] = reqtime; } // Actually announce from all peers simultaneously (but in announce_order). for (const auto peer : announce_order) { scenario.ReceivedInv(peer, txid, preferred[peer], reqtimes[peer]); } for (const auto peer : announce_order) { scenario.Check(peer, {}, 1, 0, 0, "b1"); } // Let time pass and observe the to-be-requested-from peer change, from // nonpreferred to preferred, and from high priority to low priority within // each class. for (int i = peers - 1; i >= 0; --i) { scenario.AdvanceTime(reqtimes[request_order[i]] - scenario.Now() - MICROSECOND); scenario.Check(request_order[i], {}, 1, 0, 0, "b2"); scenario.AdvanceTime(MICROSECOND); scenario.Check(request_order[i], {txid}, 1, 0, 0, "b3"); } // Peers now in random order go offline, or send NOTFOUNDs. At every point // in time the new to-be-requested-from peer should be the best remaining // one, so verify this after every response. for (int i = 0; i < peers; ++i) { if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } const int pos = InsecureRandRange(request_order.size()); const auto peer = request_order[pos]; request_order.erase(request_order.begin() + pos); if (InsecureRandBool()) { scenario.DisconnectedPeer(peer); scenario.Check(peer, {}, 0, 0, 0, "b4"); } else { scenario.ReceivedResponse(peer, txid); scenario.Check(peer, {}, 0, 0, request_order.size() > 0, "b5"); } if (request_order.size()) { scenario.Check(request_order[0], {txid}, 1, 0, 0, "b6"); } } // Everything is gone in the end. for (const auto peer : announce_order) { scenario.Check(peer, {}, 0, 0, 0, "b7"); } } /** * Add to scenario a test with one peer announcing two transactions, to verify * they are fetched in announcement order. * * config is an integer in [0, 4) inclusive, and selects the variant of the * test. */ void BuildRequestOrderTest(Scenario &scenario, int config) { scenario.SetTestName(strprintf("RequestOrder(config=%i)", config)); auto peer = scenario.NewPeer(); auto txid1 = scenario.NewTxId(); auto txid2 = scenario.NewTxId(); auto reqtime2 = scenario.Now() + RandomTime8s(); auto reqtime1 = reqtime2 + RandomTime8s(); scenario.ReceivedInv(peer, txid1, config & 1, reqtime1); // Simulate time going backwards by giving the second announcement an // earlier reqtime. scenario.ReceivedInv(peer, txid2, config & 2, reqtime2); scenario.AdvanceTime(reqtime2 - MICROSECOND - scenario.Now()); scenario.Check(peer, {}, 2, 0, 0, "o1"); scenario.AdvanceTime(MICROSECOND); scenario.Check(peer, {txid2}, 2, 0, 0, "o2"); scenario.AdvanceTime(reqtime1 - MICROSECOND - scenario.Now()); scenario.Check(peer, {txid2}, 2, 0, 0, "o3"); scenario.AdvanceTime(MICROSECOND); // Even with time going backwards in between announcements, the return value // of GetRequestable is in announcement order. scenario.Check(peer, {txid1, txid2}, 2, 0, 0, "o4"); scenario.DisconnectedPeer(peer); scenario.Check(peer, {}, 0, 0, 0, "o5"); } /** Add to scenario a test that exercises clocks that go backwards. */ void BuildTimeBackwardsTest(Scenario &scenario) { auto peer1 = scenario.NewPeer(); auto peer2 = scenario.NewPeer(); auto txid = scenario.NewTxId({{peer1, peer2}}); // Announce from peer2. auto reqtime = scenario.Now() + RandomTime8s(); scenario.ReceivedInv(peer2, txid, true, reqtime); scenario.Check(peer2, {}, 1, 0, 0, "r1"); scenario.AdvanceTime(reqtime - scenario.Now()); scenario.Check(peer2, {txid}, 1, 0, 0, "r2"); // Check that if the clock goes backwards by 1us, the transaction would stop // being requested. scenario.Check(peer2, {}, 1, 0, 0, "r3", -MICROSECOND); // But it reverts to being requested if time goes forward again. scenario.Check(peer2, {txid}, 1, 0, 0, "r4"); // Announce from peer1. if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } scenario.ReceivedInv(peer1, txid, true, MAX_TIME); scenario.Check(peer2, {txid}, 1, 0, 0, "r5"); scenario.Check(peer1, {}, 1, 0, 0, "r6"); // Request from peer1. if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } auto expiry = scenario.Now() + RandomTime8s(); scenario.RequestedTx(peer1, txid, expiry); scenario.Check(peer1, {}, 0, 1, 0, "r7"); scenario.Check(peer2, {}, 1, 0, 0, "r8"); // Expiration passes. scenario.AdvanceTime(expiry - scenario.Now()); scenario.Check(peer1, {}, 0, 0, 1, "r9"); // Request goes back to peer2. scenario.Check(peer2, {txid}, 1, 0, 0, "r10"); scenario.CheckExpired(peer1, txid); // Going back does not unexpire. scenario.Check(peer1, {}, 0, 0, 1, "r11", -MICROSECOND); scenario.Check(peer2, {txid}, 1, 0, 0, "r12", -MICROSECOND); // Peer2 goes offline, meaning no viable announcements remain. if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } scenario.DisconnectedPeer(peer2); scenario.Check(peer1, {}, 0, 0, 0, "r13"); scenario.Check(peer2, {}, 0, 0, 0, "r14"); } /** * Add to scenario a test that involves RequestedTx() calls for txids not * returned by GetRequestable. */ void BuildWeirdRequestsTest(Scenario &scenario) { auto peer1 = scenario.NewPeer(); auto peer2 = scenario.NewPeer(); auto txid1 = scenario.NewTxId({{peer1, peer2}}); auto txid2 = scenario.NewTxId({{peer2, peer1}}); // Announce txid1 by peer1. scenario.ReceivedInv(peer1, txid1, true, MIN_TIME); scenario.Check(peer1, {txid1}, 1, 0, 0, "q1"); // Announce txid2 by peer2. if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } scenario.ReceivedInv(peer2, txid2, true, MIN_TIME); scenario.Check(peer1, {txid1}, 1, 0, 0, "q2"); scenario.Check(peer2, {txid2}, 1, 0, 0, "q3"); // We request txid2 from *peer1* - no effect. if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } scenario.RequestedTx(peer1, txid2, MAX_TIME); scenario.Check(peer1, {txid1}, 1, 0, 0, "q4"); scenario.Check(peer2, {txid2}, 1, 0, 0, "q5"); // Now request txid1 from peer1 - marks it as REQUESTED. if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } auto expiryA = scenario.Now() + RandomTime8s(); scenario.RequestedTx(peer1, txid1, expiryA); scenario.Check(peer1, {}, 0, 1, 0, "q6"); scenario.Check(peer2, {txid2}, 1, 0, 0, "q7"); // Request it a second time - nothing happens, as it's already REQUESTED. auto expiryB = expiryA + RandomTime8s(); scenario.RequestedTx(peer1, txid1, expiryB); scenario.Check(peer1, {}, 0, 1, 0, "q8"); scenario.Check(peer2, {txid2}, 1, 0, 0, "q9"); // Also announce txid1 from peer2 now, so that the txid isn't forgotten // when the peer1 request expires. scenario.ReceivedInv(peer2, txid1, true, MIN_TIME); scenario.Check(peer1, {}, 0, 1, 0, "q10"); scenario.Check(peer2, {txid2}, 2, 0, 0, "q11"); // When reaching expiryA, it expires (not expiryB, which is later). scenario.AdvanceTime(expiryA - scenario.Now()); scenario.Check(peer1, {}, 0, 0, 1, "q12"); scenario.Check(peer2, {txid2, txid1}, 2, 0, 0, "q13"); scenario.CheckExpired(peer1, txid1); // Requesting it yet again from peer1 doesn't do anything, as it's already // COMPLETED. if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } scenario.RequestedTx(peer1, txid1, MAX_TIME); scenario.Check(peer1, {}, 0, 0, 1, "q14"); scenario.Check(peer2, {txid2, txid1}, 2, 0, 0, "q15"); // Now announce txid2 from peer1. if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } scenario.ReceivedInv(peer1, txid2, true, MIN_TIME); scenario.Check(peer1, {}, 1, 0, 1, "q16"); scenario.Check(peer2, {txid2, txid1}, 2, 0, 0, "q17"); // And request it from peer1 (weird as peer2 has the preference). if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } scenario.RequestedTx(peer1, txid2, MAX_TIME); scenario.Check(peer1, {}, 0, 1, 1, "q18"); scenario.Check(peer2, {txid1}, 2, 0, 0, "q19"); // If peer2 now (normally) requests txid2, the existing request by peer1 // becomes COMPLETED. if (InsecureRandBool()) { scenario.AdvanceTime(RandomTime8s()); } scenario.RequestedTx(peer2, txid2, MAX_TIME); scenario.Check(peer1, {}, 0, 0, 2, "q20"); scenario.Check(peer2, {txid1}, 1, 1, 0, "q21"); // If peer2 goes offline, no viable announcements remain. scenario.DisconnectedPeer(peer2); scenario.Check(peer1, {}, 0, 0, 0, "q22"); scenario.Check(peer2, {}, 0, 0, 0, "q23"); } void TestInterleavedScenarios() { // Create a list of functions which add tests to scenarios. std::vector> builders; // Add instances of every test, for every configuration. for (int n = 0; n < 64; ++n) { builders.emplace_back([n](Scenario &scenario) { BuildRequestOrderTest(scenario, n & 3); }); builders.emplace_back( [n](Scenario &scenario) { BuildSingleTest(scenario, n & 31); }); builders.emplace_back( [n](Scenario &scenario) { BuildPriorityTest(scenario, n & 31); }); builders.emplace_back([n](Scenario &scenario) { BuildBigPriorityTest(scenario, (n & 7) + 1); }); builders.emplace_back( [](Scenario &scenario) { BuildTimeBackwardsTest(scenario); }); builders.emplace_back( [](Scenario &scenario) { BuildWeirdRequestsTest(scenario); }); } // Randomly shuffle all those functions. Shuffle(builders.begin(), builders.end(), g_insecure_rand_ctx); Runner runner; auto starttime = RandomTime1y(); // Construct many scenarios, and run (up to) 10 randomly-chosen tests // consecutively in each. while (builders.size()) { // Introduce some variation in the start time of each scenario, so they // don't all start off concurrently, but get a more random interleaving. auto scenario_start = starttime + RandomTime8s() + RandomTime8s() + RandomTime8s(); Scenario scenario(runner, scenario_start); for (int j = 0; builders.size() && j < 10; ++j) { builders.back()(scenario); builders.pop_back(); } } // Sort all the actions from all those scenarios chronologically, resulting // in the actions from distinct scenarios to become interleaved. Use // stable_sort so that actions from one scenario aren't reordered w.r.t. // each other. std::stable_sort( runner.actions.begin(), runner.actions.end(), [](const Action &a1, const Action &a2) { return a1.first < a2.first; }); // Run all actions from all scenarios, in order. for (auto &action : runner.actions) { action.second(); } BOOST_CHECK_EQUAL(runner.txrequest.Size(), 0U); BOOST_CHECK(runner.expired.empty()); } } // namespace BOOST_AUTO_TEST_CASE(TxRequestTest) { for (int i = 0; i < 5; ++i) { TestInterleavedScenarios(); } } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/txrequest.cpp b/src/txrequest.cpp index 16decd32f..f8b77e471 100644 --- a/src/txrequest.cpp +++ b/src/txrequest.cpp @@ -1,936 +1,864 @@ // Copyright (c) 2020 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include -#include #include #include #include #include #include #include #include #include namespace { /** * The various states a (txid, peer) pair can be in. * * Note that CANDIDATE is split up into 3 substates (DELAYED, BEST, READY), * allowing more efficient implementation. Also note that the sorting order of * ByTxIdView relies on the specific order of values in this enum. * * Expected behaviour is: * - When first announced by a peer, the state is CANDIDATE_DELAYED until * reqtime is reached. * - Announcements that have reached their reqtime but not been requested will * be either CANDIDATE_READY or CANDIDATE_BEST. Neither of those has an * expiration time; they remain in that state until they're requested or no * longer needed. CANDIDATE_READY announcements are promoted to * CANDIDATE_BEST when they're the best one left. * - When requested, an announcement will be in state REQUESTED until expiry * is reached. * - If expiry is reached, or the peer replies to the request (either with * NOTFOUND or the tx), the state becomes COMPLETED. */ enum class State : uint8_t { /** A CANDIDATE announcement whose reqtime is in the future. */ CANDIDATE_DELAYED, /** * A CANDIDATE announcement that's not CANDIDATE_DELAYED or CANDIDATE_BEST. */ CANDIDATE_READY, /** * The best CANDIDATE for a given txid; only if there is no REQUESTED * announcement already for that txid. The CANDIDATE_BEST is the * highest-priority announcement among all CANDIDATE_READY (and _BEST) ones * for that txid. */ CANDIDATE_BEST, /** A REQUESTED announcement. */ REQUESTED, /** A COMPLETED announcement. */ COMPLETED, }; //! Type alias for sequence numbers. using SequenceNumber = uint64_t; /** * An announcement. This is the data we track for each txid that is announced * to us by each peer. */ struct Announcement { /** TxId that was announced. */ const uint256 m_txid; /** * For CANDIDATE_{DELAYED,BEST,READY} the reqtime; for REQUESTED the * expiry. */ std::chrono::microseconds m_time; /** What peer the request was from. */ const NodeId m_peer; /** What sequence number this announcement has. */ const SequenceNumber m_sequence : 60; /** Whether the request is preferred. */ const bool m_preferred : 1; /** * What state this announcement is in. * This is a uint8_t instead of a State to silence a GCC warning in versions * prior to 8.4 and 9.3. See * https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61414 */ uint8_t m_state : 3; /** Convert m_state to a State enum. */ State GetState() const { return static_cast(m_state); } /** Convert a State enum to a uint8_t and store it in m_state. */ void SetState(State state) { m_state = static_cast(state); } /** * Whether this announcement is selected. There can be at most 1 selected * peer per txid. */ bool IsSelected() const { return GetState() == State::CANDIDATE_BEST || GetState() == State::REQUESTED; } /** Whether this announcement is waiting for a certain time to pass. */ bool IsWaiting() const { return GetState() == State::REQUESTED || GetState() == State::CANDIDATE_DELAYED; } /** * Whether this announcement can feasibly be selected if the current * IsSelected() one disappears. */ bool IsSelectable() const { return GetState() == State::CANDIDATE_READY || GetState() == State::CANDIDATE_BEST; } /** * Construct a new announcement from scratch, initially in * CANDIDATE_DELAYED state. */ Announcement(const uint256 &txid, NodeId peer, bool preferred, std::chrono::microseconds reqtime, SequenceNumber sequence) : m_txid(txid), m_time(reqtime), m_peer(peer), m_sequence(sequence), m_preferred(preferred), m_state(static_cast(State::CANDIDATE_DELAYED)) {} }; //! Type alias for priorities. using Priority = uint64_t; /** * A functor with embedded salt that computes priority of an announcement. * * Higher priorities are selected first. */ class PriorityComputer { const uint64_t m_k0, m_k1; public: explicit PriorityComputer(bool deterministic) : m_k0{deterministic ? 0 : GetRand(0xFFFFFFFFFFFFFFFF)}, m_k1{deterministic ? 0 : GetRand(0xFFFFFFFFFFFFFFFF)} {} Priority operator()(const uint256 &txid, NodeId peer, bool preferred) const { uint64_t low_bits = CSipHasher(m_k0, m_k1) .Write(txid.begin(), txid.size()) .Write(peer) .Finalize() >> 1; return low_bits | uint64_t{preferred} << 63; } Priority operator()(const Announcement &ann) const { return operator()(ann.m_txid, ann.m_peer, ann.m_preferred); } }; // Definitions for the 3 indexes used in the main data structure. // // Each index has a By* type to identify it, a By*View data type to represent // the view of announcement it is sorted by, and an By*ViewExtractor type to // convert an announcement into the By*View type. See // https://www.boost.org/doc/libs/1_58_0/libs/multi_index/doc/reference/key_extraction.html#key_extractors // for more information about the key extraction concept. // The ByPeer index is sorted by (peer, state == CANDIDATE_BEST, txid) // // Uses: // * Looking up existing announcements by peer/txid, by checking both (peer, // false, txid) and (peer, true, txid). // * Finding all CANDIDATE_BEST announcements for a given peer in // GetRequestable. struct ByPeer {}; using ByPeerView = std::tuple; struct ByPeerViewExtractor { using result_type = ByPeerView; result_type operator()(const Announcement &ann) const { return ByPeerView{ann.m_peer, ann.GetState() == State::CANDIDATE_BEST, ann.m_txid}; } }; // The ByTxId index is sorted by (txid, state, priority). // // Note: priority == 0 whenever state != CANDIDATE_READY. // // Uses: // * Deleting all announcements with a given txid in ForgetTxId. // * Finding the best CANDIDATE_READY to convert to CANDIDATE_BEST, when no // other CANDIDATE_READY or REQUESTED announcement exists for that txid. // * Determining when no more non-COMPLETED announcements for a given txid // exist, so the COMPLETED ones can be deleted. struct ByTxId {}; using ByTxIdView = std::tuple; class ByTxIdViewExtractor { const PriorityComputer &m_computer; public: ByTxIdViewExtractor(const PriorityComputer &computer) : m_computer(computer) {} using result_type = ByTxIdView; result_type operator()(const Announcement &ann) const { const Priority prio = (ann.GetState() == State::CANDIDATE_READY) ? m_computer(ann) : 0; return ByTxIdView{ann.m_txid, ann.GetState(), prio}; } }; enum class WaitState { //! Used for announcements that need efficient testing of "is their //! timestamp in the future?". FUTURE_EVENT, //! Used for announcements whose timestamp is not relevant. NO_EVENT, //! Used for announcements that need efficient testing of "is their //! timestamp in the past?". PAST_EVENT, }; WaitState GetWaitState(const Announcement &ann) { if (ann.IsWaiting()) { return WaitState::FUTURE_EVENT; } if (ann.IsSelectable()) { return WaitState::PAST_EVENT; } return WaitState::NO_EVENT; } // The ByTime index is sorted by (wait_state, time). // // All announcements with a timestamp in the future can be found by iterating // the index forward from the beginning. All announcements with a timestamp in // the past can be found by iterating the index backwards from the end. // // Uses: // * Finding CANDIDATE_DELAYED announcements whose reqtime has passed, and // REQUESTED announcements whose expiry has passed. // * Finding CANDIDATE_READY/BEST announcements whose reqtime is in the future // (when the clock time went backwards). struct ByTime {}; using ByTimeView = std::pair; struct ByTimeViewExtractor { using result_type = ByTimeView; result_type operator()(const Announcement &ann) const { return ByTimeView{GetWaitState(ann), ann.m_time}; } }; /** * Data type for the main data structure (Announcement objects with * ByPeer/ByTxId/ByTime indexes). */ using Index = boost::multi_index_container< Announcement, boost::multi_index::indexed_by< boost::multi_index::ordered_unique, ByPeerViewExtractor>, boost::multi_index::ordered_non_unique, ByTxIdViewExtractor>, boost::multi_index::ordered_non_unique, ByTimeViewExtractor>>>; /** Helper type to simplify syntax of iterator types. */ template using Iter = typename Index::index::type::iterator; /** Per-peer statistics object. */ struct PeerInfo { //! Total number of announcements for this peer. size_t m_total = 0; //! Number of COMPLETED announcements for this peer. size_t m_completed = 0; //! Number of REQUESTED announcements for this peer. size_t m_requested = 0; }; /** Per-txid statistics object. Only used for sanity checking. */ struct TxIdInfo { //! Number of CANDIDATE_DELAYED announcements for this txid. size_t m_candidate_delayed = 0; //! Number of CANDIDATE_READY announcements for this txid. size_t m_candidate_ready = 0; //! Number of CANDIDATE_BEST announcements for this txid (at most one). size_t m_candidate_best = 0; //! Number of REQUESTED announcements for this txid (at most one; mutually //! exclusive with CANDIDATE_BEST). size_t m_requested = 0; //! The priority of the CANDIDATE_BEST announcement if one exists, or max() //! otherwise. Priority m_priority_candidate_best = std::numeric_limits::max(); //! The highest priority of all CANDIDATE_READY announcements (or min() if //! none exist). Priority m_priority_best_candidate_ready = std::numeric_limits::min(); //! All peers we have an announcement for this txid for. std::vector m_peers; }; /** Compare two PeerInfo objects. Only used for sanity checking. */ bool operator==(const PeerInfo &a, const PeerInfo &b) { return std::tie(a.m_total, a.m_completed, a.m_requested) == std::tie(b.m_total, b.m_completed, b.m_requested); }; /** * (Re)compute the PeerInfo map from the index. Only used for sanity checking. */ std::unordered_map RecomputePeerInfo(const Index &index) { std::unordered_map ret; for (const Announcement &ann : index) { PeerInfo &info = ret[ann.m_peer]; ++info.m_total; info.m_requested += (ann.GetState() == State::REQUESTED); info.m_completed += (ann.GetState() == State::COMPLETED); } return ret; } /** Compute the TxIdInfo map. Only used for sanity checking. */ std::map ComputeTxIdInfo(const Index &index, const PriorityComputer &computer) { std::map ret; for (const Announcement &ann : index) { TxIdInfo &info = ret[ann.m_txid]; // Classify how many announcements of each state we have for this txid. info.m_candidate_delayed += (ann.GetState() == State::CANDIDATE_DELAYED); info.m_candidate_ready += (ann.GetState() == State::CANDIDATE_READY); info.m_candidate_best += (ann.GetState() == State::CANDIDATE_BEST); info.m_requested += (ann.GetState() == State::REQUESTED); // And track the priority of the best CANDIDATE_READY/CANDIDATE_BEST // announcements. if (ann.GetState() == State::CANDIDATE_BEST) { info.m_priority_candidate_best = computer(ann); } if (ann.GetState() == State::CANDIDATE_READY) { info.m_priority_best_candidate_ready = std::max(info.m_priority_best_candidate_ready, computer(ann)); } // Also keep track of which peers this txid has an announcement for // (so we can detect duplicates). info.m_peers.push_back(ann.m_peer); } return ret; } } // namespace /** Actual implementation for TxRequestTracker's data structure. */ class InvRequestTrackerImpl : public InvRequestTrackerImplInterface { //! The current sequence number. Increases for every announcement. This is //! used to sort txid returned by GetRequestable in announcement order. SequenceNumber m_current_sequence{0}; //! This tracker's priority computer. const PriorityComputer m_computer; //! This tracker's main data structure. See SanityCheck() for the invariants //! that apply to it. Index m_index; //! Map with this tracker's per-peer statistics. std::unordered_map m_peerinfo; public: void SanityCheck() const { // Recompute m_peerdata from m_index. This verifies the data in it as it // should just be caching statistics on m_index. It also verifies the // invariant that no PeerInfo announcements with m_total==0 exist. assert(m_peerinfo == RecomputePeerInfo(m_index)); // Calculate per-txid statistics from m_index, and validate // invariants. for (auto &item : ComputeTxIdInfo(m_index, m_computer)) { TxIdInfo &info = item.second; // Cannot have only COMPLETED peer (txid should have been forgotten // already) assert(info.m_candidate_delayed + info.m_candidate_ready + info.m_candidate_best + info.m_requested > 0); // Can have at most 1 CANDIDATE_BEST/REQUESTED peer assert(info.m_candidate_best + info.m_requested <= 1); // If there are any CANDIDATE_READY announcements, there must be // exactly one CANDIDATE_BEST or REQUESTED announcement. if (info.m_candidate_ready > 0) { assert(info.m_candidate_best + info.m_requested == 1); } // If there is both a CANDIDATE_READY and a CANDIDATE_BEST // announcement, the CANDIDATE_BEST one must be at least as good // (equal or higher priority) as the best CANDIDATE_READY. if (info.m_candidate_ready && info.m_candidate_best) { assert(info.m_priority_candidate_best >= info.m_priority_best_candidate_ready); } // No txid can have been announced by the same peer twice. std::sort(info.m_peers.begin(), info.m_peers.end()); assert( std::adjacent_find(info.m_peers.begin(), info.m_peers.end()) == info.m_peers.end()); } } void PostGetRequestableSanityCheck(std::chrono::microseconds now) const { for (const Announcement &ann : m_index) { if (ann.IsWaiting()) { // REQUESTED and CANDIDATE_DELAYED must have a time in the // future (they should have been converted to // COMPLETED/CANDIDATE_READY respectively). assert(ann.m_time > now); } else if (ann.IsSelectable()) { // CANDIDATE_READY and CANDIDATE_BEST cannot have a time in the // future (they should have remained CANDIDATE_DELAYED, or // should have been converted back to it if time went // backwards). assert(ann.m_time <= now); } } } private: //! Wrapper around Index::...::erase that keeps m_peerinfo up to date. template Iter Erase(Iter it) { auto peerit = m_peerinfo.find(it->m_peer); peerit->second.m_completed -= it->GetState() == State::COMPLETED; peerit->second.m_requested -= it->GetState() == State::REQUESTED; if (--peerit->second.m_total == 0) { m_peerinfo.erase(peerit); } return m_index.get().erase(it); } //! Wrapper around Index::...::modify that keeps m_peerinfo up to date. template void Modify(Iter it, Modifier modifier) { auto peerit = m_peerinfo.find(it->m_peer); peerit->second.m_completed -= it->GetState() == State::COMPLETED; peerit->second.m_requested -= it->GetState() == State::REQUESTED; m_index.get().modify(it, std::move(modifier)); peerit->second.m_completed += it->GetState() == State::COMPLETED; peerit->second.m_requested += it->GetState() == State::REQUESTED; } //! Convert a CANDIDATE_DELAYED announcement into a CANDIDATE_READY. If this //! makes it the new best CANDIDATE_READY (and no REQUESTED exists) and //! better than the CANDIDATE_BEST (if any), it becomes the new //! CANDIDATE_BEST. void PromoteCandidateReady(Iter it) { assert(it != m_index.get().end()); assert(it->GetState() == State::CANDIDATE_DELAYED); // Convert CANDIDATE_DELAYED to CANDIDATE_READY first. Modify(it, [](Announcement &ann) { ann.SetState(State::CANDIDATE_READY); }); // The following code relies on the fact that the ByTxId is sorted by // txid, and then by state (first _DELAYED, then _READY, then // _BEST/REQUESTED). Within the _READY announcements, the best one // (highest priority) comes last. Thus, if an existing _BEST exists for // the same txid that this announcement may be preferred over, it must // immediately follow the newly created _READY. auto it_next = std::next(it); if (it_next == m_index.get().end() || it_next->m_txid != it->m_txid || it_next->GetState() == State::COMPLETED) { // This is the new best CANDIDATE_READY, and there is no // IsSelected() announcement for this txid already. Modify(it, [](Announcement &ann) { ann.SetState(State::CANDIDATE_BEST); }); } else if (it_next->GetState() == State::CANDIDATE_BEST) { Priority priority_old = m_computer(*it_next); Priority priority_new = m_computer(*it); if (priority_new > priority_old) { // There is a CANDIDATE_BEST announcement already, but this one // is better. Modify(it_next, [](Announcement &ann) { ann.SetState(State::CANDIDATE_READY); }); Modify(it, [](Announcement &ann) { ann.SetState(State::CANDIDATE_BEST); }); } } } //! Change the state of an announcement to something non-IsSelected(). If it //! was IsSelected(), the next best announcement will be marked //! CANDIDATE_BEST. void ChangeAndReselect(Iter it, State new_state) { assert(new_state == State::COMPLETED || new_state == State::CANDIDATE_DELAYED); assert(it != m_index.get().end()); if (it->IsSelected() && it != m_index.get().begin()) { auto it_prev = std::prev(it); // The next best CANDIDATE_READY, if any, immediately precedes the // REQUESTED or CANDIDATE_BEST announcement in the ByTxId index. if (it_prev->m_txid == it->m_txid && it_prev->GetState() == State::CANDIDATE_READY) { // If one such CANDIDATE_READY exists (for this txid), convert // it to CANDIDATE_BEST. Modify(it_prev, [](Announcement &ann) { ann.SetState(State::CANDIDATE_BEST); }); } } Modify( it, [new_state](Announcement &ann) { ann.SetState(new_state); }); } //! Check if 'it' is the only announcement for a given txid that isn't //! COMPLETED. bool IsOnlyNonCompleted(Iter it) { assert(it != m_index.get().end()); // Not allowed to call this on COMPLETED announcements. assert(it->GetState() != State::COMPLETED); // This announcement has a predecessor that belongs to the same txid. // Due to ordering, and the fact that 'it' is not COMPLETED, its // predecessor cannot be COMPLETED here. if (it != m_index.get().begin() && std::prev(it)->m_txid == it->m_txid) { return false; } // This announcement has a successor that belongs to the same txid, // and is not COMPLETED. if (std::next(it) != m_index.get().end() && std::next(it)->m_txid == it->m_txid && std::next(it)->GetState() != State::COMPLETED) { return false; } return true; } /** * Convert any announcement to a COMPLETED one. If there are no * non-COMPLETED announcements left for this txid, they are deleted. If * this was a REQUESTED announcement, and there are other CANDIDATEs left, * the best one is made CANDIDATE_BEST. Returns whether the announcement * still exists. */ bool MakeCompleted(Iter it) { assert(it != m_index.get().end()); // Nothing to be done if it's already COMPLETED. if (it->GetState() == State::COMPLETED) { return true; } if (IsOnlyNonCompleted(it)) { // This is the last non-COMPLETED announcement for this txid. // Delete all. uint256 txid = it->m_txid; do { it = Erase(it); } while (it != m_index.get().end() && it->m_txid == txid); return false; } // Mark the announcement COMPLETED, and select the next best // announcement (the first CANDIDATE_READY) if needed. ChangeAndReselect(it, State::COMPLETED); return true; } //! Make the data structure consistent with a given point in time: //! - REQUESTED annoucements with expiry <= now are turned into COMPLETED. //! - CANDIDATE_DELAYED announcements with reqtime <= now are turned into //! CANDIDATE_{READY,BEST}. //! - CANDIDATE_{READY,BEST} announcements with reqtime > now are turned //! into CANDIDATE_DELAYED. void SetTimePoint(std::chrono::microseconds now, ClearExpiredFun clearExpired, EmplaceExpiredFun emplaceExpired) { clearExpired(); // Iterate over all CANDIDATE_DELAYED and REQUESTED from old to new, as // long as they're in the past, and convert them to CANDIDATE_READY and // COMPLETED respectively. while (!m_index.empty()) { auto it = m_index.get().begin(); if (it->GetState() == State::CANDIDATE_DELAYED && it->m_time <= now) { PromoteCandidateReady(m_index.project(it)); } else if (it->GetState() == State::REQUESTED && it->m_time <= now) { emplaceExpired(it->m_peer, it->m_txid); MakeCompleted(m_index.project(it)); } else { break; } } while (!m_index.empty()) { // If time went backwards, we may need to demote CANDIDATE_BEST and // CANDIDATE_READY announcements back to CANDIDATE_DELAYED. This is // an unusual edge case, and unlikely to matter in production. // However, it makes it much easier to specify and test // TxRequestTracker::Impl's behaviour. auto it = std::prev(m_index.get().end()); if (it->IsSelectable() && it->m_time > now) { ChangeAndReselect(m_index.project(it), State::CANDIDATE_DELAYED); } else { break; } } } public: InvRequestTrackerImpl(bool deterministic) : m_computer(deterministic), // Explicitly initialize m_index as we need to pass a reference to // m_computer to ByTxHashViewExtractor. m_index(boost::make_tuple( boost::make_tuple(ByPeerViewExtractor(), std::less()), boost::make_tuple(ByTxIdViewExtractor(m_computer), std::less()), boost::make_tuple(ByTimeViewExtractor(), std::less()))) {} // Disable copying and assigning (a default copy won't work due the stateful // ByTxIdViewExtractor). InvRequestTrackerImpl(const InvRequestTrackerImpl &) = delete; InvRequestTrackerImpl &operator=(const InvRequestTrackerImpl &) = delete; ~InvRequestTrackerImpl() = default; void DisconnectedPeer(NodeId peer) { auto &index = m_index.get(); auto it = index.lower_bound(ByPeerView{peer, false, uint256(uint256::ZERO)}); while (it != index.end() && it->m_peer == peer) { // Check what to continue with after this iteration. 'it' will be // deleted in what follows, so we need to decide what to continue // with afterwards. There are a number of cases to consider: // - std::next(it) is end() or belongs to a different peer. In that // case, this is the last iteration of the loop (denote this by // setting it_next to end()). // - 'it' is not the only non-COMPLETED announcement for its txid. // This means it will be deleted, but no other Announcement // objects will be modified. Continue with std::next(it) if it // belongs to the same peer, but decide this ahead of time (as // 'it' may change position in what follows). // - 'it' is the only non-COMPLETED announcement for its txid. This // means it will be deleted along with all other announcements for // the same txid - which may include std::next(it). However, other // than 'it', no announcements for the same peer can be affected // (due to (peer, txid) uniqueness). In other words, the situation // where std::next(it) is deleted can only occur if std::next(it) // belongs to a different peer but the same txid as 'it'. This is // covered by the first bulletpoint already, and we'll have set // it_next to end(). auto it_next = (std::next(it) == index.end() || std::next(it)->m_peer != peer) ? index.end() : std::next(it); // If the announcement isn't already COMPLETED, first make it // COMPLETED (which will mark other CANDIDATEs as CANDIDATE_BEST, or // delete all of a txid's announcements if no non-COMPLETED ones are // left). if (MakeCompleted(m_index.project(it))) { // Then actually delete the announcement (unless it was already // deleted by MakeCompleted). Erase(it); } it = it_next; } } void ForgetTxId(const uint256 &txid) { auto it = m_index.get().lower_bound( ByTxIdView{txid, State::CANDIDATE_DELAYED, 0}); while (it != m_index.get().end() && it->m_txid == txid) { it = Erase(it); } } void ReceivedInv(NodeId peer, const uint256 &txid, bool preferred, std::chrono::microseconds reqtime) { // Bail out if we already have a CANDIDATE_BEST announcement for this // (txid, peer) combination. The case where there is a // non-CANDIDATE_BEST announcement already will be caught by the // uniqueness property of the ByPeer index when we try to emplace the // new object below. if (m_index.get().count(ByPeerView{peer, true, txid})) { return; } // Try creating the announcement with CANDIDATE_DELAYED state (which // will fail due to the uniqueness of the ByPeer index if a // non-CANDIDATE_BEST announcement already exists with the same txid // and peer). Bail out in that case. auto ret = m_index.get().emplace(txid, peer, preferred, reqtime, m_current_sequence); if (!ret.second) { return; } // Update accounting metadata. ++m_peerinfo[peer].m_total; ++m_current_sequence; } //! Find the TxIds to request now from peer. std::vector GetRequestable(NodeId peer, std::chrono::microseconds now, ClearExpiredFun clearExpired, EmplaceExpiredFun emplaceExpired) { // Move time. SetTimePoint(now, clearExpired, emplaceExpired); // Find all CANDIDATE_BEST announcements for this peer. std::vector selected; auto it_peer = m_index.get().lower_bound( ByPeerView{peer, true, uint256(uint256::ZERO)}); while (it_peer != m_index.get().end() && it_peer->m_peer == peer && it_peer->GetState() == State::CANDIDATE_BEST) { selected.emplace_back(&*it_peer); ++it_peer; } // Sort by sequence number. std::sort(selected.begin(), selected.end(), [](const Announcement *a, const Announcement *b) { return a->m_sequence < b->m_sequence; }); // Convert to TxId and return. std::vector ret; ret.reserve(selected.size()); std::transform(selected.begin(), selected.end(), std::back_inserter(ret), [](const Announcement *ann) { return ann->m_txid; }); return ret; } void RequestedTx(NodeId peer, const uint256 &txid, std::chrono::microseconds expiry) { auto it = m_index.get().find(ByPeerView{peer, true, txid}); if (it == m_index.get().end()) { // There is no CANDIDATE_BEST announcement, look for a _READY or // _DELAYED instead. If the caller only ever invokes RequestedTx // with the values returned by GetRequestable, and no other // non-const functions other than ForgetTxHash and GetRequestable in // between, this branch will never execute (as txids returned by // GetRequestable always correspond to CANDIDATE_BEST // announcements). it = m_index.get().find(ByPeerView{peer, false, txid}); if (it == m_index.get().end() || (it->GetState() != State::CANDIDATE_DELAYED && it->GetState() != State::CANDIDATE_READY)) { // There is no CANDIDATE announcement tracked for this peer, so // we have nothing to do. Either this txid wasn't tracked at // all (and the caller should have called ReceivedInv), or it // was already requested and/or completed for other reasons and // this is just a superfluous RequestedTx call. return; } // Look for an existing CANDIDATE_BEST or REQUESTED with the same // txid. We only need to do this if the found announcement had a // different state than CANDIDATE_BEST. If it did, invariants // guarantee that no other CANDIDATE_BEST or REQUESTED can exist. auto it_old = m_index.get().lower_bound( ByTxIdView{txid, State::CANDIDATE_BEST, 0}); if (it_old != m_index.get().end() && it_old->m_txid == txid) { if (it_old->GetState() == State::CANDIDATE_BEST) { // The data structure's invariants require that there can be // at most one CANDIDATE_BEST or one REQUESTED announcement // per txid (but not both simultaneously), so we have to // convert any existing CANDIDATE_BEST to another // CANDIDATE_* when constructing another REQUESTED. It // doesn't matter whether we pick CANDIDATE_READY or // _DELAYED here, as SetTimePoint() will correct it at // GetRequestable() time. If time only goes forward, it will // always be _READY, so pick that to avoid extra work in // SetTimePoint(). Modify(it_old, [](Announcement &ann) { ann.SetState(State::CANDIDATE_READY); }); } else if (it_old->GetState() == State::REQUESTED) { // As we're no longer waiting for a response to the previous // REQUESTED announcement, convert it to COMPLETED. This // also helps guaranteeing progress. Modify(it_old, [](Announcement &ann) { ann.SetState(State::COMPLETED); }); } } } Modify(it, [expiry](Announcement &ann) { ann.SetState(State::REQUESTED); ann.m_time = expiry; }); } void ReceivedResponse(NodeId peer, const uint256 &txid) { // We need to search the ByPeer index for both (peer, false, txid) and // (peer, true, txid). auto it = m_index.get().find(ByPeerView{peer, false, txid}); if (it == m_index.get().end()) { it = m_index.get().find(ByPeerView{peer, true, txid}); } if (it != m_index.get().end()) { MakeCompleted(m_index.project(it)); } } size_t CountInFlight(NodeId peer) const { auto it = m_peerinfo.find(peer); if (it != m_peerinfo.end()) { return it->second.m_requested; } return 0; } size_t CountCandidates(NodeId peer) const { auto it = m_peerinfo.find(peer); if (it != m_peerinfo.end()) { return it->second.m_total - it->second.m_requested - it->second.m_completed; } return 0; } size_t Count(NodeId peer) const { auto it = m_peerinfo.find(peer); if (it != m_peerinfo.end()) { return it->second.m_total; } return 0; } //! Count how many announcements are being tracked in total across all peers //! and transactions. size_t Size() const { return m_index.size(); } uint64_t ComputePriority(const uint256 &txid, NodeId peer, bool preferred) const { // Return Priority as a uint64_t as Priority is internal. return uint64_t{m_computer(txid, peer, preferred)}; } }; std::unique_ptr InvRequestTrackerImplInterface::BuildImpl(bool deterministic) { return std::make_unique(deterministic); } - -TxRequestTracker::TxRequestTracker(bool deterministic) - : m_impl{InvRequestTrackerImplInterface::BuildImpl(deterministic)} {} - -TxRequestTracker::~TxRequestTracker() = default; - -void TxRequestTracker::ForgetTxId(const TxId &txid) { - m_impl->ForgetTxId(txid); -} -void TxRequestTracker::DisconnectedPeer(NodeId peer) { - m_impl->DisconnectedPeer(peer); -} -size_t TxRequestTracker::CountInFlight(NodeId peer) const { - return m_impl->CountInFlight(peer); -} -size_t TxRequestTracker::CountCandidates(NodeId peer) const { - return m_impl->CountCandidates(peer); -} -size_t TxRequestTracker::Count(NodeId peer) const { - return m_impl->Count(peer); -} -size_t TxRequestTracker::Size() const { - return m_impl->Size(); -} -void TxRequestTracker::SanityCheck() const { - m_impl->SanityCheck(); -} - -void TxRequestTracker::PostGetRequestableSanityCheck( - std::chrono::microseconds now) const { - m_impl->PostGetRequestableSanityCheck(now); -} - -void TxRequestTracker::ReceivedInv(NodeId peer, const TxId &txid, - bool preferred, - std::chrono::microseconds reqtime) { - m_impl->ReceivedInv(peer, txid, preferred, reqtime); -} - -void TxRequestTracker::RequestedTx(NodeId peer, const TxId &txid, - std::chrono::microseconds expiry) { - m_impl->RequestedTx(peer, txid, expiry); -} - -void TxRequestTracker::ReceivedResponse(NodeId peer, const TxId &txid) { - m_impl->ReceivedResponse(peer, txid); -} - -std::vector TxRequestTracker::GetRequestable( - NodeId peer, std::chrono::microseconds now, - std::vector> *expired) { - InvRequestTrackerImplInterface::ClearExpiredFun clearExpired = [expired]() { - if (expired) { - expired->clear(); - } - }; - InvRequestTrackerImplInterface::EmplaceExpiredFun emplaceExpired = - [expired](const NodeId &nodeid, const uint256 &txid) { - if (expired) { - expired->emplace_back(nodeid, TxId(txid)); - } - }; - std::vector hashes = - m_impl->GetRequestable(peer, now, clearExpired, emplaceExpired); - return std::vector(hashes.begin(), hashes.end()); -} - -uint64_t TxRequestTracker::ComputePriority(const TxId &txid, NodeId peer, - bool preferred) const { - return m_impl->ComputePriority(txid, peer, preferred); -} diff --git a/src/txrequest.h b/src/txrequest.h index 8f0690935..4f9315bb2 100644 --- a/src/txrequest.h +++ b/src/txrequest.h @@ -1,299 +1,337 @@ // Copyright (c) 2020 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_TXREQUEST_H #define BITCOIN_TXREQUEST_H #include // For NodeId -#include #include -#include - #include +#include +#include /** * Data structure to keep track of, and schedule, transaction downloads from * peers. * * === Specification === * * We keep track of which peers have announced which transactions, and use that * to determine which requests should go to which peer, when, and in what order. * * The following information is tracked per peer/tx combination * ("announcement"): * - Which peer announced it (through their NodeId) * - The txid of the transaction * - What the earliest permitted time is that that transaction can be requested * from that peer (called "reqtime"). * - Whether it's from a "preferred" peer or not. Which announcements get this * flag is determined by the caller, but this is designed for outbound peers, * or other peers that we have a higher level of trust in. Even when the * peers' preferredness changes, the preferred flag of existing announcements * from that peer won't change. * - Whether or not the transaction was requested already, and if so, when it * times out (called "expiry"). * - Whether or not the transaction request failed already (timed out, or * invalid transaction or NOTFOUND was received). * * Transaction requests are then assigned to peers, following these rules: * * - No transaction is requested as long as another request for the same txid * is outstanding (it needs to fail first by passing expiry, or a NOTFOUND or * invalid transaction has to be received for it). * * Rationale: to avoid wasting bandwidth on multiple copies of the same * transaction. * * - The same transaction is never requested twice from the same peer, unless * the announcement was forgotten in between, and re-announced. Announcements * are forgotten only: * - If a peer goes offline, all its announcements are forgotten. * - If a transaction has been successfully received, or is otherwise no * longer needed, the caller can call ForgetTxId, which removes all * announcements across all peers with the specified txid. * - If for a given txid only already-failed announcements remain, they are * all forgotten. * * Rationale: giving a peer multiple chances to announce a transaction would * allow them to bias requests in their favor, worsening * transaction censoring attacks. The flip side is that as long as * an attacker manages to prevent us from receiving a transaction, * failed announcements (including those from honest peers) will * linger longer, increasing memory usage somewhat. The impact of * this is limited by imposing a cap on the number of tracked * announcements per peer. As failed requests in response to * announcements from honest peers should be rare, this almost * solely hinders attackers. * Transaction censoring attacks can be done by announcing * transactions quickly while not answering requests for them. See * https://allquantor.at/blockchainbib/pdf/miller2015topology.pdf * for more information. * * - Transactions are not requested from a peer until its reqtime has passed. * * Rationale: enable the calling code to define a delay for less-than-ideal * peers, so that (presumed) better peers have a chance to give * their announcement first. * * - If multiple viable candidate peers exist according to the above rules, pick * a peer as follows: * * - If any preferred peers are available, non-preferred peers are not * considered for what follows. * * Rationale: preferred peers are more trusted by us, so are less likely to * be under attacker control. * * - Pick a uniformly random peer among the candidates. * * Rationale: random assignments are hard to influence for attackers. * * Together these rules strike a balance between being fast in non-adverserial * conditions and minimizing susceptibility to censorship attacks. An attacker * that races the network: * - Will be unsuccessful if all preferred connections are honest (and there is * at least one preferred connection). * - If there are P preferred connections of which Ph>=1 are honest, the * attacker can delay us from learning about a transaction by k expiration * periods, where k ~ 1 + NHG(N=P-1,K=P-Ph-1,r=1), which has mean P/(Ph+1) * (where NHG stands for Negative Hypergeometric distribution). The "1 +" is * due to the fact that the attacker can be the first to announce through a * preferred connection in this scenario, which very likely means they get the * first request. * - If all P preferred connections are to the attacker, and there are NP * non-preferred connections of which NPh>=1 are honest, where we assume that * the attacker can disconnect and reconnect those connections, the * distribution becomes k ~ P + NB(p=1-NPh/NP,r=1) (where NB stands for * Negative Binomial distribution), which has mean P-1+NP/NPh. * * Complexity: * - Memory usage is proportional to the total number of tracked announcements * (Size()) plus the number of peers with a nonzero number of tracked * announcements. * - CPU usage is generally logarithmic in the total number of tracked * announcements, plus the number of announcements affected by an operation * (amortized O(1) per announcement). */ // Avoid littering this header file with implementation details. class InvRequestTrackerImplInterface { - friend class TxRequestTracker; + template friend class TxRequestTracker; // The base class is responsible for building the child implementation. // This is a hack that allows for hiding the concrete implementation details // from the callsite. static std::unique_ptr BuildImpl(bool deterministic); public: using ClearExpiredFun = const std::function &; using EmplaceExpiredFun = const std::function &; virtual ~InvRequestTrackerImplInterface() = default; virtual void ReceivedInv(NodeId peer, const uint256 &txid, bool preferred, std::chrono::microseconds reqtime) = 0; virtual void DisconnectedPeer(NodeId peer) = 0; virtual void ForgetTxId(const uint256 &txid) = 0; virtual std::vector GetRequestable(NodeId peer, std::chrono::microseconds now, ClearExpiredFun clearExpired, EmplaceExpiredFun emplaceExpired) = 0; virtual void RequestedTx(NodeId peer, const uint256 &txid, std::chrono::microseconds expiry) = 0; virtual void ReceivedResponse(NodeId peer, const uint256 &txid) = 0; virtual size_t CountInFlight(NodeId peer) const = 0; virtual size_t CountCandidates(NodeId peer) const = 0; virtual size_t Count(NodeId peer) const = 0; virtual size_t Size() const = 0; virtual uint64_t ComputePriority(const uint256 &txid, NodeId peer, bool preferred) const = 0; virtual void SanityCheck() const = 0; virtual void PostGetRequestableSanityCheck(std::chrono::microseconds now) const = 0; }; -class TxRequestTracker { +template class TxRequestTracker { + /* + * Only uint256-based InvId types are supported for now. + * FIXME: use a template constraint when C++20 is available. + */ + static_assert(std::is_base_of::value, + "InvRequestTracker inv id type should be uint256 or derived"); + const std::unique_ptr m_impl; public: //! Construct a TxRequestTracker. - explicit TxRequestTracker(bool deterministic = false); - ~TxRequestTracker(); + + explicit TxRequestTracker(bool deterministic = false) + : m_impl{InvRequestTrackerImplInterface::BuildImpl(deterministic)} {} + ~TxRequestTracker() = default; // Conceptually, the data structure consists of a collection of // "announcements", one for each peer/txid combination: // // - CANDIDATE announcements represent transactions that were announced by a // peer, and that become available for download after their reqtime has // passed. // // - REQUESTED announcements represent transactions that have been // requested, and which we're awaiting a response for from that peer. // Their expiry value determines when the request times out. // // - COMPLETED announcements represent transactions that have been requested // from a peer, and a NOTFOUND or a transaction was received in response // (valid or not), or they timed out. They're only kept around to prevent // requesting them again. If only COMPLETED announcements for a given txid // remain (so no CANDIDATE or REQUESTED ones), all of them are deleted // (this is an invariant, and maintained by all operations below). // // The operations below manipulate the data structure. /** * Adds a new CANDIDATE announcement. * * Does nothing if one already exists for that (txid, peer) combination * (whether it's CANDIDATE, REQUESTED, or COMPLETED). */ - void ReceivedInv(NodeId peer, const TxId &txid, bool preferred, - std::chrono::microseconds reqtime); + void ReceivedInv(NodeId peer, const InvId &txid, bool preferred, + std::chrono::microseconds reqtime) { + m_impl->ReceivedInv(peer, txid, preferred, reqtime); + } /** * Deletes all announcements for a given peer. * * It should be called when a peer goes offline. */ - void DisconnectedPeer(NodeId peer); + void DisconnectedPeer(NodeId peer) { m_impl->DisconnectedPeer(peer); } /** * Deletes all announcements for a given txid. * * This should be called when a transaction is no longer needed. The caller * should ensure that new announcements for the same txid will not trigger * new ReceivedInv calls, at least in the short term after this call. */ - void ForgetTxId(const TxId &txid); + void ForgetTxId(const InvId &txid) { m_impl->ForgetTxId(txid); } /** * Find the txids to request now from peer. * * It does the following: * - Convert all REQUESTED announcements (for all txids/peers) with * (expiry <= now) to COMPLETED ones. These are returned in expired, if * non-nullptr. * - Requestable announcements are selected: CANDIDATE announcements from * the specified peer with (reqtime <= now) for which no existing * REQUESTED announcement with the same txid from a different peer * exists, and for which the specified peer is the best choice among all * (reqtime <= now) CANDIDATE announcements with the same txid (subject * to preferredness rules, and tiebreaking using a deterministic salted * hash of peer and txid). * - The selected announcements are returned in announcement order (even * if multiple were added at the same time, or when the clock went * backwards while they were being added). This is done to minimize * disruption from dependent transactions being requested out of order: * if multiple dependent transactions are announced simultaneously by one * peer, and end up being requested from them, the requests will happen * in announcement order. */ - std::vector + std::vector GetRequestable(NodeId peer, std::chrono::microseconds now, - std::vector> *expired = nullptr); + std::vector> *expired) { + InvRequestTrackerImplInterface::ClearExpiredFun clearExpired = + [expired]() { + if (expired) { + expired->clear(); + } + }; + InvRequestTrackerImplInterface::EmplaceExpiredFun emplaceExpired = + [expired](const NodeId &nodeid, const uint256 &txid) { + if (expired) { + expired->emplace_back(nodeid, InvId(txid)); + } + }; + std::vector hashes = + m_impl->GetRequestable(peer, now, clearExpired, emplaceExpired); + return std::vector(hashes.begin(), hashes.end()); + } /** * Marks a transaction as requested, with a specified expiry. * * If no CANDIDATE announcement for the provided peer and txid exists, this * call has no effect. Otherwise: * - That announcement is converted to REQUESTED. * - If any other REQUESTED announcement for the same txid already * existed, it means an unexpected request was made (GetRequestable will * never advise doing so). In this case it is converted to COMPLETED, as * we're no longer waiting for a response to it. */ - void RequestedTx(NodeId peer, const TxId &txid, - std::chrono::microseconds expiry); + void RequestedTx(NodeId peer, const InvId &txid, + std::chrono::microseconds expiry) { + m_impl->RequestedTx(peer, txid, expiry); + } /** * Converts a CANDIDATE or REQUESTED announcement to a COMPLETED one. If no * such announcement exists for the provided peer and txid, nothing happens. * * It should be called whenever a transaction or NOTFOUND was received from * a peer. When the transaction is not needed entirely anymore, ForgetTxId * should be called instead of, or in addition to, this call. */ - void ReceivedResponse(NodeId peer, const TxId &txid); + void ReceivedResponse(NodeId peer, const InvId &txid) { + m_impl->ReceivedResponse(peer, txid); + } // The operations below inspect the data structure. /** Count how many REQUESTED announcements a peer has. */ - size_t CountInFlight(NodeId peer) const; + size_t CountInFlight(NodeId peer) const { + return m_impl->CountInFlight(peer); + } /** Count how many CANDIDATE announcements a peer has. */ - size_t CountCandidates(NodeId peer) const; + size_t CountCandidates(NodeId peer) const { + return m_impl->CountCandidates(peer); + } /** * Count how many announcements a peer has (REQUESTED, CANDIDATE, and * COMPLETED combined). */ - size_t Count(NodeId peer) const; + size_t Count(NodeId peer) const { return m_impl->Count(peer); } /** * Count how many announcements are being tracked in total across all peers * and transaction hashes. */ - size_t Size() const; + size_t Size() const { return m_impl->Size(); } /** Access to the internal priority computation (testing only) */ - uint64_t ComputePriority(const TxId &txid, NodeId peer, - bool preferred) const; + uint64_t ComputePriority(const InvId &txid, NodeId peer, + bool preferred) const { + return m_impl->ComputePriority(txid, peer, preferred); + } /** Run internal consistency check (testing only). */ - void SanityCheck() const; + void SanityCheck() const { m_impl->SanityCheck(); } /** * Run a time-dependent internal consistency check (testing only). * * This can only be called immediately after GetRequestable, with the same * 'now' parameter. */ - void PostGetRequestableSanityCheck(std::chrono::microseconds now) const; + void PostGetRequestableSanityCheck(std::chrono::microseconds now) const { + m_impl->PostGetRequestableSanityCheck(now); + } }; #endif // BITCOIN_TXREQUEST_H