Commit 40b2f8b4 authored by Simon Morlat's avatar Simon Morlat

Rework MediaRelay

- don't allow remote address of channel to be set by discovery of an incoming packet before a SIP message sets its for the first time
- take into account a=rtcp (previously if client's RTCP port wasn't rtp port + 1, it could not work)
- set a threshold for connection refused errors to stop outgoing stream.
parent 20edf4ed
......@@ -40,7 +40,7 @@ include(GNUInstallDirs)
option(ENABLE_STRICT "Pass strict flags to the compiler" YES)
option(ENABLE_DATEHANDLER "Build DateHandler module" NO)
option(ENABLE_DOC "Build documentation" NO)
option(ENABLE_PDFDOC "Build pdf documentation" NO)
option(ENABLE_MONOTONIC_CLOCK_REGISTRATIONS "Enable monotonic clock for registrations" NO)
option(ENABLE_ODBC "Build ODBC support for database connection" NO)
option(ENABLE_PRESENCE "Build presence support" NO)
......@@ -209,7 +209,7 @@ if(ENABLE_PROTOBUF)
endif()
endif()
if(ENABLE_DOC)
if(ENABLE_PDFDOC)
FIND_PROGRAM_REQUIRED(PDFLATEX_PROG pdflatex)
endif()
......@@ -229,7 +229,7 @@ endif()
if(ENABLE_SOCI)
find_package(Soci REQUIRED COMPONENTS mysql)
find_path(SOCI_MYSQL_INCLUDES NAMES mysql.h PATH_SUFFIXES mysql)
find_path(SOCI_MYSQL_INCLUDES NAMES mysql.h PATH_SUFFIXES mysql HINTS /usr/include/mariadb )
endif()
......
......@@ -117,20 +117,20 @@ std::pair<string,int> RelayedCall::getChannelSources(int mline, const std::strin
}
/* Obtain destination (previously set by setChannelDestinations()*/
std::pair<string,int> RelayedCall::getChannelDestinations(int mline, const std::string & partyTag, const std::string &trId){
std::tuple<string,int,int> RelayedCall::getChannelDestinations(int mline, const std::string & partyTag, const std::string &trId){
if (mline >= sMaxSessions) {
return make_pair("",0);
return make_tuple("",0,0);
}
shared_ptr<RelaySession> s = mSessions[mline];
if (s != NULL) {
shared_ptr<RelayChannel> chan=s->getChannel(partyTag,trId);
if (chan) return make_pair(chan->getRemoteIp(),chan->getRemotePort());
if (chan) return make_tuple(chan->getRemoteIp(),chan->getRemoteRtpPort(), chan->getRemoteRtcpPort());
}
return make_pair("",0);
return make_tuple("",0,0);
}
void RelayedCall::setChannelDestinations(const shared_ptr<SdpModifier> &m, int mline, const string &ip, int port, const string & partyTag, const string &trId, bool isEarlyMedia){
void RelayedCall::setChannelDestinations(const shared_ptr<SdpModifier> &m, int mline, const string &ip, int rtp_port, int rtcp_port, const string & partyTag, const string &trId, bool isEarlyMedia){
if (mline >= sMaxSessions) {
return;
}
......@@ -169,7 +169,7 @@ void RelayedCall::setChannelDestinations(const shared_ptr<SdpModifier> &m, int m
}
}
configureRelayChannel(chan,m->mSip,m->mSession,mline);
chan->setRemoteAddr(ip, port,dir);
chan->setRemoteAddr(ip, rtp_port, rtcp_port, dir);
}
}
}
......
......@@ -25,6 +25,7 @@
#include "mediarelay.hh"
#include "sdp-modifier.hh"
#include <map>
#include <tuple>
......@@ -47,9 +48,9 @@ public:
std::pair<std::string,int> getChannelSources(int mline, const std::string & partyTag, const std::string &trId);
/* Obtain destination (previously set by setChannelDestinations()*/
std::pair<std::string,int> getChannelDestinations(int mline, const std::string & partyTag, const std::string &trId);
std::tuple<std::string,int,int> getChannelDestinations(int mline, const std::string & partyTag, const std::string &trId);
void setChannelDestinations(const std::shared_ptr<SdpModifier> &m, int mline, const std::string &ip, int port, const std::string & partyTag, const std::string &trId,
void setChannelDestinations(const std::shared_ptr<SdpModifier> &m, int mline, const std::string &ip, int rtp_port, int rtcp_port, const std::string & partyTag, const std::string &trId,
bool isEarlyMedia);
void removeBranch(const std::string &trId);
......
......@@ -63,7 +63,7 @@ unsigned int PollFd::getREvents(int index) const {
RelayChannel::RelayChannel(RelaySession *relaySession, const std::pair<std::string, std::string> &relayIps,
bool preventLoops)
: mDir(SendRecv), mLocalIp(relayIps.first), mRemoteIp(std::string("undefined")), mRemotePort(-1) {
: mDir(SendRecv), mLocalIp(relayIps.first), mRemoteIp(std::string("undefined")) {
mPfdIndex = -1;
mSession = relaySession->getRelayServer()->createRtpSession(relayIps.second);
mSockets[0] = rtp_session_get_rtp_socket(mSession);
......@@ -74,6 +74,8 @@ RelayChannel::RelayChannel(RelaySession *relaySession, const std::pair<std::stri
mPreventLoop = preventLoops;
mHasMultipleTargets = false;
mDestAddrChanged = false;
mRecvErrorCount[0] = mRecvErrorCount[1] = 0;
mRemotePort[0] = mRemotePort[1] = -1;
}
bool RelayChannel::checkSocketsValid() {
......@@ -96,23 +98,24 @@ const char *RelayChannel::dirToString(Dir dir) {
return "invalid";
}
void RelayChannel::setRemoteAddr(const string &ip, int port, Dir dir) {
LOGD("RelayChannel [%p] is now configured local=[%s:%i] remote=[%s:%i] dir=[%s]", this, getLocalIp().c_str(),
getLocalPort(), ip.c_str(), port, dirToString(dir));
void RelayChannel::setRemoteAddr(const string &ip, int rtp_port, int rtcp_port, Dir dir) {
LOGD("RelayChannel [%p] is now configured local=[%s|%i:%i] remote=[%s|%i:%i] dir=[%s]", this, getLocalIp().c_str(),
getLocalPort(), getLocalPort()+1, ip.c_str(), rtp_port, rtcp_port, dirToString(dir));
bool dest_ok = true;
if (port > 0 && mPreventLoop) {
if (rtp_port > 0 && mPreventLoop) {
if (strcmp(ip.c_str(), getLocalIp().c_str()) == 0) {
LOGW("RelayChannel [%p] wants to loop to local machine, not allowed.", this);
dest_ok = false;
}
}
mRemotePort = port;
mRemotePort[0] = rtp_port;
mRemotePort[1] = rtcp_port;
mRemoteIp = ip;
mDir = dir;
if (dest_ok && port != 0) {
if (dest_ok && rtp_port != 0) {
struct addrinfo *res = NULL;
struct addrinfo hints = {0};
char portstr[20];
......@@ -122,27 +125,20 @@ void RelayChannel::setRemoteAddr(const string &ip, int port, Dir dir) {
LOGW("RelayChannel [%p] is being set new destination address but was fixed previously in this session, so ignoring this request.", this);
return;
}
for (int i = 0; i < 2; ++i){
mRecvErrorCount[i] = 0;
snprintf(portstr, sizeof(portstr), "%i", mRemotePort[i]);
hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
err = getaddrinfo(ip.c_str(), portstr, &hints, &res);
if (err != 0) {
LOGE("RelayChannel::RelayChannel() failed for %s:%i : %s", ip.c_str(), mRemotePort[i], gai_strerror(err));
} else {
memcpy(&mSockAddr[i], res->ai_addr, res->ai_addrlen);
mSockAddrSize[i] = res->ai_addrlen;
freeaddrinfo(res);
}
snprintf(portstr, sizeof(portstr), "%i", port);
hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
err = getaddrinfo(ip.c_str(), portstr, &hints, &res);
if (err != 0) {
LOGE("RelayChannel::RelayChannel() failed for %s:%i : %s", ip.c_str(), port, gai_strerror(err));
} else {
memcpy(&mSockAddr[0], res->ai_addr, res->ai_addrlen);
mSockAddrSize[0] = res->ai_addrlen;
freeaddrinfo(res);
}
snprintf(portstr, sizeof(portstr), "%i", port + 1);
hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
err = getaddrinfo(ip.c_str(), portstr, &hints, &res);
if (err != 0) {
LOGE("RelayChannel::RelayChannel() failed for %s:%i : %s", ip.c_str(), port, gai_strerror(err));
} else {
memcpy(&mSockAddr[1], res->ai_addr, res->ai_addrlen);
mSockAddrSize[1] = res->ai_addrlen;
freeaddrinfo(res);
}
} else {
/*case where client declined the stream (0 port in SDP) or destination address is invalid*/
......@@ -176,6 +172,14 @@ int RelayChannel::recv(int i, uint8_t *buf, size_t buflen) {
int err = recvfrom(mSockets[i], buf, buflen, 0, (struct sockaddr *)&ss, &addrsize);
if (err > 0) {
mPacketsReceived++;
if (mSockAddrSize[i] == 0){
/* Remote destination has never been set previously (for example if 183 or 200 OK is not yet received),
* but we receive a packet.
* Our policy is to drop the packet until the destination address is set.*/
LOGW("RelayChannel[%p]: remote address not set, packet ignored.", this);
return 0;
}
mRecvErrorCount[i] = 0;
if (addrsize != mSockAddrSize[i] || memcmp(&ss, &mSockAddr[i], addrsize) != 0 ){
LOGD("RelayChannel[%p] destination address changed.", this);
mSockAddrSize[i] = addrsize;
......@@ -193,11 +197,10 @@ int RelayChannel::recv(int i, uint8_t *buf, size_t buflen) {
return 0;
}
} else if (err == -1) {
LOGW("Error receiving on port %i from %s:%i: %s", getLocalPort(), mRemoteIp.c_str(), mRemotePort + i,
LOGW("Error receiving on port %i from %s:%i: %s", getLocalPort(), mRemoteIp.c_str(), mRemotePort[i],
strerror(errno));
if (errno == ECONNREFUSED) {
/*this will avoid to continue sending if there are ICMP errors*/
mSockAddrSize[i] = 0;
mRecvErrorCount[i]++;
}
}
return err;
......@@ -206,16 +209,16 @@ int RelayChannel::recv(int i, uint8_t *buf, size_t buflen) {
int RelayChannel::send(int i, uint8_t *buf, size_t buflen) {
int err = 0;
/*if destination address is working mSockAddrSize>0*/
if (mRemotePort > 0 && mSockAddrSize[i] > 0 && mDir != Inactive) {
if (mRemotePort[i] > 0 && mSockAddrSize[i] > 0 && mDir != Inactive && mRecvErrorCount[i] < sMaxRecvErrors) {
if (!mFilter || mFilter->onOutgoingTransfer(buf, buflen, (struct sockaddr *)&mSockAddr[i], mSockAddrSize[i])) {
err = sendto(mSockets[i], buf, buflen, 0, (struct sockaddr *)&mSockAddr[i], mSockAddrSize[i]);
mPacketsSent++;
if (err == -1) {
LOGW("Error sending %i bytes (localport=%i dest=%s:%i) : %s", (int)buflen, getLocalPort() + i,
mRemoteIp.c_str(), mRemotePort, strerror(errno));
mRemoteIp.c_str(), mRemotePort[i], strerror(errno));
} else if (err != (int)buflen) {
LOGW("Only %i bytes sent over %i bytes (localport=%i dest=%s:%i)", err, (int)buflen, getLocalPort() + i,
mRemoteIp.c_str(), mRemotePort + i);
mRemoteIp.c_str(), mRemotePort[i]);
}
}
} else {
......@@ -283,7 +286,7 @@ int RelaySession::getActiveBranchesCount() {
int count = 0;
mMutex.lock();
for (auto it = mBacks.begin(); it != mBacks.end(); ++it) {
if ((*it).second->getRemotePort() > 0)
if ((*it).second->getRemoteRtpPort() > 0)
count++;
}
mMutex.unlock();
......
......@@ -201,12 +201,15 @@ class RelayChannel : public SdpMasqueradeContext{
RelayChannel(RelaySession *relaySession, const std::pair<std::string, std::string> &relayIps, bool preventLoops);
~RelayChannel();
bool checkSocketsValid();
void setRemoteAddr(const std::string &ip, int port, Dir dir);
void setRemoteAddr(const std::string &ip, int port, int rtcp_port, Dir dir);
const std::string &getRemoteIp() const {
return mRemoteIp;
}
int getRemotePort() const {
return mRemotePort;
int getRemoteRtpPort() const {
return mRemotePort[0];
}
int getRemoteRtcpPort() const{
return mRemotePort[1];
}
const std::string &getLocalIp() const {
return mLocalIp;
......@@ -234,16 +237,18 @@ class RelayChannel : public SdpMasqueradeContext{
static const char *dirToString(Dir dir);
private:
static const int sMaxRecvErrors = 50;
Dir mDir;
std::string mLocalIp;
std::string mRemoteIp;
int mRemotePort;
int mRemotePort[2];
RtpSession *mSession;
int mSockets[2];
struct sockaddr_storage mSockAddr[2]; /*the destination address in use*/
socklen_t mSockAddrSize[2];
std::shared_ptr<MediaFilter> mFilter;
int mPfdIndex;
int mRecvErrorCount[2];
uint64_t mPacketsSent;
uint64_t mPacketsReceived;
bool mPreventLoop;
......
......@@ -181,7 +181,7 @@ bool MediaRelay::processNewInvite(const shared_ptr<RelayedCall> &c, const shared
}
// assign destination address of offerer
m->iterateInOffer(bind(&RelayedCall::setChannelDestinations, c, m, _1, _2, _3, from_tag, transaction->getBranchId(),false));
m->iterateInOffer(bind(&RelayedCall::setChannelDestinations, c, m, _1, _2, _3, _4, from_tag, transaction->getBranchId(),false));
// Masquerade using ICE
m->addIceCandidateInOffer(bind(&RelayedCall::getChannelSources, c, _1, to_tag, transaction->getBranchId()),
......@@ -292,7 +292,7 @@ void MediaRelay::processResponseWithSDP(const shared_ptr<RelayedCall> &c, const
return;
}
//acquire destination ip/ports from answerer
m->iterateInAnswer(bind(&RelayedCall::setChannelDestinations, c, m, _1, _2, _3, to_tag, transaction->getBranchId(),isEarlyMedia));
m->iterateInAnswer(bind(&RelayedCall::setChannelDestinations, c, m, _1, _2, _3, _4, to_tag, transaction->getBranchId(),isEarlyMedia));
//push ICE relay candidates if necessary, and update the ICE states.
m->addIceCandidateInAnswer(bind(&RelayedCall::getChannelSources, c, _1, sip->sip_from->a_tag, transaction->getBranchId()),
......
......@@ -433,7 +433,7 @@ void SdpModifier::changeMediaConnection(sdp_media_t *mline, const char *relay_ip
}
void SdpModifier::addIceCandidate(std::function< std::pair<std::string,int>(int )> getRelayAddrFcn,
std::function< std::pair<std::string,int>(int )> getDestAddrFcn, std::function< MasqueradeContextPair(int )> getMasqueradeContexts, bool isOffer, bool forceRelay){
std::function< std::tuple<std::string,int,int>(int )> getDestAddrFcn, std::function< MasqueradeContextPair(int )> getMasqueradeContexts, bool isOffer, bool forceRelay){
char foundation[32];
sdp_media_t *mline=mSession->sdp_media;
uint64_t r;
......@@ -480,7 +480,7 @@ void SdpModifier::addIceCandidate(std::function< std::pair<std::string,int>(int
priority = (65535 << 8) | (256 - componentID);
ostringstream candidate_line;
candidate_line << foundation << ' ' << componentID << " UDP " << priority << ' ' << relayAddr.first.c_str() << ' ' << relayAddr.second + componentID - 1
<< " typ relay raddr " << destAddr.first << " rport " << destAddr.second + componentID - 1;
<< " typ relay raddr " << std::get<0>(destAddr) << " rport " << (componentID == 1 ? std::get<1>(destAddr) : std::get<2>(destAddr));
addMediaAttribute(mline, "candidate", candidate_line.str().c_str());
}
}
......@@ -490,16 +490,16 @@ void SdpModifier::addIceCandidate(std::function< std::pair<std::string,int>(int
}
void SdpModifier::addIceCandidateInOffer(std::function< std::pair<std::string,int>(int )> getRelayAddrFcn,
std::function< std::pair<std::string,int>(int )> getDestAddrFcn, std::function< MasqueradeContextPair(int )> getMasqueradeContexts, bool forceRelay){
std::function< std::tuple<std::string,int,int>(int )> getDestAddrFcn, std::function< MasqueradeContextPair(int )> getMasqueradeContexts, bool forceRelay){
addIceCandidate(getRelayAddrFcn, getDestAddrFcn, getMasqueradeContexts, true, forceRelay);
}
void SdpModifier::addIceCandidateInAnswer(std::function< std::pair<std::string,int>(int )> getRelayAddrFcn,
std::function< std::pair<std::string,int>(int )> getDestAddrFcn, std::function< MasqueradeContextPair(int )> getMasqueradeContexts, bool forceRelay){
std::function< std::tuple<std::string,int,int>(int )> getDestAddrFcn, std::function< MasqueradeContextPair(int )> getMasqueradeContexts, bool forceRelay){
addIceCandidate(getRelayAddrFcn, getDestAddrFcn, getMasqueradeContexts, false, forceRelay);
}
void SdpModifier::iterate(function<void(int, const string &, int )> fct){
void SdpModifier::iterate(function<void(int, const string &, int, int )> fct){
sdp_media_t *mline=mSession->sdp_media;
int i;
string global_c_address;
......@@ -509,18 +509,24 @@ void SdpModifier::iterate(function<void(int, const string &, int )> fct){
for(i=0;mline!=NULL;mline=mline->m_next,++i){
string ip=(mline->m_connections && mline->m_connections->c_address) ? mline->m_connections->c_address : global_c_address;
int port=mline->m_port;
int rtcp_port = port != 0 ? port + 1 : 0;
if (hasMediaAttribute(mline,mNortproxy.c_str()) ) continue;
fct(i, ip, port);
sdp_attribute_t *a_rtcp = sdp_attribute_find(mline->m_attributes,"rtcp");
if (a_rtcp && a_rtcp->a_value){
istringstream ist(string(a_rtcp->a_value));
ist >> rtcp_port;
}
fct(i, ip, port, rtcp_port);
}
}
void SdpModifier::iterateInOffer( function<void(int, const string &, int )> fct ) {
void SdpModifier::iterateInOffer( function<void(int, const string &, int, int )> fct ) {
iterate(fct);
}
void SdpModifier::iterateInAnswer( function<void(int, const string &, int )> fct) {
void SdpModifier::iterateInAnswer( function<void(int, const string &, int, int )> fct) {
iterate(fct);
}
......
......@@ -23,6 +23,7 @@
#include <string>
#include <list>
#include <memory>
#include <tuple>
#include "ortp/payloadtype.h"
......@@ -77,15 +78,15 @@ class SdpModifier{
void changeConnection(sdp_connection_t *c, const char *ip);
void changeMediaConnection(sdp_media_t *mline, const char *relay_ip);
void addIceCandidateInOffer(std::function< std::pair<std::string,int>(int )> getRelayAddrFcn,
std::function< std::pair<std::string,int>(int )> getDestAddrFcn,
std::function< std::tuple<std::string,int,int>(int )> getDestAddrFcn,
std::function< MasqueradeContextPair(int )> getMasqueradeContexts,
bool forceRelay);
void addIceCandidateInAnswer(std::function< std::pair<std::string,int>(int )> getRelayAddrFcn,
std::function< std::pair<std::string,int>(int )> getDestAddrFcn,
std::function< std::tuple<std::string,int,int>(int )> getDestAddrFcn,
std::function< MasqueradeContextPair(int )> getMasqueradeContexts,
bool forceRelay);
void iterateInOffer(std::function<void(int, const std::string &, int)>);
void iterateInAnswer(std::function<void(int, const std::string &, int)>);
void iterateInOffer(std::function<void(int, const std::string &, int, int)>);
void iterateInAnswer(std::function<void(int, const std::string &, int, int)>);
void masqueradeInOffer(std::function< std::pair<std::string,int>(int )> getAddrFcn);
void masqueradeInAnswer(std::function< std::pair<std::string,int>(int )> getAddrFcn);
void addAttribute(const char *name, const char *value);
......@@ -101,8 +102,8 @@ class SdpModifier{
sip_t *mSip;
private:
void addIceCandidate(std::function< std::pair<std::string,int>(int )> getRelayAddrFcn,
std::function< std::pair<std::string,int>(int )> getDestAddrFcn, std::function< MasqueradeContextPair(int )> getMasqueradeContexts, bool isOffer, bool forceRelay);
void iterate(std::function<void(int, const std::string &, int)>);
std::function< std::tuple<std::string,int,int>(int )> getDestAddrFcn, std::function< MasqueradeContextPair(int )> getMasqueradeContexts, bool isOffer, bool forceRelay);
void iterate(std::function<void(int, const std::string &, int, int)>);
void masquerade(std::function< std::pair<std::string,int>(int )> getAddrFcn);
void changeRtcpAttr(sdp_media_t *mline, const std::string & relayAddr, int port);
sdp_parser_t *mParser;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment