Changeset View
Changeset View
Standalone View
Standalone View
src/scheduler.cpp
// Copyright (c) 2015-2016 The Bitcoin Core developers | // Copyright (c) 2015-2016 The Bitcoin Core developers | ||||
// Distributed under the MIT software license, see the accompanying | // Distributed under the MIT software license, see the accompanying | ||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php. | // file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
#include <scheduler.h> | #include <scheduler.h> | ||||
#include <random.h> | #include <random.h> | ||||
#include <reverselock.h> | |||||
#include <cassert> | #include <cassert> | ||||
#include <utility> | #include <utility> | ||||
CScheduler::CScheduler() | CScheduler::CScheduler() | ||||
: nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false) {} | : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false) {} | ||||
CScheduler::~CScheduler() { | CScheduler::~CScheduler() { | ||||
assert(nThreadsServicingQueue == 0); | assert(nThreadsServicingQueue == 0); | ||||
} | } | ||||
void CScheduler::serviceQueue() { | void CScheduler::serviceQueue() { | ||||
boost::unique_lock<boost::mutex> lock(newTaskMutex); | WAIT_LOCK(newTaskMutex, lock); | ||||
++nThreadsServicingQueue; | ++nThreadsServicingQueue; | ||||
// newTaskMutex is locked throughout this loop EXCEPT when the thread is | // newTaskMutex is locked throughout this loop EXCEPT when the thread is | ||||
// waiting or when the user's function is called. | // waiting or when the user's function is called. | ||||
while (!shouldStop()) { | while (!shouldStop()) { | ||||
try { | try { | ||||
if (!shouldStop() && taskQueue.empty()) { | if (!shouldStop() && taskQueue.empty()) { | ||||
reverse_lock<boost::unique_lock<boost::mutex>> rlock(lock); | REVERSE_LOCK(lock); | ||||
} | } | ||||
while (!shouldStop() && taskQueue.empty()) { | while (!shouldStop() && taskQueue.empty()) { | ||||
// Wait until there is something to do. | // Wait until there is something to do. | ||||
newTaskScheduled.wait(lock); | newTaskScheduled.wait(lock); | ||||
} | } | ||||
// Wait until either there is a new task, or until the time of the | // Wait until either there is a new task, or until | ||||
// first item on the queue. | // the time of the first item on the queue: | ||||
// Some boost versions have a conflicting overload of wait_until | |||||
// that returns void. Explicitly use a template here to avoid | |||||
// hitting that overload. | |||||
while (!shouldStop() && !taskQueue.empty()) { | while (!shouldStop() && !taskQueue.empty()) { | ||||
boost::chrono::system_clock::time_point timeToWaitFor = | std::chrono::system_clock::time_point timeToWaitFor = | ||||
taskQueue.begin()->first; | taskQueue.begin()->first; | ||||
if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == | if (newTaskScheduled.wait_until(lock, timeToWaitFor) == | ||||
boost::cv_status::timeout) { | std::cv_status::timeout) { | ||||
// Exit loop after timeout, it means we reached the time of | // Exit loop after timeout, it means we reached the time of | ||||
// the event | // the event | ||||
break; | break; | ||||
} | } | ||||
} | } | ||||
// If there are multiple threads, the queue can empty while we're | // If there are multiple threads, the queue can empty while we're | ||||
// waiting (another thread may service the task we were waiting on). | // waiting (another thread may service the task we were waiting on). | ||||
if (shouldStop() || taskQueue.empty()) { | if (shouldStop() || taskQueue.empty()) { | ||||
continue; | continue; | ||||
} | } | ||||
Function f = taskQueue.begin()->second; | Function f = taskQueue.begin()->second; | ||||
taskQueue.erase(taskQueue.begin()); | taskQueue.erase(taskQueue.begin()); | ||||
{ | { | ||||
// Unlock before calling f, so it can reschedule itself or | // Unlock before calling f, so it can reschedule itself or | ||||
// another task without deadlocking: | // another task without deadlocking: | ||||
reverse_lock<boost::unique_lock<boost::mutex>> rlock(lock); | REVERSE_LOCK(lock); | ||||
f(); | f(); | ||||
} | } | ||||
} catch (...) { | } catch (...) { | ||||
--nThreadsServicingQueue; | --nThreadsServicingQueue; | ||||
throw; | throw; | ||||
} | } | ||||
} | } | ||||
--nThreadsServicingQueue; | --nThreadsServicingQueue; | ||||
newTaskScheduled.notify_one(); | newTaskScheduled.notify_one(); | ||||
} | } | ||||
void CScheduler::stop(bool drain) { | void CScheduler::stop(bool drain) { | ||||
{ | { | ||||
boost::unique_lock<boost::mutex> lock(newTaskMutex); | LOCK(newTaskMutex); | ||||
if (drain) { | if (drain) { | ||||
stopWhenEmpty = true; | stopWhenEmpty = true; | ||||
} else { | } else { | ||||
stopRequested = true; | stopRequested = true; | ||||
} | } | ||||
} | } | ||||
newTaskScheduled.notify_all(); | newTaskScheduled.notify_all(); | ||||
} | } | ||||
void CScheduler::schedule(CScheduler::Function f, | void CScheduler::schedule(CScheduler::Function f, | ||||
boost::chrono::system_clock::time_point t) { | std::chrono::system_clock::time_point t) { | ||||
{ | { | ||||
boost::unique_lock<boost::mutex> lock(newTaskMutex); | LOCK(newTaskMutex); | ||||
taskQueue.insert(std::make_pair(t, f)); | taskQueue.insert(std::make_pair(t, f)); | ||||
} | } | ||||
newTaskScheduled.notify_one(); | newTaskScheduled.notify_one(); | ||||
} | } | ||||
void CScheduler::scheduleFromNow(CScheduler::Function f, | void CScheduler::scheduleFromNow(CScheduler::Function f, | ||||
int64_t deltaMilliSeconds) { | int64_t deltaMilliSeconds) { | ||||
schedule(f, boost::chrono::system_clock::now() + | schedule(f, std::chrono::system_clock::now() + | ||||
boost::chrono::milliseconds(deltaMilliSeconds)); | std::chrono::milliseconds(deltaMilliSeconds)); | ||||
} | } | ||||
void CScheduler::MockForward(boost::chrono::seconds delta_seconds) { | void CScheduler::MockForward(std::chrono::seconds delta_seconds) { | ||||
assert(delta_seconds.count() > 0 && | assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1}); | ||||
delta_seconds < boost::chrono::hours{1}); | |||||
{ | { | ||||
boost::unique_lock<boost::mutex> lock(newTaskMutex); | LOCK(newTaskMutex); | ||||
// use temp_queue to maintain updated schedule | // use temp_queue to maintain updated schedule | ||||
std::multimap<boost::chrono::system_clock::time_point, Function> | std::multimap<std::chrono::system_clock::time_point, Function> | ||||
temp_queue; | temp_queue; | ||||
for (const auto &element : taskQueue) { | for (const auto &element : taskQueue) { | ||||
temp_queue.emplace_hint(temp_queue.cend(), | temp_queue.emplace_hint(temp_queue.cend(), | ||||
element.first - delta_seconds, | element.first - delta_seconds, | ||||
element.second); | element.second); | ||||
} | } | ||||
Show All 15 Lines | |||||
void CScheduler::scheduleEvery(CScheduler::Predicate p, | void CScheduler::scheduleEvery(CScheduler::Predicate p, | ||||
int64_t deltaMilliSeconds) { | int64_t deltaMilliSeconds) { | ||||
scheduleFromNow(std::bind(&Repeat, this, p, deltaMilliSeconds), | scheduleFromNow(std::bind(&Repeat, this, p, deltaMilliSeconds), | ||||
deltaMilliSeconds); | deltaMilliSeconds); | ||||
} | } | ||||
size_t | size_t | ||||
CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, | CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first, | ||||
boost::chrono::system_clock::time_point &last) const { | std::chrono::system_clock::time_point &last) const { | ||||
boost::unique_lock<boost::mutex> lock(newTaskMutex); | LOCK(newTaskMutex); | ||||
size_t result = taskQueue.size(); | size_t result = taskQueue.size(); | ||||
if (!taskQueue.empty()) { | if (!taskQueue.empty()) { | ||||
first = taskQueue.begin()->first; | first = taskQueue.begin()->first; | ||||
last = taskQueue.rbegin()->first; | last = taskQueue.rbegin()->first; | ||||
} | } | ||||
return result; | return result; | ||||
} | } | ||||
bool CScheduler::AreThreadsServicingQueue() const { | bool CScheduler::AreThreadsServicingQueue() const { | ||||
boost::unique_lock<boost::mutex> lock(newTaskMutex); | LOCK(newTaskMutex); | ||||
return nThreadsServicingQueue; | return nThreadsServicingQueue; | ||||
} | } | ||||
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { | void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { | ||||
{ | { | ||||
LOCK(m_cs_callbacks_pending); | LOCK(m_cs_callbacks_pending); | ||||
// Try to avoid scheduling too many copies here, but if we | // Try to avoid scheduling too many copies here, but if we | ||||
// accidentally have two ProcessQueue's scheduled at once its | // accidentally have two ProcessQueue's scheduled at once its | ||||
// not a big deal. | // not a big deal. | ||||
if (m_are_callbacks_running) { | if (m_are_callbacks_running) { | ||||
return; | return; | ||||
} | } | ||||
if (m_callbacks_pending.empty()) { | if (m_callbacks_pending.empty()) { | ||||
return; | return; | ||||
} | } | ||||
} | } | ||||
m_pscheduler->schedule( | m_pscheduler->schedule( | ||||
std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); | std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), | ||||
std::chrono::system_clock::now()); | |||||
} | } | ||||
void SingleThreadedSchedulerClient::ProcessQueue() { | void SingleThreadedSchedulerClient::ProcessQueue() { | ||||
std::function<void()> callback; | std::function<void()> callback; | ||||
{ | { | ||||
LOCK(m_cs_callbacks_pending); | LOCK(m_cs_callbacks_pending); | ||||
if (m_are_callbacks_running) { | if (m_are_callbacks_running) { | ||||
return; | return; | ||||
▲ Show 20 Lines • Show All 54 Lines • Show Last 20 Lines |