Commit 46035d6e authored by François Grisez's avatar François Grisez

Flexisip monitor initial commit

parent 3f779942
# -*-coding:Utf-8 -*
# <one line to give the program's name and a brief idea of what it does.>
# Copyright (C) 2014 François Grisez <francois.grisez@belledonne-communications.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import linphone
import time
import logging
class CoreManager:
def __init__(self, client_id, user_id, proxy_uri):
config = linphone.LpConfig.new(None)
config.set_int("sip", "sip_port", -1)
config.set_int("sip", "sip_tcp_port", -1)
config.set_int("sip", "sip_tls_port", -1)
vtable = {
"call_state_changed": CoreManager._call_state_callback,
"call_stats_updated": CoreManager._call_stats_updated_callback
}
self.core = linphone.Core.new_with_config(vtable, config)
self.core.user_data = self
self.core.echo_cancellation_enabled = False
self.core.video_capture_enabled = False
self.core.video_display_enabled = False
self.use_files = True
self.play_file = "./hello8000.wav"
proxy_config = self.core.create_proxy_config()
proxy_config.identity = user_id
proxy_config.server_addr = proxy_uri
proxy_config.register_enabled = False
address = self.core.create_address(user_id)
auth_info = self.core.create_auth_info(address.username,
None,
address.password,
None,
None,
address.domain)
self.core.add_proxy_config(proxy_config)
self.core.default_proxy_config = proxy_config
self.core.add_auth_info(auth_info)
self.client_id = client_id
def register(self, timeout=5):
self.core.default_proxy_config.edit()
self.core.default_proxy_config.register_enabled = True
self.core.default_proxy_config.done()
success = self.wait_for_until(lambda m: m.core.default_proxy_config.state == linphone.RegistrationState.RegistrationOk, timeout)
if(not success):
raise CoreManager.RegistrationFailError(self)
def unregister(self, timeout=5):
self.core.default_proxy_config.edit()
self.core.default_proxy_config.register_enabled = False
self.core.default_proxy_config.done()
success = self.wait_for_until(lambda m: m.core.default_proxy_config.state == linphone.RegistrationState.RegistrationCleared, timeout)
if(not success):
raise CoreManager.UnregistrationFailError(self)
def wait_for_until(self, test_func, timeout):
start_time = time.time()
delta = 0
while delta < timeout and not test_func(self):
self.core.iterate()
time.sleep(0.1)
delta = time.time() - start_time
return delta < timeout
def _call_state_callback(lc, call, state, msg):
if state == linphone.CallState.CallIncomingReceived:
lc.accept_call(call)
_call_state_callback = staticmethod(_call_state_callback)
def _call_stats_updated_callback(lc, call, stats):
client_id = lc.user_data.client_id
logging.info("c{0}: D={1}kbit/s U={2}kbit/s".format(client_id, stats.download_bandwidth, stats.upload_bandwidth))
_call_stats_updated_callback = staticmethod(_call_stats_updated_callback)
class RegistrationFailError(Exception):
def __init__(self, manager):
self.manager = manager
class UnregistrationFailError(Exception):
def __init__(self, manager):
self.manager = manager
#!/usr/bin/python2
# -*-coding:Utf-8 -*
# <one line to give the program's name and a brief idea of what it does.>
# Copyright (C) 2014 François Grisez <francois.grisez@belledonne-communications.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import time
import logging
import argparse
import test
parser = argparse.ArgumentParser(description="daemon for testing availability of each server of a Flexisip cluster")
parser.add_argument("proxy_config", nargs='+', help="configuration of each client\n" +
"format: identity_uri/proxy_uri")
parser.add_argument("--interval", type=int, help="set time interval in seconds between successive tests", dest="test_interval", default=30)
parser.add_argument("--log", help="log file path", dest="log_file", default="./flexisip_monitor.log")
parser.add_argument("--port", "-p", help="port to switch off when test fails", dest="port", type=int, default=12345)
args = parser.parse_args()
configs = []
for config_str in args.proxy_config:
configs.append(tuple(config_str.split('/')))
logging.basicConfig(level=logging.INFO, filename=args.log_file)
action = test.TcpPortAction(args.port)
test = test.InterCallTest(configs)
test.listeners.append(action)
logging.info("Starting Flexisip monitior with the folowing configuration")
test.print_client_configs()
logging.info("")
try:
while True:
test.run()
logging.info("sleeping for {0} seconds".format(args.test_interval))
time.sleep(args.test_interval)
except KeyboardInterrupt:
logging.info("Stopping Flexisip monitor")
# -*-coding:Utf-8 -*
# <one line to give the program's name and a brief idea of what it does.>
# Copyright (C) 2014 François Grisez <francois.grisez@belledonne-communications.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from coremanager import *
import logging
import socket
import threading
class AbstractTest:
def __init__(self):
self.listeners = []
self.test_succeed = False
def run(self):
self.test_succeed = self._run()
for listener in self.listeners:
listener.notify(self)
def _run(self):
pass
class InterCallTest(AbstractTest):
def __init__(self, lc_configs, timeout=5):
AbstractTest.__init__(self)
self._managers = []
for config in lc_configs:
index = lc_configs.index(config) + 1
self._managers.append(CoreManager(index, *config))
self.test_count = 0
self.timeout = timeout
def _run(self):
self.test_count += 1
try:
logging.info("Starting test #{0}".format(self.test_count))
logging.info("Registering all clients")
for manager in self._managers:
manager.register(timeout=self.timeout)
logging.info("Testing all call combinations")
for m1 in self._managers:
for m2 in self._managers:
if m1 != m2:
index1 = self._managers.index(m1) + 1
logging.info("Client #{0} -> client #{1}".format(m1.client_id, m2.client_id))
InterCallTest._test_call(m1, m2, timeout=self.timeout)
logging.info("All tests have succeed")
return True
except CoreManager.RegistrationFailError as e:
index = self.id(e.manager)
logging.error("Registration of client #{0} has failed".format(index))
return False
except CoreManager.UnregistrationFailError as e:
index = self.id(e.manager)
logging.error("Unregistration of client #{0} has failed".format(index))
return False
except InterCallTest.CallTestFailException as e:
caller_index = self.id(e.caller)
callee_index = self.id(e.callee)
logging.error("Call between client #{0} and client #{1} has failed".format(caller_index, callee_index))
return False
finally:
logging.info("Unregistering all clients")
for manager in self._managers:
manager.unregister(timeout=self.timeout)
def print_client_configs(self):
for manager in self._managers:
index = self.id(manager)
proxy_config = manager.core.default_proxy_config
identity = proxy_config.identity
server_addr = proxy_config.server_addr
logging.info("Client #{0}: {1}\t{2}".format(index, identity, server_addr))
def id(self, manager):
return self._managers.index(manager) + 1
def _test_call(caller, callee, timeout=5):
call = caller.core.invite(callee.core.default_proxy_config.identity)
result = InterCallTest._wait_for_until((caller, callee),
lambda m: m[0].core.current_call.state == linphone.CallState.CallStreamsRunning,
timeout)
if result:
InterCallTest._wait_for_until((caller, callee),
lambda m:
m[0].core.current_call.audio_stats.download_bandwidth > 0 and
m[0].core.current_call.audio_stats.upload_bandwidth > 0 and
m[1].core.current_call.audio_stats.download_bandwidth > 0 and
m[1].core.current_call.audio_stats.upload_bandwidth > 0,
timeout)
else:
raise(InterCallTest.CallTestFailException(caller, callee))
caller.core.terminate_call(call)
InterCallTest._wait_for_until((caller, callee),
lambda m: m[1].core.current_call is None,
timeout)
_test_call = staticmethod(_test_call)
def _wait_for_until(managers, test_func, timeout):
start_time = time.time()
delta = 0
while delta < timeout and \
not test_func(managers):
for manager in managers:
manager.core.iterate()
time.sleep(0.1)
delta = time.time() - start_time
return delta < timeout
_wait_for_until = staticmethod(_wait_for_until)
class CallTestFailException(Exception):
def __init__(self, caller, callee):
self.caller = caller
self.callee = callee
class TcpPortAction(threading.Thread):
def __init__(self, port):
threading.Thread.__init__(self)
self._port = port
self._socket = None
self._lock_socket = threading.RLock()
self.daemon = True
self.start()
def run(self):
while not True:
self._lock_socket.acquire()
if self._socket:
self._lock_socket.release()
conn = self._socket.accept()
else:
self._lock_socket.release()
time.sleep(0.1)
def notify(self, test):
if test.test_succeed:
self._open()
else:
self._close()
def _get_port(self):
return self._port
port = property(_get_port)
def _open(self):
if not self._socket:
self._lock_socket.acquire()
self._socket = socket.socket()
self._socket.bind(('0.0.0.0', self._port))
self._socket.listen(1)
self._lock_socket.release()
def _close(self):
if self._socket:
self._lock_socket.acquire()
self._socket.close()
self._socket = None
self._lock_socket.release()
......@@ -33,6 +33,7 @@ thesources= \
event.cc event.hh \
transaction.cc transaction.hh \
module.cc module.hh \
monitor.cc monitor.hh \
entryfilter.cc entryfilter.hh \
stun.cc stun.hh \
mediarelay.cc mediarelay.hh \
......
......@@ -71,9 +71,12 @@
#include "presence/presence-server.h"
#endif //ENABLE_PRESENCE
#include "monitor.hh"
static int run=1;
static int pipe_fds[2]={-1}; //pipes used by flexisip to notify its starter process that everything went fine
static pid_t flexisip_pid=0;
static pid_t flexisip_pid = -1;
static pid_t monitor_pid = -1;
static su_root_t *root=NULL;
bool sUseSyslog=false;
......@@ -205,8 +208,9 @@ static void makePidFile(const char *pidfile){
}
}
static void forkAndDetach(const char *pidfile, bool auto_respawn){
static void forkAndDetach(const char *pidfile, const char *monitor_pidfile, bool auto_respawn){
int err=pipe(pipe_fds);
bool firstTime = true;
if (err==-1){
LOGE("Could not create pipes: %s",strerror(errno));
exit(-1);
......@@ -220,7 +224,7 @@ static void forkAndDetach(const char *pidfile, bool auto_respawn){
if (pid==0){
fork_flexisip:
/*fork a second time for the flexisip real process*/
/*fork for the flexisip real process*/
flexisip_pid = fork();
if (flexisip_pid < 0){
fprintf(stderr,"Could not fork: %s\n",strerror(errno));
......@@ -240,6 +244,28 @@ fork_flexisip:
makePidFile(pidfile);
return;
}
if(!firstTime) goto watchdog_loop;
fork_monitor:
/* fork for the flexisip monitor */
monitor_pid = fork();
if (monitor_pid < 0){
fprintf(stderr,"Could not fork: %s\n",strerror(errno));
exit(-1);
}
if (monitor_pid == 0) {
/* This is the flexisip monitor process now. */
#ifdef HAVE_SYS_PRCTL_H
if (prctl(PR_SET_NAME,"flexisip_monitor",NULL,NULL,NULL)==-1){
LOGW("prctl() failed: %s",strerror(errno));
}
#endif
close(pipe_fds[0]);
close(pipe_fds[1]);
makePidFile(monitor_pidfile);
return;
}
if(!firstTime) goto watchdog_loop;
/* We are in the watchdog process. It will block until flexisip exits cleanly.
In case of crash, it will restart it.*/
......@@ -248,23 +274,36 @@ fork_flexisip:
LOGW("prctl() failed: %s",strerror(errno));
}
#endif
firstTime = false;
watchdog_loop:
while(true) {
int status=0;
pid_t retpid=wait(&status);
if (retpid>0){
if (WIFEXITED(status)) {
if (WEXITSTATUS(status) == RESTART_EXIT_CODE) {
LOGI("Flexisip restart to apply new config...");
if(retpid == flexisip_pid) {
if (WIFEXITED(status)) {
if (WEXITSTATUS(status) == RESTART_EXIT_CODE) {
LOGI("Flexisip restart to apply new config...");
sleep(1);
goto fork_flexisip;
} else {
LOGD("Flexisip exited normally");
exit(0);
}
}else if (auto_respawn){
LOGE("Flexisip apparently crashed, respawning now...");
sleep(1);
goto fork_flexisip;
} else {
LOGD("Flexisip exited normally");
exit(0);
}
}else if (auto_respawn){
LOGE("Flexisip apparently crashed, respawning now...");
sleep(1);
goto fork_flexisip;
} else if(retpid == monitor_pid) {
if(WIFEXITED(status)) {
LOGD("Flexisip monitior exited normaly");
} else if(auto_respawn){
LOGE("The Flexisip monitor apparently crashed, respawning now...");
sleep(1);
goto fork_monitor;
}
}
}else if (errno!=EINTR){
LOGE("waitpid() error: %s",strerror(errno));
......@@ -547,7 +586,11 @@ int main(int argc, char *argv[]){
if (daemon){
/*now that we have successfully loaded the config, there is nothing that can prevent us to start (normally).
So we can detach.*/
forkAndDetach(pidfile,cfg->getGlobal()->get<ConfigBoolean>("auto-respawn")->read());
forkAndDetach(pidfile, "/home/francois/flexisip_monitor.pid", cfg->getGlobal()->get<ConfigBoolean>("auto-respawn")->read());
if(monitor_pid == 0) {
Monitor::exec();
LOGE("Execution of the Flexisip monitor failed");
}
}
LOGN("Starting flexisip version %s (git %s)", VERSION, FLEXISIP_GIT_VERSION);
......@@ -608,4 +651,3 @@ int main(int argc, char *argv[]){
GenericManager::get()->sendTrap("Flexisip exiting normally");
return 0;
}
/*
Flexisip, a flexible SIP proxy server with media capabilities.
Copyright (C) 2014 Belledonne Communications SARL.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "monitor.hh"
#include "configmanager.hh"
using namespace std;
Monitor::Init Monitor::sInit;
const string Monitor::PYTHON_INTERPRETOR = "/usr/bin/python2";
const string Monitor::SCRIPT_PATH = "/home/francois/projects/flexisip/flexisip_monitor/flexisip_monitor.py";
Monitor::Init::Init() {
ConfigItemDescriptor items[] = {
{ Boolean , "enable" , "Enable or disable the Flexisip monitor daemon", "false" },
{ StringList, "proxy-configs", "List of proxy parameters of each SIP client which will be used for inter-calling tests", ""},
{ Integer , "test-interval", "Time between two consecutive tests", "30"},
{ String , "logfile" , "Path to the log file", "/etc/flexisip/flexisip_monitor.log"},
{ Integer , "switch-port" , "Port to open/close folowing the test succeed or not", "12345"},
config_item_end
};
GenericStruct *s = new GenericStruct("monitor", "Flexisip monitor parameters", 0);
GenericManager::get()->getRoot()->addChild(s);
s->addChildrenValues(items);
}
void Monitor::exec() {
GenericStruct *monitorParams = GenericManager::get()->getRoot()->get<GenericStruct>("monitor");
string interval = monitorParams->get<ConfigValue>("test-interval")->get();
string logfile = monitorParams->get<ConfigString>("logfile")->read();
string port = monitorParams->get<ConfigValue>("switch-port")->get();
list<string> proxyConfigs = monitorParams->get<ConfigStringList>("proxy-configs")->read();
char **args = new char *[proxyConfigs.size() + 9];
args[0] = strdup(PYTHON_INTERPRETOR.c_str());
args[1] = strdup(SCRIPT_PATH.c_str());
args[2] = strdup("--interval");
args[3] = strdup(interval.c_str());
args[4] = strdup("--log");
args[5] = strdup(logfile.c_str());
args[6] = strdup("--port");
args[7] = strdup(port.c_str());
int i=6;
for(string proxyConfig : proxyConfigs) {
args[i] = strdup(proxyConfig.c_str());
i++;
}
args[i] = NULL;
execvp(args[0], args);
}
/*
Flexisip, a flexible SIP proxy server with media capabilities.
Copyright (C) 2014 Belledonne Communications SARL.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef monitor_hh
#define monitor_hh
#include <string>
class Monitor {
public:
static void exec();
private:
class Init {
public:
Init();
};
static Init sInit;
static const std::string PYTHON_INTERPRETOR;
static const std::string SCRIPT_PATH;
};
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment