Changeset View
Changeset View
Standalone View
Standalone View
src/avalanche.cpp
Show All 28 Lines | |||||
bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const { | bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const { | ||||
if (auto vr = GetRecord(vote_records, pindex)) { | if (auto vr = GetRecord(vote_records, pindex)) { | ||||
return vr->isValid(); | return vr->isValid(); | ||||
} | } | ||||
return false; | return false; | ||||
} | } | ||||
bool AvalancheProcessor::registerVotes(const AvalancheResponse &response, | bool AvalancheProcessor::registerVotes(NodeId nodeid, | ||||
const AvalancheResponse &response, | |||||
std::vector<uint256> &finalized) { | std::vector<uint256> &finalized) { | ||||
RequestReccord r{0, {}}; | |||||
{ | |||||
// Check that the query exists. | |||||
auto w = queries.getWriteView(); | |||||
auto it = w->find(nodeid); | |||||
if (it == w.end()) { | |||||
// NB: The request may be old, so we don't increase banscore. | |||||
return false; | |||||
} | |||||
r = std::move(it->second); | |||||
w->erase(it); | |||||
} | |||||
// Verify that the request and the vote are consistent. | |||||
const std::vector<CInv> &invs = r.GetInvs(); | |||||
const std::vector<AvalancheVote> &votes = response.GetVotes(); | const std::vector<AvalancheVote> &votes = response.GetVotes(); | ||||
size_t size = invs.size(); | |||||
if (votes.size() != size) { | |||||
// TODO: increase banscore for inconsistent response. | |||||
// NB: This isn't timeout but actually node misbehaving. | |||||
return false; | |||||
} | |||||
for (size_t i = 0; i < size; i++) { | |||||
if (invs[i].hash != votes[i].GetHash()) { | |||||
// TODO: increase banscore for inconsistent response. | |||||
// NB: This isn't timeout but actually node misbehaving. | |||||
return false; | |||||
} | |||||
} | |||||
{ | |||||
// Register votes. | // Register votes. | ||||
auto w = vote_records.getWriteView(); | auto w = vote_records.getWriteView(); | ||||
for (auto &v : votes) { | for (auto &v : votes) { | ||||
auto &r = w[v.GetHash()]; | auto &vr = w[v.GetHash()]; | ||||
jasonbcox: Needs check for vr == w.end() for sanity | |||||
deadalnixAuthorUnsubmitted Done Inline ActionsBracket create the element if missing, so it's never end(). But this whole thing is wong anyways. deadalnix: Bracket create the element if missing, so it's never end(). But this whole thing is wong… | |||||
r.registerVote(v.IsValid()); | vr.registerVote(v.IsValid()); | ||||
if (r.hasFinalized()) { | if (vr.hasFinalized()) { | ||||
finalized.push_back(v.GetHash()); | finalized.push_back(v.GetHash()); | ||||
} | } | ||||
} | } | ||||
// Clear finalized results. | |||||
for (auto &h : finalized) { | for (auto &h : finalized) { | ||||
w->erase(h); | w->erase(h); | ||||
} | } | ||||
} | |||||
// Put the node back in the list of queriable nodes. | |||||
auto w = nodeids.getWriteView(); | |||||
w->insert(nodeid); | |||||
return true; | return true; | ||||
} | } | ||||
namespace { | namespace { | ||||
/** | /** | ||||
* Run the avalanche event loop every 20ms. | * Run the avalanche event loop every 20ms. | ||||
*/ | */ | ||||
static int64_t AVALANCHE_TIME_STEP_MILISECONDS = 10; | static int64_t AVALANCHE_TIME_STEP_MILISECONDS = 10; | ||||
Show All 10 Lines | if (running) { | ||||
return false; | return false; | ||||
} | } | ||||
running = true; | running = true; | ||||
// Start the event loop. | // Start the event loop. | ||||
scheduler.scheduleEvery( | scheduler.scheduleEvery( | ||||
[this]() -> bool { | [this]() -> bool { | ||||
runEventLoop(); | |||||
if (!stopRequest) { | if (!stopRequest) { | ||||
return true; | return true; | ||||
} | } | ||||
LOCK(cs_running); | LOCK(cs_running); | ||||
running = false; | running = false; | ||||
cond_running.notify_all(); | cond_running.notify_all(); | ||||
Show All 40 Lines | for (const std::pair<uint256, VoteRecord> &p : r) { | ||||
// protocol. | // protocol. | ||||
return invs; | return invs; | ||||
} | } | ||||
} | } | ||||
return invs; | return invs; | ||||
} | } | ||||
NodeId AvalancheProcessor::getSuitableNodeToQuery() { | |||||
auto w = nodeids.getWriteView(); | |||||
if (w->empty()) { | |||||
auto r = queries.getReadView(); | |||||
// We don't have any candidate node, so let's try to find some. | |||||
connman->ForEachNode([&w, &r](CNode *pnode) { | |||||
jasonbcoxUnsubmitted Not Done Inline ActionsDoes there need to be fuzzing so that the same node isn't selected every time? jasonbcox: Does there need to be fuzzing so that the same node isn't selected every time? | |||||
deadalnixAuthorUnsubmitted Done Inline ActionsThere is no node selection for now, so that wouldn't fuzz anything :) deadalnix: There is no node selection for now, so that wouldn't fuzz anything :) | |||||
// If this node doesn't support avalanche, we remove. | |||||
if (!(pnode->nServices & NODE_AVALANCHE)) { | |||||
return; | |||||
} | |||||
// if we have a request in flight for that node. | |||||
if (r->find(pnode->GetId()) != r.end()) { | |||||
return; | |||||
} | |||||
w->insert(pnode->GetId()); | |||||
}); | |||||
} | |||||
// We don't have any suitable candidate. | |||||
if (w->empty()) { | |||||
return -1; | |||||
} | |||||
auto it = w.begin(); | |||||
NodeId nodeid = *it; | |||||
w->erase(it); | |||||
return nodeid; | |||||
} | |||||
void AvalancheProcessor::runEventLoop() { | void AvalancheProcessor::runEventLoop() { | ||||
std::vector<CInv> invs = getInvsForNextPoll(); | std::vector<CInv> invs = getInvsForNextPoll(); | ||||
if (invs.empty()) { | |||||
// If there are no invs to poll, we are done. | |||||
return; | |||||
} | |||||
NodeId nodeid = getSuitableNodeToQuery(); | |||||
/** | |||||
* If we lost contact to that node, then we remive it from nodeids, but | |||||
jasonbcoxUnsubmitted Not Done Inline Actionsremive -> remove jasonbcox: remive -> remove | |||||
* enevr add the request to queries, which ensures bad nodes get cleaned up | |||||
jasonbcoxUnsubmitted Not Done Inline Actionsenevr -> never jasonbcox: enevr -> never | |||||
* over time. | |||||
*/ | |||||
connman->ForNode(nodeid, [this, &invs](CNode *pfrom) { | |||||
{ | |||||
// Register the query. | |||||
queries.getWriteView()->emplace( | |||||
pfrom->GetId(), RequestReccord(GetAdjustedTime(), invs)); | |||||
} | |||||
// Send the query to the node. | |||||
connman->PushMessage( | |||||
pfrom, | |||||
CNetMsgMaker(pfrom->GetSendVersion()) | |||||
.Make(NetMsgType::AVA_POLL, AvalanchePoll(std::move(invs)))); | |||||
return true; | |||||
}); | |||||
} | } |
Needs check for vr == w.end() for sanity