Commit 294440b8 authored by Simon Morlat's avatar Simon Morlat

add tasks to MSTicker, so that MSFilter can schedule arbitrary tasks for next tick execution.

MSRtpSend can make use of it so that sending of RTP packet is perfectly aligned with the tick time (but with a fixed delay of at most one tick duration)
parent fecaf6d5
......@@ -152,6 +152,7 @@ struct _MSFilter{
/*private attributes */
uint32_t last_tick;
MSFilterStats *stats;
int postponed_task; /*number of postponed tasks*/
bool_t seen;
};
......@@ -627,6 +628,19 @@ void ms_filter_notify_no_arg(MSFilter *f, unsigned int id);
#define ms_filter_unlock(f) ms_mutex_unlock(&(f)->lock)
void ms_filter_unregister_all(void);
struct _MSFilterTask{
MSFilter *f;
MSFilterFunc taskfunc;
};
typedef struct _MSFilterTask MSFilterTask;
void ms_filter_task_process(MSFilterTask *task);
/**
* Allow a filter to request the ticker to call him the tick after.
* The ticker will call the taskfunc prior to all filter's process func.
**/
void ms_filter_postpone_task(MSFilter *f, MSFilterFunc taskfunc);
#ifdef __cplusplus
}
#endif
......
......@@ -68,6 +68,7 @@ struct _MSTicker
ms_mutex_t lock;
ms_cond_t cond;
MSList *execution_list; /* the list of source filters to be executed.*/
MSList *task_list; /* list of tasks (see ms_filter_postpone_task())*/
ms_thread_t thread; /* the thread ressource*/
int interval; /* in miliseconds*/
int exec_id;
......
......@@ -18,6 +18,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include "mediastreamer2/msfilter.h"
#include "mediastreamer2/msticker.h"
static MSList *desc_list=NULL;
static bool_t statistics_enabled=FALSE;
......@@ -258,6 +259,23 @@ void ms_filter_process(MSFilter *f){
}
void ms_filter_task_process(MSFilterTask *task){
MSTimeSpec start,stop;
MSFilter *f=task->f;
/*ms_message("Executing task of filter %s:%p",f->desc->name,f);*/
if (f->stats)
ms_get_cur_time(&start);
task->taskfunc(f);
if (f->stats){
ms_get_cur_time(&stop);
f->stats->count++;
f->stats->elapsed+=(stop.tv_sec-start.tv_sec)*1000000000LL + (stop.tv_nsec-start.tv_nsec);
}
f->postponed_task--;
}
void ms_filter_preprocess(MSFilter *f, struct _MSTicker *t){
f->last_tick=0;
f->ticker=t;
......@@ -280,7 +298,19 @@ bool_t ms_filter_inputs_have_data(MSFilter *f){
return FALSE;
}
void ms_filter_postpone_task(MSFilter *f, MSFilterFunc taskfunc){
MSFilterTask *task;
MSTicker *ticker=f->ticker;
if (ticker==NULL){
ms_error("ms_filter_postpone_task(): this method cannot be called outside of filter's process method.");
return;
}
task=ms_new(MSFilterTask,1);
task->f=f;
task->taskfunc=taskfunc;
ticker->task_list=ms_list_prepend(ticker->task_list,task);
f->postponed_task++;
}
static void find_filters(MSList **filters, MSFilter *f ){
int i,found;
......
......@@ -45,6 +45,7 @@ static const double smooth_coef=0.9;
static void * ms_ticker_run(void *s);
static uint64_t get_cur_time_ms(void *);
static int wait_next_tick(void *, uint64_t virt_ticker_time);
static void remove_tasks_for_filter(MSTicker *ticker, MSFilter *f);
static void ms_ticker_start(MSTicker *s){
s->run=TRUE;
......@@ -164,7 +165,10 @@ int ms_ticker_attach_multiple(MSTicker *ticker,MSFilter *f,...)
return 0;
}
static void call_postprocess(MSFilter *f){
if (f->postponed_task) remove_tasks_for_filter(f->ticker,f);
ms_filter_postprocess(f);
}
int ms_ticker_detach(MSTicker *ticker,MSFilter *f){
MSList *sources=NULL;
......@@ -191,7 +195,7 @@ int ms_ticker_detach(MSTicker *ticker,MSFilter *f){
ticker->execution_list=ms_list_remove(ticker->execution_list,it->data);
}
ms_mutex_unlock(&ticker->lock);
ms_list_for_each(filters,(void (*)(void*))ms_filter_postprocess);
ms_list_for_each(filters,(void (*)(void*))call_postprocess);
ms_list_free(filters);
ms_list_free(sources);
return 0;
......@@ -221,6 +225,7 @@ static void call_process(MSFilter *f){
ms_warning("Re-scheduling filter %s: all data should be consumed in one process call, so fix it.",f->desc->name);
}
ms_filter_process(f);
if (f->postponed_task) break;
process_done=TRUE;
}
}
......@@ -264,6 +269,31 @@ static void run_graphs(MSTicker *s, MSList *execution_list, bool_t force_schedul
}
}
static void run_tasks(MSTicker *ticker){
MSList *elem,*prevelem=NULL;
for (elem=ticker->task_list;elem!=NULL;){
MSFilterTask *t=(MSFilterTask*)elem->data;
ms_filter_task_process(t);
ms_free(t);
prevelem=elem;
elem=elem->next;
ms_free(prevelem);
}
ticker->task_list=NULL;
}
static void remove_tasks_for_filter(MSTicker *ticker, MSFilter *f){
MSList *elem,*nextelem;
for (elem=ticker->task_list;elem!=NULL;elem=nextelem){
MSFilterTask *t=(MSFilterTask*)elem->data;
nextelem=elem->next;
if (t->f==f){
ticker->task_list=ms_list_remove_link(ticker->task_list,elem);
ms_free(t);
}
}
}
static uint64_t get_cur_time_ms(void *unused){
MSTimeSpec ts;
ms_get_cur_time(&ts);
......@@ -405,6 +435,7 @@ void * ms_ticker_run(void *arg)
ms_get_cur_time(&begin);
#endif
run_tasks(s);
run_graphs(s,s->execution_list,FALSE);
#if TICKER_MEASUREMENTS
ms_get_cur_time(&end);
......
......@@ -49,6 +49,7 @@ struct SenderData {
bool_t dtmf_start;
bool_t skip;
bool_t mute_mic;
bool_t use_task;
};
typedef struct SenderData SenderData;
......@@ -75,7 +76,8 @@ static void send_stun_packet(RtpSession *s)
static void sender_init(MSFilter * f)
{
SenderData *d = (SenderData *)ms_new0(SenderData, 1);
const char *tmp=getenv("MS2_RTP_FIXED_DELAY");
d->session = NULL;
d->tsoff = 0;
d->skip_until = 0;
......@@ -91,6 +93,8 @@ static void sender_init(MSFilter * f)
d->last_sent_time=-1;
d->last_stun_sent_time = -1;
d->last_ts=0;
d->use_task= tmp ? (!!atoi(tmp)) : FALSE;
if (d->use_task) ms_message("MSRtpSend will use tasks to send out packet at the beginning of ticks.");
f->data = d;
}
......@@ -336,7 +340,7 @@ static int send_dtmf(MSFilter * f, uint32_t timestamp_start)
return 0;
}
static void sender_process(MSFilter * f)
static void _sender_process(MSFilter * f)
{
SenderData *d = (SenderData *) f->data;
RtpSession *s = d->session;
......@@ -344,10 +348,6 @@ static void sender_process(MSFilter * f)
mblk_t *im;
uint32_t timestamp;
if (s == NULL){
ms_queue_flush(f->inputs[0]);
return;
}
if (d->relay_session_id_size>0 &&
( (f->ticker->time-d->last_rsi_time)>5000 || d->last_rsi_time==0) ) {
......@@ -400,6 +400,19 @@ static void sender_process(MSFilter * f)
ms_filter_unlock(f);
}
static void sender_process(MSFilter * f){
SenderData *d = (SenderData *) f->data;
RtpSession *s = d->session;
if (s == NULL){
ms_queue_flush(f->inputs[0]);
return;
}
if (d->use_task)
ms_filter_postpone_task(f,_sender_process);
else _sender_process(f);
}
static MSFilterMethod sender_methods[] = {
{MS_RTP_SEND_MUTE_MIC, sender_mute_mic},
{MS_RTP_SEND_UNMUTE_MIC, sender_unmute_mic},
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment