diff --git a/src/rpc/misc.cpp b/src/rpc/misc.cpp --- a/src/rpc/misc.cpp +++ b/src/rpc/misc.cpp @@ -478,7 +478,7 @@ // protect against null pointer dereference CHECK_NONFATAL(g_rpc_node); CHECK_NONFATAL(g_rpc_node->scheduler); - g_rpc_node->scheduler->MockForward(boost::chrono::seconds(delta_seconds)); + g_rpc_node->scheduler->MockForward(std::chrono::seconds(delta_seconds)); return NullUniValue; } diff --git a/src/scheduler.h b/src/scheduler.h --- a/src/scheduler.h +++ b/src/scheduler.h @@ -9,12 +9,12 @@ // // NOTE: -// boost::thread / boost::chrono should be ported to -// std::thread / std::chrono when we support C++11. +// boost::thread should be ported to std::thread +// when we support C++11. // -#include -#include - +#include +#include +#include #include // @@ -28,8 +28,9 @@ // s->scheduleFromNow(std::bind(Class::func, this, argument), 3); // boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s)); // -// ... then at program shutdown, clean up the thread running serviceQueue: -// t->interrupt(); +// ... then at program shutdown, make sure to call stop() to clean up the +// thread(s) running serviceQueue: +// s->stop(); // t->join(); // delete t; // delete s; // Must be done after thread is interrupted/joined. @@ -44,8 +45,7 @@ typedef std::function Predicate; // Call func at/after time t - void schedule(Function f, boost::chrono::system_clock::time_point t = - boost::chrono::system_clock::now()); + void schedule(Function f, std::chrono::system_clock::time_point t); // Convenience method: call f once deltaMilliSeconds from now void scheduleFromNow(Function f, int64_t deltaMilliSeconds); @@ -62,7 +62,7 @@ * Iterates through items on taskQueue and reschedules them * to be delta_seconds sooner. */ - void MockForward(boost::chrono::seconds delta_seconds); + void MockForward(std::chrono::seconds delta_seconds); // To keep things as simple as possible, there is no unschedule. @@ -75,22 +75,23 @@ // there is no work left to be done (drain=true) void stop(bool drain = false); - // Returns number of tasks waiting to be serviced, and first and last task - // times - size_t getQueueInfo(boost::chrono::system_clock::time_point &first, - boost::chrono::system_clock::time_point &last) const; + // Returns number of tasks waiting to be serviced, + // and first and last task times + size_t getQueueInfo(std::chrono::system_clock::time_point &first, + std::chrono::system_clock::time_point &last) const; // Returns true if there are threads actively running in serviceQueue() bool AreThreadsServicingQueue() const; private: - std::multimap taskQueue; - boost::condition_variable newTaskScheduled; - mutable boost::mutex newTaskMutex; - int nThreadsServicingQueue; - bool stopRequested; - bool stopWhenEmpty; - bool shouldStop() const { + mutable Mutex newTaskMutex; + std::condition_variable newTaskScheduled; + std::multimap + taskQueue GUARDED_BY(newTaskMutex); + int nThreadsServicingQueue GUARDED_BY(newTaskMutex); + bool stopRequested GUARDED_BY(newTaskMutex); + bool stopWhenEmpty GUARDED_BY(newTaskMutex); + bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } }; diff --git a/src/scheduler.cpp b/src/scheduler.cpp --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -5,7 +5,6 @@ #include #include -#include #include #include @@ -18,7 +17,7 @@ } void CScheduler::serviceQueue() { - boost::unique_lock lock(newTaskMutex); + WAIT_LOCK(newTaskMutex, lock); ++nThreadsServicingQueue; // newTaskMutex is locked throughout this loop EXCEPT when the thread is @@ -26,23 +25,21 @@ while (!shouldStop()) { try { if (!shouldStop() && taskQueue.empty()) { - reverse_lock> rlock(lock); + REVERSE_LOCK(lock); } while (!shouldStop() && taskQueue.empty()) { // Wait until there is something to do. newTaskScheduled.wait(lock); } - // Wait until either there is a new task, or until the time of the - // first item on the queue. - // Some boost versions have a conflicting overload of wait_until - // that returns void. Explicitly use a template here to avoid - // hitting that overload. + // Wait until either there is a new task, or until + // the time of the first item on the queue: + while (!shouldStop() && !taskQueue.empty()) { - boost::chrono::system_clock::time_point timeToWaitFor = + std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; - if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == - boost::cv_status::timeout) { + if (newTaskScheduled.wait_until(lock, timeToWaitFor) == + std::cv_status::timeout) { // Exit loop after timeout, it means we reached the time of // the event break; @@ -61,7 +58,7 @@ { // Unlock before calling f, so it can reschedule itself or // another task without deadlocking: - reverse_lock> rlock(lock); + REVERSE_LOCK(lock); f(); } } catch (...) { @@ -75,7 +72,7 @@ void CScheduler::stop(bool drain) { { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); if (drain) { stopWhenEmpty = true; } else { @@ -86,9 +83,9 @@ } void CScheduler::schedule(CScheduler::Function f, - boost::chrono::system_clock::time_point t) { + std::chrono::system_clock::time_point t) { { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); taskQueue.insert(std::make_pair(t, f)); } newTaskScheduled.notify_one(); @@ -96,19 +93,18 @@ void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds) { - schedule(f, boost::chrono::system_clock::now() + - boost::chrono::milliseconds(deltaMilliSeconds)); + schedule(f, std::chrono::system_clock::now() + + std::chrono::milliseconds(deltaMilliSeconds)); } -void CScheduler::MockForward(boost::chrono::seconds delta_seconds) { - assert(delta_seconds.count() > 0 && - delta_seconds < boost::chrono::hours{1}); +void CScheduler::MockForward(std::chrono::seconds delta_seconds) { + assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1}); { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); // use temp_queue to maintain updated schedule - std::multimap + std::multimap temp_queue; for (const auto &element : taskQueue) { @@ -140,9 +136,9 @@ } size_t -CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, - boost::chrono::system_clock::time_point &last) const { - boost::unique_lock lock(newTaskMutex); +CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first, + std::chrono::system_clock::time_point &last) const { + LOCK(newTaskMutex); size_t result = taskQueue.size(); if (!taskQueue.empty()) { first = taskQueue.begin()->first; @@ -152,7 +148,7 @@ } bool CScheduler::AreThreadsServicingQueue() const { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); return nThreadsServicingQueue; } @@ -170,7 +166,8 @@ } } m_pscheduler->schedule( - std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); + std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), + std::chrono::system_clock::now()); } void SingleThreadedSchedulerClient::ProcessQueue() { diff --git a/src/test/avalanche_tests.cpp b/src/test/avalanche_tests.cpp --- a/src/test/avalanche_tests.cpp +++ b/src/test/avalanche_tests.cpp @@ -879,7 +879,7 @@ BOOST_CHECK(p.startEventLoop(s)); // There is one task planned in the next hour (our event loop). - boost::chrono::system_clock::time_point start, stop; + std::chrono::system_clock::time_point start, stop; BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Starting twice doesn't start it twice. @@ -951,7 +951,7 @@ BOOST_AUTO_TEST_CASE(destructor) { CScheduler s; - boost::chrono::system_clock::time_point start, stop; + std::chrono::system_clock::time_point start, stop; // Start the scheduler thread. std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp --- a/src/test/scheduler_tests.cpp +++ b/src/test/scheduler_tests.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -18,13 +19,13 @@ static void microTask(CScheduler &s, boost::mutex &mutex, int &counter, int delta, - boost::chrono::system_clock::time_point rescheduleTime) { + std::chrono::system_clock::time_point rescheduleTime) { { boost::unique_lock lock(mutex); counter += delta; } - boost::chrono::system_clock::time_point noTime = - boost::chrono::system_clock::time_point::min(); + std::chrono::system_clock::time_point noTime = + std::chrono::system_clock::time_point::min(); if (rescheduleTime != noTime) { CScheduler::Function f = std::bind(µTask, std::ref(s), std::ref(mutex), @@ -62,18 +63,18 @@ return -1000 + int(rc.randrange(2001)); }; - boost::chrono::system_clock::time_point start = - boost::chrono::system_clock::now(); - boost::chrono::system_clock::time_point now = start; - boost::chrono::system_clock::time_point first, last; + std::chrono::system_clock::time_point start = + std::chrono::system_clock::now(); + std::chrono::system_clock::time_point now = start; + std::chrono::system_clock::time_point first, last; size_t nTasks = microTasks.getQueueInfo(first, last); BOOST_CHECK(nTasks == 0); for (int i = 0; i < 100; ++i) { - boost::chrono::system_clock::time_point t = - now + boost::chrono::microseconds(randomMsec(rng)); - boost::chrono::system_clock::time_point tReschedule = - now + boost::chrono::microseconds(500 + randomMsec(rng)); + std::chrono::system_clock::time_point t = + now + std::chrono::microseconds(randomMsec(rng)); + std::chrono::system_clock::time_point tReschedule = + now + std::chrono::microseconds(500 + randomMsec(rng)); int whichCounter = zeroToNine(rng); CScheduler::Function f = std::bind(µTask, std::ref(microTasks), std::ref(counterMutex[whichCounter]), @@ -95,7 +96,7 @@ } UninterruptibleSleep(std::chrono::microseconds{600}); - now = boost::chrono::system_clock::now(); + now = std::chrono::system_clock::now(); // More threads and more tasks: for (int i = 0; i < 5; i++) { @@ -104,10 +105,10 @@ } for (int i = 0; i < 100; i++) { - boost::chrono::system_clock::time_point t = - now + boost::chrono::microseconds(randomMsec(rng)); - boost::chrono::system_clock::time_point tReschedule = - now + boost::chrono::microseconds(500 + randomMsec(rng)); + std::chrono::system_clock::time_point t = + now + std::chrono::microseconds(randomMsec(rng)); + std::chrono::system_clock::time_point tReschedule = + now + std::chrono::microseconds(500 + randomMsec(rng)); int whichCounter = zeroToNine(rng); CScheduler::Function f = std::bind(µTask, std::ref(microTasks), std::ref(counterMutex[whichCounter]), @@ -236,14 +237,14 @@ scheduler.scheduleFromNow(dummy, 8 * min_in_milli); // check taskQueue - boost::chrono::system_clock::time_point first, last; + std::chrono::system_clock::time_point first, last; size_t num_tasks = scheduler.getQueueInfo(first, last); BOOST_CHECK_EQUAL(num_tasks, 3ul); std::thread scheduler_thread([&]() { scheduler.serviceQueue(); }); // bump the scheduler forward 5 minutes - scheduler.MockForward(boost::chrono::seconds(5 * 60)); + scheduler.MockForward(std::chrono::seconds(5 * 60)); // ensure scheduler has chance to process all tasks queued for before 1 ms // from now. @@ -258,11 +259,10 @@ BOOST_CHECK_EQUAL(counter, 2); // check that the time of the remaining job has been updated - boost::chrono::system_clock::time_point now = - boost::chrono::system_clock::now(); + std::chrono::system_clock::time_point now = + std::chrono::system_clock::now(); int delta = - boost::chrono::duration_cast(first - now) - .count(); + std::chrono::duration_cast(first - now).count(); // should be between 2 & 3 minutes from now BOOST_CHECK(delta > 2 * 60 && delta < 3 * 60); } diff --git a/src/test/util/setup_common.h b/src/test/util/setup_common.h --- a/src/test/util/setup_common.h +++ b/src/test/util/setup_common.h @@ -16,6 +16,8 @@ #include #include +#include // For boost::thread_group + #include /** diff --git a/test/lint/lint-boost-dependencies.sh b/test/lint/lint-boost-dependencies.sh --- a/test/lint/lint-boost-dependencies.sh +++ b/test/lint/lint-boost-dependencies.sh @@ -13,7 +13,6 @@ boost/algorithm/string/classification.hpp boost/algorithm/string/replace.hpp boost/algorithm/string/split.hpp - boost/chrono/chrono.hpp boost/date_time/posix_time/posix_time.hpp boost/filesystem.hpp boost/filesystem/fstream.hpp