diff --git a/src/scheduler.cpp b/src/scheduler.cpp index b5dc69675..c8a77ab42 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -1,210 +1,235 @@ // Copyright (c) 2015-2016 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false) {} CScheduler::~CScheduler() { assert(nThreadsServicingQueue == 0); } void CScheduler::serviceQueue() { boost::unique_lock lock(newTaskMutex); ++nThreadsServicingQueue; // newTaskMutex is locked throughout this loop EXCEPT when the thread is // waiting or when the user's function is called. while (!shouldStop()) { try { if (!shouldStop() && taskQueue.empty()) { reverse_lock> rlock(lock); } while (!shouldStop() && taskQueue.empty()) { // Wait until there is something to do. newTaskScheduled.wait(lock); } // Wait until either there is a new task, or until the time of the // first item on the queue. // Some boost versions have a conflicting overload of wait_until // that returns void. Explicitly use a template here to avoid // hitting that overload. while (!shouldStop() && !taskQueue.empty()) { boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout) { // Exit loop after timeout, it means we reached the time of // the event break; } } // If there are multiple threads, the queue can empty while we're // waiting (another thread may service the task we were waiting on). if (shouldStop() || taskQueue.empty()) { continue; } Function f = taskQueue.begin()->second; taskQueue.erase(taskQueue.begin()); { // Unlock before calling f, so it can reschedule itself or // another task without deadlocking: reverse_lock> rlock(lock); f(); } } catch (...) { --nThreadsServicingQueue; throw; } } --nThreadsServicingQueue; newTaskScheduled.notify_one(); } void CScheduler::stop(bool drain) { { boost::unique_lock lock(newTaskMutex); if (drain) { stopWhenEmpty = true; } else { stopRequested = true; } } newTaskScheduled.notify_all(); } void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t) { { boost::unique_lock lock(newTaskMutex); taskQueue.insert(std::make_pair(t, f)); } newTaskScheduled.notify_one(); } void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds) { schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds)); } +void CScheduler::MockForward(boost::chrono::seconds delta_seconds) { + assert(delta_seconds.count() > 0 && + delta_seconds < boost::chrono::hours{1}); + + { + boost::unique_lock lock(newTaskMutex); + + // use temp_queue to maintain updated schedule + std::multimap + temp_queue; + + for (const auto &element : taskQueue) { + temp_queue.emplace_hint(temp_queue.cend(), + element.first - delta_seconds, + element.second); + } + + // point taskQueue to temp_queue + taskQueue = std::move(temp_queue); + } + + // notify that the taskQueue needs to be processed + newTaskScheduled.notify_one(); +} + static void Repeat(CScheduler *s, CScheduler::Predicate p, int64_t deltaMilliSeconds) { if (p()) { s->scheduleFromNow(std::bind(&Repeat, s, p, deltaMilliSeconds), deltaMilliSeconds); } } void CScheduler::scheduleEvery(CScheduler::Predicate p, int64_t deltaMilliSeconds) { scheduleFromNow(std::bind(&Repeat, this, p, deltaMilliSeconds), deltaMilliSeconds); } size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, boost::chrono::system_clock::time_point &last) const { boost::unique_lock lock(newTaskMutex); size_t result = taskQueue.size(); if (!taskQueue.empty()) { first = taskQueue.begin()->first; last = taskQueue.rbegin()->first; } return result; } bool CScheduler::AreThreadsServicingQueue() const { boost::unique_lock lock(newTaskMutex); return nThreadsServicingQueue; } void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { { LOCK(m_cs_callbacks_pending); // Try to avoid scheduling too many copies here, but if we // accidentally have two ProcessQueue's scheduled at once its // not a big deal. if (m_are_callbacks_running) { return; } if (m_callbacks_pending.empty()) { return; } } m_pscheduler->schedule( std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); } void SingleThreadedSchedulerClient::ProcessQueue() { std::function callback; { LOCK(m_cs_callbacks_pending); if (m_are_callbacks_running) { return; } if (m_callbacks_pending.empty()) { return; } m_are_callbacks_running = true; callback = std::move(m_callbacks_pending.front()); m_callbacks_pending.pop_front(); } // RAII the setting of fCallbacksRunning and calling // MaybeScheduleProcessQueue to ensure both happen safely even if callback() // throws. struct RAIICallbacksRunning { SingleThreadedSchedulerClient *instance; explicit RAIICallbacksRunning(SingleThreadedSchedulerClient *_instance) : instance(_instance) {} ~RAIICallbacksRunning() { { LOCK(instance->m_cs_callbacks_pending); instance->m_are_callbacks_running = false; } instance->MaybeScheduleProcessQueue(); } } raiicallbacksrunning(this); callback(); } void SingleThreadedSchedulerClient::AddToProcessQueue( std::function func) { assert(m_pscheduler); { LOCK(m_cs_callbacks_pending); m_callbacks_pending.emplace_back(std::move(func)); } MaybeScheduleProcessQueue(); } void SingleThreadedSchedulerClient::EmptyQueue() { assert(!m_pscheduler->AreThreadsServicingQueue()); bool should_continue = true; while (should_continue) { ProcessQueue(); LOCK(m_cs_callbacks_pending); should_continue = !m_callbacks_pending.empty(); } } size_t SingleThreadedSchedulerClient::CallbacksPending() { LOCK(m_cs_callbacks_pending); return m_callbacks_pending.size(); } diff --git a/src/scheduler.h b/src/scheduler.h index 26be6a3f9..e9abde98f 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -1,133 +1,140 @@ // Copyright (c) 2015 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_SCHEDULER_H #define BITCOIN_SCHEDULER_H #include // // NOTE: // boost::thread / boost::chrono should be ported to // std::thread / std::chrono when we support C++11. // #include #include #include // // Simple class for background tasks that should be run periodically or once // "after a while" // // Usage: // // CScheduler* s = new CScheduler(); // s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { } // s->scheduleFromNow(std::bind(Class::func, this, argument), 3); // boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s)); // // ... then at program shutdown, clean up the thread running serviceQueue: // t->interrupt(); // t->join(); // delete t; // delete s; // Must be done after thread is interrupted/joined. // class CScheduler { public: CScheduler(); ~CScheduler(); typedef std::function Function; typedef std::function Predicate; // Call func at/after time t void schedule(Function f, boost::chrono::system_clock::time_point t = boost::chrono::system_clock::now()); // Convenience method: call f once deltaMilliSeconds from now void scheduleFromNow(Function f, int64_t deltaMilliSeconds); // Another convenience method: call p approximately every deltaMilliSeconds // forever, starting deltaMilliSeconds from now untill p returns false. To // be more precise: every time p is finished, it is rescheduled to run // deltaMilliSeconds later. If you need more accurate scheduling, don't use // this method. void scheduleEvery(Predicate p, int64_t deltaMilliSeconds); + /** + * Mock the scheduler to fast forward in time. + * Iterates through items on taskQueue and reschedules them + * to be delta_seconds sooner. + */ + void MockForward(boost::chrono::seconds delta_seconds); + // To keep things as simple as possible, there is no unschedule. // Services the queue 'forever'. Should be run in a thread, and interrupted // using boost::interrupt_thread void serviceQueue(); // Tell any threads running serviceQueue to stop as soon as they're done // servicing whatever task they're currently servicing (drain=false) or when // there is no work left to be done (drain=true) void stop(bool drain = false); // Returns number of tasks waiting to be serviced, and first and last task // times size_t getQueueInfo(boost::chrono::system_clock::time_point &first, boost::chrono::system_clock::time_point &last) const; // Returns true if there are threads actively running in serviceQueue() bool AreThreadsServicingQueue() const; private: std::multimap taskQueue; boost::condition_variable newTaskScheduled; mutable boost::mutex newTaskMutex; int nThreadsServicingQueue; bool stopRequested; bool stopWhenEmpty; bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } }; /** * Class used by CScheduler clients which may schedule multiple jobs * which are required to be run serially. Jobs may not be run on the * same thread, but no two jobs will be executed * at the same time and memory will be release-acquire consistent * (the scheduler will internally do an acquire before invoking a callback * as well as a release at the end). In practice this means that a callback * B() will be able to observe all of the effects of callback A() which executed * before it. */ class SingleThreadedSchedulerClient { private: CScheduler *m_pscheduler; RecursiveMutex m_cs_callbacks_pending; std::list> m_callbacks_pending GUARDED_BY(m_cs_callbacks_pending); bool m_are_callbacks_running GUARDED_BY(m_cs_callbacks_pending) = false; void MaybeScheduleProcessQueue(); void ProcessQueue(); public: explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {} /** * Add a callback to be executed. Callbacks are executed serially * and memory is release-acquire consistent between callback executions. * Practially, this means that callbacks can behave as if they are executed * in order by a single thread. */ void AddToProcessQueue(std::function func); // Processes all remaining queue members on the calling thread, blocking // until queue is empty. Must be called after the CScheduler has no // remaining processing threads! void EmptyQueue(); size_t CallbacksPending(); }; #endif // BITCOIN_SCHEDULER_H