diff --git a/src/avalanche.cpp b/src/avalanche.cpp index 13fa9ff70..7253f34b5 100644 --- a/src/avalanche.cpp +++ b/src/avalanche.cpp @@ -1,76 +1,128 @@ // Copyright (c) 2018 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "avalanche.h" #include "chain.h" #include "netmessagemaker.h" #include "scheduler.h" #include "validation.h" bool AvalancheProcessor::addBlockToReconcile(const CBlockIndex *pindex) { return vote_records.getWriteView() ->insert(std::make_pair(pindex, VoteRecord())) .second; } static const VoteRecord *GetRecord( const RWCollection> &vote_records, const CBlockIndex *pindex) { auto r = vote_records.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return nullptr; } return &it->second; } bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const { if (auto vr = GetRecord(vote_records, pindex)) { return vr->isValid(); } return false; } bool AvalancheProcessor::hasFinalized(const CBlockIndex *pindex) const { if (auto vr = GetRecord(vote_records, pindex)) { return vr->hasFinalized(); } return false; } bool AvalancheProcessor::registerVotes(const AvalancheResponse &response) { const std::vector &votes = response.GetVotes(); std::map responseIndex; { LOCK(cs_main); for (auto &v : votes) { BlockMap::iterator mi = mapBlockIndex.find(v.GetHash()); if (mi == mapBlockIndex.end()) { // This should not happen, but just in case... continue; } responseIndex.insert(std::make_pair(mi->second, v)); } } { // Register votes. auto w = vote_records.getWriteView(); for (auto &p : responseIndex) { const CBlockIndex *pindex = p.first; const AvalancheVote &v = p.second; w[pindex].registerVote(v.IsValid()); } } return true; } + +namespace { +/** + * Run the avalanche event loop every 10ms. + */ +static int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10; +} + +bool AvalancheProcessor::startEventLoop(CScheduler &scheduler) { + LOCK(cs_running); + if (running) { + // Do not start the event loop twice. + return false; + } + + running = true; + + // Start the event loop. + scheduler.scheduleEvery( + [this]() -> bool { + if (!stopRequest) { + return true; + } + + LOCK(cs_running); + running = false; + + cond_running.notify_all(); + + // A stop request was made. + return false; + }, + AVALANCHE_TIME_STEP_MILLISECONDS); + + return true; +} + +bool AvalancheProcessor::stopEventLoop() { + WAIT_LOCK(cs_running, lock); + if (!running) { + return false; + } + + // Request avalanche to stop. + stopRequest = true; + + // Wait for avalanche to stop. + cond_running.wait(lock, [this] { return !running; }); + + stopRequest = false; + return true; +} diff --git a/src/avalanche.h b/src/avalanche.h index 1071abcbb..904ee5a2a 100644 --- a/src/avalanche.h +++ b/src/avalanche.h @@ -1,152 +1,167 @@ // Copyright (c) 2018 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_H #define BITCOIN_AVALANCHE_H #include "net.h" // for NodeId #include "protocol.h" // for CInv #include "rwcollection.h" #include "serialize.h" #include "uint256.h" +#include +#include #include #include class Config; class CBlockIndex; class CScheduler; namespace { /** * Finalization score. */ static int AVALANCHE_FINALIZATION_SCORE = 128; } /** * Vote history. */ struct VoteRecord { private: // Historical record of votes. uint16_t votes; // confidence's LSB bit is the result. Higher bits are actual confidence // score. uint16_t confidence; /** * Return the number of bits set in an integer value. * TODO: There are compiler intrinsics to do that, but we'd need to get them * detected so this will do for now. */ static uint32_t countBits(uint32_t value) { uint32_t count = 0; while (value) { // If the value is non zero, then at least one bit is set. count++; // Clear the rightmost bit set. value &= (value - 1); } return count; } public: VoteRecord() : votes(0xaaaa), confidence(0) {} bool isValid() const { return confidence & 0x01; } uint16_t getConfidence() const { return confidence >> 1; } bool hasFinalized() const { return getConfidence() >= AVALANCHE_FINALIZATION_SCORE; } bool registerVote(bool vote) { votes = (votes << 1) | vote; auto bits = countBits(votes & 0xff); bool yes = bits > 6; bool no = bits < 2; if (!yes && !no) { // The vote is inconclusive. return false; } if (isValid() == yes) { // If the vote is in agreement with our internal status, increase // confidence. confidence += 2; } else { // The vote did not agree with our internal state, in that case, // reset confidence. confidence = yes; } return true; } }; class AvalancheVote { uint32_t error; uint256 hash; public: AvalancheVote() : error(-1), hash() {} AvalancheVote(uint32_t errorIn, uint256 hashIn) : error(errorIn), hash(hashIn) {} const uint256 &GetHash() const { return hash; } bool IsValid() const { return error == 0; } // serialization support ADD_SERIALIZE_METHODS; template inline void SerializationOp(Stream &s, Operation ser_action) { READWRITE(error); READWRITE(hash); } }; class AvalancheResponse { uint32_t cooldown; std::vector votes; public: AvalancheResponse(uint32_t cooldownIn, std::vector votesIn) : cooldown(cooldownIn), votes(votesIn) {} const std::vector &GetVotes() const { return votes; } // serialization support ADD_SERIALIZE_METHODS; template inline void SerializationOp(Stream &s, Operation ser_action) { READWRITE(cooldown); READWRITE(votes); } }; class AvalancheProcessor { private: /** * Blocks to run avalanche on. */ RWCollection> vote_records; + /** + * Start stop machinery. + */ + std::atomic stopRequest; + bool running GUARDED_BY(cs_running); + + CWaitableCriticalSection cs_running; + std::condition_variable cond_running; + public: - AvalancheProcessor() {} + AvalancheProcessor() : stopRequest(false), running(false) {} + ~AvalancheProcessor() { stopEventLoop(); } bool addBlockToReconcile(const CBlockIndex *pindex); bool isAccepted(const CBlockIndex *pindex) const; bool hasFinalized(const CBlockIndex *pindex) const; bool registerVotes(const AvalancheResponse &response); + + bool startEventLoop(CScheduler &scheduler); + bool stopEventLoop(); }; #endif // BITCOIN_AVALANCHE_H diff --git a/src/test/avalanche_tests.cpp b/src/test/avalanche_tests.cpp index 1d7a9a8be..d75d44dc0 100644 --- a/src/test/avalanche_tests.cpp +++ b/src/test/avalanche_tests.cpp @@ -1,131 +1,183 @@ // Copyright (c) 2010 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "avalanche.h" #include "test/test_bitcoin.h" #include BOOST_FIXTURE_TEST_SUITE(avalanche_tests, TestChain100Setup) #define REGISTER_VOTE_AND_CHECK(vr, vote, state, finalized, confidence) \ vr.registerVote(vote); \ BOOST_CHECK_EQUAL(vr.isValid(), state); \ BOOST_CHECK_EQUAL(vr.hasFinalized(), finalized); \ BOOST_CHECK_EQUAL(vr.getConfidence(), confidence); BOOST_AUTO_TEST_CASE(vote_record) { VoteRecord vr; // Check initial state. BOOST_CHECK_EQUAL(vr.isValid(), false); BOOST_CHECK_EQUAL(vr.hasFinalized(), false); BOOST_CHECK_EQUAL(vr.getConfidence(), 0); // We register one vote for, which keep things at 4/4. REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0); // One more and we are at 5/3. REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0); // One more and we are at 5/3. REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0); // One more and we are at 6/2. REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0); // One more and we are at 6/2. REGISTER_VOTE_AND_CHECK(vr, true, false, false, 0); // Next vote will flip state, and confidence will increase as long as we // vote yes. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { REGISTER_VOTE_AND_CHECK(vr, true, true, false, i); } // The next vote will finalize the decision. REGISTER_VOTE_AND_CHECK(vr, false, true, true, AVALANCHE_FINALIZATION_SCORE); // Now that we have two no votes, confidence stop increasing. for (int i = 0; i < 5; i++) { REGISTER_VOTE_AND_CHECK(vr, false, true, true, AVALANCHE_FINALIZATION_SCORE); } // Next vote will flip state, and confidence will increase as long as we // vote no. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { REGISTER_VOTE_AND_CHECK(vr, false, false, false, i); } // The next vote will finalize the decision. REGISTER_VOTE_AND_CHECK(vr, true, false, true, AVALANCHE_FINALIZATION_SCORE); } BOOST_AUTO_TEST_CASE(block_register) { AvalancheProcessor p; CBlock block = CreateAndProcessBlock({}, CScript()); const uint256 hash = block.GetHash(); const CBlockIndex *pindex = mapBlockIndex[hash]; // Querying for random block returns false. BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK(!p.hasFinalized(pindex)); // Newly added blocks are also considered rejected. BOOST_CHECK(p.addBlockToReconcile(pindex)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK(!p.hasFinalized(pindex)); // Let's vote for this block a few times. AvalancheResponse resp{0, {AvalancheVote(0, hash)}}; for (int i = 0; i < 5; i++) { p.registerVotes(resp); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK(!p.hasFinalized(pindex)); } // Now it is accepted, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { p.registerVotes(resp); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK(!p.hasFinalized(pindex)); } // Now finalize the decision. resp = {0, {AvalancheVote(1, hash)}}; p.registerVotes(resp); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK(p.hasFinalized(pindex)); // Now let's undo this and finalize rejection. for (int i = 0; i < 5; i++) { p.registerVotes(resp); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK(p.hasFinalized(pindex)); } // Now it is rejected, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { p.registerVotes(resp); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK(!p.hasFinalized(pindex)); } // Now finalize the decision. p.registerVotes(resp); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK(p.hasFinalized(pindex)); // Adding the block twice does nothing. BOOST_CHECK(!p.addBlockToReconcile(pindex)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK(p.hasFinalized(pindex)); } +BOOST_AUTO_TEST_CASE(event_loop) { + AvalancheProcessor p; + CScheduler s; + + // Starting the event loop. + BOOST_CHECK(p.startEventLoop(s)); + + // There is one task planned in the next hour (our event loop). + boost::chrono::system_clock::time_point start, stop; + BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); + + // Starting twice doesn't start it twice. + BOOST_CHECK(!p.startEventLoop(s)); + + // Start the scheduler thread. + std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); + + // Stop event loop. + BOOST_CHECK(p.stopEventLoop()); + + // We don't have any task scheduled anymore. + BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); + + // Can't stop the event loop twice. + BOOST_CHECK(!p.stopEventLoop()); + + // Wait for the scheduler to stop. + s.stop(true); + schedulerThread.join(); +} + +BOOST_AUTO_TEST_CASE(destructor) { + CScheduler s; + boost::chrono::system_clock::time_point start, stop; + + // Start the scheduler thread. + std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); + + { + AvalancheProcessor p; + BOOST_CHECK(p.startEventLoop(s)); + BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); + } + + // Now that avalanche is destroyed, there is no more scheduled tasks. + BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); + + // Wait for the scheduler to stop. + s.stop(true); + schedulerThread.join(); +} + BOOST_AUTO_TEST_SUITE_END()