Commit ec6c4d52 authored by Sylvain Berfini's avatar Sylvain Berfini 🎩

Reworked a few things to make packet injection in reception possible

parent 9a1e61ab
......@@ -97,6 +97,7 @@ typedef struct _RtpTransportModifier
{
void *data;
struct _RtpSession *session;//<back pointer to the owning session, set by oRTP
struct _RtpTransport *transport;//<back point to the owning transport, set by oRTP
int (*t_process_on_send)(struct _RtpTransportModifier *t, mblk_t *msg);
int (*t_process_on_receive)(struct _RtpTransportModifier *t, mblk_t *msg);
void (*t_process_on_schedule)(struct _RtpTransportModifier *t); /*invoked each time rtp_session_recvm is called even is no message are available*/
......@@ -277,7 +278,6 @@ typedef struct _OrtpStream {
socklen_t loc_addrlen;
struct sockaddr_storage loc_addr;
struct _RtpTransport *tr;
mblk_t *cached_mp;
struct timeval send_bw_start; /* used for bandwidth estimation */
struct timeval recv_bw_start; /* used for bandwidth estimation */
unsigned int sent_bytes; /* used for bandwidth estimation */
......@@ -285,6 +285,7 @@ typedef struct _OrtpStream {
float upload_bw;
float download_bw;
OList *aux_destinations; /*list of OrtpAddress */
msgb_allocator_t allocator;
} OrtpStream;
typedef struct _RtpStream
......@@ -375,7 +376,6 @@ struct _RtpSession
RtpSignalTable on_rtcp_bye;
struct _OList *signal_tables;
struct _OList *eventqs;
msgb_allocator_t allocator;
RtpStream rtp;
RtcpStream rtcp;
OrtpRtcpXrStats rtcp_xr_stats;
......@@ -640,20 +640,11 @@ ORTP_PUBLIC void rtp_session_dispatch_event(RtpSession *session, OrtpEvent *ev);
ORTP_PUBLIC void rtp_session_set_reuseaddr(RtpSession *session, bool_t yes);
ORTP_PUBLIC int meta_rtp_transport_modifier_inject_packet(const RtpTransport *t, RtpTransportModifier *tpm, mblk_t *msg , int flags);
/**
* #RtpTransport object which can handle multiples security protocols. You can for instance use this object
* to use both sRTP and tunnel transporter. #mblk_t messages received and sent from the endpoint
* will pass through the list of modifiers given. First modifier in list will be first to modify the message
* in send mode and last in receive mode.
* @param[in] t #RtpTransport object that will be generated.
* @param[in] is_rtp Whether this object will be used for RTP packets or not.
* @param[in] endpoint #RtpTransport object in charge of sending/receiving packets. If NULL, it will use standards sendto and recvfrom functions.
* @param[in] modifiers_count number of #RtpModifier object given in the variadic list. Must be 0 if none are given.
* @return 0 if successful, -1 otherwise
**/
ORTP_PUBLIC int meta_rtp_transport_modifier_inject_packet_to(const RtpTransport *t, RtpTransportModifier *tpm, mblk_t *msg , int flags,const struct sockaddr *to, socklen_t tolen) ;
ORTP_PUBLIC int meta_rtp_transport_modifier_inject_packet_to_send(const RtpTransport *t, RtpTransportModifier *tpm, mblk_t *msg, int flags);
ORTP_PUBLIC int meta_rtp_transport_modifier_inject_packet_to_send_to(const RtpTransport *t, RtpTransportModifier *tpm, mblk_t *msg, int flags, const struct sockaddr *to, socklen_t tolen);
ORTP_PUBLIC int meta_rtp_transport_modifier_inject_packet_to_recv(const RtpTransport *t, RtpTransportModifier *tpm, mblk_t *msg, int flags);
void rtp_session_process_incoming(RtpSession * session, mblk_t *mp, bool_t is_rtp_packet, uint32_t ts);
void update_sent_bytes(OrtpStream *os, int nbytes);
/**
* get endpoint if any
......
......@@ -297,7 +297,8 @@ rtp_session_init (RtpSession * session, int mode)
session->symmetric_rtp = FALSE;
session->permissive=FALSE;
session->reuseaddr=TRUE;
msgb_allocator_init(&session->allocator);
msgb_allocator_init(&session->rtp.gs.allocator);
msgb_allocator_init(&session->rtcp.gs.allocator);
/*set default rtptransport*/
rtp_session_set_transports( session
,meta_rtp_transport_new(TRUE,NULL, 0)
......@@ -1206,11 +1207,11 @@ rtp_session_recvm_with_ts (RtpSession * session, uint32_t user_ts)
{
size_t msgsize = msgdsize(mp); /* evaluate how much bytes (including header) is received by app */
uint32_t packet_ts;
ortp_debug("Returning mp with ts=%i", packet_ts);
ortp_global_stats.recv += msgsize;
stream->stats.recv += msgsize;
rtp = (rtp_header_t *) mp->b_rptr;
packet_ts=rtp->timestamp;
ortp_debug("Returning mp with ts=%i", packet_ts);
/* check for payload type changes */
if (session->rcv.pt != rtp->paytype)
{
......@@ -1491,10 +1492,7 @@ void ortp_stream_clear_aux_addresses(OrtpStream *os){
}
static void ortp_stream_uninit(OrtpStream *os){
if (os->cached_mp) {
freemsg(os->cached_mp);
os->cached_mp=NULL;
}
msgb_allocator_init(&os->allocator);
ortp_stream_clear_aux_addresses(os);
}
......@@ -1526,7 +1524,6 @@ void rtp_session_uninit (RtpSession * session)
freemsg(session->minimal_sdes);
session->signal_tables = o_list_free(session->signal_tables);
msgb_allocator_uninit(&session->allocator);
if (session->net_sim_ctx)
ortp_network_simulator_destroy(session->net_sim_ctx);
......@@ -2092,9 +2089,9 @@ int meta_rtp_transport_sendto(RtpTransport *t, mblk_t *msg , int flags, const st
}
/**
* allow a modifier to inject a packet wich will be treated by successive modifiers
* allow a modifier to inject a packet which will be treated by successive modifiers
*/
int meta_rtp_transport_modifier_inject_packet(const RtpTransport *t, RtpTransportModifier *tpm, mblk_t *msg , int flags) {
int meta_rtp_transport_modifier_inject_packet_to_send(const RtpTransport *t, RtpTransportModifier *tpm, mblk_t *msg, int flags) {
struct sockaddr *to;
socklen_t tolen;
MetaRtpTransportImpl *m = (MetaRtpTransportImpl*)t->data;
......@@ -2108,53 +2105,97 @@ int meta_rtp_transport_modifier_inject_packet(const RtpTransport *t, RtpTranspor
to=(struct sockaddr*)&t->session->rtp.gs.rem_addr;
tolen=t->session->rtp.gs.rem_addrlen;
} else {
to=(struct sockaddr*)&t->session->rtcp.gs.rem_addr;
tolen=t->session->rtcp.gs.rem_addrlen;
to = (struct sockaddr*)&t->session->rtcp.gs.rem_addr;
tolen = t->session->rtcp.gs.rem_addrlen;
}
return meta_rtp_transport_modifier_inject_packet_to(t,tpm,msg,flags,to,tolen);
return meta_rtp_transport_modifier_inject_packet_to_send_to(t, tpm, msg, flags, to, tolen);
}
/**
* allow a modifier to inject a packet wich will be treated by successive modifiers
* allow a modifier to inject a packet which will be treated by successive modifiers
*/
int meta_rtp_transport_modifier_inject_packet_to(const RtpTransport *t, RtpTransportModifier *tpm, mblk_t *msg , int flags,const struct sockaddr *to, socklen_t tolen) {
int meta_rtp_transport_modifier_inject_packet_to_send_to(const RtpTransport *t, RtpTransportModifier *tpm, mblk_t *msg, int flags, const struct sockaddr *to, socklen_t tolen) {
size_t prev_ret;
int ret;
OList *elem;
bool_t packetInjected = tpm?FALSE:TRUE; /*if no modifier, start from the begening*/
bool_t foundMyself = tpm ? FALSE : TRUE; /*if no modifier, start from the beginning*/
MetaRtpTransportImpl *m = (MetaRtpTransportImpl*)t->data;
OList *elem = m->modifiers;
if (!m->has_set_session){
meta_rtp_set_session(t->session,m);
if (!m->has_set_session) {
meta_rtp_set_session(t->session, m);
}
prev_ret=msgdsize(msg);
for (elem=m->modifiers;elem!=NULL;elem=o_list_next(elem)){
prev_ret = msgdsize(msg);
for (;elem != NULL; elem = o_list_next(elem)) {
/* run modifiers only after packet injection, the modifier given in parameter is not applied */
if (packetInjected == TRUE) {
RtpTransportModifier *rtm=(RtpTransportModifier*)elem->data;
ret = rtm->t_process_on_send(rtm,msg);
RtpTransportModifier *rtm = (RtpTransportModifier*)elem->data;
if (foundMyself == TRUE) {
ret = rtm->t_process_on_send(rtm, msg);
if (ret<=0){
if (ret <= 0) {
// something went wrong in the modifier (failed to encrypt for instance)
return ret;
}
msg->b_wptr+=(ret-prev_ret);
prev_ret=ret;
msg->b_wptr += (ret - prev_ret);
prev_ret = ret;
}
/* check if we must inject the packet */
if (elem->data == tpm) {
packetInjected = TRUE;
if (rtm == tpm) {
foundMyself = TRUE;
}
}
if (m->endpoint != NULL) {
ret = m->endpoint->t_sendto(m->endpoint, msg, flags, to, tolen);
} else {
ret = _rtp_session_sendto(t->session, m->is_rtp, msg, flags, to, tolen);
}
update_sent_bytes(&t->session->rtp.gs, ret);
return ret;
}
if (m->endpoint!=NULL){
ret=m->endpoint->t_sendto(m->endpoint,msg,flags,to,tolen);
}else{
ret=_rtp_session_sendto(t->session, m->is_rtp,msg,flags,to,tolen);
/**
* allow a modifier to inject a packet which will be treated by successive modifiers
*/
int meta_rtp_transport_modifier_inject_packet_to_recv(const RtpTransport *t, RtpTransportModifier *tpm, mblk_t *msg, int flags) {
int ret = 0;
size_t prev_ret;
bool_t foundMyself = tpm ? FALSE : TRUE; /*if no modifier, start from the beginning*/
MetaRtpTransportImpl *m = (MetaRtpTransportImpl*)t->data;
OList *elem = m->modifiers;
OList *last_elem = NULL;
if (!m->has_set_session) {
meta_rtp_set_session(t->session, m);
}
for (;elem != NULL; elem = o_list_next(elem)) {
last_elem = elem;
}
prev_ret = msgdsize(msg);
for (;last_elem != NULL; last_elem = o_list_prev(last_elem)) {
/* run modifiers only after packet injection, the modifier given in parameter is not applied */
RtpTransportModifier *rtm = (RtpTransportModifier*)last_elem->data;
if (foundMyself == TRUE) {
ret = rtm->t_process_on_receive(rtm, msg);
if (ret < 0) {
// something went wrong in the modifier (failed to decrypt for instance)
break;
}
msg->b_wptr += (ret - prev_ret);
prev_ret = ret;
}
/* check if we must inject the packet */
if (rtm == tpm) {
foundMyself = TRUE;
}
}
rtp_session_process_incoming(t->session, msg, m->is_rtp, msg->reserved1);
return ret;
}
......@@ -2170,6 +2211,11 @@ int meta_rtp_transport_recvfrom(RtpTransport *t, mblk_t *msg, int flags, struct
if (m->endpoint!=NULL){
ret=m->endpoint->t_recvfrom(m->endpoint,msg,flags,from,fromlen);
if (ret > 0) {
/*store recv addr for use by modifiers*/
memcpy(&msg->net_addr,from,*fromlen);
msg->net_addrlen = *fromlen;
}
}else{
ret=rtp_session_rtp_recv_abstract(m->is_rtp?t->session->rtp.gs.socket:t->session->rtcp.gs.socket,msg,flags,from,fromlen);
}
......@@ -2190,10 +2236,6 @@ int meta_rtp_transport_recvfrom(RtpTransport *t, mblk_t *msg, int flags, struct
prev_ret=ret;
msg->b_wptr+=ret;
/*store recv addr for use by modifiers*/
memcpy(&msg->net_addr,from,*fromlen);
msg->net_addrlen = *fromlen;
for (;last_elem!=NULL;last_elem=o_list_prev(last_elem)){
RtpTransportModifier *rtm=(RtpTransportModifier*)last_elem->data;
ret = rtm->t_process_on_receive(rtm,msg);
......@@ -2262,6 +2304,7 @@ void meta_rtp_transport_destroy(RtpTransport *tp) {
for (elem=m->modifiers;elem!=NULL;elem=o_list_next(elem)){
RtpTransportModifier *rtm=(RtpTransportModifier*)elem->data;
rtm->transport = NULL;
rtm->t_destroy(rtm);
}
o_list_free(m->modifiers);
......@@ -2272,6 +2315,7 @@ void meta_rtp_transport_destroy(RtpTransport *tp) {
void meta_rtp_transport_append_modifier(RtpTransport *tp,RtpTransportModifier *tpm) {
MetaRtpTransportImpl *m = (MetaRtpTransportImpl*)tp->data;
tpm->transport = tp;
m->modifiers=o_list_append(m->modifiers, tpm);
if(m->has_set_session) {
tpm->session = tp->session;
......
......@@ -1086,7 +1086,7 @@ int _rtp_session_sendto(RtpSession *session, bool_t is_rtp, mblk_t *m, int flags
return ret;
}
static void update_sent_bytes(OrtpStream *os, int nbytes) {
void update_sent_bytes(OrtpStream *os, int nbytes) {
int overhead = ortp_stream_is_ipv6(os) ? IP6_UDP_OVERHEAD : IP_UDP_OVERHEAD;
if ((os->sent_bytes == 0) && (os->send_bw_start.tv_sec == 0) && (os->send_bw_start.tv_usec == 0)) {
/* Initialize bandwidth computing time when has not been started yet. */
......@@ -1113,7 +1113,6 @@ static void log_send_error(RtpSession *session, const char *type, mblk_t *m, str
static int rtp_session_rtp_sendto(RtpSession * session, mblk_t * m, struct sockaddr *destaddr, socklen_t destlen, bool_t is_aux){
int error;
if (rtp_session_using_transport(session, rtp)){
error = (session->rtp.gs.tr->t_sendto) (session->rtp.gs.tr,m,0,destaddr,destlen);
}else{
......@@ -1322,6 +1321,9 @@ int rtp_session_rtp_recv_abstract(ortp_socket_t socket, mblk_t *msg, int flags,
}
#endif
}
/*store recv addr for use by modifiers*/
memcpy(&msg->net_addr,from,*fromlen);
msg->net_addrlen = *fromlen;
}
return ret;
}
......@@ -1563,6 +1565,16 @@ static void rtp_process_incoming_packet(RtpSession * session, mblk_t * mp, bool_
}
}
void rtp_session_process_incoming(RtpSession * session, mblk_t *mp, bool_t is_rtp_packet, uint32_t ts) {
if (session->net_sim_ctx && session->net_sim_ctx->params.mode == OrtpNetworkSimulatorInbound) {
/*drain possible packets queued in the network simulator*/
mp = rtp_session_network_simulate(session, mp, &is_rtp_packet);
rtp_process_incoming_packet(session, mp, is_rtp_packet, ts);
} else if (mp != NULL) {
rtp_process_incoming_packet(session, mp, is_rtp_packet, ts);
}
}
int rtp_session_rtp_recv (RtpSession * session, uint32_t user_ts) {
int error;
ortp_socket_t sockfd=session->rtp.gs.socket;
......@@ -1578,10 +1590,9 @@ int rtp_session_rtp_recv (RtpSession * session, uint32_t user_ts) {
bool_t sock_connected=!!(session->flags & RTP_SOCKET_CONNECTED);
bool_t is_rtp_packet = TRUE;
if (session->rtp.gs.cached_mp==NULL){
session->rtp.gs.cached_mp = msgb_allocator_alloc(&session->allocator,session->recv_buf_size);
}
mp=session->rtp.gs.cached_mp;
mp = msgb_allocator_alloc(&session->rtp.gs.allocator, session->recv_buf_size);
mp->reserved1 = user_ts;
if (sock_connected){
error=rtp_session_rtp_recv_abstract(sockfd, mp, 0, NULL, NULL);
}else if (rtp_session_using_transport(session, rtp)) {
......@@ -1591,16 +1602,7 @@ int rtp_session_rtp_recv (RtpSession * session, uint32_t user_ts) {
}
if (error > 0){
mp->b_wptr+=error;
/*if we use the network simulator, store packet source address for later use(otherwise it will be used immediately)*/
memcpy(&mp->net_addr,&remaddr,addrlen);
mp->net_addrlen = addrlen;
if (session->net_sim_ctx && session->net_sim_ctx->params.mode==OrtpNetworkSimulatorInbound){
mp=rtp_session_network_simulate(session,mp,&is_rtp_packet);
}
rtp_process_incoming_packet(session,mp,is_rtp_packet,user_ts);
session->rtp.gs.cached_mp=NULL;
rtp_session_process_incoming(session, mp, is_rtp_packet, user_ts);
}
else
{
......@@ -1619,13 +1621,9 @@ int rtp_session_rtp_recv (RtpSession * session, uint32_t user_ts) {
#endif
}else{
/*EWOULDBLOCK errors or transports returning 0 are ignored.*/
if (session->net_sim_ctx && session->net_sim_ctx->params.mode==OrtpNetworkSimulatorInbound){
/*drain possible packets queued in the network simulator*/
mp=rtp_session_network_simulate(session,NULL,&is_rtp_packet);
rtp_process_incoming_packet(session,mp,is_rtp_packet,user_ts);
}
rtp_session_process_incoming(session, NULL, is_rtp_packet, user_ts);
}
/* don't free the cached_mp, it will be reused next time */
freemsg(mp);
return -1;
}
}
......@@ -1646,11 +1644,10 @@ int rtp_session_rtcp_recv (RtpSession * session) {
{
bool_t sock_connected=!!(session->flags & RTCP_SOCKET_CONNECTED);
bool_t is_rtp_packet = FALSE;
if (session->rtcp.gs.cached_mp==NULL){
session->rtcp.gs.cached_mp = allocb (RTCP_MAX_RECV_BUFSIZE, 0);
}
mp=session->rtcp.gs.cached_mp;
mp = msgb_allocator_alloc(&session->rtp.gs.allocator, RTCP_MAX_RECV_BUFSIZE);
mp->reserved1 = session->rtp.rcv_last_app_ts;
if (sock_connected){
error=rtp_session_rtp_recv_abstract(session->rtcp.gs.socket, mp, 0, NULL, NULL);
}else{
......@@ -1669,15 +1666,7 @@ int rtp_session_rtcp_recv (RtpSession * session) {
if (error > 0)
{
mp->b_wptr += error;
memcpy(&mp->net_addr,&remaddr,addrlen);
mp->net_addrlen = addrlen;
if (session->net_sim_ctx && session->net_sim_ctx->params.mode==OrtpNetworkSimulatorInbound){
mp=rtp_session_network_simulate(session,mp,&is_rtp_packet);
}
rtp_process_incoming_packet(session,mp,is_rtp_packet,session->rtp.rcv_last_app_ts);
session->rtcp.gs.cached_mp=NULL;
rtp_session_process_incoming(session, mp, is_rtp_packet, session->rtp.rcv_last_app_ts);
}
else
{
......@@ -1697,13 +1686,10 @@ int rtp_session_rtcp_recv (RtpSession * session) {
session->rtp.recv_errno=errnum;
}else{
/*EWOULDBLOCK errors or transports returning 0 are ignored.*/
if (session->net_sim_ctx && session->net_sim_ctx->params.mode==OrtpNetworkSimulatorInbound){
/*drain possible packets queued in the network simulator*/
mp=rtp_session_network_simulate(session,NULL,&is_rtp_packet);
rtp_process_incoming_packet(session,mp,is_rtp_packet,session->rtp.rcv_last_app_ts);
rtp_session_process_incoming(session, NULL, is_rtp_packet, session->rtp.rcv_last_app_ts);
}
}
/* don't free the cached_mp, it will be reused next time */
freemsg(mp);
return -1; /* avoids an infinite loop ! */
}
}
......
......@@ -39,6 +39,8 @@ void mblk_init(mblk_t *mp)
void mblk_meta_copy(const mblk_t *source, mblk_t *dest) {
dest->reserved1 = source->reserved1;
dest->reserved2 = source->reserved2;
memcpy(&dest->net_addr,&source->net_addr,source->net_addrlen);
dest->net_addrlen = source->net_addrlen;
#if defined(ORTP_TIMESTAMP)
dest->timestamp = source->timestamp;
#endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment