Commit 593f86ff authored by Mickaël Turnel's avatar Mickaël Turnel

Set the reception of rtp packets asynchronous for windows

parent 5f8fcddc
......@@ -357,6 +357,10 @@ typedef struct _RtpStream
jitter_stats_t jitter_stats;
struct _OrtpCongestionDetector *congdetect;
struct _OrtpVideoBandwidthEstimator *video_bw_estimator;
ortp_thread_t win_t;
volatile bool_t is_win_thread_running;
queue_t winrq;
ortp_mutex_t winrq_lock;
}RtpStream;
typedef struct _RtcpStream
......
......@@ -310,6 +310,12 @@ rtp_session_init (RtpSession * session, int mode)
ortp_bw_estimator_init(&session->rtp.gs.recv_bw_estimator, 0.95f, 0.01f);
ortp_bw_estimator_init(&session->rtcp.gs.recv_bw_estimator, 0.95f, 0.01f);
#if defined(_WIN32) || defined(_WIN32_WCE)
session->rtp.is_win_thread_running = FALSE;
qinit(&session->rtp.winrq);
ortp_mutex_init(&session->rtp.winrq_lock);
#endif
}
void rtp_session_enable_congestion_detection(RtpSession *session, bool_t enabled){
......@@ -1559,6 +1565,12 @@ static void ortp_stream_uninit(OrtpStream *os){
void rtp_session_uninit (RtpSession * session)
{
#if defined(_WIN32) || defined(_WIN32_WCE)
session->rtp.is_win_thread_running = FALSE;
flushq(&session->rtp.winrq, FLUSHALL);
ortp_mutex_destroy(&session->rtp.winrq_lock);
#endif
RtpTransport *rtp_meta_transport = NULL;
RtpTransport *rtcp_meta_transport = NULL;
/* first of all remove the session from the scheduler */
......
......@@ -1645,56 +1645,111 @@ void rtp_session_process_incoming(RtpSession * session, mblk_t *mp, bool_t is_rt
}
}
int rtp_session_rtp_recv (RtpSession * session, uint32_t user_ts) {
void* rtp_session_recvfrom_async(void* obj) {
RtpSession *session = (RtpSession*) obj;
int error;
struct sockaddr_storage remaddr;
socklen_t addrlen = sizeof (remaddr);
mblk_t *mp;
#if defined(_WIN32) || defined(_WIN32_WCE)
WSAPOLLFD fdarray = {0};
fdarray.fd = session->rtp.gs.socket;
fdarray.events = POLLRDNORM;
while(session->rtp.is_win_thread_running)
{
int ret = WSAPoll(&fdarray, 1, 10);
if (ret == SOCKET_ERROR) {
ortp_warning("Error rtp recv sync thread for windows: error while polling [%i]", WSAGetLastError());
} else if (ret > 0) {
#endif
bool_t sock_connected=!!(session->flags & RTP_SOCKET_CONNECTED);
mp = msgb_allocator_alloc(&session->rtp.gs.allocator, session->recv_buf_size);
if (sock_connected){
error = rtp_session_recvfrom(session, TRUE, mp, 0, NULL, NULL);
}else if (rtp_session_using_transport(session, rtp)) {
error = (session->rtp.gs.tr->t_recvfrom)(session->rtp.gs.tr, mp, 0, (struct sockaddr *) &remaddr, &addrlen);
} else {
error = rtp_session_recvfrom(session, TRUE, mp, 0, (struct sockaddr *) &remaddr, &addrlen);
}
if (error > 0) {
mp->b_wptr+=error;
#if defined(_WIN32) || defined(_WIN32_WCE)
ortp_mutex_lock(&session->rtp.winrq_lock);
#endif
putq(&session->rtp.winrq, mp);
#if defined(_WIN32) || defined(_WIN32_WCE)
ortp_mutex_unlock(&session->rtp.winrq_lock);
#endif
} else {
int errnum;
if (error==-1 && !is_would_block_error((errnum=getSocketErrorCode())) )
{
if (session->on_network_error.count>0){
rtp_signal_table_emit3(&session->on_network_error,"Error receiving RTP packet",ORTP_INT_TO_POINTER(getSocketErrorCode()));
}else ortp_warning("Error receiving RTP packet: %s, err num [%i],error [%i]",getSocketError(),errnum,error);
#if TARGET_OS_IPHONE
/*hack for iOS and non-working socket because of background mode*/
if (errnum==ENOTCONN){
/*re-create new sockets */
rtp_session_set_local_addr(session,session->rtp.gs.sockfamily==AF_INET ? "0.0.0.0" : "::0",session->rtp.gs.loc_port,session->rtcp.gs.loc_port);
}
#endif
}
freemsg(mp);
}
#if defined(_WIN32) || defined(_WIN32_WCE)
}
}
ortp_thread_exit(NULL);
#endif
}
int rtp_session_rtp_recv (RtpSession * session, uint32_t user_ts) {
mblk_t *mp;
if ((session->rtp.gs.socket==(ortp_socket_t)-1) && !rtp_session_using_transport(session, rtp)) return -1; /*session has no sockets for the moment*/
while (1)
{
bool_t sock_connected=!!(session->flags & RTP_SOCKET_CONNECTED);
mp = msgb_allocator_alloc(&session->rtp.gs.allocator, session->recv_buf_size);
mp->reserved1 = user_ts;
if (sock_connected){
error=rtp_session_recvfrom(session, TRUE, mp, 0, NULL, NULL);
}else if (rtp_session_using_transport(session, rtp)) {
error = (session->rtp.gs.tr->t_recvfrom)(session->rtp.gs.tr, mp, 0, (struct sockaddr *) &remaddr, &addrlen);
} else {
error = rtp_session_recvfrom(session, TRUE, mp, 0, (struct sockaddr *) &remaddr, &addrlen);
}
if (error > 0){
mp->b_wptr+=error;
rtp_session_process_incoming(session, mp, TRUE, user_ts, FALSE);
}
else
{
#if defined(_WIN32) || defined(_WIN32_WCE)
if (!session->rtp.is_win_thread_running) {
int errnum;
if (error==-1 && !is_would_block_error((errnum=getSocketErrorCode())) )
{
if (session->on_network_error.count>0){
rtp_signal_table_emit3(&session->on_network_error,"Error receiving RTP packet",ORTP_INT_TO_POINTER(getSocketErrorCode()));
}else ortp_warning("Error receiving RTP packet: %s, err num [%i],error [%i]",getSocketError(),errnum,error);
#if TARGET_OS_IPHONE
/*hack for iOS and non-working socket because of background mode*/
if (errnum==ENOTCONN){
/*re-create new sockets */
rtp_session_set_local_addr(session,session->rtp.gs.sockfamily==AF_INET ? "0.0.0.0" : "::0",session->rtp.gs.loc_port,session->rtcp.gs.loc_port);
}
#endif
}else{
/*EWOULDBLOCK errors or transports returning 0 are ignored.*/
rtp_session_process_incoming(session, NULL, TRUE, user_ts, FALSE);
session->rtp.is_win_thread_running = TRUE;
if ((errnum = ortp_thread_create(&session->rtp.win_t, NULL, rtp_session_recvfrom_async, (void*)session)) != 0) {
ortp_warning("Error creating rtp recv async thread for windows: error [%i]", errnum);
session->rtp.is_win_thread_running = FALSE;
return -1;
}
freemsg(mp);
}
#else
rtp_session_recvfrom_async((void*)session);
#endif
#if defined(_WIN32) || defined(_WIN32_WCE)
ortp_mutex_lock(&session->rtp.winrq_lock);
#endif
mp = getq(&session->rtp.winrq);
#if defined(_WIN32) || defined(_WIN32_WCE)
ortp_mutex_unlock(&session->rtp.winrq_lock);
#endif
if (mp != NULL) {
mp->reserved1 = user_ts;
rtp_session_process_incoming(session, mp, TRUE, user_ts, FALSE);
} else {
rtp_session_process_incoming(session, NULL, TRUE, user_ts, FALSE);
return -1;
}
}
return error;
return -1;
}
int rtp_session_rtcp_recv (RtpSession * session) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment