diff --git a/src/rcu.cpp b/src/rcu.cpp
index bb9626cf4a..67ecc04b5a 100644
--- a/src/rcu.cpp
+++ b/src/rcu.cpp
@@ -1,202 +1,199 @@
 // Copyright (c) 2018 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
 
 #include "rcu.h"
 #include "sync.h"
 
 #include <chrono>
 #include <condition_variable>
 
 std::atomic<uint64_t> RCUInfos::revision{0};
 thread_local RCUInfos RCUInfos::infos{};
 
 /**
  * How many time a busy loop runs before yelding.
  */
 static const int RCU_ACTIVE_LOOP_COUNT = 30;
 
 /**
  * We maintain a linked list of all the RCUInfos for each active thread. Upon
  * start, a new thread adds itself to the head of the liked list and the node
  * is then removed when the threads shuts down.
  *
  * Insertion is fairly straightforward. The first step is to set the next
  * pointer of the node being inserted as the first node in the list as follow:
  *
  * threadInfos -> Node -> ...
  *                 ^
  *       Nadded -|
  *
  * The second step is to update threadInfos to point on the inserted node. This
  * is done using compare and swap. If the head of the list changed during this
  * process - for instance due to another insertion, CAS will fail and we can
  * start again.
  *
  * threadInfos    Node -> ...
  *      |          ^
  *      \-> Nadded -|
  *
  * Deletion is a slightly more complex process. The general idea is to go over
  * the list, find the parent of the item we want to remove and set it's next
  * pointer to jump over it.
  *
  * Nparent -> Ndelete -> Nchild
  *
  * Nparent    Ndelete -> Nchild
  *    |                    ^
  *    \--------------------|
  *
  * We run into problems when a nodes is deleted concurrently with another node
  * being inserted. Hopefully, we can solve that problem with CAS as well.
  *
  * threadInfos -> Ndelete -> Nchild
  *                 ^
  *       Nadded -|
  *
  * The insertion will try to update threadInfos to point to Nadded, while the
  * deletion will try to update it to point to Nchild. Whichever goes first will
  * cause the other to fail its CAS and restart its process.
  *
  * threadInfos    Ndelete -> Nchild
  *      |          ^
  *      \--> Nadded -|
  *
  * After a successful insertion, threadInfos now points to Nadded, and the CAS
  * to move it to Nchild will fail, causing the deletion process to restart from
  * scratch.
  *
  *   /----------------------|
  *   |                      V
  * threadInfos    Ndelete   Nchild
  *                 ^
  *       Nadded -|
  *
  * After a succesful deletion, threadInfos now points to NChild and the CAS to
  * move it to Nadded will fail, causing the insertion process to fail.
  *
  * We also run into problems when several nodes are deleted concurrently.
  * Because it is not possible to read Ndelete->next and update Nparent->next
  * atomically, we may end up setting Nparent->next to a stale value if Nchild
  * is deleted.
  *
  *   /----------------------|
  *   |                      V
  * Nparent    Ndelete    Nchild -> Ngrandchild
  *    |                    ^
  *    \--------------------|
  *
  * This would cause Nchild to be 'resurrected', which is obviously a problem.
  * In order to avoid this problem, we make sure that no concurrent deletion
  * takes places using a good old mutex. Using a mutex for deletion also
  * ensures we are safe from the ABA problem.
  *
  * Once a node is deleted from the list, we cannot destroy it right away.
  * Readers do not hold the mutex and may still be using that node. We need to
  * leverage RCU to make sure all the readers have finished their work before
  * allowing the node to be destroyed. We need to keep the mutex during that
  * process, because we just removed our thread from the list of thread to wait
  * for. A concurrent deletion would not wait for us and may end up deleting
  * data we rely on as a result.
  */
 static std::atomic<RCUInfos *> threadInfos{nullptr};
 static CCriticalSection csThreadInfosDelete;
 
 RCUInfos::RCUInfos() : next(nullptr), state(0) {
-    while (true) {
-        RCUInfos *head = threadInfos.load();
+    RCUInfos *head = threadInfos.load();
+    do {
         next.store(head);
-        if (threadInfos.compare_exchange_weak(head, this)) {
-            break;
-        }
-    }
+    } while (!threadInfos.compare_exchange_weak(head, this));
 
     // Release the lock.
     readFree();
 }
 
 RCUInfos::~RCUInfos() {
     while (true) {
         LOCK(csThreadInfosDelete);
 
         std::atomic<RCUInfos *> *ptr;
         {
             RCULock lock(this);
             ptr = &threadInfos;
             while (true) {
                 RCUInfos *current = ptr->load();
                 if (current == this) {
                     break;
                 }
 
                 assert(current != nullptr);
                 ptr = &current->next;
             }
         }
 
         /**
          * We have our node and the parent is ready to be updated.
          * NB: The CAS operation only checks for *ptr and not for next. This
          * would be a big problem in the general case, but because we only
          * insert at the tip of the list and cannot have concurrent deletion
          * thanks to the use of a mutex, we are safe.
          */
-        RCUInfos *foo = this;
-        if (!ptr->compare_exchange_weak(foo, next.load())) {
+        RCUInfos *current = this;
+        if (!ptr->compare_exchange_weak(current, next.load())) {
             continue;
         }
 
         /**
          * We now wait for possible readers to go past the synchronization
          * point. We need to do so while holding the lock as this operation
          * require us to a be a reader, but we just removed ourselves from the
          * list of reader to check and may therefore not be waited for.
          */
         synchronize();
         break;
     }
 }
 
 void RCUInfos::synchronize() {
     uint64_t syncRev = ++revision;
 
     // Loop a few time lock free.
     for (int i = 0; i < RCU_ACTIVE_LOOP_COUNT; i++) {
         if (hasSynced(syncRev)) {
             // Done!
             return;
         }
     }
 
     // It seems like we have some contention. Let's try to not starve the
     // system. Let's make sure threads that land here proceed one by one.
     // XXX: The best option long term is most likely to use a futex on one of
     // the thread causing synchronization delay so this thread can be waked up
     // at an apropriate time.
     static std::condition_variable cond;
     static CWaitableCriticalSection cs;
     WAIT_LOCK(cs, lock);
 
     do {
         cond.notify_one();
     } while (!cond.wait_for(lock, std::chrono::microseconds(1),
                             [&] { return hasSynced(syncRev); }));
 }
 
 bool RCUInfos::hasSynced(uint64_t syncRev) {
     // Go over the list and check all threads are past the synchronization
     // point.
     RCULock lock(this);
     RCUInfos *current = threadInfos.load();
     while (current != nullptr) {
         // If the current thread is not up to speed, bail.
         if (current->state < syncRev) {
             return false;
         }
 
         current = current->next.load();
     }
 
     return true;
 }
diff --git a/src/rcu.h b/src/rcu.h
index 2e53ac6a6b..4eaf7ffdfd 100644
--- a/src/rcu.h
+++ b/src/rcu.h
@@ -1,64 +1,63 @@
 // Copyright (c) 2018 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
 
 #ifndef BITCOIN_RCU_H
 #define BITCOIN_RCU_H
 
 #include <boost/noncopyable.hpp>
 
 #include <atomic>
 #include <cstdint>
-#include <memory>
 #include <utility>
 
 class RCUInfos;
 class RCUReadLock;
 
 class RCUInfos {
     std::atomic<RCUInfos *> next;
     std::atomic<uint64_t> state;
 
     // The largest revision possible means unlocked.
     static const uint64_t UNLOCKED = -uint64_t(1);
 
     RCUInfos();
     ~RCUInfos();
 
     void readLock() {
         assert(!isLocked());
         state.store(revision.load());
     }
 
     void readFree() {
         assert(isLocked());
         state.store(UNLOCKED);
     }
 
     bool isLocked() const { return state.load() != UNLOCKED; }
 
     void synchronize();
     bool hasSynced(uint64_t syncRev);
 
     friend class RCULock;
     friend struct RCUTest;
 
     static std::atomic<uint64_t> revision;
     static thread_local RCUInfos infos;
 };
 
 class RCULock : public boost::noncopyable {
     RCUInfos *infos;
 
     RCULock(RCUInfos *infosIn) : infos(infosIn) { infos->readLock(); }
     friend class RCUInfos;
 
 public:
     RCULock() : RCULock(&RCUInfos::infos) {}
     ~RCULock() { infos->readFree(); }
 
     static void synchronize() { RCUInfos::infos.synchronize(); }
     static bool isLocked() { return RCUInfos::infos.isLocked(); }
 };
 
 #endif // BITCOIN_RCU_H
diff --git a/src/test/rcu_tests.cpp b/src/test/rcu_tests.cpp
index d4a084c605..d81966097b 100644
--- a/src/test/rcu_tests.cpp
+++ b/src/test/rcu_tests.cpp
@@ -1,133 +1,133 @@
-// Copyright (c) 2010 The Bitcoin developers
+// Copyright (c) 2018 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
 
 #include "rcu.h"
 #include "test/test_bitcoin.h"
 
 #include <boost/test/unit_test.hpp>
 
 #include <thread>
 
 struct RCUTest {
     static uint64_t getRevision() { return RCUInfos::revision.load(); }
 
     static uint64_t hasSynced(uint64_t syncRev) {
         return RCUInfos::infos.hasSynced(syncRev);
     }
 };
 
 BOOST_FIXTURE_TEST_SUITE(rcu_tests, BasicTestingSetup)
 
 enum RCUTestStep {
     Init,
     Locked,
     LockAck,
     RCULocked,
     Synchronizing,
     Synchronized,
 };
 
 #define WAIT_FOR_STEP(step)                                                    \
     do {                                                                       \
         cond.notify_all();                                                     \
     } while (!cond.wait_for(lock, std::chrono::milliseconds(1),                \
                             [&] { return otherstep == step; }))
 
 void synchronize(std::atomic<RCUTestStep> &step,
                  const std::atomic<RCUTestStep> &otherstep,
                  CWaitableCriticalSection &cs, std::condition_variable &cond,
                  std::atomic<uint64_t> &syncRev) {
     BOOST_CHECK(step == RCUTestStep::Init);
 
     {
         WAIT_LOCK(cs, lock);
         step = RCUTestStep::Locked;
 
         // Wait for our lock to be acknowledged.
         WAIT_FOR_STEP(RCUTestStep::LockAck);
 
         RCULock rculock;
 
         // Update step.
         step = RCUTestStep::RCULocked;
 
         // Wait for master.
         WAIT_FOR_STEP(RCUTestStep::RCULocked);
     }
 
     // Update step.
     syncRev = RCUTest::getRevision() + 1;
     step = RCUTestStep::Synchronizing;
 
     BOOST_CHECK(!RCUTest::hasSynced(syncRev));
 
     // We wait for readers.
     RCULock::synchronize();
 
     // Update step.
     step = RCUTestStep::Synchronized;
 }
 
 void lockAndWaitForSynchronize(std::atomic<RCUTestStep> &step,
                                const std::atomic<RCUTestStep> &otherstep,
                                CWaitableCriticalSection &cs,
                                std::condition_variable &cond,
                                std::atomic<uint64_t> &syncRev) {
     BOOST_CHECK(step == RCUTestStep::Init);
     WAIT_LOCK(cs, lock);
 
     // Wait for th eother thread to be locked.
     WAIT_FOR_STEP(RCUTestStep::Locked);
     step = RCUTestStep::LockAck;
 
     // Wait for the synchronizing tread to take its RCU lock.
     WAIT_FOR_STEP(RCUTestStep::RCULocked);
 
     BOOST_CHECK(!RCUTest::hasSynced(syncRev));
 
     {
         RCULock rculock;
 
         // Update master step.
         step = RCUTestStep::RCULocked;
 
         while (RCUTest::getRevision() < syncRev) {
             WAIT_FOR_STEP(RCUTestStep::Synchronizing);
         }
 
         BOOST_CHECK(RCUTest::getRevision() >= syncRev);
         BOOST_CHECK_EQUAL(otherstep.load(), RCUTestStep::Synchronizing);
     }
 
     BOOST_CHECK(RCUTest::hasSynced(syncRev));
     WAIT_FOR_STEP(RCUTestStep::Synchronized);
 }
 
 static const int COUNT = 128;
 
 BOOST_AUTO_TEST_CASE(synchronize_test) {
     CWaitableCriticalSection cs;
     std::condition_variable cond;
     std::atomic<RCUTestStep> parentstep;
     std::atomic<RCUTestStep> childstep;
     std::atomic<uint64_t> syncRev;
 
     for (int i = 0; i < COUNT; i++) {
         parentstep = RCUTestStep::Init;
         childstep = RCUTestStep::Init;
         syncRev = RCUTest::getRevision() + 1;
 
         std::thread tlock([&] {
             lockAndWaitForSynchronize(parentstep, childstep, cs, cond, syncRev);
         });
 
         std::thread tsync(
             [&] { synchronize(childstep, parentstep, cs, cond, syncRev); });
 
         tlock.join();
         tsync.join();
     }
 }
 
 BOOST_AUTO_TEST_SUITE_END()