forkcontext.cc 16.1 KB
Newer Older
1
/*
2 3
	Flexisip, a flexible SIP proxy server with media capabilities.
	Copyright (C) 2010-2015  Belledonne Communications SARL, All rights reserved.
4

5 6 7 8
	This program is free software: you can redistribute it and/or modify
	it under the terms of the GNU Affero General Public License as
	published by the Free Software Foundation, either version 3 of the
	License, or (at your option) any later version.
9

10 11 12 13
	This program is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
	GNU Affero General Public License for more details.
14

15 16
	You should have received a copy of the GNU Affero General Public License
	along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
*/
18

19 20 21
#include <flexisip/agent.hh>
#include <flexisip/forkcontext.hh>
#include <flexisip/registrardb.hh>
22
#include <sofia-sip/sip_status.h>
23

24
using namespace std;
25
using namespace flexisip;
26

27
const int ForkContext::sUrgentCodes[] = {401, 407, 415, 420, 484, 488, 606, 603, 0};
28

29
const int ForkContext::sAllCodesUrgent[] = {-1, 0};
30

31 32
ForkContextListener::~ForkContextListener() {
}
33

34 35
void ForkContext::__timer_callback(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg) {
	(static_cast<ForkContext *>(arg))->processLateTimeout();
36 37
}

38 39
void ForkContext::sOnFinished(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg) {
	(static_cast<ForkContext *>(arg))->onFinished();
40 41
}

42 43 44 45
void ForkContext::sOnNextBanches(su_root_magic_t* magic, su_timer_t* t, su_timer_arg_t* arg) {
	(static_cast<ForkContext *>(arg))->onNextBranches();
}

46
ForkContext::ForkContext(Agent *agent, const shared_ptr<RequestSipEvent> &event, shared_ptr<ForkContextConfig> cfg,
47
						 ForkContextListener *listener)
48
	: mListener(listener), mNextBranchesTimer(NULL), mCurrentPriority(-1), mAgent(agent),
49 50
	  mEvent(make_shared<RequestSipEvent>(event)), // Is this deep copy really necessary ?
	  mCfg(cfg), mLateTimer(NULL), mFinishTimer(NULL) {
Simon Morlat's avatar
Simon Morlat committed
51
	init();
52 53
}

54
void ForkContext::onLateTimeout() {
55 56
}

Simon Morlat's avatar
Simon Morlat committed
57 58
void ForkContext::processLateTimeout() {
	su_timer_destroy(mLateTimer);
59
	mLateTimer = NULL;
Simon Morlat's avatar
Simon Morlat committed
60 61
	onLateTimeout();
	setFinished();
62 63
}

64
struct dest_finder {
65 66 67 68
	dest_finder(const url_t *ctt) {
		cttport = url_port(ctt);
		ctthost = ctt->url_host;
		// don't care about transport
69
	}
70 71 72
	bool operator()(const shared_ptr<BranchInfo> &br) {
		const url_t *dest = br->mRequest->getMsgSip()->getSip()->sip_request->rq_url;
		return 0 == strcmp(url_port(dest), cttport) && 0 == strcmp(dest->url_host, ctthost);
Simon Morlat's avatar
Simon Morlat committed
73
	}
74 75
	const char *ctthost;
	const char *cttport;
Simon Morlat's avatar
Simon Morlat committed
76 77
};

78
struct uid_finder {
79
	uid_finder(const string &uid) : mUid(uid) {
80 81 82
	}
	bool operator()(const shared_ptr<BranchInfo> &br) {
		return mUid == br->mUid;
Simon Morlat's avatar
Simon Morlat committed
83 84 85 86
	}
	const string mUid;
};

87
shared_ptr<BranchInfo> ForkContext::findBranchByUid(const string &uid) {
88
	auto it = find_if(mWaitingBranches.begin(), mWaitingBranches.end(), uid_finder(uid));
89

90
	if (it != mWaitingBranches.end())
91
		return *it;
92

Simon Morlat's avatar
Simon Morlat committed
93 94 95
	return shared_ptr<BranchInfo>();
}

96
shared_ptr<BranchInfo> ForkContext::findBranchByDest(const url_t *dest) {
97
	auto it = find_if(mWaitingBranches.begin(), mWaitingBranches.end(), dest_finder(dest));
98

99
	if (it != mWaitingBranches.end())
100
		return *it;
101

Simon Morlat's avatar
Simon Morlat committed
102 103 104
	return shared_ptr<BranchInfo>();
}

105 106 107
bool ForkContext::isUrgent(int code, const int urgentCodes[]) {
	if (urgentCodes[0] == -1)
		return true; /*everything is urgent*/
108 109

	for (int i = 0; urgentCodes[i] != 0; i++) {
110 111
		if (code == urgentCodes[i])
			return true;
Simon Morlat's avatar
Simon Morlat committed
112
	}
113

Simon Morlat's avatar
Simon Morlat committed
114 115 116
	return false;
}

117 118 119 120
static bool isConsidered(int code, bool ignore503And408){
	return ignore503And408 ? (!(code == 503 || code == 408)) : true;
}

121
shared_ptr<BranchInfo> ForkContext::_findBestBranch(const int urgentCodes[], bool ignore503And408) {
Simon Morlat's avatar
Simon Morlat committed
122
	shared_ptr<BranchInfo> best;
123
	
124 125
	for (const auto& br : mWaitingBranches) {
		int code = br->getStatus();
126
		if (code >= 200 && isConsidered(code, ignore503And408)) {
127
			if (best == NULL) {
128
				best = br;
129
			} else {
130 131
				if (br->getStatus() / 100 < best->getStatus() / 100)
					best = br;
Simon Morlat's avatar
Simon Morlat committed
132 133 134
			}
		}
	}
135

136 137
	if (best == NULL)
		return shared_ptr<BranchInfo>();
138

139
	if (urgentCodes) {
140 141 142
		for (const auto& br : mWaitingBranches) {
			int code = br->getStatus();

143
			if (code > 0  && isConsidered(code, ignore503And408) && isUrgent(code, urgentCodes)) {
144
				best = br;
Simon Morlat's avatar
Simon Morlat committed
145 146 147 148
				break;
			}
		}
	}
149

Simon Morlat's avatar
Simon Morlat committed
150 151 152
	return best;
}

153 154
shared_ptr<BranchInfo> ForkContext::findBestBranch(const int urgentCodes[], bool avoid503And408){
	shared_ptr<BranchInfo> ret;
155 156

	if (avoid503And408 == false)
157
		ret = _findBestBranch(urgentCodes, false);
158
	else {
159
		ret = _findBestBranch(urgentCodes, true);
160 161 162

		if (ret == NULL)
			ret = _findBestBranch(urgentCodes, false);
163
	}
164

165 166 167
	return ret;
}

168
bool ForkContext::allBranchesAnswered(bool ignore_errors_and_timeouts) const {
169 170 171
	for (const auto& br : mWaitingBranches) {
		int code = br->getStatus();

172 173 174 175 176
		if (code < 200)
			return false;
		if ((code == 503 || code == 408) && ignore_errors_and_timeouts)
			return false;
	}
177

178 179 180 181
	return true;
}

bool ForkContext::allCurrentBranchesAnswered(bool ignore_errors_and_timeouts) const {
182 183 184
	for (const auto& br : mCurrentBranches) {
		int code = br->getStatus();

185 186 187 188
		if (code < 200)
			return false;
		if ((code == 503 || code == 408) && ignore_errors_and_timeouts)
			return false;
Simon Morlat's avatar
Simon Morlat committed
189
	}
190

Simon Morlat's avatar
Simon Morlat committed
191 192 193
	return true;
}

194
void ForkContext::removeBranch(const shared_ptr<BranchInfo> &br) {
195 196
	SLOGD << "ForkContext [" << this << "] branch [" << br.get() << "] removed.";

197 198
	mWaitingBranches.remove(br);
	mCurrentBranches.remove(br);
Simon Morlat's avatar
Simon Morlat committed
199 200 201
	br->clear();
}

202
const list<shared_ptr<BranchInfo>> &ForkContext::getBranches() const {
203
	return mWaitingBranches;
Simon Morlat's avatar
Simon Morlat committed
204
}
Simon Morlat's avatar
Simon Morlat committed
205

206 207 208
// this implementation looks for already pending or failed transactions and then rejects handling of a new one that
// would already been tried.
bool ForkContext::onNewRegister(const url_t *url, const string &uid) {
209 210 211 212 213 214 215 216 217 218 219
	shared_ptr<BranchInfo> br, br_by_url;
	
	/*
	 * Check gruu. If the request was targeting a gruu address, the uid of the contact who has just registered shall match.
	 */
	string target_gr;
	if (ModuleToolbox::getUriParameter(mEvent->getSip()->sip_request->rq_url, "gr", target_gr)) {
		if (uid.find(target_gr) == string::npos){ //to compare regardless of < >
			/* This request was targetting a gruu address, but this REGISTER is not coming from our target contact.*/
			return false;
		}
Simon Morlat's avatar
Simon Morlat committed
220
	}
221
	
222
	br = findBranchByUid(uid);
223
	br_by_url = findBranchByDest(url);
224
	if (br) {
225
		int code = br->getStatus();
226 227 228 229 230 231 232
		if (code == 503 || code == 408){
			LOGD("ForkContext %p: onNewRegister(): instance failed to receive the request previously.", this);
			return true;
		} else if (code >= 200) {
			/* 
			 * This instance has already accepted or declined the request.
			 * We should not send it the request again.
233
			 */
234
			LOGD("ForkContext %p: onNewRegister(): instance has already answered the request.", this);
235
			return false;
236 237 238 239 240 241 242 243 244 245 246
		} else {
			/* 
			 * No response, or a provisional response is received. We can cannot conclude on what to do.
			 * The transaction might succeeed in near future, or it might be dead. 
			 * However, if the contact's uri is new, there is a high probability that the client reconnected 
			 * from a new socket, in which case the current branch will receive no response. 
			 */
			if (br_by_url == nullptr){
				LOGD("ForkContext %p: onNewRegister(): instance reconnected.", this);
				return true;
			}
247 248
		}
	}
249 250 251 252 253
	if (br_by_url) {
		LOGD("ForkContext %p: onNewRegister(): pending transaction for this destination.", this);
		return false;
	}
	return true;
Simon Morlat's avatar
Simon Morlat committed
254 255
}

Simon Morlat's avatar
Simon Morlat committed
256 257
void ForkContext::init() {
	mIncoming = mEvent->createIncomingTransaction();
258

259 260 261 262 263 264
	if (mCfg->mForkLate && mLateTimer == NULL) {
		/*this timer is for when outgoing transaction all die prematuraly, we still need to wait that late register
		 * arrive.*/
		mLateTimer = su_timer_create(su_root_task(mAgent->getRoot()), 0);
		su_timer_set_interval(mLateTimer, &ForkContext::__timer_callback, this,
							  (su_duration_t)mCfg->mDeliveryTimeout * (su_duration_t)1000);
265
	}
266 267
}

268
bool compareGreaterBranch(const shared_ptr<BranchInfo> &lhs, const shared_ptr<BranchInfo> &rhs) {
269 270 271
	return lhs->mPriority > rhs->mPriority;
}

272
void ForkContext::addBranch(const shared_ptr<RequestSipEvent> &ev, const shared_ptr<ExtendedContact> &contact) {
273 274
	shared_ptr<OutgoingTransaction> ot = ev->createOutgoingTransaction();
	shared_ptr<BranchInfo> br = createBranchInfo();
275

276
	if (mIncoming && mWaitingBranches.size() == 0) {
277 278 279
		/*for some reason shared_from_this() cannot be invoked within the ForkContext constructor, so we do this
		 * initialization now*/
		mIncoming->setProperty<ForkContext>("ForkContext", shared_from_this());
280
	}
281

282 283
	// unlink the incoming and outgoing transactions which is done by default, since now the forkcontext is managing
	// them.
284
	ev->unlinkTransactions();
285 286 287 288
	br->mRequest = ev;
	br->mTransaction = ot;
	br->mUid = contact->mUniqueId;
	br->mContact = contact;
289
	br->mPriority = contact->mQ;
290

291
	ot->setProperty("BranchInfo", br);
292 293 294
	
	// Clear answered branches with same uid.
	shared_ptr<BranchInfo> oldBr = findBranchByUid(br->mUid);
Simon Morlat's avatar
Simon Morlat committed
295
	if (oldBr && oldBr->getStatus() >= 200){
296 297 298 299
		LOGD("ForkContext [%p]: new fork branch [%p] clears out old branch [%p]", this, br.get(), oldBr.get());
		removeBranch(oldBr);
	}
	
Simon Morlat's avatar
Simon Morlat committed
300
	onNewBranch(br);
301 302 303 304 305 306 307 308 309

	mWaitingBranches.push_back(br);
	mWaitingBranches.sort(compareGreaterBranch);

	if (mCurrentPriority != -1 && mCurrentPriority <= br->mPriority) {
		mCurrentBranches.push_back(br);
		mAgent->injectRequestEvent(br->mRequest);
	}

310
	LOGD("ForkContext [%p]: new fork branch [%p]", this, br.get());
311 312
}

313
shared_ptr<ForkContext> ForkContext::get(const shared_ptr<IncomingTransaction> &tr) {
314 315 316
	return tr->getProperty<ForkContext>("ForkContext");
}

317
shared_ptr<ForkContext> ForkContext::get(const shared_ptr<OutgoingTransaction> &tr) {
318
	shared_ptr<BranchInfo> br = tr->getProperty<BranchInfo>("BranchInfo");
319 320 321
	return br ? br->mForkCtx : shared_ptr<ForkContext>();
}

322
bool ForkContext::processCancel(const shared_ptr<RequestSipEvent> &ev) {
Simon Morlat's avatar
Simon Morlat committed
323
	shared_ptr<IncomingTransaction> transaction = dynamic_pointer_cast<IncomingTransaction>(ev->getIncomingAgent());
324

325 326
	if (transaction && ev->getMsgSip()->getSip()->sip_request->rq_method == sip_method_cancel) {
		shared_ptr<ForkContext> ctx = ForkContext::get(transaction);
327

328
		if (ctx) {
329
			ctx->onCancel(ev);
330

331 332
			if (ctx->shouldFinish())
				ctx->setFinished();
333

334
			// let ev go through all the chain, however it will not be forwarded.
335 336
			return true;
		}
Simon Morlat's avatar
Simon Morlat committed
337
	}
338

Simon Morlat's avatar
Simon Morlat committed
339
	return false;
340 341
}

342
bool ForkContext::processResponse(const shared_ptr<ResponseSipEvent> &ev) {
Simon Morlat's avatar
Simon Morlat committed
343
	shared_ptr<OutgoingTransaction> transaction = dynamic_pointer_cast<OutgoingTransaction>(ev->getOutgoingAgent());
344

Simon Morlat's avatar
Simon Morlat committed
345
	if (transaction != NULL) {
346
		shared_ptr<BranchInfo> binfo = transaction->getProperty<BranchInfo>("BranchInfo");
347

348 349
		if (binfo) {
			auto copyEv = make_shared<ResponseSipEvent>(ev); // make a copy
Simon Morlat's avatar
Simon Morlat committed
350
			copyEv->suspendProcessing();
351 352
			binfo->mLastResponse = copyEv;
			binfo->mForkCtx->onResponse(binfo, copyEv);
353

354
			// the event may go through but it will not be sent*/
Simon Morlat's avatar
Simon Morlat committed
355
			ev->setIncomingAgent(shared_ptr<IncomingAgent>());
356

357 358 359
			if (!copyEv->isSuspended()) {
				// LOGD("A response has been submitted");
				// copyEv has been resubmited, so stop original event.
360
				ev->terminateProcessing();
361 362
			} else {
				// LOGD("The response has been retained");
Simon Morlat's avatar
Simon Morlat committed
363
			}
364 365 366 367 368 369

			if (binfo->mForkCtx->allCurrentBranchesAnswered()) {
				if (binfo->mForkCtx->hasNextBranches())
					binfo->mForkCtx->start();
			}

Simon Morlat's avatar
Simon Morlat committed
370
			return true;
371 372
		} else {
			// LOGD("ForkContext: un-processed response");
Simon Morlat's avatar
Simon Morlat committed
373 374
		}
	}
375

Simon Morlat's avatar
Simon Morlat committed
376
	return false;
377 378
}

379 380 381 382 383
void ForkContext::onNextBranches() {
	if (hasNextBranches())
		start();
}

384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
bool ForkContext::hasNextBranches() {
	if (mCurrentPriority == -1 && !mWaitingBranches.empty())
		return true;

	for(auto& br : mWaitingBranches) {
		if (br->mPriority < mCurrentPriority)
			return true;
	}

	return false;
}

void ForkContext::nextBranches() {
	/* Clear all current branches is there is any */
	mCurrentBranches.clear();

	/* Get next priority value */
401
	if (mCurrentPriority == -1 && !mWaitingBranches.empty()) {
402 403
		mCurrentPriority = mWaitingBranches.front()->mPriority;
	} else {
404
		for(const auto& br : mWaitingBranches) {
405 406 407 408 409 410 411 412
			if (br->mPriority < mCurrentPriority) {
				mCurrentPriority = br->mPriority;
				break;
			}
		}
	}

	/* Stock all wanted branches */
413
	for(const auto& br : mWaitingBranches) {
414 415 416 417 418 419
		if (br->mPriority == mCurrentPriority)
			mCurrentBranches.push_back(br);
	}
}

void ForkContext::start() {
420 421 422 423 424 425
	/* Remove existing timer */
	if (mNextBranchesTimer) {
		su_timer_destroy(mNextBranchesTimer);
		mNextBranchesTimer = NULL;
	}

426 427 428 429 430 431
	/* Prepare branches */
	nextBranches();

	LOGD("Started forking branches with priority [%p]: %f", this, mCurrentPriority);

	/* Start the processing */
432
	for(const auto& br : mCurrentBranches) {
433 434
		mAgent->injectRequestEvent(br->mRequest);
	}
435 436

	if (mCfg->mCurrentBranchesTimeout > 0 && hasNextBranches()) {
437
		/* Start the timer for next branches */
438 439 440
		mNextBranchesTimer = su_timer_create(su_root_task(mAgent->getRoot()), 0);
		su_timer_set_interval(mNextBranchesTimer, &ForkContext::sOnNextBanches, this, (su_duration_t)mCfg->mCurrentBranchesTimeout * (su_duration_t)1000);
	}
441 442
}

443 444 445 446 447
const shared_ptr<RequestSipEvent> &ForkContext::getEvent() {
	return mEvent;
}

ForkContext::~ForkContext() {
448 449
	if (mLateTimer)
		su_timer_destroy(mLateTimer);
450 451 452

	if (mNextBranchesTimer)
		su_timer_destroy(mNextBranchesTimer);
453 454
}

455 456
void ForkContext::onFinished() {
	su_timer_destroy(mFinishTimer);
457
	mFinishTimer = NULL;
458

459
	// force references to be loosed immediately, to avoid circular dependencies.
460 461
	mEvent.reset();
	mIncoming.reset();
462

463 464
	for_each(mWaitingBranches.begin(), mWaitingBranches.end(), mem_fn(&BranchInfo::clear));
	mWaitingBranches.clear();
465

466
	for_each(mCurrentBranches.begin(), mCurrentBranches.end(), mem_fn(&BranchInfo::clear));
467
	mCurrentBranches.clear();
468

469
	mListener->onForkContextFinished(shared_from_this());
470
	mSelf.reset(); // this must be the last thing to do
471 472
}

473 474
void ForkContext::setFinished() {
	if (mFinishTimer) {
475 476 477
		/*already finishing, ignore*/
		return;
	}
478

479
	if (mLateTimer) {
480
		su_timer_destroy(mLateTimer);
481
		mLateTimer = NULL;
482
	}
483 484 485 486 487 488

	if (mNextBranchesTimer) {
		su_timer_destroy(mNextBranchesTimer);
		mNextBranchesTimer = NULL;
	}

489 490
	mSelf = shared_from_this(); // to prevent destruction until finishTimer arrives
	mFinishTimer = su_timer_create(su_root_task(mAgent->getRoot()), 0);
491 492 493 494 495
	su_timer_set_interval(mFinishTimer, &ForkContext::sOnFinished, this, (su_duration_t)0);
}

bool ForkContext::shouldFinish() {
	return true;
496 497
}

498
void ForkContext::onNewBranch(const shared_ptr<BranchInfo> &br) {
Simon Morlat's avatar
Simon Morlat committed
499 500
}

501
void ForkContext::onCancel(const shared_ptr<RequestSipEvent> &ev) {
Simon Morlat's avatar
Simon Morlat committed
502 503
}

504
void ForkContext::addKey(const string &key) {
505
     mKeys.push_back(key);
506 507
}

508
const list<string> &ForkContext::getKeys() const{
509
     return mKeys;
510 511
}

512
shared_ptr<BranchInfo> ForkContext::createBranchInfo() {
Simon Morlat's avatar
Simon Morlat committed
513 514 515
	return make_shared<BranchInfo>(shared_from_this());
}

516 517
// called by implementors to request the forwarding of a response from this branch, regardless of whether it was
// retained previously or not*/
518
shared_ptr<ResponseSipEvent> ForkContext::forwardResponse(const shared_ptr<BranchInfo> &br) {
519 520 521
	if (br->mLastResponse) {
		if (mIncoming) {
			int code = br->mLastResponse->getMsgSip()->getSip()->sip_status->st_status;
Simon Morlat's avatar
Simon Morlat committed
522
			forwardResponse(br->mLastResponse);
523

524
			if (code >= 200) {
Simon Morlat's avatar
Simon Morlat committed
525 526
				br->mTransaction.reset();
			}
527

Simon Morlat's avatar
Simon Morlat committed
528
			return br->mLastResponse;
529 530 531
		} else
			br->mLastResponse->setIncomingAgent(shared_ptr<IncomingAgent>());
	} else {
Simon Morlat's avatar
Simon Morlat committed
532 533
		LOGE("ForkContext::forwardResponse(): no response received on this branch");
	}
534

535
	return shared_ptr<ResponseSipEvent>();
Simon Morlat's avatar
Simon Morlat committed
536 537
}

538
shared_ptr<ResponseSipEvent> ForkContext::forwardResponse(const shared_ptr<ResponseSipEvent> &ev) {
539 540
	if (mIncoming) {
		int code = ev->getMsgSip()->getSip()->sip_status->st_status;
Simon Morlat's avatar
Simon Morlat committed
541
		ev->setIncomingAgent(mIncoming);
542
		mLastResponseSent = ev;
543

544
		if (ev->isSuspended()) {
Simon Morlat's avatar
Simon Morlat committed
545
			mAgent->injectResponseEvent(ev);
546
		} else {
Simon Morlat's avatar
Simon Morlat committed
547 548
			mAgent->sendResponseEvent(ev);
		}
549

550
		if (code >= 200) {
Simon Morlat's avatar
Simon Morlat committed
551
			mIncoming.reset();
552

553
			if (shouldFinish())
554
				setFinished();
Simon Morlat's avatar
Simon Morlat committed
555
		}
556

Simon Morlat's avatar
Simon Morlat committed
557 558
		return ev;
	}
559

560
	return shared_ptr<ResponseSipEvent>();
Simon Morlat's avatar
Simon Morlat committed
561 562
}

563 564 565
int ForkContext::getLastResponseCode() const {
	if (mLastResponseSent)
		return mLastResponseSent->getMsgSip()->getSip()->sip_status->st_status;
566

Simon Morlat's avatar
Simon Morlat committed
567 568 569
	return 0;
}

570 571
void BranchInfo::clear() {
	if (mTransaction) {
Simon Morlat's avatar
Simon Morlat committed
572 573 574
		mTransaction->removeProperty("BranchInfo");
		mTransaction.reset();
	}
575

Simon Morlat's avatar
Simon Morlat committed
576 577 578 579 580
	mRequest.reset();
	mLastResponse.reset();
	mForkCtx.reset();
}

581
BranchInfo::~BranchInfo() {
Simon Morlat's avatar
Simon Morlat committed
582 583
	clear();
}