Changeset View
Changeset View
Standalone View
Standalone View
src/avalanche/processor.cpp
// Copyright (c) 2018-2019 The Bitcoin developers | // Copyright (c) 2018-2019 The Bitcoin developers | ||||
// Distributed under the MIT software license, see the accompanying | // Distributed under the MIT software license, see the accompanying | ||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php. | // file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
#include <avalanche/processor.h> | #include <avalanche/processor.h> | ||||
#include <avalanche/peermanager.h> | |||||
#include <chain.h> | #include <chain.h> | ||||
#include <netmessagemaker.h> | #include <netmessagemaker.h> | ||||
#include <reverse_iterator.h> | #include <reverse_iterator.h> | ||||
#include <scheduler.h> | #include <scheduler.h> | ||||
#include <util/bitmanip.h> | #include <util/bitmanip.h> | ||||
#include <validation.h> | #include <validation.h> | ||||
#include <chrono> | #include <chrono> | ||||
▲ Show 20 Lines • Show All 111 Lines • ▼ Show 20 Lines | if (IsBlockFinalized(pindex)) { | ||||
return false; | return false; | ||||
} | } | ||||
return true; | return true; | ||||
} | } | ||||
AvalancheProcessor::AvalancheProcessor(CConnman *connmanIn) | AvalancheProcessor::AvalancheProcessor(CConnman *connmanIn) | ||||
: connman(connmanIn), queryTimeoutDuration(AVALANCHE_DEFAULT_QUERY_TIMEOUT), | : connman(connmanIn), queryTimeoutDuration(AVALANCHE_DEFAULT_QUERY_TIMEOUT), | ||||
round(0) { | round(0), peerManager(std::make_unique<PeerManager>()) { | ||||
// Pick a random key for the session. | // Pick a random key for the session. | ||||
sessionKey.MakeNewKey(true); | sessionKey.MakeNewKey(true); | ||||
} | } | ||||
AvalancheProcessor::~AvalancheProcessor() { | AvalancheProcessor::~AvalancheProcessor() { | ||||
stopEventLoop(); | stopEventLoop(); | ||||
} | } | ||||
▲ Show 20 Lines • Show All 82 Lines • ▼ Show 20 Lines | connman->PushMessage( | ||||
TCPAvalancheResponse(std::move(response), sessionKey))); | TCPAvalancheResponse(std::move(response), sessionKey))); | ||||
} | } | ||||
bool AvalancheProcessor::registerVotes( | bool AvalancheProcessor::registerVotes( | ||||
NodeId nodeid, const AvalancheResponse &response, | NodeId nodeid, const AvalancheResponse &response, | ||||
std::vector<AvalancheBlockUpdate> &updates) { | std::vector<AvalancheBlockUpdate> &updates) { | ||||
{ | { | ||||
// Save the time at which we can query again. | // Save the time at which we can query again. | ||||
auto w = peerSet.getWriteView(); | LOCK(cs_peerManager); | ||||
auto it = w->find(nodeid); | |||||
if (it != w->end()) { | // FIXME: This will override the time even when we received an old stale | ||||
w->modify(it, [&response](AvalancheNode &n) { | // message. This should check that the message is indeed the most up to | ||||
// FIXME: This will override the time even when we received an | // date one before updating the time. | ||||
// old stale message. This should check that the message is | peerManager->updateNextRequestTime( | ||||
// indeed the most up to date one before updating the time. | nodeid, std::chrono::steady_clock::now() + | ||||
n.nextRequestTime = | std::chrono::milliseconds(response.getCooldown())); | ||||
std::chrono::steady_clock::now() + | |||||
std::chrono::milliseconds(response.getCooldown()); | |||||
}); | |||||
} | |||||
} | } | ||||
std::vector<CInv> invs; | std::vector<CInv> invs; | ||||
{ | { | ||||
// Check that the query exists. | // Check that the query exists. | ||||
auto w = queries.getWriteView(); | auto w = queries.getWriteView(); | ||||
auto it = w->find(std::make_tuple(nodeid, response.getRound())); | auto it = w->find(std::make_tuple(nodeid, response.getRound())); | ||||
▲ Show 20 Lines • Show All 82 Lines • ▼ Show 20 Lines | std::map<CBlockIndex *, AvalancheVote> responseIndex; | ||||
w->erase(it); | w->erase(it); | ||||
} | } | ||||
} | } | ||||
return true; | return true; | ||||
} | } | ||||
bool AvalancheProcessor::addPeer(NodeId nodeid, int64_t score, CPubKey pubkey) { | bool AvalancheProcessor::addPeer(NodeId nodeid, int64_t score, CPubKey pubkey) { | ||||
return peerSet.getWriteView() | LOCK(cs_peerManager); | ||||
->insert({nodeid, /* peerid is unused here */ 0, std::move(pubkey)}) | |||||
.second; | PeerId p = peerManager->addPeer(score); | ||||
bool inserted = peerManager->addNodeToPeer(p, nodeid, std::move(pubkey)); | |||||
if (!inserted) { | |||||
peerManager->removePeer(p); | |||||
} | |||||
return inserted; | |||||
} | } | ||||
bool AvalancheProcessor::forNode( | bool AvalancheProcessor::forNode( | ||||
NodeId nodeid, std::function<bool(const AvalancheNode &n)> func) const { | NodeId nodeid, std::function<bool(const AvalancheNode &n)> func) const { | ||||
auto r = peerSet.getReadView(); | LOCK(cs_peerManager); | ||||
auto it = r->find(nodeid); | return peerManager->forNode(nodeid, std::move(func)); | ||||
return it != r->end() && func(*it); | |||||
} | } | ||||
bool AvalancheProcessor::startEventLoop(CScheduler &scheduler) { | bool AvalancheProcessor::startEventLoop(CScheduler &scheduler) { | ||||
return eventLoop.startEventLoop( | return eventLoop.startEventLoop( | ||||
scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); | scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); | ||||
} | } | ||||
bool AvalancheProcessor::stopEventLoop() { | bool AvalancheProcessor::stopEventLoop() { | ||||
Show All 35 Lines | for (const std::pair<const CBlockIndex *const, VoteRecord> &p : | ||||
return invs; | return invs; | ||||
} | } | ||||
} | } | ||||
return invs; | return invs; | ||||
} | } | ||||
NodeId AvalancheProcessor::getSuitableNodeToQuery() { | NodeId AvalancheProcessor::getSuitableNodeToQuery() { | ||||
auto r = peerSet.getReadView(); | LOCK(cs_peerManager); | ||||
auto it = r->get<next_request_time>().begin(); | return peerManager->getSuitableNodeToQuery(); | ||||
if (it == r->get<next_request_time>().end()) { | |||||
return NO_NODE; | |||||
} | |||||
if (it->nextRequestTime <= std::chrono::steady_clock::now()) { | |||||
return it->nodeid; | |||||
} | |||||
return NO_NODE; | |||||
} | } | ||||
void AvalancheProcessor::clearTimedoutRequests() { | void AvalancheProcessor::clearTimedoutRequests() { | ||||
auto now = std::chrono::steady_clock::now(); | auto now = std::chrono::steady_clock::now(); | ||||
std::map<CInv, uint8_t> timedout_items{}; | std::map<CInv, uint8_t> timedout_items{}; | ||||
{ | { | ||||
// Clear expired requests. | // Clear expired requests. | ||||
▲ Show 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | do { | ||||
{ | { | ||||
// Compute the time at which this requests times out. | // Compute the time at which this requests times out. | ||||
auto timeout = | auto timeout = | ||||
std::chrono::steady_clock::now() + queryTimeoutDuration; | std::chrono::steady_clock::now() + queryTimeoutDuration; | ||||
// Register the query. | // Register the query. | ||||
queries.getWriteView()->insert( | queries.getWriteView()->insert( | ||||
{pnode->GetId(), current_round, timeout, invs}); | {pnode->GetId(), current_round, timeout, invs}); | ||||
// Set the timeout. | // Set the timeout. | ||||
auto w = peerSet.getWriteView(); | LOCK(cs_peerManager); | ||||
auto it = w->find(pnode->GetId()); | peerManager->updateNextRequestTime(pnode->GetId(), timeout); | ||||
if (it != w->end()) { | |||||
w->modify(it, [&timeout](AvalancheNode &n) { | |||||
n.nextRequestTime = timeout; | |||||
}); | |||||
} | |||||
} | } | ||||
// Send the query to the node. | // Send the query to the node. | ||||
connman->PushMessage( | connman->PushMessage( | ||||
pnode, | pnode, | ||||
CNetMsgMaker(pnode->GetSendVersion()) | CNetMsgMaker(pnode->GetSendVersion()) | ||||
.Make(NetMsgType::AVAPOLL, | .Make(NetMsgType::AVAPOLL, | ||||
AvalanchePoll(current_round, std::move(invs)))); | AvalanchePoll(current_round, std::move(invs)))); | ||||
return true; | return true; | ||||
}); | }); | ||||
// Success! | // Success! | ||||
if (hasSent) { | if (hasSent) { | ||||
return; | return; | ||||
} | } | ||||
{ | |||||
// This node is obsolete, delete it. | // This node is obsolete, delete it. | ||||
peerSet.getWriteView()->erase(nodeid); | LOCK(cs_peerManager); | ||||
peerManager->removeNode(nodeid); | |||||
} | |||||
// Get next suitable node to try again | // Get next suitable node to try again | ||||
nodeid = getSuitableNodeToQuery(); | nodeid = getSuitableNodeToQuery(); | ||||
} while (nodeid != NO_NODE); | } while (nodeid != NO_NODE); | ||||
} | } |