diff --git a/src/net_processing.cpp b/src/net_processing.cpp --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -91,25 +91,26 @@ /** Maximum number of announced transactions from a peer */ static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; /** How many microseconds to delay requesting transactions from inbound peers */ -// 2 seconds -static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; +static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{ + std::chrono::seconds{2}}; /** * How long to wait (in microseconds) before downloading a transaction from an * additional peer. */ -// 1 minute -static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; +static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{ + std::chrono::seconds{60}}; /** * Maximum delay (in microseconds) for transaction requests to avoid biasing * some peers over others. */ -// 2 seconds -static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; +static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{ + std::chrono::seconds{2}}; /** * How long to wait (in microseconds) before expiring an in-flight getdata * request to a peer. */ -static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL; +static constexpr std::chrono::microseconds TX_EXPIRY_INTERVAL{ + GETDATA_TX_INTERVAL * 10}; static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, "To preserve security, MAX_GETDATA_RANDOM_DELAY should not " "exceed INBOUND_PEER_DELAY"); @@ -407,16 +408,16 @@ * Track when to attempt download of announced transactions (process * time in micros -> txid) */ - std::multimap m_tx_process_time; + std::multimap m_tx_process_time; //! Store all the transactions a peer has recently announced std::set m_tx_announced; //! Store transactions which were requested by us, with timestamp - std::map m_tx_in_flight; + std::map m_tx_in_flight; //! 
Periodically check for stuck getdata requests - int64_t m_check_expiry_timer{0}; + std::chrono::microseconds m_check_expiry_timer{0}; }; TxDownloadState m_tx_download; @@ -455,7 +456,8 @@ // Keeps track of the time (in microseconds) when transactions were requested // last time -limitedmap g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); +limitedmap + g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); /** Map maintaining per-node state. */ static std::map mapNodeState GUARDED_BY(cs_main); @@ -840,15 +842,17 @@ g_already_asked_for.erase(txid); } -int64_t GetTxRequestTime(const TxId &txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { +std::chrono::microseconds GetTxRequestTime(const TxId &txid) + EXCLUSIVE_LOCKS_REQUIRED(cs_main) { auto it = g_already_asked_for.find(txid); if (it != g_already_asked_for.end()) { return it->second; } - return 0; + return {}; } -void UpdateTxRequestTime(const TxId &txid, int64_t request_time) +void UpdateTxRequestTime(const TxId &txid, + std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { auto it = g_already_asked_for.find(txid); if (it == g_already_asked_for.end()) { @@ -858,19 +862,20 @@ } } -int64_t CalculateTxGetDataTime(const TxId &txid, int64_t current_time, - bool use_inbound_delay) +std::chrono::microseconds +CalculateTxGetDataTime(const TxId &txid, std::chrono::microseconds current_time, + bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - int64_t process_time; - int64_t last_request_time = GetTxRequestTime(txid); + std::chrono::microseconds process_time; + const auto last_request_time = GetTxRequestTime(txid); // First time requesting this tx - if (last_request_time == 0) { + if (last_request_time.count() == 0) { process_time = current_time; } else { // Randomize the delay to avoid biasing some peers over others (such as // due to fixed ordering of peer processing in ThreadMessageHandler) process_time = last_request_time + GETDATA_TX_INTERVAL + - GetRand(MAX_GETDATA_RANDOM_DELAY); + 
GetRandMicros(MAX_GETDATA_RANDOM_DELAY); } // We delay processing announcements from inbound peers @@ -881,7 +886,8 @@ return process_time; } -void RequestTx(CNodeState *state, const TxId &txid, int64_t nNow) +void RequestTx(CNodeState *state, const TxId &txid, + std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { CNodeState::TxDownloadState &peer_download_state = state->m_tx_download; if (peer_download_state.m_tx_announced.size() >= @@ -897,8 +903,8 @@ // Calculate the time to try requesting this transaction. Use // fPreferredDownload as a proxy for outbound peers. - int64_t process_time = - CalculateTxGetDataTime(txid, nNow, !state->fPreferredDownload); + const auto process_time = + CalculateTxGetDataTime(txid, current_time, !state->fPreferredDownload); peer_download_state.m_tx_process_time.emplace(process_time, txid); } @@ -2468,7 +2474,7 @@ LOCK(cs_main); - int64_t nNow = GetTimeMicros(); + const auto current_time = GetTime(); for (CInv &inv : vInv) { if (interruptMsgProc) { @@ -2509,7 +2515,8 @@ inv.hash.ToString(), pfrom->GetId()); } else if (!fAlreadyHave && !fImporting && !fReindex && !::ChainstateActive().IsInitialBlockDownload()) { - RequestTx(State(pfrom->GetId()), TxId(inv.hash), nNow); + RequestTx(State(pfrom->GetId()), TxId(inv.hash), + current_time); } } } @@ -2893,7 +2900,7 @@ } } if (!fRejectedParents) { - int64_t nNow = GetTimeMicros(); + const auto current_time = GetTime(); for (const CTxIn &txin : tx.vin) { // FIXME: MSG_TX should use a TxHash, not a TxId. 
@@ -2901,7 +2908,7 @@ CInv _inv(MSG_TX, _txid); pfrom->AddInventoryKnown(_inv); if (!AlreadyHave(_inv)) { - RequestTx(State(pfrom->GetId()), _txid, nNow); + RequestTx(State(pfrom->GetId()), _txid, current_time); } } AddOrphanTx(ptx, pfrom->GetId()); @@ -4632,6 +4639,9 @@ } // Detect whether we're stalling + const auto current_time = GetTime(); + // nNow is the current system time (GetTimeMicros is not mockable) and + // should be replaced by the mockable current_time eventually nNow = GetTimeMicros(); if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) { @@ -4750,10 +4760,10 @@ // resume downloading transactions from a peer even if they were // unresponsive in the past. Eventually we should consider disconnecting // peers, but this is conservative. - if (state.m_tx_download.m_check_expiry_timer <= nNow) { + if (state.m_tx_download.m_check_expiry_timer <= current_time) { for (auto it = state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) { - if (it->second <= nNow - TX_EXPIRY_INTERVAL) { + if (it->second <= current_time - TX_EXPIRY_INTERVAL) { LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId()); state.m_tx_download.m_tx_announced.erase(it->first); @@ -4765,11 +4775,13 @@ // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize // so that we're not doing this for all peers at the same time. 
state.m_tx_download.m_check_expiry_timer = - nNow + TX_EXPIRY_INTERVAL / 2 + GetRand(TX_EXPIRY_INTERVAL); + current_time + TX_EXPIRY_INTERVAL / 2 + + GetRandMicros(TX_EXPIRY_INTERVAL); } auto &tx_process_time = state.m_tx_download.m_tx_process_time; - while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && + while (!tx_process_time.empty() && + tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { const TxId txid = tx_process_time.begin()->second; // Erase this entry from tx_process_time (it may be added back for @@ -4779,8 +4791,8 @@ if (!AlreadyHave(inv)) { // If this transaction was last requested more than 1 minute ago, // then request. - int64_t last_request_time = GetTxRequestTime(txid); - if (last_request_time <= nNow - GETDATA_TX_INTERVAL) { + const auto last_request_time = GetTxRequestTime(txid); + if (last_request_time <= current_time - GETDATA_TX_INTERVAL) { LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); vGetData.push_back(inv); @@ -4789,15 +4801,15 @@ pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); vGetData.clear(); } - UpdateTxRequestTime(txid, nNow); - state.m_tx_download.m_tx_in_flight.emplace(txid, nNow); + UpdateTxRequestTime(txid, current_time); + state.m_tx_download.m_tx_in_flight.emplace(txid, current_time); } else { // This transaction is in flight from someone else; queue // up processing to happen after the download times out // (with a slight delay for inbound peers, to prefer // requests to outbound peers). 
- int64_t next_process_time = CalculateTxGetDataTime( - txid, nNow, !state.fPreferredDownload); + const auto next_process_time = CalculateTxGetDataTime( + txid, current_time, !state.fPreferredDownload); tx_process_time.emplace(next_process_time, txid); } } else { diff --git a/src/random.h b/src/random.h --- a/src/random.h +++ b/src/random.h @@ -10,6 +10,7 @@ #include #include +#include // For std::chrono::microseconds #include #include @@ -75,6 +76,8 @@ */ void GetRandBytes(uint8_t *buf, int num) noexcept; uint64_t GetRand(uint64_t nMax) noexcept; +std::chrono::microseconds +GetRandMicros(std::chrono::microseconds duration_max) noexcept; int GetRandInt(int nMax) noexcept; uint256 GetRandHash() noexcept; diff --git a/src/random.cpp b/src/random.cpp --- a/src/random.cpp +++ b/src/random.cpp @@ -728,6 +728,11 @@ return FastRandomContext(g_mock_deterministic_tests).randrange(nMax); } +std::chrono::microseconds +GetRandMicros(std::chrono::microseconds duration_max) noexcept { + return std::chrono::microseconds{GetRand(duration_max.count())}; +} + int GetRandInt(int nMax) noexcept { return GetRand(nMax); } diff --git a/test/functional/p2p_tx_download.py b/test/functional/p2p_tx_download.py new file mode 100755 --- /dev/null +++ b/test/functional/p2p_tx_download.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +# Copyright (c) 2019 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. 
"""
Test transaction download behavior
"""

from test_framework.address import ADDRESS_BCHREG_UNSPENDABLE
from test_framework.messages import (
    CInv,
    CTransaction,
    FromHex,
    MSG_TX,
    MSG_TYPE_MASK,
    msg_inv,
    msg_notfound,
)
from test_framework.mininode import (
    P2PInterface,
    mininode_lock,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
    assert_equal,
    wait_until,
)

import time


class TestP2PConn(P2PInterface):
    """P2P connection that counts the transaction getdata entries it receives."""

    def __init__(self):
        super().__init__()
        # Number of MSG_TX inventory entries seen in getdata messages so far.
        self.tx_getdata_count = 0

    def on_getdata(self, message):
        """Count every entry of `message.inv` whose masked type is MSG_TX."""
        for i in message.inv:
            if i.type & MSG_TYPE_MASK == MSG_TX:
                self.tx_getdata_count += 1


# Constants from net_processing
GETDATA_TX_INTERVAL = 60  # seconds
MAX_GETDATA_RANDOM_DELAY = 2  # seconds
INBOUND_PEER_TX_DELAY = 2  # seconds
MAX_GETDATA_IN_FLIGHT = 100
TX_EXPIRY_INTERVAL = GETDATA_TX_INTERVAL * 10

# Python test constants
NUM_INBOUND = 10
MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + \
    MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY


class TxDownloadTest(BitcoinTestFramework):
    """Functional test of the node's transaction request (getdata) scheduling."""

    def set_test_params(self):
        self.setup_clean_chain = False
        self.num_nodes = 2

    def test_tx_requests(self):
        """Announce one txid from every peer of node 0 and check that each
        peer eventually receives a getdata for it as mocktime advances."""
        self.log.info(
            "Test that we request transactions from all our peers, eventually")

        txid = 0xdeadbeef

        self.log.info("Announce the txid from each incoming peer to node 0")
        msg = msg_inv([CInv(t=MSG_TX, h=txid)])
        for p in self.nodes[0].p2ps:
            p.send_message(msg)
            p.sync_with_ping()

        outstanding_peer_index = list(range(len(self.nodes[0].p2ps)))

        def getdata_found(peer_index):
            # Truthy when the last getdata received by this peer ends with
            # the announced txid.
            p = self.nodes[0].p2ps[peer_index]
            with mininode_lock:
                return p.last_message.get(
                    "getdata") and p.last_message["getdata"].inv[-1].hash == txid

        node_0_mocktime = int(time.time())
        while outstanding_peer_index:
            # Jump far enough ahead that the next request may be sent.
            node_0_mocktime += MAX_GETDATA_INBOUND_WAIT
            self.nodes[0].setmocktime(node_0_mocktime)
            wait_until(lambda: any(getdata_found(i)
                                   for i in outstanding_peer_index))
            # Build a new list rather than calling remove() while iterating:
            # removing during iteration skips the element that follows each
            # removed one.
            outstanding_peer_index = [
                i for i in outstanding_peer_index if not getdata_found(i)]

        self.nodes[0].setmocktime(0)
        self.log.info("All outstanding peers received a getdata")

    def test_inv_block(self):
        """Announce a transaction from all test peers without ever sending it,
        and check that it still reaches node 1's mempool within the timeout."""
        self.log.info("Generate a transaction on node 0")
        tx = self.nodes[0].createrawtransaction(
            inputs=[{
                # coinbase
                "txid": self.nodes[0].getblock(self.nodes[0].getblockhash(1))['tx'][0],
                "vout": 0
            }],
            outputs={ADDRESS_BCHREG_UNSPENDABLE: 50 - 0.00025},
        )
        tx = self.nodes[0].signrawtransactionwithkey(
            hexstring=tx,
            privkeys=[self.nodes[0].get_deterministic_priv_key().key],
        )['hex']
        ctx = FromHex(CTransaction(), tx)
        txid = int(ctx.rehash(), 16)

        self.log.info(
            "Announce the transaction to all nodes from all {} incoming peers, but never send it".format(NUM_INBOUND))
        msg = msg_inv([CInv(t=MSG_TX, h=txid)])
        for p in self.peers:
            p.send_message(msg)
            p.sync_with_ping()

        self.log.info("Put the tx in node 0's mempool")
        self.nodes[0].sendrawtransaction(tx)

        # Since node 1 is connected outbound to an honest peer (node 0), it
        # should get the tx within a timeout. (Assuming that node 0
        # announced the tx within the timeout)
        # The timeout is the sum of
        # * the worst case until the tx is first requested from an inbound
        #   peer, plus
        # * the first time it is re-requested from the outbound peer, plus
        # * 2 seconds to avoid races
        timeout = 2 + (MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY) + (
            GETDATA_TX_INTERVAL + MAX_GETDATA_RANDOM_DELAY)
        self.log.info(
            "Tx should be received at node 1 after {} seconds".format(timeout))
        self.sync_mempools(timeout=timeout)

    def test_in_flight_max(self):
        """Check that one peer is asked for at most MAX_GETDATA_IN_FLIGHT
        transactions at a time, and that a NOTFOUND or an expiry frees a slot."""
        self.log.info("Test that we don't request more than {} transactions from any peer, every {} minutes".format(
            MAX_GETDATA_IN_FLIGHT, TX_EXPIRY_INTERVAL / 60))
        txids = list(range(MAX_GETDATA_IN_FLIGHT + 2))

        p = self.nodes[0].p2ps[0]

        with mininode_lock:
            p.tx_getdata_count = 0

        p.send_message(msg_inv([CInv(t=MSG_TX, h=i) for i in txids]))
        wait_until(lambda: p.tx_getdata_count >=
                   MAX_GETDATA_IN_FLIGHT, lock=mininode_lock)
        with mininode_lock:
            assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT)

        self.log.info(
            "Now check that if we send a NOTFOUND for a transaction, we'll get one more request")
        p.send_message(msg_notfound(vec=[CInv(t=MSG_TX, h=txids[0])]))
        wait_until(
            lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT + 1,
            timeout=10,
            lock=mininode_lock)
        with mininode_lock:
            assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT + 1)

        WAIT_TIME = TX_EXPIRY_INTERVAL // 2 + TX_EXPIRY_INTERVAL
        self.log.info(
            "if we wait about {} minutes, we should eventually get more requests".format(
                WAIT_TIME / 60))
        self.nodes[0].setmocktime(int(time.time() + WAIT_TIME))
        # Read the counter under mininode_lock, like the other waits on
        # tx_getdata_count in this test.
        wait_until(lambda: p.tx_getdata_count == MAX_GETDATA_IN_FLIGHT + 2,
                   lock=mininode_lock)
        self.nodes[0].setmocktime(0)

    def run_test(self):
        # Setup the p2p connections
        self.peers = []
        for node in self.nodes:
            for _ in range(NUM_INBOUND):
                self.peers.append(node.add_p2p_connection(TestP2PConn()))

        self.log.info(
            "Nodes are setup with {} incoming connections each".format(NUM_INBOUND))

        # Test the in-flight max first, because we want no transactions in
        # flight ahead of this test.
        self.test_in_flight_max()

        self.test_inv_block()

        self.test_tx_requests()


if __name__ == '__main__':
    TxDownloadTest().main()