Commit 72aed358 authored by François Grisez's avatar François Grisez

Allow Soci AuthDB backend to reconnect when the database was down on Flexisip starting

parents c866a895 76c1679d
Pipeline #6272 passed with stages
in 22 minutes and 34 seconds
......@@ -101,8 +101,7 @@ void SociAuthDB::declareConfig(GenericStruct *mc) {
mc->addChildrenValues(items);
}
SociAuthDB::SociAuthDB() : conn_pool(NULL) {
SociAuthDB::SociAuthDB() {
GenericStruct *cr = GenericManager::get()->getRoot();
GenericStruct *ma = cr->get<GenericStruct>("module::Authentication");
GenericStruct *mp = cr->get<GenericStruct>("module::Presence");
......@@ -117,25 +116,37 @@ SociAuthDB::SociAuthDB() : conn_pool(NULL) {
hashed_passwd = ma->get<ConfigBoolean>("hashed-passwords")->read();
check_domain_in_presence_results = mp->get<ConfigBoolean>("check-domain-in-presence-results")->read();
conn_pool = new connection_pool(poolSize);
thread_pool = new ThreadPool(poolSize, max_queue_size);
conn_pool.reset(new connection_pool(poolSize));
thread_pool.reset(new ThreadPool(poolSize, max_queue_size));
LOGD("[SOCI] Authentication provider for backend %s created. Pooled for %d connections", backend.c_str(), (int)poolSize);
LOGD("[SOCI] Authentication provider for backend %s created. Pooled for %zu connections", backend.c_str(), poolSize);
connectDatabase();
}
void SociAuthDB::connectDatabase() {
SLOGD << "[SOCI] Connecting to database (" << poolSize << " pooled connections)";
try {
for (size_t i = 0; i < poolSize; i++) {
conn_pool->at(i).open(backend, connection_string);
}
} catch (soci::mysql_soci_error const & e) {
_connected = true;
} catch (const soci::mysql_soci_error &e) {
SLOGE << "[SOCI] connection pool open MySQL error: " << e.err_num_ << " " << e.what() << endl;
} catch (exception const &e) {
closeOpenedSessions();
} catch (const runtime_error &e) { // std::runtime_error includes all soci exceptions
SLOGE << "[SOCI] connection pool open error: " << e.what() << endl;
closeOpenedSessions();
}
}
SociAuthDB::~SociAuthDB() {
delete thread_pool; // will automatically shut it down, clearing threads
delete conn_pool;
void SociAuthDB::closeOpenedSessions() {
for (size_t i = 0; i < poolSize; i++) {
soci::session &conn = conn_pool->at(i);
if (conn.get_backend()) { // if the session is open
conn.close();
}
}
_connected = false;
}
void SociAuthDB::getPasswordWithPool(const string &id, const string &domain,
......@@ -326,11 +337,17 @@ void SociAuthDB::notifyAllListeners(std::list<std::tuple<std::string, std::strin
void SociAuthDB::getPasswordFromBackend(const string &id, const string &domain,
const string &authid, AuthDbListener *listener, AuthDbListener *listener_ref) {
if (!_connected) connectDatabase();
if (!_connected) {
if (listener) listener->onResult(AUTH_ERROR , "");
return;
}
// create a thread to grab a pool connection and use it to retrieve the auth information
auto func = bind(&SociAuthDB::getPasswordWithPool, this, id, domain, authid, listener, listener_ref);
bool success = thread_pool->run(func);
if (success == FALSE) {
if (!success) {
// Enqueue() can fail when the queue is full, so we have to act on that
SLOGE << "[SOCI] Auth queue is full, cannot fullfil password request for " << id << " / " << domain << " / "
<< authid;
......@@ -339,6 +356,11 @@ void SociAuthDB::getPasswordFromBackend(const string &id, const string &domain,
}
void SociAuthDB::getUserWithPhoneFromBackend(const string &phone, const string &domain, AuthDbListener *listener) {
if (!_connected) connectDatabase();
if (!_connected) {
if (listener) listener->onResult(AUTH_ERROR , "");
return;
}
// create a thread to grab a pool connection and use it to retrieve the auth information
auto func = bind(&SociAuthDB::getUserWithPhoneWithPool, this, phone, domain, listener);
......@@ -352,6 +374,14 @@ void SociAuthDB::getUserWithPhoneFromBackend(const string &phone, const string &
}
void SociAuthDB::getUsersWithPhonesFromBackend(list<tuple<string, string, AuthDbListener*>> &creds) {
if (!_connected) connectDatabase();
if (!_connected) {
for (const auto &cred : creds) {
AuthDbListener *listener = std::get<2>(cred);
if (listener) listener->onResult(AUTH_ERROR, "");
}
return;
}
// create a thread to grab a pool connection and use it to retrieve the auth information
auto func = bind(&SociAuthDB::getUsersWithPhonesWithPool, this, creds);
......
......@@ -292,19 +292,20 @@ public:
namespace flexisip {
class SociAuthDB : public AuthDbBackend {
virtual ~SociAuthDB();
public:
SociAuthDB();
void setConnectionParameters(const std::string &domain, const std::string &request);
virtual void getUserWithPhoneFromBackend(const std::string & , const std::string &, AuthDbListener *listener);
virtual void getUsersWithPhonesFromBackend(std::list<std::tuple<std::string,std::string,AuthDbListener*>> &creds);
virtual void getPasswordFromBackend(const std::string &id, const std::string &domain,
const std::string &authid, AuthDbListener *listener, AuthDbListener *listener_ref);
void getUserWithPhoneFromBackend(const std::string & , const std::string &, AuthDbListener *listener) override;
void getUsersWithPhonesFromBackend(std::list<std::tuple<std::string,std::string,AuthDbListener*>> &creds) override;
void getPasswordFromBackend(const std::string &id, const std::string &domain,
const std::string &authid, AuthDbListener *listener, AuthDbListener *listener_ref) override;
static void declareConfig(GenericStruct *mc);
private:
SociAuthDB();
void connectDatabase();
void closeOpenedSessions();
void getUserWithPhoneWithPool(const std::string &phone, const std::string &domain, AuthDbListener *listener);
void getUsersWithPhonesWithPool(std::list<std::tuple<std::string,std::string,AuthDbListener*>> &creds);
void getPasswordWithPool(const std::string &id, const std::string &domain,
......@@ -313,9 +314,9 @@ private:
void notifyAllListeners(std::list<std::tuple<std::string, std::string, AuthDbListener *>> &creds, const std::set<std::pair<std::string, std::string>> &presences);
size_t poolSize;
soci::connection_pool *conn_pool;
ThreadPool *thread_pool;
std::size_t poolSize;
std::unique_ptr<soci::connection_pool> conn_pool;
std::unique_ptr<ThreadPool> thread_pool;
std::string connection_string;
std::string backend;
std::string get_password_request;
......@@ -324,6 +325,9 @@ private:
std::string get_password_algo_request;
bool check_domain_in_presence_results = false;
bool hashed_passwd;
bool _connected = false;
friend AuthDbBackend;
};
}
......
......@@ -22,6 +22,55 @@
namespace flexisip{
void SociHelper::execute(const std::function<void (soci::session &)> &requestLambda) {
std::chrono::steady_clock::time_point start;
std::chrono::steady_clock::time_point stop;
std::unique_ptr<soci::session> sql;
int errorCount = 0;
bool retry;
bool good = false;
do {
retry = false;
try {
// will grab a connection from the pool. This is thread safe.
start = std::chrono::steady_clock::now();
sql.reset(new soci::session(mPool));
stop = std::chrono::steady_clock::now();
LOGD("[SOCI] Session acquired from pool in %lu ms", durationMs(start, stop));
start = stop;
requestLambda(*sql);
stop = std::chrono::steady_clock::now();
LOGD("[SOCI] statement successfully executed in %lu ms", durationMs(start, stop));
good = true;
} catch (const std::runtime_error &e) { // soci::mysql_soci_error is a subclass of std::runtime_error
errorCount++;
stop = std::chrono::steady_clock::now();
const auto *sqlErr = dynamic_cast<const soci::mysql_soci_error *>(&e);
std::ostringstream os;
os << "[SOCI] " << (sqlErr ? "MySQL " : "") << "error after " << durationMs(start, stop) << "ms: ";
if (sqlErr) os << sqlErr->err_num_ << " ";
os << e.what();
LOGE("%s", os.str().c_str());
if (sql) reconnectSession(*sql);
if (sqlErr && (sqlErr->err_num_ == 2014 || sqlErr->err_num_ == 2006) && errorCount == 1) {
/* 2014 is the infamous "Commands out of sync; you can't run this command now" mysql error,
* which is retryable.
* At this time we don't know if it is a soci or mysql bug, or bug with the sql request being executed.
*
* 2006 is "MySQL server has gone away" which is also retryable.
*/
SLOGE << "[SOCI] retryable mysql error [" << sqlErr->err_num_ << "], so trying statement execution again...";
retry = true;
}
}
} while (retry);
if (!good) throw DatabaseException();
}
void SociHelper::reconnectSession(soci::session &session) {
try {
SLOGE << "[SOCI] Trying close/reconnect session";
......
......@@ -35,115 +35,17 @@ namespace flexisip{
*/
class SociHelper{
public:
class DatabaseException : std::exception{
virtual const char* what() const noexcept override{
return "Database failure"; //The great thing about exception.
}
class DatabaseException : public std::runtime_error {
public:
DatabaseException() : std::runtime_error("Database failure") {}
};
// Initialize the SociHelper by giving the connection pool.
SociHelper(soci::connection_pool &pool) : mPool(pool){};
// Execute the database query safely. The code to execute the query shall be provided in the lambda argument.
#if 0
template <typename _lambda>
soci::rowset<soci::row> execute(_lambda requestLambda){
std::chrono::steady_clock::time_point start;
std::chrono::steady_clock::time_point stop;
soci::session *sql = nullptr;
int errorCount = 0;
bool retry;
do{
retry = false;
try{
start = std::chrono::steady_clock::now();
// will grab a connection from the pool. This is thread safe.
sql = new soci::session(mPool);
stop = std::chrono::steady_clock::now();
LOGD("[SOCI] Session acquired from pool in %lu ms", durationMs(start, stop));
start = stop;
auto ret = requestLambda(*sql);
stop = std::chrono::steady_clock::now();
LOGD("[SOCI] statement successfully executed in %lu ms", durationMs(start, stop));
if (sql) delete sql;
return ret;
} catch (soci::mysql_soci_error const &e) {
errorCount++;
stop = std::chrono::steady_clock::now();
SLOGE << "[SOCI] MySQL error after " << durationMs(start, stop) << " ms : " << e.err_num_ << " " << e.what();
if (sql) reconnectSession(*sql);
if ((e.err_num_ == 2014 || e.err_num_ == 2006) && errorCount == 1){
/* 2014 is the infamous "Commands out of sync; you can't run this command now" mysql error,
* which is retryable.
* At this time we don't know if it is a soci or mysql bug, or bug with the sql request being executed.
*
* 2006 is "MySQL server has gone away" which is also retryable.
*/
SLOGE << "[SOCI] retryable mysql error ["<< e.err_num_<<"], so trying statement execution again...";
retry = true;
}
} catch (const std::runtime_error &e) {
errorCount++;
stop = std::chrono::steady_clock::now();
SLOGE << "[SOCI] error after " << durationMs(start, stop) << " ms : " << e.what();
if (sql) reconnectSession(*sql);
}
} while (retry);
if (sql) delete sql;
throw DatabaseException();
}
#endif
// Variant of the previous method for the case where no rowset is needed as return value.
// Probably it is possible to merge the two methods thanks std::enable_if (TODO later).
template <typename _lambda>
void execute(_lambda requestLambda){
std::chrono::steady_clock::time_point start;
std::chrono::steady_clock::time_point stop;
soci::session *sql = nullptr;
int errorCount = 0;
bool retry;
bool good = false;
do{
retry = false;
try{
// will grab a connection from the pool. This is thread safe.
start = std::chrono::steady_clock::now();
sql = new soci::session(mPool);
stop = std::chrono::steady_clock::now();
LOGD("[SOCI] Session acquired from pool in %lu ms", durationMs(start, stop));
start = stop;
requestLambda(*sql);
stop = std::chrono::steady_clock::now();
LOGD("[SOCI] statement successfully executed in %lu ms", durationMs(start, stop));
good = true;
} catch (soci::mysql_soci_error const &e) {
errorCount++;
stop = std::chrono::steady_clock::now();
SLOGE << "[SOCI] MySQL error after " << durationMs(start, stop) << " ms : " << e.err_num_ << " " << e.what();
if (sql) reconnectSession(*sql);
void execute(const std::function<void (soci::session &)> &requestLambda);
if ((e.err_num_ == 2014 || e.err_num_ == 2006) && errorCount == 1){
/* 2014 is the infamous "Commands out of sync; you can't run this command now" mysql error,
* which is retryable.
* At this time we don't know if it is a soci or mysql bug, or bug with the sql request being executed.
*
* 2006 is "MySQL server has gone away" which is also retryable.
*/
SLOGE << "[SOCI] retryable mysql error ["<< e.err_num_<<"], so trying statement execution again...";
retry = true;
}
} catch (const std::runtime_error &e) {
errorCount++;
stop = std::chrono::steady_clock::now();
SLOGE << "[SOCI] error after " << durationMs(start, stop) << " ms : " << e.what();
if (sql) reconnectSession(*sql);
}
if (sql) delete sql;
} while (retry);
if (!good) throw DatabaseException();
}
private:
void reconnectSession(soci::session &session);
unsigned long durationMs(std::chrono::steady_clock::time_point start, std::chrono::steady_clock::time_point stop){
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment