Changeset View
Changeset View
Standalone View
Standalone View
src/net_processing.cpp
Show First 20 Lines • Show All 4,799 Lines • ▼ Show 20 Lines | if (pingSend) { | ||||
} else { | } else { | ||||
// Peer is too old to support ping command with nonce, pong will | // Peer is too old to support ping command with nonce, pong will | ||||
// never arrive. | // never arrive. | ||||
pto->nPingNonceSent = 0; | pto->nPingNonceSent = 0; | ||||
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING)); | m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING)); | ||||
} | } | ||||
} | } | ||||
auto current_time = GetTime<std::chrono::microseconds>(); | |||||
bool fFetch; | |||||
{ | { | ||||
LOCK(cs_main); | LOCK(cs_main); | ||||
CNodeState &state = *State(pto->GetId()); | CNodeState &state = *State(pto->GetId()); | ||||
// Address refresh broadcast | // Address refresh broadcast | ||||
auto current_time = GetTime<std::chrono::microseconds>(); | |||||
if (pto->IsAddrRelayPeer() && | if (pto->IsAddrRelayPeer() && | ||||
!::ChainstateActive().IsInitialBlockDownload() && | !::ChainstateActive().IsInitialBlockDownload() && | ||||
pto->m_next_local_addr_send < current_time) { | pto->m_next_local_addr_send < current_time) { | ||||
AdvertiseLocal(pto); | AdvertiseLocal(pto); | ||||
pto->m_next_local_addr_send = PoissonNextSend( | pto->m_next_local_addr_send = PoissonNextSend( | ||||
current_time, AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL); | current_time, AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL); | ||||
} | } | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | bool fFetch; | ||||
// Start block sync | // Start block sync | ||||
if (pindexBestHeader == nullptr) { | if (pindexBestHeader == nullptr) { | ||||
pindexBestHeader = ::ChainActive().Tip(); | pindexBestHeader = ::ChainActive().Tip(); | ||||
} | } | ||||
// Download if this is a nice peer, or we have no nice peers and this | // Download if this is a nice peer, or we have no nice peers and this | ||||
// one might do. | // one might do. | ||||
bool fFetch = state.fPreferredDownload || | fFetch = state.fPreferredDownload || | ||||
(nPreferredDownload == 0 && !pto->fClient && | (nPreferredDownload == 0 && !pto->fClient && | ||||
!pto->IsAddrFetchConn()); | !pto->IsAddrFetchConn()); | ||||
if (!state.fSyncStarted && !pto->fClient && !fImporting && !fReindex) { | if (!state.fSyncStarted && !pto->fClient && !fImporting && !fReindex) { | ||||
// Only actively request headers from a single peer, unless we're | // Only actively request headers from a single peer, unless we're | ||||
// close to today. | // close to today. | ||||
if ((nSyncStarted == 0 && fFetch) || | if ((nSyncStarted == 0 && fFetch) || | ||||
pindexBestHeader->GetBlockTime() > | pindexBestHeader->GetBlockTime() > | ||||
GetAdjustedTime() - 24 * 60 * 60) { | GetAdjustedTime() - 24 * 60 * 60) { | ||||
state.fSyncStarted = true; | state.fSyncStarted = true; | ||||
▲ Show 20 Lines • Show All 185 Lines • ▼ Show 20 Lines | bool fFetch; | ||||
LogPrint(BCLog::NET, | LogPrint(BCLog::NET, | ||||
"%s: sending inv peer=%d hash=%s\n", __func__, | "%s: sending inv peer=%d hash=%s\n", __func__, | ||||
pto->GetId(), hashToAnnounce.ToString()); | pto->GetId(), hashToAnnounce.ToString()); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
pto->vBlockHashesToAnnounce.clear(); | pto->vBlockHashesToAnnounce.clear(); | ||||
} | } | ||||
} // release cs_main | |||||
// | // | ||||
// Message: inventory | // Message: inventory | ||||
// | // | ||||
std::vector<CInv> vInv; | std::vector<CInv> vInv; | ||||
{ | |||||
auto addInvAndMaybeFlush = [&](uint32_t type, const uint256 &hash) { | auto addInvAndMaybeFlush = [&](uint32_t type, const uint256 &hash) { | ||||
vInv.emplace_back(type, hash); | vInv.emplace_back(type, hash); | ||||
if (vInv.size() == MAX_INV_SZ) { | if (vInv.size() == MAX_INV_SZ) { | ||||
m_connman.PushMessage( | m_connman.PushMessage( | ||||
pto, msgMaker.Make(NetMsgType::INV, std::move(vInv))); | pto, msgMaker.Make(NetMsgType::INV, std::move(vInv))); | ||||
vInv.clear(); | vInv.clear(); | ||||
} | } | ||||
}; | }; | ||||
LOCK(pto->cs_inventory); | { | ||||
LOCK2(cs_main, pto->cs_inventory); | |||||
vInv.reserve(std::max<size_t>(pto->vInventoryBlockToSend.size(), | vInv.reserve(std::max<size_t>(pto->vInventoryBlockToSend.size(), | ||||
INVENTORY_BROADCAST_MAX_PER_MB * | INVENTORY_BROADCAST_MAX_PER_MB * | ||||
config.GetMaxBlockSize() / | config.GetMaxBlockSize() / 1000000)); | ||||
1000000)); | |||||
// Add blocks | // Add blocks | ||||
for (const BlockHash &hash : pto->vInventoryBlockToSend) { | for (const BlockHash &hash : pto->vInventoryBlockToSend) { | ||||
addInvAndMaybeFlush(MSG_BLOCK, hash); | addInvAndMaybeFlush(MSG_BLOCK, hash); | ||||
} | } | ||||
pto->vInventoryBlockToSend.clear(); | pto->vInventoryBlockToSend.clear(); | ||||
if (pto->m_tx_relay != nullptr) { | if (pto->m_tx_relay != nullptr) { | ||||
LOCK(pto->m_tx_relay->cs_tx_inventory); | LOCK(pto->m_tx_relay->cs_tx_inventory); | ||||
// Check whether periodic sends should happen | // Check whether periodic sends should happen | ||||
bool fSendTrickle = pto->HasPermission(PF_NOBAN); | bool fSendTrickle = pto->HasPermission(PF_NOBAN); | ||||
if (pto->m_tx_relay->nNextInvSend < current_time) { | if (pto->m_tx_relay->nNextInvSend < current_time) { | ||||
fSendTrickle = true; | fSendTrickle = true; | ||||
if (pto->IsInboundConn()) { | if (pto->IsInboundConn()) { | ||||
pto->m_tx_relay->nNextInvSend = | pto->m_tx_relay->nNextInvSend = std::chrono::microseconds{ | ||||
std::chrono::microseconds{ | |||||
m_connman.PoissonNextSendInbound( | m_connman.PoissonNextSendInbound( | ||||
count_microseconds(current_time), | count_microseconds(current_time), | ||||
INVENTORY_BROADCAST_INTERVAL)}; | INVENTORY_BROADCAST_INTERVAL)}; | ||||
} else { | } else { | ||||
// Skip delay for outbound peers, as there is less | // Skip delay for outbound peers, as there is less | ||||
// privacy concern for them. | // privacy concern for them. | ||||
pto->m_tx_relay->nNextInvSend = current_time; | pto->m_tx_relay->nNextInvSend = current_time; | ||||
} | } | ||||
} | } | ||||
// Time to send but the peer has requested we not relay | // Time to send but the peer has requested we not relay | ||||
// transactions. | // transactions. | ||||
if (fSendTrickle) { | if (fSendTrickle) { | ||||
LOCK(pto->m_tx_relay->cs_filter); | LOCK(pto->m_tx_relay->cs_filter); | ||||
if (!pto->m_tx_relay->fRelayTxes) { | if (!pto->m_tx_relay->fRelayTxes) { | ||||
pto->m_tx_relay->setInventoryTxToSend.clear(); | pto->m_tx_relay->setInventoryTxToSend.clear(); | ||||
} | } | ||||
} | } | ||||
// Respond to BIP35 mempool requests | // Respond to BIP35 mempool requests | ||||
if (fSendTrickle && pto->m_tx_relay->fSendMempool) { | if (fSendTrickle && pto->m_tx_relay->fSendMempool) { | ||||
auto vtxinfo = m_mempool.infoAll(); | auto vtxinfo = m_mempool.infoAll(); | ||||
pto->m_tx_relay->fSendMempool = false; | pto->m_tx_relay->fSendMempool = false; | ||||
CFeeRate filterrate; | CFeeRate filterrate; | ||||
{ | { | ||||
LOCK(pto->m_tx_relay->cs_feeFilter); | LOCK(pto->m_tx_relay->cs_feeFilter); | ||||
filterrate = CFeeRate(pto->m_tx_relay->minFeeFilter); | filterrate = CFeeRate(pto->m_tx_relay->minFeeFilter); | ||||
} | } | ||||
LOCK(pto->m_tx_relay->cs_filter); | LOCK(pto->m_tx_relay->cs_filter); | ||||
for (const auto &txinfo : vtxinfo) { | for (const auto &txinfo : vtxinfo) { | ||||
const TxId &txid = txinfo.tx->GetId(); | const TxId &txid = txinfo.tx->GetId(); | ||||
pto->m_tx_relay->setInventoryTxToSend.erase(txid); | pto->m_tx_relay->setInventoryTxToSend.erase(txid); | ||||
// Don't send transactions that peers will not put into | // Don't send transactions that peers will not put into | ||||
// their mempool | // their mempool | ||||
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) { | if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) { | ||||
continue; | continue; | ||||
} | } | ||||
if (pto->m_tx_relay->pfilter && | if (pto->m_tx_relay->pfilter && | ||||
!pto->m_tx_relay->pfilter->IsRelevantAndUpdate( | !pto->m_tx_relay->pfilter->IsRelevantAndUpdate( | ||||
*txinfo.tx)) { | *txinfo.tx)) { | ||||
continue; | continue; | ||||
} | } | ||||
pto->m_tx_relay->filterInventoryKnown.insert(txid); | pto->m_tx_relay->filterInventoryKnown.insert(txid); | ||||
// Responses to MEMPOOL requests bypass the | // Responses to MEMPOOL requests bypass the | ||||
// m_recently_announced_invs filter. | // m_recently_announced_invs filter. | ||||
addInvAndMaybeFlush(MSG_TX, txid); | addInvAndMaybeFlush(MSG_TX, txid); | ||||
} | } | ||||
pto->m_tx_relay->m_last_mempool_req = | pto->m_tx_relay->m_last_mempool_req = | ||||
GetTime<std::chrono::seconds>(); | GetTime<std::chrono::seconds>(); | ||||
} | } | ||||
// Determine transactions to relay | // Determine transactions to relay | ||||
if (fSendTrickle) { | if (fSendTrickle) { | ||||
// Produce a vector with all candidates for sending | // Produce a vector with all candidates for sending | ||||
std::vector<std::set<TxId>::iterator> vInvTx; | std::vector<std::set<TxId>::iterator> vInvTx; | ||||
vInvTx.reserve( | vInvTx.reserve(pto->m_tx_relay->setInventoryTxToSend.size()); | ||||
pto->m_tx_relay->setInventoryTxToSend.size()); | |||||
for (std::set<TxId>::iterator it = | for (std::set<TxId>::iterator it = | ||||
pto->m_tx_relay->setInventoryTxToSend.begin(); | pto->m_tx_relay->setInventoryTxToSend.begin(); | ||||
it != pto->m_tx_relay->setInventoryTxToSend.end(); | it != pto->m_tx_relay->setInventoryTxToSend.end(); it++) { | ||||
it++) { | |||||
vInvTx.push_back(it); | vInvTx.push_back(it); | ||||
} | } | ||||
CFeeRate filterrate; | CFeeRate filterrate; | ||||
{ | { | ||||
LOCK(pto->m_tx_relay->cs_feeFilter); | LOCK(pto->m_tx_relay->cs_feeFilter); | ||||
filterrate = CFeeRate(pto->m_tx_relay->minFeeFilter); | filterrate = CFeeRate(pto->m_tx_relay->minFeeFilter); | ||||
} | } | ||||
// Topologically and fee-rate sort the inventory we send for | // Topologically and fee-rate sort the inventory we send for | ||||
// privacy and priority reasons. A heap is used so that not | // privacy and priority reasons. A heap is used so that not | ||||
// all items need sorting if only a few are being sent. | // all items need sorting if only a few are being sent. | ||||
CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool); | CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool); | ||||
std::make_heap(vInvTx.begin(), vInvTx.end(), | std::make_heap(vInvTx.begin(), vInvTx.end(), | ||||
compareInvMempoolOrder); | compareInvMempoolOrder); | ||||
// No reason to drain out at many times the network's | // No reason to drain out at many times the network's | ||||
// capacity, especially since we have many peers and some | // capacity, especially since we have many peers and some | ||||
// will draw much shorter delays. | // will draw much shorter delays. | ||||
unsigned int nRelayedTransactions = 0; | unsigned int nRelayedTransactions = 0; | ||||
LOCK(pto->m_tx_relay->cs_filter); | LOCK(pto->m_tx_relay->cs_filter); | ||||
while (!vInvTx.empty() && | while (!vInvTx.empty() && | ||||
nRelayedTransactions < | nRelayedTransactions < INVENTORY_BROADCAST_MAX_PER_MB * | ||||
INVENTORY_BROADCAST_MAX_PER_MB * | config.GetMaxBlockSize() / | ||||
config.GetMaxBlockSize() / 1000000) { | 1000000) { | ||||
// Fetch the top element from the heap | // Fetch the top element from the heap | ||||
std::pop_heap(vInvTx.begin(), vInvTx.end(), | std::pop_heap(vInvTx.begin(), vInvTx.end(), | ||||
compareInvMempoolOrder); | compareInvMempoolOrder); | ||||
std::set<TxId>::iterator it = vInvTx.back(); | std::set<TxId>::iterator it = vInvTx.back(); | ||||
vInvTx.pop_back(); | vInvTx.pop_back(); | ||||
const TxId txid = *it; | const TxId txid = *it; | ||||
// Remove it from the to-be-sent set | // Remove it from the to-be-sent set | ||||
pto->m_tx_relay->setInventoryTxToSend.erase(it); | pto->m_tx_relay->setInventoryTxToSend.erase(it); | ||||
// Check if not in the filter already | // Check if not in the filter already | ||||
if (pto->m_tx_relay->filterInventoryKnown.contains( | if (pto->m_tx_relay->filterInventoryKnown.contains(txid)) { | ||||
txid)) { | |||||
continue; | continue; | ||||
} | } | ||||
// Not in the mempool anymore? don't bother sending it. | // Not in the mempool anymore? don't bother sending it. | ||||
auto txinfo = m_mempool.info(txid); | auto txinfo = m_mempool.info(txid); | ||||
if (!txinfo.tx) { | if (!txinfo.tx) { | ||||
continue; | continue; | ||||
} | } | ||||
// Peer told you to not send transactions at that | // Peer told you to not send transactions at that | ||||
// feerate? Don't bother sending it. | // feerate? Don't bother sending it. | ||||
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) { | if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) { | ||||
continue; | continue; | ||||
} | } | ||||
if (pto->m_tx_relay->pfilter && | if (pto->m_tx_relay->pfilter && | ||||
!pto->m_tx_relay->pfilter->IsRelevantAndUpdate( | !pto->m_tx_relay->pfilter->IsRelevantAndUpdate( | ||||
*txinfo.tx)) { | *txinfo.tx)) { | ||||
continue; | continue; | ||||
} | } | ||||
// Send | // Send | ||||
State(pto->GetId()) | State(pto->GetId())->m_recently_announced_invs.insert(txid); | ||||
->m_recently_announced_invs.insert(txid); | |||||
addInvAndMaybeFlush(MSG_TX, txid); | addInvAndMaybeFlush(MSG_TX, txid); | ||||
nRelayedTransactions++; | nRelayedTransactions++; | ||||
{ | { | ||||
// Expire old relay messages | // Expire old relay messages | ||||
while (!vRelayExpiration.empty() && | while (!vRelayExpiration.empty() && | ||||
vRelayExpiration.front().first < | vRelayExpiration.front().first < | ||||
count_microseconds(current_time)) { | count_microseconds(current_time)) { | ||||
mapRelay.erase(vRelayExpiration.front().second); | mapRelay.erase(vRelayExpiration.front().second); | ||||
vRelayExpiration.pop_front(); | vRelayExpiration.pop_front(); | ||||
} | } | ||||
auto ret = mapRelay.insert( | auto ret = mapRelay.insert( | ||||
std::make_pair(txid, std::move(txinfo.tx))); | std::make_pair(txid, std::move(txinfo.tx))); | ||||
if (ret.second) { | if (ret.second) { | ||||
vRelayExpiration.push_back(std::make_pair( | vRelayExpiration.push_back(std::make_pair( | ||||
count_microseconds(current_time) + | count_microseconds(current_time) + | ||||
std::chrono::microseconds{ | std::chrono::microseconds{ | ||||
RELAY_TX_CACHE_TIME} | RELAY_TX_CACHE_TIME} | ||||
.count(), | .count(), | ||||
ret.first)); | ret.first)); | ||||
} | } | ||||
} | } | ||||
pto->m_tx_relay->filterInventoryKnown.insert(txid); | pto->m_tx_relay->filterInventoryKnown.insert(txid); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
} | } // release cs_main, pto->cs_inventory | ||||
if (!vInv.empty()) { | if (!vInv.empty()) { | ||||
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); | m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); | ||||
} | } | ||||
{ | |||||
LOCK(cs_main); | |||||
CNodeState &state = *State(pto->GetId()); | |||||
// Detect whether we're stalling | // Detect whether we're stalling | ||||
current_time = GetTime<std::chrono::microseconds>(); | current_time = GetTime<std::chrono::microseconds>(); | ||||
if (state.nStallingSince && | if (state.nStallingSince && | ||||
state.nStallingSince < count_microseconds(current_time) - | state.nStallingSince < count_microseconds(current_time) - | ||||
1000000 * BLOCK_STALLING_TIMEOUT) { | 1000000 * BLOCK_STALLING_TIMEOUT) { | ||||
// Stalling only triggers when the block download window cannot | // Stalling only triggers when the block download window cannot | ||||
// move. During normal steady state, the download window should be | // move. During normal steady state, the download window should be | ||||
// much larger than the to-be-downloaded set of blocks, so | // much larger than the to-be-downloaded set of blocks, so | ||||
▲ Show 20 Lines • Show All 221 Lines • Show Last 20 Lines |