Commit 39d9b4b9 authored by johan's avatar johan Committed by jehan
Browse files

Manage multithread

- ongoing
parent 4806746c
Lime
=======
Lime is a thread safe C++ library implementing Open Whisper System Signal protocol :
Sesame, double ratchet and X3DH. https://signal.org/docs/
Lime can run the Signal Protocol using elliptic curve 25519 or curve 448-goldilocks.
......@@ -99,5 +99,5 @@ Options
------------------
- [1] linphone-desktop: https://gitlab.linphone.org/BC/public/linphone-desktop.git
- [2] bctoolbox: https://gitlab.linphone.org/BC/public/bctoolbox.git or <http://www.linphone.org/releases/sources/bctoolbox>
......@@ -24,6 +24,7 @@
#include <vector>
#include <functional>
#include <string>
#include <mutex>
namespace lime {
......@@ -125,7 +126,9 @@ namespace lime {
class LimeManager {
private :
std::unordered_map<std::string, std::shared_ptr<LimeGeneric>> m_users_cache; // cache of already opened Lime Session, identified by user Id (GRUU)
std::mutex m_users_mutex; // m_users_cache mutex
std::string m_db_access; // DB access information forwarded to SOCI to correctly access database
std::shared_ptr<std::recursive_mutex> m_db_mutex; // database access mutex
limeX3DHServerPostData m_X3DH_post_data; // send data to the X3DH key server
void load_user(std::shared_ptr<LimeGeneric> &user, const std::string &localDeviceId, const bool allStatus=false); // helper function, get from m_users_cache of local Storage the requested Lime object
......@@ -367,9 +370,13 @@ namespace lime {
*
* @param[in] db_access string used to access DB: can be filename for sqlite3 or access params for mysql, directly forwarded to SOCI session opening
* @param[in] X3DH_post_data A function to send data to the X3DH server, parameters includes a callback to transfer back the server response
* @param[in]	db_mutex	a mutex used to lock database access. It is optional: if not given, the manager will create one internally
*/
LimeManager(const std::string &db_access, const limeX3DHServerPostData &X3DH_post_data)
: m_users_cache{}, m_db_access{db_access}, m_X3DH_post_data{X3DH_post_data} {};
LimeManager(const std::string &db_access, const limeX3DHServerPostData &X3DH_post_data, std::shared_ptr<std::recursive_mutex> db_mutex);
/**
* @overload LimeManager(const std::string &db_access, const limeX3DHServerPostData &X3DH_post_data)
*/
LimeManager(const std::string &db_access, const limeX3DHServerPostData &X3DH_post_data);
~LimeManager() = default;
};
......
......@@ -207,6 +207,7 @@ int lime_ffi_delete_user(lime_manager_t manager, const char *localDeviceId, cons
/**
* @brief Check if a user is present and active in local storage
*
* @param[in] manager pointer to the opaque structure used to interact with lime
* @param[in] localDeviceId used to identify which local account looking up, shall be the GRUU (Null terminated string)
*
* @return LIME_FFI_SUCCESS if the user is active in the local storage, LIME_FFI_USER_NOT_FOUND otherwise
......@@ -417,7 +418,7 @@ int lime_ffi_set_x3dhServerUrl(lime_manager_t manager, const char *localDeviceId
* @param[in] manager pointer to the opaque structure used to interact with lime
* @param[in] localDeviceId Identify the local user account, it must be unique and is also be used as Id on the X3DH key server, it shall be the GRUU, in a NULL terminated string
* @param[in] x3dhServerUrl The complete url(including port) of the X3DH key server in a NULL terminated string
* @param[in,out]	x3dhServerUrlSize	Size of the previous buffer, updated with the actual size of data written (without the '\0'; gives the same result as strlen)
*
* @return LIME_FFI_SUCCESS or a negative error code
*/
......
......@@ -23,6 +23,7 @@
#include "bctoolbox/exception.hh"
#include "lime_double_ratchet.hpp"
#include "lime_double_ratchet_protocol.hpp"
#include <mutex>
using namespace::std;
......@@ -93,11 +94,12 @@ namespace lime {
void Lime<Curve>::publish_user(const limeCallback &callback, const uint16_t OPkInitialBatchSize) {
auto userData = make_shared<callbackUserData<Curve>>(this->shared_from_this(), callback, OPkInitialBatchSize);
get_SelfIdentityKey(); // make sure our Ik is loaded in object
/* Generate (or load if they already are in base when publishing an inactive user) the SPk */
// Generate (or load if they already are in base when publishing an inactive user) the SPk
X<Curve, lime::Xtype::publicKey> SPk{};
DSA<Curve, lime::DSAtype::signature> SPk_sig{};
uint32_t SPk_id=0;
X3DH_generate_SPk(SPk, SPk_sig, SPk_id, true);
// Generate (or load if they already are in base when publishing an inactive user) the OPks
std::vector<X<Curve, lime::Xtype::publicKey>> OPks{};
std::vector<uint32_t> OPk_ids{};
......@@ -167,12 +169,13 @@ namespace lime {
void Lime<Curve>::encrypt(std::shared_ptr<const std::string> recipientUserId, std::shared_ptr<std::vector<RecipientData>> recipients, std::shared_ptr<const std::vector<uint8_t>> plainMessage, const lime::EncryptionPolicy encryptionPolicy, std::shared_ptr<std::vector<uint8_t>> cipherMessage, const limeCallback &callback) {
LIME_LOGI<<"encrypt from "<<m_selfDeviceId<<" to "<<recipients->size()<<" recipients";
/* Check if we have all the Double Ratchet sessions ready or shall we go for an X3DH */
std::vector<std::string> missingPeers; /* vector of deviceId(GRUU) which are requested to perform X3DH before the encryption can occurs */
/* Create the appropriate recipient infos and fill it with sessions found in cache */
// internal_recipients is a vector duplicating the recipients one in the same order (ignoring the one with peerStatus set to fail)
// This allows fast copying of relevant information back to recipients when encryption is completed
std::vector<RecipientInfos<Curve>> internal_recipients{};
std::unique_lock<std::mutex> lock(m_mutex);
for (const auto &recipient : *recipients) {
// if the input recipient peerStatus is fail we must ignore it
// most likely: we're in a call after a key bundle fetch and this peer device does not have keys on the X3DH server
......@@ -208,35 +211,45 @@ namespace lime {
// retrieve bundles from X3DH server, when they arrive, it will run the X3DH initiation and create the DR sessions
std::vector<uint8_t> X3DHmessage{};
x3dh_protocol::buildMessage_getPeerBundles<Curve>(X3DHmessage, missing_devices);
lock.unlock(); // unlock before calling external callbacks
postToX3DHServer(userData, X3DHmessage);
} else { // got everyone, encrypt
encryptMessage(internal_recipients, *plainMessage, *recipientUserId, m_selfDeviceId, *cipherMessage, encryptionPolicy);
// move DR messages to the input/output structure, ignoring again the input with peerStatus set to fail
// so the index on the internal_recipients still matches the way we created it from recipients
size_t i=0;
auto callbackStatus = lime::CallbackReturn::fail;
std::string callbackMessage{"All recipients failed to provide a key bundle"};
for (auto &recipient : *recipients) {
if (recipient.peerStatus != lime::PeerDeviceStatus::fail) {
recipient.DRmessage = std::move(internal_recipients[i].DRmessage);
recipient.peerStatus = internal_recipients[i].peerStatus;
i++;
callbackStatus = lime::CallbackReturn::success; // we must have at least one recipient with a successful encryption to return success
callbackMessage.clear();
}
}
if (callback) callback(callbackStatus, callbackMessage);
// is there no one in an asynchronous encryption process and do we have something in encryption queue to process
if (m_ongoing_encryption == nullptr && !m_encryption_queue.empty()) { // may happend when an encryption was queued but session was created by a previously queued encryption request
auto userData = m_encryption_queue.front();
m_encryption_queue.pop(); // remove it from queue and do it
encrypt(userData->recipientUserId, userData->recipients, userData->plainMessage, userData->encryptionPolicy, userData->cipherMessage, userData->callback);
return;
}
// We have everyone: encrypt
encryptMessage(internal_recipients, *plainMessage, *recipientUserId, m_selfDeviceId, *cipherMessage, encryptionPolicy);
// move DR messages to the input/output structure, ignoring again the input with peerStatus set to fail
// so the index on the internal_recipients still matches the way we created it from recipients
size_t i=0;
auto callbackStatus = lime::CallbackReturn::fail;
std::string callbackMessage{"All recipients failed to provide a key bundle"};
for (auto &recipient : *recipients) {
if (recipient.peerStatus != lime::PeerDeviceStatus::fail) {
recipient.DRmessage = std::move(internal_recipients[i].DRmessage);
recipient.peerStatus = internal_recipients[i].peerStatus;
i++;
callbackStatus = lime::CallbackReturn::success; // we must have at least one recipient with a successful encryption to return success
callbackMessage.clear();
}
}
lock.unlock(); // unlock before calling external callbacks
if (callback) callback(callbackStatus, callbackMessage);
lock.lock();
// is there no one in an asynchronous encryption process and do we have something in encryption queue to process
if (m_ongoing_encryption == nullptr && !m_encryption_queue.empty()) { // may happend when an encryption was queued but session was created by a previously queued encryption request
auto userData = m_encryption_queue.front();
m_encryption_queue.pop(); // remove it from queue and do it
lock.unlock(); // unlock before recursive call
encrypt(userData->recipientUserId, userData->recipients, userData->plainMessage, userData->encryptionPolicy, userData->cipherMessage, userData->callback);
}
}
template <typename Curve>
lime::PeerDeviceStatus Lime<Curve>::decrypt(const std::string &recipientUserId, const std::string &senderDeviceId, const std::vector<uint8_t> &DRmessage, const std::vector<uint8_t> &cipherMessage, std::vector<uint8_t> &plainMessage) {
std::lock_guard<std::mutex> lock(m_mutex);
// before trying to decrypt, we must check if the sender device is known in the local Storage and if we trust it
// a successful decryption will insert it in local storage so we must check first if it is there in order to detect new devices
// Note: a device could already be trusted in DB even before the first message (if we established trust before sending the first message)
......@@ -249,8 +262,8 @@ namespace lime {
auto db_sessionIdInCache = 0; // this would be the db_sessionId of the session stored in cache if there is one, no session has the Id 0
if (sessionElem != m_DR_sessions_cache.end()) { // session is in cache, it is the active one, just give it a try
db_sessionIdInCache = sessionElem->second->dbSessionId();
std::vector<std::shared_ptr<DR<Curve>>> DRSessions{1, sessionElem->second}; // copy the session pointer into a vector as the decrypt function ask for it
if (decryptMessage<Curve>(senderDeviceId, m_selfDeviceId, recipientUserId, DRSessions, DRmessage, cipherMessage, plainMessage) != nullptr) {
std::vector<std::shared_ptr<DR<Curve>>> cached_DRSessions{1, sessionElem->second}; // copy the session pointer into a vector as the decrypt function ask for it
if (decryptMessage<Curve>(senderDeviceId, m_selfDeviceId, recipientUserId, cached_DRSessions, DRmessage, cipherMessage, plainMessage) != nullptr) {
// we manage to decrypt the message with the current active session loaded in cache
return senderDeviceStatus;
} else { // remove session from cache
......@@ -366,11 +379,12 @@ namespace lime {
* @param[in] OPkInitialBatchSize Number of OPks in the first batch uploaded to X3DH server
* @param[in] X3DH_post_data A function used to communicate with the X3DH server
* @param[in] callback To provide caller the operation result
* @param[in] db_mutex a mutex to protect db access
*
* @return a pointer to the LimeGeneric class allowing access to API declared in lime_lime.hpp
*/
std::shared_ptr<LimeGeneric> insert_LimeUser(const std::string &dbFilename, const std::string &deviceId, const std::string &url, const lime::CurveId curve, const uint16_t OPkInitialBatchSize,
const limeX3DHServerPostData &X3DH_post_data, const limeCallback &callback) {
const limeX3DHServerPostData &X3DH_post_data, const limeCallback &callback, std::shared_ptr<std::recursive_mutex> db_mutex) {
LIME_LOGI<<"Create Lime user "<<deviceId;
/* first check the requested curve is instanciable and return an exception if not */
#ifndef EC25519_ENABLED
......@@ -385,7 +399,7 @@ namespace lime {
#endif
/* open DB */
auto localStorage = std::unique_ptr<lime::Db>(new lime::Db(dbFilename)); // create as unique ptr, ownership is then passed to the Lime structure when instanciated
auto localStorage = std::unique_ptr<lime::Db>(new lime::Db(dbFilename, db_mutex)); // create as unique ptr, ownership is then passed to the Lime structure when instanciated
try {
//instanciate the correct Lime object
......@@ -428,17 +442,18 @@ namespace lime {
* Fail to find the user will raise an exception
* If allStatus flag is set to false (default value), raise an exception on inactive users otherwise load inactive user.
*
* @param[in] dbFilename Path to filename to use
* @param[in] deviceId User to lookup in DB, deviceId shall be the GRUU
* @param[in] X3DH_post_data A function used to communicate with the X3DH server
* @param[in] allStatus allow loading of inactive user if set to true
* @param[in] dbFilename Path to filename to use
* @param[in] deviceId User to lookup in DB, deviceId shall be the GRUU
* @param[in] X3DH_post_data A function used to communicate with the X3DH server
* @param[in] db_mutex a mutex to protect db access
* @param[in] allStatus allow loading of inactive user if set to true
*
* @return a pointer to the LimeGeneric class allowing access to API declared in lime_lime.hpp
*/
std::shared_ptr<LimeGeneric> load_LimeUser(const std::string &dbFilename, const std::string &deviceId, const limeX3DHServerPostData &X3DH_post_data, const bool allStatus) {
std::shared_ptr<LimeGeneric> load_LimeUser(const std::string &dbFilename, const std::string &deviceId, const limeX3DHServerPostData &X3DH_post_data, std::shared_ptr<std::recursive_mutex> db_mutex, const bool allStatus) {
/* open DB and load user */
auto localStorage = std::unique_ptr<lime::Db>(new lime::Db(dbFilename)); // create as unique ptr, ownership is then passed to the Lime structure when instanciated
auto localStorage = std::unique_ptr<lime::Db>(new lime::Db(dbFilename, db_mutex)); // create as unique ptr, ownership is then passed to the Lime structure when instanciated
auto curve = CurveId::unset;
long int Uid=0;
std::string x3dh_server_url;
......
......@@ -23,6 +23,7 @@
#include <vector>
#include <unordered_map>
#include <queue>
#include <mutex>
#include "lime/lime.hpp"
#include "lime_lime.hpp"
......@@ -48,6 +49,7 @@ namespace lime {
/* general purpose */
std::shared_ptr<RNG> m_RNG; // Random Number Generator context
std::string m_selfDeviceId; // self device Id, shall be the GRUU
std::mutex m_mutex; // a mutex to lock own thread sensitive ressources (m_DR_sessions_cache, encryption_queue)
/* X3DH keys */
DSApair<Curve> m_Ik; // our identity key pair, is loaded from DB only if requested(to sign a SPK or to perform X3DH init)
......
......@@ -23,6 +23,7 @@
#include <memory> // unique_ptr
#include <unordered_map>
#include <vector>
#include <mutex>
namespace lime {
......@@ -162,9 +163,9 @@ namespace lime {
/* Lime Factory functions : return a pointer to the implementation using the specified elliptic curve. Two functions: one for creation, one for loading from local storage */
std::shared_ptr<LimeGeneric> insert_LimeUser(const std::string &dbFilename, const std::string &deviceId, const std::string &url, const lime::CurveId curve, const uint16_t OPkInitialBatchSize,
const limeX3DHServerPostData &X3DH_post_data, const limeCallback &callback);
const limeX3DHServerPostData &X3DH_post_data, const limeCallback &callback, std::shared_ptr<std::recursive_mutex> mutex);
std::shared_ptr<LimeGeneric> load_LimeUser(const std::string &dbFilename, const std::string &deviceId, const limeX3DHServerPostData &X3DH_post_data, const bool allStatus=false);
std::shared_ptr<LimeGeneric> load_LimeUser(const std::string &dbFilename, const std::string &deviceId, const limeX3DHServerPostData &X3DH_post_data, std::shared_ptr<std::recursive_mutex> mutex, const bool allStatus=false);
}
#endif // lime_lime_hpp
......@@ -20,6 +20,7 @@
#include <bctoolbox/exception.hh>
#include <soci/soci.h>
#include <set>
#include <mutex>
#include "lime_log.hpp"
#include "lime/lime.hpp"
......@@ -38,12 +39,8 @@ namespace lime {
/* Db public API */
/* */
/******************************************************************************/
/**
* @brief Constructor, open and check DB validity, create or update db schema is needed
*
* @param[in] filename The path to DB file
*/
Db::Db(std::string filename) : sql{"sqlite3", filename}{
Db::Db(const std::string &filename, std::shared_ptr<std::recursive_mutex> db_mutex) : sql{"sqlite3", filename}, m_db_mutex{db_mutex} {
std::lock_guard<std::recursive_mutex> lock(*m_db_mutex);
constexpr int db_module_table_not_holding_lime_row = -1;
int userVersion=db_module_table_not_holding_lime_row;
......@@ -226,6 +223,7 @@ Db::Db(std::string filename) : sql{"sqlite3", filename}{
*/
void Db::load_LimeUser(const std::string &deviceId, long int &Uid, lime::CurveId &curveId, std::string &url, const bool allStatus)
{
std::lock_guard<std::recursive_mutex> lock(*m_db_mutex);
int curve=0;
sql<<"SELECT Uid,curveId,server FROM lime_LocalUsers WHERE UserId = :userId LIMIT 1;", into(Uid), into(curve), into(url), use(deviceId);
......@@ -270,6 +268,7 @@ void Db::load_LimeUser(const std::string &deviceId, long int &Uid, lime::CurveId
* Once we moved to next chain(as soon as peer got an answer from us and replies), the count won't be reset anymore
*/
void Db::clean_DRSessions() {
std::lock_guard<std::recursive_mutex> lock(*m_db_mutex);
// WARNING: not sure this code is portable it may work with sqlite3 only
// delete stale sessions considered to old
sql<<"DELETE FROM DR_sessions WHERE Status=0 AND timeStamp < date('now', '-"<<lime::settings::DRSession_limboTime_days<<" day');";
......@@ -284,6 +283,7 @@ void Db::clean_DRSessions() {
* SPk in stale status for more than SPK_limboTime_days are deleted
*/
void Db::clean_SPk() {
std::lock_guard<std::recursive_mutex> lock(*m_db_mutex);
// WARNING: not sure this code is portable it may work with sqlite3 only
// delete stale sessions considered to old
sql<<"DELETE FROM X3DH_SPK WHERE Status=0 AND timeStamp < date('now', '-"<<lime::settings::SPK_limboTime_days<<" day');";
......@@ -295,6 +295,7 @@ void Db::clean_SPk() {
* @param[out] deviceIds the list of all local users (their device Id)
*/
void Db::get_allLocalDevices(std::vector<std::string> &deviceIds) {
std::lock_guard<std::recursive_mutex> lock(*m_db_mutex);
deviceIds.clear();
rowset<row> rs = (sql.prepare << "SELECT UserId FROM lime_LocalUsers;");
for (const auto &r : rs) {
......@@ -336,6 +337,7 @@ void Db::get_allLocalDevices(std::vector<std::string> &deviceIds) {
* - insert/update the status. If inserted, insert an invalid Ik
*/
void Db::set_peerDeviceStatus(const std::string &peerDeviceId, const std::vector<uint8_t> &Ik, lime::PeerDeviceStatus status) {
std::lock_guard<std::recursive_mutex> lock(*m_db_mutex);
// if status is unsafe or untrusted, call the variant without Ik
if (status == lime::PeerDeviceStatus::unsafe || status == lime::PeerDeviceStatus::untrusted) {
this->set_peerDeviceStatus(peerDeviceId, status);
......@@ -380,6 +382,7 @@ void Db::set_peerDeviceStatus(const std::string &peerDeviceId, const std::vector
* Calls with status unsafe or untrusted are executed by this function as they do not need Ik.
*/
void Db::set_peerDeviceStatus(const std::string &peerDeviceId, lime::PeerDeviceStatus status) {
std::lock_guard<std::recursive_mutex> lock(*m_db_mutex);
// Check the status flag value, accepted values are: untrusted, unsafe
if (status != lime::PeerDeviceStatus::unsafe
&& status != lime::PeerDeviceStatus::untrusted) {
......@@ -430,6 +433,7 @@ void Db::set_peerDeviceStatus(const std::string &peerDeviceId, lime::PeerDeviceS
* @return unknown if the device is not in localStorage, untrusted, trusted or unsafe according to the stored value of peer device status flag otherwise
*/
lime::PeerDeviceStatus Db::get_peerDeviceStatus(const std::string &peerDeviceId) {
std::lock_guard<std::recursive_mutex> lock(*m_db_mutex);
int status;
sql<<"SELECT Status FROM Lime_PeerDevices WHERE DeviceId = :peerDeviceId LIMIT 1;", into(status), use(peerDeviceId);
if (sql.got_data()) { // Found it
......@@ -457,6 +461,7 @@ lime::PeerDeviceStatus Db::get_peerDeviceStatus(const std::string &peerDeviceId)
* Call is silently ignored if the device is not found in local storage
*/
void Db::delete_peerDevice(const std::string &peerDeviceId) {
	std::lock_guard<std::recursive_mutex> lock(*m_db_mutex); // serialize access to the SOCI session shared between threads
	sql<<"DELETE FROM lime_peerDevices WHERE DeviceId = :peerDeviceId;", use(peerDeviceId); // DELETE matching no row is a no-op: unknown devices are silently ignored
}
......@@ -472,6 +477,7 @@ void Db::delete_peerDevice(const std::string &peerDeviceId) {
*/
template <typename Curve>
long int Db::check_peerDevice(const std::string &peerDeviceId, const DSA<Curve, lime::DSAtype::publicKey> &peerIk) {
std::lock_guard<std::recursive_mutex> lock(*m_db_mutex);
try {
blob Ik_blob(sql);
long int Did=0;
......@@ -512,6 +518,7 @@ long int Db::check_peerDevice(const std::string &peerDeviceId, const DSA<Curve,
*/
template <typename Curve>
long int Db::store_peerDevice(const std::string &peerDeviceId, const DSA<Curve, lime::DSAtype::publicKey> &peerIk) {
std::lock_guard<std::recursive_mutex> lock(*m_db_mutex);
try {
blob Ik_blob(sql);
......@@ -541,6 +548,7 @@ long int Db::store_peerDevice(const std::string &peerDeviceId, const DSA<Curve,
*/
void Db::delete_LimeUser(const std::string &deviceId)
{
	std::lock_guard<std::recursive_mutex> lock(*m_db_mutex); // serialize access to the SOCI session shared between threads
	sql<<"DELETE FROM lime_LocalUsers WHERE UserId = :userId;", use(deviceId); // remove the local user row; silently does nothing if the user is absent
}
......@@ -562,6 +570,7 @@ void Db::delete_LimeUser(const std::string &deviceId)
/******************************************************************************/
template <typename DHKey>
bool DR<DHKey>::session_save() {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
// open transaction
transaction tr(m_localStorage->sql);
......@@ -716,6 +725,7 @@ bool DR<DHKey>::session_save() {
template <typename DHKey>
bool DR<DHKey>::session_load() {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
// blobs to store DR session data
blob DHr(m_localStorage->sql);
......@@ -756,6 +766,7 @@ bool DR<DHKey>::session_load() {
template <typename Curve>
bool DR<Curve>::trySkippedMessageKeys(const uint16_t Nr, const X<Curve, lime::Xtype::publicKey> &DHr, DRMKey &MK) {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
blob MK_blob(m_localStorage->sql);
blob DHr_blob(m_localStorage->sql);
DHr_blob.write(0, (char *)(DHr.data()), DHr.size());
......@@ -805,6 +816,7 @@ bool DR<Curve>::trySkippedMessageKeys(const uint16_t Nr, const X<Curve, lime::Xt
template <typename Curve>
bool Lime<Curve>::create_user()
{
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
int Uid;
int curve;
......@@ -875,6 +887,7 @@ bool Lime<Curve>::create_user()
*/
template <typename Curve>
bool Lime<Curve>::activate_user() {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
// check if the user is the DB
int Uid = 0;
int curveId = 0;
......@@ -905,6 +918,7 @@ bool Lime<Curve>::activate_user() {
template <typename Curve>
void Lime<Curve>::get_SelfIdentityKey() {
if (m_Ik_loaded == false) {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
blob Ik_blob(m_localStorage->sql);
m_localStorage->sql<<"SELECT Ik FROM Lime_LocalUsers WHERE Uid = :UserId LIMIT 1;", into(Ik_blob), use(m_db_Uid);
if (m_localStorage->sql.got_data()) { // Found it, it is stored in one buffer Public || Private
......@@ -930,6 +944,9 @@ void Lime<Curve>::X3DH_generate_SPk(X<Curve, lime::Xtype::publicKey> &publicSPk,
// check Identity key is loaded in Lime object context
get_SelfIdentityKey();
// lock after the get_SelfIdentityKey as it also acquires this lock
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
// if the load flag is on, try to load a existing active key instead of generating it
if (load) {
blob SPk_blob(m_localStorage->sql);
......@@ -1002,6 +1019,8 @@ void Lime<Curve>::X3DH_generate_SPk(X<Curve, lime::Xtype::publicKey> &publicSPk,
template <typename Curve>
void Lime<Curve>::X3DH_generate_OPks(std::vector<X<Curve, lime::Xtype::publicKey>> &publicOPks, std::vector<uint32_t> &OPk_ids, const uint16_t OPk_number, const bool load) {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
// make room for OPk and OPk ids
OPk_ids.clear();
publicOPks.clear();
......@@ -1088,6 +1107,7 @@ void Lime<Curve>::X3DH_generate_OPks(std::vector<X<Curve, lime::Xtype::publicKey
template <typename Curve>
void Lime<Curve>::cache_DR_sessions(std::vector<RecipientInfos<Curve>> &internal_recipients, std::vector<std::string> &missing_devices) {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
// build a user list of missing ones : produce a list ready to be sent to SQL query: 'user','user','user',... also build a map to store shared_ptr to sessions
// build also a list of all peer devices used to fetch from DB their status: unknown, untrusted or trusted
std::string sqlString_requestedDevices{""};
......@@ -1171,6 +1191,7 @@ void Lime<Curve>::cache_DR_sessions(std::vector<RecipientInfos<Curve>> &internal
// load from local storage in DRSessions all DR session matching the peerDeviceId, ignore the one picked by id in 2nd arg
template <typename Curve>
void Lime<Curve>::get_DRSessions(const std::string &senderDeviceId, const long int ignoreThisDRSessionId, std::vector<std::shared_ptr<DR<Curve>>> &DRSessions) {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
rowset<int> rs = (m_localStorage->sql.prepare << "SELECT s.sessionId FROM DR_sessions as s INNER JOIN lime_PeerDevices as d ON s.Did=d.Did WHERE d.DeviceId = :senderDeviceId AND s.Uid = :Uid AND s.sessionId <> :ignoreThisDRSessionId ORDER BY s.Status DESC, timeStamp ASC;", use(senderDeviceId), use (m_db_Uid), use(ignoreThisDRSessionId));
for (const auto &sessionId : rs) {
......@@ -1187,6 +1208,7 @@ void Lime<Curve>::get_DRSessions(const std::string &senderDeviceId, const long i
*/
template <typename Curve>
void Lime<Curve>::X3DH_get_SPk(uint32_t SPk_id, Xpair<Curve> &SPk) {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
blob SPk_blob(m_localStorage->sql);
m_localStorage->sql<<"SELECT SPk FROM X3DH_SPk WHERE Uid = :Uid AND SPKid = :SPk_id LIMIT 1;", into(SPk_blob), use(m_db_Uid), use(SPk_id);
if (m_localStorage->sql.got_data()) { // Found it, it is stored in one buffer Public || Private
......@@ -1202,6 +1224,7 @@ void Lime<Curve>::X3DH_get_SPk(uint32_t SPk_id, Xpair<Curve> &SPk) {
*/
template <typename Curve>
bool Lime<Curve>::is_currentSPk_valid(void) {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
// Do we have an active SPk for this user which is younger than SPK_lifeTime_days
int dummy;
m_localStorage->sql<<"SELECT SPKid FROM X3DH_SPk WHERE Uid = :Uid AND Status = 1 AND timeStamp > date('now', '-"<<lime::settings::SPK_lifeTime_days<<" day') LIMIT 1;", into(dummy), use(m_db_Uid);
......@@ -1221,6 +1244,7 @@ bool Lime<Curve>::is_currentSPk_valid(void) {
*/
template <typename Curve>
void Lime<Curve>::X3DH_get_OPk(uint32_t OPk_id, Xpair<Curve> &OPk) {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
blob OPk_blob(m_localStorage->sql);
m_localStorage->sql<<"SELECT OPk FROM X3DH_OPK WHERE Uid = :Uid AND OPKid = :OPk_id LIMIT 1;", into(OPk_blob), use(m_db_Uid), use(OPk_id);
if (m_localStorage->sql.got_data()) { // Found it, it is stored in one buffer Public || Private
......@@ -1240,6 +1264,7 @@ void Lime<Curve>::X3DH_get_OPk(uint32_t OPk_id, Xpair<Curve> &OPk) {
*/
template <typename Curve>
void Lime<Curve>::X3DH_updateOPkStatus(const std::vector<uint32_t> &OPkIds) {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
if (OPkIds.size()>0) { /* we have keys on server */
// build a comma-separated list of OPk id on server
std::string sqlString_OPkIds{""};
......@@ -1261,6 +1286,7 @@ void Lime<Curve>::X3DH_updateOPkStatus(const std::vector<uint32_t> &OPkIds) {
template <typename Curve>
void Lime<Curve>::set_x3dhServerUrl(const std::string &x3dhServerUrl) {
std::lock_guard<std::recursive_mutex> lock(*(m_localStorage->m_db_mutex));
transaction tr(m_localStorage->sql);
// update in DB, do not check presence as we're called after a load_user who already ensure that
......
......@@ -22,6 +22,7 @@
#include "soci/soci.h"
#include "lime_crypto_primitives.hpp"
#include <mutex>
namespace lime {
......@@ -34,6 +35,8 @@ namespace lime {
public:
/// soci connexion to DB
soci::session sql;
/// mutex on database access
std::shared_ptr<std::recursive_mutex> m_db_mutex;
Db()=delete; // we can't create a new DB holder without DB filename
......@@ -41,8 +44,9 @@ namespace lime {
* @brief Open and check DB validity, create or update db schema is needed
*
* @param[in] filename The path to DB file
* @param[in] db_mutex database access mutex
*/
Db(std::string filename);
Db(const std::string &filename, std::shared_ptr<std::recursive_mutex> db_mutex);
~Db(){sql.close();};
void load_LimeUser(const std::string &deviceId, long int &Uid, lime::CurveId &curveId, std::string &url, const bool allStatus=false);
......
<
......@@ -23,21 +23,30 @@
#include "lime_lime.hpp"
#include "lime_localStorage.hpp"
#include "lime_settings.hpp"
#include <mutex>
#include "bctoolbox/exception.hh"
using namespace::std;
namespace lime {
// Constructor taking a caller-provided database access mutex: the manager forwards it to every Db instance it opens,
// so several components sharing the same database file can serialize their accesses on the same mutex
LimeManager::LimeManager(const std::string &db_access, const limeX3DHServerPostData &X3DH_post_data, std::shared_ptr<std::recursive_mutex> db_mutex)
	: m_users_cache{}, m_db_access{db_access}, m_db_mutex{db_mutex}, m_X3DH_post_data{X3DH_post_data} { }
// When no mutex is provided for database access, create one internally so that
// database accesses performed through this manager are still serialized
LimeManager::LimeManager(const std::string &db_access, const limeX3DHServerPostData &X3DH_post_data)
	: m_users_cache{}, m_db_access{db_access}, m_db_mutex{std::make_shared<std::recursive_mutex>()}, m_X3DH_post_data{X3DH_post_data} { }
/**
 * @brief Helper: get the Lime object for the given local device, from cache when present, from local storage otherwise.
 *
 * A freshly loaded user is inserted into m_users_cache so subsequent calls hit the cache.
 *
 * @param[out]	user		the Lime object for the requested device
 * @param[in]	localDeviceId	user to look up, shall be the GRUU
 * @param[in]	allStatus	when true, also load inactive users (forwarded to load_LimeUser)
 */
void LimeManager::load_user(std::shared_ptr<LimeGeneric> &user, const std::string &localDeviceId, const bool allStatus) {
	// get the Lime manager lock: m_users_cache may be touched from several threads
	std::lock_guard<std::mutex> lock(m_users_mutex);
	// Load user object
	auto userElem = m_users_cache.find(localDeviceId);
	if (userElem == m_users_cache.end()) { // not in cache, load it from DB (sharing the manager's db mutex) and cache it
		user = load_LimeUser(m_db_access, localDeviceId, m_X3DH_post_data, m_db_mutex, allStatus);
		m_users_cache[localDeviceId] = user;
	} else {
		user = userElem->second;
	}
}
/****************************************************************************/
......@@ -56,13 +65,19 @@ namespace lime {
// then check if it went well, if not delete the user from localDB
if (returnCode != lime::CallbackReturn::success) {
auto localStorage = std::unique_ptr<lime::Db>(new lime::Db(thiz->m_db_access));
auto localStorage = std::unique_ptr<lime::Db>(new lime::Db(thiz->m_db_access, thiz->m_db_mutex));
localStorage->delete_LimeUser(localDeviceId);
// Failure can occur only on X3DH server response(local failure generate an exception so we would never
// arrive in this callback)), so the lock acquired by create_user has already expired when we arrive here
std::lock_guard<std::mutex> lock(thiz->m_users_mutex);
thiz->m_users_cache.erase(localDeviceId);
}
});
m_users_cache.insert({localDeviceId, insert_LimeUser(m_db_access, localDeviceId, x3dhServerUrl, curve, OPkInitialBatchSize, m_X3DH_post_data, managerCreateCallback)});
std::lock_guard<std::mutex> lock(m_users_mutex);
m_users_cache.insert({localDeviceId, insert_LimeUser(m_db_access, localDeviceId, x3dhServerUrl, curve, OPkInitialBatchSize, m_X3DH_post_data, managerCreateCallback, m_db_mutex)});
}
void LimeManager::delete_user(const std::string &localDeviceId, const limeCallback &callback) {
......@@ -135,7 +150,7 @@ namespace lime {
}
void LimeManager::update(const limeCallback &callback, uint16_t OPkServerLowLimit, uint16_t OPkBatchSize) {
// open local DB
auto localStorage = std::unique_ptr<lime::Db>(new lime::Db(m_db_access));
auto localStorage = std::unique_ptr<lime::Db>(new lime::Db(m_db_access, m_db_mutex));
/* DR sessions and old stale SPk cleaning */
localStorage->clean_DRSessions();
......@@ -187,33 +202,34 @@ namespace lime {
/**
 * @brief Set the status (and identity key) of a peer device in local storage.
 *
 * @param[in]	peerDeviceId	the peer device Id (GRUU)
 * @param[in]	Ik		the peer device identity key
 * @param[in]	status		the status to set, forwarded to Db::set_peerDeviceStatus
 */
void LimeManager::set_peerDeviceStatus(const std::string &peerDeviceId, const std::vector<uint8_t> &Ik, lime::PeerDeviceStatus status) {
	// open local DB, sharing the manager's database mutex so concurrent accesses are serialized
	auto localStorage = std::unique_ptr<lime::Db>(new lime::Db(m_db_access, m_db_mutex));
	localStorage->set_peerDeviceStatus(peerDeviceId, Ik, status);
}
/**
 * @brief Set the status of a peer device in local storage (variant without identity key).
 *
 * @param[in]	peerDeviceId	the peer device Id (GRUU)
 * @param[in]	status		the status to set, forwarded to Db::set_peerDeviceStatus
 */
void LimeManager::set_peerDeviceStatus(const std::string &peerDeviceId, lime::PeerDeviceStatus status) {
	// open local DB, sharing the manager's database mutex so concurrent accesses are serialized
	auto localStorage = std::unique_ptr<lime::Db>(new lime::Db(m_db_access, m_db_mutex));
	localStorage->set_peerDeviceStatus(peerDeviceId, status);
}
/**
 * @brief Retrieve the status of a peer device from local storage.
 *
 * @param[in]	peerDeviceId	the peer device Id (GRUU)
 *
 * @return the status as returned by Db::get_peerDeviceStatus
 */
lime::PeerDeviceStatus LimeManager::get_peerDeviceStatus(const std::string &peerDeviceId) {
	// open local DB, sharing the manager's database mutex so concurrent accesses are serialized
	auto localStorage = std::unique_ptr<lime::Db>(new lime::Db(m_db_access, m_db_mutex));
	return localStorage->get_peerDeviceStatus(peerDeviceId);
}