Commit 6648fe96 authored by François Grisez's avatar François Grisez Committed by Ghislain MARY

Flow control for the Pulseaudio filter

parent d6c10705
......@@ -298,8 +298,10 @@ typedef struct _Stream{
char *dev;
double init_volume;
uint64_t last_stats;
uint64_t last_flowcontrol_op;
int underflow_notifs;
int overflow_notifs;
int min_buffer_size;
}Stream;
static void stream_disconnect(Stream *s);
......@@ -351,9 +353,14 @@ static size_t stream_play(Stream *s, size_t nbytes) {
if (ms_bufferizer_get_avail(&s->bufferizer) >= nbytes){
uint8_t *data;
int buffer_size;
data = ms_new(uint8_t, nbytes);
ms_mutex_lock(&s->mutex);
ms_bufferizer_read(&s->bufferizer, data, nbytes);
buffer_size = ms_bufferizer_get_avail(&s->bufferizer);
if(s->min_buffer_size == -1 || buffer_size < s->min_buffer_size) {
s->min_buffer_size = buffer_size;
}
ms_mutex_unlock(&s->mutex);
pa_stream_write(s->stream, data, nbytes, ms_free, 0, PA_SEEK_RELATIVE);
}
......@@ -390,6 +397,8 @@ static bool_t stream_connect(Stream *s) {
pa_buffer_attr attr;
pa_cvolume volume, *volume_ptr = NULL;
s->min_buffer_size = -1;
attr.maxlength = -1;
attr.fragsize = pa_usec_to_bytes(targeted_latency * 1000, &s->sampleSpec);
attr.tlength = attr.fragsize;
......@@ -672,6 +681,9 @@ static MSFilterDesc pulse_read_desc={
};
static const int flow_control_op_interval = 1000;
static const int flow_control_threshold = 20; // ms
typedef Stream PlaybackStream;
static void pulse_write_init(MSFilter *f){
......@@ -688,6 +700,7 @@ static void pulse_write_preprocess(MSFilter *f) {
ms_error("Pulseaudio: fail to connect playback stream");
}
s->last_stats = (uint64_t)-1;
s->last_flowcontrol_op = (uint64_t)-1;
}
static void pulse_write_process(MSFilter *f){
......@@ -703,9 +716,10 @@ static void pulse_write_process(MSFilter *f){
nwritable = pa_stream_writable_size(s->stream);
stream_play(s, nwritable);
pa_threaded_mainloop_unlock(pa_loop);
if (s->last_stats == (uint64_t)-1){
if (s->last_stats == (uint64_t)-1) {
s->last_stats = f->ticker->time;
}else if (f->ticker->time - s->last_stats >= 5000) {
} else if (f->ticker->time - s->last_stats >= 5000) {
pa_usec_t latency;
int is_negative;
int err;
......@@ -722,6 +736,21 @@ static void pulse_write_process(MSFilter *f){
s->overflow_notifs = 0;
}
}
if (s->last_flowcontrol_op == (uint64_t)-1) {
s->last_flowcontrol_op = f->ticker->time;
} else if(f->ticker->time - s->last_flowcontrol_op >= flow_control_op_interval) {
size_t threshold_bytes = pa_usec_to_bytes(flow_control_threshold * 1000, &s->sampleSpec);
ms_mutex_lock(&s->mutex);
if(s->min_buffer_size >= threshold_bytes) {
size_t nbytes_to_drop = s->min_buffer_size - threshold_bytes/4;
ms_warning("pulseaudio: too much data waiting in the writing buffer. Droping %z bytes", nbytes_to_drop);
ms_bufferizer_skip_bytes(&s->bufferizer, nbytes_to_drop);
s->min_buffer_size = -1;
}
ms_mutex_unlock(&s->mutex);
s->last_flowcontrol_op = f->ticker->time;
}
} else {
ms_queue_flush(f->inputs[0]);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment