/*
	Flexisip, a flexible SIP proxy server with media capabilities.
	Copyright (C) 2010-2015  Belledonne Communications SARL, All rights reserved.

	This program is free software: you can redistribute it and/or modify
	it under the terms of the GNU Affero General Public License as
	published by the Free Software Foundation, either version 3 of the
	License, or (at your option) any later version.

	This program is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
	GNU Affero General Public License for more details.

	You should have received a copy of the GNU Affero General Public License
	along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
Simon Morlat's avatar
Simon Morlat committed
18

19
#include "flexisip-config.h"
Simon Morlat's avatar
Simon Morlat committed
20 21 22 23
#include "agent.hh"
#include "mediarelay.hh"

#include <poll.h>
24 25
#include <sys/time.h>
#include <sys/resource.h>
Simon Morlat's avatar
Simon Morlat committed
26 27 28 29

#include <algorithm>
#include <list>

30
using namespace std;
Simon Morlat's avatar
Simon Morlat committed
31

32 33 34
/*
 * Builds an empty pollfd table with room for init_size entries.
 * The storage is obtained with malloc() so that it can later be grown with realloc().
 */
PollFd::PollFd(int init_size) : mCurSize(init_size) {
	mCurIndex = 0;
	mPfd = static_cast<struct pollfd *>(malloc(mCurSize * sizeof(struct pollfd)));
}

37
/* Releases the pollfd table allocated by the constructor (and possibly regrown by addFd()). */
PollFd::~PollFd() {
	free(mPfd);
}

41 42
/* Empties the table by rewinding the insertion index; the allocated storage is kept for reuse. */
void PollFd::reset() {
	mCurIndex = 0;
}

45 46 47 48
int PollFd::addFd(int fd, unsigned int events) {
	if (mCurIndex == mCurSize) {
		mCurSize *= 2;
		mPfd = (struct pollfd *)realloc(mPfd, mCurSize * sizeof(struct pollfd));
49
	}
50 51 52
	mPfd[mCurIndex].fd = fd;
	mPfd[mCurIndex].events = events;
	mPfd[mCurIndex].revents = 0;
53 54 55
	return mCurIndex++;
}

56 57
unsigned int PollFd::getREvents(int index) const {
	if (index >= mCurSize) {
58 59 60 61 62 63
		LOGA("Bad access to pollfd table.");
		return 0;
	}
	return mPfd[index].revents;
}

64 65
/*
 * Creates one side of a relay session.
 *   relaySession : owning session, used to reach the relay server that allocates the RTP session.
 *   relayIps     : first element is stored as the local IP of the channel, second element is the
 *                  address given to the relay server to create (bind) the RTP session.
 *   preventLoops : when true, setRemoteAddr() refuses a destination equal to the local IP.
 * Index 0 of the per-socket arrays is the RTP socket, index 1 the RTCP socket.
 */
RelayChannel::RelayChannel(RelaySession *relaySession, const std::pair<std::string, std::string> &relayIps,
						   bool preventLoops)
	: mDir(SendRecv), mLocalIp(relayIps.first), mRemoteIp(std::string("undefined")) {
	mSession = relaySession->getRelayServer()->createRtpSession(relayIps.second);
	mSockets[0] = rtp_session_get_rtp_socket(mSession);
	mSockets[1] = rtp_session_get_rtcp_socket(mSession);

	/* No remote destination is known yet and the channel is not registered in any poll table. */
	mPfdIndex = -1;
	for (int k = 0; k < 2; ++k) {
		mSockAddrSize[k] = 0;
		mRecvErrorCount[k] = 0;
		mRemotePort[k] = -1;
	}

	mPacketsSent = 0;
	mPacketsReceived = 0;
	mDestAddrChanged = false;
	mHasMultipleTargets = false;
	mPreventLoop = preventLoops;
}

81
bool RelayChannel::checkSocketsValid() {
82
	return mSockets[0] != -1 && mSockets[1] != -1;
83
}
84

85
/* Destroys the RTP session that was created for this channel by the constructor. */
RelayChannel::~RelayChannel() {
	rtp_session_destroy(mSession);
}

89 90
/* Gives a printable name for a stream direction; any value not handled below yields "invalid". */
const char *RelayChannel::dirToString(Dir dir) {
	if (dir == SendOnly)
		return "SendOnly";
	if (dir == SendRecv)
		return "SendRecv";
	if (dir == Inactive)
		return "Inactive";
	return "invalid";
}

Simon Morlat's avatar
Simon Morlat committed
101 102 103
/*
 * Configures the remote destination of this channel.
 *   ip        : numeric remote IP address (it is resolved with AI_NUMERICHOST, no DNS lookup).
 *   rtp_port  : remote RTP port; 0 means that the stream is declined.
 *   rtcp_port : remote RTCP port.
 *   dir       : direction of the stream.
 * The remote ip, ports and direction are always recorded. The socket addresses used for sending are
 * updated only when the destination is acceptable and was not already replaced by the source address
 * of received packets (see recv()). A declined stream or a refused destination clears the addresses.
 */
void RelayChannel::setRemoteAddr(const string &ip, int rtp_port, int rtcp_port, Dir dir) {
	LOGD("RelayChannel [%p] is now configured local=[%s|%i:%i]  remote=[%s|%i:%i] dir=[%s]", this, getLocalIp().c_str(),
		 getLocalPort(), getLocalPort()+1, ip.c_str(), rtp_port, rtcp_port, dirToString(dir));
	bool dest_ok = true;

	if (rtp_port > 0 && mPreventLoop && ip == getLocalIp()) {
		LOGW("RelayChannel [%p] wants to loop to local machine, not allowed.", this);
		dest_ok = false;
	}

	mRemotePort[0] = rtp_port;
	mRemotePort[1] = rtcp_port;
	mRemoteIp = ip;
	mDir = dir;

	if (!dest_ok || rtp_port == 0) {
		/*case where client declined the stream (0 port in SDP) or destination address is invalid*/
		mDestAddrChanged = false;
		mSockAddrSize[0] = 0;
		mSockAddrSize[1] = 0;
		return;
	}

	if (mDestAddrChanged){
		LOGW("RelayChannel [%p] is being set new destination address but was fixed previously in this session, so ignoring this request.", this);
		return;
	}

	/* The lookup hints are the same for both ports: set them up once. */
	struct addrinfo hints = {0};
	hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;

	for (int i = 0; i < 2; ++i){
		struct addrinfo *res = NULL;
		char portstr[20];

		mRecvErrorCount[i] = 0;
		snprintf(portstr, sizeof(portstr), "%i", mRemotePort[i]);
		const int err = getaddrinfo(ip.c_str(), portstr, &hints, &res);
		if (err != 0) {
			/* The previously configured address, if any, is left untouched. */
			LOGE("RelayChannel [%p]: getaddrinfo() failed for %s:%i : %s", this, ip.c_str(), mRemotePort[i], gai_strerror(err));
			continue;
		}
		if (res->ai_addrlen <= sizeof(mSockAddr[i])) {
			memcpy(&mSockAddr[i], res->ai_addr, res->ai_addrlen);
			mSockAddrSize[i] = res->ai_addrlen;
		} else {
			/* Never copy past the end of the destination address storage. */
			LOGE("RelayChannel [%p]: address returned for %s:%i is too large, ignored.", this, ip.c_str(), mRemotePort[i]);
		}
		freeaddrinfo(res);
	}
}
Yann Diorcet's avatar
Yann Diorcet committed
150

151
/*
 * Registers both sockets of the channel in the poll table for reading.
 * mPfdIndex records the slot of the first socket; the second one sits in the next slot.
 * When the first socket is invalid nothing is registered and mPfdIndex is left at -1.
 */
void RelayChannel::fillPollFd(PollFd *pfd) {
	mPfdIndex = -1;
	if (mSockets[0] != -1) {
		for (int k = 0; k < 2; ++k) {
			const int slot = pfd->addFd(mSockets[k], POLLIN);
			if (mPfdIndex == -1)
				mPfdIndex = slot;
		}
	}
}
Yann Diorcet's avatar
Yann Diorcet committed
161

162 163 164
/*
 * Tells whether poll() reported any event for socket i of this channel (0: RTP, 1: RTCP).
 * Always false when the channel was not registered by the last fillPollFd().
 */
bool RelayChannel::checkPollFd(const PollFd *pfd, int i) {
	if (mPfdIndex == -1)
		return false;
	return pfd->getREvents(mPfdIndex + i) != 0;
}

169
int RelayChannel::recv(int i, uint8_t *buf, size_t buflen) {
170 171
	struct sockaddr_storage ss;
	socklen_t addrsize = sizeof(ss);
172

173
	int err = recvfrom(mSockets[i], buf, buflen, 0, (struct sockaddr *)&ss, &addrsize);
174
	if (err > 0) {
175
		mPacketsReceived++;
Simon Morlat's avatar
Simon Morlat committed
176 177 178 179 180 181 182 183
		if (mSockAddrSize[i] == 0){
			/* Remote destination has never been set previously (for example if 183 or 200 OK is not yet received),
			 * but we receive a packet.
			 * Our policy is to drop the packet until the destination address is set.*/
			LOGW("RelayChannel[%p]: remote address not set, packet ignored.", this);
			return 0;
		}
		mRecvErrorCount[i] = 0;
184 185 186 187 188 189
		if (addrsize != mSockAddrSize[i] || memcmp(&ss, &mSockAddr[i], addrsize) != 0 ){
			LOGD("RelayChannel[%p] destination address changed.", this);
			mSockAddrSize[i] = addrsize;
			memcpy(&mSockAddr[i], &ss, addrsize);
			mDestAddrChanged = true;
		}
190

Simon Morlat's avatar
Simon Morlat committed
191
		mSockAddrSize[i] = addrsize;
192
		if (mDir == SendOnly || mDir == Inactive) {
193
			/*LOGD("ignored packet");*/
194 195
			return 0;
		}
196 197
		if (mFilter &&
			mFilter->onIncomingTransfer(buf, buflen, (struct sockaddr *)&mSockAddr[i], mSockAddrSize[i]) == false) {
198 199
			return 0;
		}
200
	} else if (err == -1) {
Simon Morlat's avatar
Simon Morlat committed
201
		LOGW("Error receiving on port %i from %s:%i: %s", getLocalPort(), mRemoteIp.c_str(), mRemotePort[i],
202 203
			 strerror(errno));
		if (errno == ECONNREFUSED) {
Simon Morlat's avatar
Simon Morlat committed
204
			mRecvErrorCount[i]++;
205
		}
Yann Diorcet's avatar
Yann Diorcet committed
206 207
	}
	return err;
Simon Morlat's avatar
Simon Morlat committed
208 209
}

210
int RelayChannel::send(int i, uint8_t *buf, size_t buflen) {
211
	int err = 0;
212
	/*if destination address is working mSockAddrSize>0*/
Simon Morlat's avatar
Simon Morlat committed
213
	if (mRemotePort[i] > 0 && mSockAddrSize[i] > 0 && mDir != Inactive && mRecvErrorCount[i] < sMaxRecvErrors) {
214 215
		if (!mFilter || mFilter->onOutgoingTransfer(buf, buflen, (struct sockaddr *)&mSockAddr[i], mSockAddrSize[i])) {
			err = sendto(mSockets[i], buf, buflen, 0, (struct sockaddr *)&mSockAddr[i], mSockAddrSize[i]);
216
			mPacketsSent++;
217 218
			if (err == -1) {
				LOGW("Error sending %i bytes (localport=%i dest=%s:%i) : %s", (int)buflen, getLocalPort() + i,
Simon Morlat's avatar
Simon Morlat committed
219
					 mRemoteIp.c_str(), mRemotePort[i], strerror(errno));
220 221
			} else if (err != (int)buflen) {
				LOGW("Only %i bytes sent over %i bytes (localport=%i dest=%s:%i)", err, (int)buflen, getLocalPort() + i,
Simon Morlat's avatar
Simon Morlat committed
222
					 mRemoteIp.c_str(), mRemotePort[i]);
223 224
			}
		}
225
	} else {
226
		/*LOGW("Not sending media, destination not valid or inactive stream."); */
Yann Diorcet's avatar
Yann Diorcet committed
227
	}
228
	return err;
229 230
}

231 232
/* Installs the filter consulted by recv() and send() before relaying a packet; replaces any previous one. */
void RelayChannel::setFilter(shared_ptr<MediaFilter> filter) {
	mFilter = filter;
}

235 236 237
/*
 * Creates a relay session together with its front channel.
 *   server   : relay server owning the session.
 *   frontId  : identifier of the party attached to the front channel (see getChannel()).
 *   relayIps : addresses handed to the front channel.
 */
RelaySession::RelaySession(MediaRelayServer *server, const string &frontId,
						   const std::pair<std::string, std::string> &relayIps)
	: mServer(server), mFrontId(frontId) {
	mUsed = true;
	mLastActivityTime = getCurrentTime();
	const bool preventLoops = mServer->loopPreventionEnabled();
	mFront = make_shared<RelayChannel>(this, relayIps, preventLoops);
}

243 244 245 246 247
/*
 * Finds the channel associated with a party.
 * The front channel is returned when partyId is the front party id. Otherwise the established back
 * channel is returned if there is one, else the branch registered for transaction trId.
 * Returns an empty pointer when no branch matches.
 */
shared_ptr<RelayChannel> RelaySession::getChannel(const string &partyId, const string &trId) {
	if (partyId == mFrontId)
		return mFront;
	if (mBack)
		return mBack;

	shared_ptr<RelayChannel> found;

	mMutex.lock();
	auto entry = mBacks.find(trId);
	if (entry != mBacks.end())
		found = entry->second;
	mMutex.unlock();
	return found;
}

260
std::shared_ptr<RelayChannel> RelaySession::createBranch(const std::string &trId,
261 262
		 const std::pair<std::string, std::string> &relayIps,
		 bool hasMultipleTargets) {
263
	shared_ptr<RelayChannel> ret;
264
	mMutex.lock();
265
	ret = make_shared<RelayChannel>(this, relayIps, mServer->loopPreventionEnabled());
266
	ret->setMultipleTargets(hasMultipleTargets);
267
	mBacks.insert(make_pair(trId, ret));
268
	mMutex.unlock();
269
	LOGD("RelaySession [%p]: branch corresponding to transaction [%s] added.", this, trId.c_str());
270
	return ret;
271 272
}

273 274
void RelaySession::removeBranch(const std::string &trId) {
	bool removed = false;
275
	mMutex.lock();
276 277 278
	auto it = mBacks.find(trId);
	if (it != mBacks.end()) {
		removed = true;
279 280 281
		mBacks.erase(it);
	}
	mMutex.unlock();
282
	if (removed) {
283
		LOGD("RelaySession [%p]: branch corresponding to transaction [%s] removed.", this, trId.c_str());
284
	}
285 286
}

287
/* Counts the registered branches whose remote RTP port is strictly positive. */
int RelaySession::getActiveBranchesCount() {
	int active = 0;
	mMutex.lock();
	for (const auto &entry : mBacks) {
		if (entry.second->getRemoteRtpPort() > 0)
			++active;
	}
	mMutex.unlock();
	LOGD("getActiveBranchesCount(): %i", active);
	return active;
}

299 300 301 302 303 304
void RelaySession::setEstablished(const std::string &tr_id) {
	if (mBack)
		return;
	shared_ptr<RelayChannel> winner = getChannel("", tr_id);
	if (winner) {
		LOGD("RelaySession [%p] is established.", this);
305
		mMutex.lock();
306
		mBack = winner;
307 308
		mBacks.clear();
		mMutex.unlock();
309
	} else LOGE("RelaySession [%p] is with from an unknown branch [%s].", this, tr_id.c_str());
310 311
}

312
/*
 * Registers the sockets of the session channels in the poll table: the front channel, then either
 * the established back channel or, when there is none yet, every registered branch.
 */
void RelaySession::fillPollFd(PollFd *pfd) {
	mMutex.lock();

	if (mFront)
		mFront->fillPollFd(pfd);

	if (!mBack) {
		for (const auto &entry : mBacks)
			entry.second->fillPollFd(pfd);
	} else {
		mBack->fillPollFd(pfd);
	}

	mMutex.unlock();
}

327
void RelaySession::checkPollFd(const PollFd *pfd, time_t curtime) {
328
	int i;
329
	mMutex.lock();
330 331 332 333 334 335 336 337
	for (i = 0; i < 2; ++i) {
		if (mFront && mFront->checkPollFd(pfd, i))
			transfer(curtime, mFront, i);
		if (!mBack) {
			for (auto it = mBacks.begin(); it != mBacks.end(); ++it) {
				shared_ptr<RelayChannel> chan = (*it).second;
				if (chan->checkPollFd(pfd, i))
					transfer(curtime, chan, i);
338
			}
339 340
		} else if (mBack->checkPollFd(pfd, i)) {
			transfer(curtime, mBack, i);
341 342
		}
	}
343
	mMutex.unlock();
Simon Morlat's avatar
Simon Morlat committed
344 345
}

346
/* Only traces the destruction; the members release themselves. */
RelaySession::~RelaySession() {
	LOGD("RelaySession %p destroyed", this);
}

Yann Diorcet's avatar
Yann Diorcet committed
350
void RelaySession::unuse() {
351
	struct statistics {
Simon Morlat's avatar
Simon Morlat committed
352 353 354
		int port;
		unsigned long recv;
		unsigned long sent;
355
	} front = {0, 0, 0}, back = {0, 0, 0};
356

357
	LOGD("RelaySession [%p] terminated.", this);
358

Simon Morlat's avatar
Simon Morlat committed
359
	mMutex.lock();
Yann Diorcet's avatar
Yann Diorcet committed
360
	mUsed = false;
361 362 363 364
	if (mFront) {
		front.port = mFront->getLocalPort();
		front.recv = mFront->getReceivedPackets();
		front.sent = mFront->getSentPackets();
365
	}
366 367 368 369
	if (mBack) {
		back.port = mBack->getLocalPort();
		back.recv = mBack->getReceivedPackets();
		back.sent = mBack->getSentPackets();
370
	}
371
	mFront.reset();
Simon Morlat's avatar
Simon Morlat committed
372
	mBacks.clear();
373
	mBack.reset();
Simon Morlat's avatar
Simon Morlat committed
374
	mMutex.unlock();
375

Simon Morlat's avatar
Simon Morlat committed
376
	/*do not log while holding a mutex*/
377
	if (front.port > 0) {
378
		LOGD("Front on port [%i] received [%lu] and sent [%lu] packets.", front.port, front.recv, front.sent);
379 380
	}
	if (back.port > 0) {
381
		LOGD("Back on port [%i] received [%lu] and sent [%lu] packets.", back.port, back.recv, back.sent);
382
	}
Simon Morlat's avatar
Simon Morlat committed
383 384
}

385
bool RelaySession::checkChannels() {
386
	mMutex.lock();
387
	for (auto itb = mBacks.begin(); itb != mBacks.end(); ++itb) {
388
		if (!(*itb).second->checkSocketsValid()) {
389 390 391 392
			mMutex.unlock();
			return false;
		}
	}
Simon Morlat's avatar
Simon Morlat committed
393 394 395
	if (!mFront->checkSocketsValid()) {
		mMutex.unlock();
		return false;
396 397 398 399 400
	}
	mMutex.unlock();
	return true;
}

401
void RelaySession::transfer(time_t curtime, const shared_ptr<RelayChannel> &chan, int i) {
Simon Morlat's avatar
Simon Morlat committed
402
	uint8_t buf[1500];
Yann Diorcet's avatar
Yann Diorcet committed
403
	const int maxsize = sizeof(buf);
404
	int recv_len;
405

406
	mLastActivityTime = curtime;
407
	recv_len = chan->recv(i, buf, maxsize);
408
	if (recv_len > 0) {
409 410 411 412 413
		if (chan == mFront) {
			if (mBack) {
				mBack->send(i, buf, recv_len);
			} else {
				for (auto it = mBacks.begin(); it != mBacks.end(); ++it) {
414
					shared_ptr<RelayChannel> dest = (*it).second;
415
					dest->send(i, buf, recv_len);
416
				}
Yann Diorcet's avatar
Yann Diorcet committed
417
			}
418 419
		} else {
			mFront->send(i, buf, recv_len);
Simon Morlat's avatar
Simon Morlat committed
420 421 422 423
		}
	}
}

424
/*
 * Creates a stopped relay server attached to the given module, and the control pipe that is used
 * to wake up the relay thread.
 */
MediaRelayServer::MediaRelayServer(MediaRelay *module) : mModule(module) {
	mSessionsCount = 0;
	mRunning = false;
	const int pipeResult = pipe(mCtlPipe);
	if (pipeResult == -1) {
		LOGF("Could not create MediaRelayServer control pipe.");
	}
}
431

432
/* Returns the agent of the module this server is attached to. */
Agent *MediaRelayServer::getAgent() {
	return mModule->getAgent();
}
435

436
/*
 * Creates an RTP session bound on bindIp, on a random even port taken from the port range of the module.
 * Up to 100 ports are tried. The session is returned even when no port could be bound (an error is
 * logged in that case); callers can detect it through the validity of the session sockets.
 */
RtpSession *MediaRelayServer::createRtpSession(const std::string &bindIp) {
	RtpSession *session = rtp_session_new(RTP_SESSION_SENDRECV);
#if ORTP_HAS_REUSEADDR
	rtp_session_set_reuseaddr(session, FALSE);
#endif
	/* An empty range (min == max) must not reach the modulo below: a modulo by zero is undefined. */
	const auto portRange = mModule->mMaxPort - mModule->mMinPort;
	for (int i = 0; i < 100; ++i) {
		/* Clearing the lowest bit gives an even port for RTP; RTCP uses the next one. */
		const int port = (((portRange != 0) ? (rand() % portRange) : 0) + mModule->mMinPort) & 0xfffe;

#if ORTP_ABI_VERSION >= 9
		const int bindResult = rtp_session_set_local_addr(session, bindIp.c_str(), port, port + 1);
#else
		const int bindResult = rtp_session_set_local_addr(session, bindIp.c_str(), port);
#endif
		if (bindResult == 0) {
			return session;
		}
	}

	LOGE("Could not find a random port on interface %s !", bindIp.c_str());
	return session;
}

Yann Diorcet's avatar
Yann Diorcet committed
457 458 459
void MediaRelayServer::start() {
	mRunning = true;
	pthread_create(&mThread, NULL, &MediaRelayServer::threadFunc, this);
460 461
}

Yann Diorcet's avatar
Yann Diorcet committed
462 463 464 465
/*
 * Stops the relay thread if it is running (the control pipe is written to wake it up, then the
 * thread is joined), drops all sessions and closes the control pipe.
 */
MediaRelayServer::~MediaRelayServer() {
	if (mRunning) {
		mRunning = false;
		const ssize_t written = write(mCtlPipe[1], "e", 1);
		if (written == -1)
			LOGE("MediaRelayServer: Fail to write to control pipe.");
		pthread_join(mThread, NULL);
	}
	mSessions.clear();
	mSessionsCount = 0;
	for (int end = 0; end < 2; ++end)
		close(mCtlPipe[end]);
}

475 476 477
/*
 * Creates a relay session, registers it in the server, starts the relay thread if it is not
 * running yet and wakes it up so that the new session is taken into account.
 * Returns the new session.
 */
shared_ptr<RelaySession> MediaRelayServer::createSession(const std::string &frontId,
														 const std::pair<std::string, std::string> &frontRelayIps) {
	shared_ptr<RelaySession> session = make_shared<RelaySession>(this, frontId, frontRelayIps);

	mMutex.lock();
	mSessions.push_back(session);
	mSessionsCount++;
	mMutex.unlock();

	if (!mRunning)
		start();

	LOGD("There are now %zu relay sessions running on MediaRelayServer [%p]", mSessionsCount, this);
	/*write to the control pipe to wakeup the server thread */
	update();
	return session;
}

491 492 493 494 495 496
/* Wakes up the relay thread by writing one byte to the control pipe it polls. */
void MediaRelayServer::update() {
	const ssize_t written = write(mCtlPipe[1], "e", 1);
	if (written == -1)
		LOGE("MediaRelayServer: fail to write to control pipe.");
}

497
static void set_high_prio() {
498
	struct sched_param param;
499 500
	int policy = SCHED_RR;
	int result = 0;
501
	int max_prio;
502

503
	memset(&param, 0, sizeof(param));
504

505
	max_prio = sched_get_priority_max(policy);
506 507 508
	param.sched_priority = max_prio;
	if ((result = pthread_setschedparam(pthread_self(), policy, &param))) {
		if (result == EPERM) {
509
			/*
510
				The linux kernel has
511 512 513 514
				sched_get_priority_max(SCHED_OTHER)=sched_get_priority_max(SCHED_OTHER)=0.
				As long as we can't use SCHED_RR or SCHED_FIFO, the only way to increase priority of a calling thread
				is to use setpriority().
			*/
515 516 517
			if (setpriority(PRIO_PROCESS, 0, -20) == -1) {
				LOGD("MediaRelayServer setpriority() failed: %s, nevermind.", strerror(errno));
			} else {
518 519
				LOGD("MediaRelayServer priority increased to maximum.");
			}
520 521
		} else
			LOGW("MediaRelayServer: pthread_setschedparam failed: %s", strerror(result));
522
	} else {
523 524
		LOGD("MediaRelayServer: priority set to [%s] and value [%i]", policy == SCHED_FIFO ? "SCHED_FIFO" : "SCHED_RR",
			 param.sched_priority);
525 526 527
	}
}

Yann Diorcet's avatar
Yann Diorcet committed
528
void MediaRelayServer::run() {
529 530
	PollFd pfd(512);
	int ctl_index;
Simon Morlat's avatar
Simon Morlat committed
531
	int err;
532

533
	set_high_prio();
Yann Diorcet's avatar
Yann Diorcet committed
534
	while (mRunning) {
535
		pfd.reset();
536
		// fill the pollfd table
Simon Morlat's avatar
Simon Morlat committed
537
		mMutex.lock();
538
		for (auto it = mSessions.begin(); it != mSessions.end(); ++it) {
539 540
			if ((*it)->isUsed())
				(*it)->fillPollFd(&pfd);
541
		}
Simon Morlat's avatar
Simon Morlat committed
542
		mMutex.unlock();
543

544
		ctl_index = pfd.addFd(mCtlPipe[0], POLLIN);
545

546
		err = poll(pfd.getPfd(), pfd.getCurIndex(), 1000);
Yann Diorcet's avatar
Yann Diorcet committed
547
		if (err > 0) {
548
			// examine pollfd results
549
			if (pfd.getREvents(ctl_index) & POLLIN) {
550
				char tmp;
Yann Diorcet's avatar
Yann Diorcet committed
551
				if (read(mCtlPipe[0], &tmp, 1) == -1) {
552 553
					LOGE("Fail to read from control pipe.");
				}
Simon Morlat's avatar
Simon Morlat committed
554
			}
555
			time_t curtime = getCurrentTime();
556 557 558 559
			mMutex.lock();
			for (auto it = mSessions.begin(); it != mSessions.end();) {
				if (!(*it)->isUsed()) {
					it = mSessions.erase(it);
560 561
					mSessionsCount--;
					LOGD("There are now %i relay sessions running.", (int)mSessionsCount);
562
				} else {
563
					(*it)->checkPollFd(&pfd, curtime);
564
					++it;
Yann Diorcet's avatar
Yann Diorcet committed
565
				}
Simon Morlat's avatar
Simon Morlat committed
566
			}
567
			mMutex.unlock();
Simon Morlat's avatar
Simon Morlat committed
568 569 570 571
		}
	}
}

Yann Diorcet's avatar
Yann Diorcet committed
572
void *MediaRelayServer::threadFunc(void *arg) {
573
	MediaRelayServer *zis = (MediaRelayServer *)arg;
Simon Morlat's avatar
Simon Morlat committed
574 575 576
	zis->run();
	return NULL;
}