Changeset View
Changeset View
Standalone View
Standalone View
src/scheduler.cpp
Show All 12 Lines | |||||
CScheduler::CScheduler() | CScheduler::CScheduler() | ||||
: nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false) {} | : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false) {} | ||||
CScheduler::~CScheduler() { | CScheduler::~CScheduler() { | ||||
assert(nThreadsServicingQueue == 0); | assert(nThreadsServicingQueue == 0); | ||||
} | } | ||||
#if BOOST_VERSION < 105000 | |||||
static boost::system_time | |||||
toPosixTime(const boost::chrono::system_clock::time_point &t) { | |||||
return boost::posix_time::from_time_t( | |||||
boost::chrono::system_clock::to_time_t(t)); | |||||
} | |||||
#endif | |||||
void CScheduler::serviceQueue() { | void CScheduler::serviceQueue() { | ||||
boost::unique_lock<boost::mutex> lock(newTaskMutex); | boost::unique_lock<boost::mutex> lock(newTaskMutex); | ||||
++nThreadsServicingQueue; | ++nThreadsServicingQueue; | ||||
// newTaskMutex is locked throughout this loop EXCEPT when the thread is | // newTaskMutex is locked throughout this loop EXCEPT when the thread is | ||||
// waiting or when the user's function is called. | // waiting or when the user's function is called. | ||||
while (!shouldStop()) { | while (!shouldStop()) { | ||||
try { | try { | ||||
if (!shouldStop() && taskQueue.empty()) { | if (!shouldStop() && taskQueue.empty()) { | ||||
reverse_lock<boost::unique_lock<boost::mutex>> rlock(lock); | reverse_lock<boost::unique_lock<boost::mutex>> rlock(lock); | ||||
// Use this chance to get a tiny bit more entropy | // Use this chance to get a tiny bit more entropy | ||||
RandAddSeedSleep(); | RandAddSeedSleep(); | ||||
} | } | ||||
while (!shouldStop() && taskQueue.empty()) { | while (!shouldStop() && taskQueue.empty()) { | ||||
// Wait until there is something to do. | // Wait until there is something to do. | ||||
newTaskScheduled.wait(lock); | newTaskScheduled.wait(lock); | ||||
} | } | ||||
// Wait until either there is a new task, or until the time of the first item on | // Wait until either there is a new task, or until the time of the | ||||
// the queue: | // first item on the queue. | ||||
// wait_until needs boost 1.50 or later; older versions have timed_wait: | |||||
#if BOOST_VERSION < 105000 | |||||
while (!shouldStop() && !taskQueue.empty() && | |||||
newTaskScheduled.timed_wait( | |||||
lock, toPosixTime(taskQueue.begin()->first))) { | |||||
// Keep waiting until timeout | |||||
} | |||||
#else | |||||
// Some boost versions have a conflicting overload of wait_until | // Some boost versions have a conflicting overload of wait_until | ||||
// that returns void. Explicitly use a template here to avoid | // that returns void. Explicitly use a template here to avoid | ||||
// hitting that overload. | // hitting that overload. | ||||
while (!shouldStop() && !taskQueue.empty()) { | while (!shouldStop() && !taskQueue.empty()) { | ||||
boost::chrono::system_clock::time_point timeToWaitFor = | boost::chrono::system_clock::time_point timeToWaitFor = | ||||
taskQueue.begin()->first; | taskQueue.begin()->first; | ||||
if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == | if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == | ||||
boost::cv_status::timeout) { | boost::cv_status::timeout) { | ||||
// Exit loop after timeout, it means we reached the time of | // Exit loop after timeout, it means we reached the time of | ||||
// the event | // the event | ||||
break; | break; | ||||
} | } | ||||
} | } | ||||
#endif | |||||
// If there are multiple threads, the queue can empty while we're | // If there are multiple threads, the queue can empty while we're | ||||
// waiting (another thread may service the task we were waiting on). | // waiting (another thread may service the task we were waiting on). | ||||
if (shouldStop() || taskQueue.empty()) { | if (shouldStop() || taskQueue.empty()) { | ||||
continue; | continue; | ||||
} | } | ||||
Function f = taskQueue.begin()->second; | Function f = taskQueue.begin()->second; | ||||
taskQueue.erase(taskQueue.begin()); | taskQueue.erase(taskQueue.begin()); | ||||
▲ Show 20 Lines • Show All 138 Lines • Show Last 20 Lines |