Changeset View
Changeset View
Standalone View
Standalone View
src/net_processing.cpp
Show First 20 Lines • Show All 607 Lines • ▼ Show 20 Lines | struct Peer { | ||||
bool m_should_discourage GUARDED_BY(m_misbehavior_mutex){false}; | bool m_should_discourage GUARDED_BY(m_misbehavior_mutex){false}; | ||||
/** | /** | ||||
* Set of txids to reconsider once their parent transactions have been | * Set of txids to reconsider once their parent transactions have been | ||||
* accepted | * accepted | ||||
*/ | */ | ||||
std::set<TxId> m_orphan_work_set GUARDED_BY(g_cs_orphans); | std::set<TxId> m_orphan_work_set GUARDED_BY(g_cs_orphans); | ||||
/** Protects vRecvGetData **/ | /** Protects m_getdata_requests **/ | ||||
Mutex m_getdata_requests_mutex; | Mutex m_getdata_requests_mutex; | ||||
/** Work queue of items requested by this peer **/ | /** Work queue of items requested by this peer **/ | ||||
std::deque<CInv> vRecvGetData GUARDED_BY(m_getdata_requests_mutex); | std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex); | ||||
Peer(NodeId id) : m_id(id) {} | Peer(NodeId id) : m_id(id) {} | ||||
}; | }; | ||||
using PeerRef = std::shared_ptr<Peer>; | using PeerRef = std::shared_ptr<Peer>; | ||||
/** | /** | ||||
* Map of all Peer objects, keyed by peer id. This map is protected | * Map of all Peer objects, keyed by peer id. This map is protected | ||||
▲ Show 20 Lines • Show All 1,540 Lines • ▼ Show 20 Lines | |||||
static void ProcessGetData(const Config &config, CNode &pfrom, Peer &peer, | static void ProcessGetData(const Config &config, CNode &pfrom, Peer &peer, | ||||
CConnman &connman, CTxMemPool &mempool, | CConnman &connman, CTxMemPool &mempool, | ||||
const std::atomic<bool> &interruptMsgProc) | const std::atomic<bool> &interruptMsgProc) | ||||
EXCLUSIVE_LOCKS_REQUIRED(peer.m_getdata_requests_mutex) | EXCLUSIVE_LOCKS_REQUIRED(peer.m_getdata_requests_mutex) | ||||
LOCKS_EXCLUDED(::cs_main) { | LOCKS_EXCLUDED(::cs_main) { | ||||
AssertLockNotHeld(cs_main); | AssertLockNotHeld(cs_main); | ||||
std::deque<CInv>::iterator it = peer.vRecvGetData.begin(); | std::deque<CInv>::iterator it = peer.m_getdata_requests.begin(); | ||||
std::vector<CInv> vNotFound; | std::vector<CInv> vNotFound; | ||||
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); | const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); | ||||
const std::chrono::seconds now = GetTime<std::chrono::seconds>(); | const std::chrono::seconds now = GetTime<std::chrono::seconds>(); | ||||
// Get last mempool request time | // Get last mempool request time | ||||
const std::chrono::seconds mempool_req = | const std::chrono::seconds mempool_req = | ||||
pfrom.m_tx_relay != nullptr | pfrom.m_tx_relay != nullptr | ||||
? pfrom.m_tx_relay->m_last_mempool_req.load() | ? pfrom.m_tx_relay->m_last_mempool_req.load() | ||||
: std::chrono::seconds::min(); | : std::chrono::seconds::min(); | ||||
// Process as many TX or AVA_PROOF items from the front of the getdata | // Process as many TX or AVA_PROOF items from the front of the getdata | ||||
// queue as possible, since they're common and it's efficient to batch | // queue as possible, since they're common and it's efficient to batch | ||||
// process them. | // process them. | ||||
while (it != peer.vRecvGetData.end()) { | while (it != peer.m_getdata_requests.end()) { | ||||
if (interruptMsgProc) { | if (interruptMsgProc) { | ||||
return; | return; | ||||
} | } | ||||
// The send buffer provides backpressure. If there's no space in | // The send buffer provides backpressure. If there's no space in | ||||
// the buffer, pause processing until the next call. | // the buffer, pause processing until the next call. | ||||
if (pfrom.fPauseSend) { | if (pfrom.fPauseSend) { | ||||
break; | break; | ||||
} | } | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | while (it != peer.m_getdata_requests.end()) { | ||||
} | } | ||||
// It's neither a proof nor a transaction | // It's neither a proof nor a transaction | ||||
break; | break; | ||||
} | } | ||||
// Only process one BLOCK item per call, since they're uncommon and can be | // Only process one BLOCK item per call, since they're uncommon and can be | ||||
// expensive to process. | // expensive to process. | ||||
if (it != peer.vRecvGetData.end() && !pfrom.fPauseSend) { | if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) { | ||||
const CInv &inv = *it++; | const CInv &inv = *it++; | ||||
if (inv.IsGenBlkMsg()) { | if (inv.IsGenBlkMsg()) { | ||||
ProcessGetBlockData(config, pfrom, inv, connman, interruptMsgProc); | ProcessGetBlockData(config, pfrom, inv, connman, interruptMsgProc); | ||||
} | } | ||||
// else: If the first item on the queue is an unknown type, we erase it | // else: If the first item on the queue is an unknown type, we erase it | ||||
// and continue processing the queue on the next call. | // and continue processing the queue on the next call. | ||||
} | } | ||||
peer.vRecvGetData.erase(peer.vRecvGetData.begin(), it); | peer.m_getdata_requests.erase(peer.m_getdata_requests.begin(), it); | ||||
if (!vNotFound.empty()) { | if (!vNotFound.empty()) { | ||||
// Let the peer know that we didn't find what it asked for, so it | // Let the peer know that we didn't find what it asked for, so it | ||||
// doesn't have to wait around forever. SPV clients care about this | // doesn't have to wait around forever. SPV clients care about this | ||||
// message: it's needed when they are recursively walking the | // message: it's needed when they are recursively walking the | ||||
// dependencies of relevant unconfirmed transactions. SPV clients want | // dependencies of relevant unconfirmed transactions. SPV clients want | ||||
// to do that because they want to know about (and store and rebroadcast | // to do that because they want to know about (and store and rebroadcast | ||||
// and risk analyze) the dependencies of transactions relevant to them, | // and risk analyze) the dependencies of transactions relevant to them, | ||||
▲ Show 20 Lines • Show All 1,110 Lines • ▼ Show 20 Lines | if (msg_type == NetMsgType::GETDATA) { | ||||
if (vInv.size() > 0) { | if (vInv.size() > 0) { | ||||
LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", | LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", | ||||
vInv[0].ToString(), pfrom.GetId()); | vInv[0].ToString(), pfrom.GetId()); | ||||
} | } | ||||
{ | { | ||||
LOCK(peer->m_getdata_requests_mutex); | LOCK(peer->m_getdata_requests_mutex); | ||||
peer->vRecvGetData.insert(peer->vRecvGetData.end(), vInv.begin(), | peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), | ||||
vInv.end()); | vInv.begin(), vInv.end()); | ||||
ProcessGetData(config, pfrom, *peer, m_connman, m_mempool, | ProcessGetData(config, pfrom, *peer, m_connman, m_mempool, | ||||
interruptMsgProc); | interruptMsgProc); | ||||
} | } | ||||
return; | return; | ||||
} | } | ||||
if (msg_type == NetMsgType::GETBLOCKS) { | if (msg_type == NetMsgType::GETBLOCKS) { | ||||
▲ Show 20 Lines • Show All 130 Lines • ▼ Show 20 Lines | if (msg_type == NetMsgType::GETBLOCKTXN) { | ||||
// actually receive all the data read from disk over the network. | // actually receive all the data read from disk over the network. | ||||
LogPrint(BCLog::NET, | LogPrint(BCLog::NET, | ||||
"Peer %d sent us a getblocktxn for a block > %i deep\n", | "Peer %d sent us a getblocktxn for a block > %i deep\n", | ||||
pfrom.GetId(), MAX_BLOCKTXN_DEPTH); | pfrom.GetId(), MAX_BLOCKTXN_DEPTH); | ||||
CInv inv; | CInv inv; | ||||
inv.type = MSG_BLOCK; | inv.type = MSG_BLOCK; | ||||
inv.hash = req.blockhash; | inv.hash = req.blockhash; | ||||
WITH_LOCK(peer->m_getdata_requests_mutex, | WITH_LOCK(peer->m_getdata_requests_mutex, | ||||
peer->vRecvGetData.push_back(inv)); | peer->m_getdata_requests.push_back(inv)); | ||||
// The message processing loop will go around again (without pausing) | // The message processing loop will go around again (without pausing) | ||||
// and we'll respond then (without cs_main) | // and we'll respond then (without cs_main) | ||||
return; | return; | ||||
} | } | ||||
if (msg_type == NetMsgType::GETHEADERS) { | if (msg_type == NetMsgType::GETHEADERS) { | ||||
CBlockLocator locator; | CBlockLocator locator; | ||||
BlockHash hashStop; | BlockHash hashStop; | ||||
▲ Show 20 Lines • Show All 1,297 Lines • ▼ Show 20 Lines | bool PeerManager::ProcessMessages(const Config &config, CNode *pfrom, | ||||
PeerRef peer = GetPeerRef(pfrom->GetId()); | PeerRef peer = GetPeerRef(pfrom->GetId()); | ||||
if (peer == nullptr) { | if (peer == nullptr) { | ||||
return false; | return false; | ||||
} | } | ||||
{ | { | ||||
LOCK(peer->m_getdata_requests_mutex); | LOCK(peer->m_getdata_requests_mutex); | ||||
if (!peer->vRecvGetData.empty()) { | if (!peer->m_getdata_requests.empty()) { | ||||
ProcessGetData(config, *pfrom, *peer, m_connman, m_mempool, | ProcessGetData(config, *pfrom, *peer, m_connman, m_mempool, | ||||
interruptMsgProc); | interruptMsgProc); | ||||
} | } | ||||
} | } | ||||
{ | { | ||||
LOCK2(cs_main, g_cs_orphans); | LOCK2(cs_main, g_cs_orphans); | ||||
if (!peer->m_orphan_work_set.empty()) { | if (!peer->m_orphan_work_set.empty()) { | ||||
ProcessOrphanTx(config, peer->m_orphan_work_set); | ProcessOrphanTx(config, peer->m_orphan_work_set); | ||||
} | } | ||||
} | } | ||||
if (pfrom->fDisconnect) { | if (pfrom->fDisconnect) { | ||||
return false; | return false; | ||||
} | } | ||||
// this maintains the order of responses and prevents vRecvGetData from | // this maintains the order of responses and prevents m_getdata_requests | ||||
// growing unbounded | // from growing unbounded | ||||
{ | { | ||||
LOCK(peer->m_getdata_requests_mutex); | LOCK(peer->m_getdata_requests_mutex); | ||||
if (!peer->vRecvGetData.empty()) { | if (!peer->m_getdata_requests.empty()) { | ||||
return true; | return true; | ||||
} | } | ||||
} | } | ||||
{ | { | ||||
LOCK(g_cs_orphans); | LOCK(g_cs_orphans); | ||||
if (!peer->m_orphan_work_set.empty()) { | if (!peer->m_orphan_work_set.empty()) { | ||||
return true; | return true; | ||||
▲ Show 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | try { | ||||
ProcessMessage(config, *pfrom, msg_type, vRecv, msg.m_time, | ProcessMessage(config, *pfrom, msg_type, vRecv, msg.m_time, | ||||
interruptMsgProc); | interruptMsgProc); | ||||
if (interruptMsgProc) { | if (interruptMsgProc) { | ||||
return false; | return false; | ||||
} | } | ||||
{ | { | ||||
LOCK(peer->m_getdata_requests_mutex); | LOCK(peer->m_getdata_requests_mutex); | ||||
if (!peer->vRecvGetData.empty()) { | if (!peer->m_getdata_requests.empty()) { | ||||
fMoreWork = true; | fMoreWork = true; | ||||
} | } | ||||
} | } | ||||
} catch (const std::exception &e) { | } catch (const std::exception &e) { | ||||
LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", | LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", | ||||
__func__, SanitizeString(msg_type), nMessageSize, e.what(), | __func__, SanitizeString(msg_type), nMessageSize, e.what(), | ||||
typeid(e).name()); | typeid(e).name()); | ||||
} catch (...) { | } catch (...) { | ||||
▲ Show 20 Lines • Show All 1,036 Lines • Show Last 20 Lines |