Changeset View
Changeset View
Standalone View
Standalone View
src/httpserver.cpp
Show First 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | |||||
template <typename WorkItem> class WorkQueue { | template <typename WorkItem> class WorkQueue { | ||||
private: | private: | ||||
/** Mutex protects entire object */ | /** Mutex protects entire object */ | ||||
CWaitableCriticalSection cs; | CWaitableCriticalSection cs; | ||||
std::condition_variable cond; | std::condition_variable cond; | ||||
std::deque<std::unique_ptr<WorkItem>> queue; | std::deque<std::unique_ptr<WorkItem>> queue; | ||||
bool running; | bool running; | ||||
size_t maxDepth; | size_t maxDepth; | ||||
int numThreads; | |||||
/** RAII object to keep track of number of running worker threads */ | |||||
class ThreadCounter { | |||||
public: | public: | ||||
WorkQueue &wq; | explicit WorkQueue(size_t _maxDepth) : running(true), maxDepth(_maxDepth) {} | ||||
explicit ThreadCounter(WorkQueue &w) : wq(w) { | /** | ||||
std::lock_guard<std::mutex> lock(wq.cs); | * Precondition: worker threads have all stopped (they have all been joined) | ||||
wq.numThreads += 1; | */ | ||||
} | |||||
~ThreadCounter() { | |||||
std::lock_guard<std::mutex> lock(wq.cs); | |||||
wq.numThreads -= 1; | |||||
wq.cond.notify_all(); | |||||
} | |||||
}; | |||||
public: | |||||
explicit WorkQueue(size_t _maxDepth) | |||||
: running(true), maxDepth(_maxDepth), numThreads(0) {} | |||||
/** Precondition: worker threads have all stopped (call WaitExit) */ | |||||
~WorkQueue() {} | ~WorkQueue() {} | ||||
/** Enqueue a work item */ | /** Enqueue a work item */ | ||||
bool Enqueue(WorkItem *item) { | bool Enqueue(WorkItem *item) { | ||||
LOCK(cs); | LOCK(cs); | ||||
if (queue.size() >= maxDepth) { | if (queue.size() >= maxDepth) { | ||||
return false; | return false; | ||||
} | } | ||||
queue.emplace_back(std::unique_ptr<WorkItem>(item)); | queue.emplace_back(std::unique_ptr<WorkItem>(item)); | ||||
cond.notify_one(); | cond.notify_one(); | ||||
return true; | return true; | ||||
} | } | ||||
/** Thread function */ | /** Thread function */ | ||||
void Run() { | void Run() { | ||||
ThreadCounter count(*this); | |||||
while (true) { | while (true) { | ||||
std::unique_ptr<WorkItem> i; | std::unique_ptr<WorkItem> i; | ||||
{ | { | ||||
WAIT_LOCK(cs, lock); | WAIT_LOCK(cs, lock); | ||||
while (running && queue.empty()) | while (running && queue.empty()) | ||||
cond.wait(lock); | cond.wait(lock); | ||||
if (!running) break; | if (!running) break; | ||||
i = std::move(queue.front()); | i = std::move(queue.front()); | ||||
queue.pop_front(); | queue.pop_front(); | ||||
} | } | ||||
(*i)(); | (*i)(); | ||||
} | } | ||||
} | } | ||||
/** Interrupt and exit loops */ | /** Interrupt and exit loops */ | ||||
void Interrupt() { | void Interrupt() { | ||||
LOCK(cs); | LOCK(cs); | ||||
running = false; | running = false; | ||||
cond.notify_all(); | cond.notify_all(); | ||||
} | } | ||||
/** Wait for worker threads to exit */ | |||||
void WaitExit() { | |||||
std::unique_lock<std::mutex> lock(cs); | |||||
while (numThreads > 0) | |||||
cond.wait(lock); | |||||
} | |||||
}; | }; | ||||
struct HTTPPathHandler { | struct HTTPPathHandler { | ||||
HTTPPathHandler() {} | HTTPPathHandler() {} | ||||
HTTPPathHandler(std::string _prefix, bool _exactMatch, | HTTPPathHandler(std::string _prefix, bool _exactMatch, | ||||
HTTPRequestHandler _handler) | HTTPRequestHandler _handler) | ||||
: prefix(_prefix), exactMatch(_exactMatch), handler(_handler) {} | : prefix(_prefix), exactMatch(_exactMatch), handler(_handler) {} | ||||
std::string prefix; | std::string prefix; | ||||
▲ Show 20 Lines • Show All 291 Lines • ▼ Show 20 Lines | #endif | ||||
workQueue = new WorkQueue<HTTPClosure>(workQueueDepth); | workQueue = new WorkQueue<HTTPClosure>(workQueueDepth); | ||||
eventBase = base; | eventBase = base; | ||||
eventHTTP = http; | eventHTTP = http; | ||||
return true; | return true; | ||||
} | } | ||||
std::thread threadHTTP; | std::thread threadHTTP; | ||||
std::future<bool> threadResult; | std::future<bool> threadResult; | ||||
static std::vector<std::thread> g_thread_http_workers; | |||||
bool StartHTTPServer() { | bool StartHTTPServer() { | ||||
LogPrint(BCLog::HTTP, "Starting HTTP server\n"); | LogPrint(BCLog::HTTP, "Starting HTTP server\n"); | ||||
int rpcThreads = | int rpcThreads = | ||||
std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L); | std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L); | ||||
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads); | LogPrintf("HTTP: starting %d worker threads\n", rpcThreads); | ||||
std::packaged_task<bool(event_base *, evhttp *)> task(ThreadHTTP); | std::packaged_task<bool(event_base *, evhttp *)> task(ThreadHTTP); | ||||
threadResult = task.get_future(); | threadResult = task.get_future(); | ||||
threadHTTP = std::thread(std::move(task), eventBase, eventHTTP); | threadHTTP = std::thread(std::move(task), eventBase, eventHTTP); | ||||
for (int i = 0; i < rpcThreads; i++) { | for (int i = 0; i < rpcThreads; i++) { | ||||
std::thread rpc_worker(HTTPWorkQueueRun, workQueue); | g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue); | ||||
rpc_worker.detach(); | |||||
} | } | ||||
return true; | return true; | ||||
} | } | ||||
void InterruptHTTPServer() { | void InterruptHTTPServer() { | ||||
LogPrint(BCLog::HTTP, "Interrupting HTTP server\n"); | LogPrint(BCLog::HTTP, "Interrupting HTTP server\n"); | ||||
if (eventHTTP) { | if (eventHTTP) { | ||||
// Unlisten sockets | // Unlisten sockets | ||||
for (evhttp_bound_socket *socket : boundSockets) { | for (evhttp_bound_socket *socket : boundSockets) { | ||||
evhttp_del_accept_socket(eventHTTP, socket); | evhttp_del_accept_socket(eventHTTP, socket); | ||||
} | } | ||||
// Reject requests on current connections | // Reject requests on current connections | ||||
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr); | evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr); | ||||
} | } | ||||
if (workQueue) workQueue->Interrupt(); | if (workQueue) workQueue->Interrupt(); | ||||
} | } | ||||
void StopHTTPServer() { | void StopHTTPServer() { | ||||
LogPrint(BCLog::HTTP, "Stopping HTTP server\n"); | LogPrint(BCLog::HTTP, "Stopping HTTP server\n"); | ||||
if (workQueue) { | if (workQueue) { | ||||
LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); | LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); | ||||
workQueue->WaitExit(); | for (auto &thread : g_thread_http_workers) { | ||||
thread.join(); | |||||
} | |||||
g_thread_http_workers.clear(); | |||||
delete workQueue; | delete workQueue; | ||||
} | } | ||||
if (eventBase) { | if (eventBase) { | ||||
LogPrint(BCLog::HTTP, "Waiting for HTTP event thread to exit\n"); | LogPrint(BCLog::HTTP, "Waiting for HTTP event thread to exit\n"); | ||||
// Give event loop a few seconds to exit (to send back last RPC | // Give event loop a few seconds to exit (to send back last RPC | ||||
// responses), then break it. Before this was solved with | // responses), then break it. Before this was solved with | ||||
// event_base_loopexit, but that didn't work as expected in at least | // event_base_loopexit, but that didn't work as expected in at least | ||||
// libevent 2.0.21 and always introduced a delay. In libevent master | // libevent 2.0.21 and always introduced a delay. In libevent master | ||||
▲ Show 20 Lines • Show All 176 Lines • Show Last 20 Lines |