Changeset View
Changeset View
Standalone View
Standalone View
src/net_processing.cpp
Show First 20 Lines • Show All 96 Lines • ▼ Show 20 Lines | |||||
static constexpr std::chrono::minutes PING_INTERVAL{2}; | static constexpr std::chrono::minutes PING_INTERVAL{2}; | ||||
/** The maximum number of entries in a locator */ | /** The maximum number of entries in a locator */ | ||||
static const unsigned int MAX_LOCATOR_SZ = 101; | static const unsigned int MAX_LOCATOR_SZ = 101; | ||||
/** The maximum number of entries in an 'inv' protocol message */ | /** The maximum number of entries in an 'inv' protocol message */ | ||||
static const unsigned int MAX_INV_SZ = 50000; | static const unsigned int MAX_INV_SZ = 50000; | ||||
static_assert(MAX_PROTOCOL_MESSAGE_LENGTH > MAX_INV_SZ * sizeof(CInv), | static_assert(MAX_PROTOCOL_MESSAGE_LENGTH > MAX_INV_SZ * sizeof(CInv), | ||||
"Max protocol message length must be greater than largest " | "Max protocol message length must be greater than largest " | ||||
"possible INV message"); | "possible INV message"); | ||||
/** Maximum number of in-flight transactions from a peer */ | /** Maximum number of in-flight transaction requests from a peer. It is not a | ||||
static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; | * hard limit, but the threshold at which point the OVERLOADED_PEER_TX_DELAY | ||||
* kicks in. */ | |||||
static constexpr int32_t MAX_PEER_TX_REQUEST_IN_FLIGHT = 100; | |||||
/** Maximum number of announced transactions from a peer */ | /** Maximum number of announced transactions from a peer */ | ||||
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; | static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; | ||||
/** How many microseconds to delay requesting transactions from inbound peers */ | /** How long to delay requesting transactions from non-preferred peers */ | ||||
static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{ | static constexpr auto NONPREF_PEER_TX_DELAY = std::chrono::seconds{2}; | ||||
std::chrono::seconds{2}}; | /** How long to delay requesting transactions from overloaded peers (see | ||||
* MAX_PEER_TX_REQUEST_IN_FLIGHT). */ | |||||
static constexpr auto OVERLOADED_PEER_TX_DELAY = std::chrono::seconds{2}; | |||||
/** | /** | ||||
* How long to wait (in microseconds) before downloading a transaction from an | * How long to wait (in microseconds) before downloading a transaction from an | ||||
* additional peer. | * additional peer. | ||||
*/ | */ | ||||
static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{ | static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{ | ||||
std::chrono::seconds{60}}; | std::chrono::seconds{60}}; | ||||
/** | /** | ||||
* Maximum delay (in microseconds) for transaction requests to avoid biasing | |||||
* some peers over others. | |||||
*/ | |||||
static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{ | |||||
std::chrono::seconds{2}}; | |||||
/** | |||||
* How long to wait (in microseconds) before expiring an in-flight getdata | |||||
* request to a peer. | |||||
*/ | |||||
static constexpr std::chrono::microseconds TX_EXPIRY_INTERVAL{ | |||||
GETDATA_TX_INTERVAL * 10}; | |||||
static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, | |||||
"To preserve security, MAX_GETDATA_RANDOM_DELAY should not " | |||||
"exceed INBOUND_PEER_DELAY"); | |||||
/** | |||||
* Limit to avoid sending big packets. Not used in processing incoming GETDATA | * Limit to avoid sending big packets. Not used in processing incoming GETDATA | ||||
* for compatibility. | * for compatibility. | ||||
*/ | */ | ||||
static const unsigned int MAX_GETDATA_SZ = 1000; | static const unsigned int MAX_GETDATA_SZ = 1000; | ||||
/** | /** | ||||
* Number of blocks that can be requested at any given time from a single peer. | * Number of blocks that can be requested at any given time from a single peer. | ||||
*/ | */ | ||||
static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 16; | static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 16; | ||||
▲ Show 20 Lines • Show All 300 Lines • ▼ Show 20 Lines | struct ChainSyncTimeoutState { | ||||
bool m_protect; | bool m_protect; | ||||
}; | }; | ||||
ChainSyncTimeoutState m_chain_sync; | ChainSyncTimeoutState m_chain_sync; | ||||
//! Time of last new block announcement | //! Time of last new block announcement | ||||
int64_t m_last_block_announcement; | int64_t m_last_block_announcement; | ||||
/* | |||||
* State associated with transaction download. | |||||
* | |||||
* Tx download algorithm: | |||||
* | |||||
* When inv comes in, queue up (process_time, txid) inside the peer's | |||||
* CNodeState (m_tx_process_time) as long as m_tx_announced for the peer | |||||
* isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). | |||||
* | |||||
* The process_time for a transaction is set to nNow for outbound peers, | |||||
* nNow + 2 seconds for inbound peers. This is the time at which we'll | |||||
* consider trying to request the transaction from the peer in | |||||
* SendMessages(). The delay for inbound peers is to allow outbound peers | |||||
* a chance to announce before we request from inbound peers, to prevent | |||||
* an adversary from using inbound connections to blind us to a | |||||
* transaction (InvBlock). | |||||
* | |||||
* When we call SendMessages() for a given peer, | |||||
* we will loop over the transactions in m_tx_process_time, looking | |||||
* at the transactions whose process_time <= nNow. We'll request each | |||||
* such transaction that we don't have already and that hasn't been | |||||
* requested from another peer recently, up until we hit the | |||||
* MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update | |||||
* g_already_asked_for for each requested txid, storing the time of the | |||||
* GETDATA request. We use g_already_asked_for to coordinate transaction | |||||
* requests amongst our peers. | |||||
* | |||||
* For transactions that we still need but we have already recently | |||||
* requested from some other peer, we'll reinsert (process_time, txid) | |||||
* back into the peer's m_tx_process_time at the point in the future at | |||||
* which the most recent GETDATA request would time out (ie | |||||
* GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). | |||||
* We add an additional delay for inbound peers, again to prefer | |||||
* attempting download from outbound peers first. | |||||
* We also add an extra small random delay up to 2 seconds | |||||
* to avoid biasing some peers over others. (e.g., due to fixed ordering | |||||
* of peer processing in ThreadMessageHandler). | |||||
* | |||||
* When we receive a transaction from a peer, we remove the txid from the | |||||
* peer's m_tx_in_flight set and from their recently announced set | |||||
* (m_tx_announced). We also clear g_already_asked_for for that entry, so | |||||
* that if somehow the transaction is not accepted but also not added to | |||||
* the reject filter, then we will eventually redownload from other | |||||
* peers. | |||||
*/ | |||||
struct TxDownloadState { | |||||
/** | |||||
* Track when to attempt download of announced transactions (process | |||||
* time in micros -> txid) | |||||
*/ | |||||
std::multimap<std::chrono::microseconds, TxId> m_tx_process_time; | |||||
//! Store all the transactions a peer has recently announced | |||||
std::set<TxId> m_tx_announced; | |||||
//! Store transactions which were requested by us, with timestamp | |||||
std::map<TxId, std::chrono::microseconds> m_tx_in_flight; | |||||
//! Periodically check for stuck getdata requests | |||||
std::chrono::microseconds m_check_expiry_timer{0}; | |||||
}; | |||||
TxDownloadState m_tx_download; | |||||
struct AvalancheState { | struct AvalancheState { | ||||
std::chrono::time_point<std::chrono::steady_clock> last_poll; | std::chrono::time_point<std::chrono::steady_clock> last_poll; | ||||
}; | }; | ||||
AvalancheState m_avalanche_state; | AvalancheState m_avalanche_state; | ||||
//! Whether this peer is an inbound connection | //! Whether this peer is an inbound connection | ||||
bool m_is_inbound; | bool m_is_inbound; | ||||
Show All 26 Lines | CNodeState(CAddress addrIn, bool is_inbound, bool is_manual) | ||||
fProvidesHeaderAndIDs = false; | fProvidesHeaderAndIDs = false; | ||||
fSupportsDesiredCmpctVersion = false; | fSupportsDesiredCmpctVersion = false; | ||||
m_chain_sync = {0, nullptr, false, false}; | m_chain_sync = {0, nullptr, false, false}; | ||||
m_last_block_announcement = 0; | m_last_block_announcement = 0; | ||||
m_recently_announced_invs.reset(); | m_recently_announced_invs.reset(); | ||||
} | } | ||||
}; | }; | ||||
// Keeps track of the time (in microseconds) when transactions were requested | |||||
// last time | |||||
limitedmap<TxId, std::chrono::microseconds> | |||||
g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); | |||||
/** Map maintaining per-node state. */ | /** Map maintaining per-node state. */ | ||||
static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main); | static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main); | ||||
static CNodeState *State(NodeId pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { | static CNodeState *State(NodeId pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { | ||||
std::map<NodeId, CNodeState>::iterator it = mapNodeState.find(pnode); | std::map<NodeId, CNodeState>::iterator it = mapNodeState.find(pnode); | ||||
if (it == mapNodeState.end()) { | if (it == mapNodeState.end()) { | ||||
return nullptr; | return nullptr; | ||||
} | } | ||||
▲ Show 20 Lines • Show All 415 Lines • ▼ Show 20 Lines | while (pindexWalk->nHeight < nMaxHeight) { | ||||
} else if (waitingfor == -1) { | } else if (waitingfor == -1) { | ||||
// This is the first already-in-flight block. | // This is the first already-in-flight block. | ||||
waitingfor = mapBlocksInFlight[pindex->GetBlockHash()].first; | waitingfor = mapBlocksInFlight[pindex->GetBlockHash()].first; | ||||
} | } | ||||
} | } | ||||
} | } | ||||
} | } | ||||
void EraseTxRequest(const TxId &txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { | } // namespace | ||||
g_already_asked_for.erase(txid); | |||||
} | |||||
std::chrono::microseconds GetTxRequestTime(const TxId &txid) | |||||
EXCLUSIVE_LOCKS_REQUIRED(cs_main) { | |||||
auto it = g_already_asked_for.find(txid); | |||||
if (it != g_already_asked_for.end()) { | |||||
return it->second; | |||||
} | |||||
return {}; | |||||
} | |||||
void UpdateTxRequestTime(const TxId &txid, | |||||
std::chrono::microseconds request_time) | |||||
EXCLUSIVE_LOCKS_REQUIRED(cs_main) { | |||||
auto it = g_already_asked_for.find(txid); | |||||
if (it == g_already_asked_for.end()) { | |||||
g_already_asked_for.insert(std::make_pair(txid, request_time)); | |||||
} else { | |||||
g_already_asked_for.update(it, request_time); | |||||
} | |||||
} | |||||
std::chrono::microseconds | |||||
CalculateTxGetDataTime(const TxId &txid, std::chrono::microseconds current_time, | |||||
bool use_inbound_delay) | |||||
EXCLUSIVE_LOCKS_REQUIRED(cs_main) { | |||||
std::chrono::microseconds process_time; | |||||
const auto last_request_time = GetTxRequestTime(txid); | |||||
// First time requesting this tx | |||||
if (last_request_time.count() == 0) { | |||||
process_time = current_time; | |||||
} else { | |||||
// Randomize the delay to avoid biasing some peers over others (such as | |||||
// due to fixed ordering of peer processing in ThreadMessageHandler) | |||||
process_time = last_request_time + GETDATA_TX_INTERVAL + | |||||
GetRandMicros(MAX_GETDATA_RANDOM_DELAY); | |||||
} | |||||
// We delay processing announcements from inbound peers | void PeerManager::AddTxAnnouncement(const CNode &node, const TxId >xid, | ||||
if (use_inbound_delay) { | std::chrono::microseconds current_time) { | ||||
process_time += INBOUND_PEER_TX_DELAY; | AssertLockHeld(::cs_main); // For m_txrequest | ||||
NodeId nodeid = node.GetId(); | |||||
if (m_txrequest.Count(nodeid) >= MAX_PEER_TX_ANNOUNCEMENTS) { | |||||
// Too many queued announcements from this peer | |||||
return; | |||||
} | |||||
const CNodeState *state = State(nodeid); | |||||
// Decide the TxRequestTracker parameters for this announcement: | |||||
// - "preferred": if fPreferredDownload is set (= outbound, or PF_NOBAN | |||||
// permission) | |||||
// - "reqtime": current time plus delays for: | |||||
// - NONPREF_PEER_TX_DELAY for announcements from non-preferred | |||||
// connections | |||||
// - TXID_RELAY_DELAY for announcements from txid peers while wtxid peers | |||||
// are available | |||||
// - OVERLOADED_PEER_TX_DELAY for announcements from peers which have at | |||||
// least | |||||
// MAX_PEER_TX_REQUEST_IN_FLIGHT requests in flight. | |||||
auto delay = std::chrono::microseconds{0}; | |||||
const bool preferred = state->fPreferredDownload; | |||||
if (!preferred) { | |||||
delay += NONPREF_PEER_TX_DELAY; | |||||
} | |||||
const bool overloaded = | |||||
m_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_REQUEST_IN_FLIGHT; | |||||
if (overloaded) { | |||||
delay += OVERLOADED_PEER_TX_DELAY; | |||||
} | } | ||||
m_txrequest.ReceivedInv(nodeid, gtxid, preferred, current_time + delay); | |||||
return process_time; | |||||
} | } | ||||
void RequestTx(CNodeState *state, const TxId &txid, | |||||
std::chrono::microseconds current_time) | |||||
EXCLUSIVE_LOCKS_REQUIRED(cs_main) { | |||||
CNodeState::TxDownloadState &peer_download_state = state->m_tx_download; | |||||
if (peer_download_state.m_tx_announced.size() >= | |||||
MAX_PEER_TX_ANNOUNCEMENTS || | |||||
peer_download_state.m_tx_process_time.size() >= | |||||
MAX_PEER_TX_ANNOUNCEMENTS || | |||||
peer_download_state.m_tx_announced.count(txid)) { | |||||
// Too many queued announcements from this peer, or we already have | |||||
// this announcement | |||||
return; | |||||
} | |||||
peer_download_state.m_tx_announced.insert(txid); | |||||
// Calculate the time to try requesting this transaction. Use | |||||
// fPreferredDownload as a proxy for outbound peers. | |||||
const auto process_time = | |||||
CalculateTxGetDataTime(txid, current_time, !state->fPreferredDownload); | |||||
peer_download_state.m_tx_process_time.emplace(process_time, txid); | |||||
} | |||||
} // namespace | |||||
// This function is used for testing the stale tip eviction logic, see | // This function is used for testing the stale tip eviction logic, see | ||||
// denialofservice_tests.cpp | // denialofservice_tests.cpp | ||||
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) { | void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) { | ||||
LOCK(cs_main); | LOCK(cs_main); | ||||
CNodeState *state = State(node); | CNodeState *state = State(node); | ||||
if (state) { | if (state) { | ||||
state->m_last_block_announcement = time_in_seconds; | state->m_last_block_announcement = time_in_seconds; | ||||
} | } | ||||
} | } | ||||
void PeerManager::InitializeNode(const Config &config, CNode *pnode) { | void PeerManager::InitializeNode(const Config &config, CNode *pnode) { | ||||
CAddress addr = pnode->addr; | CAddress addr = pnode->addr; | ||||
std::string addrName = pnode->GetAddrName(); | std::string addrName = pnode->GetAddrName(); | ||||
NodeId nodeid = pnode->GetId(); | NodeId nodeid = pnode->GetId(); | ||||
{ | { | ||||
LOCK(cs_main); | LOCK(cs_main); | ||||
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, | mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, | ||||
std::forward_as_tuple(nodeid), | std::forward_as_tuple(nodeid), | ||||
std::forward_as_tuple(addr, | std::forward_as_tuple(addr, | ||||
pnode->IsInboundConn(), | pnode->IsInboundConn(), | ||||
pnode->IsManualConn())); | pnode->IsManualConn())); | ||||
assert(m_txrequest.Count(nodeid) == 0); | |||||
} | } | ||||
{ | { | ||||
PeerRef peer = std::make_shared<Peer>(nodeid); | PeerRef peer = std::make_shared<Peer>(nodeid); | ||||
LOCK(g_peer_mutex); | LOCK(g_peer_mutex); | ||||
g_peer_map.emplace_hint(g_peer_map.end(), nodeid, std::move(peer)); | g_peer_map.emplace_hint(g_peer_map.end(), nodeid, std::move(peer)); | ||||
} | } | ||||
if (!pnode->IsInboundConn()) { | if (!pnode->IsInboundConn()) { | ||||
PushNodeVersion(config, *pnode, m_connman, GetTime()); | PushNodeVersion(config, *pnode, m_connman, GetTime()); | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | void PeerManager::FinalizeNode(const Config &config, NodeId nodeid, | ||||
if (misbehavior == 0 && state->fCurrentlyConnected) { | if (misbehavior == 0 && state->fCurrentlyConnected) { | ||||
fUpdateConnectionTime = true; | fUpdateConnectionTime = true; | ||||
} | } | ||||
for (const QueuedBlock &entry : state->vBlocksInFlight) { | for (const QueuedBlock &entry : state->vBlocksInFlight) { | ||||
mapBlocksInFlight.erase(entry.hash); | mapBlocksInFlight.erase(entry.hash); | ||||
} | } | ||||
EraseOrphansFor(nodeid); | EraseOrphansFor(nodeid); | ||||
m_txrequest.DisconnectedPeer(nodeid); | |||||
nPreferredDownload -= state->fPreferredDownload; | nPreferredDownload -= state->fPreferredDownload; | ||||
nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); | nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); | ||||
assert(nPeersWithValidatedDownloads >= 0); | assert(nPeersWithValidatedDownloads >= 0); | ||||
g_outbound_peers_with_protect_from_disconnect -= | g_outbound_peers_with_protect_from_disconnect -= | ||||
state->m_chain_sync.m_protect; | state->m_chain_sync.m_protect; | ||||
assert(g_outbound_peers_with_protect_from_disconnect >= 0); | assert(g_outbound_peers_with_protect_from_disconnect >= 0); | ||||
mapNodeState.erase(nodeid); | mapNodeState.erase(nodeid); | ||||
if (mapNodeState.empty()) { | if (mapNodeState.empty()) { | ||||
// Do a consistency check after the last peer is removed. | // Do a consistency check after the last peer is removed. | ||||
assert(mapBlocksInFlight.empty()); | assert(mapBlocksInFlight.empty()); | ||||
assert(nPreferredDownload == 0); | assert(nPreferredDownload == 0); | ||||
assert(nPeersWithValidatedDownloads == 0); | assert(nPeersWithValidatedDownloads == 0); | ||||
assert(g_outbound_peers_with_protect_from_disconnect == 0); | assert(g_outbound_peers_with_protect_from_disconnect == 0); | ||||
assert(m_txrequest.Size() == 0); | |||||
} | } | ||||
LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); | LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); | ||||
} | } | ||||
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { | bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { | ||||
{ | { | ||||
LOCK(cs_main); | LOCK(cs_main); | ||||
CNodeState *state = State(nodeid); | CNodeState *state = State(nodeid); | ||||
▲ Show 20 Lines • Show All 1,963 Lines • ▼ Show 20 Lines | if (msg_type == NetMsgType::INV) { | ||||
LogPrint(BCLog::NET, | LogPrint(BCLog::NET, | ||||
"transaction (%s) inv sent in violation of " | "transaction (%s) inv sent in violation of " | ||||
"protocol, disconnecting peer=%d\n", | "protocol, disconnecting peer=%d\n", | ||||
txid.ToString(), pfrom.GetId()); | txid.ToString(), pfrom.GetId()); | ||||
pfrom.fDisconnect = true; | pfrom.fDisconnect = true; | ||||
return; | return; | ||||
} else if (!fAlreadyHave && !m_chainman.ActiveChainstate() | } else if (!fAlreadyHave && !m_chainman.ActiveChainstate() | ||||
.IsInitialBlockDownload()) { | .IsInitialBlockDownload()) { | ||||
RequestTx(State(pfrom.GetId()), txid, current_time); | AddTxAnnouncement(pfrom, txid, current_time); | ||||
} | } | ||||
} else { | } else { | ||||
LogPrint(BCLog::NET, | LogPrint(BCLog::NET, | ||||
"Unknown inv type \"%s\" received from peer=%d\n", | "Unknown inv type \"%s\" received from peer=%d\n", | ||||
inv.ToString(), pfrom.GetId()); | inv.ToString(), pfrom.GetId()); | ||||
} | } | ||||
} | } | ||||
▲ Show 20 Lines • Show All 273 Lines • ▼ Show 20 Lines | if (msg_type == NetMsgType::TX) { | ||||
const CTransaction &tx = *ptx; | const CTransaction &tx = *ptx; | ||||
const TxId &txid = tx.GetId(); | const TxId &txid = tx.GetId(); | ||||
pfrom.AddKnownTx(txid); | pfrom.AddKnownTx(txid); | ||||
LOCK2(cs_main, g_cs_orphans); | LOCK2(cs_main, g_cs_orphans); | ||||
TxValidationState state; | TxValidationState state; | ||||
CNodeState *nodestate = State(pfrom.GetId()); | m_txrequest.ReceivedResponse(pfrom.GetId(), txid); | ||||
nodestate->m_tx_download.m_tx_announced.erase(txid); | |||||
nodestate->m_tx_download.m_tx_in_flight.erase(txid); | |||||
EraseTxRequest(txid); | |||||
if (!AlreadyHaveTx(txid, m_mempool) && | if (!AlreadyHaveTx(txid, m_mempool) && | ||||
AcceptToMemoryPool(config, m_mempool, state, ptx, | AcceptToMemoryPool(config, m_mempool, state, ptx, | ||||
false /* bypass_limits */, | false /* bypass_limits */, | ||||
Amount::zero() /* nAbsurdFee */)) { | Amount::zero() /* nAbsurdFee */)) { | ||||
m_mempool.check(&::ChainstateActive().CoinsTip()); | m_mempool.check(&::ChainstateActive().CoinsTip()); | ||||
RelayTransaction(tx.GetId(), m_connman); | RelayTransaction(tx.GetId(), m_connman); | ||||
for (size_t i = 0; i < tx.vout.size(); i++) { | for (size_t i = 0; i < tx.vout.size(); i++) { | ||||
Show All 30 Lines | if (msg_type == NetMsgType::TX) { | ||||
if (!fRejectedParents) { | if (!fRejectedParents) { | ||||
const auto current_time = GetTime<std::chrono::microseconds>(); | const auto current_time = GetTime<std::chrono::microseconds>(); | ||||
for (const CTxIn &txin : tx.vin) { | for (const CTxIn &txin : tx.vin) { | ||||
// FIXME: MSG_TX should use a TxHash, not a TxId. | // FIXME: MSG_TX should use a TxHash, not a TxId. | ||||
const TxId _txid = txin.prevout.GetTxId(); | const TxId _txid = txin.prevout.GetTxId(); | ||||
pfrom.AddKnownTx(_txid); | pfrom.AddKnownTx(_txid); | ||||
if (!AlreadyHaveTx(_txid, m_mempool)) { | if (!AlreadyHaveTx(_txid, m_mempool)) { | ||||
RequestTx(State(pfrom.GetId()), _txid, current_time); | AddTxAnnouncement(pfrom, _txid, current_time); | ||||
} | } | ||||
} | } | ||||
AddOrphanTx(ptx, pfrom.GetId()); | AddOrphanTx(ptx, pfrom.GetId()); | ||||
// DoS prevention: do not allow mapOrphanTransactions to grow | // DoS prevention: do not allow mapOrphanTransactions to grow | ||||
// unbounded (see CVE-2012-3789) | // unbounded (see CVE-2012-3789) | ||||
unsigned int nMaxOrphanTx = (unsigned int)std::max( | unsigned int nMaxOrphanTx = (unsigned int)std::max( | ||||
int64_t(0), gArgs.GetArg("-maxorphantx", | int64_t(0), gArgs.GetArg("-maxorphantx", | ||||
▲ Show 20 Lines • Show All 943 Lines • ▼ Show 20 Lines | |||||
} | } | ||||
if (msg_type == NetMsgType::GETCFCHECKPT) { | if (msg_type == NetMsgType::GETCFCHECKPT) { | ||||
ProcessGetCFCheckPt(pfrom, vRecv, m_chainparams, m_connman); | ProcessGetCFCheckPt(pfrom, vRecv, m_chainparams, m_connman); | ||||
return; | return; | ||||
} | } | ||||
if (msg_type == NetMsgType::NOTFOUND) { | if (msg_type == NetMsgType::NOTFOUND) { | ||||
// Remove the NOTFOUND transactions from the peer | |||||
LOCK(cs_main); | |||||
CNodeState *state = State(pfrom.GetId()); | |||||
std::vector<CInv> vInv; | std::vector<CInv> vInv; | ||||
vRecv >> vInv; | vRecv >> vInv; | ||||
if (vInv.size() <= | if (vInv.size() <= | ||||
MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { | MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { | ||||
LOCK(::cs_main); | |||||
for (CInv &inv : vInv) { | for (CInv &inv : vInv) { | ||||
if (inv.IsMsgTx()) { | if (inv.IsMsgTx()) { | ||||
const TxId txid(inv.hash); | // If we receive a NOTFOUND message for a tx we requested, | ||||
// If we receive a NOTFOUND message for a txid we requested, | // mark the announcement for it as completed in | ||||
// erase it from our data structures for this peer. | // TxRequestTracker. | ||||
auto in_flight_it = | m_txrequest.ReceivedResponse(pfrom.GetId(), inv.hash); | ||||
state->m_tx_download.m_tx_in_flight.find(txid); | |||||
if (in_flight_it == | |||||
state->m_tx_download.m_tx_in_flight.end()) { | |||||
// Skip any further work if this is a spurious NOTFOUND | |||||
// message. | |||||
continue; | |||||
} | |||||
state->m_tx_download.m_tx_in_flight.erase(in_flight_it); | |||||
state->m_tx_download.m_tx_announced.erase(txid); | |||||
} | } | ||||
} | } | ||||
} | } | ||||
return; | return; | ||||
} | } | ||||
// Ignore unknown commands for extensibility | // Ignore unknown commands for extensibility | ||||
LogPrint(BCLog::NET, "Unknown command \"%s\" from peer=%d\n", | LogPrint(BCLog::NET, "Unknown command \"%s\" from peer=%d\n", | ||||
▲ Show 20 Lines • Show All 999 Lines • ▼ Show 20 Lines | if (pingSend) { | ||||
LogPrint(BCLog::NET, "Stall started peer=%d\n", staller); | LogPrint(BCLog::NET, "Stall started peer=%d\n", staller); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
// | // | ||||
// Message: getdata (transactions) | // Message: getdata (transactions) | ||||
// | // | ||||
for (const TxId >xid : | |||||
// For robustness, expire old requests after a long timeout, so that we | m_txrequest.GetRequestable(pto->GetId(), current_time)) { | ||||
// can resume downloading transactions from a peer even if they were | if (!AlreadyHaveTx(gtxid, m_mempool)) { | ||||
// unresponsive in the past. Eventually we should consider disconnecting | LogPrint(BCLog::NET, "Requesting tx %s peer=%d\n", | ||||
// peers, but this is conservative. | gtxid.ToString(), pto->GetId()); | ||||
if (state.m_tx_download.m_check_expiry_timer <= current_time) { | vGetData.emplace_back(CInv(MSG_TX, gtxid)); | ||||
for (auto it = state.m_tx_download.m_tx_in_flight.begin(); | |||||
it != state.m_tx_download.m_tx_in_flight.end();) { | |||||
if (it->second <= current_time - TX_EXPIRY_INTERVAL) { | |||||
LogPrint(BCLog::NET, | |||||
"timeout of inflight tx %s from peer=%d\n", | |||||
it->first.ToString(), pto->GetId()); | |||||
state.m_tx_download.m_tx_announced.erase(it->first); | |||||
state.m_tx_download.m_tx_in_flight.erase(it++); | |||||
} else { | |||||
++it; | |||||
} | |||||
} | |||||
// On average, we do this check every TX_EXPIRY_INTERVAL. Randomize | |||||
// so that we're not doing this for all peers at the same time. | |||||
state.m_tx_download.m_check_expiry_timer = | |||||
current_time + TX_EXPIRY_INTERVAL / 2 + | |||||
GetRandMicros(TX_EXPIRY_INTERVAL); | |||||
} | |||||
auto &tx_process_time = state.m_tx_download.m_tx_process_time; | |||||
while (!tx_process_time.empty() && | |||||
tx_process_time.begin()->first <= current_time && | |||||
state.m_tx_download.m_tx_in_flight.size() < | |||||
MAX_PEER_TX_IN_FLIGHT) { | |||||
const TxId txid = tx_process_time.begin()->second; | |||||
// Erase this entry from tx_process_time (it may be added back for | |||||
// processing at a later time, see below) | |||||
tx_process_time.erase(tx_process_time.begin()); | |||||
CInv inv(MSG_TX, txid); | |||||
if (!AlreadyHaveTx(txid, m_mempool)) { | |||||
// If this transaction was last requested more than 1 minute | |||||
// ago, then request. | |||||
const auto last_request_time = GetTxRequestTime(txid); | |||||
if (last_request_time <= current_time - GETDATA_TX_INTERVAL) { | |||||
LogPrint(BCLog::NET, "Requesting %s peer=%d\n", | |||||
inv.ToString(), pto->GetId()); | |||||
vGetData.push_back(inv); | |||||
if (vGetData.size() >= MAX_GETDATA_SZ) { | if (vGetData.size() >= MAX_GETDATA_SZ) { | ||||
m_connman.PushMessage( | m_connman.PushMessage( | ||||
pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); | pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); | ||||
vGetData.clear(); | vGetData.clear(); | ||||
} | } | ||||
UpdateTxRequestTime(txid, current_time); | m_txrequest.RequestedTx(pto->GetId(), gtxid, | ||||
state.m_tx_download.m_tx_in_flight.emplace(txid, | current_time + GETDATA_TX_INTERVAL); | ||||
current_time); | |||||
} else { | |||||
// This transaction is in flight from someone else; queue | |||||
// up processing to happen after the download times out | |||||
// (with a slight delay for inbound peers, to prefer | |||||
// requests to outbound peers). | |||||
const auto next_process_time = CalculateTxGetDataTime( | |||||
txid, current_time, !state.fPreferredDownload); | |||||
tx_process_time.emplace(next_process_time, txid); | |||||
} | |||||
} else { | } else { | ||||
// We have already seen this transaction, no need to download. | // We have already seen this transaction, no need to download. | ||||
state.m_tx_download.m_tx_announced.erase(txid); | m_txrequest.ForgetTxHash(gtxid); | ||||
state.m_tx_download.m_tx_in_flight.erase(txid); | |||||
} | } | ||||
} | } | ||||
if (!vGetData.empty()) { | if (!vGetData.empty()) { | ||||
m_connman.PushMessage(pto, | m_connman.PushMessage(pto, | ||||
msgMaker.Make(NetMsgType::GETDATA, vGetData)); | msgMaker.Make(NetMsgType::GETDATA, vGetData)); | ||||
} | } | ||||
▲ Show 20 Lines • Show All 72 Lines • Show Last 20 Lines |