diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp index e0f8f829b..eeaa000aa 100644 --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -1,534 +1,531 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include /** * Run the avalanche event loop every 10ms. */ -static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP_MILLISECONDS{10}; +static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP{10}; // Unfortunately, the bitcoind codebase is full of global and we are kinda // forced into it here. std::unique_ptr g_avalanche; bool VoteRecord::registerVote(NodeId nodeid, uint32_t error) { // We just got a new vote, so there is one less inflight request. clearInflightRequest(); // We want to avoid having the same node voting twice in a quorum. if (!addNodeToQuorum(nodeid)) { return false; } /** * The result of the vote is determined from the error code. If the error * code is 0, there is no error and therefore the vote is yes. If there is * an error, we check the most significant bit to decide if the vote is a no * (for instance, the block is invalid) or is the vote inconclusive (for * instance, the queried node does not have the block yet). */ votes = (votes << 1) | (error == 0); consider = (consider << 1) | (int32_t(error) >= 0); /** * We compute the number of yes and/or no votes as follow: * * votes: 1010 * consider: 1100 * * yes votes: 1000 using votes & consider * no votes: 0100 using ~votes & consider */ bool yes = countBits(votes & consider & 0xff) > 6; if (!yes) { bool no = countBits(~votes & consider & 0xff) > 6; if (!no) { // The round is inconclusive. return false; } } // If the round is in agreement with previous rounds, increase confidence. if (isAccepted() == yes) { confidence += 2; return getConfidence() == AVALANCHE_FINALIZATION_SCORE; } // The round changed our state. We reset the confidence. confidence = yes; return true; } bool VoteRecord::addNodeToQuorum(NodeId nodeid) { if (nodeid == NO_NODE) { // Helpful for testing. return true; } // MMIX Linear Congruent Generator. const uint64_t r1 = 6364136223846793005 * uint64_t(nodeid) + 1442695040888963407; // Fibonacci hashing. const uint64_t r2 = 11400714819323198485ull * (nodeid ^ seed); // Combine and extract hash. const uint16_t h = (r1 + r2) >> 48; /** * Check if the node is in the filter. */ for (size_t i = 1; i < nodeFilter.size(); i++) { if (nodeFilter[(successfulVotes + i) % nodeFilter.size()] == h) { return false; } } /** * Add the node which just voted to the filter. */ nodeFilter[successfulVotes % nodeFilter.size()] = h; successfulVotes++; return true; } bool VoteRecord::registerPoll() const { uint8_t count = inflight.load(); while (count < AVALANCHE_MAX_INFLIGHT_POLL) { if (inflight.compare_exchange_weak(count, count + 1)) { return true; } } return false; } static bool IsWorthPolling(const CBlockIndex *pindex) { AssertLockHeld(cs_main); if (pindex->nStatus.isInvalid()) { // No point polling invalid blocks. return false; } if (IsBlockFinalized(pindex)) { // There is no point polling finalized block. return false; } return true; } AvalancheProcessor::AvalancheProcessor(CConnman *connmanIn) - : connman(connmanIn), - queryTimeoutDuration( - AVALANCHE_DEFAULT_QUERY_TIMEOUT_DURATION_MILLISECONDS), + : connman(connmanIn), queryTimeoutDuration(AVALANCHE_DEFAULT_QUERY_TIMEOUT), round(0) { // Pick a random key for the session. sessionKey.MakeNewKey(true); } AvalancheProcessor::~AvalancheProcessor() { stopEventLoop(); } bool AvalancheProcessor::addBlockToReconcile(const CBlockIndex *pindex) { bool isAccepted; { LOCK(cs_main); if (!IsWorthPolling(pindex)) { // There is no point polling this block. return false; } isAccepted = ::ChainActive().Contains(pindex); } return vote_records.getWriteView() ->insert(std::make_pair(pindex, VoteRecord(isAccepted))) .second; } bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const { auto r = vote_records.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return false; } return it->second.isAccepted(); } int AvalancheProcessor::getConfidence(const CBlockIndex *pindex) const { auto r = vote_records.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return -1; } return it->second.getConfidence(); } namespace { /** * When using TCP, we need to sign all messages as the transport layer is not * secure. */ class TCPAvalancheResponse { AvalancheResponse response; std::array sig; public: TCPAvalancheResponse(AvalancheResponse responseIn, const CKey &key) : response(std::move(responseIn)) { CHashWriter hasher(SER_GETHASH, 0); hasher << response; const uint256 hash = hasher.GetHash(); // Now let's sign! std::vector vchSig; if (key.SignSchnorr(hash, vchSig)) { // Schnorr sigs are 64 bytes in size. assert(vchSig.size() == 64); std::copy(vchSig.begin(), vchSig.end(), sig.begin()); } else { sig.fill(0); } } // serialization support ADD_SERIALIZE_METHODS; template inline void SerializationOp(Stream &s, Operation ser_action) { READWRITE(response); READWRITE(sig); } }; } // namespace void AvalancheProcessor::sendResponse(CNode *pfrom, AvalancheResponse response) const { connman->PushMessage( pfrom, CNetMsgMaker(pfrom->GetSendVersion()) .Make(NetMsgType::AVARESPONSE, TCPAvalancheResponse(std::move(response), sessionKey))); } bool AvalancheProcessor::registerVotes( NodeId nodeid, const AvalancheResponse &response, std::vector &updates) { { // Save the time at which we can query again. auto w = peerSet.getWriteView(); auto it = w->find(nodeid); if (it != w->end()) { w->modify(it, [&response](Peer &p) { // FIXME: This will override the time even when we received an // old stale message. This should check that the message is // indeed the most up to date one before updating the time. p.nextRequestTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(response.getCooldown()); }); } } std::vector invs; { // Check that the query exists. auto w = queries.getWriteView(); auto it = w->find(std::make_tuple(nodeid, response.getRound())); if (it == w.end()) { // NB: The request may be old, so we don't increase banscore. return false; } invs = std::move(it->invs); w->erase(it); } // Verify that the request and the vote are consistent. const std::vector &votes = response.GetVotes(); size_t size = invs.size(); if (votes.size() != size) { // TODO: increase banscore for inconsistent response. // NB: This isn't timeout but actually node misbehaving. return false; } for (size_t i = 0; i < size; i++) { if (invs[i].hash != votes[i].GetHash()) { // TODO: increase banscore for inconsistent response. // NB: This isn't timeout but actually node misbehaving. return false; } } std::map responseIndex; { LOCK(cs_main); for (const auto &v : votes) { BlockMap::iterator mi = mapBlockIndex.find(BlockHash(v.GetHash())); if (mi == mapBlockIndex.end()) { // This should not happen, but just in case... continue; } CBlockIndex *pindex = mi->second; if (!IsWorthPolling(pindex)) { // There is no point polling this block. continue; } responseIndex.insert(std::make_pair(pindex, v)); } } { // Register votes. auto w = vote_records.getWriteView(); for (const auto &p : responseIndex) { CBlockIndex *pindex = p.first; const AvalancheVote &v = p.second; auto it = w->find(pindex); if (it == w.end()) { // We are not voting on that item anymore. continue; } auto &vr = it->second; if (!vr.registerVote(nodeid, v.GetError())) { // This vote did not provide any extra information, move on. continue; } if (!vr.hasFinalized()) { // This item has note been finalized, so we have nothing more to // do. updates.emplace_back( pindex, vr.isAccepted() ? AvalancheBlockUpdate::Status::Accepted : AvalancheBlockUpdate::Status::Rejected); continue; } // We just finalized a vote. If it is valid, then let the caller // know. Either way, remove the item from the map. updates.emplace_back(pindex, vr.isAccepted() ? AvalancheBlockUpdate::Status::Finalized : AvalancheBlockUpdate::Status::Invalid); w->erase(it); } } return true; } bool AvalancheProcessor::addPeer(NodeId nodeid, int64_t score, CPubKey pubkey) { return peerSet.getWriteView() ->insert({nodeid, score, std::chrono::steady_clock::now(), std::move(pubkey)}) .second; } CPubKey AvalancheProcessor::getPubKey(NodeId nodeid) const { auto r = peerSet.getReadView(); auto it = r->find(nodeid); if (it == r->end()) { return CPubKey(); } return it->pubkey; } bool AvalancheProcessor::startEventLoop(CScheduler &scheduler) { return eventLoop.startEventLoop( - scheduler, [this]() { this->runEventLoop(); }, - AVALANCHE_TIME_STEP_MILLISECONDS); + scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); } bool AvalancheProcessor::stopEventLoop() { return eventLoop.stopEventLoop(); } std::vector AvalancheProcessor::getInvsForNextPoll(bool forPoll) { std::vector invs; // First remove all blocks that are not worth polling. { LOCK(cs_main); auto w = vote_records.getWriteView(); for (auto it = w->begin(); it != w->end();) { const CBlockIndex *pindex = it->first; if (!IsWorthPolling(pindex)) { w->erase(it++); } else { ++it; } } } auto r = vote_records.getReadView(); for (const std::pair &p : reverse_iterate(r)) { // Check if we can run poll. const bool shouldPoll = forPoll ? p.second.registerPoll() : p.second.shouldPoll(); if (!shouldPoll) { continue; } // We don't have a decision, we need more votes. invs.emplace_back(MSG_BLOCK, p.first->GetBlockHash()); if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { // Make sure we do not produce more invs than specified by the // protocol. return invs; } } return invs; } NodeId AvalancheProcessor::getSuitableNodeToQuery() { auto r = peerSet.getReadView(); auto it = r->get().begin(); if (it == r->get().end()) { return NO_NODE; } if (it->nextRequestTime <= std::chrono::steady_clock::now()) { return it->nodeid; } return NO_NODE; } void AvalancheProcessor::clearTimedoutRequests() { auto now = std::chrono::steady_clock::now(); std::map timedout_items{}; { // Clear expired requests. auto w = queries.getWriteView(); auto it = w->get().begin(); while (it != w->get().end() && it->timeout < now) { for (const auto &i : it->invs) { timedout_items[i]++; } w->get().erase(it++); } } if (timedout_items.empty()) { return; } // In flight request accounting. for (const auto &p : timedout_items) { const CInv &inv = p.first; assert(inv.type == MSG_BLOCK); CBlockIndex *pindex; { LOCK(cs_main); BlockMap::iterator mi = mapBlockIndex.find(BlockHash(inv.hash)); if (mi == mapBlockIndex.end()) { continue; } pindex = mi->second; } auto w = vote_records.getWriteView(); auto it = w->find(pindex); if (it == w.end()) { continue; } it->second.clearInflightRequest(p.second); } } void AvalancheProcessor::runEventLoop() { // First things first, check if we have requests that timed out and clear // them. clearTimedoutRequests(); // Make sure there is at least one suitable node to query before gathering // invs. NodeId nodeid = getSuitableNodeToQuery(); if (nodeid == NO_NODE) { return; } std::vector invs = getInvsForNextPoll(); if (invs.empty()) { return; } do { /** * If we lost contact to that node, then we remove it from nodeids, but * never add the request to queries, which ensures bad nodes get cleaned * up over time. */ bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { uint64_t current_round = round++; { // Compute the time at which this requests times out. auto timeout = std::chrono::steady_clock::now() + queryTimeoutDuration; // Register the query. queries.getWriteView()->insert( {pnode->GetId(), current_round, timeout, invs}); // Set the timeout. auto w = peerSet.getWriteView(); auto it = w->find(pnode->GetId()); if (it != w->end()) { w->modify(it, [&timeout](Peer &p) { p.nextRequestTime = timeout; }); } } // Send the query to the node. connman->PushMessage( pnode, CNetMsgMaker(pnode->GetSendVersion()) .Make(NetMsgType::AVAPOLL, AvalanchePoll(current_round, std::move(invs)))); return true; }); // Success! if (hasSent) { return; } // This node is obsolete, delete it. peerSet.getWriteView()->erase(nodeid); // Get next suitable node to try again nodeid = getSuitableNodeToQuery(); } while (nodeid != NO_NODE); } diff --git a/src/avalanche/processor.h b/src/avalanche/processor.h index d6d0a98d7..0e4828a4c 100644 --- a/src/avalanche/processor.h +++ b/src/avalanche/processor.h @@ -1,301 +1,301 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_PROCESSOR_H #define BITCOIN_AVALANCHE_PROCESSOR_H #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include class Config; class CBlockIndex; class CScheduler; /** * Is avalanche enabled by default. */ static constexpr bool AVALANCHE_DEFAULT_ENABLED = false; /** * Finalization score. */ static constexpr int AVALANCHE_FINALIZATION_SCORE = 128; /** * Maximum item that can be polled at once. */ static constexpr size_t AVALANCHE_MAX_ELEMENT_POLL = 16; /** * Avalanche default cooldown in milliseconds. */ static constexpr size_t AVALANCHE_DEFAULT_COOLDOWN = 100; /** * How long before we consider that a query timed out. */ -static constexpr int AVALANCHE_DEFAULT_QUERY_TIMEOUT_DURATION_MILLISECONDS = - 10000; +static constexpr std::chrono::milliseconds AVALANCHE_DEFAULT_QUERY_TIMEOUT{ + 10000}; /** * How many inflight requests can exist for one item. */ static constexpr int AVALANCHE_MAX_INFLIGHT_POLL = 10; /** * Special NodeId that represent no node. */ static constexpr NodeId NO_NODE = -1; /** * Vote history. */ struct VoteRecord { private: // confidence's LSB bit is the result. Higher bits are actual confidence // score. uint16_t confidence = 0; // Historical record of votes. uint8_t votes = 0; // Each bit indicate if the vote is to be considered. uint8_t consider = 0; // How many in flight requests exists for this element. mutable std::atomic inflight{0}; // Seed for pseudorandom operations. const uint32_t seed = 0; // Track how many successful votes occured. uint32_t successfulVotes = 0; // Track the nodes which are part of the quorum. std::array nodeFilter{{0, 0, 0, 0, 0, 0, 0, 0}}; public: explicit VoteRecord(bool accepted) : confidence(accepted) {} /** * Copy semantic */ VoteRecord(const VoteRecord &other) : confidence(other.confidence), votes(other.votes), consider(other.consider), inflight(other.inflight.load()), successfulVotes(other.successfulVotes), nodeFilter(other.nodeFilter) { } /** * Vote accounting facilities. */ bool isAccepted() const { return confidence & 0x01; } uint16_t getConfidence() const { return confidence >> 1; } bool hasFinalized() const { return getConfidence() >= AVALANCHE_FINALIZATION_SCORE; } /** * Register a new vote for an item and update confidence accordingly. * Returns true if the acceptance or finalization state changed. */ bool registerVote(NodeId nodeid, uint32_t error); /** * Register that a request is being made regarding that item. * The method is made const so that it can be accessed via a read only view * of vote_records. It's not a problem as it is made thread safe. */ bool registerPoll() const; /** * Return if this item is in condition to be polled at the moment. */ bool shouldPoll() const { return inflight < AVALANCHE_MAX_INFLIGHT_POLL; } /** * Clear `count` inflight requests. */ void clearInflightRequest(uint8_t count = 1) { inflight -= count; } private: /** * Add the node to the quorum. * Returns true if the node was added, false if the node already was in the * quorum. */ bool addNodeToQuorum(NodeId nodeid); }; class AvalancheBlockUpdate { union { CBlockIndex *pindex; uintptr_t raw; }; static const size_t STATUS_BITS = 2; static const uintptr_t MASK = (1 << STATUS_BITS) - 1; static_assert( alignof(CBlockIndex) >= (1 << STATUS_BITS), "CBlockIndex alignement doesn't allow for Status to be stored."); public: enum Status : uint8_t { Invalid, Rejected, Accepted, Finalized, }; AvalancheBlockUpdate(CBlockIndex *pindexIn, Status statusIn) : pindex(pindexIn) { raw |= statusIn; } Status getStatus() const { return Status(raw & MASK); } CBlockIndex *getBlockIndex() { return reinterpret_cast(raw & ~MASK); } const CBlockIndex *getBlockIndex() const { return const_cast(this)->getBlockIndex(); } }; using BlockVoteMap = std::map; struct next_request_time {}; struct query_timeout {}; class AvalancheProcessor { private: CConnman *connman; std::chrono::milliseconds queryTimeoutDuration; /** * Blocks to run avalanche on. */ RWCollection vote_records; /** * Keep track of peers and queries sent. */ std::atomic round; using TimePoint = std::chrono::time_point; struct Peer { NodeId nodeid; int64_t score; TimePoint nextRequestTime; CPubKey pubkey; }; using PeerSet = boost::multi_index_container< Peer, boost::multi_index::indexed_by< // index by nodeid boost::multi_index::hashed_unique< boost::multi_index::member>, // sorted by nextRequestTime boost::multi_index::ordered_non_unique< boost::multi_index::tag, boost::multi_index::member>>>; RWCollection peerSet; struct Query { NodeId nodeid; uint64_t round; TimePoint timeout; /** * We declare this as mutable so it can be modified in the multi_index. * This is ok because we do not use this field to index in anyway. * * /!\ Do not use any mutable field as index. */ mutable std::vector invs; }; using QuerySet = boost::multi_index_container< Query, boost::multi_index::indexed_by< // index by nodeid/round boost::multi_index::ordered_unique< boost::multi_index::composite_key< Query, boost::multi_index::member, boost::multi_index::member>>, // sorted by timeout boost::multi_index::ordered_non_unique< boost::multi_index::tag, boost::multi_index::member>>>; RWCollection queries; /** The key used to sign responses. */ CKey sessionKey; /** Event loop machinery. */ EventLoop eventLoop; public: explicit AvalancheProcessor(CConnman *connmanIn); ~AvalancheProcessor(); void setQueryTimeoutDuration(std::chrono::milliseconds d) { queryTimeoutDuration = d; } bool addBlockToReconcile(const CBlockIndex *pindex); bool isAccepted(const CBlockIndex *pindex) const; int getConfidence(const CBlockIndex *pindex) const; // TDOD: Refactor the API to remove the dependency on avalanche/protocol.h void sendResponse(CNode *pfrom, AvalancheResponse response) const; bool registerVotes(NodeId nodeid, const AvalancheResponse &response, std::vector &updates); bool addPeer(NodeId nodeid, int64_t score, CPubKey pubkey); CPubKey getPubKey(NodeId nodeid) const; CPubKey getSessionPubKey() const { return sessionKey.GetPubKey(); } bool startEventLoop(CScheduler &scheduler); bool stopEventLoop(); private: void runEventLoop(); void clearTimedoutRequests(); std::vector getInvsForNextPoll(bool forPoll = true); NodeId getSuitableNodeToQuery(); friend struct AvalancheTest; }; /** * Global avalanche instance. */ extern std::unique_ptr g_avalanche; #endif // BITCOIN_AVALANCHE_PROCESSOR_H