Commit 161469f7 authored by Simon Morlat's avatar Simon Morlat

rework ITC filters to fix race conditions.

parent 3323709c
...@@ -642,6 +642,13 @@ build_matroska=yes ...@@ -642,6 +642,13 @@ build_matroska=yes
if test x"$ebmldir" != xno ; then if test x"$ebmldir" != xno ; then
MS_CHECK_DEP([ebml library],[EBML],[${ebmldir}/include], MS_CHECK_DEP([ebml library],[EBML],[${ebmldir}/include],
[${ebmldir}/lib],[ebml/ebml.h],[ebml2],[EBML_ElementRender]) [${ebmldir}/lib],[ebml/ebml.h],[ebml2],[EBML_ElementRender])
if test "$EBML_found" = "no" ; then
no_ebml=true
dnl check without ebml lib, directly in matroska2 (cmake build doesn't create libebml)
MS_CHECK_DEP([ebml library],[EBML],[${ebmldir}/include],
[${ebmldir}/lib],[ebml/ebml.h],[matroska2],[EBML_ElementRender])
fi
if test "$EBML_found" = "no" ; then if test "$EBML_found" = "no" ; then
build_matroska=no build_matroska=no
if test "$matroska" = "true" ; then if test "$matroska" = "true" ; then
...@@ -653,8 +660,14 @@ else ...@@ -653,8 +660,14 @@ else
fi fi
if test x"$matroskadir" != xno ; then if test x"$matroskadir" != xno ; then
MS_CHECK_DEP([matroska library],[MATROSKA],[${matroskadir}/include],
if test "$no_ebml" = "true" ; then
MS_CHECK_DEP([matroska library],[MATROSKA],[${matroskadir}/include],
[${matroskadir}/lib],[matroska/matroska.h],[matroska2],[MATROSKA_BlockReleaseData])
else
MS_CHECK_DEP([matroska library],[MATROSKA],[${matroskadir}/include],
[${matroskadir}/lib],[matroska/matroska.h],[matroska2],[MATROSKA_BlockReleaseData],[-lebml2]) [${matroskadir}/lib],[matroska/matroska.h],[matroska2],[MATROSKA_BlockReleaseData],[-lebml2])
fi
if test "$MATROSKA_found" = "trueno" ; then if test "$MATROSKA_found" = "trueno" ; then
build_matroska=no build_matroska=no
if test "$matroska" = "true" ; then if test "$matroska" = "true" ; then
......
...@@ -20,33 +20,85 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. ...@@ -20,33 +20,85 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "mediastreamer2/msitc.h" #include "mediastreamer2/msitc.h"
typedef struct SinkState{ typedef struct SharedState{
ms_mutex_t mutex; ms_mutex_t mutex;
int refcnt;
int rate; int rate;
int nchannels; int nchannels;
MSQueue q; MSQueue q;
const MSFmtDescriptor *fmt; const MSFmtDescriptor *fmt;
MSFilter *source; MSFilter *source;
}SinkState; }SharedState;
static void itc_source_init(MSFilter *f){ static SharedState * itc_get_shared_state(MSFilter *f){
f->data=NULL; SharedState *s=(SharedState*)f->data;
return s;
} }
static void itc_source_uninit(MSFilter *f){ static SharedState *shared_state_new(void){
SharedState *s = ms_new0(SharedState,1);
ms_mutex_init(&s->mutex, NULL);
ms_queue_init(&s->q);
return s;
} }
static SinkState * itc_source_get_sink_state(MSFilter *f){ static void shared_state_release(SharedState *s){
MSFilter *sink=(MSFilter *)f->data; ms_mutex_lock(&s->mutex);
if (sink){ s->refcnt--;
SinkState *s=(SinkState*)sink->data; ms_mutex_unlock(&s->mutex);
return s;
if (s->refcnt == 0){
ms_mutex_destroy(&s->mutex);
ms_queue_flush(&s->q);
ms_free(s);
}
}
static void itc_assign(MSFilter *f, SharedState *s, bool_t is_source){
SharedState *current;
if (s){
ms_mutex_lock(&s->mutex);
if (is_source) s->source = f;
s->refcnt++;
ms_mutex_unlock(&s->mutex);
}
current = itc_get_shared_state(f);
f->data = s;
if (current){
ms_mutex_lock(&current->mutex);
if (current->source == f)
current->source = NULL;
ms_mutex_unlock(&current->mutex);
shared_state_release(current);
} }
return NULL; }
static void itc_connect(MSFilter *sink, MSFilter *source){
SharedState *s;
s = (SharedState*) sink->data;
if (!s){
itc_assign(sink, s = shared_state_new(), FALSE);
}
if (source){
ms_filter_lock(source);
itc_assign(source, s, TRUE);
ms_filter_unlock(source);
}
}
static void itc_source_init(MSFilter *f){
f->data=NULL;
}
static void itc_source_uninit(MSFilter *f){
itc_assign(f, NULL, TRUE);
} }
static int itc_source_get_nchannels(MSFilter *f, void *data){ static int itc_source_get_nchannels(MSFilter *f, void *data){
SinkState *ss=itc_source_get_sink_state(f); SharedState *ss=itc_get_shared_state(f);
if (ss){ if (ss){
*(int*)data=ss->nchannels; *(int*)data=ss->nchannels;
return 0; return 0;
...@@ -56,7 +108,7 @@ static int itc_source_get_nchannels(MSFilter *f, void *data){ ...@@ -56,7 +108,7 @@ static int itc_source_get_nchannels(MSFilter *f, void *data){
} }
static int itc_source_get_rate(MSFilter *f, void *data){ static int itc_source_get_rate(MSFilter *f, void *data){
SinkState *ss=itc_source_get_sink_state(f); SharedState *ss=itc_get_shared_state(f);
if (ss){ if (ss){
*(int*)data=ss->rate; *(int*)data=ss->rate;
return 0; return 0;
...@@ -66,7 +118,7 @@ static int itc_source_get_rate(MSFilter *f, void *data){ ...@@ -66,7 +118,7 @@ static int itc_source_get_rate(MSFilter *f, void *data){
} }
static int itc_source_get_out_fmt(MSFilter *f, void *data){ static int itc_source_get_out_fmt(MSFilter *f, void *data){
SinkState *ss=itc_source_get_sink_state(f); SharedState *ss=itc_get_shared_state(f);
if (ss){ if (ss){
((MSPinFormat*)data)->fmt=ss->fmt; ((MSPinFormat*)data)->fmt=ss->fmt;
return 0; return 0;
...@@ -76,9 +128,12 @@ static int itc_source_get_out_fmt(MSFilter *f, void *data){ ...@@ -76,9 +128,12 @@ static int itc_source_get_out_fmt(MSFilter *f, void *data){
} }
static void itc_source_process(MSFilter *f){ static void itc_source_process(MSFilter *f){
SinkState *ss=itc_source_get_sink_state(f); SharedState *ss;
mblk_t *m; mblk_t *m;
ms_filter_lock(f);
ss = itc_get_shared_state(f);
if (ss){ if (ss){
ms_mutex_lock(&ss->mutex); ms_mutex_lock(&ss->mutex);
while((m=ms_queue_get(&ss->q))!=NULL){ while((m=ms_queue_get(&ss->q))!=NULL){
...@@ -88,6 +143,7 @@ static void itc_source_process(MSFilter *f){ ...@@ -88,6 +143,7 @@ static void itc_source_process(MSFilter *f){
} }
ms_mutex_unlock(&ss->mutex); ms_mutex_unlock(&ss->mutex);
} }
ms_filter_unlock(f);
} }
static MSFilterMethod source_methods[]={ static MSFilterMethod source_methods[]={
...@@ -133,85 +189,96 @@ MSFilterDesc ms_itc_source_desc={ ...@@ -133,85 +189,96 @@ MSFilterDesc ms_itc_source_desc={
#endif #endif
static void itc_sink_init(MSFilter *f){ static void itc_sink_init(MSFilter *f){
SinkState *s; itc_assign(f, shared_state_new(), FALSE);
f->data=s=ms_new0(SinkState,1);
ms_mutex_init(&s->mutex,NULL);
ms_queue_init(&s->q);
} }
static void itc_sink_uninit(MSFilter *f){ static void itc_sink_uninit(MSFilter *f){
SinkState *s=(SinkState *)f->data; itc_assign(f, NULL, FALSE);
ms_queue_flush(&s->q);
ms_mutex_destroy(&s->mutex);
ms_free(s);
} }
static void itc_sink_preprocess(MSFilter *f){ static void itc_sink_preprocess(MSFilter *f){
SinkState *s=(SinkState *)f->data; SharedState *ss;
if (s->source && s->fmt==NULL) ms_filter_notify_no_arg(s->source,MS_FILTER_OUTPUT_FMT_CHANGED); ms_filter_lock(f);
} ss = itc_get_shared_state(f);
ms_filter_unlock(f);
static void itc_sink_queue_packet(MSFilter *f, mblk_t *m){
SinkState *s=(SinkState *)f->data; ms_mutex_lock(&ss->mutex);
ms_mutex_lock(&s->mutex); if (ss->source && ss->fmt != NULL) ms_filter_notify_no_arg(ss->source,MS_FILTER_OUTPUT_FMT_CHANGED);
if (s->source==NULL){ ms_mutex_unlock(&ss->mutex);
freemsg(m);
}else{
ms_queue_put(&s->q,m);
}
ms_mutex_unlock(&s->mutex);
} }
static void itc_sink_process(MSFilter *f){ static void itc_sink_process(MSFilter *f){
SharedState *s;
mblk_t *im; mblk_t *im;
ms_filter_lock(f);
s = itc_get_shared_state(f);
ms_filter_unlock(f);
ms_mutex_lock(&s->mutex);
while((im=ms_queue_get(f->inputs[0]))!=NULL){ while((im=ms_queue_get(f->inputs[0]))!=NULL){
itc_sink_queue_packet(f,im); if (s->source==NULL){
freemsg(im);
}else{
ms_queue_put(&s->q, im);
}
} }
ms_mutex_unlock(&s->mutex);
} }
static int itc_sink_connect(MSFilter *f, void *data){ static int itc_sink_connect(MSFilter *f, void *data){
SinkState *s=(SinkState *)f->data; ms_filter_lock(f);
MSFilter *srcfilter=(MSFilter*)data; itc_connect(f, (MSFilter *)data);
if (srcfilter){ ms_filter_unlock(f);
srcfilter->data=f;
}else{
MSFilter *oldsrc=s->source;
if (oldsrc)
oldsrc->data=NULL;
}
s->source=srcfilter;
return 0; return 0;
} }
static int itc_sink_set_nchannels(MSFilter *f , void *data){ static int itc_sink_set_nchannels(MSFilter *f , void *data){
SinkState *s=(SinkState *)f->data; SharedState *s;
ms_filter_lock(f);
s = itc_get_shared_state(f);
s->nchannels=*(int*)data; s->nchannels=*(int*)data;
ms_filter_unlock(f);
return 0; return 0;
} }
static int itc_sink_set_sr(MSFilter *f , void *data){ static int itc_sink_set_sr(MSFilter *f , void *data){
SinkState *s=(SinkState *)f->data; SharedState *s;
ms_filter_lock(f);
s = itc_get_shared_state(f);
s->rate=*(int*)data; s->rate=*(int*)data;
ms_filter_unlock(f);
return 0; return 0;
} }
static int itc_sink_get_nchannels(MSFilter *f , void *data){ static int itc_sink_get_nchannels(MSFilter *f , void *data){
SinkState *s=(SinkState *)f->data; SharedState *s;
ms_filter_lock(f);
s = itc_get_shared_state(f);
*(int*)data=s->nchannels; *(int*)data=s->nchannels;
ms_filter_unlock(f);
return 0; return 0;
} }
static int itc_sink_get_sr(MSFilter *f , void *data){ static int itc_sink_get_sr(MSFilter *f , void *data){
SinkState *s=(SinkState *)f->data; SharedState *s;
ms_filter_lock(f);
s = itc_get_shared_state(f);
*(int*)data=s->rate; *(int*)data=s->rate;
ms_filter_unlock(f);
return 0; return 0;
} }
static int itc_sink_set_fmt(MSFilter *f, void *data){ static int itc_sink_set_fmt(MSFilter *f, void *data){
SinkState *s=(SinkState *)f->data; SharedState *s;
ms_filter_lock(f);
s = itc_get_shared_state(f);
s->fmt=((MSPinFormat*)data)->fmt; s->fmt=((MSPinFormat*)data)->fmt;
ms_filter_unlock(f);
ms_mutex_lock(&s->mutex);
if (s->source && s->fmt) if (s->source && s->fmt)
ms_filter_notify_no_arg(s->source,MS_FILTER_OUTPUT_FMT_CHANGED); ms_filter_notify_no_arg(s->source,MS_FILTER_OUTPUT_FMT_CHANGED);
ms_mutex_unlock(&s->mutex);
return 0; return 0;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment