Changeset View
Changeset View
Standalone View
Standalone View
src/scheduler.cpp
// Copyright (c) 2015-2016 The Bitcoin Core developers | // Copyright (c) 2015-2016 The Bitcoin Core developers | ||||
// Distributed under the MIT software license, see the accompanying | // Distributed under the MIT software license, see the accompanying | ||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php. | // file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
#include <scheduler.h> | #include <scheduler.h> | ||||
#include <random.h> | #include <random.h> | ||||
#include <cassert> | #include <cassert> | ||||
#include <utility> | #include <utility> | ||||
CScheduler::CScheduler() | CScheduler::CScheduler() {} | ||||
: nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false) {} | |||||
CScheduler::~CScheduler() { | CScheduler::~CScheduler() { | ||||
assert(nThreadsServicingQueue == 0); | assert(nThreadsServicingQueue == 0); | ||||
if (stopWhenEmpty) { | |||||
assert(taskQueue.empty()); | |||||
} | |||||
} | } | ||||
void CScheduler::serviceQueue() { | void CScheduler::serviceQueue() { | ||||
WAIT_LOCK(newTaskMutex, lock); | WAIT_LOCK(newTaskMutex, lock); | ||||
++nThreadsServicingQueue; | ++nThreadsServicingQueue; | ||||
// newTaskMutex is locked throughout this loop EXCEPT when the thread is | // newTaskMutex is locked throughout this loop EXCEPT when the thread is | ||||
// waiting or when the user's function is called. | // waiting or when the user's function is called. | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | void CScheduler::schedule(CScheduler::Function f, | ||||
std::chrono::system_clock::time_point t) { | std::chrono::system_clock::time_point t) { | ||||
{ | { | ||||
LOCK(newTaskMutex); | LOCK(newTaskMutex); | ||||
taskQueue.insert(std::make_pair(t, f)); | taskQueue.insert(std::make_pair(t, f)); | ||||
} | } | ||||
newTaskScheduled.notify_one(); | newTaskScheduled.notify_one(); | ||||
} | } | ||||
void CScheduler::scheduleFromNow(CScheduler::Function f, | |||||
int64_t deltaMilliSeconds) { | |||||
schedule(f, std::chrono::system_clock::now() + | |||||
std::chrono::milliseconds(deltaMilliSeconds)); | |||||
} | |||||
void CScheduler::MockForward(std::chrono::seconds delta_seconds) { | void CScheduler::MockForward(std::chrono::seconds delta_seconds) { | ||||
assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1}); | assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1}); | ||||
{ | { | ||||
LOCK(newTaskMutex); | LOCK(newTaskMutex); | ||||
// use temp_queue to maintain updated schedule | // use temp_queue to maintain updated schedule | ||||
std::multimap<std::chrono::system_clock::time_point, Function> | std::multimap<std::chrono::system_clock::time_point, Function> | ||||
temp_queue; | temp_queue; | ||||
for (const auto &element : taskQueue) { | for (const auto &element : taskQueue) { | ||||
temp_queue.emplace_hint(temp_queue.cend(), | temp_queue.emplace_hint(temp_queue.cend(), | ||||
element.first - delta_seconds, | element.first - delta_seconds, | ||||
element.second); | element.second); | ||||
} | } | ||||
// point taskQueue to temp_queue | // point taskQueue to temp_queue | ||||
taskQueue = std::move(temp_queue); | taskQueue = std::move(temp_queue); | ||||
} | } | ||||
// notify that the taskQueue needs to be processed | // notify that the taskQueue needs to be processed | ||||
newTaskScheduled.notify_one(); | newTaskScheduled.notify_one(); | ||||
} | } | ||||
static void Repeat(CScheduler *s, CScheduler::Predicate p, | static void Repeat(CScheduler &s, CScheduler::Predicate p, | ||||
int64_t deltaMilliSeconds) { | std::chrono::milliseconds delta) { | ||||
if (p()) { | if (p()) { | ||||
s->scheduleFromNow(std::bind(&Repeat, s, p, deltaMilliSeconds), | s.scheduleFromNow([=, &s] { Repeat(s, p, delta); }, delta); | ||||
deltaMilliSeconds); | |||||
} | } | ||||
} | } | ||||
void CScheduler::scheduleEvery(CScheduler::Predicate p, | void CScheduler::scheduleEvery(CScheduler::Predicate p, | ||||
int64_t deltaMilliSeconds) { | std::chrono::milliseconds delta) { | ||||
scheduleFromNow(std::bind(&Repeat, this, p, deltaMilliSeconds), | scheduleFromNow([=] { Repeat(*this, p, delta); }, delta); | ||||
deltaMilliSeconds); | |||||
} | } | ||||
size_t | size_t | ||||
CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first, | CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first, | ||||
std::chrono::system_clock::time_point &last) const { | std::chrono::system_clock::time_point &last) const { | ||||
LOCK(newTaskMutex); | LOCK(newTaskMutex); | ||||
size_t result = taskQueue.size(); | size_t result = taskQueue.size(); | ||||
if (!taskQueue.empty()) { | if (!taskQueue.empty()) { | ||||
▲ Show 20 Lines • Show All 89 Lines • Show Last 20 Lines |