diff --git a/src/net.cpp b/src/net.cpp --- a/src/net.cpp +++ b/src/net.cpp @@ -2526,8 +2526,12 @@ std::bind(&CConnman::ThreadMessageHandler, this))); // Dump network addresses - scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), - DUMP_ADDRESSES_INTERVAL * 1000); + scheduler.scheduleEvery( + [this]() { + this->DumpData(); + return true; + }, + DUMP_ADDRESSES_INTERVAL * 1000); return true; } diff --git a/src/net_processing.cpp b/src/net_processing.cpp --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -938,8 +938,10 @@ EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer"); scheduler.scheduleEvery( - std::bind(&PeerLogicValidation::CheckForStaleTipAndEvictPeers, this, - consensusParams), + [this, &consensusParams]() { + this->CheckForStaleTipAndEvictPeers(consensusParams); + return true; + }, EXTRA_PEER_CHECK_INTERVAL * 1000); } diff --git a/src/scheduler.h b/src/scheduler.h --- a/src/scheduler.h +++ b/src/scheduler.h @@ -42,6 +42,7 @@ ~CScheduler(); typedef std::function<void(void)> Function; + typedef std::function<bool(void)> Predicate; // Call func at/after time t void schedule(Function f, @@ -55,7 +56,7 @@ // forever, starting deltaMilliSeconds from now. To be more precise: every // time f is finished, it is rescheduled to run deltaMilliSeconds later. If // you need more accurate scheduling, don't use this method. - void scheduleEvery(Function f, int64_t deltaMilliSeconds); + void scheduleEvery(Predicate p, int64_t deltaMilliSeconds); // To keep things as simple as possible, there is no unschedule. diff --git a/src/scheduler.cpp b/src/scheduler.cpp --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -71,7 +71,9 @@ #endif // If there are multiple threads, the queue can empty while we're // waiting (another thread may service the task we were waiting on). 
- if (shouldStop() || taskQueue.empty()) continue; + if (shouldStop() || taskQueue.empty()) { + continue; + } Function f = taskQueue.begin()->second; taskQueue.erase(taskQueue.begin()); @@ -94,10 +96,11 @@ void CScheduler::stop(bool drain) { { boost::unique_lock<boost::mutex> lock(newTaskMutex); - if (drain) + if (drain) { stopWhenEmpty = true; - else + } else { stopRequested = true; + } } newTaskScheduled.notify_all(); } @@ -118,16 +121,17 @@ boost::chrono::milliseconds(deltaMilliSeconds)); } -static void Repeat(CScheduler *s, CScheduler::Function f, +static void Repeat(CScheduler *s, CScheduler::Predicate p, int64_t deltaMilliSeconds) { - f(); - s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaMilliSeconds), - deltaMilliSeconds); + if (p()) { + s->scheduleFromNow(boost::bind(&Repeat, s, p, deltaMilliSeconds), + deltaMilliSeconds); + } } -void CScheduler::scheduleEvery(CScheduler::Function f, +void CScheduler::scheduleEvery(CScheduler::Predicate p, int64_t deltaMilliSeconds) { - scheduleFromNow(boost::bind(&Repeat, this, f, deltaMilliSeconds), + scheduleFromNow(boost::bind(&Repeat, this, p, deltaMilliSeconds), deltaMilliSeconds); } diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp --- a/src/test/scheduler_tests.cpp +++ b/src/test/scheduler_tests.cpp @@ -13,6 +13,9 @@ #include #include +#include <atomic> +#include <thread> + BOOST_AUTO_TEST_SUITE(scheduler_tests) static void microTask(CScheduler &s, boost::mutex &mutex, int &counter, @@ -83,17 +86,20 @@ // As soon as these are created they will start running and servicing the // queue boost::thread_group microThreads; - for (int i = 0; i < 5; i++) + for (int i = 0; i < 5; i++) { microThreads.create_thread( boost::bind(&CScheduler::serviceQueue, &microTasks)); + } MicroSleep(600); now = boost::chrono::system_clock::now(); // More threads and more tasks: - for (int i = 0; i < 5; i++) + for (int i = 0; i < 5; i++) { microThreads.create_thread( boost::bind(&CScheduler::serviceQueue, &microTasks)); + } + for (int i = 0; i < 100; i++) { 
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); @@ -109,7 +115,8 @@ // Drain the task queue then exit threads microTasks.stop(true); - microThreads.join_all(); // ... wait until all the threads are done + // ... wait until all the threads are done + microThreads.join_all(); int counterSum = 0; for (int i = 0; i < 10; i++) { @@ -119,4 +126,52 @@ BOOST_CHECK_EQUAL(counterSum, 200); } +BOOST_AUTO_TEST_CASE(schedule_every) { + CScheduler scheduler; + + boost::condition_variable cvar; + std::atomic<int> counter{15}; + std::atomic<bool> keepRunning{true}; + + scheduler.scheduleEvery( + [&keepRunning, &cvar, &counter, &scheduler]() { + BOOST_CHECK(counter > 0); + cvar.notify_all(); + if (--counter > 0) { + return true; + } + + // We reached the end of our test, make sure nothing runs again for + // 100ms. + scheduler.scheduleFromNow( + [&keepRunning, &cvar]() { + keepRunning = false; + cvar.notify_all(); + }, + 100); + + // We set the counter to some magic value to check the scheduler + // empties its queue properly after 120ms. + scheduler.scheduleFromNow([&counter]() { counter = 42; }, 120); + return false; + }, + 5); + + // Start the scheduler thread. + std::thread schedulerThread( + std::bind(&CScheduler::serviceQueue, &scheduler)); + + boost::mutex mutex; + boost::unique_lock<boost::mutex> lock(mutex); + while (keepRunning) { + cvar.wait(lock); + BOOST_CHECK(counter >= 0); + } + + BOOST_CHECK_EQUAL(counter, 0); + scheduler.stop(true); + schedulerThread.join(); + BOOST_CHECK_EQUAL(counter, 42); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -4191,7 +4191,12 @@ // Run a thread to flush wallet periodically. if (!CWallet::fFlushScheduled.exchange(true)) { - scheduler.scheduleEvery(MaybeCompactWalletDB, 500); + scheduler.scheduleEvery( + []() { + MaybeCompactWalletDB(); + return true; + }, + 500); } }