Changeset View
Changeset View
Standalone View
Standalone View
src/scheduler.h
// Copyright (c) 2015 The Bitcoin Core developers | // Copyright (c) 2015 The Bitcoin Core developers | ||||
// Distributed under the MIT software license, see the accompanying | // Distributed under the MIT software license, see the accompanying | ||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php. | // file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
#ifndef BITCOIN_SCHEDULER_H | #ifndef BITCOIN_SCHEDULER_H | ||||
#define BITCOIN_SCHEDULER_H | #define BITCOIN_SCHEDULER_H | ||||
#include <sync.h> | #include <sync.h> | ||||
// | // | ||||
// NOTE: | // NOTE: | ||||
// boost::thread / boost::chrono should be ported to | // boost::thread should be ported to std::thread | ||||
// std::thread / std::chrono when we support C++11. | // when we support C++11. | ||||
// | // | ||||
#include <boost/chrono/chrono.hpp> | #include <condition_variable> | ||||
#include <boost/thread.hpp> | #include <functional> | ||||
#include <list> | |||||
#include <map> | #include <map> | ||||
// | // | ||||
// Simple class for background tasks that should be run periodically or once | // Simple class for background tasks that should be run periodically or once | ||||
// "after a while" | // "after a while" | ||||
// | // | ||||
// Usage: | // Usage: | ||||
// | // | ||||
// CScheduler* s = new CScheduler(); | // CScheduler* s = new CScheduler(); | ||||
// s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { } | // s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { } | ||||
// s->scheduleFromNow(std::bind(Class::func, this, argument), 3); | // s->scheduleFromNow(std::bind(Class::func, this, argument), 3); | ||||
// boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s)); | // boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s)); | ||||
// | // | ||||
// ... then at program shutdown, clean up the thread running serviceQueue: | // ... then at program shutdown, make sure to call stop() to clean up the | ||||
// t->interrupt(); | // thread(s) running serviceQueue: | ||||
// s->stop(); | |||||
// t->join(); | // t->join(); | ||||
// delete t; | // delete t; | ||||
// delete s; // Must be done after thread is interrupted/joined. | // delete s; // Must be done after thread is interrupted/joined. | ||||
// | // | ||||
class CScheduler { | class CScheduler { | ||||
public: | public: | ||||
CScheduler(); | CScheduler(); | ||||
~CScheduler(); | ~CScheduler(); | ||||
typedef std::function<void()> Function; | typedef std::function<void()> Function; | ||||
typedef std::function<bool()> Predicate; | typedef std::function<bool()> Predicate; | ||||
// Call func at/after time t | // Call func at/after time t | ||||
void schedule(Function f, boost::chrono::system_clock::time_point t = | void schedule(Function f, std::chrono::system_clock::time_point t); | ||||
boost::chrono::system_clock::now()); | |||||
// Convenience method: call f once deltaMilliSeconds from now | // Convenience method: call f once deltaMilliSeconds from now | ||||
void scheduleFromNow(Function f, int64_t deltaMilliSeconds); | void scheduleFromNow(Function f, int64_t deltaMilliSeconds); | ||||
// Another convenience method: call p approximately every deltaMilliSeconds | // Another convenience method: call p approximately every deltaMilliSeconds | ||||
// forever, starting deltaMilliSeconds from now untill p returns false. To | // forever, starting deltaMilliSeconds from now untill p returns false. To | ||||
// be more precise: every time p is finished, it is rescheduled to run | // be more precise: every time p is finished, it is rescheduled to run | ||||
// deltaMilliSeconds later. If you need more accurate scheduling, don't use | // deltaMilliSeconds later. If you need more accurate scheduling, don't use | ||||
// this method. | // this method. | ||||
void scheduleEvery(Predicate p, int64_t deltaMilliSeconds); | void scheduleEvery(Predicate p, int64_t deltaMilliSeconds); | ||||
/** | /** | ||||
* Mock the scheduler to fast forward in time. | * Mock the scheduler to fast forward in time. | ||||
* Iterates through items on taskQueue and reschedules them | * Iterates through items on taskQueue and reschedules them | ||||
* to be delta_seconds sooner. | * to be delta_seconds sooner. | ||||
*/ | */ | ||||
void MockForward(boost::chrono::seconds delta_seconds); | void MockForward(std::chrono::seconds delta_seconds); | ||||
// To keep things as simple as possible, there is no unschedule. | // To keep things as simple as possible, there is no unschedule. | ||||
// Services the queue 'forever'. Should be run in a thread, and interrupted | // Services the queue 'forever'. Should be run in a thread, and interrupted | ||||
// using boost::interrupt_thread | // using boost::interrupt_thread | ||||
void serviceQueue(); | void serviceQueue(); | ||||
// Tell any threads running serviceQueue to stop as soon as they're done | // Tell any threads running serviceQueue to stop as soon as they're done | ||||
// servicing whatever task they're currently servicing (drain=false) or when | // servicing whatever task they're currently servicing (drain=false) or when | ||||
// there is no work left to be done (drain=true) | // there is no work left to be done (drain=true) | ||||
void stop(bool drain = false); | void stop(bool drain = false); | ||||
// Returns number of tasks waiting to be serviced, and first and last task | // Returns number of tasks waiting to be serviced, | ||||
// times | // and first and last task times | ||||
size_t getQueueInfo(boost::chrono::system_clock::time_point &first, | size_t getQueueInfo(std::chrono::system_clock::time_point &first, | ||||
boost::chrono::system_clock::time_point &last) const; | std::chrono::system_clock::time_point &last) const; | ||||
// Returns true if there are threads actively running in serviceQueue() | // Returns true if there are threads actively running in serviceQueue() | ||||
bool AreThreadsServicingQueue() const; | bool AreThreadsServicingQueue() const; | ||||
private: | private: | ||||
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue; | mutable Mutex newTaskMutex; | ||||
boost::condition_variable newTaskScheduled; | std::condition_variable newTaskScheduled; | ||||
mutable boost::mutex newTaskMutex; | std::multimap<std::chrono::system_clock::time_point, Function> | ||||
int nThreadsServicingQueue; | taskQueue GUARDED_BY(newTaskMutex); | ||||
bool stopRequested; | int nThreadsServicingQueue GUARDED_BY(newTaskMutex); | ||||
bool stopWhenEmpty; | bool stopRequested GUARDED_BY(newTaskMutex); | ||||
bool shouldStop() const { | bool stopWhenEmpty GUARDED_BY(newTaskMutex); | ||||
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { | |||||
return stopRequested || (stopWhenEmpty && taskQueue.empty()); | return stopRequested || (stopWhenEmpty && taskQueue.empty()); | ||||
} | } | ||||
}; | }; | ||||
/** | /** | ||||
* Class used by CScheduler clients which may schedule multiple jobs | * Class used by CScheduler clients which may schedule multiple jobs | ||||
* which are required to be run serially. Jobs may not be run on the | * which are required to be run serially. Jobs may not be run on the | ||||
* same thread, but no two jobs will be executed | * same thread, but no two jobs will be executed | ||||
Show All 39 Lines |