Commit d2dc12ed authored by Mickaël Turnel's avatar Mickaël Turnel
Browse files

Changed JpegWriter to be done on threads to stop it from delaying the process chain

Showing with 155 additions and 92 deletions
......@@ -23,6 +23,7 @@
#endif
#include "ffmpeg-priv.h"
#include "mediastreamer2/msasync.h"
#include "mediastreamer2/msjpegwriter.h"
#include "mediastreamer2/msvideo.h"
......@@ -43,6 +44,8 @@ typedef struct {
AVCodec *codec;
AVFrame *pict;
MSFilter *f;
MSWorkerThread *process_thread;
queue_t entry_q;
} JpegWriter;
static void close_file(JpegWriter *obj, bool_t doRenaming);
......@@ -85,11 +88,16 @@ static void jpg_init(MSFilter *f) {
ms_error("Could not find CODEC_ID_MJPEG !");
}
s->pict = av_frame_alloc();
s->process_thread = ms_worker_thread_new("MSJpegWriter");
qinit(&s->entry_q);
f->data = s;
}
static void jpg_uninit(MSFilter *f) {
JpegWriter *s = (JpegWriter *)f->data;
ms_worker_thread_destroy(s->process_thread, TRUE);
s->process_thread = NULL;
flushq(&s->entry_q, 0);
s->f = NULL;
if (s->file != NULL) {
close_file(s, FALSE);
......@@ -112,92 +120,115 @@ static int take_snapshot(MSFilter *f, void *arg) {
}
static void cleanup(JpegWriter *s, AVCodecContext *avctx, bool_t success) {
ms_filter_lock(s->f);
if (s->file) {
close_file(s, success);
}
ms_filter_unlock(s->f);
if (avctx) {
avcodec_close(avctx);
av_free(avctx);
}
}
static void jpg_process(MSFilter *f) {
static void jpg_process_frame_task(void *obj) {
MSFilter *f = (MSFilter *)obj;
JpegWriter *s = (JpegWriter *)f->data;
MSPicture yuvbuf, yuvjpeg;
mblk_t *m = NULL;
ms_filter_lock(f);
if (s->file != NULL && s->codec != NULL) {
MSPicture yuvbuf, yuvjpeg;
mblk_t *m = ms_queue_peek_last(f->inputs[0]);
uint32_t frame_ts = mblk_get_timestamp_info(m);
if (ms_yuv_buf_init_from_mblk(&yuvbuf, m) == 0) {
int error, got_pict;
size_t comp_buf_sz = msgdsize(m);
uint8_t *comp_buf = (uint8_t *)ms_malloc0(comp_buf_sz);
mblk_t *jpegm;
struct SwsContext *sws_ctx;
struct AVPacket packet;
AVCodecContext *avctx = avcodec_alloc_context3(s->codec);
memset(&packet, 0, sizeof(packet));
avctx->width = yuvbuf.w;
avctx->height = yuvbuf.h;
avctx->time_base.num = 1;
avctx->time_base.den = 1;
avctx->pix_fmt = AV_PIX_FMT_YUVJ420P;
error = avcodec_open2(avctx, s->codec, NULL);
if (error != 0) {
ms_error("avcodec_open() failed: %i", error);
cleanup(s, NULL, FALSE);
av_free(avctx);
goto end;
}
sws_ctx = sws_getContext(avctx->width, avctx->height, AV_PIX_FMT_YUV420P, avctx->width, avctx->height,
avctx->pix_fmt, SWS_FAST_BILINEAR, NULL, NULL, NULL);
if (sws_ctx == NULL) {
ms_error(" sws_getContext() failed.");
cleanup(s, avctx, FALSE);
goto end;
}
jpegm = ms_yuv_buf_alloc(&yuvjpeg, avctx->width, avctx->height);
m = getq(&s->entry_q);
ms_filter_unlock(f);
uint32_t frame_ts = mblk_get_timestamp_info(m);
if (ms_yuv_buf_init_from_mblk(&yuvbuf, m) == 0) {
int error, got_pict;
size_t comp_buf_sz = msgdsize(m);
uint8_t *comp_buf = (uint8_t *)ms_malloc0(comp_buf_sz);
mblk_t *jpegm;
struct SwsContext *sws_ctx;
struct AVPacket packet;
AVCodecContext *avctx = avcodec_alloc_context3(s->codec);
memset(&packet, 0, sizeof(packet));
avctx->width = yuvbuf.w;
avctx->height = yuvbuf.h;
avctx->time_base.num = 1;
avctx->time_base.den = 1;
avctx->pix_fmt = AV_PIX_FMT_YUVJ420P;
error = avcodec_open2(avctx, s->codec, NULL);
if (error != 0) {
ms_error("avcodec_open() failed: %i", error);
cleanup(s, NULL, FALSE);
av_free(avctx);
goto end;
}
sws_ctx = sws_getContext(avctx->width, avctx->height, AV_PIX_FMT_YUV420P, avctx->width, avctx->height,
avctx->pix_fmt, SWS_FAST_BILINEAR, NULL, NULL, NULL);
if (sws_ctx == NULL) {
ms_error(" sws_getContext() failed.");
cleanup(s, avctx, FALSE);
goto end;
}
jpegm = ms_yuv_buf_alloc(&yuvjpeg, avctx->width, avctx->height);
#if LIBSWSCALE_VERSION_INT >= AV_VERSION_INT(0, 9, 0)
if (sws_scale(sws_ctx, (const uint8_t *const *)yuvbuf.planes, yuvbuf.strides, 0, avctx->height,
yuvjpeg.planes, yuvjpeg.strides) < 0) {
if (sws_scale(sws_ctx, (const uint8_t *const *)yuvbuf.planes, yuvbuf.strides, 0, avctx->height, yuvjpeg.planes,
yuvjpeg.strides) < 0) {
#else
if (sws_scale(sws_ctx, (uint8_t **)yuvbuf.planes, yuvbuf.strides, 0, avctx->height, yuvjpeg.planes,
yuvjpeg.strides) < 0) {
if (sws_scale(sws_ctx, (uint8_t **)yuvbuf.planes, yuvbuf.strides, 0, avctx->height, yuvjpeg.planes,
yuvjpeg.strides) < 0) {
#endif
ms_error("sws_scale() failed.");
sws_freeContext(sws_ctx);
cleanup(s, avctx, FALSE);
freemsg(jpegm);
goto end;
}
ms_error("sws_scale() failed.");
sws_freeContext(sws_ctx);
cleanup(s, avctx, FALSE);
freemsg(jpegm);
goto end;
}
sws_freeContext(sws_ctx);
av_frame_unref(s->pict);
avpicture_fill((AVPicture *)s->pict, (uint8_t *)jpegm->b_rptr, avctx->pix_fmt, avctx->width, avctx->height);
packet.data = comp_buf;
packet.size = (int)comp_buf_sz;
packet.pts = frame_ts;
error = avcodec_encode_video2(avctx, &packet, s->pict, &got_pict);
if (error < 0) {
ms_error("Could not encode jpeg picture.");
av_frame_unref(s->pict);
avpicture_fill((AVPicture *)s->pict, (uint8_t *)jpegm->b_rptr, avctx->pix_fmt, avctx->width, avctx->height);
packet.data = comp_buf;
packet.size = (int)comp_buf_sz;
packet.pts = frame_ts;
error = avcodec_encode_video2(avctx, &packet, s->pict, &got_pict);
if (error < 0) {
ms_error("Could not encode jpeg picture.");
} else {
ms_filter_lock(f);
if (s->file != NULL && fwrite(comp_buf, packet.size, 1, s->file) > 0) {
ms_message("Snapshot done");
} else {
if (fwrite(comp_buf, packet.size, 1, s->file) > 0) {
ms_message("Snapshot done");
} else {
ms_error("Error writing snapshot.");
}
ms_error("Error writing snapshot.");
}
ms_free(comp_buf);
cleanup(s, avctx, TRUE);
freemsg(jpegm);
ms_filter_unlock(f);
}
ms_free(comp_buf);
cleanup(s, avctx, TRUE);
freemsg(jpegm);
}
freemsg(m);
}
static void jpg_process(MSFilter *f) {
JpegWriter *s = (JpegWriter *)f->data;
ms_filter_lock(f);
if (s->file != NULL && s->codec != NULL) {
mblk_t *img = ms_queue_peek_last(f->inputs[0]);
if (img != NULL) {
ms_queue_remove(f->inputs[0], img);
putq(&s->entry_q, img);
ms_worker_thread_add_task(s->process_thread, jpg_process_frame_task, (void *)f);
}
goto end;
}
end:
ms_filter_unlock(f);
ms_queue_flush(f->inputs[0]);
}
......
......@@ -24,6 +24,7 @@
#include "bctoolbox/utils.hh"
#include "bctoolbox/vfs.h"
#include "mediastreamer2/msasync.h"
#include "mediastreamer2/msjpegwriter.h"
#include "mediastreamer2/msvideo.h"
#include "turbojpeg.h"
......@@ -40,6 +41,8 @@ typedef struct {
char *tmpFilename;
tjhandle turboJpeg;
MSFilter *f;
MSWorkerThread *process_thread;
queue_t entry_q;
} JpegWriter;
static void close_file(JpegWriter *obj, bool_t doRenaming) {
......@@ -81,11 +84,16 @@ static void jpg_init(MSFilter *f) {
if (s->turboJpeg == NULL) {
ms_error("TurboJpeg init error:%s", tjGetErrorStr());
}
s->process_thread = ms_worker_thread_new("MSJpegWriter");
qinit(&s->entry_q);
f->data = s;
}
static void jpg_uninit(MSFilter *f) {
JpegWriter *s = (JpegWriter *)f->data;
ms_worker_thread_destroy(s->process_thread, TRUE);
s->process_thread = NULL;
flushq(&s->entry_q, 0);
s->f = NULL;
if (s->file != NULL) {
close_file(s, FALSE);
......@@ -110,48 +118,72 @@ static int take_snapshot(MSFilter *f, void *arg) {
}
static void cleanup(JpegWriter *s, bool_t success) {
ms_filter_lock(s->f);
if (s->file) {
close_file(s, success);
}
ms_filter_unlock(s->f);
}
static void jpg_process(MSFilter *f) {
bool_t success = FALSE;
static void jpg_process_frame_task(void *obj) {
MSFilter *f = (MSFilter *)obj;
JpegWriter *s = (JpegWriter *)f->data;
ms_filter_lock(f);
if (s->file != NULL && s->turboJpeg != NULL) {
int error;
MSPicture yuvbuf;
unsigned char *jpegBuffer = NULL;
unsigned long jpegSize = 0;
int error;
MSPicture yuvbuf;
unsigned char *jpegBuffer = NULL;
unsigned long jpegSize = 0;
bool_t success = FALSE;
mblk_t *m = NULL;
mblk_t *m = ms_queue_peek_last(f->inputs[0]);
ms_filter_lock(f);
m = getq(&s->entry_q);
ms_filter_unlock(f);
if (ms_yuv_buf_init_from_mblk(&yuvbuf, m) != 0) goto end;
if (ms_yuv_buf_init_from_mblk(&yuvbuf, m) != 0) goto end;
error = tjCompressFromYUVPlanes(
s->turboJpeg,
// This auto cast has the purpose to support multiple versions of turboJPEG where parameter can be const.
bctoolbox::Utils::auto_cast<unsigned char **>(yuvbuf.planes), yuvbuf.w, yuvbuf.strides, yuvbuf.h,
TJSAMP_420, &jpegBuffer, &jpegSize, 100, TJFLAG_ACCURATEDCT);
error = tjCompressFromYUVPlanes(
s->turboJpeg,
// This auto cast has the purpose to support multiple versions of turboJPEG where parameter can be const.
bctoolbox::Utils::auto_cast<unsigned char **>(yuvbuf.planes), yuvbuf.w, yuvbuf.strides, yuvbuf.h, TJSAMP_420,
&jpegBuffer, &jpegSize, 100, TJFLAG_ACCURATEDCT);
if (error != 0) {
ms_error("tjCompressFromYUVPlanes() failed: %s", tjGetErrorStr());
if (jpegBuffer != NULL) tjFree(jpegBuffer);
goto end;
}
if (bctbx_file_write2(s->file, jpegBuffer, jpegSize) != BCTBX_VFS_ERROR) {
ms_message("Snapshot done with turbojpeg");
success = TRUE;
} else {
ms_error("Error writing snapshot.");
}
if (error != 0) {
ms_error("tjCompressFromYUVPlanes() failed: %s", tjGetErrorStr());
if (jpegBuffer != NULL) tjFree(jpegBuffer);
goto end;
}
tjFree(jpegBuffer);
ms_filter_lock(f);
if (s->file != NULL && bctbx_file_write2(s->file, jpegBuffer, jpegSize) != BCTBX_VFS_ERROR) {
ms_message("Snapshot done with turbojpeg");
success = TRUE;
} else {
ms_error("Error writing snapshot.");
}
ms_filter_unlock(f);
tjFree(jpegBuffer);
end:
freemsg(m);
cleanup(s, success);
}
static void jpg_process(MSFilter *f) {
JpegWriter *s = (JpegWriter *)f->data;
ms_filter_lock(f);
if (s->file != NULL && s->turboJpeg != NULL) {
mblk_t *img = ms_queue_peek_last(f->inputs[0]);
if (img != NULL) {
ms_queue_remove(f->inputs[0], img);
putq(&s->entry_q, img);
ms_worker_thread_add_task(s->process_thread, jpg_process_frame_task, (void *)f);
}
}
ms_filter_unlock(f);
ms_queue_flush(f->inputs[0]);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment