Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F13115712
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
44 KB
Subscribers
None
View Options
diff --git a/src/avalanche.cpp b/src/avalanche.cpp
index 7ac03a046d..8b74b01b26 100644
--- a/src/avalanche.cpp
+++ b/src/avalanche.cpp
@@ -1,336 +1,340 @@
// Copyright (c) 2018 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "avalanche.h"
#include "chain.h"
#include "netmessagemaker.h"
#include "scheduler.h"
#include "validation.h"
#include <boost/range/adaptor/reversed.hpp>
+#include <tuple>
+
static bool IsWorthPolling(const CBlockIndex *pindex) {
AssertLockHeld(cs_main);
if (pindex->nStatus.isInvalid()) {
// No point polling invalid blocks.
return false;
}
if (IsBlockFinalized(pindex)) {
// There is no point polling finalized block.
return false;
}
return true;
}
bool AvalancheProcessor::addBlockToReconcile(const CBlockIndex *pindex) {
bool isAccepted;
{
LOCK(cs_main);
if (!IsWorthPolling(pindex)) {
// There is no point polling this block.
return false;
}
isAccepted = chainActive.Contains(pindex);
}
return vote_records.getWriteView()
->insert(std::make_pair(pindex, VoteRecord(isAccepted)))
.second;
}
static const VoteRecord *
GetRecord(const RWCollection<BlockVoteMap> &vote_records,
const CBlockIndex *pindex) {
auto r = vote_records.getReadView();
auto it = r->find(pindex);
if (it == r.end()) {
return nullptr;
}
return &it->second;
}
bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const {
if (auto vr = GetRecord(vote_records, pindex)) {
return vr->isAccepted();
}
return false;
}
bool AvalancheProcessor::registerVotes(
NodeId nodeid, const AvalancheResponse &response,
std::vector<AvalancheBlockUpdate> &updates) {
// Save the time at which we can query again.
auto cooldown_end = std::chrono::steady_clock::now() +
std::chrono::milliseconds(response.getCooldown());
- RequestRecord r;
+ std::vector<CInv> invs;
{
// Check that the query exists.
auto w = queries.getWriteView();
- auto it = w->find(nodeid);
+ auto it = w->find(std::make_tuple(nodeid, response.getRound()));
if (it == w.end()) {
// NB: The request may be old, so we don't increase banscore.
return false;
}
- r = std::move(it->second);
+ invs = std::move(it->invs);
w->erase(it);
}
// Verify that the request and the vote are consistent.
- const std::vector<CInv> &invs = r.GetInvs();
const std::vector<AvalancheVote> &votes = response.GetVotes();
size_t size = invs.size();
if (votes.size() != size) {
// TODO: increase banscore for inconsistent response.
// NB: This isn't timeout but actually node misbehaving.
return false;
}
for (size_t i = 0; i < size; i++) {
if (invs[i].hash != votes[i].GetHash()) {
// TODO: increase banscore for inconsistent response.
// NB: This isn't timeout but actually node misbehaving.
return false;
}
}
std::map<CBlockIndex *, AvalancheVote> responseIndex;
{
LOCK(cs_main);
for (auto &v : votes) {
BlockMap::iterator mi = mapBlockIndex.find(v.GetHash());
if (mi == mapBlockIndex.end()) {
// This should not happen, but just in case...
continue;
}
CBlockIndex *pindex = mi->second;
if (!IsWorthPolling(pindex)) {
// There is no point polling this block.
continue;
}
responseIndex.insert(std::make_pair(pindex, v));
}
}
{
// Register votes.
auto w = vote_records.getWriteView();
for (auto &p : responseIndex) {
CBlockIndex *pindex = p.first;
const AvalancheVote &v = p.second;
auto it = w->find(pindex);
if (it == w.end()) {
// We are not voting on that item anymore.
continue;
}
auto &vr = it->second;
if (!vr.registerVote(v.IsValid())) {
// This vote did not provide any extra information, move on.
continue;
}
if (!vr.hasFinalized()) {
// This item has note been finalized, so we have nothing more to
// do.
updates.emplace_back(
pindex,
vr.isAccepted() ? AvalancheBlockUpdate::Status::Accepted
: AvalancheBlockUpdate::Status::Rejected);
continue;
}
// We just finalized a vote. If it is valid, then let the caller
// know. Either way, remove the item from the map.
updates.emplace_back(pindex,
vr.isAccepted()
? AvalancheBlockUpdate::Status::Finalized
: AvalancheBlockUpdate::Status::Invalid);
w->erase(it);
}
}
// Put the node back in the list of queriable nodes.
auto w = nodecooldown.getWriteView();
w->insert(std::make_pair(cooldown_end, nodeid));
return true;
}
namespace {
/**
* Run the avalanche event loop every 10ms.
*/
static int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10;
/**
* Maximum item that can be polled at once.
*/
static size_t AVALANCHE_MAX_ELEMENT_POLL = 4096;
}
bool AvalancheProcessor::startEventLoop(CScheduler &scheduler) {
LOCK(cs_running);
if (running) {
// Do not start the event loop twice.
return false;
}
running = true;
// Start the event loop.
scheduler.scheduleEvery(
[this]() -> bool {
runEventLoop();
if (!stopRequest) {
return true;
}
LOCK(cs_running);
running = false;
cond_running.notify_all();
// A stop request was made.
return false;
},
AVALANCHE_TIME_STEP_MILLISECONDS);
return true;
}
bool AvalancheProcessor::stopEventLoop() {
WAIT_LOCK(cs_running, lock);
if (!running) {
return false;
}
// Request avalanche to stop.
stopRequest = true;
// Wait for avalanche to stop.
cond_running.wait(lock, [this] { return !running; });
stopRequest = false;
return true;
}
std::vector<CInv> AvalancheProcessor::getInvsForNextPoll() const {
std::vector<CInv> invs;
auto r = vote_records.getReadView();
for (const std::pair<const CBlockIndex *, VoteRecord> &p :
boost::adaptors::reverse(r)) {
const CBlockIndex *pindex = p.first;
if (!IsWorthPolling(pindex)) {
// Obviously do not poll if the block is not worth polling.
continue;
}
// We don't have a decision, we need more votes.
invs.emplace_back(MSG_BLOCK, p.first->GetBlockHash());
if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) {
// Make sure we do not produce more invs than specified by the
// protocol.
return invs;
}
}
return invs;
}
NodeId AvalancheProcessor::getSuitableNodeToQuery() {
auto w = nodeids.getWriteView();
bool isCooldownMapEmpty;
{
// Recover nodes for which cooldown is over.
auto now = std::chrono::steady_clock::now();
auto wcooldown = nodecooldown.getWriteView();
for (auto it = wcooldown.begin();
it != wcooldown.end() && it->first < now;) {
w->insert(it->second);
wcooldown->erase(it++);
}
isCooldownMapEmpty = wcooldown->empty();
}
// If the cooldown map is empty and we don't have any nodes, it's time to
// fish for new ones.
// FIXME: Clearly, we need a better way to fish for new nodes, but this is
// out of scope for now.
if (isCooldownMapEmpty && w->empty()) {
auto r = queries.getReadView();
// We don't have any candidate node, so let's try to find some.
connman->ForEachNode([&w, &r](CNode *pnode) {
// If this node doesn't support avalanche, we remove.
if (!(pnode->nServices & NODE_AVALANCHE)) {
return;
}
// if we have a request in flight for that node.
if (r->find(pnode->GetId()) != r.end()) {
return;
}
w->insert(pnode->GetId());
});
}
// We don't have any suitable candidate.
if (w->empty()) {
return -1;
}
auto it = w.begin();
NodeId nodeid = *it;
w->erase(it);
return nodeid;
}
void AvalancheProcessor::runEventLoop() {
std::vector<CInv> invs = getInvsForNextPoll();
if (invs.empty()) {
// If there are no invs to poll, we are done.
return;
}
NodeId nodeid = getSuitableNodeToQuery();
/**
* If we lost contact to that node, then we remove it from nodeids, but
* never add the request to queries, which ensures bad nodes get cleaned up
* over time.
*/
connman->ForNode(nodeid, [this, &invs](CNode *pnode) {
{
+ // Compute the time at which this requests times out.
+ auto timeout =
+ std::chrono::steady_clock::now() + std::chrono::seconds(10);
// Register the query.
- queries.getWriteView()->emplace(
- pnode->GetId(), RequestRecord(GetAdjustedTime(), invs));
+ queries.getWriteView()->insert(
+ {pnode->GetId(), round, timeout, invs});
}
// Send the query to the node.
connman->PushMessage(
pnode,
CNetMsgMaker(pnode->GetSendVersion())
.Make(NetMsgType::AVAPOLL,
AvalanchePoll(round++, std::move(invs))));
return true;
});
}
diff --git a/src/avalanche.h b/src/avalanche.h
index cfed897acc..6b1b8cedf9 100644
--- a/src/avalanche.h
+++ b/src/avalanche.h
@@ -1,261 +1,292 @@
// Copyright (c) 2018 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_AVALANCHE_H
#define BITCOIN_AVALANCHE_H
#include "blockindexworkcomparator.h"
#include "net.h"
#include "protocol.h" // for CInv
#include "rwcollection.h"
#include "serialize.h"
#include "uint256.h"
+#include <boost/multi_index/composite_key.hpp>
+#include <boost/multi_index/hashed_index.hpp>
+#include <boost/multi_index/member.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index_container.hpp>
+
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <vector>
class Config;
class CBlockIndex;
class CScheduler;
namespace {
/**
* Finalization score.
*/
static int AVALANCHE_FINALIZATION_SCORE = 128;
}
/**
* Vote history.
*/
struct VoteRecord {
private:
// Historical record of votes.
uint16_t votes;
// confidence's LSB bit is the result. Higher bits are actual confidence
// score.
uint16_t confidence;
/**
* Return the number of bits set in an integer value.
* TODO: There are compiler intrinsics to do that, but we'd need to get them
* detected so this will do for now.
*/
static uint32_t countBits(uint32_t value) {
uint32_t count = 0;
while (value) {
// If the value is non zero, then at least one bit is set.
count++;
// Clear the rightmost bit set.
value &= (value - 1);
}
return count;
}
public:
VoteRecord(bool accepted) : votes(0xaaaa), confidence(accepted) {}
bool isAccepted() const { return confidence & 0x01; }
uint16_t getConfidence() const { return confidence >> 1; }
bool hasFinalized() const {
return getConfidence() >= AVALANCHE_FINALIZATION_SCORE;
}
/**
* Register a new vote for an item and update confidence accordingly.
* Returns true if the acceptance or finalization state changed.
*/
bool registerVote(bool vote) {
votes = (votes << 1) | vote;
auto bits = countBits(votes & 0xff);
bool yes = bits > 6;
bool no = bits < 2;
if (!yes && !no) {
// The vote is inconclusive.
return false;
}
if (isAccepted() == yes) {
// If the vote is in agreement with our internal status, increase
// confidence.
confidence += 2;
return getConfidence() == AVALANCHE_FINALIZATION_SCORE;
}
// The vote did not agree with our internal state, in that case, reset
// confidence.
confidence = yes;
return true;
}
};
class AvalancheVote {
uint32_t error;
uint256 hash;
public:
AvalancheVote() : error(-1), hash() {}
AvalancheVote(uint32_t errorIn, uint256 hashIn)
: error(errorIn), hash(hashIn) {}
const uint256 &GetHash() const { return hash; }
bool IsValid() const { return error == 0; }
// serialization support
ADD_SERIALIZE_METHODS;
template <typename Stream, typename Operation>
inline void SerializationOp(Stream &s, Operation ser_action) {
READWRITE(error);
READWRITE(hash);
}
};
class AvalancheResponse {
+ uint64_t round;
uint32_t cooldown;
std::vector<AvalancheVote> votes;
public:
- AvalancheResponse(uint32_t cooldownIn, std::vector<AvalancheVote> votesIn)
- : cooldown(cooldownIn), votes(votesIn) {}
+ AvalancheResponse(uint64_t roundIn, uint32_t cooldownIn,
+ std::vector<AvalancheVote> votesIn)
+ : round(roundIn), cooldown(cooldownIn), votes(votesIn) {}
+ uint64_t getRound() const { return round; }
uint32_t getCooldown() const { return cooldown; }
const std::vector<AvalancheVote> &GetVotes() const { return votes; }
// serialization support
ADD_SERIALIZE_METHODS;
template <typename Stream, typename Operation>
inline void SerializationOp(Stream &s, Operation ser_action) {
+ READWRITE(round);
READWRITE(cooldown);
READWRITE(votes);
}
};
class AvalanchePoll {
- uint32_t round;
+ uint64_t round;
std::vector<CInv> invs;
public:
AvalanchePoll(uint32_t roundIn, std::vector<CInv> invsIn)
: round(roundIn), invs(invsIn) {}
const std::vector<CInv> &GetInvs() const { return invs; }
// serialization support
ADD_SERIALIZE_METHODS;
template <typename Stream, typename Operation>
inline void SerializationOp(Stream &s, Operation ser_action) {
READWRITE(round);
READWRITE(invs);
}
};
class AvalancheBlockUpdate {
union {
CBlockIndex *pindex;
size_t raw;
};
public:
enum Status : uint8_t {
Invalid,
Rejected,
Accepted,
Finalized,
};
AvalancheBlockUpdate(CBlockIndex *pindexIn, Status statusIn)
: pindex(pindexIn) {
raw |= statusIn;
}
Status getStatus() const { return Status(raw & 0x03); }
CBlockIndex *getBlockIndex() {
return reinterpret_cast<CBlockIndex *>(raw & -size_t(0x04));
}
const CBlockIndex *getBlockIndex() const {
return const_cast<AvalancheBlockUpdate *>(this)->getBlockIndex();
}
};
typedef std::map<const CBlockIndex *, VoteRecord, CBlockIndexWorkComparator>
BlockVoteMap;
typedef std::map<std::chrono::time_point<std::chrono::steady_clock>, NodeId>
NodeCooldownMap;
+struct query_timeout {};
+
class AvalancheProcessor {
private:
CConnman *connman;
/**
* Blocks to run avalanche on.
*/
RWCollection<BlockVoteMap> vote_records;
/**
* Keep track of peers and queries sent.
*/
- struct RequestRecord {
- private:
- int64_t timestamp;
- std::vector<CInv> invs;
-
- public:
- RequestRecord() : timestamp(0), invs() {}
- RequestRecord(int64_t timestampIn, std::vector<CInv> invIn)
- : timestamp(timestampIn), invs(std::move(invIn)) {}
-
- int64_t GetTimestamp() const { return timestamp; }
- const std::vector<CInv> &GetInvs() const { return invs; }
- };
+ typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
- std::atomic<uint32_t> round;
+ std::atomic<uint64_t> round;
RWCollection<std::set<NodeId>> nodeids;
- RWCollection<std::map<NodeId, RequestRecord>> queries;
RWCollection<NodeCooldownMap> nodecooldown;
+ struct Query {
+ NodeId nodeid;
+ uint64_t round;
+ TimePoint timeout;
+
+ /**
+ * We declare this as mutable so it can be modified in the multi_index.
+ * This is ok because we do not use this field to index in anyway.
+ *
+ * /!\ Do not use any mutable field as index.
+ */
+ mutable std::vector<CInv> invs;
+ };
+
+ typedef boost::multi_index_container<
+ Query,
+ boost::multi_index::indexed_by<
+ // index by nodeid/round
+ boost::multi_index::ordered_unique<
+ boost::multi_index::composite_key<
+ Query,
+ boost::multi_index::member<Query, NodeId, &Query::nodeid>,
+ boost::multi_index::member<Query, uint64_t,
+ &Query::round>>>,
+ // sorted by timeout
+ boost::multi_index::ordered_non_unique<
+ boost::multi_index::tag<query_timeout>,
+ boost::multi_index::member<Query, TimePoint, &Query::timeout>>>>
+ QuerySet;
+
+ RWCollection<QuerySet> queries;
+
/**
* Start stop machinery.
*/
std::atomic<bool> stopRequest;
bool running GUARDED_BY(cs_running);
CWaitableCriticalSection cs_running;
std::condition_variable cond_running;
public:
AvalancheProcessor(CConnman *connmanIn)
: connman(connmanIn), stopRequest(false), running(false) {}
~AvalancheProcessor() { stopEventLoop(); }
bool addBlockToReconcile(const CBlockIndex *pindex);
bool isAccepted(const CBlockIndex *pindex) const;
bool registerVotes(NodeId nodeid, const AvalancheResponse &response,
std::vector<AvalancheBlockUpdate> &updates);
bool startEventLoop(CScheduler &scheduler);
bool stopEventLoop();
private:
void runEventLoop();
std::vector<CInv> getInvsForNextPoll() const;
NodeId getSuitableNodeToQuery();
friend struct AvalancheTest;
};
#endif // BITCOIN_AVALANCHE_H
diff --git a/src/test/avalanche_tests.cpp b/src/test/avalanche_tests.cpp
index d61912e682..d9ed9ce4e9 100644
--- a/src/test/avalanche_tests.cpp
+++ b/src/test/avalanche_tests.cpp
@@ -1,569 +1,615 @@
// Copyright (c) 2010 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "avalanche.h"
#include "config.h"
#include "net_processing.h" // For PeerLogicValidation
#include "test/test_bitcoin.h"
#include <boost/test/unit_test.hpp>
struct AvalancheTest {
static void runEventLoop(AvalancheProcessor &p) { p.runEventLoop(); }
static std::vector<CInv> getInvsForNextPoll(const AvalancheProcessor &p) {
return p.getInvsForNextPoll();
}
static NodeId getSuitableNodeToQuery(AvalancheProcessor &p) {
return p.getSuitableNodeToQuery();
}
static uint32_t getRound(const AvalancheProcessor &p) { return p.round; }
};
BOOST_FIXTURE_TEST_SUITE(avalanche_tests, TestChain100Setup)
#define REGISTER_VOTE_AND_CHECK(vr, vote, state, finalized, confidence) \
vr.registerVote(vote); \
BOOST_CHECK_EQUAL(vr.isAccepted(), state); \
BOOST_CHECK_EQUAL(vr.hasFinalized(), finalized); \
BOOST_CHECK_EQUAL(vr.getConfidence(), confidence);
BOOST_AUTO_TEST_CASE(vote_record) {
VoteRecord vraccepted(true);
// Check initial state.
BOOST_CHECK_EQUAL(vraccepted.isAccepted(), true);
BOOST_CHECK_EQUAL(vraccepted.hasFinalized(), false);
BOOST_CHECK_EQUAL(vraccepted.getConfidence(), 0);
VoteRecord vr(false);
// Check initial state.
BOOST_CHECK_EQUAL(vr.isAccepted(), false);
BOOST_CHECK_EQUAL(vr.hasFinalized(), false);
BOOST_CHECK_EQUAL(vr.getConfidence(), 0);
// We register one vote for, which keep things at 4/4.
REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0);
// One more and we are at 5/3.
REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0);
// One more and we are at 5/3.
REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0);
// One more and we are at 6/2.
REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0);
// One more and we are at 6/2.
REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0);
// Next vote will flip state, and confidence will increase as long as we
// vote yes.
for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) {
REGISTER_VOTE_AND_CHECK(vr, true, true, false, i);
}
// The next vote will finalize the decision.
REGISTER_VOTE_AND_CHECK(vr, false, true, true,
AVALANCHE_FINALIZATION_SCORE);
// Now that we have two no votes, confidence stop increasing.
for (int i = 0; i < 5; i++) {
REGISTER_VOTE_AND_CHECK(vr, false, true, true,
AVALANCHE_FINALIZATION_SCORE);
}
// Next vote will flip state, and confidence will increase as long as we
// vote no.
for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) {
REGISTER_VOTE_AND_CHECK(vr, false, false, false, i);
}
// The next vote will finalize the decision.
REGISTER_VOTE_AND_CHECK(vr, true, false, true,
AVALANCHE_FINALIZATION_SCORE);
}
BOOST_AUTO_TEST_CASE(block_update) {
CBlockIndex index;
CBlockIndex *pindex = &index;
std::set<AvalancheBlockUpdate::Status> status{
AvalancheBlockUpdate::Status::Invalid,
AvalancheBlockUpdate::Status::Rejected,
AvalancheBlockUpdate::Status::Accepted,
AvalancheBlockUpdate::Status::Finalized,
};
for (auto s : status) {
AvalancheBlockUpdate abu(pindex, s);
BOOST_CHECK(abu.getBlockIndex() == pindex);
BOOST_CHECK_EQUAL(abu.getStatus(), s);
}
}
CService ip(uint32_t i) {
struct in_addr s;
s.s_addr = i;
return CService(CNetAddr(s), Params().GetDefaultPort());
}
std::unique_ptr<CNode> ConnectNode(const Config &config, ServiceFlags nServices,
PeerLogicValidation &peerLogic) {
static NodeId id = 0;
CAddress addr(ip(GetRandInt(0xffffffff)), NODE_NONE);
std::unique_ptr<CNode> nodeptr(new CNode(id++, ServiceFlags(NODE_NETWORK),
0, INVALID_SOCKET, addr, 0, 0,
CAddress(), "",
/*fInboundIn=*/false));
CNode &node = *nodeptr;
node.SetSendVersion(PROTOCOL_VERSION);
node.nServices = nServices;
peerLogic.InitializeNode(config, &node);
node.nVersion = 1;
node.fSuccessfullyConnected = true;
CConnmanTest::AddNode(node);
return nodeptr;
}
+static AvalancheResponse next(AvalancheResponse &r) {
+ auto copy = r;
+ r = {r.getRound() + 1, r.getCooldown(), r.GetVotes()};
+ return copy;
+}
+
BOOST_AUTO_TEST_CASE(block_register) {
AvalancheProcessor p(g_connman.get());
std::vector<AvalancheBlockUpdate> updates;
CBlock block = CreateAndProcessBlock({}, CScript());
const uint256 blockHash = block.GetHash();
const CBlockIndex *pindex = mapBlockIndex[blockHash];
const Config &config = GetConfig();
// Create a node that supports avalanche.
auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic);
NodeId nodeid = avanode->GetId();
// Querying for random block returns false.
BOOST_CHECK(!p.isAccepted(pindex));
// Add a new block. Check it is added to the polls.
BOOST_CHECK(p.addBlockToReconcile(pindex));
auto invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHash);
// Newly added blocks' state reflect the blockchain.
BOOST_CHECK(p.isAccepted(pindex));
// Let's vote for this block a few times.
- AvalancheResponse resp{0, {AvalancheVote(0, blockHash)}};
+ AvalancheResponse resp{0, 0, {AvalancheVote(0, blockHash)}};
for (int i = 0; i < 4; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// We vote for it numerous times to finalize it.
for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// As long as it is not finalized, we poll.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHash);
// Now finalize the decision.
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK_EQUAL(updates.size(), 1);
BOOST_CHECK(updates[0].getBlockIndex() == pindex);
BOOST_CHECK_EQUAL(updates[0].getStatus(),
AvalancheBlockUpdate::Status::Finalized);
updates = {};
// Once the decision is finalized, there is no poll for it.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 0);
// Now let's undo this and finalize rejection.
BOOST_CHECK(p.addBlockToReconcile(pindex));
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHash);
- resp = {0, {AvalancheVote(1, blockHash)}};
+ resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(1, blockHash)}};
for (int i = 0; i < 4; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// Now the state will flip.
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(!p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 1);
BOOST_CHECK(updates[0].getBlockIndex() == pindex);
BOOST_CHECK_EQUAL(updates[0].getStatus(),
AvalancheBlockUpdate::Status::Rejected);
updates = {};
// Now it is rejected, but we can vote for it numerous times.
for (int i = 1; i < AVALANCHE_FINALIZATION_SCORE; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(!p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// As long as it is not finalized, we poll.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHash);
// Now finalize the decision.
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(nodeid, resp, updates));
+ BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates));
BOOST_CHECK(!p.isAccepted(pindex));
BOOST_CHECK_EQUAL(updates.size(), 1);
BOOST_CHECK(updates[0].getBlockIndex() == pindex);
BOOST_CHECK_EQUAL(updates[0].getStatus(),
AvalancheBlockUpdate::Status::Invalid);
updates = {};
// Once the decision is finalized, there is no poll for it.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 0);
// Adding the block twice does nothing.
BOOST_CHECK(p.addBlockToReconcile(pindex));
BOOST_CHECK(!p.addBlockToReconcile(pindex));
BOOST_CHECK(p.isAccepted(pindex));
CConnmanTest::ClearNodes();
}
BOOST_AUTO_TEST_CASE(multi_block_register) {
AvalancheProcessor p(g_connman.get());
CBlockIndex indexA, indexB;
std::vector<AvalancheBlockUpdate> updates;
const Config &config = GetConfig();
// Create a node that supports avalanche.
auto node0 = ConnectNode(config, NODE_AVALANCHE, *peerLogic);
auto node1 = ConnectNode(config, NODE_AVALANCHE, *peerLogic);
// Make sure the block has a hash.
CBlock blockA = CreateAndProcessBlock({}, CScript());
const uint256 blockHashA = blockA.GetHash();
const CBlockIndex *pindexA = mapBlockIndex[blockHashA];
CBlock blockB = CreateAndProcessBlock({}, CScript());
const uint256 blockHashB = blockB.GetHash();
const CBlockIndex *pindexB = mapBlockIndex[blockHashB];
// Querying for random block returns false.
BOOST_CHECK(!p.isAccepted(pindexA));
BOOST_CHECK(!p.isAccepted(pindexB));
// Start voting on block A.
BOOST_CHECK(p.addBlockToReconcile(pindexA));
auto invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHashA);
+ uint64_t round = AvalancheTest::getRound(p);
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(node0->GetId(),
- {0, {AvalancheVote(0, blockHashA)}}, updates));
+ BOOST_CHECK(p.registerVotes(
+ node0->GetId(), {round, 0, {AvalancheVote(0, blockHashA)}}, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
// Start voting on block B after one vote.
AvalancheResponse resp{
- 0, {AvalancheVote(0, blockHashB), AvalancheVote(0, blockHashA)}};
+ round + 1,
+ 0,
+ {AvalancheVote(0, blockHashB), AvalancheVote(0, blockHashA)}};
BOOST_CHECK(p.addBlockToReconcile(pindexB));
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 2);
// Ensure B comes before A because it has accumulated more PoW.
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHashB);
BOOST_CHECK_EQUAL(invs[1].type, MSG_BLOCK);
BOOST_CHECK(invs[1].hash == blockHashA);
// Let's vote for these blocks a few times.
for (int i = 0; i < 3; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates));
+ BOOST_CHECK(p.registerVotes(node0->GetId(), next(resp), updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// Now it is accepted, but we can vote for it numerous times.
for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) {
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates));
+ BOOST_CHECK(p.registerVotes(node0->GetId(), next(resp), updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
}
// Running two iterration of the event loop so that vote gets triggerd on A
// and B.
+ NodeId secondNodeid = AvalancheTest::getSuitableNodeToQuery(p);
+ // NB: getSuitableNodeToQuery remove the node from the candidate list, so it
+ // has returned the node that will be queried second. The other one is the
+ // first.
+ NodeId firstNodeid =
+ (node0->GetId() == secondNodeid) ? node1->GetId() : node0->GetId();
AvalancheTest::runEventLoop(p);
AvalancheTest::runEventLoop(p);
// Next vote will finalize block A.
- BOOST_CHECK(p.registerVotes(node1->GetId(), resp, updates));
+ BOOST_CHECK(p.registerVotes(firstNodeid, next(resp), updates));
BOOST_CHECK_EQUAL(updates.size(), 1);
BOOST_CHECK(updates[0].getBlockIndex() == pindexA);
BOOST_CHECK_EQUAL(updates[0].getStatus(),
AvalancheBlockUpdate::Status::Finalized);
updates = {};
// We do not vote on A anymore.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHashB);
// Next vote will finalize block B.
- BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates));
+ BOOST_CHECK(p.registerVotes(secondNodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 1);
BOOST_CHECK(updates[0].getBlockIndex() == pindexB);
BOOST_CHECK_EQUAL(updates[0].getStatus(),
AvalancheBlockUpdate::Status::Finalized);
updates = {};
// There is nothing left to vote on.
invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 0);
CConnmanTest::ClearNodes();
}
BOOST_AUTO_TEST_CASE(poll_and_response) {
AvalancheProcessor p(g_connman.get());
std::vector<AvalancheBlockUpdate> updates;
CBlock block = CreateAndProcessBlock({}, CScript());
const uint256 blockHash = block.GetHash();
const CBlockIndex *pindex = mapBlockIndex[blockHash];
const Config &config = GetConfig();
// There is no node to query.
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
// Create a node that supports avalanche and one that doesn't.
auto oldnode = ConnectNode(config, NODE_NONE, *peerLogic);
auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic);
NodeId avanodeid = avanode->GetId();
// It returns the avalanche peer.
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
// Register a block and check it is added to the list of elements to poll.
BOOST_CHECK(p.addBlockToReconcile(pindex));
auto invs = AvalancheTest::getInvsForNextPoll(p);
BOOST_CHECK_EQUAL(invs.size(), 1);
BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
BOOST_CHECK(invs[0].hash == blockHash);
// Trigger a poll on avanode.
+ uint64_t round = AvalancheTest::getRound(p);
AvalancheTest::runEventLoop(p);
// There is no more suitable peer available, so return nothing.
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
// Respond to the request.
- AvalancheResponse resp = {0, {AvalancheVote(0, blockHash)}};
+ AvalancheResponse resp = {round, 0, {AvalancheVote(0, blockHash)}};
BOOST_CHECK(p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
// Now that avanode fullfilled his request, it is added back to the list of
// queriable nodes.
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
// Sending a response when not polled fails.
- BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
+ BOOST_CHECK(!p.registerVotes(avanodeid, next(resp), updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
// Trigger a poll on avanode.
+ round = AvalancheTest::getRound(p);
AvalancheTest::runEventLoop(p);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
// Sending responses that do not match the request also fails.
// 1. Too many results.
- resp = {0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash)}};
+ resp = {
+ round, 0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash)}};
AvalancheTest::runEventLoop(p);
BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
// 2. Not enough results.
- resp = {0, {}};
+ resp = {AvalancheTest::getRound(p), 0, {}};
AvalancheTest::runEventLoop(p);
BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
// 3. Do not match the poll.
- resp = {0, {AvalancheVote()}};
+ resp = {AvalancheTest::getRound(p), 0, {AvalancheVote()}};
AvalancheTest::runEventLoop(p);
BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
- // Proper response gets processed and avanode is available again.
- resp = {0, {AvalancheVote(0, blockHash)}};
+ // 4. Invalid round count. Node is not returned to the pool.
+ uint64_t queryRound = AvalancheTest::getRound(p);
AvalancheTest::runEventLoop(p);
- BOOST_CHECK(p.registerVotes(avanodeid, resp, updates));
+
+ resp = {queryRound + 1, 0, {AvalancheVote()}};
+ BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
- BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
+ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
- // Making request for invalid nodes do not work.
+ resp = {queryRound - 1, 0, {AvalancheVote()}};
+ BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
+ BOOST_CHECK_EQUAL(updates.size(), 0);
+ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
+
+ // 5. Making request for invalid nodes do not work. Node is not returned to
+ // the pool.
+ resp = {queryRound, 0, {AvalancheVote(0, blockHash)}};
BOOST_CHECK(!p.registerVotes(avanodeid + 1234, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
+ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1);
+
+ // Proper response gets processed and avanode is available again.
+ resp = {queryRound, 0, {AvalancheVote(0, blockHash)}};
+ BOOST_CHECK(p.registerVotes(avanodeid, resp, updates));
+ BOOST_CHECK_EQUAL(updates.size(), 0);
+ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
// Out of order response are rejected.
CBlock block2 = CreateAndProcessBlock({}, CScript());
const uint256 blockHash2 = block2.GetHash();
CBlockIndex *pindex2 = mapBlockIndex[blockHash2];
BOOST_CHECK(p.addBlockToReconcile(pindex2));
- resp = {0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash2)}};
+ resp = {AvalancheTest::getRound(p),
+ 0,
+ {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash2)}};
AvalancheTest::runEventLoop(p);
BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
+ // But they are accepted in order.
+ resp = {AvalancheTest::getRound(p),
+ 0,
+ {AvalancheVote(0, blockHash2), AvalancheVote(0, blockHash)}};
+ AvalancheTest::runEventLoop(p);
+ BOOST_CHECK(p.registerVotes(avanode->GetId(), resp, updates));
+ BOOST_CHECK_EQUAL(updates.size(), 0);
+ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
+
// When a block is marked invalid, stop polling.
pindex2->nStatus = pindex2->nStatus.withFailed();
- resp = {0, {AvalancheVote(0, blockHash)}};
+ resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash)}};
AvalancheTest::runEventLoop(p);
BOOST_CHECK(p.registerVotes(avanodeid, resp, updates));
BOOST_CHECK_EQUAL(updates.size(), 0);
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid);
CConnmanTest::ClearNodes();
}
BOOST_AUTO_TEST_CASE(event_loop) {
AvalancheProcessor p(g_connman.get());
CScheduler s;
CBlock block = CreateAndProcessBlock({}, CScript());
const uint256 blockHash = block.GetHash();
const CBlockIndex *pindex = mapBlockIndex[blockHash];
// Starting the event loop.
BOOST_CHECK(p.startEventLoop(s));
// There is one task planned in the next hour (our event loop).
boost::chrono::system_clock::time_point start, stop;
BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1);
// Starting twice doesn't start it twice.
BOOST_CHECK(!p.startEventLoop(s));
// Start the scheduler thread.
std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s));
// Create a node and a block to query.
const Config &config = GetConfig();
// Create a node that supports avalanche.
auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic);
NodeId nodeid = avanode->GetId();
// There is no query in flight at the moment.
BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), nodeid);
// Add a new block. Check it is added to the polls.
+ uint64_t queryRound = AvalancheTest::getRound(p);
BOOST_CHECK(p.addBlockToReconcile(pindex));
- uint32_t round = AvalancheTest::getRound(p);
for (int i = 0; i < 1000; i++) {
// Technically, this is a race condition, but this should do just fine
// as we wait up to 1s for an event that should take 10ms.
boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
- if (AvalancheTest::getRound(p) != round) {
+ if (AvalancheTest::getRound(p) != queryRound) {
break;
}
}
// Check that we effectively got a request and not timed out.
- BOOST_CHECK(AvalancheTest::getRound(p) > round);
+ BOOST_CHECK(AvalancheTest::getRound(p) > queryRound);
// Respond and check the cooldown time is respected.
- round = AvalancheTest::getRound(p);
+ uint64_t responseRound = AvalancheTest::getRound(p);
auto queryTime =
std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
std::vector<AvalancheBlockUpdate> updates;
- p.registerVotes(nodeid, {100, {AvalancheVote(0, blockHash)}}, updates);
+ p.registerVotes(nodeid, {queryRound, 100, {AvalancheVote(0, blockHash)}},
+ updates);
for (int i = 0; i < 1000; i++) {
// We make sure that we do not get a request before queryTime.
boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
- if (AvalancheTest::getRound(p) != round) {
+ if (AvalancheTest::getRound(p) != responseRound) {
BOOST_CHECK(std::chrono::steady_clock::now() > queryTime);
break;
}
}
// But we eventually get one.
- BOOST_CHECK(AvalancheTest::getRound(p) > round);
+ BOOST_CHECK(AvalancheTest::getRound(p) > responseRound);
// Stop event loop.
BOOST_CHECK(p.stopEventLoop());
// We don't have any task scheduled anymore.
BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0);
// Can't stop the event loop twice.
BOOST_CHECK(!p.stopEventLoop());
// Wait for the scheduler to stop.
s.stop(true);
schedulerThread.join();
CConnmanTest::ClearNodes();
}
BOOST_AUTO_TEST_CASE(destructor) {
CScheduler s;
boost::chrono::system_clock::time_point start, stop;
// Start the scheduler thread.
std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s));
{
AvalancheProcessor p(g_connman.get());
BOOST_CHECK(p.startEventLoop(s));
BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1);
}
// Now that avalanche is destroyed, there is no more scheduled tasks.
BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0);
// Wait for the scheduler to stop.
s.stop(true);
schedulerThread.join();
}
BOOST_AUTO_TEST_SUITE_END()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Mar 2, 11:50 (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5187702
Default Alt Text
(44 KB)
Attached To
rSTAGING Bitcoin ABC staging
Event Timeline
Log In to Comment