Commit 11a8bee8 authored by Benjamin REIS's avatar Benjamin REIS

Add presence list functions to make just one SQL request for a suscribe

parent cd88e2d7
......@@ -47,6 +47,11 @@ void SociAuthDB::declareConfig(GenericStruct *mc) {
"Named parameters are:\n -':phone' : the phone number to search for.\n"
"The use of the :phone parameter is mandatory.",
"select login from accounts where phone = :phone"},
{String, "soci-users-with-phones-request",
"Soci SQL request to execute to obtain the usernames associated with phones aliases.\n"
"Named parameters are:\n -':phones' : the phones to search for.\n"
"The use of the :phones parameter is mandatory.",
"select login, domain, phone from accounts where phone in :phones"},
{Integer, "soci-poolsize",
"Size of the pool of connections that Soci will use. We open a thread for each DB query, and this pool will "
......@@ -90,6 +95,7 @@ SociAuthDB::SociAuthDB() : conn_pool(NULL) {
backend = ma->get<ConfigString>("soci-backend")->read();
get_password_request = ma->get<ConfigString>("soci-password-request")->read();
get_user_with_phone_request = ma->get<ConfigString>("soci-user-with-phone-request")->read();
get_users_with_phones_request = ma->get<ConfigString>("soci-users-with-phones-request")->read();
unsigned int max_queue_size = (unsigned int)ma->get<ConfigInt>("soci-max-queue-size")->read();
conn_pool = new connection_pool(poolSize);
......@@ -230,6 +236,72 @@ void SociAuthDB::getUserWithPhoneWithPool(const std::string &phone, const std::s
if (sql) delete sql;
}
void SociAuthDB::getUsersWithPhonesWithPool(list<tuple<std::string,std::string,AuthDbListener*>> &creds, AuthDbListener *listener) {
steady_clock::time_point start;
steady_clock::time_point stop;
set<std::string> users;
string in;
session *sql = NULL;
list<std::string> phones;
for(tuple<std::string,std::string,AuthDbListener*> cred : creds) {
phones.push_back(std::get<0>(cred));
if(in == "") {
in += std::get<0>(cred);
} else {
in += "," + std::get<0>(cred);
}
}
try {
start = steady_clock::now();
// will grab a connection from the pool. This is thread safe
sql = new session(*conn_pool); //this may raise a soci_error exception, so keep it in the try block.
stop = steady_clock::now();
SLOGD << "[SOCI] Pool acquired in " << DURATION_MS(start, stop) << "ms";
start = stop;
string s = get_users_with_phones_request;
int index = s.find(":");
while(index > -1) {
s = s.replace(index, 7, in);
index = s.find(":");
}
rowset<row> ret = (sql->prepare << s);
stop = steady_clock::now();
for (rowset<row>::const_iterator it = ret.begin(); it != ret.end(); ++it) {
row const& row = *it;
string phone = row.get<string>(2);
string domain = row.get<string>(1);
string user = row.get<string>(0);
SLOGD << "[SOCI] Got user for " << phone << " in " << DURATION_MS(start, stop) << "ms";
cacheUserWithPhone(phone, domain, user);
users.insert(phone);
}
if (listener){
listener->onResults(phones, users);
}
} catch (mysql_soci_error const &e) {
stop = steady_clock::now();
SLOGE << "[SOCI] getUsersWithPhonesWithPool MySQL error after " << DURATION_MS(start, stop) << "ms : " << e.err_num_ << " " << e.what();
users.clear();
if (listener) listener->onResults(phones, users);
if (sql) reconnectSession(*sql);
} catch (exception const &e) {
stop = steady_clock::now();
SLOGE << "[SOCI] getUsersWithPhonesWithPool error after " << DURATION_MS(start, stop) << "ms : " << e.what();
users.clear();
if (listener) listener->onResults(phones, users);
if (sql) reconnectSession(*sql);
}
if (sql) delete sql;
}
#pragma mark - Inherited virtuals
void SociAuthDB::getPasswordFromBackend(const std::string &id, const std::string &domain,
......@@ -259,3 +331,16 @@ void SociAuthDB::getUserWithPhoneFromBackend(const string &phone, const string &
if (listener) listener->onResult(AUTH_ERROR, "");
}
}
void SociAuthDB::getUsersWithPhonesFromBackend(list<tuple<std::string,std::string,AuthDbListener*>> &creds, AuthDbListener *listener) {
// create a thread to grab a pool connection and use it to retrieve the auth information
auto func = bind(&SociAuthDB::getUsersWithPhonesWithPool, this, creds, listener);
bool success = thread_pool->Enqueue(func);
if (success == FALSE) {
// Enqueue() can fail when the queue is full, so we have to act on that
SLOGE << "[SOCI] Auth queue is full, cannot fullfil user request for " << &creds;
if (listener) listener->onResult(AUTH_ERROR, "");
}
}
......@@ -25,6 +25,9 @@ AuthDbBackend *AuthDbBackend::sUnique = NULL;
AuthDbListener::~AuthDbListener(){
}
void AuthDbListener::onResults(list<std::string> &phones, set<std::string> &users) {
}
class FixedAuthDb : public AuthDbBackend {
public:
......@@ -206,3 +209,35 @@ void AuthDbBackend::getUserWithPhone(const std::string & phone, const std::strin
// if we reach here, password wasn't cached: we have to grab the password from the actual backend
getUserWithPhoneFromBackend(phone, domain, listener);
}
void AuthDbBackend::getUsersWithPhone(list<tuple<std::string,std::string,AuthDbListener*>> & creds, AuthDbListener *listener) {
list<tuple<std::string,std::string,AuthDbListener*>> needed_creds;
for (tuple<std::string,std::string,AuthDbListener*> cred : creds) {
// Check for usable cached password
string user;
string phone = std::get<0>(cred);
string domain = std::get<1>(cred);
AuthDbListener* cred_listener = std::get<2>(cred);
switch (getCachedUserWithPhone(phone, domain, user)) {
case VALID_PASS_FOUND:
if (cred_listener) cred_listener->onResult(AuthDbResult::PASSWORD_FOUND, user);
break;
case EXPIRED_PASS_FOUND:
case NO_PASS_FOUND:
needed_creds.push_back(cred);
break;
}
}
// if we reach here, password wasn't cached: we have to grab the password from the actual backend
getUsersWithPhonesFromBackend(needed_creds, listener);
}
void AuthDbBackend::getUsersWithPhonesFromBackend(list<tuple<std::string,std::string,AuthDbListener*>> &creds, AuthDbListener *listener) {
for(tuple<std::string,std::string,AuthDbListener*> cred : creds) {
string phone = std::get<0>(cred);
string domain = std::get<1>(cred);
AuthDbListener* l = std::get<2>(cred);
getUserWithPhoneFromBackend(phone,domain, l);
}
}
......@@ -48,6 +48,7 @@ struct AuthDbTimings;
class AuthDbListener : public StatFinishListener {
public:
virtual void onResult(AuthDbResult result, const std::string &passwd) = 0;
virtual void onResults(list<std::string> &phones, set<std::string> &users);
virtual ~AuthDbListener();
};
......@@ -83,7 +84,9 @@ class AuthDbBackend {
// warning: listener may be invoked on authdb backend thread, so listener must be threadsafe somehow!
void getPassword(const std::string & user, const std::string & domain, const std::string &auth_username, AuthDbListener *listener);
void getUserWithPhone(const std::string &phone, const std::string &domain, AuthDbListener *listener);
void getUsersWithPhone(list<tuple<std::string,std::string,AuthDbListener *>> & creds, AuthDbListener *listener);
virtual void getUserWithPhoneFromBackend(const std::string &, const std::string &, AuthDbListener *listener) = 0;
virtual void getUsersWithPhonesFromBackend(list<tuple<std::string,std::string,AuthDbListener*>> &creds, AuthDbListener *listener);
virtual void createAccount(const std::string &user, const std::string & domain, const std::string &auth_username, const std::string &password, int expires, const std::string &phone_alias = "");
......@@ -179,6 +182,7 @@ class SociAuthDB : public AuthDbBackend {
SociAuthDB();
void setConnectionParameters(const string &domain, const string &request);
virtual void getUserWithPhoneFromBackend(const std::string & , const std::string &, AuthDbListener *listener);
virtual void getUsersWithPhonesFromBackend(list<tuple<std::string,std::string,AuthDbListener*>> &creds, AuthDbListener *listener);
virtual void getPasswordFromBackend(const std::string &id, const std::string &domain,
const std::string &authid, AuthDbListener *listener);
......@@ -186,6 +190,7 @@ class SociAuthDB : public AuthDbBackend {
private:
void getUserWithPhoneWithPool(const std::string &phone, const std::string &domain, AuthDbListener *listener);
void getUsersWithPhonesWithPool(list<tuple<std::string,std::string,AuthDbListener*>> &creds, AuthDbListener *listener);
void getPasswordWithPool(const std::string &id, const std::string &domain,
const std::string &authid, AuthDbListener *listener);
......@@ -198,6 +203,7 @@ class SociAuthDB : public AuthDbBackend {
std::string backend;
std::string get_password_request;
std::string get_user_with_phone_request;
std::string get_users_with_phones_request;
};
#endif /* ENABLE_SOCI */
......
......@@ -11,6 +11,10 @@ public:
: mMainLoop(mainLoop), mInfo(info) {
AuthDbBackend::get(); /*this will initialize the database backend, which is good to know that it works at startup*/
}
PresenceAuthListener(belle_sip_main_loop_t *mainLoop, std::map<std::string,std::shared_ptr<PresentityPresenceInformation>> &dInfo)
: mMainLoop(mainLoop), mDInfo(dInfo) {
AuthDbBackend::get(); /*this will initialize the database backend, which is good to know that it works at startup*/
}
virtual void onResult(AuthDbResult result, const std::string &passwd) {
belle_sip_source_cpp_func_t *func = new belle_sip_source_cpp_func_t([this, result, passwd](unsigned int events) {
......@@ -22,38 +26,63 @@ public:
, 0
, "OnAuthListener to mainthread");
}
void onResults(list<std::string> &phones, set<std::string> &users) {
for(std::string phone : phones) {
if(users.size() == 0) {
this->onResult(PASSWORD_NOT_FOUND, phone);
} else if (users.find(phone) != users.end()){
this->onResult(PASSWORD_FOUND, phone);
} else {
this->onResult(PASSWORD_NOT_FOUND, phone);
}
}
}
private:
void processResponse(AuthDbResult result, const std::string &user) {
const char* cuser = belle_sip_uri_get_user(mInfo->getEntity());
std::shared_ptr<PresentityPresenceInformation> info;
bool must_delete = FALSE;
if(mInfo == nullptr) {
std::map<std::string,std::shared_ptr<PresentityPresenceInformation>>::iterator it = mDInfo.find(user);
info = it->second;
} else {
info = mInfo;
must_delete = TRUE;
}
const char* cuser = belle_sip_uri_get_user(info->getEntity());
if (result == AuthDbResult::PASSWORD_FOUND) {
// result is a phone alias if (and only if) user is not the same as the entity user
bool isPhone = (strcmp(user.c_str(), cuser) != 0);
if (isPhone) {
// change contact accordingly
belle_sip_uri_t *uri = BELLE_SIP_URI(belle_sip_object_clone(BELLE_SIP_OBJECT(mInfo->getEntity())));
belle_sip_uri_t *uri = BELLE_SIP_URI(belle_sip_object_clone(BELLE_SIP_OBJECT(info->getEntity())));
belle_sip_parameters_t* params=BELLE_SIP_PARAMETERS(uri);
belle_sip_parameters_remove_parameter(params, "user");
belle_sip_uri_set_user(uri, user.c_str());
char *contact_as_string = belle_sip_uri_to_string(uri);
belle_sip_object_unref(uri);
SLOGD << __FILE__ << ": " << "Found user " << user << " for phone "
<< belle_sip_uri_get_user(mInfo->getEntity()) << ", adding contact " << contact_as_string << " presence information";
mInfo->setDefaultElement(contact_as_string);
<< belle_sip_uri_get_user(info->getEntity()) << ", adding contact " << contact_as_string << " presence information";
info->setDefaultElement(contact_as_string);
belle_sip_free(contact_as_string);
} else {
SLOGD << __FILE__ << ": " << "Found user " << user << ", adding presence information";
mInfo->setDefaultElement();
info->setDefaultElement();
}
} else {
SLOGD << __FILE__ << ": " << "Could not find user " << cuser << ".";
}
delete this;
if(must_delete) {
delete this;
}
}
private:
belle_sip_main_loop_t *mMainLoop;
const std::shared_ptr<PresentityPresenceInformation> mInfo;
std::map<std::string,std::shared_ptr<PresentityPresenceInformation>> mDInfo;
};
void PresenceLongterm::onNewPresenceInfo(const std::shared_ptr<PresentityPresenceInformation>& info) const {
......@@ -73,3 +102,16 @@ void PresenceLongterm::onListenerEvent(const std::shared_ptr<PresentityPresenceI
, new PresenceAuthListener(mMainLoop, info));
}
}
void PresenceLongterm::onListenerEvents(list<std::shared_ptr<PresentityPresenceInformation>>& infos) const {
list<tuple<std::string,std::string,AuthDbListener*>> creds;
std::map<std::string,std::shared_ptr<PresentityPresenceInformation>> dInfo;
for (shared_ptr<PresentityPresenceInformation> &info : infos) {
if (!info->hasDefaultElement()) {
creds.push_back(make_tuple(belle_sip_uri_get_user(info->getEntity()), belle_sip_uri_get_host(info->getEntity()), new PresenceAuthListener(mMainLoop, info)));
}
dInfo.insert(std::pair<std::string,std::shared_ptr<PresentityPresenceInformation>>(belle_sip_uri_get_user(info->getEntity()), info));
}
AuthDbBackend::get()->getUsersWithPhone(creds
, new PresenceAuthListener(mMainLoop, dInfo));
}
......@@ -29,7 +29,7 @@ namespace flexisip {
PresenceLongterm(belle_sip_main_loop_t *mainLoop) : mMainLoop(mainLoop) {};
virtual void onNewPresenceInfo(const std::shared_ptr<PresentityPresenceInformation>& info) const override;
virtual void onListenerEvent(const std::shared_ptr<PresentityPresenceInformation>& info) const override;
virtual void onListenerEvents(list<std::shared_ptr<PresentityPresenceInformation>>& info) const override;
private:
belle_sip_main_loop_t *mMainLoop;
};
......
......@@ -633,9 +633,13 @@ void PresenceServer::processSubscribeRequestEvent(const belle_sip_request_event_
belle_sip_server_transaction_send_response(server_transaction, resp);
belle_sip_dialog_set_application_data(dialog, new shared_ptr<Subscription> (listSubscription));
#if 0
for (shared_ptr<PresentityPresenceInformationListener> &listener : listSubscription->getListeners()) {
addOrUpdateListener(listener); //expiration is handled by dialog
}
#else
addOrUpdateListeners(listSubscription->getListeners());
#endif
listSubscription->notify(TRUE);
} else {
......@@ -716,10 +720,13 @@ void PresenceServer::processSubscribeRequestEvent(const belle_sip_request_event_
} else {
// list subscription case
shared_ptr<ListSubscription> listSubscription = dynamic_pointer_cast<ListSubscription>(subscription);
for (shared_ptr<PresentityPresenceInformationListener> listener :
listSubscription->getListeners()) {
addOrUpdateListener(listener, expires);
#if 0
for (shared_ptr<PresentityPresenceInformationListener> &listener : listSubscription->getListeners()) {
addOrUpdateListener(listener, expires); //expiration is handled by dialog
}
#else
addOrUpdateListeners(listSubscription->getListeners(), expires);
#endif
}
}
break;
......@@ -810,6 +817,7 @@ void PresenceServer::addOrUpdateListener(shared_ptr<PresentityPresenceInformatio
}
void PresenceServer::addOrUpdateListener(shared_ptr<PresentityPresenceInformationListener> &listener, int expires) {
std::shared_ptr<PresentityPresenceInformation> presenceInfo = getPresenceInfo(listener->getPresentityUri());
if (presenceInfo == NULL) {
/*no information available yet, but creating entry to be able to register subscribers*/
presenceInfo = make_shared<PresentityPresenceInformation>(listener->getPresentityUri(), *this,
......@@ -828,6 +836,35 @@ void PresenceServer::addOrUpdateListener(shared_ptr<PresentityPresenceInformatio
else
presenceInfo->addOrUpdateListener(listener);
}
void PresenceServer::addOrUpdateListeners(list<shared_ptr<PresentityPresenceInformationListener>> &listeners) {
addOrUpdateListeners(listeners,-1);
}
void PresenceServer::addOrUpdateListeners(list<shared_ptr<PresentityPresenceInformationListener>> &listeners, int expires) {
list<std::shared_ptr<PresentityPresenceInformation>> presenceInfos;
for (shared_ptr<PresentityPresenceInformationListener> &listener : listeners) {
std::shared_ptr<PresentityPresenceInformation> presenceInfo = getPresenceInfo(listener->getPresentityUri());
if (presenceInfo == NULL) {
/*no information available yet, but creating entry to be able to register subscribers*/
presenceInfo = make_shared<PresentityPresenceInformation>(listener->getPresentityUri(), *this,
belle_sip_stack_get_main_loop(mStack));
SLOGD << "New Presentity [" << *presenceInfo << "] created from SUBSCRIBE";
addPresenceInfo(presenceInfo);
}
if (expires > 0)
presenceInfo->addOrUpdateListener(listener, expires);
else
presenceInfo->addOrUpdateListener(listener);
presenceInfos.push_back(presenceInfo);
}
//notify observers that a listener is added or updated
for (auto& listener : mPresenceInfoObservers) {
listener->onListenerEvents(presenceInfos);
}
}
void PresenceServer::removeListener(const shared_ptr<PresentityPresenceInformationListener> &listener) {
const std::shared_ptr<PresentityPresenceInformation> presenceInfo = getPresenceInfo(listener->getPresentityUri());
if (presenceInfo) {
......
......@@ -63,6 +63,8 @@ public:
virtual void onNewPresenceInfo(const std::shared_ptr<PresentityPresenceInformation>& info) const = 0;
//notified when a listener is added or refreshed
virtual void onListenerEvent(const std::shared_ptr<PresentityPresenceInformation>& info) const = 0;
//notified when a listener is added or refreshed
virtual void onListenerEvents(list<std::shared_ptr<PresentityPresenceInformation>>& infos) const = 0;
};
class PresenceServer : public PresentityManager {
......@@ -128,6 +130,8 @@ private:
void addOrUpdateListener(shared_ptr<PresentityPresenceInformationListener>& listerner,int expires);
void addOrUpdateListener(shared_ptr<PresentityPresenceInformationListener>& listerner);
void addOrUpdateListeners(list<shared_ptr<PresentityPresenceInformationListener>>& listerner,int expires);
void addOrUpdateListeners(list<shared_ptr<PresentityPresenceInformationListener>>& listerner);
void removeListener(const shared_ptr<PresentityPresenceInformationListener>& listerner);
void removeSubscription(shared_ptr<Subscription> &identity) throw();
......
......@@ -269,6 +269,7 @@ void PresentityPresenceInformation::addOrUpdateListener(const shared_ptr<Present
*/
listener->onInformationChanged(*this);
}
void PresentityPresenceInformation::removeListener(const shared_ptr<PresentityPresenceInformationListener> &listener) {
SLOGD << "removing listener [" << listener.get() << "] on [" << *this << "]";
// 1 cancel expiration time
......@@ -282,6 +283,7 @@ void PresentityPresenceInformation::removeListener(const shared_ptr<PresentityPr
// successful unsubscription will also trigger a final NOTIFY message.
listener->onInformationChanged(*this);
}
bool PresentityPresenceInformation::hasDefaultElement() {
return mDefaultInformationElement != nullptr;
}
......
belle-sip @ ddb0cafc
Subproject commit 65974d1f4a692c43601649d274ddd18ab9e37869
Subproject commit ddb0cafc35c2d81446885c8561ffeebfff14329c
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment