Changeset View
Changeset View
Standalone View
Standalone View
src/avalanche/processor.cpp
Show First 20 Lines • Show All 589 Lines • ▼ Show 20 Lines | bool Processor::registerVotes(NodeId nodeid, const Response &response, | ||||
return true; | return true; | ||||
} | } | ||||
CPubKey Processor::getSessionPubKey() const { | CPubKey Processor::getSessionPubKey() const { | ||||
return sessionKey.GetPubKey(); | return sessionKey.GetPubKey(); | ||||
} | } | ||||
uint256 Processor::buildLocalSighash(CNode *pfrom) const { | |||||
CHashWriter hasher(SER_GETHASH, 0); | |||||
hasher << peerData->delegation.getId(); | |||||
hasher << pfrom->GetLocalNonce(); | |||||
hasher << pfrom->nRemoteHostNonce; | |||||
hasher << pfrom->GetLocalExtraEntropy(); | |||||
hasher << pfrom->nRemoteExtraEntropy; | |||||
return hasher.GetHash(); | |||||
} | |||||
bool Processor::sendHello(CNode *pfrom) const { | bool Processor::sendHello(CNode *pfrom) const { | ||||
if (!peerData) { | if (!peerData) { | ||||
// We do not have a delegation to advertise. | // We do not have a delegation to advertise. | ||||
return false; | return false; | ||||
} | } | ||||
// Now let's sign! | // Now let's sign! | ||||
SchnorrSig sig; | SchnorrSig sig; | ||||
Show All 23 Lines | bool Processor::startEventLoop(CScheduler &scheduler) { | ||||
return eventLoop.startEventLoop( | return eventLoop.startEventLoop( | ||||
scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); | scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); | ||||
} | } | ||||
bool Processor::stopEventLoop() { | bool Processor::stopEventLoop() { | ||||
return eventLoop.stopEventLoop(); | return eventLoop.stopEventLoop(); | ||||
} | } | ||||
std::vector<CInv> Processor::getInvsForNextPoll(bool forPoll) { | void Processor::avaproofsSent(NodeId nodeid) { | ||||
std::vector<CInv> invs; | if (::ChainstateActive().IsInitialBlockDownload()) { | ||||
// Before IBD is complete there is no way to make sure a proof is valid | |||||
auto extractVoteRecordsToInvs = [&](const auto &itemVoteRecordRange, | // or not, e.g. it can be spent in a block we don't know yet. In order | ||||
auto buildInvFromVoteItem) { | // to increase confidence that our proof set is similar to other nodes | ||||
for (const auto &[item, voteRecord] : itemVoteRecordRange) { | // on the network, the messages received during IBD are not accounted. | ||||
if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { | return; | ||||
// Make sure we do not produce more invs than specified by the | |||||
// protocol. | |||||
return true; | |||||
} | } | ||||
const bool shouldPoll = | LOCK(cs_peerManager); | ||||
forPoll ? voteRecord.registerPoll() : voteRecord.shouldPoll(); | if (peerManager->latchAvaproofsSent(nodeid)) { | ||||
avaproofsNodeCounter++; | |||||
if (!shouldPoll) { | |||||
continue; | |||||
} | } | ||||
invs.emplace_back(buildInvFromVoteItem(item)); | |||||
} | } | ||||
return invs.size() >= AVALANCHE_MAX_ELEMENT_POLL; | /* | ||||
}; | * Returns a bool indicating whether we have a usable Avalanche quorum enabling | ||||
* us to take decisions based on polls. | |||||
if (extractVoteRecordsToInvs(proofVoteRecords.getReadView(), | */ | ||||
[](const ProofRef &proof) { | bool Processor::isQuorumEstablished() { | ||||
return CInv(MSG_AVA_PROOF, proof->getId()); | if (quorumIsEstablished) { | ||||
})) { | return true; | ||||
// The inventory vector is full, we're done | |||||
return invs; | |||||
} | |||||
// First remove all blocks that are not worth polling. | |||||
{ | |||||
LOCK(cs_main); | |||||
auto w = blockVoteRecords.getWriteView(); | |||||
for (auto it = w->begin(); it != w->end();) { | |||||
const CBlockIndex *pindex = it->first; | |||||
if (!IsWorthPolling(pindex)) { | |||||
w->erase(it++); | |||||
} else { | |||||
++it; | |||||
} | |||||
} | |||||
} | } | ||||
auto r = blockVoteRecords.getReadView(); | // Don't do Avalanche while node is IBD'ing | ||||
extractVoteRecordsToInvs(reverse_iterate(r), [](const CBlockIndex *pindex) { | if (::ChainstateActive().IsInitialBlockDownload()) { | ||||
return CInv(MSG_BLOCK, pindex->GetBlockHash()); | return false; | ||||
}); | |||||
return invs; | |||||
} | } | ||||
NodeId Processor::getSuitableNodeToQuery() { | if (avaproofsNodeCounter < minAvaproofsNodeCount) { | ||||
LOCK(cs_peerManager); | return false; | ||||
return peerManager->selectNode(); | |||||
} | } | ||||
void Processor::clearTimedoutRequests() { | auto localProof = getLocalProof(); | ||||
auto now = std::chrono::steady_clock::now(); | |||||
std::map<CInv, uint8_t> timedout_items{}; | |||||
// Get the registered proof score and registered score we have nodes for | |||||
uint32_t totalPeersScore; | |||||
uint32_t connectedPeersScore; | |||||
{ | { | ||||
// Clear expired requests. | LOCK(cs_peerManager); | ||||
auto w = queries.getWriteView(); | totalPeersScore = peerManager->getTotalPeersScore(); | ||||
auto it = w->get<query_timeout>().begin(); | connectedPeersScore = peerManager->getConnectedPeersScore(); | ||||
while (it != w->get<query_timeout>().end() && it->timeout < now) { | |||||
for (const auto &i : it->invs) { | |||||
timedout_items[i]++; | |||||
} | |||||
w->get<query_timeout>().erase(it++); | // Consider that we are always connected to our proof, even if we are | ||||
} | // the single node using that proof. | ||||
if (localProof && | |||||
peerManager->forPeer(localProof->getId(), [](const Peer &peer) { | |||||
return peer.node_count == 0; | |||||
})) { | |||||
connectedPeersScore += localProof->getScore(); | |||||
} | } | ||||
if (timedout_items.empty()) { | |||||
return; | |||||
} | } | ||||
auto clearInflightRequest = [&](auto &voteRecords, const auto &voteItem, | // Ensure enough is being staked overall | ||||
uint8_t count) { | if (totalPeersScore < minQuorumScore) { | ||||
if (!voteItem) { | |||||
return false; | return false; | ||||
} | } | ||||
auto voteRecordsWriteView = voteRecords.getWriteView(); | // Ensure we have connected score for enough of the overall score | ||||
auto it = voteRecordsWriteView->find(voteItem); | uint32_t minConnectedScore = | ||||
if (it == voteRecordsWriteView.end()) { | std::round(double(totalPeersScore) * minQuorumConnectedScoreRatio); | ||||
if (connectedPeersScore < minConnectedScore) { | |||||
return false; | return false; | ||||
} | } | ||||
it->second.clearInflightRequest(count); | quorumIsEstablished = true; | ||||
return true; | return true; | ||||
}; | |||||
// In flight request accounting. | |||||
for (const auto &p : timedout_items) { | |||||
const CInv &inv = p.first; | |||||
if (inv.IsMsgBlk()) { | |||||
const CBlockIndex *pindex = WITH_LOCK( | |||||
cs_main, return g_chainman.m_blockman.LookupBlockIndex( | |||||
BlockHash(inv.hash))); | |||||
if (!clearInflightRequest(blockVoteRecords, pindex, p.second)) { | |||||
continue; | |||||
} | } | ||||
} | |||||
if (inv.IsMsgProof()) { | |||||
const ProofRef proof = | |||||
WITH_LOCK(cs_peerManager, | |||||
return peerManager->getProof(ProofId(inv.hash))); | |||||
if (!clearInflightRequest(proofVoteRecords, proof, p.second)) { | void Processor::FinalizeNode(const Config &config, const CNode &node, | ||||
continue; | bool &update_connection_time) { | ||||
} | WITH_LOCK(cs_peerManager, peerManager->removeNode(node.GetId())); | ||||
} | |||||
} | |||||
} | } | ||||
void Processor::runEventLoop() { | void Processor::runEventLoop() { | ||||
// Don't poll if quorum hasn't been established yet | // Don't poll if quorum hasn't been established yet | ||||
if (!isQuorumEstablished()) { | if (!isQuorumEstablished()) { | ||||
return; | return; | ||||
} | } | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | do { | ||||
peerManager->removeNode(nodeid); | peerManager->removeNode(nodeid); | ||||
} | } | ||||
// Get next suitable node to try again | // Get next suitable node to try again | ||||
nodeid = getSuitableNodeToQuery(); | nodeid = getSuitableNodeToQuery(); | ||||
} while (nodeid != NO_NODE); | } while (nodeid != NO_NODE); | ||||
} | } | ||||
void Processor::avaproofsSent(NodeId nodeid) { | void Processor::clearTimedoutRequests() { | ||||
if (::ChainstateActive().IsInitialBlockDownload()) { | auto now = std::chrono::steady_clock::now(); | ||||
// Before IBD is complete there is no way to make sure a proof is valid | std::map<CInv, uint8_t> timedout_items{}; | ||||
// or not, e.g. it can be spent in a block we don't know yet. In order | |||||
// to increase confidence that our proof set is similar to other nodes | { | ||||
// on the network, the messages received during IBD are not accounted. | // Clear expired requests. | ||||
return; | auto w = queries.getWriteView(); | ||||
auto it = w->get<query_timeout>().begin(); | |||||
while (it != w->get<query_timeout>().end() && it->timeout < now) { | |||||
for (const auto &i : it->invs) { | |||||
timedout_items[i]++; | |||||
} | } | ||||
LOCK(cs_peerManager); | w->get<query_timeout>().erase(it++); | ||||
if (peerManager->latchAvaproofsSent(nodeid)) { | |||||
avaproofsNodeCounter++; | |||||
} | } | ||||
} | } | ||||
/* | if (timedout_items.empty()) { | ||||
* Returns a bool indicating whether we have a usable Avalanche quorum enabling | return; | ||||
* us to take decisions based on polls. | |||||
*/ | |||||
bool Processor::isQuorumEstablished() { | |||||
if (quorumIsEstablished) { | |||||
return true; | |||||
} | } | ||||
// Don't do Avalanche while node is IBD'ing | auto clearInflightRequest = [&](auto &voteRecords, const auto &voteItem, | ||||
if (::ChainstateActive().IsInitialBlockDownload()) { | uint8_t count) { | ||||
if (!voteItem) { | |||||
return false; | return false; | ||||
} | } | ||||
if (avaproofsNodeCounter < minAvaproofsNodeCount) { | auto voteRecordsWriteView = voteRecords.getWriteView(); | ||||
auto it = voteRecordsWriteView->find(voteItem); | |||||
if (it == voteRecordsWriteView.end()) { | |||||
return false; | return false; | ||||
} | } | ||||
auto localProof = getLocalProof(); | it->second.clearInflightRequest(count); | ||||
// Get the registered proof score and registered score we have nodes for | return true; | ||||
uint32_t totalPeersScore; | }; | ||||
uint32_t connectedPeersScore; | |||||
{ | |||||
LOCK(cs_peerManager); | |||||
totalPeersScore = peerManager->getTotalPeersScore(); | |||||
connectedPeersScore = peerManager->getConnectedPeersScore(); | |||||
// Consider that we are always connected to our proof, even if we are | // In flight request accounting. | ||||
// the single node using that proof. | for (const auto &p : timedout_items) { | ||||
if (localProof && | const CInv &inv = p.first; | ||||
peerManager->forPeer(localProof->getId(), [](const Peer &peer) { | if (inv.IsMsgBlk()) { | ||||
return peer.node_count == 0; | const CBlockIndex *pindex = WITH_LOCK( | ||||
})) { | cs_main, return g_chainman.m_blockman.LookupBlockIndex( | ||||
connectedPeersScore += localProof->getScore(); | BlockHash(inv.hash))); | ||||
if (!clearInflightRequest(blockVoteRecords, pindex, p.second)) { | |||||
continue; | |||||
} | } | ||||
} | } | ||||
// Ensure enough is being staked overall | if (inv.IsMsgProof()) { | ||||
if (totalPeersScore < minQuorumScore) { | const ProofRef proof = | ||||
return false; | WITH_LOCK(cs_peerManager, | ||||
} | return peerManager->getProof(ProofId(inv.hash))); | ||||
// Ensure we have connected score for enough of the overall score | if (!clearInflightRequest(proofVoteRecords, proof, p.second)) { | ||||
uint32_t minConnectedScore = | continue; | ||||
std::round(double(totalPeersScore) * minQuorumConnectedScoreRatio); | } | ||||
if (connectedPeersScore < minConnectedScore) { | } | ||||
return false; | } | ||||
} | } | ||||
quorumIsEstablished = true; | std::vector<CInv> Processor::getInvsForNextPoll(bool forPoll) { | ||||
std::vector<CInv> invs; | |||||
auto extractVoteRecordsToInvs = [&](const auto &itemVoteRecordRange, | |||||
auto buildInvFromVoteItem) { | |||||
for (const auto &[item, voteRecord] : itemVoteRecordRange) { | |||||
if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { | |||||
// Make sure we do not produce more invs than specified by the | |||||
// protocol. | |||||
return true; | return true; | ||||
} | } | ||||
void Processor::FinalizeNode(const Config &config, const CNode &node, | const bool shouldPoll = | ||||
bool &update_connection_time) { | forPoll ? voteRecord.registerPoll() : voteRecord.shouldPoll(); | ||||
WITH_LOCK(cs_peerManager, peerManager->removeNode(node.GetId())); | |||||
if (!shouldPoll) { | |||||
continue; | |||||
} | |||||
invs.emplace_back(buildInvFromVoteItem(item)); | |||||
} | |||||
return invs.size() >= AVALANCHE_MAX_ELEMENT_POLL; | |||||
}; | |||||
if (extractVoteRecordsToInvs(proofVoteRecords.getReadView(), | |||||
[](const ProofRef &proof) { | |||||
return CInv(MSG_AVA_PROOF, proof->getId()); | |||||
})) { | |||||
// The inventory vector is full, we're done | |||||
return invs; | |||||
} | |||||
// First remove all blocks that are not worth polling. | |||||
{ | |||||
LOCK(cs_main); | |||||
auto w = blockVoteRecords.getWriteView(); | |||||
for (auto it = w->begin(); it != w->end();) { | |||||
const CBlockIndex *pindex = it->first; | |||||
if (!IsWorthPolling(pindex)) { | |||||
w->erase(it++); | |||||
} else { | |||||
++it; | |||||
} | |||||
} | |||||
} | |||||
auto r = blockVoteRecords.getReadView(); | |||||
extractVoteRecordsToInvs(reverse_iterate(r), [](const CBlockIndex *pindex) { | |||||
return CInv(MSG_BLOCK, pindex->GetBlockHash()); | |||||
}); | |||||
return invs; | |||||
} | |||||
NodeId Processor::getSuitableNodeToQuery() { | |||||
LOCK(cs_peerManager); | |||||
return peerManager->selectNode(); | |||||
} | |||||
uint256 Processor::buildLocalSighash(CNode *pfrom) const { | |||||
CHashWriter hasher(SER_GETHASH, 0); | |||||
hasher << peerData->delegation.getId(); | |||||
hasher << pfrom->GetLocalNonce(); | |||||
hasher << pfrom->nRemoteHostNonce; | |||||
hasher << pfrom->GetLocalExtraEntropy(); | |||||
hasher << pfrom->nRemoteExtraEntropy; | |||||
return hasher.GetHash(); | |||||
} | } | ||||
} // namespace avalanche | } // namespace avalanche |