/*
	Flexisip, a flexible SIP proxy server with media capabilities.
	Copyright (C) 2010-2015  Belledonne Communications SARL, All rights reserved.

	This program is free software: you can redistribute it and/or modify
	it under the terms of the GNU Affero General Public License as
	published by the Free Software Foundation, either version 3 of the
	License, or (at your option) any later version.

	This program is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
	GNU Affero General Public License for more details.

	You should have received a copy of the GNU Affero General Public License
	along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "authdb.hh"
#include "mysql/soci-mysql.h"
#include <thread>

using namespace soci;

// The dreaded chrono::steady_clock which is not supported for gcc < 4.7
#include <chrono>
using namespace std;
using namespace chrono;
#ifdef USE_MONOTONIC_CLOCK
// When USE_MONOTONIC_CLOCK is defined, expose monotonic_clock under the
// steady_clock name that the rest of this file uses for its timings.
namespace std {
typedef monotonic_clock steady_clock;
}
#endif
/*
 * Declares the configuration keys of the Soci authentication backend
 * (SQL requests, pool size, backend name, connection string and queue limit)
 * as children of the given configuration structure.
 *
 * @param mc configuration structure that receives the new keys.
 */
void SociAuthDB::declareConfig(GenericStruct *mc) {
	// Soci-specific configuration keys
	ConfigItemDescriptor items[] = {

		{String, "soci-password-request",
		 "Soci SQL request to execute to obtain the password.\n"
		 "Named parameters are:\n -':id' : the user found in the from header,\n -':domain' : the authorization realm, "
		 "and\n -':authid' : the authorization username.\n"
		 "The use of the :id parameter is mandatory.",
		 "select password from accounts where id = :id and domain = :domain and authid=:authid"},
		{String, "soci-user-with-phone-request",
		 "Soci SQL request to execute to obtain the username associated with a phone alias.\n"
		 "Named parameters are:\n -':phone' : the phone number to search for.\n"
		 "The use of the :phone parameter is mandatory.\n"
		 "Example : select login from accounts where phone = :phone ",
		 ""},
		{String, "soci-users-with-phones-request",
		 "Soci SQL request to execute to obtain the usernames associated with phones aliases.\n"
		 "Named parameters are:\n -':phones' : the phones to search for.\n"
		 "The use of the :phones parameter is mandatory.\n"
		 // The trailing "\n" keeps the example on its own line in the generated help text.
		 "If you use phone number linked accounts you'll need to select login, domain, phone in your request for flexisip to work.\n"
		 "Example : select login, domain, phone from accounts where phone in (:phones)",
		 ""},

		{Integer, "soci-poolsize",
		 "Size of the pool of connections that Soci will use. We open a thread for each DB query, and this pool will "
		 "allow each thread to get a connection.\n"
		 "The threads are blocked until a connection is released back to the pool, so increasing the pool size will "
		 "allow more connections to occur simultaneously.\n"
		 "On the other hand, you should not keep too many open connections to your DB at the same time.",
		 "100"},

		{String, "soci-backend", "Choose the type of backend that Soci will use for the connection.\n"
								 "Depending on your Soci package and the modules you installed, this could be 'mysql', "
								 "'oracle', 'postgresql' or something else.",
		 "mysql"},

		{String, "soci-connection-string", "The configuration parameters of the Soci backend.\n"
										   "The basic format is \"key=value key2=value2\". For a mysql backend, this "
										   "is a valid config: \"db=mydb user=user password='pass' host=myhost.com\".\n"
										   "Please refer to the Soci documentation of your backend, for instance: "
										   "http://soci.sourceforge.net/doc/3.2/backends/mysql.html",
		 "db=mydb user=myuser password='mypass' host=myhost.com"},

		{Integer, "soci-max-queue-size",
		 "Amount of queries that will be allowed to be queued before bailing password "
		 "requests.\n This value should be chosen accordingly with 'soci-poolsize', so "
		 "that you have a coherent behavior.\n This limit is here mainly as a safeguard "
		 "against out-of-control growth of the queue in the event of a flood or big "
		 "delays in the database backend.",
		 "1000"},
		config_item_end};

	mc->addChildrenValues(items);
}

/*
 * Reads the Soci settings from the "module::Authentication" configuration section,
 * allocates the connection pool and the worker thread pool, then opens one
 * session per pool slot. Failures while opening are logged, not rethrown.
 */
SociAuthDB::SociAuthDB() : conn_pool(NULL) {
	GenericStruct *authModule = GenericManager::get()->getRoot()->get<GenericStruct>("module::Authentication");

	// Connection settings.
	poolSize = authModule->get<ConfigInt>("soci-poolsize")->read();
	connection_string = authModule->get<ConfigString>("soci-connection-string")->read();
	backend = authModule->get<ConfigString>("soci-backend")->read();

	// SQL requests.
	get_password_request = authModule->get<ConfigString>("soci-password-request")->read();
	get_user_with_phone_request = authModule->get<ConfigString>("soci-user-with-phone-request")->read();
	get_users_with_phones_request = authModule->get<ConfigString>("soci-users-with-phones-request")->read();

	// Upper bound on the number of queued requests, handed to the thread pool.
	const unsigned int maxQueueSize =
		static_cast<unsigned int>(authModule->get<ConfigInt>("soci-max-queue-size")->read());

	conn_pool = new connection_pool(poolSize);
	thread_pool = new ThreadPool(poolSize, maxQueueSize);

	LOGD("[SOCI] Authentication provider for backend %s created. Pooled for %d connections", backend.c_str(), static_cast<int>(poolSize));

	try {
		// Open every session of the pool up front.
		size_t slot = 0;
		while (slot < poolSize) {
			conn_pool->at(slot).open(backend, connection_string);
			++slot;
		}
	} catch (soci::mysql_soci_error const &mysqlError) {
		SLOGE << "[SOCI] connection pool open MySQL error: " << mysqlError.err_num_ << " " << mysqlError.what() << endl;
	} catch (exception const &error) {
		SLOGE << "[SOCI] connection pool open error: " << error.what() << endl;
	}
}

/*
 * Releases the worker thread pool first, then the connection pool.
 */
SociAuthDB::~SociAuthDB() {
	// Deleting the thread pool shuts it down and clears its threads.
	delete thread_pool;
	delete conn_pool;
}

/*
 * Closes the given session and asks Soci to reconnect it.
 * Any exception raised while doing so is logged and swallowed.
 *
 * @param session the session to close and reconnect.
 */
void SociAuthDB::reconnectSession(soci::session &session) {
	try {
		SLOGE << "[SOCI] Trying close/reconnect session";

		session.close();
		session.reconnect();

		SLOGD << "[SOCI] Session " << session.get_backend_name() << " successfully reconnected";
	} catch (soci::mysql_soci_error const &mysqlError) {
		// MySQL errors carry an error number worth logging alongside the message.
		SLOGE << "[SOCI] reconnectSession MySQL error: " << mysqlError.err_num_ << " " << mysqlError.what() << endl;
	} catch (exception const &error) {
		SLOGE << "[SOCI] reconnectSession error: " << error.what() << endl;
	}
}

// Whole milliseconds elapsed between two time points, as an unsigned long (sub-millisecond part is truncated).
#define DURATION_MS(start, stop) (static_cast<unsigned long>(duration_cast<milliseconds>((stop) - (start)).count()))
void SociAuthDB::getPasswordWithPool(const std::string &id, const std::string &domain,
									 const std::string &authid, AuthDbListener *listener) {
142 143
	steady_clock::time_point start;
	steady_clock::time_point stop;
144
	std::string pass;
145
	session *sql = NULL;
146 147 148
	int errorCount = 0;
	bool retry = false;
	
149
	while (errorCount < 2) {
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
		retry = false;
		try {
			start = steady_clock::now();
			// will grab a connection from the pool. This is thread safe
			sql = new session(*conn_pool); //this may raise a soci_error exception, so keep it in the try block.

			stop = steady_clock::now();

			SLOGD << "[SOCI] Pool acquired in " << DURATION_MS(start, stop) << "ms";
			start = stop;

			*sql << get_password_request, into(pass), use(id, "id"), use(domain, "domain"), use(authid, "authid");
			stop = steady_clock::now();
			SLOGD << "[SOCI] Got pass for " << id << " in " << DURATION_MS(start, stop) << "ms";
			cachePassword(createPasswordKey(id, authid), domain, pass, mCacheExpire);
			if (listener){
				listener->onResult(pass.empty() ? PASSWORD_NOT_FOUND : PASSWORD_FOUND, pass);
			}
			errorCount = 0;
		} catch (mysql_soci_error const &e) {
			errorCount++;
			stop = steady_clock::now();
172
			SLOGE << "[SOCI] getPasswordWithPool MySQL error after " << DURATION_MS(start, stop) << "ms : " << e.err_num_ << " " << e.what();
173 174
			if (sql) reconnectSession(*sql);
			
175 176
			if ((e.err_num_ == 2014 || e.err_num_ == 2006) && errorCount == 1){
				/* 2014 is the infamous "Commands out of sync; you can't run this command now" mysql error,
177 178
				 * which is retryable.
				 * At this time we don't know if it is a soci or mysql bug, or bug with the sql request being executed.
179 180
				 * 
				 * 2006 is "MySQL server has gone away" which is also retryable.
181
				 */
182
				SLOGE << "[SOCI] retrying mysql error " << e.err_num_;
183 184 185 186 187
				retry = true;
			}
		} catch (exception const &e) {
			errorCount++;
			stop = steady_clock::now();
188
			SLOGE << "[SOCI] getPasswordWithPool error after " << DURATION_MS(start, stop) << "ms : " << e.what();
189 190 191 192 193 194 195 196
			if (sql) reconnectSession(*sql);
		}
		if (sql) delete sql;
		if (!retry){
			if (errorCount){
				if (listener) listener->onResult(AUTH_ERROR, pass);
			}
			break;
197
		}
198 199 200
	}
}

void SociAuthDB::getUserWithPhoneWithPool(const std::string &phone, const std::string &domain, AuthDbListener *listener) {
202 203 204 205 206 207 208 209 210 211 212 213 214 215
	steady_clock::time_point start;
	steady_clock::time_point stop;
	std::string user;
	session *sql = NULL;

	try {
		start = steady_clock::now();
		// will grab a connection from the pool. This is thread safe
		sql = new session(*conn_pool); //this may raise a soci_error exception, so keep it in the try block.

		stop = steady_clock::now();

		SLOGD << "[SOCI] Pool acquired in " << DURATION_MS(start, stop) << "ms";
		start = stop;
216 217 218
		if(get_user_with_phone_request != "") {
			*sql << get_user_with_phone_request, into(user), use(phone, "phone");
		} else {
Benjamin REIS's avatar
Benjamin REIS committed
219 220 221 222 223 224 225 226 227 228 229
			string s = get_users_with_phones_request;
			int index = s.find(":phones");
			while(index > -1) {
				s = s.replace(index, 7, phone);
				index = s.find(":phones");
			}
			rowset<row> ret = (sql->prepare << s);
			for (rowset<row>::const_iterator it = ret.begin(); it != ret.end(); ++it) {
				row const& row = *it;
				user = row.get<string>(0);
			}
230
		}
231 232 233 234 235 236 237 238 239 240 241
		stop = steady_clock::now();
		if (!user.empty())  {
			SLOGD << "[SOCI] Got user for " << phone << " in " << DURATION_MS(start, stop) << "ms";
			cacheUserWithPhone(phone, domain, user);
		}
		if (listener){
			listener->onResult(user.empty() ? PASSWORD_NOT_FOUND : PASSWORD_FOUND, user);
		}
	} catch (mysql_soci_error const &e) {

		stop = steady_clock::now();
242
		SLOGE << "[SOCI] getUserWithPhoneWithPool MySQL error after " << DURATION_MS(start, stop) << "ms : " << e.err_num_ << " " << e.what();
243 244 245 246 247 248
		if (listener) listener->onResult(PASSWORD_NOT_FOUND, user);

		if (sql) reconnectSession(*sql);

	} catch (exception const &e) {
		stop = steady_clock::now();
249
		SLOGE << "[SOCI] getUserWithPhoneWithPool error after " << DURATION_MS(start, stop) << "ms : " << e.what();
250 251 252 253 254 255
		if (listener) listener->onResult(PASSWORD_NOT_FOUND, user);
		if (sql) reconnectSession(*sql);
	}
	if (sql) delete sql;
}

/*
 * Resolves a batch of phones in a single request through a session taken from the
 * connection pool and reports the result to the listener.
 *
 * Every ':phones' occurrence of 'soci-users-with-phones-request' is replaced by the
 * comma separated list of quoted phones. For each returned row, column 0 is the user;
 * when the row has more than two columns and column 2 (the phone) is not empty, the
 * user is cached for that phone with column 1 as domain and the phone is added to the
 * result set; otherwise the user itself is added.
 *
 * @param creds    credentials to resolve; only the first tuple element is used here.
 * @param listener optional (may be NULL); onResults() receives the requested phones and
 *                 the result set (empty when the request failed).
 */
void SociAuthDB::getUsersWithPhonesWithPool(list<tuple<std::string,std::string,AuthDbListener*>> &creds, AuthDbListener *listener) {
	steady_clock::time_point start;
	steady_clock::time_point stop;
	set<std::string> users;
	std::ostringstream in;
	session *sql = NULL;
	list<std::string> phones;

	// Builds a quoted SQL string literal. Quotes are doubled and backslashes escaped so
	// that a crafted phone cannot terminate the literal and inject SQL.
	// NOTE(review): backslash escaping matches MySQL's default mode; on backends that
	// treat backslashes literally a phone containing one will simply not match.
	auto quoteSqlLiteral = [](const std::string &value) -> std::string {
		std::string quoted("'");
		for (std::string::const_iterator c = value.begin(); c != value.end(); ++c) {
			if (*c == '\'') {
				quoted += "''";
			} else if (*c == '\\') {
				quoted += "\\\\";
			} else {
				quoted += *c;
			}
		}
		quoted += '\'';
		return quoted;
	};

	bool first = true;
	for (const auto &cred : creds) {
		const std::string &phone = std::get<0>(cred);
		phones.push_back(phone);
		if (!first) in << ",";
		first = false;
		in << quoteSqlLiteral(phone);
	}

	const std::string placeholder(":phones");
	const std::string phoneList = in.str();
	string s = get_users_with_phones_request;
	// Resume the search after the inserted text: restarting from the beginning looped
	// forever whenever a phone itself contained the placeholder.
	std::string::size_type index = s.find(placeholder);
	while (index != std::string::npos) {
		s.replace(index, placeholder.size(), phoneList);
		index = s.find(placeholder, index + phoneList.size());
	}

	try {
		start = steady_clock::now();
		// will grab a connection from the pool. This is thread safe
		sql = new session(*conn_pool); // this may raise a soci_error exception, so keep it in the try block.

		stop = steady_clock::now();

		SLOGD << "[SOCI] Pool acquired in " << DURATION_MS(start, stop) << "ms";
		start = stop;
		rowset<row> ret = (sql->prepare << s);
		stop = steady_clock::now();

		SLOGD << "[SOCI] Got users in " << DURATION_MS(start, stop) << "ms";

		for (rowset<row>::const_iterator it = ret.begin(); it != ret.end(); ++it) {
			const row &record = *it;
			string user = record.get<string>(0);
			// The phone column is optional: it is only read when the request selects a third column.
			string phone = (record.size() > 2) ? record.get<string>(2) : "";
			if (!phone.empty()) {
				string domain = record.get<string>(1);
				cacheUserWithPhone(phone, domain, user);
				users.insert(phone);
			} else {
				users.insert(user);
			}
		}

		if (listener) {
			listener->onResults(phones, users);
		}
	} catch (mysql_soci_error const &e) {
		stop = steady_clock::now();
		SLOGE << "[SOCI] getUsersWithPhonesWithPool MySQL error after " << DURATION_MS(start, stop) << "ms : " << e.err_num_ << " " << e.what();
		SLOGE << "[SOCI] MySQL request causing the error was : " << s;
		// Drop partial results so the listener never sees a half-filled set.
		users.clear();
		if (listener) listener->onResults(phones, users);

		if (sql) reconnectSession(*sql);
	} catch (exception const &e) {
		stop = steady_clock::now();
		SLOGE << "[SOCI] getUsersWithPhonesWithPool error after " << DURATION_MS(start, stop) << "ms : " << e.what();
		users.clear();
		if (listener) listener->onResults(phones, users);
		if (sql) reconnectSession(*sql);
	}
	delete sql; // hands the connection back to the pool; no-op when the session was never created
}

// Section marker pragma, only emitted when compiling with clang.
#if defined(__clang__)
#pragma mark - Inherited virtuals
#endif
void SociAuthDB::getPasswordFromBackend(const std::string &id, const std::string &domain,
335
										const std::string &authid, AuthDbListener *listener) {
336 337

	// create a thread to grab a pool connection and use it to retrieve the auth information
338
	auto func = bind(&SociAuthDB::getPasswordWithPool, this, id, domain, authid, listener);
339

340
	bool success = thread_pool->Enqueue(func);
341
	if (success == FALSE) {
342
		// Enqueue() can fail when the queue is full, so we have to act on that
343 344
		SLOGE << "[SOCI] Auth queue is full, cannot fullfil password request for " << id << " / " << domain << " / "
			  << authid;
345
		if (listener) listener->onResult(AUTH_ERROR, "");
346
	}
347 348
}

void SociAuthDB::getUserWithPhoneFromBackend(const string &phone, const string &domain, AuthDbListener *listener) {
350 351

	// create a thread to grab a pool connection and use it to retrieve the auth information
352
	auto func = bind(&SociAuthDB::getUserWithPhoneWithPool, this, phone, domain, listener);
353

354 355 356 357 358 359
	bool success = thread_pool->Enqueue(func);
	if (success == FALSE) {
		// Enqueue() can fail when the queue is full, so we have to act on that
		SLOGE << "[SOCI] Auth queue is full, cannot fullfil user request for " << phone;
		if (listener) listener->onResult(AUTH_ERROR, "");
	}
360
}

/*
 * Queues a batch phone-to-user lookup on the thread pool; the lookup itself is
 * performed by getUsersWithPhonesWithPool(). When the request cannot be queued,
 * the listener (if any) is notified immediately with AUTH_ERROR.
 *
 * @param creds    credentials to resolve; copied into the queued task.
 * @param listener optional (may be NULL) receiver of the outcome.
 */
void SociAuthDB::getUsersWithPhonesFromBackend(list<tuple<std::string,std::string,AuthDbListener*>> &creds, AuthDbListener *listener) {

	// create a thread to grab a pool connection and use it to retrieve the auth information
	auto func = bind(&SociAuthDB::getUsersWithPhonesWithPool, this, creds, listener);

	bool success = thread_pool->Enqueue(func);
	if (success == FALSE) {
		// Enqueue() can fail when the queue is full, so we have to act on that.
		// Log how many credentials were dropped: streaming &creds only printed an address.
		SLOGE << "[SOCI] Auth queue is full, cannot fullfil user request for " << creds.size() << " credential(s)";
		if (listener) listener->onResult(AUTH_ERROR, "");
	}
}