Commit a9ffd72d authored by Simon Morlat's avatar Simon Morlat

new feature: multiple destinations can be added for a RtpSession.

parent c52efa88
......@@ -253,6 +253,8 @@ typedef unsigned char bool_t;
#define TRUE 1
#define FALSE 0
typedef struct _OList OList;
typedef struct ortpTimeSpec{
int64_t tv_sec;
int64_t tv_nsec;
......
......@@ -202,11 +202,16 @@ typedef struct OrtpRtcpXrStats {
uint32_t discarded_count;
} OrtpRtcpXrStats;
typedef struct _OrtpAddress{
struct sockaddr_storage addr;
socklen_t len;
}OrtpAddress;
typedef struct _OrtpStream {
ortp_socket_t socket;
int sockfamily;
int loc_port;
int rem_addrlen;
socklen_t rem_addrlen;
struct sockaddr_storage rem_addr;
struct _RtpTransport *tr;
mblk_t *cached_mp;
......@@ -216,6 +221,7 @@ typedef struct _OrtpStream {
unsigned int recv_bytes; /* used for bandwidth estimation */
float upload_bw;
float download_bw;
OList *aux_destinations; /*list of OrtpAddress */
} OrtpStream;
typedef struct _RtpStream
......@@ -382,6 +388,8 @@ rtp_session_set_remote_addr_full (RtpSession * session, const char * rtp_addr, i
/*same as previous function, old name:*/
ORTP_PUBLIC int rtp_session_set_remote_addr_and_port (RtpSession * session, const char * addr, int rtp_port, int rtcp_port);
ORTP_PUBLIC int rtp_session_set_remote_addr(RtpSession *session,const char *addr, int port);
ORTP_PUBLIC int rtp_session_add_aux_remote_addr_full(RtpSession * session, const char * rtp_addr, int rtp_port, const char * rtcp_addr, int rtcp_port);
ORTP_PUBLIC void rtp_session_clear_aux_remote_addr(RtpSession * session);
/* alternatively to the set_remote_addr() and set_local_addr(), an application can give
a valid socket (potentially connect()ed )to be used by the RtpSession */
ORTP_PUBLIC void rtp_session_set_sockets(RtpSession *session, int rtpfd, int rtcpfd);
......
......@@ -1390,6 +1390,22 @@ void rtp_session_dispatch_event(RtpSession *session, OrtpEvent *ev){
ortp_event_destroy(ev);
}
void ortp_stream_clear_aux_addresses(OrtpStream *os){
OList *elem;
for (elem=os->aux_destinations;elem!=NULL;elem=elem->next){
OrtpAddress *addr=(OrtpAddress*)elem->data;
ortp_free(addr);
}
os->aux_destinations=o_list_free(os->aux_destinations);
}
static void ortp_stream_uninit(OrtpStream *os){
if (os->cached_mp) {
freemsg(os->cached_mp);
os->cached_mp=NULL;
}
ortp_stream_clear_aux_addresses(os);
}
void rtp_session_uninit (RtpSession * session)
{
......@@ -1409,8 +1425,8 @@ void rtp_session_uninit (RtpSession * session)
wait_point_uninit(&session->snd.wp);
wait_point_uninit(&session->rcv.wp);
if (session->current_tev!=NULL) freemsg(session->current_tev);
if (session->rtp.gs.cached_mp!=NULL) freemsg(session->rtp.gs.cached_mp);
if (session->rtcp.gs.cached_mp!=NULL) freemsg(session->rtcp.gs.cached_mp);
ortp_stream_uninit(&session->rtp.gs);
ortp_stream_uninit(&session->rtcp.gs);
if (session->full_sdes != NULL)
freemsg(session->full_sdes);
if (session->minimal_sdes != NULL)
......
......@@ -82,6 +82,9 @@ static LPFN_WSARECVMSG ortp_WSARecvMsg = NULL;
# endif
#endif
static int
_rtp_session_set_remote_addr_full (RtpSession * session, const char * rtp_addr, int rtp_port, const char * rtcp_addr, int rtcp_port, bool_t is_aux);
static bool_t try_connect(int fd, const struct sockaddr *dest, socklen_t addrlen){
if (connect(fd,dest,addrlen)<0){
ortp_warning("Could not connect() socket: %s",getSocketError());
......@@ -668,17 +671,6 @@ int rtp_session_get_local_rtcp_port(const RtpSession *session){
return (session->rtcp.gs.loc_port>0) ? session->rtcp.gs.loc_port : -1;
}
static char * ortp_inet_ntoa(struct sockaddr *addr, int addrlen, char *dest, int destlen){
int err;
dest[0]=0;
err=getnameinfo(addr,addrlen,dest,destlen,NULL,0,NI_NUMERICHOST);
if (err!=0){
ortp_warning("getnameinfo error: %s",gai_strerror(err));
}
return dest;
}
/**
*rtp_session_set_remote_addr:
*@session: a rtp session freshly created.
......@@ -693,7 +685,7 @@ static char * ortp_inet_ntoa(struct sockaddr *addr, int addrlen, char *dest, int
**/
int
rtp_session_set_remote_addr (RtpSession * session, const char * addr, int port){
return rtp_session_set_remote_addr_full (session, addr, port, addr, port+1);
return rtp_session_set_remote_addr_full(session, addr, port, addr, port+1);
}
/**
......@@ -712,11 +704,29 @@ rtp_session_set_remote_addr (RtpSession * session, const char * addr, int port){
**/
int
rtp_session_set_remote_addr_full (RtpSession * session, const char * rtp_addr, int rtp_port, const char * rtcp_addr, int rtcp_port)
{
rtp_session_set_remote_addr_full (RtpSession * session, const char * rtp_addr, int rtp_port, const char * rtcp_addr, int rtcp_port){
return _rtp_session_set_remote_addr_full(session,rtp_addr,rtp_port,rtcp_addr,rtcp_port,FALSE);
}
static int
_rtp_session_set_remote_addr_full (RtpSession * session, const char * rtp_addr, int rtp_port, const char * rtcp_addr, int rtcp_port, bool_t is_aux){
int err;
struct addrinfo hints, *res0, *res;
char num[8];
struct sockaddr_storage *rtp_saddr=&session->rtp.gs.rem_addr;
socklen_t *rtp_saddr_len=&session->rtp.gs.rem_addrlen;
struct sockaddr_storage *rtcp_saddr=&session->rtcp.gs.rem_addr;
socklen_t *rtcp_saddr_len=&session->rtcp.gs.rem_addrlen;
OrtpAddress *aux_rtp=NULL,*aux_rtcp=NULL;
if (is_aux){
aux_rtp=ortp_malloc0(sizeof(OrtpAddress));
rtp_saddr=&aux_rtp->addr;
rtp_saddr_len=&aux_rtp->len;
aux_rtcp=ortp_malloc0(sizeof(OrtpAddress));
rtcp_saddr=&aux_rtcp->addr;
rtcp_saddr_len=&aux_rtcp->len;
}
memset(&hints, 0, sizeof(hints));
hints.ai_family = (session->rtp.gs.socket == -1) ? AF_UNSPEC : session->rtp.gs.sockfamily;
......@@ -727,7 +737,8 @@ rtp_session_set_remote_addr_full (RtpSession * session, const char * rtp_addr, i
err = getaddrinfo(rtp_addr, num, &hints, &res0);
if (err) {
ortp_warning ("Error in socket address: %s", gai_strerror(err));
return -1;
err=-1;
goto end;
}
if (session->rtp.gs.socket == -1){
/* the session has not its socket bound, do it */
......@@ -736,16 +747,19 @@ rtp_session_set_remote_addr_full (RtpSession * session, const char * rtp_addr, i
if (res0->ai_addr->sa_family==AF_INET6)
err = rtp_session_set_local_addr (session, "::", -1, -1);
else err=rtp_session_set_local_addr (session, "0.0.0.0", -1, -1);
if (err<0) return -1;
if (err<0) {
err=-1;
goto end;
}
}
err=1;
err=-1;
for (res = res0; res; res = res->ai_next) {
/* set a destination address that has the same type as the local address */
if (res->ai_family==session->rtp.gs.sockfamily ) {
memcpy( &session->rtp.gs.rem_addr, res->ai_addr, res->ai_addrlen);
session->rtp.gs.rem_addrlen=res->ai_addrlen;
memcpy(rtp_saddr, res->ai_addr, res->ai_addrlen);
*rtp_saddr_len=res->ai_addrlen;
err=0;
break;
}
......@@ -753,7 +767,7 @@ rtp_session_set_remote_addr_full (RtpSession * session, const char * rtp_addr, i
freeaddrinfo(res0);
if (err) {
ortp_warning("Could not set destination for RTP socket to %s:%i.",rtp_addr,rtp_port);
return -1;
goto end;
}
memset(&hints, 0, sizeof(hints));
......@@ -764,22 +778,23 @@ rtp_session_set_remote_addr_full (RtpSession * session, const char * rtp_addr, i
err = getaddrinfo(rtcp_addr, num, &hints, &res0);
if (err) {
ortp_warning ("Error: %s", gai_strerror(err));
return err;
err=-1;
goto end;
}
err=1;
err=-1;
for (res = res0; res; res = res->ai_next) {
/* set a destination address that has the same type as the local address */
if (res->ai_family==session->rtp.gs.sockfamily ) {
err=0;
memcpy( &session->rtcp.gs.rem_addr, res->ai_addr, res->ai_addrlen);
session->rtcp.gs.rem_addrlen=res->ai_addrlen;
memcpy(rtcp_saddr, res->ai_addr, res->ai_addrlen);
*rtcp_saddr_len=res->ai_addrlen;
break;
}
}
freeaddrinfo(res0);
if (err) {
ortp_warning("Could not set destination for RCTP socket to %s:%i.",rtcp_addr,rtcp_port);
return -1;
goto end;
}
if (can_connect(session)){
......@@ -803,20 +818,54 @@ rtp_session_set_remote_addr_full (RtpSession * session, const char * rtp_addr, i
session->flags&=~RTP_SOCKET_CONNECTED;
session->flags&=~RTCP_SOCKET_CONNECTED;
}
ortp_message("rtp session [%p] set to rtp [%s:%i] rtcp [%s:%i]" ,session
,rtp_addr
,rtp_port
,rtcp_addr
,rtcp_port);
return 0;
ortp_message("RtpSession [%p] sending to rtp [%s:%i] rtcp [%s:%i] %s" ,session
,rtp_addr
,rtp_port
,rtcp_addr
,rtcp_port
,is_aux ? "as auxiliary destination" : "");
end:
if (is_aux){
if (err==-1){
ortp_free(aux_rtp);
ortp_free(aux_rtcp);
}else{
session->rtp.gs.aux_destinations=o_list_append(session->rtp.gs.aux_destinations,aux_rtp);
session->rtcp.gs.aux_destinations=o_list_append(session->rtcp.gs.aux_destinations,aux_rtcp);
}
}
return err;
}
int rtp_session_set_remote_addr_and_port(RtpSession * session, const char * addr, int rtp_port, int rtcp_port){
return rtp_session_set_remote_addr_full(session,addr,rtp_port,addr,rtcp_port);
}
void rtp_session_set_sockets(RtpSession *session, int rtpfd, int rtcpfd)
{
/**
*rtp_session_add_remote_aux_addr_full:
*@session: a rtp session freshly created.
*@rtp_addr: a local IP address in the xxx.xxx.xxx.xxx form.
*@rtp_port: a local rtp port.
*@rtcp_addr: a local IP address in the xxx.xxx.xxx.xxx form.
*@rtcp_port: a local rtcp port.
*
* Add an auxiliary remote address for the rtp session, ie a destination address where rtp packet
* are sent.
*
* Returns: 0 on success.
**/
int
rtp_session_add_aux_remote_addr_full(RtpSession * session, const char * rtp_addr, int rtp_port, const char * rtcp_addr, int rtcp_port){
return _rtp_session_set_remote_addr_full(session,rtp_addr,rtp_port,rtcp_addr,rtcp_port,TRUE);
}
void rtp_session_clear_aux_remote_addr(RtpSession * session){
ortp_stream_clear_aux_addresses(&session->rtp.gs);
ortp_stream_clear_aux_addresses(&session->rtcp.gs);
}
void rtp_session_set_sockets(RtpSession *session, int rtpfd, int rtcpfd){
if (rtpfd!=-1) set_non_blocking_socket(rtpfd);
if (rtcpfd!=-1) set_non_blocking_socket(rtcpfd);
session->rtp.gs.socket=rtpfd;
......@@ -926,17 +975,53 @@ static void update_recv_bytes(OrtpStream *os, int nbytes) {
os->recv_bytes += nbytes + overhead;
}
static void log_send_error(RtpSession *session, const char *type, mblk_t *m, struct sockaddr *destaddr, socklen_t destlen){
char host[65]={0};
char port[12]={0};
getnameinfo(destaddr,destlen,host,sizeof(host)-1,port,sizeof(port)-1,NI_NUMERICHOST|NI_NUMERICSERV);
ortp_warning ("RtpSession [%p] error sending [%s] packet [%p] to ip=[%s] port=[%s]: %s",
session, type, m, host, port, getSocketError());
}
int
rtp_session_rtp_send (RtpSession * session, mblk_t * m)
{
static int rtp_session_rtp_sendto(RtpSession * session, mblk_t * m, struct sockaddr *destaddr, socklen_t destlen, bool_t is_aux){
int error;
ortp_socket_t sockfd=session->rtp.gs.socket;
if (rtp_session_using_transport(session, rtp)){
error = (session->rtp.gs.tr->t_sendto) (session->rtp.gs.tr,m,0,destaddr,destlen);
}else{
#ifdef USE_SENDMSG
error=rtp_sendmsg(sockfd,m,destaddr,destlen);
#else
if (m->b_cont!=NULL)
msgpullup(m,-1);
error = sendto (sockfd, (char*)m->b_rptr, (int) (m->b_wptr - m->b_rptr),
0,destaddr,destlen);
#endif
}
if (!is_aux){
/*errors to auxiliary destinations are not notified*/
if (error < 0){
if (session->on_network_error.count>0){
rtp_signal_table_emit3(&session->on_network_error,(long)"Error sending RTP packet",INT_TO_POINTER(getSocketErrorCode()));
}else log_send_error(session,"rtp",m,destaddr,destlen);
session->rtp.send_errno=getSocketErrorCode();
}else{
update_sent_bytes(&session->rtp.gs, error);
}
}
return error;
}
int
rtp_session_rtp_send (RtpSession * session, mblk_t * m){
int error=0;
int i;
rtp_header_t *hdr;
struct sockaddr *destaddr=(struct sockaddr*)&session->rtp.gs.rem_addr;
socklen_t destlen=session->rtp.gs.rem_addrlen;
ortp_socket_t sockfd=session->rtp.gs.socket;
OList *elem=NULL;
hdr = (rtp_header_t *) m->b_rptr;
if (hdr->version == 0) {
/* We are probably trying to send a STUN packet so don't change its content. */
......@@ -953,38 +1038,57 @@ rtp_session_rtp_send (RtpSession * session, mblk_t * m)
destaddr=NULL;
destlen=0;
}
/*first send to main destination*/
if (destlen) error=rtp_session_rtp_sendto(session,m,destaddr,destlen,FALSE);
/*then iterate over auxiliary destinations*/
for(elem=session->rtp.gs.aux_destinations;elem!=NULL;elem=elem->next){
OrtpAddress *addr=(OrtpAddress*)elem->data;
rtp_session_rtp_sendto(session,m,(struct sockaddr*)&addr->addr,addr->len,TRUE);
}
freemsg(m);
return error;
}
if (rtp_session_using_transport(session, rtp)){
error = (session->rtp.gs.tr->t_sendto) (session->rtp.gs.tr,m,0,destaddr,destlen);
}else{
static int rtp_session_rtcp_sendto(RtpSession * session, mblk_t * m, struct sockaddr *destaddr, socklen_t destlen, bool_t is_aux){
int error=0;
ortp_socket_t sockfd=session->rtcp.gs.socket;
if (rtp_session_using_transport(session, rtcp)){
error = (session->rtcp.gs.tr->t_sendto) (session->rtcp.gs.tr, m, 0,
destaddr, destlen);
}
else{
#ifdef USE_SENDMSG
error=rtp_sendmsg(sockfd,m,destaddr,destlen);
error=rtp_sendmsg(sockfd,m,destaddr, destlen);
#else
if (m->b_cont!=NULL)
if (m->b_cont!=NULL){
msgpullup(m,-1);
error = sendto (sockfd, (char*)m->b_rptr, (int) (m->b_wptr - m->b_rptr),
0,destaddr,destlen);
}
error = sendto(sockfd, (char*)m->b_rptr,(int) (m->b_wptr - m->b_rptr), 0, destaddr, destlen);
#endif
}
if (error < 0){
if (session->on_network_error.count>0){
rtp_signal_table_emit3(&session->on_network_error,(long)"Error sending RTP packet",INT_TO_POINTER(getSocketErrorCode()));
}else ortp_warning ("[%p] Error sending rtp packet: %s ; socket=%i", session, getSocketError(), sockfd);
session->rtp.send_errno=getSocketErrorCode();
}else{
update_sent_bytes(&session->rtp.gs, error);
if (!is_aux){
if (error < 0){
if (session->on_network_error.count>0){
rtp_signal_table_emit3(&session->on_network_error,(long)"Error sending RTCP packet",INT_TO_POINTER(getSocketErrorCode()));
}else{
log_send_error(session,"rtcp",m,destaddr,destlen);
}
} else {
update_sent_bytes(&session->rtcp.gs, error);
update_avg_rtcp_size(session, error);
}
}
freemsg (m);
return error;
}
int
rtp_session_rtcp_send (RtpSession * session, mblk_t * m)
{
rtp_session_rtcp_send (RtpSession * session, mblk_t * m){
int error=0;
ortp_socket_t sockfd=session->rtcp.gs.socket;
struct sockaddr *destaddr=(struct sockaddr*)&session->rtcp.gs.rem_addr;
socklen_t destlen=session->rtcp.gs.rem_addrlen;
OList *elem=NULL;
bool_t using_connected_socket=(session->flags & RTCP_SOCKET_CONNECTED)!=0;
if (using_connected_socket) {
......@@ -992,38 +1096,17 @@ rtp_session_rtcp_send (RtpSession * session, mblk_t * m)
destlen=0;
}
if (session->rtcp.enabled &&
( (sockfd!=(ortp_socket_t)-1 && (session->rtcp.gs.rem_addrlen>0 ||using_connected_socket))
|| rtp_session_using_transport(session, rtcp) ) ){
if (rtp_session_using_transport(session, rtcp)){
error = (session->rtcp.gs.tr->t_sendto) (session->rtcp.gs.tr, m, 0,
destaddr, destlen);
}
else{
#ifdef USE_SENDMSG
error=rtp_sendmsg(sockfd,m,destaddr, destlen);
#else
if (m->b_cont!=NULL){
msgpullup(m,-1);
}
error = sendto (sockfd, (char*)m->b_rptr,
(int) (m->b_wptr - m->b_rptr), 0,
destaddr, destlen);
#endif
if (session->rtcp.enabled){
if ( (sockfd!=(ortp_socket_t)-1 && (destlen>0 || using_connected_socket))
|| rtp_session_using_transport(session, rtcp) ) {
rtp_session_rtcp_sendto(session,m,destaddr,destlen,FALSE);
}
if (error < 0){
char host[65];
if (session->on_network_error.count>0){
rtp_signal_table_emit3(&session->on_network_error,(long)"Error sending RTCP packet",INT_TO_POINTER(getSocketErrorCode()));
}else{
ortp_warning ("[%p] Error sending rtcp packet %p: %s ; socket=%i; addr=%s",
session, m, getSocketError(), session->rtcp.gs.socket, ortp_inet_ntoa((struct sockaddr*)&session->rtcp.gs.rem_addr,session->rtcp.gs.rem_addrlen,host,sizeof(host)) );
}
} else {
update_sent_bytes(&session->rtcp.gs, error);
update_avg_rtcp_size(session, error);
for(elem=session->rtcp.gs.aux_destinations;elem!=NULL;elem=elem->next){
OrtpAddress *addr=(OrtpAddress*)elem->data;
rtp_session_rtcp_sendto(session,m,(struct sockaddr*)&addr->addr,addr->len,TRUE);
}
}else ortp_message("Not sending rtcp report: sockfd=%i, rem_addrlen=%i, connected=%i",sockfd,session->rtcp.gs.rem_addrlen,using_connected_socket);
}else ortp_message("Not sending rtcp report, rtcp disabled.");
freemsg (m);
return error;
}
......
......@@ -78,4 +78,6 @@ bool_t rtp_session_has_fb_packets_to_send(RtpSession *session);
void rtp_session_send_regular_rtcp_packet_and_reschedule(RtpSession *session, uint64_t tc);
void rtp_session_send_fb_rtcp_packet_and_reschedule(RtpSession *session);
void ortp_stream_clear_aux_addresses(OrtpStream *os);
#endif
......@@ -46,7 +46,7 @@ OList * o_list_append(OList *elem, void * data){
OList * o_list_free(OList *list){
OList *elem = list;
OList *tmp;
return_val_if_fail(list, list);
if (!list) return NULL;
while(elem->next!=NULL) {
tmp = elem;
elem = elem->next;
......
......@@ -36,8 +36,6 @@ struct _OList {
void *data;
};
typedef struct _OList OList;
#define o_list_next(elem) ((elem)->next)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment