Changeset View
Changeset View
Standalone View
Standalone View
src/net.cpp
Show First 20 Lines • Show All 1,332 Lines • ▼ Show 20 Lines | if (nTime - pnode->nTimeConnected > 60) { | ||||
} else if (!pnode->fSuccessfullyConnected) { | } else if (!pnode->fSuccessfullyConnected) { | ||||
LogPrint(BCLog::NET, "version handshake timeout from %d\n", | LogPrint(BCLog::NET, "version handshake timeout from %d\n", | ||||
pnode->GetId()); | pnode->GetId()); | ||||
pnode->fDisconnect = true; | pnode->fDisconnect = true; | ||||
} | } | ||||
} | } | ||||
} | } | ||||
void CConnman::ThreadSocketHandler() { | void CConnman::SocketHandler() { | ||||
while (!interruptNet) { | |||||
DisconnectNodes(); | |||||
NotifyNumConnectionsChanged(); | |||||
// | // | ||||
// Find which sockets have data to receive | // Find which sockets have data to receive | ||||
// | // | ||||
struct timeval timeout; | struct timeval timeout; | ||||
timeout.tv_sec = 0; | timeout.tv_sec = 0; | ||||
// Frequency to poll pnode->vSend | // frequency to poll pnode->vSend | ||||
Fabien: NIt: restore capitalization. | |||||
timeout.tv_usec = 50000; | timeout.tv_usec = 50000; | ||||
fd_set fdsetRecv; | fd_set fdsetRecv; | ||||
fd_set fdsetSend; | fd_set fdsetSend; | ||||
fd_set fdsetError; | fd_set fdsetError; | ||||
FD_ZERO(&fdsetRecv); | FD_ZERO(&fdsetRecv); | ||||
FD_ZERO(&fdsetSend); | FD_ZERO(&fdsetSend); | ||||
FD_ZERO(&fdsetError); | FD_ZERO(&fdsetError); | ||||
SOCKET hSocketMax = 0; | SOCKET hSocketMax = 0; | ||||
bool have_fds = false; | bool have_fds = false; | ||||
for (const ListenSocket &hListenSocket : vhListenSocket) { | for (const ListenSocket &hListenSocket : vhListenSocket) { | ||||
FD_SET(hListenSocket.socket, &fdsetRecv); | FD_SET(hListenSocket.socket, &fdsetRecv); | ||||
hSocketMax = std::max(hSocketMax, hListenSocket.socket); | hSocketMax = std::max(hSocketMax, hListenSocket.socket); | ||||
have_fds = true; | have_fds = true; | ||||
} | } | ||||
{ | { | ||||
LOCK(cs_vNodes); | LOCK(cs_vNodes); | ||||
for (CNode *pnode : vNodes) { | for (CNode *pnode : vNodes) { | ||||
// Implement the following logic: | // Implement the following logic: | ||||
// * If there is data to send, select() for sending data. As | // * If there is data to send, select() for sending data. As this | ||||
// this only happens when optimistic write failed, we choose to | // only happens when optimistic write failed, we choose to first | ||||
// first drain the write buffer in this case before receiving | // drain the write buffer in this case before receiving more. This | ||||
// more. This avoids needlessly queueing received data, if the | // avoids needlessly queueing received data, if the remote peer is | ||||
// remote peer is not themselves receiving data. This means | // not themselves receiving data. This means properly utilizing | ||||
// properly utilizing TCP flow control signalling. | // TCP flow control signalling. | ||||
// * Otherwise, if there is space left in the receive buffer, | // * Otherwise, if there is space left in the receive buffer, | ||||
// select() for receiving data. | // select() for receiving data. | ||||
// * Hand off all complete messages to the processor, to be | // * Hand off all complete messages to the processor, to be handled | ||||
// handled without blocking here. | // without blocking here. | ||||
bool select_recv = !pnode->fPauseRecv; | bool select_recv = !pnode->fPauseRecv; | ||||
bool select_send; | bool select_send; | ||||
{ | { | ||||
LOCK(pnode->cs_vSend); | LOCK(pnode->cs_vSend); | ||||
select_send = !pnode->vSendMsg.empty(); | select_send = !pnode->vSendMsg.empty(); | ||||
} | } | ||||
LOCK(pnode->cs_hSocket); | LOCK(pnode->cs_hSocket); | ||||
if (pnode->hSocket == INVALID_SOCKET) { | if (pnode->hSocket == INVALID_SOCKET) { | ||||
continue; | continue; | ||||
} | } | ||||
FD_SET(pnode->hSocket, &fdsetError); | FD_SET(pnode->hSocket, &fdsetError); | ||||
hSocketMax = std::max(hSocketMax, pnode->hSocket); | hSocketMax = std::max(hSocketMax, pnode->hSocket); | ||||
have_fds = true; | have_fds = true; | ||||
if (select_send) { | if (select_send) { | ||||
FD_SET(pnode->hSocket, &fdsetSend); | FD_SET(pnode->hSocket, &fdsetSend); | ||||
continue; | continue; | ||||
} | } | ||||
if (select_recv) { | if (select_recv) { | ||||
FD_SET(pnode->hSocket, &fdsetRecv); | FD_SET(pnode->hSocket, &fdsetRecv); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, | int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, | ||||
&fdsetSend, &fdsetError, &timeout); | &fdsetError, &timeout); | ||||
if (interruptNet) { | if (interruptNet) { | ||||
return; | return; | ||||
} | } | ||||
if (nSelect == SOCKET_ERROR) { | if (nSelect == SOCKET_ERROR) { | ||||
if (have_fds) { | if (have_fds) { | ||||
int nErr = WSAGetLastError(); | int nErr = WSAGetLastError(); | ||||
LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); | LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); | ||||
for (unsigned int i = 0; i <= hSocketMax; i++) { | for (unsigned int i = 0; i <= hSocketMax; i++) { | ||||
FD_SET(i, &fdsetRecv); | FD_SET(i, &fdsetRecv); | ||||
} | } | ||||
} | } | ||||
FD_ZERO(&fdsetSend); | FD_ZERO(&fdsetSend); | ||||
FD_ZERO(&fdsetError); | FD_ZERO(&fdsetError); | ||||
if (!interruptNet.sleep_for( | if (!interruptNet.sleep_for( | ||||
std::chrono::milliseconds(timeout.tv_usec / 1000))) { | std::chrono::milliseconds(timeout.tv_usec / 1000))) { | ||||
return; | return; | ||||
} | } | ||||
} | } | ||||
// | // | ||||
// Accept new connections | // Accept new connections | ||||
// | // | ||||
for (const ListenSocket &hListenSocket : vhListenSocket) { | for (const ListenSocket &hListenSocket : vhListenSocket) { | ||||
if (hListenSocket.socket != INVALID_SOCKET && | if (hListenSocket.socket != INVALID_SOCKET && | ||||
FD_ISSET(hListenSocket.socket, &fdsetRecv)) { | FD_ISSET(hListenSocket.socket, &fdsetRecv)) { | ||||
AcceptConnection(hListenSocket); | AcceptConnection(hListenSocket); | ||||
} | } | ||||
} | } | ||||
// | // | ||||
// Service each socket | // Service each socket | ||||
// | // | ||||
std::vector<CNode *> vNodesCopy; | std::vector<CNode *> vNodesCopy; | ||||
{ | { | ||||
LOCK(cs_vNodes); | LOCK(cs_vNodes); | ||||
vNodesCopy = vNodes; | vNodesCopy = vNodes; | ||||
for (CNode *pnode : vNodesCopy) { | for (CNode *pnode : vNodesCopy) { | ||||
pnode->AddRef(); | pnode->AddRef(); | ||||
} | } | ||||
} | } | ||||
for (CNode *pnode : vNodesCopy) { | for (CNode *pnode : vNodesCopy) { | ||||
if (interruptNet) { | if (interruptNet) { | ||||
return; | return; | ||||
} | } | ||||
// | // | ||||
// Receive | // Receive | ||||
// | // | ||||
bool recvSet = false; | bool recvSet = false; | ||||
bool sendSet = false; | bool sendSet = false; | ||||
bool errorSet = false; | bool errorSet = false; | ||||
{ | { | ||||
LOCK(pnode->cs_hSocket); | LOCK(pnode->cs_hSocket); | ||||
if (pnode->hSocket == INVALID_SOCKET) { | if (pnode->hSocket == INVALID_SOCKET) { | ||||
continue; | continue; | ||||
} | } | ||||
recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); | recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); | ||||
sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); | sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); | ||||
errorSet = FD_ISSET(pnode->hSocket, &fdsetError); | errorSet = FD_ISSET(pnode->hSocket, &fdsetError); | ||||
} | } | ||||
if (recvSet || errorSet) { | if (recvSet || errorSet) { | ||||
// typical socket buffer is 8K-64K | // typical socket buffer is 8K-64K | ||||
char pchBuf[0x10000]; | char pchBuf[0x10000]; | ||||
int32_t nBytes = 0; | int32_t nBytes = 0; | ||||
{ | { | ||||
LOCK(pnode->cs_hSocket); | LOCK(pnode->cs_hSocket); | ||||
if (pnode->hSocket == INVALID_SOCKET) { | if (pnode->hSocket == INVALID_SOCKET) { | ||||
continue; | continue; | ||||
} | } | ||||
nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), | nBytes = | ||||
MSG_DONTWAIT); | recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); | ||||
} | } | ||||
if (nBytes > 0) { | if (nBytes > 0) { | ||||
bool notify = false; | bool notify = false; | ||||
if (!pnode->ReceiveMsgBytes(*config, pchBuf, nBytes, | if (!pnode->ReceiveMsgBytes(*config, pchBuf, nBytes, notify)) { | ||||
notify)) { | |||||
pnode->CloseSocketDisconnect(); | pnode->CloseSocketDisconnect(); | ||||
} | } | ||||
RecordBytesRecv(nBytes); | RecordBytesRecv(nBytes); | ||||
if (notify) { | if (notify) { | ||||
size_t nSizeAdded = 0; | size_t nSizeAdded = 0; | ||||
auto it(pnode->vRecvMsg.begin()); | auto it(pnode->vRecvMsg.begin()); | ||||
for (; it != pnode->vRecvMsg.end(); ++it) { | for (; it != pnode->vRecvMsg.end(); ++it) { | ||||
if (!it->complete()) { | if (!it->complete()) { | ||||
break; | break; | ||||
} | } | ||||
nSizeAdded += | nSizeAdded += | ||||
it->vRecv.size() + CMessageHeader::HEADER_SIZE; | it->vRecv.size() + CMessageHeader::HEADER_SIZE; | ||||
} | } | ||||
{ | { | ||||
LOCK(pnode->cs_vProcessMsg); | LOCK(pnode->cs_vProcessMsg); | ||||
pnode->vProcessMsg.splice( | pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), | ||||
pnode->vProcessMsg.end(), pnode->vRecvMsg, | pnode->vRecvMsg, | ||||
pnode->vRecvMsg.begin(), it); | pnode->vRecvMsg.begin(), it); | ||||
pnode->nProcessQueueSize += nSizeAdded; | pnode->nProcessQueueSize += nSizeAdded; | ||||
pnode->fPauseRecv = | pnode->fPauseRecv = | ||||
pnode->nProcessQueueSize > nReceiveFloodSize; | pnode->nProcessQueueSize > nReceiveFloodSize; | ||||
} | } | ||||
WakeMessageHandler(); | WakeMessageHandler(); | ||||
} | } | ||||
} else if (nBytes == 0) { | } else if (nBytes == 0) { | ||||
// socket closed gracefully | // socket closed gracefully | ||||
if (!pnode->fDisconnect) { | if (!pnode->fDisconnect) { | ||||
LogPrint(BCLog::NET, "socket closed\n"); | LogPrint(BCLog::NET, "socket closed\n"); | ||||
} | } | ||||
pnode->CloseSocketDisconnect(); | pnode->CloseSocketDisconnect(); | ||||
} else if (nBytes < 0) { | } else if (nBytes < 0) { | ||||
// error | // error | ||||
int nErr = WSAGetLastError(); | int nErr = WSAGetLastError(); | ||||
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && | if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && | ||||
nErr != WSAEINTR && nErr != WSAEINPROGRESS) { | nErr != WSAEINTR && nErr != WSAEINPROGRESS) { | ||||
if (!pnode->fDisconnect) { | if (!pnode->fDisconnect) { | ||||
LogPrintf("socket recv error %s\n", | LogPrintf("socket recv error %s\n", | ||||
NetworkErrorString(nErr)); | NetworkErrorString(nErr)); | ||||
} | } | ||||
pnode->CloseSocketDisconnect(); | pnode->CloseSocketDisconnect(); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
// | // | ||||
// Send | // Send | ||||
// | // | ||||
if (sendSet) { | if (sendSet) { | ||||
LOCK(pnode->cs_vSend); | LOCK(pnode->cs_vSend); | ||||
size_t nBytes = SocketSendData(pnode); | size_t nBytes = SocketSendData(pnode); | ||||
if (nBytes) { | if (nBytes) { | ||||
RecordBytesSent(nBytes); | RecordBytesSent(nBytes); | ||||
} | } | ||||
} | } | ||||
InactivityCheck(pnode); | InactivityCheck(pnode); | ||||
} | } | ||||
{ | { | ||||
LOCK(cs_vNodes); | LOCK(cs_vNodes); | ||||
for (CNode *pnode : vNodesCopy) { | for (CNode *pnode : vNodesCopy) { | ||||
pnode->Release(); | pnode->Release(); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
void CConnman::ThreadSocketHandler() { | |||||
while (!interruptNet) { | |||||
DisconnectNodes(); | |||||
NotifyNumConnectionsChanged(); | |||||
SocketHandler(); | |||||
} | |||||
} | } | ||||
void CConnman::WakeMessageHandler() { | void CConnman::WakeMessageHandler() { | ||||
{ | { | ||||
std::lock_guard<std::mutex> lock(mutexMsgProc); | std::lock_guard<std::mutex> lock(mutexMsgProc); | ||||
fMsgProcWake = true; | fMsgProcWake = true; | ||||
} | } | ||||
condMsgProc.notify_one(); | condMsgProc.notify_one(); | ||||
▲ Show 20 Lines • Show All 1,561 Lines • Show Last 20 Lines |
NIt: restore capitalization.