Commit f380b354 authored by François Grisez's avatar François Grisez

Rewrite ThreadPool class

parent 1a439dcf
Pipeline #6093 passed with stages
in 16 minutes and 10 seconds
......@@ -329,7 +329,7 @@ void SociAuthDB::getPasswordFromBackend(const string &id, const string &domain,
// create a thread to grab a pool connection and use it to retrieve the auth information
auto func = bind(&SociAuthDB::getPasswordWithPool, this, id, domain, authid, listener, listener_ref);
bool success = thread_pool->Enqueue(func);
bool success = thread_pool->run(func);
if (success == FALSE) {
// Enqueue() can fail when the queue is full, so we have to act on that
SLOGE << "[SOCI] Auth queue is full, cannot fullfil password request for " << id << " / " << domain << " / "
......@@ -343,7 +343,7 @@ void SociAuthDB::getUserWithPhoneFromBackend(const string &phone, const string &
// create a thread to grab a pool connection and use it to retrieve the auth information
auto func = bind(&SociAuthDB::getUserWithPhoneWithPool, this, phone, domain, listener);
bool success = thread_pool->Enqueue(func);
bool success = thread_pool->run(func);
if (success == FALSE) {
// Enqueue() can fail when the queue is full, so we have to act on that
SLOGE << "[SOCI] Auth queue is full, cannot fullfil user request for " << phone;
......@@ -356,7 +356,7 @@ void SociAuthDB::getUsersWithPhonesFromBackend(list<tuple<string, string, AuthDb
// create a thread to grab a pool connection and use it to retrieve the auth information
auto func = bind(&SociAuthDB::getUsersWithPhonesWithPool, this, creds);
bool success = thread_pool->Enqueue(func);
bool success = thread_pool->run(func);
if (success == FALSE) {
// Enqueue() can fail when the queue is full, so we have to act on that
SLOGE << "[SOCI] Auth queue is full, cannot fullfil user request for " << &creds;
......
......@@ -881,7 +881,7 @@ void DataBaseEventLogWriter::write(const std::shared_ptr<EventLog> &evlog) {
mMutex.unlock();
// Save event in database.
if (!mThreadPool->Enqueue(bind(&DataBaseEventLogWriter::writeEventFromQueue, this))) {
if (!mThreadPool->run(bind(&DataBaseEventLogWriter::writeEventFromQueue, this))) {
LOGE("DataBaseEventLogWriter: unable to enqueue event!");
}
} else {
......
......@@ -266,7 +266,7 @@ class DoSProtection : public Module, ModuleToolbox {
string ip = ctx->ip;
string port = ctx->port;
mThreadPool->Enqueue([&, protocol, ip, port] {
mThreadPool->run([&, protocol, ip, port] {
char iptables_cmd[512];
bool is_ipv6 = strchr(ip.c_str(), ':') != nullptr;
snprintf(iptables_cmd, sizeof(iptables_cmd), "%s -D %s -p %s -s %s -m multiport --sports %s -j REJECT",
......@@ -344,7 +344,7 @@ class DoSProtection : public Module, ModuleToolbox {
LOGW("Packet count rate (%f) >= limit (%i), blocking ip/port %s/%s on protocol udp for %i minutes",
dosContext.packet_count_rate, mPacketRateLimit, ip, port, mBanTime);
if (!isIpWhiteListed(ip)) {
mThreadPool->Enqueue([&, ip, port] { banIP(ip, port, "udp"); });
mThreadPool->run([&, ip, port] { banIP(ip, port, "udp"); });
createBanContextAndPostInFuture(ip, port, "udp");
ev->terminateProcessing(); // the event is discarded
} else {
......@@ -368,7 +368,7 @@ class DoSProtection : public Module, ModuleToolbox {
LOGW("Packet count rate (%lu) >= limit (%i), blocking ip/port %s/%s on protocol tcp for %i minutes",
packet_count_rate, mPacketRateLimit, ip, port, mBanTime);
if (!isIpWhiteListed(ip)) {
mThreadPool->Enqueue([&, ip, port] { banIP(ip, port, "tcp"); });
mThreadPool->run([&, ip, port] { banIP(ip, port, "tcp"); });
createBanContextAndPostInFuture(ip, port, "tcp");
ev->terminateProcessing(); // the event is discarded
} else {
......
......@@ -44,7 +44,7 @@ ExternalListSubscription::ExternalListSubscription(
// create a thread to grab a pool connection and use it to retrieve the auth information
auto func = bind(&ExternalListSubscription::getUsersList, this, sqlRequest, ist);
bool success = threadPool->Enqueue(func);
bool success = threadPool->run(func);
if (!success) // Enqueue() can fail when the queue is full, so we have to act on that
SLOGE << "[SOCI] Auth queue is full, cannot fullfil user request for list subscription";
}
......
......@@ -16,113 +16,103 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "flexisip/logmanager.hh"
#include "threadpool.hh"
#include <flexisip/logmanager.hh>
using namespace std;
// Constructor.
ThreadPool::ThreadPool(unsigned int threads, unsigned int max_queue_size)
: max_queue_size(max_queue_size), terminate(false), stopped(false) {
SLOGD << "[POOL] Init with " << threads << " threads and queue size " << max_queue_size;
namespace flexisip {
ThreadPool::ThreadPool(unsigned int nThreads, unsigned int maxQueueSize) : mMaxQueueSize(maxQueueSize) {
SLOGD << "ThreadPool [" << this << "]: init with " << nThreads << " threads and queue size " << maxQueueSize;
// Create number of required threads and add them to the thread pool vector.
for (unsigned int i = 0; i < threads; i++) {
threadPool.emplace_back(thread(&ThreadPool::Invoke, this));
for (unsigned int i = 0; i < nThreads; i++) {
mThreadPool.emplace_back(&ThreadPool::_run, this);
}
}
bool ThreadPool::Enqueue(function<void()> f) {
ThreadPool::~ThreadPool() {
if (mState != Stopped) stop();
}
bool ThreadPool::run(Task t) {
bool enqueued = false;
// Scope based locking.
{
// Put unique lock on task mutex.
unique_lock<mutex> lock(tasksMutex);
unique_lock<mutex> lock(mTasksMutex);
// Push task into queue.
if (tasks.size() < max_queue_size) {
//SLOGE << "[POOL] Enqueue(" << tasks.size() << ")";
tasks.push(f);
if (mTasks.size() < mMaxQueueSize) {
mTasks.push(t);
enqueued = true;
}
}
// Wake up one thread if the task was successfully queued
if (enqueued)
condition.notify_one();
mCondition.notify_one();
return enqueued;
}
bool ThreadPool::conditionCheck() const {
return !tasks.empty() || terminate;
}
void ThreadPool::stop() {
SLOGD << "ThreadPool [" << this << "]: shutdown";
// Scope based locking.
{
// Put unique lock on task mutex.
unique_lock<mutex> lock(mTasksMutex);
void ThreadPool::Invoke() {
// Set termination flag to true.
mState = Shutdown;
}
// Wake up all threads.
mCondition.notify_all();
function<void()> task;
// Join all threads.
for (auto &thread : mThreadPool) {
thread.join();
}
// Empty workers vector.
mThreadPool.empty();
// Indicate that the pool has been shut down.
mState = Stopped;
}
void ThreadPool::_run() {
Task task;
while (true) {
// Scope based locking.
{
// Put unique lock on task mutex.
unique_lock<mutex> lock(tasksMutex);
auto predicate = std::bind(&ThreadPool::conditionCheck, this);
unique_lock<mutex> lock(mTasksMutex);
// Wait until queue is not empty or termination signal is sent.
condition.wait(lock, predicate);
ThreadPool *thiz = this;
mCondition.wait(lock, [thiz](){return !thiz->mTasks.empty() || thiz->mState == Shutdown;});
// If termination signal received and queue is empty then exit else continue clearing the queue.
if (terminate && tasks.empty()) {
SLOGD << "[POOL] Terminate thread";
if (mState == Shutdown && mTasks.empty()) {
SLOGD << "ThreadPool [" << this << "]: terminate thread";
return;
}
// Get next task in the queue.
task = tasks.front();
task = mTasks.front();
// Remove it from the queue.
tasks.pop();
//SLOGE << "[POOL] Pop task " << tasks.size();
mTasks.pop();
}
// Execute the task.
//SLOGE << "[POOL] Task()";
task();
}
}
void ThreadPool::ShutDown() {
SLOGD << "[POOL] Shutdown";
// Scope based locking.
{
// Put unique lock on task mutex.
unique_lock<mutex> lock(tasksMutex);
// Set termination flag to true.
terminate = true;
}
// Wake up all threads.
condition.notify_all();
// Join all threads.
for (auto it = threadPool.begin(); it != threadPool.end(); ++it) {
it->join();
}
// Empty workers vector.
threadPool.empty();
// Indicate that the pool has been shut down.
stopped = true;
}
// Destructor.
ThreadPool::~ThreadPool() {
if (!stopped) {
ShutDown();
}
}
} // namespace flexisip
/*
Flexisip, a flexible SIP proxy server with media capabilities.
Copyright (C) 2016 Belledonne Communications SARL.
Copyright (C) 2010 Belledonne Communications SARL.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
......@@ -16,60 +16,70 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// From http://alexagafonov.com/2015/05/05/thread-pool-implementation-in-c-11/ and modified for queue size handling
#pragma once
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <functional>
#include <unistd.h>
class ThreadPool {
public:
// Constructor.
ThreadPool(unsigned int threads, unsigned int max_queue_size);
// Destructor.
~ThreadPool();
// Adds task to a task queue.
bool Enqueue(std::function<void()> f);
// set pool size (only allowed if not yet populated)
void setPoolSize(int threads);
// Shut down the pool.
void ShutDown();
private:
// Thread pool storage.
std::vector<std::thread> threadPool;
// Queue to keep track of incoming tasks.
std::queue<std::function<void()>> tasks;
// Task queue mutex.
std::mutex tasksMutex;
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
// Condition variable.
std::condition_variable condition;
// Maximum amount of tasks to be enqueued
unsigned int max_queue_size;
namespace flexisip {
// Indicates that pool needs to be shut down.
bool terminate;
/**
* Provide a pool of threads for executing custom tasks.
*/
class ThreadPool {
public:
using Task = std::function<void()>;
/**
* Constructor.
* @param[in] nThreads number of threads in the pool.
* @param[in] maxQueueSize maximum number of task that can be queued.
*/
ThreadPool(unsigned int nThreads, unsigned int maxQueueSize);
~ThreadPool();
// Indicates that pool has been terminated.
bool stopped;
/**
* Assign a task to a thread for execution. If no thread is available
* while this method is called, then the task is queued until a thread
* has completed its task.
* @param[in] t the task to run.
* @return True on success or false when the queue is full.
*/
bool run(Task t);
/**
* Stop all the threads.
* After calling this method, no more task will be
* executed and the threads cannot be started again.
*/
void stop();
private:
enum State {
Running,
Shutdown,
Stopped
};
/**
* This methed is called by each thread.
*/
void _run();
std::vector<std::thread> mThreadPool;
std::queue<Task> mTasks;
std::mutex mTasksMutex;
std::condition_variable mCondition;
unsigned mMaxQueueSize;
State mState = Running;
};
// Function that will be invoked by our threads.
void Invoke();
bool conditionCheck() const;
};
} // namespace flexisip
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment