Commit 4d7f2571 authored by Simon Morlat's avatar Simon Morlat

add new api to perform asynchrous action (like with dispatch_async() on mac os)

add asynchronous read/write API for use by file player/writer MSFilters (not integrated yet).
parent ea379434
......@@ -72,6 +72,7 @@ set(HEADER_FILES
x11_helper.h
zrtp.h
msrtt4103.h
msasync.h
)
set(MEDIASTREAMER2_HEADER_FILES )
......
......@@ -51,7 +51,8 @@ mediastreamer2_include_HEADERS=allfilters.h \
stun.h \
upnp_igd.h \
x11_helper.h \
zrtp.h
zrtp.h \
msasync.h
EXTRA_DIST=$(mediastreamer2_include_HEADERS)
/*
mediastreamer2 library - modular sound and video processing and streaming
Copyright (C) 2014 Belledonne Communications SARL
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef msasync_h
#define msasync_h
#include "mediastreamer2/mscommon.h"
#ifdef __cplusplus
extern "C" {
#endif
struct _MSTask;
typedef void (*MSTaskFunc)(void *);
typedef enum _MSTaskState{
MSTaskInit,
MSTaskQueued,
MSTaskRunning,
MSTaskDone /**<the task was executed normally*/
}MSTaskState;
struct _MSTask{
/*state and mutex are overkill for the moment - it's for later managing task cancellation*/
ms_mutex_t mutex;
MSTaskFunc func;
void *data;
MSTaskState state;
};
typedef struct _MSTask MSTask;
struct _MSWorkerThread{
ms_thread_t thread;
ms_cond_t cond;
ms_mutex_t mutex;
bctbx_list_t *tasks;
bool_t running;
bool_t inwait;
bool_t finish_tasks;
};
typedef struct _MSWorkerThread MSWorkerThread;
MS2_PUBLIC MSWorkerThread * ms_worker_thread_new(void);
MS2_PUBLIC void ms_worker_thread_add_task(MSWorkerThread *obj, MSTaskFunc fn, void *data);
MS2_PUBLIC void ms_worker_thread_destroy(MSWorkerThread *obj, bool_t finish_tasks);
#ifdef __cplusplus
}
#endif
#endif
......@@ -47,6 +47,7 @@ set(BASE_SOURCE_FILES_C
base/msvideopresets.c
base/mswebcam.c
base/mtu.c
base/msasync.c
otherfilters/itc.c
otherfilters/join.c
otherfilters/tee.c
......@@ -143,6 +144,8 @@ set(VOIP_SOURCE_FILES_C
audiofilters/l16.c
audiofilters/msfileplayer.c
audiofilters/msfilerec.c
audiofilters/asyncrw.c
audiofilters/asyncrw.h
audiofilters/msg722.c
audiofilters/msvaddtx.c
audiofilters/msvolume.c
......
......@@ -82,6 +82,7 @@ libmediastreamer_base_la_SOURCES= base/mscommon.c \
base/msvideopresets.c \
base/mswebcam.c \
base/mtu.c \
base/msasync.c \
otherfilters/void.c \
otherfilters/itc.c
libmediastreamer_voip_la_SOURCES=
......@@ -151,6 +152,8 @@ libmediastreamer_voip_la_SOURCES+= audiofilters/alaw.c \
audiofilters/msgenericplc.c \
audiofilters/msfileplayer.c \
audiofilters/msfilerec.c \
audiofilters/asyncrw.c \
audiofilters/asyncrw.h \
audiofilters/waveheader.h \
audiofilters/flowcontrol.c \
audiofilters/msvaddtx.c
......
/*
mediastreamer2 library - modular sound and video processing and streaming
Copyright (C) 2016 Belledonne Communications SARL
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include "mediastreamer2/msasync.h"
#include "asyncrw.h"
#include "mediastreamer2/msqueue.h"
struct _MSAsyncReader{
MSWorkerThread *wth;
ms_mutex_t mutex;
MSBufferizer buf;
int fd;
int ntasks_pending;
int blocksize;
off_t seekoff;
int moving;
bool_t eof;
};
MSAsyncReader *ms_async_reader_new(int fd){
MSAsyncReader *obj = ms_new0(MSAsyncReader,1);
ms_mutex_init(&obj->mutex, NULL);
ms_bufferizer_init(&obj->buf);
obj->fd = fd;
obj->wth = ms_worker_thread_new();
#ifndef WIN32
obj->blocksize = getpagesize();
#else
obj->blocksize = 4096;
#endif
return obj;
}
void ms_async_reader_destroy(MSAsyncReader *obj){
ms_worker_thread_destroy(obj->wth, FALSE);
ms_mutex_destroy(&obj->mutex);
ms_bufferizer_flush(&obj->buf);
ms_free(obj);
}
static void async_reader_fill(void *data){
MSAsyncReader *obj = (MSAsyncReader*) data;
mblk_t *m = allocb(obj->blocksize, 0);
int err = read(obj->fd, m->b_wptr, obj->blocksize);
ms_mutex_lock(&obj->mutex);
if (err >= 0){
if (err > 0){
m->b_wptr += err;
ms_bufferizer_put(&obj->buf, m);
}else freemsg(m);
if (err < obj->blocksize){
obj->eof = TRUE;
}
}else if (err == -1){
ms_error("async_reader_fill(): %s", strerror(errno));
obj->eof = TRUE; /*what to do then ?*/
}
obj->ntasks_pending--;
ms_mutex_unlock(&obj->mutex);
}
int ms_async_reader_read(MSAsyncReader *obj, uint8_t *buf, size_t size){
int err;
ms_mutex_lock(&obj->mutex);
if (obj->moving){
err = -EWOULDBLOCK;
goto end;
}
err = ms_bufferizer_get_avail(&obj->buf);
if (err < size && obj->ntasks_pending){
err = -EWOULDBLOCK;
goto end;
}
/*eventually ask to fill the bufferizer*/
if (obj->ntasks_pending == 0){
if (err < obj->blocksize){
obj->ntasks_pending++;
ms_worker_thread_add_task(obj->wth, async_reader_fill, obj);
}
}
/*and finally return the datas*/
err = ms_bufferizer_read(&obj->buf, buf, size);
end:
ms_mutex_unlock(&obj->mutex);
return err;
}
static void async_reader_seek(void *data){
MSAsyncReader *obj = (MSAsyncReader*) data;
ms_mutex_lock(&obj->mutex);
if (lseek(obj->fd, obj->seekoff, SEEK_SET) == -1){
ms_error("async_reader_seek() seek failed : %s", strerror(errno));
}
obj->ntasks_pending--;
obj->moving--;
ms_bufferizer_flush(&obj->buf);
ms_mutex_unlock(&obj->mutex);
async_reader_fill(data);
}
void ms_async_reader_seek(MSAsyncReader *obj, off_t offset){
ms_mutex_lock(&obj->mutex);
obj->ntasks_pending++;
obj->moving++;
obj->seekoff = offset;
ms_worker_thread_add_task(obj->wth, async_reader_seek, obj);
ms_mutex_unlock(&obj->mutex);
}
struct _MSAsyncWriter{
MSWorkerThread *wth;
ms_mutex_t mutex;
MSBufferizer buf;
uint8_t *wbuf;
int fd;
int blocksize;
};
MSAsyncWriter *ms_async_writer_new(int fd){
MSAsyncWriter *obj = ms_new0(MSAsyncWriter,1);
ms_mutex_init(&obj->mutex, NULL);
ms_bufferizer_init(&obj->buf);
obj->fd = fd;
obj->wth = ms_worker_thread_new();
#ifndef WIN32
obj->blocksize = getpagesize();
#else
obj->blocksize = 4096;
#endif
obj->wbuf = ms_malloc(obj->blocksize);
return obj;
}
static void async_writer_write(void *data){
MSAsyncWriter *obj = (MSAsyncWriter*) data;
size_t size;
bool_t ok = FALSE;
ms_mutex_lock(&obj->mutex);
size = MIN(obj->blocksize, ms_bufferizer_get_avail(&obj->buf));
if (ms_bufferizer_read(&obj->buf, obj->wbuf, size) == size){
ok = TRUE;
}else{
ms_error("async_writer_write(): should not happen");
}
ms_mutex_unlock(&obj->mutex);
if (ok){
if (write(obj->fd, obj->wbuf, size) == -1){
ms_error("async_writer_write(): %s", strerror(errno));
}
}
}
void ms_async_writer_destroy(MSAsyncWriter *obj){
if (ms_bufferizer_get_avail(&obj->buf) > 0){
/*push last samples, even if less than blocksize long */
ms_worker_thread_add_task(obj->wth, async_writer_write, obj);
}
ms_worker_thread_destroy(obj->wth, TRUE);
ms_mutex_destroy(&obj->mutex);
ms_bufferizer_flush(&obj->buf);
ms_free(obj->wbuf);
ms_free(obj);
}
int ms_async_reader_write(MSAsyncWriter *obj, mblk_t *m){
ms_mutex_lock(&obj->mutex);
ms_bufferizer_put(&obj->buf, m);
/*each time we have blocksize bytes in a bufferizer, push a write*/
if (ms_bufferizer_get_avail(&obj->buf) >= obj->blocksize){
ms_worker_thread_add_task(obj->wth, async_writer_write, obj);
}
ms_mutex_unlock(&obj->mutex);
return 0;
}
/*
mediastreamer2 library - modular sound and video processing and streaming
Copyright (C) 2016 Belledonne Communications SARL
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef asyncrw_h
#define asyncrw_h
typedef struct _MSAsyncReader MSAsyncReader;
typedef struct _MSAsyncWriter MSAsyncWriter;
MSAsyncReader *ms_async_reader_new(int fd);
void ms_async_reader_destroy(MSAsyncReader *obj);
int ms_async_reader_read(MSAsyncReader *obj, uint8_t *buf, size_t size);
void ms_async_reader_seek(MSAsyncReader *obj, off_t offset);
MSAsyncWriter *ms_async_writer_new(int fd);
void ms_async_writer_destroy(MSAsyncWriter *obj);
int ms_async_reader_write(MSAsyncWriter *obj, mblk_t *m);
#endif
/*
mediastreamer2 library - modular sound and video processing and streaming
Copyright (C) 2006 Simon MORLAT (simon.morlat@linphone.org)
Copyright (C) 2006-2016 Belledonne Communications SARL
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
......
/*
mediastreamer2 library - modular sound and video processing and streaming
Copyright (C) 2006 Belledonne Communications SARL
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include "mediastreamer2/msasync.h"
static void ms_task_destroy(MSTask *obj){
ms_mutex_destroy(&obj->mutex);
ms_free(obj);
}
MSTask * ms_task_new(MSTaskFunc func, void *data){
MSTask *obj = ms_new0(MSTask, 1);
obj->data = data;
obj->func = func;
obj->state = MSTaskInit;
ms_mutex_init(&obj->mutex, NULL);
return obj;
}
void ms_task_execute(MSTask *obj, int do_it){
ms_mutex_lock(&obj->mutex);
obj->state = MSTaskRunning;
ms_mutex_unlock(&obj->mutex);
if (do_it) obj->func(obj->data);
ms_mutex_lock(&obj->mutex);
obj->state = MSTaskDone;
ms_mutex_unlock(&obj->mutex);
ms_task_destroy(obj);
}
static void *ms_worker_thread_run(void *d){
MSWorkerThread *obj = (MSWorkerThread*)d;
ms_mutex_lock(&obj->mutex);
while(obj->running || obj->tasks){ /*don't let the thread exit with unterminated tasks*/
if (obj->tasks){
MSTask *t = (MSTask*)obj->tasks->data;
/*pop first element*/
obj->tasks = bctbx_list_delete_link(obj->tasks, obj->tasks);
ms_mutex_unlock(&obj->mutex);
ms_task_execute(t, obj->running || obj->finish_tasks);
ms_mutex_lock(&obj->mutex);
}else{
obj->inwait = TRUE;
ms_cond_wait(&obj->cond, &obj->mutex);
obj->inwait = FALSE;
}
}
ms_mutex_unlock(&obj->mutex);
return NULL;
}
MSWorkerThread * ms_worker_thread_new(void){
MSWorkerThread *obj = ms_new0(MSWorkerThread, 1);
ms_mutex_init(&obj->mutex, NULL);
ms_cond_init(&obj->cond, NULL);
obj->running = TRUE;
ms_thread_create(&obj->thread, NULL, ms_worker_thread_run, obj);
return obj;
}
void ms_worker_thread_add_task(MSWorkerThread *obj, MSTaskFunc func, void *data){
MSTask *task = ms_task_new(func, data);
ms_mutex_lock(&obj->mutex);
obj->tasks = bctbx_list_append(obj->tasks, task);
if (obj->inwait) pthread_cond_signal(&obj->cond);
ms_mutex_unlock(&obj->mutex);
}
void ms_worker_thread_destroy(MSWorkerThread *obj, bool_t finish_tasks){
ms_mutex_lock(&obj->mutex);
obj->finish_tasks = finish_tasks;
obj->running = FALSE;
if (obj->inwait) pthread_cond_signal(&obj->cond);
ms_mutex_unlock(&obj->mutex);
ms_thread_join(obj->thread, NULL);
if (obj->tasks){
/*should never happen*/
ms_error("Leaving %i tasks in worker thread.", (int)bctbx_list_size(obj->tasks));
}
ms_mutex_destroy(&obj->mutex);
ms_cond_destroy(&obj->cond);
ms_free(obj);
}
/*
mediastreamer2 library - modular sound and video processing and streaming
Copyright (C) 2006 Simon MORLAT (simon.morlat@linphone.org)
Copyright (C) 2006 Belledonne Communications SARL
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment