Changeset View
Standalone View
src/rcu.cpp
- This file was added.
// Copyright (c) 2018 The Bitcoin developers | |||||
// Distributed under the MIT software license, see the accompanying | |||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php. | |||||
#include "rcu.h" | |||||
#include "sync.h" | |||||
#include <cstdio> | |||||
std::atomic<uint64_t> RCUInfos::revision(0); | |||||
thread_local RCUInfos RCUInfos::infos{}; | |||||
/** | |||||
* How many tim a buzy loop runs before yeilding. | |||||
Fabien: tim->time | |||||
FabienUnsubmitted Not Done Inline Actionsbuzy->busy Fabien: buzy->busy | |||||
FabienUnsubmitted Not Done Inline Actionsyeilding->yelding Fabien: yeilding->yelding | |||||
FabienUnsubmitted Not Done Inline Actionsyeilding->yielding Fabien: yeilding->yielding | |||||
*/ | |||||
static const uint64_t RCU_ACTIVE_LOOP_COUNT = 30; | |||||
/** | |||||
* We maintain a linked list of all the RCUInfos for each active thread. Upon | |||||
* start, a new thread adds itself to the head of the liked list and the node is | |||||
* then removed when the threads shuts down. | |||||
* | |||||
* Insertion is fairly straightforward. The first step is to set the next | |||||
* pointer of the node being inserted as the first node in the list as follow: | |||||
* | |||||
* threadInfos -> Node -> ... | |||||
* ^ | |||||
* Nadded -| | |||||
* | |||||
* The second step is to update threadInfos to point on the inserted node. This | |||||
* is done using compare and swap. If the head of the list changed during this | |||||
* process - for instance due to another insertion, CAS will fail and we can | |||||
* start again. | |||||
* | |||||
* threadInfos Node -> ... | |||||
* | ^ | |||||
* \-> Nadded -| | |||||
* | |||||
* Deletion is a slightly more complex process. The general idea is to go over | |||||
* the list, find the parent of the item we want to remove and set it's next | |||||
* pointer to jump over it. | |||||
* | |||||
* Nparent -> Ndelete -> Nchild | |||||
* | |||||
* Nparent Ndelete -> Nchild | |||||
* | ^ | |||||
* \--------------------| | |||||
* | |||||
* We run into problems when a nodes is deleted concurently wth another node | |||||
FabienUnsubmitted Not Done Inline Actionsconcurently wth->concurrently with Fabien: concurently wth->concurrently with | |||||
* being inserted. Hopefully, we can solve that problem with CAS as well. | |||||
* | |||||
* threadInfos -> Ndelete -> Nchild | |||||
* ^ | |||||
* Nadded -| | |||||
* | |||||
* The insertion will try to update threadInfos to point to Nadded, while the | |||||
* deletion will try to update it to point to Nchild. Whichever goes first will | |||||
* cause the other to fail its CAS and restart its process. | |||||
* | |||||
* threadInfos Ndelete -> Nchild | |||||
* | ^ | |||||
* \--> Nadded -| | |||||
* | |||||
* After a successful insertion, threadInfos now points to Nadded, and the CAS | |||||
FabienUnsubmitted Not Done Inline Actionssuccessful->successful Fabien: successful->successful | |||||
* to move it to Nchild will fail, causing the deletion process to restart from | |||||
* scratch. | |||||
* | |||||
* /----------------------| | |||||
* | V | |||||
* threadInfos Ndelete -> Nchild | |||||
* ^ | |||||
* Nadded -| | |||||
* | |||||
* After a succesful deletion, threadInfos now points to NChild and the CAS to | |||||
* move it to Nadded will fail, causing the insertion process to fail. | |||||
* | |||||
* We also run into problems when several nodes are deleted concurently. Because | |||||
FabienUnsubmitted Not Done Inline Actionsconcurently->concurrently Fabien: concurently->concurrently | |||||
* it is not possible to read Ndelete->next and update Nparent->next atomicaly, | |||||
FabienUnsubmitted Not Done Inline Actionsatomicaly->atomically Fabien: atomicaly->atomically | |||||
* we may end up setting Nparent->next to a stale value if Nchild is deleted. | |||||
* | |||||
* /----------------------| | |||||
* | V | |||||
* Nparent Ndelete Nchild -> Ngrandchild | |||||
* | ^ | |||||
* \--------------------| | |||||
* | |||||
* This would cause Nchild to be 'resurected', which is obviously a problem. In | |||||
FabienUnsubmitted Not Done Inline Actionsresurected->resurrected Fabien: resurected->resurrected | |||||
* order to avoid this problem, we sure that no concurent deletion takes places | |||||
FabienUnsubmitted Not Done Inline Actionswe sure->we make sure, concurent->concurrent Fabien: we sure->we make sure, concurent->concurrent | |||||
* using a good old mutex. Using a mutex for deletion also ensures we are safe | |||||
* from the ABA problem. | |||||
* | |||||
* Once a node is deleted from the list, we cannot destroy it right away. | |||||
* Readers do not hold the mutex and may still be using that node. We need to | |||||
* leverage RCU to make sure all the readers have finished their work before | |||||
* allowing the node to be destroyed. We need to keep the mutex during that | |||||
* process, because we just removed our thread from the list of thread to wait | |||||
* for. A concurent deletion would not wait for us and may end up deleting data | |||||
FabienUnsubmitted Not Done Inline Actionsconcurent->concurrent Fabien: concurent->concurrent | |||||
* we rely on as a result. | |||||
*/ | |||||
std::atomic<RCUInfos *> threadInfos{nullptr}; | |||||
CCriticalSection csThreadInfosDelete; | |||||
RCUInfos::RCUInfos() : next(nullptr), state(0) { | |||||
while (true) { | |||||
RCUInfos *head = threadInfos.load(); | |||||
next.store(head); | |||||
if (threadInfos.compare_exchange_weak(head, this)) { | |||||
break; | |||||
} | |||||
} | |||||
// Release the lock. | |||||
readFree(); | |||||
} | |||||
RCUInfos::~RCUInfos() { | |||||
while (true) { | |||||
LOCK(csThreadInfosDelete); | |||||
FabienUnsubmitted Not Done Inline ActionsYou may want to move it outside of the loop, as there is a theoretical max number of time you can lock the same mutex Fabien: You may want to move it outside of the loop, as there is a theoretical max number of time you… | |||||
deadalnixAuthorUnsubmitted Done Inline ActionsThat would prevent any concurrent thread from moving forward as long as this is not finished, which doesn't seem very desirable. deadalnix: That would prevent any concurrent thread from moving forward as long as this is not finished… | |||||
std::atomic<RCUInfos *> *ptr; | |||||
{ | |||||
RCULock lock(this); | |||||
FabienUnsubmitted Not Done Inline ActionsThis call is not needed, the one from hasSynced() is enough Fabien: This call is not needed, the one from `hasSynced()` is enough | |||||
deadalnixAuthorUnsubmitted Done Inline ActionsThis is reading the linked list, so this require a lock. Technically, there is the mutex that makes it redundant, but it make the code fragile. deadalnix: This is reading the linked list, so this require a lock. Technically, there is the mutex that… | |||||
ptr = &threadInfos; | |||||
while (true) { | |||||
RCUInfos *current = ptr->load(); | |||||
if (current == this) { | |||||
break; | |||||
} | |||||
assert(current != nullptr); | |||||
ptr = ¤t->next; | |||||
} | |||||
} | |||||
/** | |||||
* We have our node and the parent is ready to be updated. | |||||
* NB: The CAS operation ony checks for *ptr and not for next. This | |||||
FabienUnsubmitted Not Done Inline Actionsony->only Fabien: ony->only | |||||
* would be a big problem in the general case, but because we only | |||||
* insert at the tip of the list and cannot have concurent deletion | |||||
FabienUnsubmitted Not Done Inline Actionsconcurent->concurrent, also tip->head would be more accurate Fabien: concurent->concurrent, also tip->head would be more accurate | |||||
* thanks to the use of a mutex, we are safe. | |||||
*/ | |||||
RCUInfos *foo = this; | |||||
if (!ptr->compare_exchange_weak(foo, next.load())) { | |||||
continue; | |||||
} | |||||
/** | |||||
* We now wait for possible readers to to go past the synchronization | |||||
FabienUnsubmitted Not Done Inline Actionsto to->to Fabien: to to->to | |||||
* point. We need to do so while holding the lock as this operation | |||||
* require us to a be a reader, but we just removed ourselves from the | |||||
* list of reader to check and may therefore not be waited for. | |||||
*/ | |||||
synchronize(); | |||||
break; | |||||
} | |||||
} | |||||
void RCUInfos::synchronize() { | |||||
uint64_t count = 0, syncRev = ++revision; | |||||
// Loop until all thread synced. | |||||
while (!hasSynced(syncRev)) { | |||||
if (count++ >= RCU_ACTIVE_LOOP_COUNT) { | |||||
// Make sure we don't starve the system by spinning too long without | |||||
// yielding. | |||||
std::this_thread::yield(); | |||||
} | |||||
} | |||||
} | |||||
bool RCUInfos::hasSynced(uint64_t syncRev) { | |||||
// Go over the list and check all threads are past the synchronization | |||||
// point. | |||||
RCULock lock(this); | |||||
RCUInfos *current = threadInfos.load(); | |||||
while (current != nullptr) { | |||||
// If the current thread is not up to speed, bail. | |||||
if (current->state < syncRev) { | |||||
return false; | |||||
} | |||||
current = current->next.load(); | |||||
} | |||||
return true; | |||||
} |
tim->time