diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp
--- a/src/avalanche/processor.cpp
+++ b/src/avalanche/processor.cpp
@@ -699,6 +699,8 @@
             peerManager->updateNextRequestTime(pnode->GetId(), timeout);
         }
 
+        pnode->m_avalanche_state->invsPolled(invs.size());
+
         // Send the query to the node.
         connman->PushMessage(
             pnode, CNetMsgMaker(pnode->GetCommonVersion())
diff --git a/src/avalanche/test/processor_tests.cpp b/src/avalanche/test/processor_tests.cpp
--- a/src/avalanche/test/processor_tests.cpp
+++ b/src/avalanche/test/processor_tests.cpp
@@ -109,6 +109,7 @@
         m_node.peerman->InitializeNode(config, node);
         node->nVersion = 1;
         node->fSuccessfullyConnected = true;
+        node->m_avalanche_state = std::make_unique<CNode::AvalancheState>();
 
         m_connman->AddNode(*node);
         return node;
diff --git a/src/net.h b/src/net.h
--- a/src/net.h
+++ b/src/net.h
@@ -99,6 +99,18 @@
 static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
 static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000;
 
+/** Refresh period for the avalanche statistics computation */
+static constexpr std::chrono::minutes AVALANCHE_STATISTICS_REFRESH_PERIOD{10};
+/** Time constant for the avalanche statistics computation */
+static constexpr std::chrono::minutes AVALANCHE_STATISTICS_TIME_CONSTANT{10};
+/**
+ * Pre-computed decay factor for the avalanche statistics computation.
+ * There is currently no constexpr variant of std::exp, so use a const.
+ */
+static const double AVALANCHE_STATISTICS_DECAY_FACTOR =
+    1. - std::exp(-1. * AVALANCHE_STATISTICS_REFRESH_PERIOD.count() /
+                  AVALANCHE_STATISTICS_TIME_CONSTANT.count());
+
 typedef int64_t NodeId;
 
 /**
@@ -1036,10 +1048,53 @@
     // m_proof_relay == nullptr if we're not relaying proofs with this peer
     std::unique_ptr<ProofRelay> m_proof_relay;
 
-    struct AvalancheState {
-        AvalancheState() {}
+    class AvalancheState {
+        /**
+         * The inventories polled and voted counters since the last score
+         * computation, stored as a pair of uint32_t with the poll counter
+         * being the 32 lowest bits and the vote counter the 32 highest bits.
+         */
+        std::atomic<uint64_t> invCounters;
+
+        /** The last computed score */
+        std::atomic<double> availabilityScore;
 
+        /**
+         * Protect the sequence of operations required for updating the
+         * statistics.
+         */
+        Mutex cs_statistics;
+
+    public:
         CPubKey pubkey;
+
+        AvalancheState() : invCounters(0), availabilityScore(0.) {}
+
+        /** The node was polled for count invs */
+        void invsPolled(uint32_t count);
+
+        /** The node voted for count invs */
+        void invsVoted(uint32_t count);
+
+        /**
+         * The availability score is calculated using an exponentially weighted
+         * average.
+         * This has several interesting properties:
+         * - The most recent polls/responses have more weight than the previous
+         *   ones. A node that recently stopped answering will see its ratio
+         *   decrease quickly.
+         * - This is a low-pass filter, so it causes delay. This means that a
+         *   node needs to have a track record for the ratio to be high. A node
+         *   that has been little requested will have a lower ratio than a node
+         *   that failed to answer a few polls but answered a lot of them.
+         * - It is cheap to compute.
+         *
+         * This is expected to be called at a fixed interval of
+         * AVALANCHE_STATISTICS_REFRESH_PERIOD.
+         */
+        void updateAvailabilityScore();
+
+        double getAvailabilityScore() const;
     };
 
     // m_avalanche_state == nullptr if we're not using avalanche with this peer
diff --git a/src/net.cpp b/src/net.cpp
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -44,7 +44,9 @@
               "miniUPnPc API version >= 10 assumed");
 #endif
 
+#include <cmath>
 #include
+#include <limits>
 #include
 #include
 
@@ -3046,6 +3048,34 @@
     return nReceiveFloodSize;
 }
 
+void CNode::AvalancheState::invsPolled(uint32_t count) {
+    invCounters += count;
+}
+
+void CNode::AvalancheState::invsVoted(uint32_t count) {
+    invCounters += uint64_t(count) << 32;
+}
+
+void CNode::AvalancheState::updateAvailabilityScore() {
+    LOCK(cs_statistics);
+
+    uint64_t windowInvCounters = invCounters.exchange(0);
+    double previousScore = availabilityScore;
+
+    uint32_t polls = windowInvCounters & std::numeric_limits<uint32_t>::max();
+    uint32_t votes = windowInvCounters >> 32;
+
+    availabilityScore =
+        AVALANCHE_STATISTICS_DECAY_FACTOR * (2 * votes - polls) +
+        (1. - AVALANCHE_STATISTICS_DECAY_FACTOR) * previousScore;
+}
+
+double CNode::AvalancheState::getAvailabilityScore() const {
+    // The score is set atomically so there is no need to lock the statistics
+    // when reading.
+    return availabilityScore;
+}
+
 CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn,
              int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn,
              uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn,
diff --git a/src/net_processing.h b/src/net_processing.h
--- a/src/net_processing.h
+++ b/src/net_processing.h
@@ -140,6 +140,11 @@
      */
     void ReattemptInitialBroadcast(CScheduler &scheduler) const;
 
+    /**
+     * Update the avalanche statistics for all the nodes
+     */
+    void UpdateAvalancheStatistics() const;
+
 private:
     // overloaded variant of above to operate on CNode*s
     void Misbehaving(const CNode &node, int howmuch,
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -1121,6 +1121,14 @@
                          delta);
 }
 
+void PeerManager::UpdateAvalancheStatistics() const {
+    m_connman.ForEachNode([](CNode *pnode) {
+        if (pnode->m_avalanche_state) {
+            pnode->m_avalanche_state->updateAvailabilityScore();
+        }
+    });
+}
+
 void PeerManager::FinalizeNode(const Config &config, NodeId nodeid,
                                bool &fUpdateConnectionTime) {
     fUpdateConnectionTime = false;
@@ -1543,6 +1551,14 @@
         std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5});
     scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); },
                               delta);
+
+    // Update the avalanche statistics on a schedule
+    scheduler.scheduleEvery(
+        [this]() {
+            UpdateAvalancheStatistics();
+            return true;
+        },
+        AVALANCHE_STATISTICS_REFRESH_PERIOD);
 }
 
 /**
@@ -4347,6 +4363,8 @@
         return;
     }
 
+    pfrom.m_avalanche_state->invsVoted(response.GetVotes().size());
+
     if (updates.size()) {
         for (avalanche::BlockUpdate &u : updates) {
             CBlockIndex *pindex = u.getBlockIndex();
diff --git a/src/test/net_tests.cpp b/src/test/net_tests.cpp
--- a/src/test/net_tests.cpp
+++ b/src/test/net_tests.cpp
@@ -22,6 +22,7 @@
 #include
 #include
+#include <cmath>
 #include
 #include
 #include
 
@@ -955,4 +956,63 @@
     }
 }
 
+BOOST_AUTO_TEST_CASE(avalanche_statistics) {
+    const uint32_t step = AVALANCHE_STATISTICS_REFRESH_PERIOD.count();
+    const uint32_t tau = AVALANCHE_STATISTICS_TIME_CONSTANT.count();
+
+    CNode::AvalancheState avastats;
+
+    double previousScore = avastats.getAvailabilityScore();
+    BOOST_CHECK_SMALL(previousScore, 1e-6);
+
+    // Check the statistics follow an exponential response for 1 to 10 tau
+    for (size_t i = 1; i <= 10; i++) {
+        for (uint32_t j = 0; j < tau; j += step) {
+            avastats.invsPolled(1);
+            // Always respond to everything correctly
+            avastats.invsVoted(1);
+
+            avastats.updateAvailabilityScore();
+
+            // Expect a monotonic rise
+            double currentScore = avastats.getAvailabilityScore();
+            BOOST_CHECK_GE(currentScore, previousScore);
+            previousScore = currentScore;
+        }
+
+        // We expect (1 - e^-i) after i * tau. The tolerance is expressed
+        // as a percentage, and we add a (large) 0.1% margin to account for
+        // floating point errors.
+        BOOST_CHECK_CLOSE(previousScore, -1 * std::expm1(-1. * i), 100.1 / tau);
+    }
+
+    // After 10 tau we should be very close to 100% (about 99.995%)
+    BOOST_CHECK_CLOSE(previousScore, 1., 0.01);
+
+    for (size_t i = 1; i <= 3; i++) {
+        for (uint32_t j = 0; j < tau; j += step) {
+            avastats.invsPolled(2);
+
+            // Stop responding to the polls.
+            avastats.invsVoted(1);
+
+            avastats.updateAvailabilityScore();
+
+            // Expect a monotonic fall
+            double currentScore = avastats.getAvailabilityScore();
+            BOOST_CHECK_LE(currentScore, previousScore);
+            previousScore = currentScore;
+        }
+
+        // There is a slight error in the expected value because we did not
+        // start the decay at exactly 100%, but the 0.1% margin is at least an
+        // order of magnitude larger than the expected error so it doesn't
+        // matter.
+        BOOST_CHECK_CLOSE(previousScore, 1. + std::expm1(-1. * i), 100.1 / tau);
+    }
+
+    // After 3 more tau we should be under 5%
+    BOOST_CHECK_LT(previousScore, .05);
+}
+
 BOOST_AUTO_TEST_SUITE_END()
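
The exponential response checked by the avalanche_statistics test above falls directly out of the update rule in updateAvailabilityScore(): with lambda = AVALANCHE_STATISTICS_DECAY_FACTOR = 1 - e^(-refresh / tau), the recurrence score = lambda * (2 * votes - polls) + (1 - lambda) * score converges to the constant per-period value as 1 - e^(-t / tau), which is why answering every poll tends to 1 and a per-period contribution of 0 decays as e^(-t / tau). The standalone sketch below is an illustration only, not part of the patch; it just replays the same constants as net.h (refresh period and time constant both 10 minutes) outside the node:

#include <cmath>
#include <cstdio>

int main() {
    // Same values as AVALANCHE_STATISTICS_REFRESH_PERIOD and
    // AVALANCHE_STATISTICS_TIME_CONSTANT, so lambda = 1 - e^-1.
    const double refresh = 10.;
    const double tau = 10.;
    const double lambda = 1. - std::exp(-refresh / tau);

    double score = 0.;

    // One poll and one vote per period contributes (2 * 1 - 1) = 1, so the
    // score should rise as 1 - e^(-t / tau), matching the test's expectation.
    for (int i = 1; i <= 10; i++) {
        score = lambda * (2 * 1 - 1) + (1. - lambda) * score;
        std::printf("rise  %2d tau: score=%.6f expected=%.6f\n", i, score,
                    -std::expm1(-1. * i));
    }

    // Two polls for one vote contributes (2 * 1 - 2) = 0 per period, so the
    // score decays as e^(-t / tau) from wherever it was.
    for (int i = 1; i <= 3; i++) {
        score = lambda * (2 * 1 - 2) + (1. - lambda) * score;
        std::printf("decay %2d tau: score=%.6f\n", i, score);
    }
    return 0;
}

Running it prints the same 1 - e^-i rise for i = 1..10 and the e^-i decay the unit test asserts with BOOST_CHECK_CLOSE.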
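
A note on the counter layout, again an illustration and not part of the patch: invsPolled() and invsVoted() share a single std::atomic<uint64_t>, with the poll counter in the low 32 bits and the vote counter in the high 32 bits, so both can be bumped lock-free from the calling threads while updateAvailabilityScore() snapshots and resets the pair with one exchange(0) and never observes a half-updated window. A minimal standalone equivalent:

#include <atomic>
#include <cstdint>
#include <cstdio>

int main() {
    std::atomic<uint64_t> invCounters{0};

    // Polls accumulate in the 32 lowest bits...
    invCounters += 10;
    // ...votes in the 32 highest bits.
    invCounters += uint64_t(7) << 32;

    // A single atomic exchange reads both counters and clears them for the
    // next statistics window.
    uint64_t window = invCounters.exchange(0);
    uint32_t polls = window & 0xffffffff;
    uint32_t votes = window >> 32;

    std::printf("polls=%u votes=%u\n", polls, votes); // polls=10 votes=7
    return 0;
}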