diff --git a/src/net.h b/src/net.h --- a/src/net.h +++ b/src/net.h @@ -346,6 +346,7 @@ void DisconnectNodes(); void NotifyNumConnectionsChanged(); void InactivityCheck(CNode *pnode); + void SocketHandler(); void ThreadSocketHandler(); void ThreadDNSAddressSeed(); diff --git a/src/net.cpp b/src/net.cpp --- a/src/net.cpp +++ b/src/net.cpp @@ -1338,220 +1338,222 @@ } } -void CConnman::ThreadSocketHandler() { - while (!interruptNet) { - DisconnectNodes(); - NotifyNumConnectionsChanged(); +void CConnman::SocketHandler() { + // + // Find which sockets have data to receive + // + struct timeval timeout; + timeout.tv_sec = 0; + // frequency to poll pnode->vSend + timeout.tv_usec = 50000; + + fd_set fdsetRecv; + fd_set fdsetSend; + fd_set fdsetError; + FD_ZERO(&fdsetRecv); + FD_ZERO(&fdsetSend); + FD_ZERO(&fdsetError); + SOCKET hSocketMax = 0; + bool have_fds = false; + + for (const ListenSocket &hListenSocket : vhListenSocket) { + FD_SET(hListenSocket.socket, &fdsetRecv); + hSocketMax = std::max(hSocketMax, hListenSocket.socket); + have_fds = true; + } - // - // Find which sockets have data to receive - // - struct timeval timeout; - timeout.tv_sec = 0; - // Frequency to poll pnode->vSend - timeout.tv_usec = 50000; - - fd_set fdsetRecv; - fd_set fdsetSend; - fd_set fdsetError; - FD_ZERO(&fdsetRecv); - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - SOCKET hSocketMax = 0; - bool have_fds = false; + { + LOCK(cs_vNodes); + for (CNode *pnode : vNodes) { + // Implement the following logic: + // * If there is data to send, select() for sending data. As this + // only happens when optimistic write failed, we choose to first + // drain the write buffer in this case before receiving more. This + // avoids needlessly queueing received data, if the remote peer is + // not themselves receiving data. This means properly utilizing + // TCP flow control signalling. + // * Otherwise, if there is space left in the receive buffer, + // select() for receiving data. 
+ // * Hand off all complete messages to the processor, to be handled + // without blocking here. + + bool select_recv = !pnode->fPauseRecv; + bool select_send; + { + LOCK(pnode->cs_vSend); + select_send = !pnode->vSendMsg.empty(); + } - for (const ListenSocket &hListenSocket : vhListenSocket) { - FD_SET(hListenSocket.socket, &fdsetRecv); - hSocketMax = std::max(hSocketMax, hListenSocket.socket); - have_fds = true; - } + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) { + continue; + } - { - LOCK(cs_vNodes); - for (CNode *pnode : vNodes) { - // Implement the following logic: - // * If there is data to send, select() for sending data. As - // this only happens when optimistic write failed, we choose to - // first drain the write buffer in this case before receiving - // more. This avoids needlessly queueing received data, if the - // remote peer is not themselves receiving data. This means - // properly utilizing TCP flow control signalling. - // * Otherwise, if there is space left in the receive buffer, - // select() for receiving data. - // * Hand off all complete messages to the processor, to be - // handled without blocking here. - - bool select_recv = !pnode->fPauseRecv; - bool select_send; - { - LOCK(pnode->cs_vSend); - select_send = !pnode->vSendMsg.empty(); - } + FD_SET(pnode->hSocket, &fdsetError); + hSocketMax = std::max(hSocketMax, pnode->hSocket); + have_fds = true; - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) { - continue; - } + if (select_send) { + FD_SET(pnode->hSocket, &fdsetSend); + continue; + } + if (select_recv) { + FD_SET(pnode->hSocket, &fdsetRecv); + } + } + } - FD_SET(pnode->hSocket, &fdsetError); - hSocketMax = std::max(hSocketMax, pnode->hSocket); - have_fds = true; + int nSelect = select(have_fds ? 
hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, + &fdsetError, &timeout); + if (interruptNet) { + return; + } - if (select_send) { - FD_SET(pnode->hSocket, &fdsetSend); - continue; - } - if (select_recv) { - FD_SET(pnode->hSocket, &fdsetRecv); - } + if (nSelect == SOCKET_ERROR) { + if (have_fds) { + int nErr = WSAGetLastError(); + LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); + for (unsigned int i = 0; i <= hSocketMax; i++) { + FD_SET(i, &fdsetRecv); } } - - int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, - &fdsetSend, &fdsetError, &timeout); - if (interruptNet) { + FD_ZERO(&fdsetSend); + FD_ZERO(&fdsetError); + if (!interruptNet.sleep_for( + std::chrono::milliseconds(timeout.tv_usec / 1000))) { return; } + } - if (nSelect == SOCKET_ERROR) { - if (have_fds) { - int nErr = WSAGetLastError(); - LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); - for (unsigned int i = 0; i <= hSocketMax; i++) { - FD_SET(i, &fdsetRecv); - } - } - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - if (!interruptNet.sleep_for( - std::chrono::milliseconds(timeout.tv_usec / 1000))) { - return; - } + // + // Accept new connections + // + for (const ListenSocket &hListenSocket : vhListenSocket) { + if (hListenSocket.socket != INVALID_SOCKET && + FD_ISSET(hListenSocket.socket, &fdsetRecv)) { + AcceptConnection(hListenSocket); } + } - // - // Accept new connections - // - for (const ListenSocket &hListenSocket : vhListenSocket) { - if (hListenSocket.socket != INVALID_SOCKET && - FD_ISSET(hListenSocket.socket, &fdsetRecv)) { - AcceptConnection(hListenSocket); - } + // + // Service each socket + // + std::vector<CNode *> vNodesCopy; + { + LOCK(cs_vNodes); + vNodesCopy = vNodes; + for (CNode *pnode : vNodesCopy) { + pnode->AddRef(); + } + } + for (CNode *pnode : vNodesCopy) { + if (interruptNet) { + return; } // - // Service each socket + // Receive // - std::vector<CNode *> vNodesCopy; + bool recvSet = false; + bool sendSet = false; + bool errorSet = false; { - 
LOCK(cs_vNodes); - vNodesCopy = vNodes; - for (CNode *pnode : vNodesCopy) { - pnode->AddRef(); - } - } - for (CNode *pnode : vNodesCopy) { - if (interruptNet) { - return; + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) { + continue; } - - // - // Receive - // - bool recvSet = false; - bool sendSet = false; - bool errorSet = false; + recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); + sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); + errorSet = FD_ISSET(pnode->hSocket, &fdsetError); + } + if (recvSet || errorSet) { + // typical socket buffer is 8K-64K + char pchBuf[0x10000]; + int32_t nBytes = 0; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) { continue; } - recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); - sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); - errorSet = FD_ISSET(pnode->hSocket, &fdsetError); + nBytes = + recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); } - if (recvSet || errorSet) { - // typical socket buffer is 8K-64K - char pchBuf[0x10000]; - int32_t nBytes = 0; - { - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) { - continue; - } - nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), - MSG_DONTWAIT); + if (nBytes > 0) { + bool notify = false; + if (!pnode->ReceiveMsgBytes(*config, pchBuf, nBytes, notify)) { + pnode->CloseSocketDisconnect(); } - if (nBytes > 0) { - bool notify = false; - if (!pnode->ReceiveMsgBytes(*config, pchBuf, nBytes, - notify)) { - pnode->CloseSocketDisconnect(); - } - RecordBytesRecv(nBytes); - if (notify) { - size_t nSizeAdded = 0; - auto it(pnode->vRecvMsg.begin()); - for (; it != pnode->vRecvMsg.end(); ++it) { - if (!it->complete()) { - break; - } - nSizeAdded += - it->vRecv.size() + CMessageHeader::HEADER_SIZE; + RecordBytesRecv(nBytes); + if (notify) { + size_t nSizeAdded = 0; + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) { + break; } - { - LOCK(pnode->cs_vProcessMsg); - pnode->vProcessMsg.splice( - 
pnode->vProcessMsg.end(), pnode->vRecvMsg, - pnode->vRecvMsg.begin(), it); - pnode->nProcessQueueSize += nSizeAdded; - pnode->fPauseRecv = - pnode->nProcessQueueSize > nReceiveFloodSize; - } - WakeMessageHandler(); + nSizeAdded += + it->vRecv.size() + CMessageHeader::HEADER_SIZE; + } + { + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), + pnode->vRecvMsg, + pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = + pnode->nProcessQueueSize > nReceiveFloodSize; } - } else if (nBytes == 0) { - // socket closed gracefully + WakeMessageHandler(); + } + } else if (nBytes == 0) { + // socket closed gracefully + if (!pnode->fDisconnect) { + LogPrint(BCLog::NET, "socket closed\n"); + } + pnode->CloseSocketDisconnect(); + } else if (nBytes < 0) { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && + nErr != WSAEINTR && nErr != WSAEINPROGRESS) { if (!pnode->fDisconnect) { - LogPrint(BCLog::NET, "socket closed\n"); + LogPrintf("socket recv error %s\n", + NetworkErrorString(nErr)); } pnode->CloseSocketDisconnect(); - } else if (nBytes < 0) { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && - nErr != WSAEINTR && nErr != WSAEINPROGRESS) { - if (!pnode->fDisconnect) { - LogPrintf("socket recv error %s\n", - NetworkErrorString(nErr)); - } - pnode->CloseSocketDisconnect(); - } } } + } - // - // Send - // - if (sendSet) { - LOCK(pnode->cs_vSend); - size_t nBytes = SocketSendData(pnode); - if (nBytes) { - RecordBytesSent(nBytes); - } + // + // Send + // + if (sendSet) { + LOCK(pnode->cs_vSend); + size_t nBytes = SocketSendData(pnode); + if (nBytes) { + RecordBytesSent(nBytes); } - - InactivityCheck(pnode); } - { - LOCK(cs_vNodes); - for (CNode *pnode : vNodesCopy) { - pnode->Release(); - } + + InactivityCheck(pnode); + } + { + LOCK(cs_vNodes); + for (CNode *pnode : vNodesCopy) { + pnode->Release(); } } } +void 
CConnman::ThreadSocketHandler() { + while (!interruptNet) { + DisconnectNodes(); + NotifyNumConnectionsChanged(); + SocketHandler(); + } +} + void CConnman::WakeMessageHandler() { { std::lock_guard<std::mutex> lock(mutexMsgProc);