diff --git a/src/net.h b/src/net.h --- a/src/net.h +++ b/src/net.h @@ -1213,7 +1213,10 @@ void ThreadOpenAddedConnections(); void AddAddrFetch(const std::string &strDest); void ProcessAddrFetch(); - void ThreadOpenConnections(std::vector connect); + void + ThreadOpenConnections(std::vector connect, + std::function + mockOpenConnection); void ThreadMessageHandler(); void ThreadI2PAcceptIncoming(); void AcceptConnection(const ListenSocket &hListenSocket); diff --git a/src/net.cpp b/src/net.cpp --- a/src/net.cpp +++ b/src/net.cpp @@ -2243,7 +2243,9 @@ return std::max(block_relay_peers - m_max_outbound_block_relay, 0); } -void CConnman::ThreadOpenConnections(const std::vector connect) { +void CConnman::ThreadOpenConnections( + const std::vector connect, + std::function mockOpenConnection) { // Connect to specific addresses if (!connect.empty()) { for (int64_t nLoop = 0;; nLoop++) { @@ -2282,7 +2284,9 @@ while (!interruptNet) { ProcessAddrFetch(); - if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) { + // No need to sleep the thread if we are mocking the network connection + if (!mockOpenConnection && + !interruptNet.sleep_for(std::chrono::milliseconds(500))) { return; } @@ -2569,10 +2573,16 @@ addrConnect.ToString()); } - OpenNetworkConnection(addrConnect, - int(setConnected.size()) >= - std::min(nMaxConnections - 1, 2), - &grant, nullptr, conn_type); + // This mock is for testing purpose only. It prevents the thread + // from attempting the connection which is useful for testing. 
+ if (mockOpenConnection) { + mockOpenConnection(addrConnect, conn_type); + } else { + OpenNetworkConnection(addrConnect, + int(setConnected.size()) >= + std::min(nMaxConnections - 1, 2), + &grant, nullptr, conn_type); + } } } } @@ -3145,9 +3155,9 @@ !connOptions.m_specified_outgoing.empty()) { threadOpenConnections = std::thread(&TraceThread>, "opencon", - std::function( - std::bind(&CConnman::ThreadOpenConnections, this, - connOptions.m_specified_outgoing))); + std::function(std::bind( + &CConnman::ThreadOpenConnections, this, + connOptions.m_specified_outgoing, nullptr))); } // Process messages diff --git a/src/test/net_tests.cpp b/src/test/net_tests.cpp --- a/src/test/net_tests.cpp +++ b/src/test/net_tests.cpp @@ -6,9 +6,12 @@ #include #include +#include +#include #include #include #include +#include #include #include #include @@ -16,6 +19,7 @@ #include #include #include +#include // for bilingual_str #include #include @@ -23,35 +27,56 @@ #include #include +#include #include +#include #include +#include #include #include #include using namespace std::literals; +static CNetAddr ip(uint32_t ip) { + struct in_addr s; + s.s_addr = ip; + return CNetAddr(s); +} + namespace { struct CConnmanTest : public CConnman { using CConnman::CConnman; + Mutex cs; + size_t outboundFullRelayCount GUARDED_BY(cs); + size_t avalancheOutboundsCount GUARDED_BY(cs); + + std::condition_variable cvar; + NodeId nodeid = 0; + CConnmanTest() : CConnman(GetConfig(), 0x1337, 0x1337) {} + ~CConnmanTest() {} + void AddNode(ConnectionType type) { + CAddress addr( + CService(ip(GetRandInt(0xffffffff)), Params().GetDefaultPort()), + NODE_NONE); + + return AddNode(addr, type); + } + + void AddNode(const CAddress &addr, ConnectionType type) { ServiceFlags services = NODE_NETWORK; if (type == ConnectionType::AVALANCHE_OUTBOUND) { services = ServiceFlags(services | NODE_AVALANCHE); } - struct in_addr s; - s.s_addr = GetRandInt(0xffffffff); - CAddress addr(CService(CNetAddr(s), 
Params().GetDefaultPort()), - NODE_NONE); - CNode *pnode = new CNode(nodeid++, services, INVALID_SOCKET, addr, - /* nKeyedNetGroupIn */ 0, + CalculateKeyedNetGroup(addr), /* nLocalHostNonceIn */ 0, - /* nLocalExtraEntropyIn */ 0, CAddress(), + /* nLocalExtraEntropyIn */ 0, addr, /* pszDest */ "", type, /* inbound_onion */ false); @@ -75,6 +100,140 @@ options.m_max_avalanche_outbound = maxAvalancheOutbounds; Init(options); }; + + void MakeAddrmanDeterministic() { addrman.MakeDeterministic(); } + + void Init(const Options &connOptions) { + CConnman::Init(connOptions); + + if (semOutbound == nullptr) { + // initialize semaphore + semOutbound = std::make_unique( + std::min(m_max_outbound, nMaxConnections)); + } + if (semAddnode == nullptr) { + // initialize semaphore + semAddnode = std::make_unique(nMaxAddnode); + } + } + + void openNetworkConnection(const CAddress &addrConnect, + ConnectionType connType) { + bool newConnection = !AlreadyConnectedToAddress(addrConnect); + addrman.Attempt(addrConnect, true); + + if (newConnection) { + { + LOCK(cs); + + if (connType == ConnectionType::AVALANCHE_OUTBOUND) { + avalancheOutboundsCount++; + } + if (connType == ConnectionType::OUTBOUND_FULL_RELAY) { + outboundFullRelayCount++; + } + } + + AddNode(addrConnect, connType); + BOOST_CHECK(AlreadyConnectedToAddress(addrConnect)); + addrman.Connected(addrConnect); + } + + cvar.notify_all(); + } + + struct TestAddresses { + uint32_t group; + uint32_t services; + size_t quantity; + }; + + bool checkContiguousAddressesConnection( + const std::vector &testAddresses, + size_t expectedOutboundFullRelayCount, + size_t expectedAvalancheOutboundsCount) { + { + LOCK(cs); + + // Reset + outboundFullRelayCount = 0; + avalancheOutboundsCount = 0; + } + + addrman.Clear(); + ClearNodes(); + + struct IpGen { + uint32_t baseIp; + uint32_t offset; + }; + std::vector ipGroups{ + {0x00010101, 1}, {0x00010164, 1}, {0x000101c8, 1}, {0x00010201, 1}, + {0x00010264, 1}, {0x000102c8, 1}, {0x00010301, 1}, 
{0x00010364, 1}, + {0x000103c8, 1}, {0x00010401, 1}, {0x00010464, 1}, {0x000104c8, 1}}; + + { + // Make sure we produce addresses in different groups as expected + std::set> groups; + for (auto &[baseIp, _] : ipGroups) { + for (uint32_t j = 0; j < 255; j++) { + CNetAddr addr = ip(baseIp + (j << 24)); + groups.insert(addr.GetGroup({})); + } + } + BOOST_CHECK_EQUAL(groups.size(), ipGroups.size()); + } + + // Generate contiguous addresses + auto getAddrGroup = [&](size_t group, uint64_t services) { + CNetAddr addr = + ip(ipGroups[group].baseIp + (ipGroups[group].offset++ << 24)); + return CAddress(CService(addr, Params().GetDefaultPort()), + ServiceFlags(services)); + }; + + size_t addressCount = 0; + for (const TestAddresses &addresses : testAddresses) { + assert(addresses.group < ipGroups.size()); + + addressCount += addresses.quantity; + do { + addrman.Add(getAddrGroup(addresses.group, + ServiceFlags(addresses.services)), + CNetAddr()); + } while (addrman.size() < addressCount); + } + + interruptNet.reset(); + std::vector empty; + threadOpenConnections = std::thread( + &CConnman::ThreadOpenConnections, this, empty, + std::bind(&CConnmanTest::openNetworkConnection, this, + std::placeholders::_1, std::placeholders::_2)); + + Mutex mutex; + WAIT_LOCK(mutex, lock); + bool ret = cvar.wait_for(lock, 10s, [&]() { + LOCK(cs); + return outboundFullRelayCount == expectedOutboundFullRelayCount && + avalancheOutboundsCount == expectedAvalancheOutboundsCount; + }); + + // Check each node belongs to a different group + std::set> groups; + ForEachNode([&](const CNode *pnode) { + groups.insert(pnode->addr.GetGroup({})); + }); + BOOST_CHECK_EQUAL(groups.size(), expectedOutboundFullRelayCount + + expectedAvalancheOutboundsCount); + + interruptNet(); + if (threadOpenConnections.joinable()) { + threadOpenConnections.join(); + } + + return ret; + } }; } // namespace @@ -1002,4 +1161,98 @@ checkExtraFullOutboundCount(5, 5, 2); } +BOOST_FIXTURE_TEST_CASE(net_group_limit, 
TestChain100Setup) { + const CChainParams &params = GetConfig().GetChainParams(); + + m_node.connman = std::make_unique<CConnmanTest>(); + m_node.peerman = PeerManager::make( + params, *m_node.connman, m_node.banman.get(), *m_node.scheduler, + *m_node.chainman, *m_node.mempool, false); + + bilingual_str error; + // Init the global avalanche object otherwise the avalanche outbound + // slots are not allocated. + g_avalanche = avalanche::Processor::MakeProcessor( + *m_node.args, *m_node.chain, m_node.connman.get(), *m_node.chainman, + *m_node.scheduler, error); + BOOST_CHECK(g_avalanche); + + CConnman::Options options; + options.nMaxConnections = 200; + options.m_max_outbound_full_relay = 8; + options.m_max_avalanche_outbound = 60; + + auto connman = static_cast<CConnmanTest *>(m_node.connman.get()); + connman->MakeAddrmanDeterministic(); + connman->Init(options); + + // Single full relay outbound is no problem + BOOST_CHECK(connman->checkContiguousAddressesConnection( + { + // group, services, quantity + {0, NODE_NETWORK, 1}, + }, + 1, // Expected full-relay outbound count + 0 // Expected avalanche outbound count + )); + + // Adding more contiguous full relay outbounds fails due to network group + // limitation + BOOST_CHECK(connman->checkContiguousAddressesConnection( + { + // group, services, quantity + {0, NODE_NETWORK, 3}, + }, + 1, // Expected full-relay outbound count + 0 // Expected avalanche outbound count + )); + + // Outbounds from different groups can be connected + BOOST_CHECK(connman->checkContiguousAddressesConnection( + { + // group, services, quantity + {0, NODE_NETWORK, 1}, + {1, NODE_NETWORK, 1}, + {2, NODE_NETWORK, 1}, + }, + 3, // Expected full-relay outbound count + 0 // Expected avalanche outbound count + )); + + // Up to the max + BOOST_CHECK(connman->checkContiguousAddressesConnection( + { + // group, services, quantity + {0, NODE_NETWORK, 1}, + {1, NODE_NETWORK, 1}, + {2, NODE_NETWORK, 1}, + {3, NODE_NETWORK, 1}, + {4, NODE_NETWORK, 1}, + {5, NODE_NETWORK, 1}, + {6, 
NODE_NETWORK, 1}, + {7, NODE_NETWORK, 1}, + {8, NODE_NETWORK, 1}, + {9, NODE_NETWORK, 1}, + {10, NODE_NETWORK, 1}, + {11, NODE_NETWORK, 1}, + }, + options.m_max_outbound_full_relay, // Expected full-relay outbound count + 0 // Expected avalanche outbound count + )); + + // Avalanche outbounds are prioritized, so contiguous full relay outbounds + // will fail due to network group limitation + BOOST_CHECK(connman->checkContiguousAddressesConnection( + { + // group, services, quantity + {0, NODE_NETWORK | NODE_AVALANCHE, 1}, + {0, NODE_NETWORK, 3}, + }, + 0, // Expected full-relay outbound count + 1 // Expected avalanche outbound count + )); + + g_avalanche.reset(); +} + BOOST_AUTO_TEST_SUITE_END()