Changeset View
Changeset View
Standalone View
Standalone View
src/scheduler.cpp
Show First 20 Lines • Show All 136 Lines • ▼ Show 20 Lines | CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, | ||||
boost::unique_lock<boost::mutex> lock(newTaskMutex); | boost::unique_lock<boost::mutex> lock(newTaskMutex); | ||||
size_t result = taskQueue.size(); | size_t result = taskQueue.size(); | ||||
if (!taskQueue.empty()) { | if (!taskQueue.empty()) { | ||||
first = taskQueue.begin()->first; | first = taskQueue.begin()->first; | ||||
last = taskQueue.rbegin()->first; | last = taskQueue.rbegin()->first; | ||||
} | } | ||||
return result; | return result; | ||||
} | } | ||||
bool CScheduler::AreThreadsServicingQueue() const { | |||||
return nThreadsServicingQueue; | |||||
} | |||||
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { | |||||
{ | |||||
LOCK(m_cs_callbacks_pending); | |||||
// Try to avoid scheduling too many copies here, but if we | |||||
// accidentally have two ProcessQueue's scheduled at once its | |||||
// not a big deal. | |||||
if (m_are_callbacks_running) return; | |||||
if (m_callbacks_pending.empty()) return; | |||||
} | |||||
m_pscheduler->schedule( | |||||
std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); | |||||
} | |||||
void SingleThreadedSchedulerClient::ProcessQueue() { | |||||
std::function<void(void)> callback; | |||||
{ | |||||
LOCK(m_cs_callbacks_pending); | |||||
if (m_are_callbacks_running) return; | |||||
if (m_callbacks_pending.empty()) return; | |||||
m_are_callbacks_running = true; | |||||
callback = std::move(m_callbacks_pending.front()); | |||||
m_callbacks_pending.pop_front(); | |||||
} | |||||
// RAII the setting of fCallbacksRunning and calling | |||||
// MaybeScheduleProcessQueue | |||||
// to ensure both happen safely even if callback() throws. | |||||
struct RAIICallbacksRunning { | |||||
SingleThreadedSchedulerClient *instance; | |||||
RAIICallbacksRunning(SingleThreadedSchedulerClient *_instance) | |||||
: instance(_instance) {} | |||||
~RAIICallbacksRunning() { | |||||
{ | |||||
LOCK(instance->m_cs_callbacks_pending); | |||||
instance->m_are_callbacks_running = false; | |||||
} | |||||
instance->MaybeScheduleProcessQueue(); | |||||
} | |||||
} raiicallbacksrunning(this); | |||||
callback(); | |||||
} | |||||
void SingleThreadedSchedulerClient::AddToProcessQueue( | |||||
std::function<void(void)> func) { | |||||
assert(m_pscheduler); | |||||
{ | |||||
LOCK(m_cs_callbacks_pending); | |||||
m_callbacks_pending.emplace_back(std::move(func)); | |||||
} | |||||
MaybeScheduleProcessQueue(); | |||||
} | |||||
void SingleThreadedSchedulerClient::EmptyQueue() { | |||||
assert(!m_pscheduler->AreThreadsServicingQueue()); | |||||
bool should_continue = true; | |||||
while (should_continue) { | |||||
ProcessQueue(); | |||||
LOCK(m_cs_callbacks_pending); | |||||
should_continue = !m_callbacks_pending.empty(); | |||||
} | |||||
} |