Changeset View
Changeset View
Standalone View
Standalone View
src/rcu.cpp
// Copyright (c) 2018 The Bitcoin developers | // Copyright (c) 2018 The Bitcoin developers | ||||
// Distributed under the MIT software license, see the accompanying | // Distributed under the MIT software license, see the accompanying | ||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php. | // file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
#include "rcu.h" | #include "rcu.h" | ||||
#include "sync.h" | #include "sync.h" | ||||
#include <algorithm> | |||||
#include <chrono> | #include <chrono> | ||||
#include <condition_variable> | #include <condition_variable> | ||||
std::atomic<uint64_t> RCUInfos::revision{0}; | std::atomic<uint64_t> RCUInfos::revision{0}; | ||||
thread_local RCUInfos RCUInfos::infos{}; | thread_local RCUInfos RCUInfos::infos{}; | ||||
/** | /** | ||||
* How many time a busy loop runs before yelding. | * How many time a busy loop runs before yelding. | ||||
*/ | */ | ||||
static const int RCU_ACTIVE_LOOP_COUNT = 30; | static const int RCU_ACTIVE_LOOP_COUNT = 10; | ||||
/** | /** | ||||
* We maintain a linked list of all the RCUInfos for each active thread. Upon | * We maintain a linked list of all the RCUInfos for each active thread. Upon | ||||
* start, a new thread adds itself to the head of the liked list and the node is | * start, a new thread adds itself to the head of the liked list and the node is | ||||
* then removed when the threads shuts down. | * then removed when the threads shuts down. | ||||
* | * | ||||
* Insertion is fairly straightforward. The first step is to set the next | * Insertion is fairly straightforward. The first step is to set the next | ||||
* pointer of the node being inserted as the first node in the list as follow: | * pointer of the node being inserted as the first node in the list as follow: | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | |||||
* allowing the node to be destroyed. We need to keep the mutex during that | * allowing the node to be destroyed. We need to keep the mutex during that | ||||
* process, because we just removed our thread from the list of thread to wait | * process, because we just removed our thread from the list of thread to wait | ||||
* for. A concurrent deletion would not wait for us and may end up deleting data | * for. A concurrent deletion would not wait for us and may end up deleting data | ||||
* we rely on as a result. | * we rely on as a result. | ||||
*/ | */ | ||||
static std::atomic<RCUInfos *> threadInfos{nullptr}; | static std::atomic<RCUInfos *> threadInfos{nullptr}; | ||||
static CCriticalSection csThreadInfosDelete; | static CCriticalSection csThreadInfosDelete; | ||||
RCUInfos::RCUInfos() : next(nullptr), state(0) { | RCUInfos::RCUInfos() : state(0), next(nullptr) { | ||||
RCUInfos *head = threadInfos.load(); | RCUInfos *head = threadInfos.load(); | ||||
do { | do { | ||||
next.store(head); | next.store(head); | ||||
} while (!threadInfos.compare_exchange_weak(head, this)); | } while (!threadInfos.compare_exchange_weak(head, this)); | ||||
// Release the lock. | // Release the lock. | ||||
readFree(); | readFree(); | ||||
} | } | ||||
RCUInfos::~RCUInfos() { | RCUInfos::~RCUInfos() { | ||||
/** | |||||
* Before the thread is removed from the list, make sure we cleanup | |||||
* everything. | |||||
*/ | |||||
runCleanups(); | |||||
while (cleanups.size() > 0) { | |||||
synchronize(); | |||||
} | |||||
while (true) { | while (true) { | ||||
LOCK(csThreadInfosDelete); | LOCK(csThreadInfosDelete); | ||||
std::atomic<RCUInfos *> *ptr; | std::atomic<RCUInfos *> *ptr; | ||||
{ | { | ||||
RCULock lock(this); | RCULock lock(this); | ||||
ptr = &threadInfos; | ptr = &threadInfos; | ||||
Show All 31 Lines | RCUInfos::~RCUInfos() { | ||||
} | } | ||||
} | } | ||||
void RCUInfos::synchronize() { | void RCUInfos::synchronize() { | ||||
uint64_t syncRev = ++revision; | uint64_t syncRev = ++revision; | ||||
// Loop a few time lock free. | // Loop a few time lock free. | ||||
for (int i = 0; i < RCU_ACTIVE_LOOP_COUNT; i++) { | for (int i = 0; i < RCU_ACTIVE_LOOP_COUNT; i++) { | ||||
if (hasSynced(syncRev)) { | runCleanups(); | ||||
// Done! | if (cleanups.empty() && hasSyncedTo(syncRev)) { | ||||
return; | return; | ||||
} | } | ||||
} | } | ||||
// It seems like we have some contention. Let's try to not starve the | // It seems like we have some contention. Let's try to not starve the | ||||
// system. Let's make sure threads that land here proceed one by one. | // system. Let's make sure threads that land here proceed one by one. | ||||
// XXX: The best option long term is most likely to use a futex on one of | // XXX: The best option long term is most likely to use a futex on one of | ||||
// the thread causing synchronization delay so this thread can be waked up | // the thread causing synchronization delay so this thread can be waked up | ||||
// at an apropriate time. | // at an apropriate time. | ||||
static std::condition_variable cond; | static std::condition_variable cond; | ||||
static CWaitableCriticalSection cs; | static CWaitableCriticalSection cs; | ||||
WAIT_LOCK(cs, lock); | WAIT_LOCK(cs, lock); | ||||
do { | do { | ||||
runCleanups(); | |||||
cond.notify_one(); | cond.notify_one(); | ||||
} while (!cond.wait_for(lock, std::chrono::microseconds(1), | } while (!cond.wait_for(lock, std::chrono::microseconds(1), [&] { | ||||
[&] { return hasSynced(syncRev); })); | return cleanups.empty() && hasSyncedTo(syncRev); | ||||
})); | |||||
} | |||||
void RCUInfos::runCleanups() { | |||||
// By the time we run a set of cleanups, we may have more cleanups | |||||
// available so we loop until there is nothing available for cleanup. | |||||
while (true) { | |||||
if (cleanups.empty()) { | |||||
// There is nothing to cleanup. | |||||
return; | |||||
} | |||||
auto it = cleanups.begin(); | |||||
uint64_t syncedTo = hasSyncedTo(it->first); | |||||
while (it != cleanups.end() && it->first <= syncedTo) { | |||||
// Run the cleanup and remove it from the map. | |||||
it->second(); | |||||
cleanups.erase(it++); | |||||
} | } | ||||
} | |||||
} | |||||
uint64_t RCUInfos::hasSyncedTo(uint64_t cutoff) { | |||||
uint64_t syncedTo = revision.load(); | |||||
bool RCUInfos::hasSynced(uint64_t syncRev) { | |||||
// Go over the list and check all threads are past the synchronization | // Go over the list and check all threads are past the synchronization | ||||
// point. | // point. | ||||
RCULock lock(this); | RCULock lock(this); | ||||
RCUInfos *current = threadInfos.load(); | RCUInfos *current = threadInfos.load(); | ||||
while (current != nullptr) { | while (current != nullptr) { | ||||
// If the current thread is not up to speed, bail. | syncedTo = std::min(syncedTo, current->state.load()); | ||||
if (current->state < syncRev) { | if (syncedTo < cutoff) { | ||||
return false; | return 0; | ||||
} | } | ||||
current = current->next.load(); | current = current->next.load(); | ||||
} | } | ||||
return true; | return syncedTo; | ||||
} | } |