diff --git a/src/avalanche.h b/src/avalanche.h
--- a/src/avalanche.h
+++ b/src/avalanche.h
@@ -11,6 +11,8 @@
 #include "serialize.h"
 #include "uint256.h"
 
+#include <atomic>
+#include <condition_variable>
 #include
 #include
 
@@ -139,14 +141,49 @@
      */
     RWCollection> vote_records;
 
+    /**
+     * Start stop machinery.
+     *
+     * stopRequest is set by stopEventLoop() and polled by the scheduled task.
+     * running is true from startEventLoop() until that task acknowledges a
+     * stop request; cond_running is signalled when it does.
+     */
+    std::atomic<bool> stopRequest;
+    bool running GUARDED_BY(cs_running);
+
+    CWaitableCriticalSection cs_running;
+    std::condition_variable cond_running;
+
 public:
-    AvalancheProcessor() {}
+    AvalancheProcessor() : stopRequest(false), running(false) {}
+
+    /**
+     * Stop the event loop, if it is running, so that the scheduled task no
+     * longer uses this object once it is gone. Blocks like stopEventLoop().
+     */
+    ~AvalancheProcessor() { stopEventLoop(); }
 
     bool addBlockToReconcile(const CBlockIndex *pindex);
     bool isAccepted(const CBlockIndex *pindex) const;
     bool hasFinalized(const CBlockIndex *pindex) const;
 
     bool registerVotes(const AvalancheResponse &response);
+
+    /**
+     * Schedule the event loop on the given scheduler.
+     *
+     * @return false if the event loop is already running, true otherwise.
+     */
+    bool startEventLoop(CScheduler &scheduler);
+
+    /**
+     * Ask the event loop to stop and wait until the scheduled task has
+     * acknowledged the request. This blocks until the scheduler runs the
+     * task, so the scheduler queue must be serviced by another thread.
+     *
+     * @return false if the event loop was not running, true otherwise.
+     */
+    bool stopEventLoop();
 };
 
 #endif // BITCOIN_AVALANCHE_H
diff --git a/src/avalanche.cpp b/src/avalanche.cpp
--- a/src/avalanche.cpp
+++ b/src/avalanche.cpp
@@ -74,3 +74,59 @@
 
     return true;
 }
+
+namespace {
+/**
+ * Run the avalanche event loop every 10ms.
+ */
+constexpr int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10;
+} // namespace
+
+bool AvalancheProcessor::startEventLoop(CScheduler &scheduler) {
+    LOCK(cs_running);
+    if (running) {
+        // Do not start the event loop twice.
+        return false;
+    }
+
+    running = true;
+
+    // Start the event loop.
+    scheduler.scheduleEvery(
+        [this]() -> bool {
+            if (!stopRequest) {
+                return true;
+            }
+
+            // Update running under the lock so that the waiter in
+            // stopEventLoop() reliably observes the change.
+            LOCK(cs_running);
+            running = false;
+
+            cond_running.notify_all();
+
+            // A stop request was made.
+            return false;
+        },
+        AVALANCHE_TIME_STEP_MILLISECONDS);
+
+    return true;
+}
+
+bool AvalancheProcessor::stopEventLoop() {
+    WAIT_LOCK(cs_running, lock);
+    if (!running) {
+        return false;
+    }
+
+    // Request avalanche to stop.
+    stopRequest = true;
+
+    // Wait for avalanche to stop. The lock is released while waiting, which
+    // lets the scheduled task take it and clear running.
+    cond_running.wait(lock, [this] { return !running; });
+
+    // Clear the request so the event loop can be started again.
+    stopRequest = false;
+    return true;
+}
diff --git a/src/test/avalanche_tests.cpp b/src/test/avalanche_tests.cpp
--- a/src/test/avalanche_tests.cpp
+++ b/src/test/avalanche_tests.cpp
@@ -128,4 +128,56 @@
     BOOST_CHECK(p.hasFinalized(pindex));
 }
 
+BOOST_AUTO_TEST_CASE(event_loop) {
+    AvalancheProcessor p;
+    CScheduler s;
+
+    // Starting the event loop.
+    BOOST_CHECK(p.startEventLoop(s));
+
+    // There is one task planned in the next hour (our event loop).
+    boost::chrono::system_clock::time_point start, stop;
+    BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1);
+
+    // Starting twice doesn't start it twice.
+    BOOST_CHECK(!p.startEventLoop(s));
+
+    // Start the scheduler thread.
+    std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s));
+
+    // Stop event loop.
+    BOOST_CHECK(p.stopEventLoop());
+
+    // We don't have any task scheduled anymore.
+    BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0);
+
+    // Can't stop the event loop twice.
+    BOOST_CHECK(!p.stopEventLoop());
+
+    // Wait for the scheduler to stop.
+    s.stop(true);
+    schedulerThread.join();
+}
+
+BOOST_AUTO_TEST_CASE(destructor) {
+    CScheduler s;
+    boost::chrono::system_clock::time_point start, stop;
+
+    // Start the scheduler thread.
+    std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s));
+
+    {
+        AvalancheProcessor p;
+        BOOST_CHECK(p.startEventLoop(s));
+        BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1);
+    }
+
+    // Now that avalanche is destroyed, there is no more scheduled tasks.
+    BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0);
+
+    // Wait for the scheduler to stop.
+    s.stop(true);
+    schedulerThread.join();
+}
+
 BOOST_AUTO_TEST_SUITE_END()