Commit 9f155f9c authored by Ghislain MARY's avatar Ghislain MARY
Browse files

Add first call unit tests in Python.

parent 515369ca
......@@ -30,7 +30,6 @@ class Logger(logging.Logger):
def __init__(self, filename):
logging.Logger.__init__(self, filename)
self.setLevel(logging.INFO)
handler = logging.FileHandler(filename)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s.%(msecs)03d %(levelname)s: %(message)s', '%H:%M:%S')
......@@ -155,11 +154,33 @@ class CoreManagerStats:
class CoreManager:
@classmethod
def wait_for_until(cls, manager1, manager2, func, timeout):
managers = []
if manager1 is not None:
managers.append(manager1)
if manager2 is not None:
managers.append(manager2)
return cls.wait_for_list(managers, func, timeout)
@classmethod
def wait_for_list(cls, managers, func, timeout):
start = datetime.now()
end = start + timedelta(milliseconds = timeout)
res = func(*managers)
while not res and datetime.now() < end:
for manager in managers:
manager.lc.iterate()
time.sleep(0.02)
res = func(*managers)
return res
@classmethod
def registration_state_changed(cls, lc, cfg, state, message):
logging.info("New registration state {state} for user id [{identity}] at proxy [{addr}]".format(
state=linphone.RegistrationState.string(state), identity=cfg.identity, addr=cfg.server_addr))
manager = lc.user_data
if manager.logger is not None:
manager.logger.info("[TESTER] New registration state {state} for user id [{identity}] at proxy [{addr}]".format(
state=linphone.RegistrationState.string(state), identity=cfg.identity, addr=cfg.server_addr))
if state == linphone.RegistrationState.RegistrationNone:
manager.stats.number_of_LinphoneRegistrationNone += 1
elif state == linphone.RegistrationState.RegistrationProgress:
......@@ -175,18 +196,71 @@ class CoreManager:
@classmethod
def auth_info_requested(cls, lc, realm, username, domain):
logging.info("Auth info requested for user id [{username}] at realm [{realm}]".format(
username=username, realm=realm))
manager = lc.user_data
if manager.logger is not None:
manager.logger.info("[TESTER] Auth info requested for user id [{username}] at realm [{realm}]".format(
username=username, realm=realm))
manager.stats.number_of_auth_info_requested +=1
def __init__(self, rc_file = None, check_for_proxies = True, vtable = {}):
@classmethod
def call_state_changed(cls, lc, call, state, msg):
manager = lc.user_data
to_address = call.call_log.to.as_string()
#from_address = call.call_log.from.as_string()
from_address = ''
direction = "Outgoing"
if call.call_log.dir == linphone.CallDir.CallIncoming:
direction = "Incoming"
if manager.logger is not None:
manager.logger.info("[TESTER] {direction} call from [{from_address}] to [{to_address}], new state is [{state}]".format(
direction=direction, from_address=from_address, to_address=to_address, state=linphone.CallState.string(state)))
if state == linphone.CallState.CallIncomingReceived:
manager.stats.number_of_LinphoneCallIncomingReceived += 1
elif state == linphone.CallState.CallOutgoingInit:
manager.stats.number_of_LinphoneCallOutgoingInit += 1
elif state == linphone.CallState.CallOutgoingProgress:
manager.stats.number_of_LinphoneCallOutgoingProgress += 1
elif state == linphone.CallState.CallOutgoingRinging:
manager.stats.number_of_LinphoneCallOutgoingRinging += 1
elif state == linphone.CallState.CallOutgoingEarlyMedia:
manager.stats.number_of_LinphoneCallOutgoingEarlyMedia += 1
elif state == linphone.CallState.CallConnected:
manager.stats.number_of_LinphoneCallConnected += 1
elif state == linphone.CallState.CallStreamsRunning:
manager.stats.number_of_LinphoneCallStreamsRunning += 1
elif state == linphone.CallState.CallPausing:
manager.stats.number_of_LinphoneCallPausing += 1
elif state == linphone.CallState.CallPaused:
manager.stats.number_of_LinphoneCallPaused += 1
elif state == linphone.CallState.CallResuming:
manager.stats.number_of_LinphoneCallResuming += 1
elif state == linphone.CallState.CallRefered:
manager.stats.number_of_LinphoneCallRefered += 1
elif state == linphone.CallState.CallError:
manager.stats.number_of_LinphoneCallError += 1
elif state == linphone.CallState.CallEnd:
manager.stats.number_of_LinphoneCallEnd += 1
elif state == linphone.CallState.CallPausedByRemote:
manager.stats.number_of_LinphoneCallPausedByRemote += 1
elif state == linphone.CallState.CallUpdatedByRemote:
manager.stats.number_of_LinphoneCallUpdatedByRemote += 1
elif state == linphone.CallState.CallIncomingEarlyMedia:
manager.stats.number_of_LinphoneCallIncomingEarlyMedia += 1
elif state == linphone.CallState.CallUpdating:
manager.stats.number_of_LinphoneCallUpdating += 1
elif state == linphone.CallState.CallReleased:
manager.stats.number_of_LinphoneCallReleased += 1
else:
raise Exception("Unexpected call state")
def __init__(self, rc_file = None, check_for_proxies = True, vtable = {}, logger=None):
self.logger = logger
if not vtable.has_key('registration_state_changed'):
vtable['registration_state_changed'] = CoreManager.registration_state_changed
if not vtable.has_key('auth_info_requested'):
vtable['auth_info_requested'] = CoreManager.auth_info_requested
#if not vtable.has_key('call_state_changed'):
#vtable['call_state_changed'] = CoreManager.call_state_changed
if not vtable.has_key('call_state_changed'):
vtable['call_state_changed'] = CoreManager.call_state_changed
#if not vtable.has_key('text_received'):
#vtable['text_received'] = CoreManager.text_received
#if not vtable.has_key('message_received'):
......@@ -230,7 +304,8 @@ class CoreManager:
else:
proxy_count = 0
if proxy_count:
self.wait_for_until(self.lc, None, lambda manager: manager.stats.number_of_LinphoneRegistrationOk == proxy_count, 5000 * proxy_count)
self.logger.warning(self)
CoreManager.wait_for_until(self, None, lambda manager: manager.stats.number_of_LinphoneRegistrationOk == proxy_count, 5000 * proxy_count)
assert_equals(self.stats.number_of_LinphoneRegistrationOk, proxy_count)
self.enable_audio_codec("PCMU", 8000)
......@@ -257,25 +332,6 @@ class CoreManager:
lc.static_picture = os.path.join(resources_path, 'images', 'nowebcamCIF.jpg')
return lc
def wait_for_until(self, lc_1, lc_2, func, timeout):
lcs = []
if lc_1 is not None:
lcs.append(lc_1)
if lc_2 is not None:
lcs.append(lc_2)
return self.wait_for_list(lcs, func, timeout)
def wait_for_list(self, lcs, func, timeout):
start = datetime.now()
end = start + timedelta(milliseconds = timeout)
res = func(self)
while not res and datetime.now() < end:
for lc in lcs:
lc.iterate()
time.sleep(0.02)
res = func(self)
return res
def enable_audio_codec(self, mime, rate):
codecs = self.lc.audio_codecs
for codec in codecs:
......
from nose.tools import assert_equals
import linphone
import linphonetester
import os
import time
class TestCall:
@classmethod
def setup_class(cls):
base, ext = os.path.splitext(os.path.basename(__file__))
cls.logger = linphonetester.Logger(base + '.log')
def test_early_declined_call(self):
marie = linphonetester.CoreManager('marie_rc', logger=TestCall.logger)
pauline = linphonetester.CoreManager('pauline_rc', logger=TestCall.logger)
marie.lc.max_calls = 0
out_call = pauline.lc.invite('marie')
# Wait until flexisip transfers the busy...
assert_equals(linphonetester.CoreManager.wait_for_until(pauline, marie, lambda pauline, marie: pauline.stats.number_of_LinphoneCallError == 1, 33000), True)
assert_equals(pauline.stats.number_of_LinphoneCallError, 1)
assert_equals(out_call.reason, linphone.Reason.ReasonBusy)
if len(pauline.lc.call_logs) > 0:
out_call_log = pauline.lc.call_logs[0]
assert out_call_log != None
assert_equals(out_call_log.status, linphone.CallStatus.CallAborted)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment