Commit 2cff42a6 authored by Simon Morlat's avatar Simon Morlat

make audio file player and writer using asynchronous read/write operations.

This avoids blocking the process() function if the read() or write() had to block.
Blocking read/write can happen when running mediastreamer2 in a server application handling several hundreds of file streaming/recording operations.
parent e82774e4
......@@ -35,6 +35,8 @@ struct _MSAsyncReader{
bool_t eof;
};
static void async_reader_fill(void *data);
MSAsyncReader *ms_async_reader_new(int fd){
MSAsyncReader *obj = ms_new0(MSAsyncReader,1);
ms_mutex_init(&obj->mutex, NULL);
......@@ -46,6 +48,9 @@ MSAsyncReader *ms_async_reader_new(int fd){
#else
obj->blocksize = 4096;
#endif
/*immediately start filling the reader */
obj->ntasks_pending++;
ms_worker_thread_add_task(obj->wth, async_reader_fill, obj);
return obj;
}
......@@ -59,6 +64,7 @@ void ms_async_reader_destroy(MSAsyncReader *obj){
static void async_reader_fill(void *data){
MSAsyncReader *obj = (MSAsyncReader*) data;
mblk_t *m = allocb(obj->blocksize, 0);
int err = (int)bctbx_read(obj->fd, m->b_wptr, obj->blocksize);
ms_mutex_lock(&obj->mutex);
if (err >= 0){
......@@ -79,27 +85,27 @@ static void async_reader_fill(void *data){
int ms_async_reader_read(MSAsyncReader *obj, uint8_t *buf, size_t size){
int err;
size_t ret;
size_t avail;
ms_mutex_lock(&obj->mutex);
if (obj->moving){
err = -EWOULDBLOCK;
goto end;
}
ret = ms_bufferizer_get_avail(&obj->buf);
if (ret < size && obj->ntasks_pending){
avail = ms_bufferizer_get_avail(&obj->buf);
if (avail < size && obj->ntasks_pending){
err = -EWOULDBLOCK;
goto end;
}
/*eventually ask to fill the bufferizer*/
if (obj->ntasks_pending == 0){
if (ret < obj->blocksize){
if (avail < obj->blocksize){
obj->ntasks_pending++;
ms_worker_thread_add_task(obj->wth, async_reader_fill, obj);
}
}
/*and finally return the datas*/
err = (int)ms_bufferizer_read(&obj->buf, buf, size);
err = (int)ms_bufferizer_read(&obj->buf, buf, MIN(size, avail));
end:
ms_mutex_unlock(&obj->mutex);
return err;
......@@ -111,7 +117,6 @@ static void async_reader_seek(void *data){
if (lseek(obj->fd, obj->seekoff, SEEK_SET) == -1){
ms_error("async_reader_seek() seek failed : %s", strerror(errno));
}
obj->ntasks_pending--;
obj->moving--;
ms_bufferizer_flush(&obj->buf);
ms_mutex_unlock(&obj->mutex);
......
......@@ -58,7 +58,7 @@ static SoundDeviceDescription devices[]={
{ "LGE", "LS670", "", 0, 170 },
{ "LGE", "Nexus 5", "msm8974", 0, 0 , 16000 },
{ "LGE", "LG-H815", "msm8992", DEVICE_HAS_BUILTIN_AEC, 0 },
{ "LGE", "LG-H735", "msm8916" DEVICE_HAS_BUILTIN_OPENSLES_AEC, 0, 16000},
{ "motorola", "DROID RAZR", "", 0, 400 },
{ "motorola", "MB860", "", 0, 200 },
{ "motorola", "XT907", "", 0, 500 },
......
......@@ -24,6 +24,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#include "mediastreamer2/msfileplayer.h"
#include "waveheader.h"
#include "mediastreamer2/msticker.h"
#include "asyncrw.h"
#ifdef HAVE_PCAP
#include <pcap/pcap.h>
......@@ -39,6 +40,7 @@ static int player_close(MSFilter *f, void *arg);
struct _PlayerData{
int fd;
MSAsyncReader *reader;
MSPlayerState state;
int rate;
int nchannels;
......@@ -49,6 +51,7 @@ struct _PlayerData{
int samplesize;
char *mime;
uint32_t ts;
int async_read_too_late;
bool_t swap;
bool_t is_raw;
#ifdef HAVE_PCAP
......@@ -190,6 +193,7 @@ static int player_open(MSFilter *f, void *arg){
d->state=MSPlayerPaused;
d->fd=fd;
d->ts=0;
d->async_read_too_late = 0;
#ifdef HAVE_PCAP
d->pcap = NULL;
d->pcap_started = FALSE;
......@@ -207,6 +211,7 @@ static int player_open(MSFilter *f, void *arg){
if (read_wav_header(d)!=0 && strstr(file,".wav")){
ms_warning("File %s has .wav extension but wav header could be found.",file);
}
d->reader = ms_async_reader_new(d->fd);
ms_filter_notify_no_arg(f,MS_FILTER_OUTPUT_FMT_CHANGED);
ms_message("MSFilePlayer[%p]: %s opened: rate=%i,channel=%i",f,file,d->rate,d->nchannels);
return 0;
......@@ -224,7 +229,7 @@ static int player_stop(MSFilter *f, void *arg){
ms_filter_lock(f);
if (d->state!=MSPlayerClosed){
d->state=MSPlayerPaused;
lseek(d->fd,d->hsize,SEEK_SET);
if (d->reader) ms_async_reader_seek(d->reader, d->hsize);
}
ms_filter_unlock(f);
return 0;
......@@ -246,9 +251,16 @@ static int player_close(MSFilter *f, void *arg){
#ifdef HAVE_PCAP
if (d->pcap) pcap_close(d->pcap);
#endif
if (d->reader){
ms_async_reader_destroy(d->reader);
d->reader = NULL;
}
if (d->fd!=-1) close(d->fd);
d->fd=-1;
d->state=MSPlayerClosed;
if (d->async_read_too_late > 0){
ms_warning("MSFilePlayer[%p] had %i late read events.", f, d->async_read_too_late);
}
return 0;
}
......@@ -365,10 +377,10 @@ static void player_process(MSFilter *f){
memset(om->b_wptr,0,bytes);
d->pause_time-=f->ticker->interval;
}else{
err=read(d->fd,om->b_wptr,bytes);
if (d->swap) swap_bytes(om->b_wptr,bytes);
err = ms_async_reader_read(d->reader, om->b_wptr, bytes);
}
if (err>=0){
if (d->swap) swap_bytes(om->b_wptr,bytes);
if (err!=0){
if (err<bytes)
memset(om->b_wptr+err,0,bytes-err);
......@@ -378,8 +390,7 @@ static void player_process(MSFilter *f){
ms_queue_put(f->outputs[0],om);
}else freemsg(om);
if (err<bytes){
lseek(d->fd,d->hsize,SEEK_SET);
ms_async_reader_seek(d->reader, d->hsize);
/* special value for playing file only once */
if (d->loop_after<0){
......@@ -392,7 +403,9 @@ static void player_process(MSFilter *f){
ms_filter_notify_no_arg(f,MS_FILE_PLAYER_EOF);
}
}else{
ms_warning("Fail to read %i bytes: %s",bytes,strerror(errno));
if (err != -EWOULDBLOCK) ms_warning("MSFilePlayer[%p]: fail to read %i bytes.",f, bytes);
else d->async_read_too_late++;
freemsg(om);
}
}
}
......
......@@ -23,6 +23,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#include "mediastreamer2/msfilerec.h"
#include "waveheader.h"
#include "asyncrw.h"
static int rec_close(MSFilter *f, void *arg);
......@@ -35,8 +36,9 @@ typedef struct RecState{
int size;
int max_size;
char *mime;
bool_t swap;
MSAsyncWriter *writer;
MSRecorderState state;
bool_t swap;
} RecState;
static void rec_init(MSFilter *f){
......@@ -52,6 +54,8 @@ static void rec_init(MSFilter *f){
f->data=s;
}
static void _rec_close(RecState *s);
static void swap_bytes(unsigned char *bytes, int len){
int i;
unsigned char tmp;
......@@ -65,40 +69,28 @@ static void swap_bytes(unsigned char *bytes, int len){
static void rec_process(MSFilter *f){
RecState *s=(RecState*)f->data;
mblk_t *m;
int err;
ms_mutex_lock(&f->lock);
while((m=ms_queue_get(f->inputs[0]))!=NULL){
mblk_t *it=m;
ms_mutex_lock(&f->lock);
if (s->state==MSRecorderRunning){
while(it!=NULL){
int len=(int)(it->b_wptr-it->b_rptr);
int max_size_reached = 0;
if (s->max_size!=0 && s->size+len > s->max_size) {
len = s->max_size - s->size;
max_size_reached = 1;
}
if (s->swap) swap_bytes(it->b_wptr,len);
if ((err=write(s->fd,it->b_rptr,len))!=len){
if (err<0)
ms_warning("MSFileRec: fail to write %i bytes: %s",len,strerror(errno));
}
it=it->b_cont;
s->size+=len;
if (max_size_reached) {
ms_warning("MSFileRec: Maximum size (%d) has been reached. closing file.",s->max_size);
s->state=MSRecorderClosed;
if (s->fd!=-1){
write_wav_header(s->fd, s->rate, s->nchannels, s->size);
close(s->fd);
s->fd=-1;
}
ms_filter_notify_no_arg(f,MS_RECORDER_MAX_SIZE_REACHED);
}
int len=(int)(m->b_wptr-m->b_rptr);
int max_size_reached = 0;
if (s->max_size!=0 && s->size+len > s->max_size) {
len = s->max_size - s->size;
max_size_reached = 1;
}
}
ms_mutex_unlock(&f->lock);
freemsg(m);
if (s->swap) swap_bytes(m->b_wptr,len);
ms_async_reader_write(s->writer,m);
s->size+=len;
if (max_size_reached) {
ms_warning("MSFileRec: Maximum size (%d) has been reached. closing file.",s->max_size);
_rec_close(s);
ms_filter_notify_no_arg(f,MS_RECORDER_MAX_SIZE_REACHED);
}
}else freemsg(m);
}
ms_mutex_unlock(&f->lock);
}
static int rec_get_length(const char *file, int *length){
......@@ -145,6 +137,7 @@ static int rec_open(MSFilter *f, void *arg){
}else ms_error("fstat() failed: %s",strerror(errno));
}
ms_message("MSFileRec: recording into %s",filename);
s->writer = ms_async_writer_new(s->fd);
ms_mutex_lock(&f->lock);
s->state=MSRecorderPaused;
ms_mutex_unlock(&f->lock);
......@@ -194,15 +187,21 @@ static void write_wav_header(int fd, int rate, int nchannels, int size){
}
}
static int rec_close(MSFilter *f, void *arg){
RecState *s=(RecState*)f->data;
ms_mutex_lock(&f->lock);
static void _rec_close(RecState *s){
s->state=MSRecorderClosed;
if (s->fd!=-1){
ms_async_writer_destroy(s->writer);
s->writer = NULL;
write_wav_header(s->fd, s->rate, s->nchannels, s->size);
close(s->fd);
s->fd=-1;
}
}
static int rec_close(MSFilter *f, void *arg){
RecState *s=(RecState*)f->data;
ms_mutex_lock(&f->lock);
_rec_close(s);
ms_mutex_unlock(&f->lock);
return 0;
}
......
......@@ -78,7 +78,7 @@ void ms_bufferizer_put_from_queue(MSBufferizer *obj, MSQueue *q){
}
size_t ms_bufferizer_read(MSBufferizer *obj, uint8_t *data, size_t datalen){
if (obj->size>=datalen){
if (obj->size>=datalen && datalen > 0){
/*we can return something */
size_t sz=0;
size_t cplen;
......
......@@ -89,6 +89,7 @@ typedef struct _MediastreamIceCandidate {
} MediastreamIceCandidate;
typedef struct _MediastreamDatas {
MSFactory *factory;
int localport,remoteport,payload;
char ip[64];
char *fmtp;
......@@ -689,7 +690,7 @@ void setup_media_streams(MediastreamDatas* args) {
ortp_set_log_level_mask(ORTP_LOG_DOMAIN, ORTP_MESSAGE|ORTP_WARNING|ORTP_ERROR|ORTP_FATAL);
}
factory = ms_factory_new_with_voip();
args->factory = factory = ms_factory_new_with_voip();
#if TARGET_OS_IPHONE || defined(ANDROID)
#if TARGET_OS_IPHONE || (defined(HAVE_X264) && defined(VIDEO_ENABLED))
......@@ -1117,7 +1118,7 @@ void clear_mediastreams(MediastreamDatas* args) {
if (args->logfile)
fclose(args->logfile);
ms_factory_destroy(args->video->ms.factory);
ms_factory_destroy(args->factory);
}
// ANDROID JNI WRAPPER
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment