Commit b88153ef authored by François Grisez's avatar François Grisez
Browse files

Refacoring of pulseaudio.c

+ Stream attributes set to NULL for playback_streams (fixed latency issues).
+ preprocess routine wait that stream is connected before return (fixe "bad state" errors).
+ Active waitings have been replaced by passive waitings.
parent c0d2b8f2
......@@ -22,8 +22,6 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#include <pulse/pulseaudio.h>
static const int latency_req=40; // ms
static void init_pulse_context();
static void pulse_card_detect(MSSndCardManager *m);
static MSFilter *pulse_card_create_reader(MSSndCard *card);
......@@ -53,15 +51,16 @@ static void pulse_card_detect(MSSndCardManager *m){
MSSndCard *card=ms_snd_card_new(&pulse_card_desc);
if (card!=NULL){
card->name=ms_strdup("default");
card->capabilities=MS_SND_CARD_CAP_CAPTURE|MS_SND_CARD_CAP_PLAYBACK;
card->capabilities = MS_SND_CARD_CAP_CAPTURE | MS_SND_CARD_CAP_PLAYBACK;
ms_snd_card_manager_add_card(m,card);
init_pulse_context();
}
}
static pa_context *context=NULL;
static pa_context_state_t contextState = PA_CONTEXT_UNCONNECTED;
static pa_threaded_mainloop *pa_loop=NULL;
static bool_t pa_ready=FALSE;
static const int fragtime = 20;/*ms*/
static void uninit_pulse_context(){
pa_context_disconnect(context);
......@@ -69,10 +68,10 @@ static void uninit_pulse_context(){
pa_threaded_mainloop_stop(pa_loop);
}
static void state_notify(pa_context *ctx, void *userdata){
pa_context_state_t state=pa_context_get_state(ctx);
static void context_state_notify_cb(pa_context *ctx, void *userdata){
contextState=pa_context_get_state(ctx);
const char *sname="";
switch (state){
switch (contextState){
case PA_CONTEXT_UNCONNECTED:
sname="PA_CONTEXT_UNCONNECTED";
break;
......@@ -87,7 +86,6 @@ static void state_notify(pa_context *ctx, void *userdata){
break;
case PA_CONTEXT_READY:
sname="PA_CONTEXT_READY";
pa_ready=TRUE;
break;
case PA_CONTEXT_FAILED:
sname="PA_CONTEXT_FAILED";
......@@ -97,129 +95,206 @@ static void state_notify(pa_context *ctx, void *userdata){
break;
}
ms_message("New PulseAudio context state: %s",sname);
pa_threaded_mainloop_signal(pa_loop, FALSE);
}
static void init_pulse_context(){
if (context==NULL){
pa_loop=pa_threaded_mainloop_new();
context=pa_context_new(pa_threaded_mainloop_get_api(pa_loop),NULL);
pa_context_set_state_callback(context,state_notify,NULL);
pa_context_connect(context,NULL,0,NULL);
pa_context_set_state_callback(context,context_state_notify_cb,NULL);
pa_context_connect(context, NULL, 0, NULL);
pa_threaded_mainloop_start(pa_loop);
atexit(uninit_pulse_context);
}
}
static void check_pulse_ready(){
int i=0;
while(pa_ready==FALSE && i<10){
usleep(20000);
static bool_t wait_for_context_state(pa_context_state_t successState, pa_context_state_t failureState){
pa_threaded_mainloop_lock(pa_loop);
while(contextState != successState && contextState != failureState) {
pa_threaded_mainloop_wait(pa_loop);
}
pa_threaded_mainloop_unlock(pa_loop);
return contextState == successState;
}
typedef struct _PulseReadState{
typedef enum _StreamType {
STREAM_TYPE_PLAYBACK,
STREAM_TYPE_RECORD
} StreamType;
typedef struct _Stream{
pa_sample_spec sampleSpec;
pa_stream *stream;
}PulseReadState;
pa_stream_state_t state;
}Stream;
static void pulse_read_init(MSFilter *f){
PulseReadState *s=ms_new0(PulseReadState,1);
typedef Stream RecordStream;
static void stream_state_notify_cb(pa_stream *p, void *userData) {
RecordStream *ctx = (RecordStream *)userData;
ctx->state = pa_stream_get_state(p);
pa_threaded_mainloop_signal(pa_loop, 0);
}
static bool_t stream_wait_for_state(Stream *ctx, pa_stream_state_t successState, pa_stream_state_t failureState) {
pa_threaded_mainloop_lock(pa_loop);
while(ctx->state != successState && ctx->state != failureState) {
pa_threaded_mainloop_wait(pa_loop);
}
pa_threaded_mainloop_unlock(pa_loop);
return ctx->state == successState;
}
static void stream_init(Stream *s) {
s->sampleSpec.format = PA_SAMPLE_S16LE;
s->sampleSpec.channels=1;
s->sampleSpec.rate=8000;
f->data=s;
check_pulse_ready();
s->stream = NULL;
s->state = PA_STREAM_UNCONNECTED;
}
static void pulse_read_preprocess(MSFilter *f){
PulseReadState *s=(PulseReadState *)f->data;
static void stream_connect(Stream *s, StreamType type, pa_buffer_attr *attr) {
int err;
pa_buffer_attr attr={0};
uint32_t fragsize;
if (context != NULL) {
s->stream=pa_stream_new(context,"phone",&s->sampleSpec,NULL);
if (s->stream != NULL){
pa_stream_set_state_callback(s->stream, stream_state_notify_cb, s);
pa_threaded_mainloop_lock(pa_loop);
if(type == STREAM_TYPE_PLAYBACK) {
err=pa_stream_connect_playback(s->stream,NULL,attr, PA_STREAM_ADJUST_LATENCY, NULL, NULL);
} else {
err=pa_stream_connect_record(s->stream,NULL,attr, PA_STREAM_ADJUST_LATENCY);
}
pa_threaded_mainloop_unlock(pa_loop);
if (err!=0 || !stream_wait_for_state(s, PA_STREAM_READY, PA_STREAM_FAILED)) {
ms_error("fails to connect PulseAudio stream");
}
} else {
ms_error("fails to create PulseAudio stream");
}
} else {
ms_error("No PulseAudio context");
}
}
if (context==NULL) return;
static void stream_disconnect(Stream *s) {
int err;
if (s->stream) {
pa_threaded_mainloop_lock(pa_loop);
err = pa_stream_disconnect(s->stream);
pa_threaded_mainloop_unlock(pa_loop);
if(err!=0 || !stream_wait_for_state(s, PA_STREAM_TERMINATED, PA_STREAM_FAILED)) {
ms_error("pa_stream_disconnect() failed");
}
pa_stream_unref(s->stream);
s->stream = NULL;
}
}
fragsize=pa_usec_to_bytes(latency_req * 1000, &s->sampleSpec);
attr.maxlength=-1;
attr.tlength=fragsize;
attr.prebuf=0;
attr.minreq=-1;
attr.fragsize=fragsize;
s->stream=pa_stream_new(context,"phone",&s->sampleSpec,NULL);
if (s->stream==NULL){
ms_error("pa_stream_new() failed: %s",pa_strerror(pa_context_errno(context)));
return;
static void pulse_read_init(MSFilter *f){
if(wait_for_context_state(PA_CONTEXT_READY, PA_CONTEXT_FAILED)) {
RecordStream *s=ms_new0(RecordStream,1);
stream_init(s);
f->data=s;
} else {
f->data = NULL;
ms_error("Could not connect to a pulseaudio server");
}
pa_threaded_mainloop_lock(pa_loop);
err=pa_stream_connect_record(s->stream,NULL,&attr, PA_STREAM_ADJUST_LATENCY);
pa_threaded_mainloop_unlock(pa_loop);
if (err!=0){
ms_error("pa_stream_connect_record() failed");
}
static void pulse_read_preprocess(MSFilter *f){
if(f->data != NULL) {
RecordStream *s=(RecordStream *)f->data;
pa_buffer_attr attr;
attr.maxlength = -1;
attr.fragsize = pa_usec_to_bytes(fragtime * 1000, &s->sampleSpec);
attr.tlength = -1;
attr.minreq = -1;
attr.prebuf = -1;
stream_connect(s, STREAM_TYPE_RECORD, &attr);
}
}
static void pulse_read_process(MSFilter *f){
PulseReadState *s=(PulseReadState *)f->data;
const void *buffer=NULL;
size_t nbytes=0;
if (s->stream!=NULL){
pa_threaded_mainloop_lock(pa_loop);
while (pa_stream_peek(s->stream,&buffer,&nbytes)==0 && nbytes>0){
mblk_t *om;
om=allocb(nbytes,0);
memcpy(om->b_wptr,buffer,nbytes);
om->b_wptr+=nbytes;
ms_queue_put(f->outputs[0],om);
nbytes=0;
pa_stream_drop(s->stream);
if(f->data != NULL) {
RecordStream *s=(RecordStream *)f->data;
const void *buffer=NULL;
size_t nbytes;
if (s->stream!=NULL){
pa_threaded_mainloop_lock(pa_loop);
while(pa_stream_readable_size(s->stream) > 0) {
if(pa_stream_peek(s->stream, &buffer, &nbytes) >= 0) {
if(buffer != NULL) {
mblk_t *om = allocb(nbytes, 0);
memcpy(om->b_wptr, buffer, nbytes);
om->b_wptr += nbytes;
ms_queue_put(f->outputs[0], om);
}
if(nbytes > 0) {
pa_stream_drop(s->stream);
}
} else {
ms_error("pa_stream_peek() failed");
}
}
pa_threaded_mainloop_unlock(pa_loop);
}
pa_threaded_mainloop_unlock(pa_loop);
}
}
static void pulse_read_postprocess(MSFilter *f){
PulseReadState *s=(PulseReadState *)f->data;
if (s->stream) {
pa_threaded_mainloop_lock(pa_loop);
pa_stream_disconnect(s->stream);
pa_stream_unref(s->stream);
pa_threaded_mainloop_unlock(pa_loop);
if(f->data != NULL) {
RecordStream *s=(RecordStream *)f->data;
stream_disconnect(s);
}
}
static void pulse_read_uninit(MSFilter *f) {
ms_free(f->data);
if(f->data != NULL) ms_free(f->data);
}
static int pulse_read_set_sr(MSFilter *f, void *arg){
PulseReadState *s=(PulseReadState *)f->data;
s->sampleSpec.rate=*(int*)arg;
return 0;
if(f->data != NULL) {
RecordStream *s=(RecordStream *)f->data;
s->sampleSpec.rate=*(int*)arg;
return 0;
} else {
return -1;
}
}
static int pulse_read_get_sr(MSFilter *f, void *arg){
PulseReadState *s=(PulseReadState *)f->data;
*(int*)arg=s->sampleSpec.rate;
return 0;
if(f->data != NULL) {
RecordStream *s=(RecordStream *)f->data;
*(int*)arg=s->sampleSpec.rate;
return 0;
} else {
return -1;
}
}
static int pulse_read_set_nchannels(MSFilter *f, void *arg){
PulseReadState *s=(PulseReadState *)f->data;
s->sampleSpec.channels=*(int*)arg;
return 0;
if(f->data != NULL) {
RecordStream *s=(RecordStream *)f->data;
s->sampleSpec.channels=*(int*)arg;
return 0;
} else {
return -1;
}
}
static int pulse_read_get_nchannels(MSFilter *f, void *arg){
PulseReadState *s=(PulseReadState *)f->data;
*(int*)arg=s->sampleSpec.channels;
return 0;
if(f->data != NULL) {
RecordStream *s=(RecordStream *)f->data;
*(int*)arg=s->sampleSpec.channels;
return 0;
} else {
return -1;
}
}
static MSFilterMethod pulse_read_methods[]={
......@@ -246,80 +321,59 @@ static MSFilterDesc pulse_read_desc={
};
typedef struct _PulseReadState PulseWriteState;
typedef Stream PlaybackStream;
static void pulse_write_init(MSFilter *f){
PulseWriteState *s=ms_new0(PulseWriteState,1);
s->sampleSpec.format = PA_SAMPLE_S16LE;
s->sampleSpec.rate=8000;
s->sampleSpec.channels=1;
f->data=s;
check_pulse_ready();
if(wait_for_context_state(PA_CONTEXT_READY, PA_CONTEXT_FAILED)) {
PlaybackStream *s=ms_new0(PlaybackStream,1);
stream_init(s);
f->data=s;
} else {
f->data = NULL;
ms_error("Could not connect to a pulseaudio server");
}
}
static void pulse_write_preprocess(MSFilter *f){
PulseWriteState *s=(PulseWriteState*)f->data;
int err;
pa_buffer_attr attr={0};
uint32_t fragsize;
if (context==NULL) return;
fragsize = pa_usec_to_bytes(latency_req * 1000, &s->sampleSpec);
attr.maxlength=fragsize*3;
attr.tlength=fragsize;
attr.prebuf=-1;
attr.minreq=-1;
attr.fragsize=-1;
s->stream=pa_stream_new(context,"phone",&s->sampleSpec,NULL);
if (s->stream==NULL){
ms_error("pa_stream_new() failed: %s",pa_strerror(pa_context_errno(context)));
return;
}
pa_threaded_mainloop_lock(pa_loop);
err=pa_stream_connect_playback(s->stream,NULL,&attr, PA_STREAM_ADJUST_LATENCY,NULL,NULL);
pa_threaded_mainloop_unlock(pa_loop);
if (err!=0){
ms_error("pa_stream_connect_playback() failed");
if(f->data != NULL) {
PlaybackStream *s=(PlaybackStream*)f->data;
stream_connect(s, STREAM_TYPE_PLAYBACK, NULL);
}
}
static void pulse_write_process(MSFilter *f){
PulseWriteState *s=(PulseWriteState*)f->data;
mblk_t *im;
while((im=ms_queue_get(f->inputs[0]))!=NULL){
int bsize=msgdsize(im);
if (s->stream){
int err;
int writable;
pa_threaded_mainloop_lock(pa_loop);
writable=pa_stream_writable_size(s->stream);
if (writable>=0 && writable<bsize){
pa_stream_flush(s->stream,NULL,NULL);
ms_warning("pa_stream_writable_size(): not enough space, flushing");
}else if ((err=pa_stream_write(s->stream,im->b_rptr,bsize,NULL,0,PA_SEEK_RELATIVE))!=0){
ms_error("pa_stream_write(): %s",pa_strerror(err));
if(f->data != NULL) {
PlaybackStream *s=(PlaybackStream*)f->data;
mblk_t *im;
while((im=ms_queue_get(f->inputs[0]))!=NULL){
int bsize=msgdsize(im);
if (s->stream){
int err;
int writable;
pa_threaded_mainloop_lock(pa_loop);
writable=pa_stream_writable_size(s->stream);
if (writable>=0 && writable<bsize){
pa_stream_flush(s->stream,NULL,NULL);
ms_warning("pa_stream_writable_size(): not enough space, flushing");
}else if ((err=pa_stream_write(s->stream,im->b_rptr,bsize,NULL,0,PA_SEEK_RELATIVE))!=0){
ms_error("pa_stream_write(): %s",pa_strerror(err));
}
pa_threaded_mainloop_unlock(pa_loop);
}
pa_threaded_mainloop_unlock(pa_loop);
freemsg(im);
}
freemsg(im);
}
}
static void pulse_write_postprocess(MSFilter *f){
PulseWriteState *s=(PulseWriteState*)f->data;
if (s->stream) {
pa_threaded_mainloop_lock(pa_loop);
pa_stream_disconnect(s->stream);
pa_stream_unref(s->stream);
pa_threaded_mainloop_unlock(pa_loop);
if(f->data != NULL) {
PlaybackStream *s=(PlaybackStream*)f->data;
stream_disconnect(s);
}
}
static void pulse_write_uninit(MSFilter *f) {
ms_free(f->data);
if(f->data != NULL) ms_free(f->data);
}
static MSFilterDesc pulse_write_desc={
......@@ -337,13 +391,11 @@ static MSFilterDesc pulse_write_desc={
.methods=pulse_read_methods
};
static MSFilter *pulse_card_create_reader(MSSndCard *card)
{
static MSFilter *pulse_card_create_reader(MSSndCard *card) {
return ms_filter_new_from_desc (&pulse_read_desc);
}
static MSFilter *pulse_card_create_writer(MSSndCard *card)
{
static MSFilter *pulse_card_create_writer(MSSndCard *card) {
return ms_filter_new_from_desc (&pulse_write_desc);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment