Changeset View
Changeset View
Standalone View
Standalone View
src/avalanche.cpp
Show All 35 Lines | #else | ||||
*/ | */ | ||||
v = v - ((v >> 1) & 0x55555555); | v = v - ((v >> 1) & 0x55555555); | ||||
v = (v & 0x33333333) + ((v >> 2) & 0x33333333); | v = (v & 0x33333333) + ((v >> 2) & 0x33333333); | ||||
return (((v + (v >> 4)) & 0xF0F0F0F) * 0x1010101) >> 24; | return (((v + (v >> 4)) & 0xF0F0F0F) * 0x1010101) >> 24; | ||||
#endif | #endif | ||||
} | } | ||||
bool VoteRecord::registerVote(uint32_t error) { | bool VoteRecord::registerVote(uint32_t error) { | ||||
// We just got a new vote, so there is one less inflight request. | |||||
clearInflightRequest(); | |||||
/** | /** | ||||
* The result of the vote is determined from the error code. If the error | * The result of the vote is determined from the error code. If the error | ||||
* code is 0, there is no error and therefore the vote is yes. If there is | * code is 0, there is no error and therefore the vote is yes. If there is | ||||
* an error, we check the most significant bit to decide if the vote is a no | * an error, we check the most significant bit to decide if the vote is a no | ||||
* (for instance, the block is invalid) or is the vote inconclusive (for | * (for instance, the block is invalid) or is the vote inconclusive (for | ||||
* instance, the queried node does not have the block yet). | * instance, the queried node does not have the block yet). | ||||
*/ | */ | ||||
votes = (votes << 1) | (error == 0); | votes = (votes << 1) | (error == 0); | ||||
Show All 23 Lines | if (isAccepted() == yes) { | ||||
return getConfidence() == AVALANCHE_FINALIZATION_SCORE; | return getConfidence() == AVALANCHE_FINALIZATION_SCORE; | ||||
} | } | ||||
// The round changed our state. We reset the confidence. | // The round changed our state. We reset the confidence. | ||||
confidence = yes; | confidence = yes; | ||||
return true; | return true; | ||||
} | } | ||||
bool VoteRecord::registerPoll() const { | |||||
uint8_t count = inflight.load(); | |||||
while (count < AVALANCHE_MAX_INFLIGHT_POLL) { | |||||
if (inflight.compare_exchange_weak(count, count + 1)) { | |||||
return true; | |||||
} | |||||
} | |||||
return false; | |||||
} | |||||
static bool IsWorthPolling(const CBlockIndex *pindex) { | static bool IsWorthPolling(const CBlockIndex *pindex) { | ||||
AssertLockHeld(cs_main); | AssertLockHeld(cs_main); | ||||
if (pindex->nStatus.isInvalid()) { | if (pindex->nStatus.isInvalid()) { | ||||
// No point polling invalid blocks. | // No point polling invalid blocks. | ||||
return false; | return false; | ||||
} | } | ||||
▲ Show 20 Lines • Show All 206 Lines • ▼ Show 20 Lines | bool AvalancheProcessor::stopEventLoop() { | ||||
cond_running.wait(lock, [this]() EXCLUSIVE_LOCKS_REQUIRED(cs_running) { | cond_running.wait(lock, [this]() EXCLUSIVE_LOCKS_REQUIRED(cs_running) { | ||||
return !running; | return !running; | ||||
}); | }); | ||||
stopRequest = false; | stopRequest = false; | ||||
return true; | return true; | ||||
} | } | ||||
std::vector<CInv> AvalancheProcessor::getInvsForNextPoll() const { | std::vector<CInv> AvalancheProcessor::getInvsForNextPoll(bool forPoll) const { | ||||
std::vector<CInv> invs; | std::vector<CInv> invs; | ||||
auto r = vote_records.getReadView(); | auto r = vote_records.getReadView(); | ||||
for (const std::pair<const CBlockIndex *const, VoteRecord> &p : | for (const std::pair<const CBlockIndex *const, VoteRecord> &p : | ||||
reverse_iterate(r)) { | reverse_iterate(r)) { | ||||
const CBlockIndex *pindex = p.first; | const CBlockIndex *pindex = p.first; | ||||
{ | { | ||||
LOCK(cs_main); | LOCK(cs_main); | ||||
if (!IsWorthPolling(pindex)) { | if (!IsWorthPolling(pindex)) { | ||||
// Obviously do not poll if the block is not worth polling. | // Obviously do not poll if the block is not worth polling. | ||||
continue; | continue; | ||||
} | } | ||||
} | } | ||||
// Check if we can run poll. | |||||
const bool shouldPoll = | |||||
forPoll ? p.second.registerPoll() : p.second.shouldPoll(); | |||||
if (!shouldPoll) { | |||||
continue; | |||||
} | |||||
// We don't have a decision, we need more votes. | // We don't have a decision, we need more votes. | ||||
invs.emplace_back(MSG_BLOCK, pindex->GetBlockHash()); | invs.emplace_back(MSG_BLOCK, pindex->GetBlockHash()); | ||||
if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { | if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { | ||||
// Make sure we do not produce more invs than specified by the | // Make sure we do not produce more invs than specified by the | ||||
// protocol. | // protocol. | ||||
return invs; | return invs; | ||||
} | } | ||||
} | } | ||||
Show All 12 Lines | if (it->nextRequestTime <= std::chrono::steady_clock::now()) { | ||||
return it->nodeid; | return it->nodeid; | ||||
} | } | ||||
return NO_NODE; | return NO_NODE; | ||||
} | } | ||||
void AvalancheProcessor::clearTimedoutRequests() { | void AvalancheProcessor::clearTimedoutRequests() { | ||||
auto now = std::chrono::steady_clock::now(); | auto now = std::chrono::steady_clock::now(); | ||||
std::map<CInv, uint8_t> timedout_items{}; | |||||
{ | |||||
// Clear expired requests. | // Clear expired requests. | ||||
auto w = queries.getWriteView(); | auto w = queries.getWriteView(); | ||||
auto it = w->get<query_timeout>().begin(); | auto it = w->get<query_timeout>().begin(); | ||||
while (it != w->get<query_timeout>().end() && it->timeout < now) { | while (it != w->get<query_timeout>().end() && it->timeout < now) { | ||||
for (auto &i : it->invs) { | |||||
timedout_items[i]++; | |||||
} | |||||
w->get<query_timeout>().erase(it++); | w->get<query_timeout>().erase(it++); | ||||
} | } | ||||
} | } | ||||
if (timedout_items.empty()) { | |||||
return; | |||||
} | |||||
// In flight request accounting. | |||||
for (const auto &p : timedout_items) { | |||||
const CInv &inv = p.first; | |||||
assert(inv.type == MSG_BLOCK); | |||||
CBlockIndex *pindex; | |||||
{ | |||||
LOCK(cs_main); | |||||
BlockMap::iterator mi = mapBlockIndex.find(inv.hash); | |||||
if (mi == mapBlockIndex.end()) { | |||||
continue; | |||||
} | |||||
pindex = mi->second; | |||||
} | |||||
auto w = vote_records.getWriteView(); | |||||
auto it = w->find(pindex); | |||||
if (it == w.end()) { | |||||
continue; | |||||
} | |||||
it->second.clearInflightRequest(p.second); | |||||
} | |||||
} | |||||
void AvalancheProcessor::runEventLoop() { | void AvalancheProcessor::runEventLoop() { | ||||
// First things first, check if we have requests that timed out and clear | // First things first, check if we have requests that timed out and clear | ||||
// them. | // them. | ||||
clearTimedoutRequests(); | clearTimedoutRequests(); | ||||
std::vector<CInv> invs = getInvsForNextPoll(); | |||||
if (invs.empty()) { | |||||
// If there are no invs to poll, we are done. | |||||
return; | |||||
} | |||||
while (true) { | while (true) { | ||||
NodeId nodeid = getSuitableNodeToQuery(); | NodeId nodeid = getSuitableNodeToQuery(); | ||||
if (nodeid == NO_NODE) { | if (nodeid == NO_NODE) { | ||||
return; | return; | ||||
} | } | ||||
/** | /** | ||||
* If we lost contact to that node, then we remove it from nodeids, but | * If we lost contact to that node, then we remove it from nodeids, but | ||||
* never add the request to queries, which ensures bad nodes get cleaned | * never add the request to queries, which ensures bad nodes get cleaned | ||||
* up over time. | * up over time. | ||||
*/ | */ | ||||
std::vector<CInv> invs; | |||||
bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { | bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { | ||||
invs = getInvsForNextPoll(); | |||||
if (invs.empty()) { | |||||
return false; | |||||
} | |||||
uint64_t current_round = round++; | uint64_t current_round = round++; | ||||
{ | { | ||||
// Compute the time at which this requests times out. | // Compute the time at which this requests times out. | ||||
auto timeout = | auto timeout = | ||||
std::chrono::steady_clock::now() + queryTimeoutDuration; | std::chrono::steady_clock::now() + queryTimeoutDuration; | ||||
// Register the query. | // Register the query. | ||||
queries.getWriteView()->insert( | queries.getWriteView()->insert( | ||||
Show All 13 Lines | while (true) { | ||||
pnode, | pnode, | ||||
CNetMsgMaker(pnode->GetSendVersion()) | CNetMsgMaker(pnode->GetSendVersion()) | ||||
.Make(NetMsgType::AVAPOLL, | .Make(NetMsgType::AVAPOLL, | ||||
AvalanchePoll(current_round, std::move(invs)))); | AvalanchePoll(current_round, std::move(invs)))); | ||||
return true; | return true; | ||||
}); | }); | ||||
// Success! | // Success! | ||||
if (hasSent) { | if (hasSent || invs.empty()) { | ||||
return; | return; | ||||
} | } | ||||
// This node is obsolete, delete it. | // This node is obsolete, delete it. | ||||
peerSet.getWriteView()->erase(nodeid); | peerSet.getWriteView()->erase(nodeid); | ||||
} | } | ||||
} | } |