Changeset View
Changeset View
Standalone View
Standalone View
src/avalanche.cpp
Show First 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const { | ||||
} | } | ||||
return false; | return false; | ||||
} | } | ||||
bool AvalancheProcessor::registerVotes( | bool AvalancheProcessor::registerVotes( | ||||
NodeId nodeid, const AvalancheResponse &response, | NodeId nodeid, const AvalancheResponse &response, | ||||
std::vector<AvalancheBlockUpdate> &updates) { | std::vector<AvalancheBlockUpdate> &updates) { | ||||
{ | |||||
// Save the time at which we can query again. | // Save the time at which we can query again. | ||||
auto cooldown_end = std::chrono::steady_clock::now() + | auto w = peerSet.getWriteView(); | ||||
auto it = w->find(nodeid); | |||||
if (it != w->end()) { | |||||
w->modify(it, [&response](Peer &p) { | |||||
// FIXME: This will override the time even when we received an | |||||
// old stale message. This should check that the message is | |||||
// indeed the most up to date one before updating the time. | |||||
p.nextRequestTime = | |||||
std::chrono::steady_clock::now() + | |||||
std::chrono::milliseconds(response.getCooldown()); | std::chrono::milliseconds(response.getCooldown()); | ||||
}); | |||||
} | |||||
} | |||||
std::vector<CInv> invs; | std::vector<CInv> invs; | ||||
{ | { | ||||
// Check that the query exists. | // Check that the query exists. | ||||
auto w = queries.getWriteView(); | auto w = queries.getWriteView(); | ||||
auto it = w->find(std::make_tuple(nodeid, response.getRound())); | auto it = w->find(std::make_tuple(nodeid, response.getRound())); | ||||
if (it == w.end()) { | if (it == w.end()) { | ||||
▲ Show 20 Lines • Show All 77 Lines • ▼ Show 20 Lines | std::map<CBlockIndex *, AvalancheVote> responseIndex; | ||||
updates.emplace_back(pindex, | updates.emplace_back(pindex, | ||||
vr.isAccepted() | vr.isAccepted() | ||||
? AvalancheBlockUpdate::Status::Finalized | ? AvalancheBlockUpdate::Status::Finalized | ||||
: AvalancheBlockUpdate::Status::Invalid); | : AvalancheBlockUpdate::Status::Invalid); | ||||
w->erase(it); | w->erase(it); | ||||
} | } | ||||
} | } | ||||
// Put the node back in the list of queriable nodes. | |||||
auto w = nodecooldown.getWriteView(); | |||||
w->insert(std::make_pair(cooldown_end, nodeid)); | |||||
return true; | return true; | ||||
} | } | ||||
bool AvalancheProcessor::addPeer(NodeId nodeid, uint32_t score) { | |||||
return peerSet.getWriteView() | |||||
->insert({nodeid, score, std::chrono::steady_clock::now()}) | |||||
.second; | |||||
} | |||||
namespace { | namespace { | ||||
/** | /** | ||||
* Run the avalanche event loop every 10ms. | * Run the avalanche event loop every 10ms. | ||||
*/ | */ | ||||
static int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10; | static int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10; | ||||
/** | /** | ||||
* Maximum item that can be polled at once. | * Maximum item that can be polled at once. | ||||
*/ | */ | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | for (const std::pair<const CBlockIndex *, VoteRecord> &p : | ||||
return invs; | return invs; | ||||
} | } | ||||
} | } | ||||
return invs; | return invs; | ||||
} | } | ||||
NodeId AvalancheProcessor::getSuitableNodeToQuery() { | NodeId AvalancheProcessor::getSuitableNodeToQuery() { | ||||
auto w = nodeids.getWriteView(); | auto r = peerSet.getReadView(); | ||||
bool isCooldownMapEmpty; | auto it = r->get<next_request_time>().begin(); | ||||
if (it == r->get<next_request_time>().end()) { | |||||
{ | return NO_NODE; | ||||
// Recover nodes for which cooldown is over. | |||||
auto now = std::chrono::steady_clock::now(); | |||||
auto wcooldown = nodecooldown.getWriteView(); | |||||
for (auto it = wcooldown.begin(); | |||||
it != wcooldown.end() && it->first < now;) { | |||||
w->insert(it->second); | |||||
wcooldown->erase(it++); | |||||
} | |||||
isCooldownMapEmpty = wcooldown->empty(); | |||||
} | |||||
// If the cooldown map is empty and we don't have any nodes, it's time to | |||||
// fish for new ones. | |||||
// FIXME: Clearly, we need a better way to fish for new nodes, but this is | |||||
// out of scope for now. | |||||
if (isCooldownMapEmpty && w->empty()) { | |||||
auto r = queries.getReadView(); | |||||
// We don't have any candidate node, so let's try to find some. | |||||
connman->ForEachNode([&w, &r](CNode *pnode) { | |||||
// If this node doesn't support avalanche, we remove. | |||||
if (!(pnode->nServices & NODE_AVALANCHE)) { | |||||
return; | |||||
} | |||||
// if we have a request in flight for that node. | |||||
if (r->find(pnode->GetId()) != r.end()) { | |||||
return; | |||||
} | |||||
w->insert(pnode->GetId()); | |||||
}); | |||||
} | } | ||||
// We don't have any suitable candidate. | if (it->nextRequestTime <= std::chrono::steady_clock::now()) { | ||||
if (w->empty()) { | return it->nodeid; | ||||
return -1; | |||||
} | } | ||||
auto it = w.begin(); | return NO_NODE; | ||||
NodeId nodeid = *it; | |||||
w->erase(it); | |||||
return nodeid; | |||||
} | } | ||||
void AvalancheProcessor::runEventLoop() { | void AvalancheProcessor::runEventLoop() { | ||||
std::vector<CInv> invs = getInvsForNextPoll(); | std::vector<CInv> invs = getInvsForNextPoll(); | ||||
if (invs.empty()) { | if (invs.empty()) { | ||||
// If there are no invs to poll, we are done. | // If there are no invs to poll, we are done. | ||||
return; | return; | ||||
} | } | ||||
while (true) { | |||||
NodeId nodeid = getSuitableNodeToQuery(); | NodeId nodeid = getSuitableNodeToQuery(); | ||||
if (nodeid == NO_NODE) { | |||||
return; | |||||
} | |||||
/** | /** | ||||
* If we lost contact to that node, then we remove it from nodeids, but | * If we lost contact to that node, then we remove it from nodeids, but | ||||
* never add the request to queries, which ensures bad nodes get cleaned up | * never add the request to queries, which ensures bad nodes get cleaned | ||||
* over time. | * up over time. | ||||
*/ | */ | ||||
connman->ForNode(nodeid, [this, &invs](CNode *pnode) { | bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { | ||||
uint64_t current_round = round++; | |||||
{ | { | ||||
// Compute the time at which this requests times out. | // Compute the time at which this requests times out. | ||||
auto timeout = | auto timeout = | ||||
std::chrono::steady_clock::now() + std::chrono::seconds(10); | std::chrono::steady_clock::now() + std::chrono::seconds(10); | ||||
// Register the query. | // Register the query. | ||||
queries.getWriteView()->insert( | queries.getWriteView()->insert( | ||||
{pnode->GetId(), round, timeout, invs}); | {pnode->GetId(), current_round, timeout, invs}); | ||||
// Set the timeout. | |||||
auto w = peerSet.getWriteView(); | |||||
auto it = w->find(pnode->GetId()); | |||||
if (it != w->end()) { | |||||
w->modify(it, [&timeout](Peer &p) { | |||||
p.nextRequestTime = timeout; | |||||
}); | |||||
} | |||||
} | } | ||||
// Send the query to the node. | // Send the query to the node. | ||||
connman->PushMessage( | connman->PushMessage( | ||||
pnode, | pnode, | ||||
CNetMsgMaker(pnode->GetSendVersion()) | CNetMsgMaker(pnode->GetSendVersion()) | ||||
.Make(NetMsgType::AVAPOLL, | .Make(NetMsgType::AVAPOLL, | ||||
AvalanchePoll(round++, std::move(invs)))); | AvalanchePoll(current_round, std::move(invs)))); | ||||
return true; | return true; | ||||
}); | }); | ||||
// Success ! | |||||
if (hasSent) { | |||||
return; | |||||
} | |||||
// This node is obsolete, delete it. | |||||
peerSet.getWriteView()->erase(nodeid); | |||||
} | |||||
} | } |