Changeset View
Changeset View
Standalone View
Standalone View
src/avalanche.cpp
Show All 28 Lines | |||||
bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const { | bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const { | ||||
if (auto vr = GetRecord(vote_records, pindex)) { | if (auto vr = GetRecord(vote_records, pindex)) { | ||||
return vr->isValid(); | return vr->isValid(); | ||||
} | } | ||||
return false; | return false; | ||||
} | } | ||||
bool AvalancheProcessor::registerVotes(const AvalancheResponse &response, | bool AvalancheProcessor::registerVotes(NodeId nodeid, | ||||
const AvalancheResponse &response, | |||||
std::vector<uint256> &accepted, | std::vector<uint256> &accepted, | ||||
std::vector<uint256> &rejected) { | std::vector<uint256> &rejected) { | ||||
RequestRecord r; | |||||
{ | |||||
// Check that the query exists. | |||||
auto w = queries.getWriteView(); | |||||
auto it = w->find(nodeid); | |||||
if (it == w.end()) { | |||||
// NB: The request may be old, so we don't increase banscore. | |||||
return false; | |||||
} | |||||
r = std::move(it->second); | |||||
w->erase(it); | |||||
} | |||||
// Verify that the request and the vote are consistent. | |||||
const std::vector<CInv> &invs = r.GetInvs(); | |||||
const std::vector<AvalancheVote> &votes = response.GetVotes(); | const std::vector<AvalancheVote> &votes = response.GetVotes(); | ||||
size_t size = invs.size(); | |||||
if (votes.size() != size) { | |||||
// TODO: increase banscore for inconsistent response. | |||||
// NB: This isn't timeout but actually node misbehaving. | |||||
return false; | |||||
} | |||||
for (size_t i = 0; i < size; i++) { | |||||
if (invs[i].hash != votes[i].GetHash()) { | |||||
// TODO: increase banscore for inconsistent response. | |||||
// NB: This isn't timeout but actually node misbehaving. | |||||
return false; | |||||
} | |||||
} | |||||
{ | |||||
// Register votes. | // Register votes. | ||||
auto w = vote_records.getWriteView(); | auto w = vote_records.getWriteView(); | ||||
for (auto &v : votes) { | for (auto &v : votes) { | ||||
auto it = w->find(v.GetHash()); | auto it = w->find(v.GetHash()); | ||||
if (it == w.end()) { | if (it == w.end()) { | ||||
// We are not voting on that item anymore. | // We are not voting on that item anymore. | ||||
continue; | continue; | ||||
} | } | ||||
auto &vr = it->second; | auto &vr = it->second; | ||||
vr.registerVote(v.IsValid()); | vr.registerVote(v.IsValid()); | ||||
if (!vr.hasFinalized()) { | if (!vr.hasFinalized()) { | ||||
// This item has note been finalized, so we have nothing more to do. | // This item has note been finalized, so we have nothing more to | ||||
// do. | |||||
continue; | continue; | ||||
} | } | ||||
// We just finalized a vote. If it is valid, then let the caller know. | // We just finalized a vote. If it is valid, then let the caller | ||||
// Either way, remove the item from the map. | // know. Either way, remove the item from the map. | ||||
if (vr.isValid()) { | if (vr.isValid()) { | ||||
accepted.push_back(v.GetHash()); | accepted.push_back(v.GetHash()); | ||||
} else { | } else { | ||||
rejected.push_back(v.GetHash()); | rejected.push_back(v.GetHash()); | ||||
} | } | ||||
w->erase(it); | w->erase(it); | ||||
} | } | ||||
} | |||||
// Put the node back in the list of queriable nodes. | |||||
auto w = nodeids.getWriteView(); | |||||
w->insert(nodeid); | |||||
return true; | return true; | ||||
} | } | ||||
namespace { | namespace { | ||||
/** | /** | ||||
* Run the avalanche event loop every 10ms. | * Run the avalanche event loop every 10ms. | ||||
*/ | */ | ||||
static int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10; | static int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10; | ||||
Show All 10 Lines | if (running) { | ||||
return false; | return false; | ||||
} | } | ||||
running = true; | running = true; | ||||
// Start the event loop. | // Start the event loop. | ||||
scheduler.scheduleEvery( | scheduler.scheduleEvery( | ||||
[this]() -> bool { | [this]() -> bool { | ||||
runEventLoop(); | |||||
if (!stopRequest) { | if (!stopRequest) { | ||||
return true; | return true; | ||||
} | } | ||||
LOCK(cs_running); | LOCK(cs_running); | ||||
running = false; | running = false; | ||||
cond_running.notify_all(); | cond_running.notify_all(); | ||||
Show All 40 Lines | for (const std::pair<uint256, VoteRecord> &p : r) { | ||||
// protocol. | // protocol. | ||||
return invs; | return invs; | ||||
} | } | ||||
} | } | ||||
return invs; | return invs; | ||||
} | } | ||||
NodeId AvalancheProcessor::getSuitableNodeToQuery() { | |||||
auto w = nodeids.getWriteView(); | |||||
if (w->empty()) { | |||||
auto r = queries.getReadView(); | |||||
// We don't have any candidate node, so let's try to find some. | |||||
connman->ForEachNode([&w, &r](CNode *pnode) { | |||||
// If this node doesn't support avalanche, we remove. | |||||
if (!(pnode->nServices & NODE_AVALANCHE)) { | |||||
return; | |||||
} | |||||
// if we have a request in flight for that node. | |||||
if (r->find(pnode->GetId()) != r.end()) { | |||||
return; | |||||
} | |||||
w->insert(pnode->GetId()); | |||||
}); | |||||
} | |||||
// We don't have any suitable candidate. | |||||
if (w->empty()) { | |||||
return -1; | |||||
} | |||||
auto it = w.begin(); | |||||
NodeId nodeid = *it; | |||||
w->erase(it); | |||||
return nodeid; | |||||
} | |||||
void AvalancheProcessor::runEventLoop() { | void AvalancheProcessor::runEventLoop() { | ||||
std::vector<CInv> invs = getInvsForNextPoll(); | std::vector<CInv> invs = getInvsForNextPoll(); | ||||
if (invs.empty()) { | |||||
// If there are no invs to poll, we are done. | |||||
return; | |||||
} | |||||
NodeId nodeid = getSuitableNodeToQuery(); | |||||
/** | |||||
* If we lost contact to that node, then we remove it from nodeids, but | |||||
* never add the request to queries, which ensures bad nodes get cleaned up | |||||
* over time. | |||||
*/ | |||||
connman->ForNode(nodeid, [this, &invs](CNode *pfrom) { | |||||
{ | |||||
// Register the query. | |||||
queries.getWriteView()->emplace( | |||||
pfrom->GetId(), RequestRecord(GetAdjustedTime(), invs)); | |||||
} | |||||
// Send the query to the node. | |||||
connman->PushMessage( | |||||
pfrom, | |||||
CNetMsgMaker(pfrom->GetSendVersion()) | |||||
.Make(NetMsgType::AVA_POLL, AvalanchePoll(std::move(invs)))); | |||||
return true; | |||||
}); | |||||
} | } |