diff --git a/src/net_processing.h b/src/net_processing.h --- a/src/net_processing.h +++ b/src/net_processing.h @@ -9,6 +9,7 @@ #include #include #include +#include #include extern RecursiveMutex cs_main; @@ -188,6 +189,15 @@ void SendBlockTransactions(CNode &pfrom, const CBlock &block, const BlockTransactionsRequest &req); + /** + * Register with TxRequestTracker that an INV has been received from a + * peer. The announcement parameters are decided in PeerManager and then + * passed to TxRequestTracker. + */ + void AddTxAnnouncement(const CNode &node, const TxId &txid, + std::chrono::microseconds current_time) + EXCLUSIVE_LOCKS_REQUIRED(::cs_main); + const CChainParams &m_chainparams; CConnman &m_connman; /** @@ -197,6 +207,7 @@ BanMan *const m_banman; ChainstateManager &m_chainman; CTxMemPool &m_mempool; + TxRequestTracker m_txrequest GUARDED_BY(::cs_main); //! Next time to check for stale tip int64_t m_stale_tip_check_time; diff --git a/src/net_processing.cpp b/src/net_processing.cpp --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -102,34 +102,27 @@ static_assert(MAX_PROTOCOL_MESSAGE_LENGTH > MAX_INV_SZ * sizeof(CInv), "Max protocol message length must be greater than largest " "possible INV message"); -/** Maximum number of in-flight transactions from a peer */ -static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; +/** + * Maximum number of in-flight transaction requests from a peer. It is not a + * hard limit, but the threshold at which point the OVERLOADED_PEER_TX_DELAY + * kicks in. + */ +static constexpr int32_t MAX_PEER_TX_REQUEST_IN_FLIGHT = 100; /** Maximum number of announced transactions from a peer */ static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; -/** How many microseconds to delay requesting transactions from inbound peers */ -static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{ - std::chrono::seconds{2}}; +/** How long to delay requesting transactions from non-preferred peers */ +static constexpr auto NONPREF_PEER_TX_DELAY = std::chrono::seconds{2}; +/** + * How long to delay requesting transactions from overloaded peers (see + * MAX_PEER_TX_REQUEST_IN_FLIGHT). + */ +static constexpr auto OVERLOADED_PEER_TX_DELAY = std::chrono::seconds{2}; /** * How long to wait (in microseconds) before downloading a transaction from an * additional peer. */ static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{ std::chrono::seconds{60}}; -/** - * Maximum delay (in microseconds) for transaction requests to avoid biasing - * some peers over others. - */ -static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{ - std::chrono::seconds{2}}; -/** - * How long to wait (in microseconds) before expiring an in-flight getdata - * request to a peer. - */ -static constexpr std::chrono::microseconds TX_EXPIRY_INTERVAL{ - GETDATA_TX_INTERVAL * 10}; -static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, - "To preserve security, MAX_GETDATA_RANDOM_DELAY should not " - "exceed INBOUND_PEER_DELAY"); /** * Limit to avoid sending big packets. Not used in processing incoming GETDATA * for compatibility. @@ -447,70 +440,6 @@ //! Time of last new block announcement int64_t m_last_block_announcement; - /* - * State associated with transaction download. - * - * Tx download algorithm: - * - * When inv comes in, queue up (process_time, txid) inside the peer's - * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer - * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). - * - * The process_time for a transaction is set to nNow for outbound peers, - * nNow + 2 seconds for inbound peers. This is the time at which we'll - * consider trying to request the transaction from the peer in - * SendMessages(). The delay for inbound peers is to allow outbound peers - * a chance to announce before we request from inbound peers, to prevent - * an adversary from using inbound connections to blind us to a - * transaction (InvBlock). - * - * When we call SendMessages() for a given peer, - * we will loop over the transactions in m_tx_process_time, looking - * at the transactions whose process_time <= nNow. We'll request each - * such transaction that we don't have already and that hasn't been - * requested from another peer recently, up until we hit the - * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update - * g_already_asked_for for each requested txid, storing the time of the - * GETDATA request. We use g_already_asked_for to coordinate transaction - * requests amongst our peers. - * - * For transactions that we still need but we have already recently - * requested from some other peer, we'll reinsert (process_time, txid) - * back into the peer's m_tx_process_time at the point in the future at - * which the most recent GETDATA request would time out (ie - * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). - * We add an additional delay for inbound peers, again to prefer - * attempting download from outbound peers first. - * We also add an extra small random delay up to 2 seconds - * to avoid biasing some peers over others. (e.g., due to fixed ordering - * of peer processing in ThreadMessageHandler). - * - * When we receive a transaction from a peer, we remove the txid from the - * peer's m_tx_in_flight set and from their recently announced set - * (m_tx_announced). We also clear g_already_asked_for for that entry, so - * that if somehow the transaction is not accepted but also not added to - * the reject filter, then we will eventually redownload from other - * peers. - */ - struct TxDownloadState { - /** - * Track when to attempt download of announced transactions (process - * time in micros -> txid) - */ - std::multimap m_tx_process_time; - - //! Store all the transactions a peer has recently announced - std::set m_tx_announced; - - //! Store transactions which were requested by us, with timestamp - std::map m_tx_in_flight; - - //! Periodically check for stuck getdata requests - std::chrono::microseconds m_check_expiry_timer{0}; - }; - - TxDownloadState m_tx_download; - struct AvalancheState { std::chrono::time_point last_poll; }; @@ -553,11 +482,6 @@ } }; -// Keeps track of the time (in microseconds) when transactions were requested -// last time -limitedmap - g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); - /** Map maintaining per-node state. */ static std::map mapNodeState GUARDED_BY(cs_main); @@ -990,79 +914,41 @@ } } -void EraseTxRequest(const TxId &txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - g_already_asked_for.erase(txid); -} - -std::chrono::microseconds GetTxRequestTime(const TxId &txid) - EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - auto it = g_already_asked_for.find(txid); - if (it != g_already_asked_for.end()) { - return it->second; - } - return {}; -} - -void UpdateTxRequestTime(const TxId &txid, - std::chrono::microseconds request_time) - EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - auto it = g_already_asked_for.find(txid); - if (it == g_already_asked_for.end()) { - g_already_asked_for.insert(std::make_pair(txid, request_time)); - } else { - g_already_asked_for.update(it, request_time); - } -} +} // namespace -std::chrono::microseconds -CalculateTxGetDataTime(const TxId &txid, std::chrono::microseconds current_time, - bool use_inbound_delay) - EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - std::chrono::microseconds process_time; - const auto last_request_time = GetTxRequestTime(txid); - // First time requesting this tx - if (last_request_time.count() == 0) { - process_time = current_time; - } else { - // Randomize the delay to avoid biasing some peers over others (such as - // due to fixed ordering of peer processing in ThreadMessageHandler) - process_time = last_request_time + GETDATA_TX_INTERVAL + - GetRandMicros(MAX_GETDATA_RANDOM_DELAY); +void PeerManager::AddTxAnnouncement(const CNode &node, const TxId &txid, + std::chrono::microseconds current_time) { + // For m_txrequest + AssertLockHeld(::cs_main); + NodeId nodeid = node.GetId(); + if (m_txrequest.Count(nodeid) >= MAX_PEER_TX_ANNOUNCEMENTS) { + // Too many queued announcements from this peer + return; } + const CNodeState *state = State(nodeid); - // We delay processing announcements from inbound peers - if (use_inbound_delay) { - process_time += INBOUND_PEER_TX_DELAY; + // Decide the TxRequestTracker parameters for this announcement: + // - "preferred": if fPreferredDownload is set (= outbound, or PF_NOBAN + // permission) + // - "reqtime": current time plus delays for: + // - NONPREF_PEER_TX_DELAY for announcements from non-preferred + // connections + // - OVERLOADED_PEER_TX_DELAY for announcements from peers which have at + // least MAX_PEER_TX_REQUEST_IN_FLIGHT requests in flight. + auto delay = std::chrono::microseconds{0}; + const bool preferred = state->fPreferredDownload; + if (!preferred) { + delay += NONPREF_PEER_TX_DELAY; } - return process_time; -} - -void RequestTx(CNodeState *state, const TxId &txid, - std::chrono::microseconds current_time) - EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - CNodeState::TxDownloadState &peer_download_state = state->m_tx_download; - if (peer_download_state.m_tx_announced.size() >= - MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_process_time.size() >= - MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_announced.count(txid)) { - // Too many queued announcements from this peer, or we already have - // this announcement - return; + const bool overloaded = + m_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_REQUEST_IN_FLIGHT; + if (overloaded) { + delay += OVERLOADED_PEER_TX_DELAY; } - peer_download_state.m_tx_announced.insert(txid); - - // Calculate the time to try requesting this transaction. Use - // fPreferredDownload as a proxy for outbound peers. - const auto process_time = - CalculateTxGetDataTime(txid, current_time, !state->fPreferredDownload); - - peer_download_state.m_tx_process_time.emplace(process_time, txid); + m_txrequest.ReceivedInv(nodeid, txid, preferred, current_time + delay); } -} // namespace - // This function is used for testing the stale tip eviction logic, see // denialofservice_tests.cpp void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) { @@ -1084,6 +970,7 @@ std::forward_as_tuple(addr, pnode->IsInboundConn(), pnode->IsManualConn())); + assert(m_txrequest.Count(nodeid) == 0); } { PeerRef peer = std::make_shared(nodeid); @@ -1144,6 +1031,7 @@ mapBlocksInFlight.erase(entry.hash); } EraseOrphansFor(nodeid); + m_txrequest.DisconnectedPeer(nodeid); nPreferredDownload -= state->fPreferredDownload; nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); assert(nPeersWithValidatedDownloads >= 0); @@ -1159,6 +1047,7 @@ assert(nPreferredDownload == 0); assert(nPeersWithValidatedDownloads == 0); assert(g_outbound_peers_with_protect_from_disconnect == 0); + assert(m_txrequest.Size() == 0); } LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); } @@ -3138,7 +3027,7 @@ return; } else if (!fAlreadyHave && !m_chainman.ActiveChainstate() .IsInitialBlockDownload()) { - RequestTx(State(pfrom.GetId()), txid, current_time); + AddTxAnnouncement(pfrom, txid, current_time); } } else { LogPrint(BCLog::NET, @@ -3428,10 +3317,7 @@ TxValidationState state; - CNodeState *nodestate = State(pfrom.GetId()); - nodestate->m_tx_download.m_tx_announced.erase(txid); - nodestate->m_tx_download.m_tx_in_flight.erase(txid); - EraseTxRequest(txid); + m_txrequest.ReceivedResponse(pfrom.GetId(), txid); if (!AlreadyHaveTx(txid, m_mempool) && AcceptToMemoryPool(config, m_mempool, state, ptx, @@ -3478,7 +3364,7 @@ const TxId _txid = txin.prevout.GetTxId(); pfrom.AddKnownTx(_txid); if (!AlreadyHaveTx(_txid, m_mempool)) { - RequestTx(State(pfrom.GetId()), _txid, current_time); + AddTxAnnouncement(pfrom, _txid, current_time); } } AddOrphanTx(ptx, pfrom.GetId()); @@ -4438,28 +4324,17 @@ } if (msg_type == NetMsgType::NOTFOUND) { - // Remove the NOTFOUND transactions from the peer - LOCK(cs_main); - CNodeState *state = State(pfrom.GetId()); std::vector vInv; vRecv >> vInv; if (vInv.size() <= - MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + LOCK(::cs_main); for (CInv &inv : vInv) { if (inv.IsMsgTx()) { - const TxId txid(inv.hash); - // If we receive a NOTFOUND message for a txid we requested, - // erase it from our data structures for this peer. - auto in_flight_it = - state->m_tx_download.m_tx_in_flight.find(txid); - if (in_flight_it == - state->m_tx_download.m_tx_in_flight.end()) { - // Skip any further work if this is a spurious NOTFOUND - // message. - continue; - } - state->m_tx_download.m_tx_in_flight.erase(in_flight_it); - state->m_tx_download.m_tx_announced.erase(txid); + // If we receive a NOTFOUND message for a tx we requested, + // mark the announcement for it as completed in + // TxRequestTracker. + m_txrequest.ReceivedResponse(pfrom.GetId(), TxId(inv.hash)); } } } @@ -5480,69 +5355,22 @@ // Message: getdata (transactions) // - // For robustness, expire old requests after a long timeout, so that we - // can resume downloading transactions from a peer even if they were - // unresponsive in the past. Eventually we should consider disconnecting - // peers, but this is conservative. - if (state.m_tx_download.m_check_expiry_timer <= current_time) { - for (auto it = state.m_tx_download.m_tx_in_flight.begin(); - it != state.m_tx_download.m_tx_in_flight.end();) { - if (it->second <= current_time - TX_EXPIRY_INTERVAL) { - LogPrint(BCLog::NET, - "timeout of inflight tx %s from peer=%d\n", - it->first.ToString(), pto->GetId()); - state.m_tx_download.m_tx_announced.erase(it->first); - state.m_tx_download.m_tx_in_flight.erase(it++); - } else { - ++it; - } - } - // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize - // so that we're not doing this for all peers at the same time. - state.m_tx_download.m_check_expiry_timer = - current_time + TX_EXPIRY_INTERVAL / 2 + - GetRandMicros(TX_EXPIRY_INTERVAL); - } - - auto &tx_process_time = state.m_tx_download.m_tx_process_time; - while (!tx_process_time.empty() && - tx_process_time.begin()->first <= current_time && - state.m_tx_download.m_tx_in_flight.size() < - MAX_PEER_TX_IN_FLIGHT) { - const TxId txid = tx_process_time.begin()->second; - // Erase this entry from tx_process_time (it may be added back for - // processing at a later time, see below) - tx_process_time.erase(tx_process_time.begin()); - CInv inv(MSG_TX, txid); + for (const TxId &txid : + m_txrequest.GetRequestable(pto->GetId(), current_time)) { if (!AlreadyHaveTx(txid, m_mempool)) { - // If this transaction was last requested more than 1 minute - // ago, then request. - const auto last_request_time = GetTxRequestTime(txid); - if (last_request_time <= current_time - GETDATA_TX_INTERVAL) { - LogPrint(BCLog::NET, "Requesting %s peer=%d\n", - inv.ToString(), pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= MAX_GETDATA_SZ) { - m_connman.PushMessage( - pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - vGetData.clear(); - } - UpdateTxRequestTime(txid, current_time); - state.m_tx_download.m_tx_in_flight.emplace(txid, - current_time); - } else { - // This transaction is in flight from someone else; queue - // up processing to happen after the download times out - // (with a slight delay for inbound peers, to prefer - // requests to outbound peers). - const auto next_process_time = CalculateTxGetDataTime( - txid, current_time, !state.fPreferredDownload); - tx_process_time.emplace(next_process_time, txid); + LogPrint(BCLog::NET, "Requesting tx %s peer=%d\n", + txid.ToString(), pto->GetId()); + vGetData.emplace_back(MSG_TX, txid); + if (vGetData.size() >= MAX_GETDATA_SZ) { + m_connman.PushMessage( + pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); } + m_txrequest.RequestedTx(pto->GetId(), txid, + current_time + GETDATA_TX_INTERVAL); } else { // We have already seen this transaction, no need to download. - state.m_tx_download.m_tx_announced.erase(txid); - state.m_tx_download.m_tx_in_flight.erase(txid); + m_txrequest.ForgetTxId(txid); } } diff --git a/test/functional/p2p_tx_download.py b/test/functional/p2p_tx_download.py --- a/test/functional/p2p_tx_download.py +++ b/test/functional/p2p_tx_download.py @@ -42,15 +42,13 @@ # Constants from net_processing GETDATA_TX_INTERVAL = 60 # seconds -MAX_GETDATA_RANDOM_DELAY = 2 # seconds INBOUND_PEER_TX_DELAY = 2 # seconds +OVERLOADED_PEER_DELAY = 2 # seconds MAX_GETDATA_IN_FLIGHT = 100 -TX_EXPIRY_INTERVAL = GETDATA_TX_INTERVAL * 10 # Python test constants NUM_INBOUND = 10 -MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + \ - MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY +MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY class TxDownloadTest(BitcoinTestFramework): @@ -125,15 +123,14 @@ # * the first time it is re-requested from the outbound peer, plus # * 2 seconds to avoid races assert self.nodes[1].getpeerinfo()[0]['inbound'] is False - timeout = 2 + (MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY) + ( - GETDATA_TX_INTERVAL + MAX_GETDATA_RANDOM_DELAY) + timeout = 2 + INBOUND_PEER_TX_DELAY + GETDATA_TX_INTERVAL self.log.info( "Tx should be received at node 1 after {} seconds".format(timeout)) self.sync_mempools(timeout=timeout) def test_in_flight_max(self): - self.log.info("Test that we don't request more than {} transactions from any peer, every {} minutes".format( - MAX_GETDATA_IN_FLIGHT, TX_EXPIRY_INTERVAL / 60)) + self.log.info("Test that we don't load peers with more than {} transaction requests immediately".format( + MAX_GETDATA_IN_FLIGHT)) txids = [i for i in range(MAX_GETDATA_IN_FLIGHT + 2)] p = self.nodes[0].p2ps[0] @@ -141,38 +138,144 @@ with mininode_lock: p.tx_getdata_count = 0 - p.send_message(msg_inv([CInv(t=MSG_TX, h=i) for i in txids])) - wait_until(lambda: p.tx_getdata_count >= - MAX_GETDATA_IN_FLIGHT, lock=mininode_lock) + mock_time = int(time.time() + 1) + self.nodes[0].setmocktime(mock_time) + for i in range(MAX_GETDATA_IN_FLIGHT): + p.send_message(msg_inv([CInv(t=MSG_TX, h=txids[i])])) + p.sync_with_ping() + mock_time += INBOUND_PEER_TX_DELAY + self.nodes[0].setmocktime(mock_time) + p.wait_until(lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT) + for i in range(MAX_GETDATA_IN_FLIGHT, len(txids)): + p.send_message(msg_inv([CInv(t=MSG_TX, h=txids[i])])) + p.sync_with_ping() + self.log.info( + "No more than {} requests should be seen within {} seconds after announcement".format( + MAX_GETDATA_IN_FLIGHT, + INBOUND_PEER_TX_DELAY + + OVERLOADED_PEER_DELAY - + 1)) + self.nodes[0].setmocktime( + mock_time + + INBOUND_PEER_TX_DELAY + + OVERLOADED_PEER_DELAY - + 1) + p.sync_with_ping() with mininode_lock: assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT) + self.log.info( + "If we wait {} seconds after announcement, we should eventually get more requests".format( + INBOUND_PEER_TX_DELAY + + OVERLOADED_PEER_DELAY)) + self.nodes[0].setmocktime( + mock_time + + INBOUND_PEER_TX_DELAY + + OVERLOADED_PEER_DELAY) + p.wait_until(lambda: p.tx_getdata_count == len(txids)) + def test_expiry_fallback(self): self.log.info( - "Now check that if we send a NOTFOUND for a transaction, we'll get one more request") - p.send_message(msg_notfound(vec=[CInv(t=MSG_TX, h=txids[0])])) - wait_until( - lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT + 1, - timeout=10, - lock=mininode_lock) + 'Check that expiry will select another peer for download') + TXID = 0xffaa + peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) + peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) + for p in [peer1, peer2]: + p.send_message(msg_inv([CInv(t=MSG_TX, h=TXID)])) + # One of the peers is asked for the tx + peer2.wait_until( + lambda: sum( + p.tx_getdata_count for p in [ + peer1, peer2]) == 1) with mininode_lock: - assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT + 1) + peer_expiry, peer_fallback = ( + peer1, peer2) if peer1.tx_getdata_count == 1 else ( + peer2, peer1) + assert_equal(peer_fallback.tx_getdata_count, 0) + # Wait for request to peer_expiry to expire + self.nodes[0].setmocktime(int(time.time()) + GETDATA_TX_INTERVAL + 1) + peer_fallback.wait_until( + lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) + with mininode_lock: + assert_equal(peer_fallback.tx_getdata_count, 1) + # reset mocktime + self.restart_node(0) - WAIT_TIME = TX_EXPIRY_INTERVAL // 2 + TX_EXPIRY_INTERVAL + def test_disconnect_fallback(self): self.log.info( - "if we wait about {} minutes, we should eventually get more requests".format( - WAIT_TIME / 60)) - self.nodes[0].setmocktime(int(time.time() + WAIT_TIME)) - wait_until(lambda: p.tx_getdata_count == MAX_GETDATA_IN_FLIGHT + 2) - self.nodes[0].setmocktime(0) + 'Check that disconnect will select another peer for download') + TXID = 0xffbb + peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) + peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) + for p in [peer1, peer2]: + p.send_message(msg_inv([CInv(t=MSG_TX, h=TXID)])) + # One of the peers is asked for the tx + peer2.wait_until( + lambda: sum( + p.tx_getdata_count for p in [ + peer1, peer2]) == 1) + with mininode_lock: + peer_disconnect, peer_fallback = ( + peer1, peer2) if peer1.tx_getdata_count == 1 else ( + peer2, peer1) + assert_equal(peer_fallback.tx_getdata_count, 0) + peer_disconnect.peer_disconnect() + peer_disconnect.wait_for_disconnect() + peer_fallback.wait_until( + lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) + with mininode_lock: + assert_equal(peer_fallback.tx_getdata_count, 1) + + def test_notfound_fallback(self): + self.log.info( + 'Check that notfounds will select another peer for download immediately') + TXID = 0xffdd + peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) + peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) + for p in [peer1, peer2]: + p.send_message(msg_inv([CInv(t=MSG_TX, h=TXID)])) + # One of the peers is asked for the tx + peer2.wait_until( + lambda: sum( + p.tx_getdata_count for p in [ + peer1, peer2]) == 1) + with mininode_lock: + peer_notfound, peer_fallback = ( + peer1, peer2) if peer1.tx_getdata_count == 1 else ( + peer2, peer1) + assert_equal(peer_fallback.tx_getdata_count, 0) + # Send notfound, so that fallback peer is selected + peer_notfound.send_and_ping(msg_notfound(vec=[CInv(MSG_TX, TXID)])) + peer_fallback.wait_until( + lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) + with mininode_lock: + assert_equal(peer_fallback.tx_getdata_count, 1) + + def test_preferred_inv(self): + self.log.info( + 'Check that invs from preferred peers are downloaded immediately') + self.restart_node(0, extra_args=['-whitelist=noban@127.0.0.1']) + peer = self.nodes[0].add_p2p_connection(TestP2PConn()) + peer.send_message(msg_inv([CInv(t=MSG_TX, h=0xff00ff00)])) + peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1) + with mininode_lock: + assert_equal(peer.tx_getdata_count, 1) def test_spurious_notfound(self): self.log.info('Check that spurious notfound is ignored') self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(MSG_TX, 1)])) def run_test(self): + # Run tests without mocktime that only need one peer-connection first, + # to avoid restarting the nodes + self.test_expiry_fallback() + self.test_disconnect_fallback() + self.test_notfound_fallback() + self.test_preferred_inv() + self.test_spurious_notfound() + # Run each test against new bitcoind instances, as setting mocktimes has long-term effects on when # the next trickle relay event happens. - for test in [self.test_spurious_notfound, self.test_in_flight_max, + for test in [self.test_in_flight_max, self.test_inv_block, self.test_tx_requests]: self.stop_nodes() self.start_nodes()