Changeset View
Changeset View
Standalone View
Standalone View
src/scheduler.cpp
Show First 20 Lines • Show All 65 Lines • ▼ Show 20 Lines | #else | ||||
// Exit loop after timeout, it means we reached the time of | // Exit loop after timeout, it means we reached the time of | ||||
// the event | // the event | ||||
break; | break; | ||||
} | } | ||||
} | } | ||||
#endif | #endif | ||||
// If there are multiple threads, the queue can empty while we're | // If there are multiple threads, the queue can empty while we're | ||||
// waiting (another thread may service the task we were waiting on). | // waiting (another thread may service the task we were waiting on). | ||||
if (shouldStop() || taskQueue.empty()) continue; | if (shouldStop() || taskQueue.empty()) { | ||||
continue; | |||||
} | |||||
Function f = taskQueue.begin()->second; | Function f = taskQueue.begin()->second; | ||||
taskQueue.erase(taskQueue.begin()); | taskQueue.erase(taskQueue.begin()); | ||||
{ | { | ||||
// Unlock before calling f, so it can reschedule itself or | // Unlock before calling f, so it can reschedule itself or | ||||
// another task without deadlocking: | // another task without deadlocking: | ||||
reverse_lock<boost::unique_lock<boost::mutex>> rlock(lock); | reverse_lock<boost::unique_lock<boost::mutex>> rlock(lock); | ||||
f(); | f(); | ||||
} | } | ||||
} catch (...) { | } catch (...) { | ||||
--nThreadsServicingQueue; | --nThreadsServicingQueue; | ||||
throw; | throw; | ||||
} | } | ||||
} | } | ||||
--nThreadsServicingQueue; | --nThreadsServicingQueue; | ||||
newTaskScheduled.notify_one(); | newTaskScheduled.notify_one(); | ||||
} | } | ||||
void CScheduler::stop(bool drain) { | void CScheduler::stop(bool drain) { | ||||
{ | { | ||||
boost::unique_lock<boost::mutex> lock(newTaskMutex); | boost::unique_lock<boost::mutex> lock(newTaskMutex); | ||||
if (drain) | if (drain) { | ||||
stopWhenEmpty = true; | stopWhenEmpty = true; | ||||
else | } else { | ||||
stopRequested = true; | stopRequested = true; | ||||
} | } | ||||
} | |||||
newTaskScheduled.notify_all(); | newTaskScheduled.notify_all(); | ||||
} | } | ||||
void CScheduler::schedule(CScheduler::Function f, | void CScheduler::schedule(CScheduler::Function f, | ||||
boost::chrono::system_clock::time_point t) { | boost::chrono::system_clock::time_point t) { | ||||
{ | { | ||||
boost::unique_lock<boost::mutex> lock(newTaskMutex); | boost::unique_lock<boost::mutex> lock(newTaskMutex); | ||||
taskQueue.insert(std::make_pair(t, f)); | taskQueue.insert(std::make_pair(t, f)); | ||||
} | } | ||||
newTaskScheduled.notify_one(); | newTaskScheduled.notify_one(); | ||||
} | } | ||||
void CScheduler::scheduleFromNow(CScheduler::Function f, | void CScheduler::scheduleFromNow(CScheduler::Function f, | ||||
int64_t deltaMilliSeconds) { | int64_t deltaMilliSeconds) { | ||||
schedule(f, | schedule(f, | ||||
boost::chrono::system_clock::now() + | boost::chrono::system_clock::now() + | ||||
boost::chrono::milliseconds(deltaMilliSeconds)); | boost::chrono::milliseconds(deltaMilliSeconds)); | ||||
} | } | ||||
static void Repeat(CScheduler *s, CScheduler::Function f, | static void Repeat(CScheduler *s, CScheduler::Predicate p, | ||||
int64_t deltaMilliSeconds) { | int64_t deltaMilliSeconds) { | ||||
f(); | if (p()) { | ||||
s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaMilliSeconds), | s->scheduleFromNow(boost::bind(&Repeat, s, p, deltaMilliSeconds), | ||||
deltaMilliSeconds); | deltaMilliSeconds); | ||||
} | } | ||||
} | |||||
void CScheduler::scheduleEvery(CScheduler::Function f, | void CScheduler::scheduleEvery(CScheduler::Predicate p, | ||||
int64_t deltaMilliSeconds) { | int64_t deltaMilliSeconds) { | ||||
scheduleFromNow(boost::bind(&Repeat, this, f, deltaMilliSeconds), | scheduleFromNow(boost::bind(&Repeat, this, p, deltaMilliSeconds), | ||||
deltaMilliSeconds); | deltaMilliSeconds); | ||||
} | } | ||||
size_t | size_t | ||||
CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, | CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, | ||||
boost::chrono::system_clock::time_point &last) const { | boost::chrono::system_clock::time_point &last) const { | ||||
boost::unique_lock<boost::mutex> lock(newTaskMutex); | boost::unique_lock<boost::mutex> lock(newTaskMutex); | ||||
size_t result = taskQueue.size(); | size_t result = taskQueue.size(); | ||||
▲ Show 20 Lines • Show All 75 Lines • Show Last 20 Lines |