Changeset View
Changeset View
Standalone View
Standalone View
src/avalanche.cpp
Show All 32 Lines | bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const { | ||||
if (auto vr = GetRecord(vote_records, pindex)) { | if (auto vr = GetRecord(vote_records, pindex)) { | ||||
return vr->isAccepted(); | return vr->isAccepted(); | ||||
} | } | ||||
return false; | return false; | ||||
} | } | ||||
bool AvalancheProcessor::registerVotes( | bool AvalancheProcessor::registerVotes( | ||||
const AvalancheResponse &response, | NodeId nodeid, const AvalancheResponse &response, | ||||
std::vector<AvalancheBlockUpdate> &updates) { | std::vector<AvalancheBlockUpdate> &updates) { | ||||
RequestRecord r; | |||||
{ | |||||
// Check that the query exists. | |||||
auto w = queries.getWriteView(); | |||||
auto it = w->find(nodeid); | |||||
if (it == w.end()) { | |||||
// NB: The request may be old, so we don't increase banscore. | |||||
return false; | |||||
} | |||||
r = std::move(it->second); | |||||
w->erase(it); | |||||
} | |||||
// Verify that the request and the vote are consistent. | |||||
const std::vector<CInv> &invs = r.GetInvs(); | |||||
const std::vector<AvalancheVote> &votes = response.GetVotes(); | const std::vector<AvalancheVote> &votes = response.GetVotes(); | ||||
size_t size = invs.size(); | |||||
if (votes.size() != size) { | |||||
// TODO: increase banscore for inconsistent response. | |||||
// NB: This isn't timeout but actually node misbehaving. | |||||
return false; | |||||
} | |||||
for (size_t i = 0; i < size; i++) { | |||||
if (invs[i].hash != votes[i].GetHash()) { | |||||
// TODO: increase banscore for inconsistent response. | |||||
// NB: This isn't timeout but actually node misbehaving. | |||||
return false; | |||||
} | |||||
} | |||||
std::map<CBlockIndex *, AvalancheVote> responseIndex; | std::map<CBlockIndex *, AvalancheVote> responseIndex; | ||||
{ | { | ||||
LOCK(cs_main); | LOCK(cs_main); | ||||
for (auto &v : votes) { | for (auto &v : votes) { | ||||
BlockMap::iterator mi = mapBlockIndex.find(v.GetHash()); | BlockMap::iterator mi = mapBlockIndex.find(v.GetHash()); | ||||
if (mi == mapBlockIndex.end()) { | if (mi == mapBlockIndex.end()) { | ||||
Show All 39 Lines | std::map<CBlockIndex *, AvalancheVote> responseIndex; | ||||
updates.emplace_back(pindex, | updates.emplace_back(pindex, | ||||
vr.isAccepted() | vr.isAccepted() | ||||
? AvalancheBlockUpdate::Status::Finalized | ? AvalancheBlockUpdate::Status::Finalized | ||||
: AvalancheBlockUpdate::Status::Invalid); | : AvalancheBlockUpdate::Status::Invalid); | ||||
w->erase(it); | w->erase(it); | ||||
} | } | ||||
} | } | ||||
// Put the node back in the list of queriable nodes. | |||||
auto w = nodeids.getWriteView(); | |||||
w->insert(nodeid); | |||||
return true; | return true; | ||||
} | } | ||||
namespace { | namespace { | ||||
/** | /** | ||||
* Run the avalanche event loop every 10ms. | * Run the avalanche event loop every 10ms. | ||||
*/ | */ | ||||
static int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10; | static int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10; | ||||
Show All 10 Lines | if (running) { | ||||
return false; | return false; | ||||
} | } | ||||
running = true; | running = true; | ||||
// Start the event loop. | // Start the event loop. | ||||
scheduler.scheduleEvery( | scheduler.scheduleEvery( | ||||
[this]() -> bool { | [this]() -> bool { | ||||
runEventLoop(); | |||||
if (!stopRequest) { | if (!stopRequest) { | ||||
return true; | return true; | ||||
} | } | ||||
LOCK(cs_running); | LOCK(cs_running); | ||||
running = false; | running = false; | ||||
cond_running.notify_all(); | cond_running.notify_all(); | ||||
Show All 40 Lines | for (const std::pair<const CBlockIndex *, VoteRecord> &p : | ||||
// Make sure we do not produce more invs than specified by the | // Make sure we do not produce more invs than specified by the | ||||
// protocol. | // protocol. | ||||
return invs; | return invs; | ||||
} | } | ||||
} | } | ||||
return invs; | return invs; | ||||
} | } | ||||
NodeId AvalancheProcessor::getSuitableNodeToQuery() { | |||||
auto w = nodeids.getWriteView(); | |||||
if (w->empty()) { | |||||
auto r = queries.getReadView(); | |||||
// We don't have any candidate node, so let's try to find some. | |||||
connman->ForEachNode([&w, &r](CNode *pnode) { | |||||
// If this node doesn't support avalanche, we remove. | |||||
if (!(pnode->nServices & NODE_AVALANCHE)) { | |||||
return; | |||||
} | |||||
// if we have a request in flight for that node. | |||||
if (r->find(pnode->GetId()) != r.end()) { | |||||
return; | |||||
} | |||||
w->insert(pnode->GetId()); | |||||
}); | |||||
} | |||||
// We don't have any suitable candidate. | |||||
if (w->empty()) { | |||||
return -1; | |||||
} | |||||
auto it = w.begin(); | |||||
NodeId nodeid = *it; | |||||
w->erase(it); | |||||
return nodeid; | |||||
} | |||||
void AvalancheProcessor::runEventLoop() { | |||||
std::vector<CInv> invs = getInvsForNextPoll(); | |||||
if (invs.empty()) { | |||||
// If there are no invs to poll, we are done. | |||||
return; | |||||
} | |||||
NodeId nodeid = getSuitableNodeToQuery(); | |||||
/** | |||||
* If we lost contact to that node, then we remove it from nodeids, but | |||||
* never add the request to queries, which ensures bad nodes get cleaned up | |||||
* over time. | |||||
*/ | |||||
connman->ForNode(nodeid, [this, &invs](CNode *pnode) { | |||||
{ | |||||
// Register the query. | |||||
queries.getWriteView()->emplace( | |||||
pnode->GetId(), RequestRecord(GetAdjustedTime(), invs)); | |||||
} | |||||
// Send the query to the node. | |||||
connman->PushMessage( | |||||
pnode, | |||||
CNetMsgMaker(pnode->GetSendVersion()) | |||||
.Make(NetMsgType::AVAPOLL, | |||||
AvalanchePoll(round++, std::move(invs)))); | |||||
return true; | |||||
}); | |||||
} |