Page MenuHomePhabricator

No OneTemporary

diff --git a/src/avalanche.cpp b/src/avalanche.cpp
index 7ac03a046d..8b74b01b26 100644
--- a/src/avalanche.cpp
+++ b/src/avalanche.cpp
@@ -1,336 +1,340 @@
// Copyright (c) 2018 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "avalanche.h"
#include "chain.h"
#include "netmessagemaker.h"
#include "scheduler.h"
#include "validation.h"
#include <boost/range/adaptor/reversed.hpp>
+#include <tuple>
+
static bool IsWorthPolling(const CBlockIndex *pindex) {
AssertLockHeld(cs_main);
if (pindex->nStatus.isInvalid()) {
// No point polling invalid blocks.
return false;
}
if (IsBlockFinalized(pindex)) {
// There is no point polling finalized block.
return false;
}
return true;
}
bool AvalancheProcessor::addBlockToReconcile(const CBlockIndex *pindex) {
bool isAccepted;
{
LOCK(cs_main);
if (!IsWorthPolling(pindex)) {
// There is no point polling this block.
return false;
}
isAccepted = chainActive.Contains(pindex);
}
return vote_records.getWriteView()
->insert(std::make_pair(pindex, VoteRecord(isAccepted)))
.second;
}
static const VoteRecord *
GetRecord(const RWCollection<BlockVoteMap> &vote_records,
const CBlockIndex *pindex) {
auto r = vote_records.getReadView();
auto it = r->find(pindex);
if (it == r.end()) {
return nullptr;
}
return &it->second;
}
bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const {
if (auto vr = GetRecord(vote_records, pindex)) {
return vr->isAccepted();
}
return false;
}
bool AvalancheProcessor::registerVotes(
NodeId nodeid, const AvalancheResponse &response,
std::vector<AvalancheBlockUpdate> &updates) {
// Save the time at which we can query again.
auto cooldown_end = std::chrono::steady_clock::now() +
std::chrono::milliseconds(response.getCooldown());
- RequestRecord r;
+ std::vector<CInv> invs;
{
// Check that the query exists.
auto w = queries.getWriteView();
- auto it = w->find(nodeid);
+ auto it = w->find(std::make_tuple(nodeid, response.getRound()));
if (it == w.end()) {
// NB: The request may be old, so we don't increase banscore.
return false;
}
- r = std::move(it->second);
+ invs = std::move(it->invs);
w->erase(it);
}
// Verify that the request and the vote are consistent.
- const std::vector<CInv> &invs = r.GetInvs();
const std::vector<AvalancheVote> &votes = response.GetVotes();
size_t size = invs.size();
if (votes.size() != size) {
// TODO: increase banscore for inconsistent response.
// NB: This isn't timeout but actually node misbehaving.
return false;
}
for (size_t i = 0; i < size; i++) {
if (invs[i].hash != votes[i].GetHash()) {
// TODO: increase banscore for inconsistent response.
// NB: This isn't timeout but actually node misbehaving.
return false;
}
}
std::map<CBlockIndex *, AvalancheVote> responseIndex;
{
LOCK(cs_main);
for (auto &v : votes) {
BlockMap::iterator mi = mapBlockIndex.find(v.GetHash());
if (mi == mapBlockIndex.end()) {
// This should not happen, but just in case...
continue;
}
CBlockIndex *pindex = mi->second;
if (!IsWorthPolling(pindex)) {
// There is no point polling this block.
continue;
}
responseIndex.insert(std::make_pair(pindex, v));
}
}
{
// Register votes.
auto w = vote_records.getWriteView();
for (auto &p : responseIndex) {
CBlockIndex *pindex = p.first;
const AvalancheVote &v = p.second;
auto it = w->find(pindex);
if (it == w.end()) {
// We are not voting on that item anymore.
continue;
}
auto &vr = it->second;
if (!vr.registerVote(v.IsValid())) {
// This vote did not provide any extra information, move on.
continue;
}
if (!vr.hasFinalized()) {
// This item has note been finalized, so we have nothing more to
// do.
updates.emplace_back(
pindex,
vr.isAccepted() ? AvalancheBlockUpdate::Status::Accepted
: AvalancheBlockUpdate::Status::Rejected);
continue;
}
// We just finalized a vote. If it is valid, then let the caller
// know. Either way, remove the item from the map.
updates.emplace_back(pindex,
vr.isAccepted()
? AvalancheBlockUpdate::Status::Finalized
: AvalancheBlockUpdate::Status::Invalid);
w->erase(it);
}
}
// Put the node back in the list of queriable nodes.
auto w = nodecooldown.getWriteView();
w->insert(std::make_pair(cooldown_end, nodeid));
return true;
}
namespace {
/**
* Run the avalanche event loop every 10ms.
*/
static int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10;
/**
* Maximum item that can be polled at once.
*/
static size_t AVALANCHE_MAX_ELEMENT_POLL = 4096;
}
bool AvalancheProcessor::startEventLoop(CScheduler &scheduler) {
LOCK(cs_running);
if (running) {
// Do not start the event loop twice.
return false;
}
running = true;
// Start the event loop.
scheduler.scheduleEvery(
[this]() -> bool {
runEventLoop();
if (!stopRequest) {
return true;
}
LOCK(cs_running);
running = false;
cond_running.notify_all();
// A stop request was made.
return false;
},
AVALANCHE_TIME_STEP_MILLISECONDS);
return true;
}
bool AvalancheProcessor::stopEventLoop() {
WAIT_LOCK(cs_running, lock);
if (!running) {
return false;
}
// Request avalanche to stop.
stopRequest = true;
// Wait for avalanche to stop.
cond_running.wait(lock, [this] { return !running; });
stopRequest = false;
return true;
}
std::vector<CInv> AvalancheProcessor::getInvsForNextPoll() const {
std::vector<CInv> invs;
auto r = vote_records.getReadView();
for (const std::pair<const CBlockIndex *, VoteRecord> &p :
boost::adaptors::reverse(r)) {
const CBlockIndex *pindex = p.first;
if (!IsWorthPolling(pindex)) {
// Obviously do not poll if the block is not worth polling.
continue;
}
// We don't have a decision, we need more votes.
invs.emplace_back(MSG_BLOCK, p.first->GetBlockHash());
if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) {
// Make sure we do not produce more invs than specified by the
// protocol.
return invs;
}
}
return invs;
}
NodeId AvalancheProcessor::getSuitableNodeToQuery() {
auto w = nodeids.getWriteView();
bool isCooldownMapEmpty;
{
// Recover nodes for which cooldown is over.
auto now = std::chrono::steady_clock::now();
auto wcooldown = nodecooldown.getWriteView();
for (auto it = wcooldown.begin();
it != wcooldown.end() && it->first < now;) {
w->insert(it->second);
wcooldown->erase(it++);
}
isCooldownMapEmpty = wcooldown->empty();
}
// If the cooldown map is empty and we don't have any nodes, it's time to
// fish for new ones.
// FIXME: Clearly, we need a better way to fish for new nodes, but this is
// out of scope for now.
if (isCooldownMapEmpty && w->empty()) {
auto r = queries.getReadView();
// We don't have any candidate node, so let's try to find some.
connman->ForEachNode([&w, &r](CNode *pnode) {
// If this node doesn't support avalanche, we remove.
if (!(pnode->nServices & NODE_AVALANCHE)) {
return;
}
// if we have a request in flight for that node.
if (r->find(pnode->GetId()) != r.end()) {
return;
}
w->insert(pnode->GetId());
});
}
// We don't have any suitable candidate.
if (w->empty()) {
return -1;
}
auto it = w.begin();
NodeId nodeid = *it;
w->erase(it);
return nodeid;
}
void AvalancheProcessor::runEventLoop() {
std::vector<CInv> invs = getInvsForNextPoll();
if (invs.empty()) {
// If there are no invs to poll, we are done.
return;
}
NodeId nodeid = getSuitableNodeToQuery();
/**
* If we lost contact to that node, then we remove it from nodeids, but
* never add the request to queries, which ensures bad nodes get cleaned up
* over time.
*/
connman->ForNode(nodeid, [this, &invs](CNode *pnode) {
{
+ // Compute the time at which this requests times out.
+ auto timeout =
+ std::chrono::steady_clock::now() + std::chrono::seconds(10);
// Register the query.
- queries.getWriteView()->emplace(
- pnode->GetId(), RequestRecord(GetAdjustedTime(), invs));
+ queries.getWriteView()->insert(
+ {pnode->GetId(), round, timeout, invs});
}
// Send the query to the node.
connman->PushMessage(
pnode,
CNetMsgMaker(pnode->GetSendVersion())
.Make(NetMsgType::AVAPOLL,
AvalanchePoll(round++, std::move(invs))));
return true;
});
}
diff --git a/src/avalanche.h b/src/avalanche.h
index cfed897acc..6b1b8cedf9 100644
--- a/src/avalanche.h
+++ b/src/avalanche.h
@@ -1,261 +1,292 @@
// Copyright (c) 2018 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_AVALANCHE_H
#define BITCOIN_AVALANCHE_H
#include "blockindexworkcomparator.h"
#include "net.h"
#include "protocol.h" // for CInv
#include "rwcollection.h"
#include "serialize.h"
#include "uint256.h"
+#include <boost/multi_index/composite_key.hpp>
+#include <boost/multi_index/hashed_index.hpp>
+#include <boost/multi_index/member.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index_container.hpp>
+
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <vector>
class Config;
class CBlockIndex;
class CScheduler;
namespace {
/**
* Finalization score.
*/
static int AVALANCHE_FINALIZATION_SCORE = 128;
}
/**
* Vote history.
*/
struct VoteRecord {
private:
// Historical record of votes.
uint16_t votes;
// confidence's LSB bit is the result. Higher bits are actual confidence
// score.
uint16_t confidence;
/**
* Return the number of bits set in an integer value.
* TODO: There are compiler intrinsics to do that, but we'd need to get them
* detected so this will do for now.
*/
static uint32_t countBits(uint32_t value) {
uint32_t count = 0;
while (value) {
// If the value is non zero, then at least one bit is set.
count++;
// Clear the rightmost bit set.
value &= (value - 1);
}
return count;
}
public:
VoteRecord(bool accepted) : votes(0xaaaa), confidence(accepted) {}
bool isAccepted() const { return confidence & 0x01; }
uint16_t getConfidence() const { return confidence >> 1; }
bool hasFinalized() const {
return getConfidence() >= AVALANCHE_FINALIZATION_SCORE;
}
/**
* Register a new vote for an item and update confidence accordingly.
* Returns true if the acceptance or finalization state changed.
*/
bool registerVote(bool vote) {
votes = (votes << 1) | vote;
auto bits = countBits(votes & 0xff);
bool yes = bits > 6;
bool no = bits < 2;
if (!yes && !no) {
// The vote is inconclusive.
return false;
}
if (isAccepted() == yes) {
// If the vote is in agreement with our internal status, increase
// confidence.
confidence += 2;
return getConfidence() == AVALANCHE_FINALIZATION_SCORE;
}
// The vote did not agree with our internal state, in that case, reset
// confidence.
confidence = yes;
return true;
}
};
class AvalancheVote {
uint32_t error;
uint256 hash;
public:
AvalancheVote() : error(-1), hash() {}
AvalancheVote(uint32_t errorIn, uint256 hashIn)
: error(errorIn), hash(hashIn) {}
const uint256 &GetHash() const { return hash; }
bool IsValid() const { return error == 0; }
// serialization support
ADD_SERIALIZE_METHODS;
template <typename Stream, typename Operation>
inline void SerializationOp(Stream &s, Operation ser_action) {
READWRITE(error);
READWRITE(hash);
}
};
class AvalancheResponse {
+ uint64_t round;
uint32_t cooldown;
std::vector<AvalancheVote> votes;
public:
- AvalancheResponse(uint32_t cooldownIn, std::vector<AvalancheVote> votesIn)
- : cooldown(cooldownIn), votes(votesIn) {}
+ AvalancheResponse(uint64_t roundIn, uint32_t cooldownIn,
+ std::vector<AvalancheVote> votesIn)
+ : round(roundIn), cooldown(cooldownIn), votes(votesIn) {}
+ uint64_t getRound() const { return round; }
uint32_t getCooldown() const { return cooldown; }
const std::vector<AvalancheVote> &GetVotes() const { return votes; }
// serialization support
ADD_SERIALIZE_METHODS;
template <typename Stream, typename Operation>
inline void SerializationOp(Stream &s, Operation ser_action) {
+ READWRITE(round);
READWRITE(cooldown);
READWRITE(votes);
}
};
class AvalanchePoll {
- uint32_t round;
+ uint64_t round;
std::vector<CInv> invs;
public:
AvalanchePoll(uint32_t roundIn, std::vector<CInv> invsIn)
: round(roundIn), invs(invsIn) {}
const std::vector<CInv> &GetInvs() const { return invs; }
// serialization support
ADD_SERIALIZE_METHODS;
template <typename Stream, typename Operation>
inline void SerializationOp(Stream &s, Operation ser_action) {
READWRITE(round);
READWRITE(invs);
}
};
class AvalancheBlockUpdate {
union {
CBlockIndex *pindex;
size_t raw;
};
public:
enum Status : uint8_t {
Invalid,
Rejected,
Accepted,
Finalized,
};
AvalancheBlockUpdate(CBlockIndex *pindexIn, Status statusIn)
: pindex(pindexIn) {
raw |= statusIn;
}
Status getStatus() const { return Status(raw & 0x03); }
CBlockIndex *getBlockIndex() {
return reinterpret_cast<CBlockIndex *>(raw & -size_t(0x04));
}
const CBlockIndex *getBlockIndex() const {
return const_cast<AvalancheBlockUpdate *>(this)->getBlockIndex();
}
};
typedef std::map<const CBlockIndex *, VoteRecord, CBlockIndexWorkComparator>
BlockVoteMap;
typedef std::map<std::chrono::time_point<std::chrono::steady_clock>, NodeId>
NodeCooldownMap;
+struct query_timeout {};
+
class AvalancheProcessor {
private:
CConnman *connman;
/**
* Blocks to run avalanche on.
*/
RWCollection<BlockVoteMap> vote_records;
/**
* Keep track of peers and queries sent.
*/
- struct RequestRecord {
- private:
- int64_t timestamp;
- std::vector<CInv> invs;
-
- public:
- RequestRecord() : timestamp(0), invs() {}
- RequestRecord(int64_t timestampIn, std::vector<CInv> invIn)
- : timestamp(timestampIn), invs(std::move(invIn)) {}
-
- int64_t GetTimestamp() const { return timestamp; }
- const std::vector<CInv> &GetInvs() const { return invs; }
- };
+ typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
- std::atomic<uint32_t> round;
+ std::atomic<uint64_t> round;
RWCollection<std::set<NodeId>> nodeids;
- RWCollection<std::map<NodeId, RequestRecord>> queries;
RWCollection<NodeCooldownMap> nodecooldown;
+ struct Query {
+ NodeId nodeid;
+ uint64_t round;
+ TimePoint timeout;
+
+ /**
+ * We declare this as mutable so it can be modified in the multi_index.
+ * This is ok because we do not use this field to index in anyway.
+ *
+ * /!\ Do not use any mutable field as index.
+ */
+ mutable std::vector<CInv> invs;
+ };
+
+ typedef boost::multi_index_container<
+ Query,
+ boost::multi_index::indexed_by<
+ // index by nodeid/round
+ boost::multi_index::ordered_unique<
+ boost::multi_index::composite_key<
+ Query,
+ boost::multi_index::member<Query, NodeId, &Query::nodeid>,
+ boost::multi_index::member<Query, uint64_t,
+ &Query::round>>>,
+ // sorted by timeout
+ boost::multi_index::ordered_non_unique<
+ boost::multi_index::tag<query_timeout>,
+ boost::multi_index::member<Query, TimePoint, &Query::timeout>>>>
+ QuerySet;
+
+ RWCollection<QuerySet> queries;
+
/**
* Start stop machinery.
*/
std::atomic<bool> stopRequest;
bool running GUARDED_BY(cs_running);
CWaitableCriticalSection cs_running;
std::condition_variable cond_running;
public:
AvalancheProcessor(CConnman *connmanIn)
: connman(connmanIn), stopRequest(false), running(false) {}
~AvalancheProcessor() { stopEventLoop(); }
bool addBlockToReconcile(const CBlockIndex *pindex);
bool isAccepted(const CBlockIndex *pindex) const;
bool registerVotes(NodeId nodeid, const AvalancheResponse &response,
std::vector<AvalancheBlockUpdate> &updates);
bool startEventLoop(CScheduler &scheduler);
bool stopEventLoop();
private:
void runEventLoop();
std::vector<CInv> getInvsForNextPoll() const;
NodeId getSuitableNodeToQuery();
friend struct AvalancheTest;
};
#endif // BITCOIN_AVALANCHE_H
diff --git a/src/test/avalanche_tests.cpp b/src/test/avalanche_tests.cpp
index d61912e682..d9ed9ce4e9 100644
--- a/src/test/avalanche_tests.cpp
+++ b/src/test/avalanche_tests.cpp
@@ -1,569 +1,615 @@
// Copyright (c) 2010 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "avalanche.h"
#include "config.h"
#include "net_processing.h" // For PeerLogicValidation
#include "test/test_bitcoin.h"
#include <boost/test/unit_test.hpp>
struct AvalancheTest {
static void runEventLoop(AvalancheProcessor &p) { p.runEventLoop(); }
static std::vector<CInv> getInvsForNextPoll(const AvalancheProcessor &p) {
return p.getInvsForNextPoll();
}
static NodeId getSuitableNodeToQuery(AvalancheProcessor &p) {
return p.getSuitableNodeToQuery();
}
static uint32_t getRound(const AvalancheProcessor &p) { return p.round; }
};
BOOST_FIXTURE_TEST_SUITE(avalanche_tests, TestChain100Setup)
#define REGISTER_VOTE_AND_CHECK(vr, vote, state, finalized, confidence) \
vr.registerVote(vote); \
BOOST_CHECK_EQUAL(vr.isAccepted(), state); \
BOOST_CHECK_EQUAL(vr.hasFinalized(), finalized); \
BOOST_CHECK_EQUAL(vr.getConfidence(), confidence);
BOOST_AUTO_TEST_CASE(vote_record) {
VoteRecord vraccepted(true);
// Check initial state.
BOOST_CHECK_EQUAL(vraccepted.isAccepted(), true);
BOOST_CHECK_EQUAL(vraccepted.hasFinalized(), false);
BOOST_CHECK_EQUAL(vraccepted.getConfidence(), 0);
VoteRecord vr(false);
// Check initial state.
BOOST_CHECK_EQUAL(vr.isAccepted(), false);
BOOST_CHECK_EQUAL(vr.hasFinalized(), false);
BOOST_CHECK_EQUAL(vr.getConfidence(), 0);
// We register one vote for, which keep things at 4/4.
REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0);
// One more and we are at 5/3.
REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0);
// One more and we are at 5/3.
REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0);
// One more and we are at 6/2.
REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0);
// One more and we are at 6/2.
REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0);
// Next vote will flip state, and confidence will increase as long as we
// vote yes.
for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) {
REGISTER_VOTE_AND_CHECK(vr, true, true, false, i);
}
// The next vote will finalize the decision.
REGISTER_VOTE_AND_CHECK(vr, false, true, true,
AVALANCHE_FINALIZATION_SCORE);
// Now that we have two no votes, confidence stop increasing.
for (int i = 0; i < 5; i++) {
REGISTER_VOTE_AND_CHECK(vr, false, true, true,
AVALANCHE_FINALIZATION_SCORE);
}
// Next vote will flip state, and confidence will increase as long as we
// vote no.
for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) {
REGISTER_VOTE_AND_CHECK(vr, false, false, false, i);
}
// The next vote will finalize the decision.
REGISTER_VOTE_AND_CHECK(vr, true, false, true,
AVALANCHE_FINALIZATION_SCORE);
}
BOOST_AUTO_TEST_CASE(block_update) {
CBlockIndex index;
CBlockIndex *pindex = &index;
std::set<AvalancheBlockUpdate::Status> status{
AvalancheBlockUpdate::Status::Invalid,
AvalancheBlockUpdate::Status::Rejected,
AvalancheBlockUpdate::Status::Accepted,
AvalancheBlockUpdate::Status::Finalized,
};
for (auto s : status) {
AvalancheBlockUpdate abu(pindex, s);
BOOST_CHECK(abu.getBlockIndex() == pindex);
BOOST_CHECK_EQUAL(abu.getStatus(), s);
}
}
CService ip(uint32_t i) {
struct in_addr s;
s.s_addr = i;
return CService(CNetAddr(s), Params().GetDefaultPort());
}
std::unique_ptr<CNode> ConnectNode(const Config &config, ServiceFlags nServices,
PeerLogicValidation &peerLogic) {
static NodeId id = 0;
CAddress addr(ip(GetRandInt(0xffffffff)), NODE_NONE);
std::unique_ptr<CNode> nodeptr(new CNode(id++, ServiceFlags(NODE_NETWORK),
0, INVALID_SOCKET, addr, 0, 0,
CAddress(), "",
/*fInboundIn=*/false));
CNode &node = *nodeptr;
node.SetSendVersion(PROTOCOL_VERSION);
node.nServices = nServices;
peerLogic.InitializeNode(config, &node);
node.nVersion = 1;
node.fSuccessfullyConnected = true;
CConnmanTest::AddNode(node);
return nodeptr;
}
+static AvalancheResponse next(AvalancheResponse &r) {
+ auto copy = r;
+ r = {r.getRound() + 1, r.getCooldown(), r.GetVotes()};
+ return copy;
+}
+
BOOST_AUTO_TEST_CASE(block_register) {
AvalancheProcessor p(g_connman.get());
std::vector<AvalancheBlockUpdate> updates;
CBlock block = CreateAndProcessBlock({}, CScript());
const uint256 blockHash = block.GetHash();
const CBlockIndex *pindex = mapBlockIndex[blockHash];
const Config &config = GetConfig();
// Create a node that supports avalanche.
auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic);
NodeId nodeid = avanode->GetId();
// Querying for random block returns false.
BOOST_CHECK(!p.isAccepted(pindex));
// Add a new block. Check it is added to the polls.
BOOST_CHECK(p.addBlockToReconcile(pindex));
auto invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHash);
// Newly added blocks' state reflect the blockchain.
BOOST_CHECK(p.isAccepted(pindex));
// Let's vote for this block a few times.
- AvalancheResponse resp{0, {AvalancheVote(0, blockHash)}};
+ AvalancheResponse resp{0, 0, {AvalancheVote(0, blockHash)}};
for (int i = 0; i < 4; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// We vote for it numerous times to finalize it.
for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// As long as it is not finalized, we poll.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHash);
// Now finalize the decision.
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK_EQUAL(updates.size(), 1);
BOOST_CHECK(updates[0].getBlockIndex() == pindex);
BOOST_CHECK_EQUAL(updates[0].getStatus(),
AvalancheBlockUpdate::Status::Finalized);
updates = {};
// Once the decision is finalized, there is no poll for it.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 0);
// Now let's undo this and finalize rejection.
BOOST_CHECK(p.addBlockToReconcile(pindex));
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHash);
- resp = {0, {AvalancheVote(1, blockHash)}};
+ resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(1, blockHash)}};
for (int i = 0; i < 4; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// Now the state will flip.
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(!p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 1);
BOOST_CHECK(updates[0].getBlockIndex() == pindex);
BOOST_CHECK_EQUAL(updates[0].getStatus(),
AvalancheBlockUpdate::Status::Rejected);
updates = {};
// Now it is rejected, but we can vote for it numerous times.
for (int i = 1; i < AVALANCHE_FINALIZATION_SCORE; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(!p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// As long as it is not finalized, we poll.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHash);
// Now finalize the decision.
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(!p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 1);
BOOST_CHECK(updates[0].getBlockIndex() == pindex);
BOOST_CHECK_EQUAL(updates[0].getStatus(),
AvalancheBlockUpdate::Status::Invalid);
updates = {};
// Once the decision is finalized, there is no poll for it.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 0);
// Adding the block twice does nothing.
BOOST_CHECK(p.addBlockToReconcile(pindex));
BOOST_CHECK(!p.addBlockToReconcile(pindex));
BOOST_CHECK(p.isAccepted(pindex));
CConnmanTest::ClearNodes();
}
BOOST_AUTO_TEST_CASE(multi_block_register) {
AvalancheProcessor p(g_connman.get());
CBlockIndex indexA, indexB;
std::vector<AvalancheBlockUpdate> updates;
const Config &config = GetConfig();
// Create a node that supports avalanche.
auto node0 = ConnectNode(config, NODE_AVALANCHE, *peerLogic);
auto node1 = ConnectNode(config, NODE_AVALANCHE, *peerLogic);
// Make sure the block has a hash.
CBlock blockA = CreateAndProcessBlock({}, CScript());
const uint256 blockHashA = blockA.GetHash();
const CBlockIndex *pindexA = mapBlockIndex[blockHashA];
CBlock blockB = CreateAndProcessBlock({}, CScript());
const uint256 blockHashB = blockB.GetHash();
const CBlockIndex *pindexB = mapBlockIndex[blockHashB];
// Querying for random block returns false.
BOOST_CHECK(!p.isAccepted(pindexA));
BOOST_CHECK(!p.isAccepted(pindexB));
// Start voting on block A.
BOOST_CHECK(p.addBlockToReconcile(pindexA));
auto invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHashA);
+ uint64_t round = AvalancheTest::getRound(p);
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(node0->GetId(),
- {0, {AvalancheVote(0, blockHashA)}}, updates));
+ BOOST_CHECK(p.registerVotes(
+ node0->GetId(), {round, 0, {AvalancheVote(0, blockHashA)}}, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
// Start voting on block B after one vote.
AvalancheResponse resp{
- 0, {AvalancheVote(0, blockHashB), AvalancheVote(0, blockHashA)}};
+ round + 1,
+ 0,
+ {AvalancheVote(0, blockHashB), AvalancheVote(0, blockHashA)}};
BOOST_CHECK(p.addBlockToReconcile(pindexB));
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 2);
// Ensure B comes before A because it has accumulated more PoW.
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHashB);
BOOST_CHECK_EQUAL(invs[1].type, MSG_BLOCK);
BOOST_CHECK(invs[1].hash == blockHashA);
// Let's vote for these blocks a few times.
for (int i = 0; i < 3; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates));
+ BOOST_CHECK(p.registerVotes(node0->GetId(), next(resp), updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// Now it is accepted, but we can vote for it numerous times.
for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates));
+ BOOST_CHECK(p.registerVotes(node0->GetId(), next(resp), updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// Running two iterration of the event loop so that vote gets triggerd on A
// and B.
+ NodeId secondNodeid = AvalancheTest::getSuitableNodeToQuery(p);
+ // NB: getSuitableNodeToQuery remove the node from the candidate list, so it
+ // has returned the node that will be queried second. The other one is the
+ // first.
+ NodeId firstNodeid =
+ (node0->GetId() == secondNodeid) ? node1->GetId() : node0->GetId();
AvalancheTest::runEventLoop(p);
AvalancheTest::runEventLoop(p);
// Next vote will finalize block A.
- BOOST_CHECK(p.registerVotes(node1->GetId(), resp, updates));
+ BOOST_CHECK(p.registerVotes(firstNodeid, next(resp), updates));
BOOST_CHECK_EQUAL(updates.size(), 1);
BOOST_CHECK(updates[0].getBlockIndex() == pindexA);
BOOST_CHECK_EQUAL(updates[0].getStatus(),
AvalancheBlockUpdate::Status::Finalized);
updates = {};
// We do not vote on A anymore.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHashB);
// Next vote will finalize block B.
- BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates));
+ BOOST_CHECK(p.registerVotes(secondNodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 1);
BOOST_CHECK(updates[0].getBlockIndex() == pindexB);
BOOST_CHECK_EQUAL(updates[0].getStatus(),
AvalancheBlockUpdate::Status::Finalized);
updates = {};
// There is nothing left to vote on.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 0);
CConnmanTest::ClearNodes();
}
BOOST_AUTO_TEST_CASE(poll_and_response) {
AvalancheProcessor p(g_connman.get());
std::vector<AvalancheBlockUpdate> updates;
CBlock block = CreateAndProcessBlock({}, CScript());
const uint256 blockHash = block.GetHash();
const CBlockIndex *pindex = mapBlockIndex[blockHash];
const Config &config = GetConfig();
// There is no node to query.
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
// Create a node that supports avalanche and one that doesn't.
auto oldnode = ConnectNode(config, NODE_NONE, *peerLogic);
auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic);
NodeId avanodeid = avanode->GetId();
// It returns the avalanche peer.
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
// Register a block and check it is added to the list of elements to poll.
BOOST_CHECK(p.addBlockToReconcile(pindex));
auto invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHash);
// Trigger a poll on avanode.
+ uint64_t round = AvalancheTest::getRound(p);
AvalancheTest::runEventLoop(p);
// There is no more suitable peer available, so return nothing.
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
// Respond to the request.
- AvalancheResponse resp = {0, {AvalancheVote(0, blockHash)}};
+ AvalancheResponse resp = {round, 0, {AvalancheVote(0, blockHash)}};
BOOST_CHECK(p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
// Now that avanode fullfilled his request, it is added back to the list of
// queriable nodes.
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
// Sending a response when not polled fails.
- BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
+ BOOST_CHECK(!p.registerVotes(avanodeid, next(resp), updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
// Trigger a poll on avanode.
+ round = AvalancheTest::getRound(p);
AvalancheTest::runEventLoop(p);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
// Sending responses that do not match the request also fails.
// 1. Too many results.
- resp = {0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash)}};
+ resp = {
+ round, 0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash)}};
AvalancheTest::runEventLoop(p);
BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
// 2. Not enough results.
- resp = {0, {}};
+ resp = {AvalancheTest::getRound(p), 0, {}};
AvalancheTest::runEventLoop(p);
BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
// 3. Do not match the poll.
- resp = {0, {AvalancheVote()}};
+ resp = {AvalancheTest::getRound(p), 0, {AvalancheVote()}};
AvalancheTest::runEventLoop(p);
BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
- // Proper response gets processed and avanode is available again.
- resp = {0, {AvalancheVote(0, blockHash)}};
+ // 4. Invalid round count. Node is not returned to the pool.
+ uint64_t queryRound = AvalancheTest::getRound(p);
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(avanodeid, resp, updates));
+
+ resp = {queryRound + 1, 0, {AvalancheVote()}};
+ BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
- BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
+ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
- // Making request for invalid nodes do not work.
+ resp = {queryRound - 1, 0, {AvalancheVote()}};
+ BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
+ BOOST_CHECK_EQUAL(updates.size(), 0);
+ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
+
+ // 5. Making request for invalid nodes do not work. Node is not returned to
+ // the pool.
+ resp = {queryRound, 0, {AvalancheVote(0, blockHash)}};
BOOST_CHECK(!p.registerVotes(avanodeid + 1234, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
+ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
+
+ // Proper response gets processed and avanode is available again.
+ resp = {queryRound, 0, {AvalancheVote(0, blockHash)}};
+ BOOST_CHECK(p.registerVotes(avanodeid, resp, updates));
+ BOOST_CHECK_EQUAL(updates.size(), 0);
+ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
// Out of order response are rejected.
CBlock block2 = CreateAndProcessBlock({}, CScript());
const uint256 blockHash2 = block2.GetHash();
CBlockIndex *pindex2 = mapBlockIndex[blockHash2];
BOOST_CHECK(p.addBlockToReconcile(pindex2));
- resp = {0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash2)}};
+ resp = {AvalancheTest::getRound(p),
+ 0,
+ {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash2)}};
AvalancheTest::runEventLoop(p);
BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
+ // But they are accepted in order.
+ resp = {AvalancheTest::getRound(p),
+ 0,
+ {AvalancheVote(0, blockHash2), AvalancheVote(0, blockHash)}};
+ AvalancheTest::runEventLoop(p);
+ BOOST_CHECK(p.registerVotes(avanode->GetId(), resp, updates));
+ BOOST_CHECK_EQUAL(updates.size(), 0);
+ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
+
// When a block is marked invalid, stop polling.
pindex2->nStatus = pindex2->nStatus.withFailed();
- resp = {0, {AvalancheVote(0, blockHash)}};
+ resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash)}};
AvalancheTest::runEventLoop(p);
BOOST_CHECK(p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
CConnmanTest::ClearNodes();
}
BOOST_AUTO_TEST_CASE(event_loop) {
AvalancheProcessor p(g_connman.get());
CScheduler s;
CBlock block = CreateAndProcessBlock({}, CScript());
const uint256 blockHash = block.GetHash();
const CBlockIndex *pindex = mapBlockIndex[blockHash];
// Starting the event loop.
BOOST_CHECK(p.startEventLoop(s));
// There is one task planned in the next hour (our event loop).
boost::chrono::system_clock::time_point start, stop;
BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1);
// Starting twice doesn't start it twice.
BOOST_CHECK(!p.startEventLoop(s));
// Start the scheduler thread.
std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s));
// Create a node and a block to query.
const Config &config = GetConfig();
// Create a node that supports avalanche.
auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic);
NodeId nodeid = avanode->GetId();
// There is no query in flight at the moment.
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), nodeid);
// Add a new block. Check it is added to the polls.
+ uint64_t queryRound = AvalancheTest::getRound(p);
BOOST_CHECK(p.addBlockToReconcile(pindex));
- uint32_t round = AvalancheTest::getRound(p);
for (int i = 0; i < 1000; i++) {
// Technically, this is a race condition, but this should do just fine
// as we wait up to 1s for an event that should take 10ms.
boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
- if (AvalancheTest::getRound(p) != round) {
+ if (AvalancheTest::getRound(p) != queryRound) {
break;
}
}
// Check that we effectively got a request and not timed out.
- BOOST_CHECK(AvalancheTest::getRound(p) > round);
+ BOOST_CHECK(AvalancheTest::getRound(p) > queryRound);
// Respond and check the cooldown time is respected.
- round = AvalancheTest::getRound(p);
+ uint64_t responseRound = AvalancheTest::getRound(p);
auto queryTime =
std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
std::vector<AvalancheBlockUpdate> updates;
- p.registerVotes(nodeid, {100, {AvalancheVote(0, blockHash)}}, updates);
+ p.registerVotes(nodeid, {queryRound, 100, {AvalancheVote(0, blockHash)}},
+ updates);
for (int i = 0; i < 1000; i++) {
// We make sure that we do not get a request before queryTime.
boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
- if (AvalancheTest::getRound(p) != round) {
+ if (AvalancheTest::getRound(p) != responseRound) {
BOOST_CHECK(std::chrono::steady_clock::now() > queryTime);
break;
}
}
// But we eventually get one.
- BOOST_CHECK(AvalancheTest::getRound(p) > round);
+ BOOST_CHECK(AvalancheTest::getRound(p) > responseRound);
// Stop event loop.
BOOST_CHECK(p.stopEventLoop());
// We don't have any task scheduled anymore.
BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0);
// Can't stop the event loop twice.
BOOST_CHECK(!p.stopEventLoop());
// Wait for the scheduler to stop.
s.stop(true);
schedulerThread.join();
CConnmanTest::ClearNodes();
}
BOOST_AUTO_TEST_CASE(destructor) {
CScheduler s;
boost::chrono::system_clock::time_point start, stop;
// Start the scheduler thread.
std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s));
{
AvalancheProcessor p(g_connman.get());
BOOST_CHECK(p.startEventLoop(s));
BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1);
}
// Now that avalanche is destroyed, there is no more scheduled tasks.
BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0);
// Wait for the scheduler to stop.
s.stop(true);
schedulerThread.join();
}
BOOST_AUTO_TEST_SUITE_END()

File Metadata

Mime Type
text/x-diff
Expires
Sun, Mar 2, 11:50 (1 d, 6 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5187702
Default Alt Text
(44 KB)

Event Timeline