diff --git a/src/rcu.h b/src/rcu.h --- a/src/rcu.h +++ b/src/rcu.h @@ -10,13 +10,17 @@ #include #include #include +#include +#include class RCUInfos; class RCUReadLock; class RCUInfos { - std::atomic next; std::atomic state; + std::atomic next; + + std::map> cleanups; // The largest revision possible means unlocked. static const uint64_t UNLOCKED = -uint64_t(1); @@ -35,9 +39,13 @@ } bool isLocked() const { return state.load() != UNLOCKED; } + void registerCleanup(const std::function &f) { + cleanups.emplace(++revision, f); + } void synchronize(); - bool hasSynced(uint64_t syncRev); + void runCleanups(); + uint64_t hasSyncedTo(uint64_t cutoff = UNLOCKED); friend class RCULock; friend struct RCUTest; @@ -56,8 +64,12 @@ RCULock() : RCULock(&RCUInfos::infos) {} ~RCULock() { infos->readFree(); } - static void synchronize() { RCUInfos::infos.synchronize(); } static bool isLocked() { return RCUInfos::infos.isLocked(); } + static void registerCleanup(const std::function &f) { + RCUInfos::infos.registerCleanup(f); + } + + static void synchronize() { RCUInfos::infos.synchronize(); } }; #endif // BITCOIN_RCU_H diff --git a/src/rcu.cpp b/src/rcu.cpp --- a/src/rcu.cpp +++ b/src/rcu.cpp @@ -5,6 +5,7 @@ #include "rcu.h" #include "sync.h" +#include #include #include @@ -14,7 +15,7 @@ /** * How many time a busy loop runs before yelding. */ -static const int RCU_ACTIVE_LOOP_COUNT = 30; +static const int RCU_ACTIVE_LOOP_COUNT = 10; /** * We maintain a linked list of all the RCUInfos for each active thread. Upon @@ -102,7 +103,7 @@ static std::atomic threadInfos{nullptr}; static CCriticalSection csThreadInfosDelete; -RCUInfos::RCUInfos() : next(nullptr), state(0) { +RCUInfos::RCUInfos() : state(0), next(nullptr) { RCUInfos *head = threadInfos.load(); do { next.store(head); @@ -113,6 +114,15 @@ } RCUInfos::~RCUInfos() { + /** + * Before the thread is removed from the list, make sure we cleanup + * everything. + */ + runCleanups(); + while (cleanups.size() > 0) { + synchronize(); + } + while (true) { LOCK(csThreadInfosDelete); @@ -160,8 +170,8 @@ // Loop a few time lock free. for (int i = 0; i < RCU_ACTIVE_LOOP_COUNT; i++) { - if (hasSynced(syncRev)) { - // Done! + runCleanups(); + if (cleanups.empty() && hasSyncedTo(syncRev)) { return; } } @@ -176,24 +186,47 @@ WAIT_LOCK(cs, lock); do { + runCleanups(); cond.notify_one(); - } while (!cond.wait_for(lock, std::chrono::microseconds(1), - [&] { return hasSynced(syncRev); })); + } while (!cond.wait_for(lock, std::chrono::microseconds(1), [&] { + return cleanups.empty() && hasSyncedTo(syncRev); + })); +} + +void RCUInfos::runCleanups() { + // By the time we run a set of cleanups, we may have more cleanups + // available so we loop until there is nothing available for cleanup. + while (true) { + if (cleanups.empty()) { + // There is nothing to cleanup. + return; + } + + auto it = cleanups.begin(); + uint64_t syncedTo = hasSyncedTo(it->first); + while (it != cleanups.end() && it->first <= syncedTo) { + // Run the cleanup and remove it from the map. + it->second(); + cleanups.erase(it++); + } + } } -bool RCUInfos::hasSynced(uint64_t syncRev) { +uint64_t RCUInfos::hasSyncedTo(uint64_t cutoff) { + uint64_t syncedTo = revision.load(); + // Go over the list and check all threads are past the synchronization // point. RCULock lock(this); RCUInfos *current = threadInfos.load(); while (current != nullptr) { - // If the current thread is not up to speed, bail. - if (current->state < syncRev) { - return false; + syncedTo = std::min(syncedTo, current->state.load()); + if (syncedTo < cutoff) { + return 0; } current = current->next.load(); } - return true; + return syncedTo; } diff --git a/src/test/rcu_tests.cpp b/src/test/rcu_tests.cpp --- a/src/test/rcu_tests.cpp +++ b/src/test/rcu_tests.cpp @@ -13,8 +13,12 @@ struct RCUTest { static uint64_t getRevision() { return RCUInfos::revision.load(); } - static uint64_t hasSynced(uint64_t syncRev) { - return RCUInfos::infos.hasSynced(syncRev); + static uint64_t hasSyncedTo(uint64_t syncRev) { + return RCUInfos::infos.hasSyncedTo(syncRev); + } + + static std::map> &getCleanups() { + return RCUInfos::infos.cleanups; } }; @@ -61,7 +65,7 @@ syncRev = RCUTest::getRevision() + 1; step = RCUTestStep::Synchronizing; - assert(!RCUTest::hasSynced(syncRev)); + assert(!RCUTest::hasSyncedTo(syncRev)); // We wait for readers. RCULock::synchronize(); @@ -84,7 +88,7 @@ // Wait for the synchronizing tread to take its RCU lock. WAIT_FOR_STEP(RCUTestStep::RCULocked); - assert(!RCUTest::hasSynced(syncRev)); + assert(!RCUTest::hasSyncedTo(syncRev)); { RCULock rculock; @@ -100,7 +104,7 @@ assert(otherstep.load() == RCUTestStep::Synchronizing); } - assert(RCUTest::hasSynced(syncRev)); + assert(RCUTest::hasSyncedTo(syncRev) >= syncRev); WAIT_FOR_STEP(RCUTestStep::Synchronized); } @@ -130,4 +134,56 @@ } } +BOOST_AUTO_TEST_CASE(cleanup_test) { + BOOST_CHECK(RCUTest::getCleanups().empty()); + + bool isClean1 = false; + RCULock::registerCleanup([&] { isClean1 = true; }); + + BOOST_CHECK(!isClean1); + BOOST_CHECK_EQUAL(RCUTest::getCleanups().size(), 1); + BOOST_CHECK_EQUAL(RCUTest::getRevision(), + RCUTest::getCleanups().begin()->first); + + // Synchronize runs the cleanups. + RCULock::synchronize(); + BOOST_CHECK(RCUTest::getCleanups().empty()); + BOOST_CHECK(isClean1); + + // Check multiple callbacks. + isClean1 = false; + bool isClean2 = false; + bool isClean3 = false; + RCULock::registerCleanup([&] { isClean1 = true; }); + RCULock::registerCleanup([&] { isClean2 = true; }); + RCULock::registerCleanup([&] { isClean3 = true; }); + + BOOST_CHECK_EQUAL(RCUTest::getCleanups().size(), 3); + RCULock::synchronize(); + BOOST_CHECK(RCUTest::getCleanups().empty()); + BOOST_CHECK(isClean1); + BOOST_CHECK(isClean2); + BOOST_CHECK(isClean3); + + // Check callbacks adding each others. + isClean1 = false; + isClean2 = false; + isClean3 = false; + + RCULock::registerCleanup([&] { + isClean1 = true; + RCULock::registerCleanup([&] { + isClean2 = true; + RCULock::registerCleanup([&] { isClean3 = true; }); + }); + }); + + BOOST_CHECK_EQUAL(RCUTest::getCleanups().size(), 1); + RCULock::synchronize(); + BOOST_CHECK(RCUTest::getCleanups().empty()); + BOOST_CHECK(isClean1); + BOOST_CHECK(isClean2); + BOOST_CHECK(isClean3); +} + BOOST_AUTO_TEST_SUITE_END()