Commit 27ffa231 authored by Ghislain MARY's avatar Ghislain MARY

Add audio flow control filter.

parent aead67df
......@@ -163,7 +163,8 @@ typedef enum MSFilterId{
MS_G729_DEC_ID,
MS_G729_ENC_ID,
MS_WASAPI_READ_ID,
MS_WASAPI_WRITE_ID
MS_WASAPI_WRITE_ID,
MS_AUDIO_FLOW_CONTROL_ID
} MSFilterId;
#endif
......@@ -42,6 +42,20 @@ MS2_PUBLIC void ms_audio_flow_controller_set_target(MSAudioFlowController *ctl,
MS2_PUBLIC mblk_t *ms_audio_flow_controller_process(MSAudioFlowController *ctl, mblk_t *m);
/**
* Structure carried by MS_AUDIO_FLOW_CONTROL_DROP_EVENT
**/
typedef struct _MSAudioFlowControlDropEvent{
uint32_t flow_control_interval_ms;
uint32_t drop_ms;
} MSAudioFlowControlDropEvent;
/**
* Event sent by the filter each time some samples need to be dropped.
**/
#define MS_AUDIO_FLOW_CONTROL_DROP_EVENT MS_FILTER_EVENT(MS_AUDIO_FLOW_CONTROL_ID, 0, MSAudioFlowControlDropEvent)
#ifdef __cplusplus
}
#endif
......
......@@ -384,6 +384,7 @@ struct _AudioStream
int videopin;
bool_t plumbed;
}av_player;
MSFilter *flowcontrol;
RtpSession *rtp_io_session; /**< The RTP session used for RTP input/output. */
MSFilter *vaddtx;
char *recorder_file;
......@@ -512,6 +513,7 @@ MS2_PUBLIC AudioStream *audio_stream_new_with_sessions(MSFactory* factory, const
#define AUDIO_STREAM_FEATURE_MIXED_RECORDING (1 << 7)
#define AUDIO_STREAM_FEATURE_LOCAL_PLAYING (1 << 8)
#define AUDIO_STREAM_FEATURE_REMOTE_PLAYING (1 << 9)
#define AUDIO_STREAM_FEATURE_FLOW_CONTROL (1 << 10)
#define AUDIO_STREAM_FEATURE_ALL (\
AUDIO_STREAM_FEATURE_PLC | \
......@@ -523,7 +525,8 @@ MS2_PUBLIC AudioStream *audio_stream_new_with_sessions(MSFactory* factory, const
AUDIO_STREAM_FEATURE_DTMF_ECHO |\
AUDIO_STREAM_FEATURE_MIXED_RECORDING |\
AUDIO_STREAM_FEATURE_LOCAL_PLAYING | \
AUDIO_STREAM_FEATURE_REMOTE_PLAYING \
AUDIO_STREAM_FEATURE_REMOTE_PLAYING | \
AUDIO_STREAM_FEATURE_FLOW_CONTROL \
)
......
......@@ -452,19 +452,6 @@ MS2_PUBLIC bool_t ms_filter_implements_interface(MSFilter *f, MSFilterInterfaceI
**/
MS2_PUBLIC bool_t ms_filter_desc_implements_interface(MSFilterDesc *desc, MSFilterInterfaceId id);
/**
* Set a callback on filter's to be informed of private filter's event.
* This callback is called from the filter's MSTicker, unless a global event queue
* is created to receive all filter's notification asynchronously.
* See ms_event_queue_new() for details.
*
* @param f A MSFilter object.
* @param fn A MSFilterNotifyFunc that will be called.
* @param userdata A pointer to private data.
* @deprecated use ms_filter_add_notify_callback()
*
*/
/**
* Set a callback on filter's to be informed of private filter's event.
......
......@@ -156,6 +156,49 @@ MS2_PUBLIC void ms_bufferizer_uninit(MSBufferizer *obj);
MS2_PUBLIC void ms_bufferizer_destroy(MSBufferizer *obj);
struct _MSFlowControlledBufferizer {
MSBufferizer base;
struct _MSFilter *filter;
uint64_t flow_control_time;
uint32_t flow_control_interval_ms;
uint32_t max_size_ms;
int samplerate;
int nchannels;
};
typedef struct _MSFlowControlledBufferizer MSFlowControlledBufferizer;
MS2_PUBLIC MSFlowControlledBufferizer * ms_flow_controlled_bufferizer_new(struct _MSFilter *f, int samplerate, int nchannels);
MS2_PUBLIC void ms_flow_controlled_bufferizer_init(MSFlowControlledBufferizer *obj, struct _MSFilter *f, int samplerate, int nchannels);
MS2_PUBLIC void ms_flow_controlled_bufferizer_set_max_size_ms(MSFlowControlledBufferizer *obj, uint32_t ms);
MS2_PUBLIC void ms_flow_controlled_bufferizer_set_flow_control_interval_ms(MSFlowControlledBufferizer *obj, uint32_t ms);
MS2_PUBLIC void ms_flow_controlled_bufferizer_set_samplerate(MSFlowControlledBufferizer *obj, int samplerate);
MS2_PUBLIC void ms_flow_controlled_bufferizer_set_nchannels(MSFlowControlledBufferizer *obj, int nchannels);
MS2_PUBLIC void ms_flow_controlled_bufferizer_put(MSFlowControlledBufferizer *obj, mblk_t *m);
MS2_PUBLIC void ms_flow_controlled_bufferizer_put_from_queue(MSFlowControlledBufferizer *obj, MSQueue *q);
#define ms_flow_controlled_bufferizer_read(obj, data, datalen) ms_bufferizer_read((MSBufferizer *)(obj), data, datalen)
#define ms_flow_controlled_bufferizer_fill_current_metas(obj, m) ms_bufferizer_fill_current_metas((MSBufferizer *)(obj), m)
#define ms_flow_controlled_bufferizer_get_avail(obj) ms_bufferizer_get_avail((MSBufferizer *)(obj))
#define ms_flow_controlled_bufferizer_skip_bytes(obj, bytes) ms_bufferizer_skip_bytes((MSBufferizer *)(obj), bytes)
#define ms_flow_controlled_bufferizer_flush(obj) ms_bufferizer_flush((MSBufferizer *)(obj))
#define ms_flow_controlled_bufferizer_uninit(obj) ms_bufferizer_uninit((MSBufferizer *)(obj))
#define ms_flow_controlled_bufferizer_destroy(obj) ms_bufferizer_destroy((MSBufferizer *)(obj))
#ifdef __cplusplus
}
#endif
......
......@@ -19,8 +19,11 @@
*/
#include "mediastreamer2/mscommon.h"
#include "mediastreamer2/msfilter.h"
#include "mediastreamer2/msticker.h"
#include "mediastreamer2/flowcontrol.h"
void ms_audio_flow_controller_init(MSAudioFlowController *ctl)
{
ctl->target_samples = 0;
......@@ -73,8 +76,12 @@ static void discard_well_choosed_samples(mblk_t *m, int nsamples, int todrop)
}
}
static bool_t ms_audio_flow_controller_running(MSAudioFlowController *ctl) {
return (ctl->total_samples > 0) && (ctl->target_samples > 0);
}
mblk_t *ms_audio_flow_controller_process(MSAudioFlowController *ctl, mblk_t *m){
if (ctl->total_samples > 0 && ctl->target_samples > 0) {
if (ms_audio_flow_controller_running(ctl)) {
int nsamples = (int)((m->b_wptr - m->b_rptr) / 2);
int th_dropped;
int todrop;
......@@ -98,3 +105,142 @@ mblk_t *ms_audio_flow_controller_process(MSAudioFlowController *ctl, mblk_t *m){
}
return m;
}
typedef struct MSAudioFlowControlState {
MSAudioFlowController afc;
int samplerate;
int nchannels;
} MSAudioFlowControlState;
static void ms_audio_flow_control_init(MSFilter *f) {
MSAudioFlowControlState *s = ms_new0(MSAudioFlowControlState, 1);
f->data = s;
}
static void ms_audio_flow_control_preprocess(MSFilter *f) {
MSAudioFlowControlState *s = (MSAudioFlowControlState *)f->data;
ms_audio_flow_controller_init(&s->afc);
}
static void ms_audio_flow_control_process(MSFilter *f) {
MSAudioFlowControlState *s = (MSAudioFlowControlState *)f->data;
mblk_t *m;
while((m = ms_queue_get(f->inputs[0])) != NULL) {
m = ms_audio_flow_controller_process(&s->afc, m);
if (m) {
ms_queue_put(f->outputs[0], m);
}
}
}
static void ms_audio_flow_control_postprocess(MSFilter *f) {
}
static void ms_audio_flow_control_uninit(MSFilter *f) {
MSAudioFlowControlState *s = (MSAudioFlowControlState *)f->data;
ms_free(s);
}
void ms_audio_flow_control_event_handler(void *user_data, MSFilter *source, unsigned int event, void *eventdata) {
if (event == MS_AUDIO_FLOW_CONTROL_DROP_EVENT) {
MSFilter *f = (MSFilter *)user_data;
MSAudioFlowControlState *s = (MSAudioFlowControlState *)f->data;
MSAudioFlowControlDropEvent *ev = (MSAudioFlowControlDropEvent *)eventdata;
if (!ms_audio_flow_controller_running(&s->afc)) {
ms_warning("Too much buffered audio signal, throwing out %u ms", ev->drop_ms);
ms_audio_flow_controller_set_target(&s->afc,
(ev->drop_ms * s->samplerate * s->nchannels) / 1000,
(ev->flow_control_interval_ms * s->samplerate * s->nchannels) / 1000);
}
}
}
static int ms_audio_flow_control_set_sample_rate(MSFilter *f, void *arg) {
MSAudioFlowControlState *s = (MSAudioFlowControlState *)f->data;
s->samplerate = *((int *)arg);
return 0;
}
static int ms_audio_flow_control_get_sample_rate(MSFilter *f, void *arg) {
MSAudioFlowControlState *s = (MSAudioFlowControlState *)f->data;
*((int *)arg) = s->samplerate;
return 0;
}
static int ms_audio_flow_control_set_nchannels(MSFilter *f, void *arg) {
MSAudioFlowControlState *s = (MSAudioFlowControlState *)f->data;
s->nchannels = *((int *)arg);
return 0;
}
static int ms_audio_flow_control_get_nchannels(MSFilter *f, void *arg) {
MSAudioFlowControlState *s = (MSAudioFlowControlState *)f->data;
*((int *)arg) = s->nchannels;
return 0;
}
static MSFilterMethod ms_audio_flow_control_methods[] = {
{ MS_FILTER_SET_SAMPLE_RATE, ms_audio_flow_control_set_sample_rate },
{ MS_FILTER_GET_SAMPLE_RATE, ms_audio_flow_control_get_sample_rate },
{ MS_FILTER_SET_NCHANNELS, ms_audio_flow_control_set_nchannels },
{ MS_FILTER_GET_NCHANNELS, ms_audio_flow_control_get_nchannels }
};
#define MS_AUDIO_FLOW_CONTROL_NAME "MSAudioFlowControl"
#define MS_AUDIO_FLOW_CONTROL_DESCRIPTION "Flow control filter to drop sample in the audio graph if too many samples are queued."
#define MS_AUDIO_FLOW_CONTROL_CATEGORY MS_FILTER_OTHER
#define MS_AUDIO_FLOW_CONTROL_ENC_FMT NULL
#define MS_AUDIO_FLOW_CONTROL_NINPUTS 1
#define MS_AUDIO_FLOW_CONTROL_NOUTPUTS 1
#define MS_AUDIO_FLOW_CONTROL_FLAGS 0
#ifdef _MSC_VER
MSFilterDesc ms_audio_flow_control_desc = {
MS_AUDIO_FLOW_CONTROL_ID,
MS_AUDIO_FLOW_CONTROL_NAME,
MS_AUDIO_FLOW_CONTROL_DESCRIPTION,
MS_AUDIO_FLOW_CONTROL_CATEGORY,
MS_AUDIO_FLOW_CONTROL_ENC_FMT,
MS_AUDIO_FLOW_CONTROL_NINPUTS,
MS_AUDIO_FLOW_CONTROL_NOUTPUTS,
ms_audio_flow_control_init,
ms_audio_flow_control_preprocess,
ms_audio_flow_control_process,
ms_audio_flow_control_postprocess,
ms_audio_flow_control_uninit,
ms_audio_flow_control_methods,
MS_AUDIO_FLOW_CONTROL_FLAGS
};
#else
MSFilterDesc ms_audio_flow_control_desc = {
.id = MS_AUDIO_FLOW_CONTROL_ID,
.name = MS_AUDIO_FLOW_CONTROL_NAME,
.text = MS_AUDIO_FLOW_CONTROL_DESCRIPTION,
.category = MS_AUDIO_FLOW_CONTROL_CATEGORY,
.enc_fmt = MS_AUDIO_FLOW_CONTROL_ENC_FMT,
.ninputs = MS_AUDIO_FLOW_CONTROL_NINPUTS,
.noutputs = MS_AUDIO_FLOW_CONTROL_NOUTPUTS,
.init = ms_audio_flow_control_init,
.preprocess = ms_audio_flow_control_preprocess,
.process = ms_audio_flow_control_process,
.postprocess = ms_audio_flow_control_postprocess,
.uninit = ms_audio_flow_control_uninit,
.methods = ms_audio_flow_control_methods,
.flags = MS_AUDIO_FLOW_CONTROL_FLAGS
};
#endif
MS_FILTER_DESC_EXPORT(ms_audio_flow_control_desc)
......@@ -22,7 +22,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#endif
#include "mediastreamer2/msqueue.h"
#include "mediastreamer2/msticker.h"
#include "mediastreamer2/msvideo.h"
#include "mediastreamer2/flowcontrol.h"
#include <string.h>
......@@ -134,3 +136,67 @@ void ms_bufferizer_destroy(MSBufferizer *obj){
ms_bufferizer_uninit(obj);
ms_free(obj);
}
static const uint32_t flow_control_interval_ms = 5000;
static const uint32_t max_size_ms = 1000;
MSFlowControlledBufferizer * ms_flow_controlled_bufferizer_new(MSFilter *f, int samplerate, int nchannels) {
MSFlowControlledBufferizer *obj = (MSFlowControlledBufferizer *)ms_new0(MSFlowControlledBufferizer, 1);
ms_flow_controlled_bufferizer_init(obj, f, samplerate, nchannels);
return obj;
}
void ms_flow_controlled_bufferizer_init(MSFlowControlledBufferizer *obj, MSFilter *f, int samplerate, int nchannels) {
ms_bufferizer_init(&obj->base);
obj->filter = f;
obj->flow_control_interval_ms = flow_control_interval_ms;
obj->max_size_ms = max_size_ms;
obj->flow_control_time = 0;
obj->samplerate = samplerate;
obj->nchannels = nchannels;
}
void ms_flow_controlled_bufferizer_set_max_size_ms(MSFlowControlledBufferizer *obj, uint32_t ms) {
obj->max_size_ms = ms;
}
void ms_flow_controlled_bufferizer_set_flow_control_interval_ms(MSFlowControlledBufferizer *obj, uint32_t ms) {
obj->flow_control_interval_ms = ms;
}
void ms_flow_controlled_bufferizer_set_samplerate(MSFlowControlledBufferizer *obj, int samplerate) {
obj->samplerate = samplerate;
}
void ms_flow_controlled_bufferizer_set_nchannels(MSFlowControlledBufferizer *obj, int nchannels) {
obj->nchannels = nchannels;
}
static void control_flow(MSFlowControlledBufferizer *obj) {
uint32_t accumulated_ms = ((obj->base.size * 1000) / obj->samplerate) / obj->nchannels;
if (obj->flow_control_time == 0) {
obj->flow_control_time = obj->filter->ticker->time;
}
if ((accumulated_ms > obj->max_size_ms)
&& (((uint32_t)(obj->filter->ticker->time - obj->flow_control_time)) >= obj->flow_control_interval_ms)) {
MSAudioFlowControlDropEvent ev;
ev.flow_control_interval_ms = obj->flow_control_interval_ms;
ev.drop_ms = (accumulated_ms - obj->max_size_ms) + (obj->max_size_ms / 4);
ms_warning("Flow controlled bufferizer of max %u ms buffered %u ms, asking to drop %u ms", obj->max_size_ms, accumulated_ms, ev.drop_ms);
ms_filter_notify(obj->filter, MS_AUDIO_FLOW_CONTROL_DROP_EVENT, &ev);
obj->flow_control_time = obj->filter->ticker->time;
}
}
void ms_flow_controlled_bufferizer_put(MSFlowControlledBufferizer *obj, mblk_t *m) {
ms_bufferizer_put(&obj->base, m);
control_flow(obj);
}
void ms_flow_controlled_bufferizer_put_from_queue(MSFlowControlledBufferizer *obj, MSQueue *q) {
ms_bufferizer_put_from_queue(&obj->base, q);
control_flow(obj);
}
......@@ -64,6 +64,7 @@ static void audio_stream_free(AudioStream *stream) {
if (stream->soundread!=NULL) ms_filter_destroy(stream->soundread);
if (stream->soundwrite!=NULL) ms_filter_destroy(stream->soundwrite);
if (stream->dtmfgen!=NULL) ms_filter_destroy(stream->dtmfgen);
if (stream->flowcontrol != NULL) ms_filter_destroy(stream->flowcontrol);
if (stream->plc!=NULL) ms_filter_destroy(stream->plc);
if (stream->ec!=NULL) ms_filter_destroy(stream->ec);
if (stream->volrecv!=NULL) ms_filter_destroy(stream->volrecv);
......@@ -1169,6 +1170,18 @@ int audio_stream_start_from_io(AudioStream *stream, RtpProfile *profile, const c
stream->plc = NULL;
}
if ((stream->features & AUDIO_STREAM_FEATURE_FLOW_CONTROL) != 0) {
stream->flowcontrol = ms_factory_create_filter(stream->ms.factory, MS_AUDIO_FLOW_CONTROL_ID);
if (stream->flowcontrol) {
ms_filter_call_method(stream->flowcontrol, MS_FILTER_SET_NCHANNELS, &nchannels);
ms_filter_call_method(stream->flowcontrol, MS_FILTER_SET_SAMPLE_RATE, &sample_rate);
if (stream->ec) ms_filter_add_notify_callback(stream->ec, ms_audio_flow_control_event_handler, stream->flowcontrol, FALSE);
if (stream->soundwrite) ms_filter_add_notify_callback(stream->soundwrite, ms_audio_flow_control_event_handler, stream->flowcontrol, FALSE);
}
} else {
stream->flowcontrol = NULL;
}
if (stream->features & AUDIO_STREAM_FEATURE_LOCAL_PLAYING){
stream->local_mixer=ms_factory_create_filter(stream->ms.factory, MS_AUDIO_MIXER_ID);
......@@ -1215,6 +1228,8 @@ int audio_stream_start_from_io(AudioStream *stream, RtpProfile *profile, const c
ms_connection_helper_link(&h,stream->ms.decoder,0,0);
if (stream->plc)
ms_connection_helper_link(&h,stream->plc,0,0);
if (stream->flowcontrol)
ms_connection_helper_link(&h, stream->flowcontrol, 0, 0);
if (stream->dtmfgen)
ms_connection_helper_link(&h,stream->dtmfgen,0,0);
if (stream->volrecv)
......@@ -1730,6 +1745,8 @@ void audio_stream_stop(AudioStream * stream){
ms_connection_helper_unlink(&h,stream->ms.decoder,0,0);
if (stream->plc!=NULL)
ms_connection_helper_unlink(&h,stream->plc,0,0);
if (stream->flowcontrol != NULL)
ms_connection_helper_unlink(&h, stream->flowcontrol, 0, 0);
if (stream->dtmfgen!=NULL)
ms_connection_helper_unlink(&h,stream->dtmfgen,0,0);
if (stream->volrecv!=NULL)
......
......@@ -104,6 +104,7 @@ void register_video_preset_high_fps(MSVideoPresetsManager *manager);
MSFilter *_ms_create_av_player(const char *filename, MSFactory* factory);
void video_recorder_handle_event(void *userdata, MSFilter *recorder, unsigned int event, void *event_arg);
void ms_audio_flow_control_event_handler(void *user_data, MSFilter *f, unsigned int event, void *eventdata);
#ifdef __cplusplus
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment