diff --git a/doc/release-notes.md b/doc/release-notes.md --- a/doc/release-notes.md +++ b/doc/release-notes.md @@ -11,3 +11,13 @@ until v0.21 at which time `getaccountaddress` will also be removed. To use `getaccountaddress` start `bitcoind` with the `-deprecatedrpc=accounts` option. See the v0.20.6 release notes for more details. + +Network +------- + - When fetching a transaction announced by multiple peers, previous versions of + Bitcoin ABC would sequentially attempt to download the transaction from each + announcing peer until the transaction is received, in the order that those + peers' announcements were received. In this release, the download logic has + changed to randomize the fetch order across peers and to prefer sending + download requests to outbound peers over inbound peers. This fixes an issue + where inbound peers can prevent a node from getting a transaction. diff --git a/src/net.h b/src/net.h --- a/src/net.h +++ b/src/net.h @@ -75,10 +75,6 @@ #else static const bool DEFAULT_UPNP = false; #endif -/** The maximum number of entries in mapAskFor */ -static const size_t MAPASKFOR_MAX_SZ = MAX_INV_SZ; -/** The maximum number of entries in setAskFor (larger due to getdata latency)*/ -static const size_t SETASKFOR_MAX_SZ = 2 * MAX_INV_SZ; /** The maximum number of peer connections to maintain. */ static const unsigned int DEFAULT_MAX_PEER_CONNECTIONS = 125; /** The default for -maxuploadtarget. 0 = Unlimited */ @@ -504,8 +500,6 @@ extern bool fListen; extern bool fRelayTxes; -extern limitedmap mapAlreadyAskedFor; - struct LocalServiceInfo { int nScore; int nPort; @@ -707,8 +701,6 @@ // requested. std::vector vInventoryBlockToSend GUARDED_BY(cs_inventory); CCriticalSection cs_inventory; - std::set setAskFor; - std::multimap mapAskFor; int64_t nNextInvSend{0}; // Used for headers announcements - unfiltered blocks to relay. 
std::vector vBlockHashesToAnnounce GUARDED_BY(cs_inventory); @@ -834,8 +826,6 @@ vBlockHashesToAnnounce.push_back(hash); } - void AskFor(const CInv &inv); - void CloseSocketDisconnect(); void copyStats(CNodeStats &stats); diff --git a/src/net.cpp b/src/net.cpp --- a/src/net.cpp +++ b/src/net.cpp @@ -93,8 +93,6 @@ std::map mapLocalHost GUARDED_BY(cs_mapLocalHost); static bool vfLimited[NET_MAX] GUARDED_BY(cs_mapLocalHost) = {}; -limitedmap mapAlreadyAskedFor(MAX_INV_SZ); - void CConnman::AddOneShot(const std::string &strDest) { LOCK(cs_vOneShots); vOneShots.push_back(strDest); @@ -2685,48 +2683,6 @@ CloseSocket(hSocket); } -void CNode::AskFor(const CInv &inv) { - if (mapAskFor.size() > MAPASKFOR_MAX_SZ || - setAskFor.size() > SETASKFOR_MAX_SZ) { - return; - } - - // a peer may not have multiple non-responded queue positions for a single - // inv item. - if (!setAskFor.insert(inv.hash).second) { - return; - } - - // We're using mapAskFor as a priority queue, the key is the earliest time - // the request can be sent. 
- int64_t nRequestTime; - limitedmap::const_iterator it = - mapAlreadyAskedFor.find(inv.hash); - if (it != mapAlreadyAskedFor.end()) { - nRequestTime = it->second; - } else { - nRequestTime = 0; - } - LogPrint(BCLog::NET, "askfor %s %d (%s) peer=%d\n", inv.ToString(), - nRequestTime, FormatISO8601DateTime(nRequestTime / 1000000), id); - - // Make sure not to reuse time indexes to keep things in the same order - int64_t nNow = GetTimeMicros() - 1000000; - static int64_t nLastTime; - ++nLastTime; - nNow = std::max(nNow, nLastTime); - nLastTime = nNow; - - // Each retry is 2 minutes after the last - nRequestTime = std::max(nRequestTime + 2 * 60 * 1000000, nNow); - if (it != mapAlreadyAskedFor.end()) { - mapAlreadyAskedFor.update(it, nRequestTime); - } else { - mapAlreadyAskedFor.insert(std::make_pair(inv.hash, nRequestTime)); - } - mapAskFor.insert(std::make_pair(nRequestTime, inv)); -} - bool CConnman::NodeFullyConnected(const CNode *pnode) { return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; } diff --git a/src/net_processing.cpp b/src/net_processing.cpp --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -82,6 +82,38 @@ /// Age after which a block is considered historical for purposes of rate /// limiting block relay. Set to one week, denominated in seconds. static constexpr int HISTORICAL_BLOCK_AGE = 7 * 24 * 60 * 60; +/** Maximum number of in-flight transactions from a peer */ +static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; +/** Maximum number of announced transactions from a peer */ +static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; +/** How many microseconds to delay requesting transactions from inbound peers */ +// 2 seconds +static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; +/** + * How long to wait (in microseconds) before downloading a transaction from an + * additional peer. 
+ */ +// 1 minute +static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; +/** + * Maximum delay (in microseconds) for transaction requests to avoid biasing + * some peers over others. + */ +// 2 seconds +static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; +/** + * How long to wait (in microseconds) before expiring an in-flight getdata + * request to a peer. + */ +static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL; +static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, + "To preserve security, MAX_GETDATA_RANDOM_DELAY should not " + "exceed INBOUND_PEER_TX_DELAY"); +/** + * Limit to avoid sending big packets. Not used in processing incoming GETDATA + * for compatibility. + */ +static const unsigned int MAX_GETDATA_SZ = 1000; /// How many non standard orphan do we consider from a node before ignoring it. static constexpr uint32_t MAX_NON_STANDARD_ORPHAN_PER_NODE = 5; @@ -294,6 +326,70 @@ //! Time of last new block announcement int64_t m_last_block_announcement; + /* + * State associated with transaction download. + * + * Tx download algorithm: + * + * When inv comes in, queue up (process_time, txid) inside the peer's + * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer + * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). + * + * The process_time for a transaction is set to nNow for outbound peers, + * nNow + 2 seconds for inbound peers. This is the time at which we'll + * consider trying to request the transaction from the peer in + * SendMessages(). The delay for inbound peers is to allow outbound peers + * a chance to announce before we request from inbound peers, to prevent + * an adversary from using inbound connections to blind us to a + * transaction (InvBlock). + * + * When we call SendMessages() for a given peer, + * we will loop over the transactions in m_tx_process_time, looking + * at the transactions whose process_time <= nNow.
We'll request each + * such transaction that we don't have already and that hasn't been + * requested from another peer recently, up until we hit the + * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update + * g_already_asked_for for each requested txid, storing the time of the + * GETDATA request. We use g_already_asked_for to coordinate transaction + * requests amongst our peers. + * + * For transactions that we still need but we have already recently + * requested from some other peer, we'll reinsert (process_time, txid) + * back into the peer's m_tx_process_time at the point in the future at + * which the most recent GETDATA request would time out (ie + * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). + * We add an additional delay for inbound peers, again to prefer + * attempting download from outbound peers first. + * We also add an extra small random delay up to 2 seconds + * to avoid biasing some peers over others. (e.g., due to fixed ordering + * of peer processing in ThreadMessageHandler). + * + * When we receive a transaction from a peer, we remove the txid from the + * peer's m_tx_in_flight set and from their recently announced set + * (m_tx_announced). We also clear g_already_asked_for for that entry, so + * that if somehow the transaction is not accepted but also not added to + * the reject filter, then we will eventually redownload from other + * peers. + */ + struct TxDownloadState { + /** + * Track when to attempt download of announced transactions (process + * time in micros -> txid) + */ + std::multimap m_tx_process_time; + + //! Store all the transactions a peer has recently announced + std::set m_tx_announced; + + //! Store transactions which were requested by us, with timestamp + std::map m_tx_in_flight; + + //! 
Periodically check for stuck getdata requests + int64_t m_check_expiry_timer{0}; + }; + + TxDownloadState m_tx_download; + CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) { fCurrentlyConnected = false; @@ -320,6 +416,10 @@ } }; +// Keeps track of the time (in microseconds) when transactions were requested +// last time +limitedmap g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); + /** Map maintaining per-node state. */ static std::map mapNodeState GUARDED_BY(cs_main); @@ -698,6 +798,73 @@ } } +void EraseTxRequest(const TxId &txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { + g_already_asked_for.erase(txid); +} + +int64_t GetTxRequestTime(const TxId &txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { + auto it = g_already_asked_for.find(txid); + if (it != g_already_asked_for.end()) { + return it->second; + } + return 0; +} + +void UpdateTxRequestTime(const TxId &txid, int64_t request_time) + EXCLUSIVE_LOCKS_REQUIRED(cs_main) { + auto it = g_already_asked_for.find(txid); + if (it == g_already_asked_for.end()) { + g_already_asked_for.insert(std::make_pair(txid, request_time)); + } else { + g_already_asked_for.update(it, request_time); + } +} + +int64_t CalculateTxGetDataTime(const TxId &txid, int64_t current_time, + bool use_inbound_delay) + EXCLUSIVE_LOCKS_REQUIRED(cs_main) { + int64_t process_time; + int64_t last_request_time = GetTxRequestTime(txid); + // First time requesting this tx + if (last_request_time == 0) { + process_time = current_time; + } else { + // Randomize the delay to avoid biasing some peers over others (such as + // due to fixed ordering of peer processing in ThreadMessageHandler) + process_time = last_request_time + GETDATA_TX_INTERVAL + + GetRand(MAX_GETDATA_RANDOM_DELAY); + } + + // We delay processing announcements from inbound peers + if (use_inbound_delay) { + process_time += INBOUND_PEER_TX_DELAY; + } + + return process_time; +} + +void RequestTx(CNodeState *state, const TxId &txid, int64_t nNow) + 
EXCLUSIVE_LOCKS_REQUIRED(cs_main) { + CNodeState::TxDownloadState &peer_download_state = state->m_tx_download; + if (peer_download_state.m_tx_announced.size() >= + MAX_PEER_TX_ANNOUNCEMENTS || + peer_download_state.m_tx_process_time.size() >= + MAX_PEER_TX_ANNOUNCEMENTS || + peer_download_state.m_tx_announced.count(txid)) { + // Too many queued announcements from this peer, or we already have + // this announcement + return; + } + peer_download_state.m_tx_announced.insert(txid); + + // Calculate the time to try requesting this transaction. Use + // fPreferredDownload as a proxy for outbound peers. + int64_t process_time = + CalculateTxGetDataTime(txid, nNow, !state->fPreferredDownload); + + peer_download_state.m_tx_process_time.emplace(process_time, txid); +} + } // namespace // This function is used for testing the stale tip eviction logic, see @@ -1535,13 +1702,17 @@ if (!vNotFound.empty()) { // Let the peer know that we didn't find what it asked for, so it - // doesn't have to wait around forever. Currently only SPV clients - // actually care about this message: it's needed when they are - // recursively walking the dependencies of relevant unconfirmed - // transactions. SPV clients want to do that because they want to know - // about (and store and rebroadcast and risk analyze) the dependencies - // of transactions relevant to them, without having to download the - // entire memory pool. + // doesn't have to wait around forever. SPV clients care about this + // message: it's needed when they are recursively walking the + // dependencies of relevant unconfirmed transactions. SPV clients want + // to do that because they want to know about (and store and rebroadcast + // and risk analyze) the dependencies of transactions relevant to them, + // without having to download the entire memory pool. Also, other nodes + // can use these messages to automatically request a transaction from + // some other peer that announced it, and stop waiting for us to + // respond.
In normal operation, we often send NOTFOUND messages for + // parents of transactions that we relay; if a peer is missing a parent, + // they may assume we have them and request the parents from us. connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound)); } @@ -2261,7 +2432,9 @@ LOCK(cs_main); - for (auto &inv : vInv) { + int64_t nNow = GetTimeMicros(); + + for (CInv &inv : vInv) { if (interruptMsgProc) { return true; } @@ -2299,7 +2472,7 @@ inv.hash.ToString(), pfrom->GetId()); } else if (!fAlreadyHave && !fImporting && !fReindex && !IsInitialBlockDownload()) { - pfrom->AskFor(inv); + RequestTx(State(pfrom->GetId()), TxId(inv.hash), nNow); } } } @@ -2573,8 +2746,10 @@ CValidationState state; const TxId txid(inv.hash); - pfrom->setAskFor.erase(txid); - mapAlreadyAskedFor.erase(txid); + CNodeState *nodestate = State(pfrom->GetId()); + nodestate->m_tx_download.m_tx_announced.erase(txid); + nodestate->m_tx_download.m_tx_in_flight.erase(txid); + EraseTxRequest(txid); if (!AlreadyHave(inv) && AcceptToMemoryPool(config, g_mempool, state, ptx, true, @@ -2677,12 +2852,15 @@ } } if (!fRejectedParents) { + int64_t nNow = GetTimeMicros(); + for (const CTxIn &txin : tx.vin) { // FIXME: MSG_TX should use a TxHash, not a TxId. - CInv _inv(MSG_TX, txin.prevout.GetTxId()); + const TxId _txid = txin.prevout.GetTxId(); + CInv _inv(MSG_TX, _txid); pfrom->AddInventoryKnown(_inv); if (!AlreadyHave(_inv)) { - pfrom->AskFor(_inv); + RequestTx(State(pfrom->GetId()), _txid, nNow); } } AddOrphanTx(ptx, pfrom->GetId()); @@ -3400,8 +3578,31 @@ } if (strCommand == NetMsgType::NOTFOUND) { - // We do not care about the NOTFOUND message, but logging an Unknown - // Command message would be undesirable as we transmit it ourselves. 
+ // Remove the NOTFOUND transactions from the peer + LOCK(cs_main); + CNodeState *state = State(pfrom->GetId()); + std::vector vInv; + vRecv >> vInv; + if (vInv.size() <= + MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + for (CInv &inv : vInv) { + if (inv.type == MSG_TX) { + const TxId txid(inv.hash); + // If we receive a NOTFOUND message for a txid we requested, + // erase it from our data structures for this peer. + auto in_flight_it = + state->m_tx_download.m_tx_in_flight.find(txid); + if (in_flight_it == + state->m_tx_download.m_tx_in_flight.end()) { + // Skip any further work if this is a spurious NOTFOUND + // message. + continue; + } + state->m_tx_download.m_tx_in_flight.erase(in_flight_it); + state->m_tx_download.m_tx_announced.erase(txid); + } + } + } return true; } @@ -4396,23 +4597,68 @@ // // Message: getdata (transactions) // - while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow) { - const CInv &inv = (*pto->mapAskFor.begin()).second; + + // For robustness, expire old requests after a long timeout, so that we can + // resume downloading transactions from a peer even if they were + // unresponsive in the past. Eventually we should consider disconnecting + // peers, but this is conservative. + if (state.m_tx_download.m_check_expiry_timer <= nNow) { + for (auto it = state.m_tx_download.m_tx_in_flight.begin(); + it != state.m_tx_download.m_tx_in_flight.end();) { + if (it->second <= nNow - TX_EXPIRY_INTERVAL) { + LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", + it->first.ToString(), pto->GetId()); + state.m_tx_download.m_tx_announced.erase(it->first); + state.m_tx_download.m_tx_in_flight.erase(it++); + } else { + ++it; + } + } + // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize + // so that we're not doing this for all peers at the same time. 
+ state.m_tx_download.m_check_expiry_timer = + nNow + TX_EXPIRY_INTERVAL / 2 + GetRand(TX_EXPIRY_INTERVAL); + } + + auto &tx_process_time = state.m_tx_download.m_tx_process_time; + while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && + state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { + const TxId txid = tx_process_time.begin()->second; + // Erase this entry from tx_process_time (it may be added back for + // processing at a later time, see below) + tx_process_time.erase(tx_process_time.begin()); + CInv inv(MSG_TX, txid); if (!AlreadyHave(inv)) { - LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), - pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= 1000) { - connman->PushMessage( - pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - vGetData.clear(); + // If this transaction was last requested more than 1 minute ago, + // then request. + int64_t last_request_time = GetTxRequestTime(txid); + if (last_request_time <= nNow - GETDATA_TX_INTERVAL) { + LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), + pto->GetId()); + vGetData.push_back(inv); + if (vGetData.size() >= MAX_GETDATA_SZ) { + connman->PushMessage( + pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); + } + UpdateTxRequestTime(txid, nNow); + state.m_tx_download.m_tx_in_flight.emplace(txid, nNow); + } else { + // This transaction is in flight from someone else; queue + // up processing to happen after the download times out + // (with a slight delay for inbound peers, to prefer + // requests to outbound peers). + int64_t next_process_time = CalculateTxGetDataTime( + txid, nNow, !state.fPreferredDownload); + tx_process_time.emplace(next_process_time, txid); } } else { - // If we're not going to ask, don't expect a response. - pto->setAskFor.erase(inv.hash); + // We have already seen this transaction, no need to download. 
+ state.m_tx_download.m_tx_announced.erase(txid); + state.m_tx_download.m_tx_in_flight.erase(txid); } - pto->mapAskFor.erase(pto->mapAskFor.begin()); } + if (!vGetData.empty()) { connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); } diff --git a/src/test/util_tests.cpp b/src/test/util_tests.cpp --- a/src/test/util_tests.cpp +++ b/src/test/util_tests.cpp @@ -154,10 +154,6 @@ BOOST_CHECK_EQUAL(FormatISO8601Date(1317425777), "2011-09-30"); } -BOOST_AUTO_TEST_CASE(util_FormatISO8601Time) { - BOOST_CHECK_EQUAL(FormatISO8601Time(1317425777), "23:36:17Z"); -} - struct TestArgsManager : public ArgsManager { TestArgsManager() { m_network_only_args.clear(); } std::map> &GetOverrideArgs() { diff --git a/src/util/time.h b/src/util/time.h --- a/src/util/time.h +++ b/src/util/time.h @@ -34,6 +34,5 @@ */ std::string FormatISO8601DateTime(int64_t nTime); std::string FormatISO8601Date(int64_t nTime); -std::string FormatISO8601Time(int64_t nTime); #endif // BITCOIN_UTIL_TIME_H diff --git a/src/util/time.cpp b/src/util/time.cpp --- a/src/util/time.cpp +++ b/src/util/time.cpp @@ -87,14 +87,3 @@ return strprintf("%04i-%02i-%02i", ts.tm_year + 1900, ts.tm_mon + 1, ts.tm_mday); } - -std::string FormatISO8601Time(int64_t nTime) { - struct tm ts; - time_t time_val = nTime; -#ifdef _MSC_VER - gmtime_s(&ts, &time_val); -#else - gmtime_r(&time_val, &ts); -#endif - return strprintf("%02i:%02i:%02iZ", ts.tm_hour, ts.tm_min, ts.tm_sec); -}