/*
 * The oRTP library is an RTP (Realtime Transport Protocol - rfc3550) implementation with additional features.
 * Copyright (C) 2017 Belledonne Communications SARL
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */

#include "ortp/ortp.h"
#include "utils.h"
#include "ortp/rtpsession.h"
#include "rtpsession_priv.h"
24
#include <bctoolbox/port.h>
25

26 27
/* Forward declaration: the function is defined further down in this file and is
 * called from the outbound simulator thread (outboud_simulator_thread). */
static void rtp_session_schedule_outbound_network_simulator(RtpSession *session, ortpTimeSpec *sleep_until);

28 29
static OrtpNetworkSimulatorCtx* simulator_ctx_new(void){
	OrtpNetworkSimulatorCtx *ctx=(OrtpNetworkSimulatorCtx*)ortp_malloc0(sizeof(OrtpNetworkSimulatorCtx));
30
	qinit(&ctx->latency_q);
31
	qinit(&ctx->q);
32 33
	qinit(&ctx->send_q);
	ortp_mutex_init(&ctx->mutex,NULL);
34 35 36
	return ctx;
}

37
/*
 * Logs the counters accumulated by a simulator context: packets dropped by loss,
 * packets dropped by congestion, and packets still held in latency_q and q
 * (reported as "flushed"). Each figure is also given as a percentage of
 * total_count. Nothing is logged unless total_count is greater than zero.
 */
static void ortp_network_simulator_dump_stats(OrtpNetworkSimulatorCtx *sim) {
	int pending=sim->latency_q.q_mcount+sim->q.q_mcount;
	float loss_pct, congestion_pct, pending_pct;

	if (!(sim->total_count>0)) return;

	loss_pct=sim->drop_by_loss*100.f/sim->total_count;
	congestion_pct=sim->drop_by_congestion*100.f/sim->total_count;
	pending_pct=pending*100.f/sim->total_count;

	ortp_message("Network simulation: dump stats. Statistics are:"
		"%d/%d(%.1f%%, param=%.1f) packets dropped by loss, "
		"%d/%d(%.1f%%) packets dropped by congestion, "
		"%d/%d(%.1f%%) packets flushed."
		, sim->drop_by_loss, sim->total_count, loss_pct, sim->params.loss_rate
		, sim->drop_by_congestion, sim->total_count, congestion_pct
		, pending, sim->total_count, pending_pct
	);
}
/*
 * Destroys a simulator context: stops the outbound simulator thread if it was
 * started, logs the final statistics, releases every packet still queued,
 * then destroys the mutex and frees the context.
 * Passing NULL is a no-op.
 */
void ortp_network_simulator_destroy(OrtpNetworkSimulatorCtx *sim){
	if (sim==NULL) return;

	/* Stop and join the simulator thread BEFORE touching the queues: the thread
	 * works on latency_q, q and send_q, so flushing them while it may still be
	 * in the middle of an iteration would race with it. */
	if (sim->thread_started){
		sim->thread_started=FALSE;
		ortp_thread_join(sim->thread, NULL);
	}
	/* From here on nothing else is expected to use the context. */
	ortp_network_simulator_dump_stats(sim);
	flushq(&sim->latency_q,0);
	flushq(&sim->q,0);
	flushq(&sim->send_q,0);
	ortp_mutex_destroy(&sim->mutex);
	ortp_free(sim);
}

63
#ifndef _WIN32
Simon Morlat's avatar
Simon Morlat committed
64 65 66 67 68 69 70 71 72 73
/*
 * Returns the symbolic name of a scheduling policy (SCHED_OTHER, SCHED_RR or
 * SCHED_FIFO), or "SCHED_INVALID" for any other value.
 */
static const char *sched_policy_to_string(int policy){
	static const struct {
		int value;
		const char *label;
	} known_policies[]={
		{ SCHED_OTHER, "SCHED_OTHER" },
		{ SCHED_RR, "SCHED_RR" },
		{ SCHED_FIFO, "SCHED_FIFO" }
	};
	size_t idx;

	for (idx=0; idx<sizeof(known_policies)/sizeof(known_policies[0]); idx++){
		if (known_policies[idx].value==policy){
			return known_policies[idx].label;
		}
	}
	return "SCHED_INVALID";
}
#endif

74
/*
 * Applies a scheduling policy and priority to the calling thread (no-op on
 * Windows builds).
 *
 * The policy is read from the ORTP_SIMULATOR_SCHED_POLICY environment variable
 * ("SCHED_RR" or "SCHED_FIFO", case-insensitive; anything else selects
 * SCHED_OTHER). The priority is read from ORTP_SIMULATOR_SCHED_PRIO and
 * clamped to the range reported for the policy; when the variable is unset the
 * maximum priority is used. The outcome is logged.
 */
static void set_high_prio(void){
#ifndef _WIN32
	const char *sched_pref=getenv("ORTP_SIMULATOR_SCHED_POLICY");
	const char *env_prio_c=getenv("ORTP_SIMULATOR_SCHED_PRIO");
	int policy=SCHED_OTHER;
	struct sched_param param;
	int result=0;
	int min_prio, max_prio;
	long requested_prio;

	if (sched_pref && strcasecmp(sched_pref,"SCHED_RR")==0){
		policy=SCHED_RR;
	}else if (sched_pref && strcasecmp(sched_pref,"SCHED_FIFO")==0){
		policy=SCHED_FIFO;
	}

	memset(&param,0,sizeof(param));

	min_prio = sched_get_priority_min(policy);
	max_prio = sched_get_priority_max(policy);

	/* strtol() instead of atoi(): same result for every value that fits in an
	 * int (including 0 for non-numeric text), but out-of-range input is well
	 * defined and simply gets clamped below. */
	requested_prio = (env_prio_c == NULL) ? (long)max_prio : strtol(env_prio_c, NULL, 10);
	if (requested_prio > (long)max_prio) requested_prio = (long)max_prio;
	if (requested_prio < (long)min_prio) requested_prio = (long)min_prio;

	param.sched_priority=(int)requested_prio;
	if((result=pthread_setschedparam(pthread_self(),policy, &param))) {
		ortp_warning("Ortp simulator: set pthread_setschedparam failed: %s",strerror(result));
	} else {
		ortp_message("ortp network simulator: sched policy set to %s and priority value (%i)",
				sched_policy_to_string(policy), param.sched_priority);
	}
#endif
}

static void * outboud_simulator_thread(void *ctx){
	RtpSession *session=(RtpSession*)ctx;
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	ortpTimeSpec sleep_until;
	set_high_prio();
114

115 116 117 118
	while(sim->thread_started){
		sleep_until.tv_sec=0;
		sleep_until.tv_nsec=0;
		rtp_session_schedule_outbound_network_simulator(session, &sleep_until);
119
		if (sleep_until.tv_sec!=0) ortp_sleep_until(&sleep_until);
120 121 122 123 124
		else ortp_sleep_ms(1);
	}
	return NULL;
}

125
/*
 * Returns a printable name for a simulator mode: "Inbound", "Outbound",
 * "OutboundControlled" or "Invalid". A value that matches none of the
 * enumerators handled here yields "invalid" (lower case).
 */
const char *ortp_network_simulator_mode_to_string(OrtpNetworkSimulatorMode mode){
	const char *label="invalid";

	switch(mode){
		case OrtpNetworkSimulatorInvalid:
			label="Invalid";
			break;
		case OrtpNetworkSimulatorInbound:
			label="Inbound";
			break;
		case OrtpNetworkSimulatorOutbound:
			label="Outbound";
			break;
		case OrtpNetworkSimulatorOutboundControlled:
			label="OutboundControlled";
			break;
	}
	return label;
}

139 140 141 142 143 144 145
/*
 * Parses a simulator mode name, ignoring case.
 * Recognized names are "Inbound", "Outbound" and "OutboundControlled".
 * Returns OrtpNetworkSimulatorInvalid for any other string, and also for a
 * NULL pointer (which would otherwise be passed straight to strcasecmp()).
 */
OrtpNetworkSimulatorMode ortp_network_simulator_mode_from_string(const char *str){
	if (str==NULL) return OrtpNetworkSimulatorInvalid;
	if (strcasecmp(str,"Inbound")==0) return OrtpNetworkSimulatorInbound;
	if (strcasecmp(str,"Outbound")==0) return OrtpNetworkSimulatorOutbound;
	if (strcasecmp(str,"OutboundControlled")==0) return OrtpNetworkSimulatorOutboundControlled;
	return OrtpNetworkSimulatorInvalid;
}

146 147 148
/*
 * Enables, reconfigures or disables network simulation on a session.
 *
 * When params->enabled is set:
 *  - a simulator context is created if the session has none, otherwise the
 *    statistics of the existing one are logged;
 *  - the counters are reset and the parameters copied into the context;
 *  - max_bandwidth defaults to 1024000 when jitter is requested without a
 *    bandwidth, and max_buffer_size defaults to max_bandwidth when a bandwidth
 *    is set without a buffer size;
 *  - for the Outbound and OutboundControlled modes the simulator thread is
 *    started if it is not already running.
 * Otherwise the session's context (if any) is detached and destroyed.
 */
void rtp_session_enable_network_simulation(RtpSession *session, const OrtpNetworkSimulatorParams *params){
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	if (params->enabled){
		if (sim==NULL) {
			sim=simulator_ctx_new();
		} else {
			ortp_network_simulator_dump_stats(sim);
		}
		sim->drop_by_congestion=sim->drop_by_loss=sim->total_count=0;
		sim->params=*params;
		if (sim->params.jitter_burst_density>0 && sim->params.jitter_strength>0 && sim->params.max_bandwidth==0){
			sim->params.max_bandwidth=1024000;
			ortp_message("Network simulation: jitter requested but max_bandwidth is not set. Using default value of %f bits/s.",
				sim->params.max_bandwidth);
		}
		if (sim->params.max_bandwidth && sim->params.max_buffer_size==0) {
			sim->params.max_buffer_size=(int)sim->params.max_bandwidth;
			ortp_message("Network simulation: Max buffer size not set for RTP session [%p], using [%i]",session,sim->params.max_buffer_size);
		}
		session->net_sim_ctx=sim;
		if ((params->mode==OrtpNetworkSimulatorOutbound || params->mode==OrtpNetworkSimulatorOutboundControlled) && !sim->thread_started){
			/* The flag must be set before the thread starts, since the thread
			 * loops on it; roll it back if the thread could not be created so
			 * that nobody later joins a thread that never existed. */
			sim->thread_started=TRUE;
			if (ortp_thread_create(&sim->thread, NULL, outboud_simulator_thread, session)!=0){
				sim->thread_started=FALSE;
				ortp_message("Network simulation: failed to start the outbound simulator thread for RTP session [%p]",session);
			}
		}

		/* Log the effective parameters (after the defaults above were applied),
		 * not the raw values supplied by the caller. */
		ortp_message("Network simulation: enabled with the following parameters:\n"
				"\tlatency=%d\n"
				"\tloss_rate=%.1f\n"
				"\tconsecutive_loss_probability=%.1f\n"
				"\tmax_bandwidth=%.1f\n"
				"\tmax_buffer_size=%d\n"
				"\tjitter_density=%.1f\n"
				"\tjitter_strength=%.1f\n"
				"\tmode=%s",
				sim->params.latency,
				sim->params.loss_rate,
				sim->params.consecutive_loss_probability,
				sim->params.max_bandwidth,
				sim->params.max_buffer_size,
				sim->params.jitter_burst_density,
				sim->params.jitter_strength,
				ortp_network_simulator_mode_to_string(sim->params.mode)
				);
	}else{
		/* Stop the simulator thread while the context is still reachable from
		 * the session: the thread dereferences session->net_sim_ctx during an
		 * iteration, so clearing the pointer under its feet could make it
		 * dereference NULL. */
		if (sim!=NULL && sim->thread_started){
			sim->thread_started=FALSE;
			ortp_thread_join(sim->thread, NULL);
		}
		session->net_sim_ctx=NULL;
		ortp_message("rtp_session_enable_network_simulation:DISABLING NETWORK SIMULATION");
		if (sim!=NULL) ortp_network_simulator_destroy(sim);
	}
}

/*
 * Returns the time elapsed from *tv1 to *tv2 in microseconds.
 * The result is negative when *tv2 is earlier than *tv1.
 */
static int64_t elapsed_us(struct timeval *tv1, struct timeval *tv2){
	int64_t whole_seconds_us=(tv2->tv_sec-tv1->tv_sec)*1000000LL;
	int64_t remainder_us=tv2->tv_usec-tv1->tv_usec;

	return whole_seconds_us+remainder_us;
}

200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
/*
 * Latency stage of the simulator.
 *
 * When input is not NULL it is appended to latency_q with an expiration time of
 * "now + params.latency" stored, in milliseconds, in its reserved2 field.
 * Then the packet at the head of latency_q is examined: if
 * TIME_IS_NEWER_THAN(now, expiration) holds, it is removed from the queue and
 * returned with reserved2 reset to 0. At most one packet is returned per call;
 * NULL is returned when nothing is due.
 */
static mblk_t * simulate_latency(RtpSession *session, mblk_t *input){
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	struct timeval current;
	mblk_t *output=NULL;
	uint32_t current_ts;
	ortp_gettimeofday(&current,NULL);
	/* Since the expiration date must fit in reserved2 (32 bits) only (reserved1
	 * is already used), the time stamp is reduced to milliseconds and allowed
	 * to wrap. The arithmetic is done in unsigned 64 bits so that the
	 * multiplication cannot overflow a signed tv_sec on platforms where it is
	 * only 32 bits wide. */
	current_ts = (uint32_t)((uint64_t)current.tv_sec*1000ULL + (uint64_t)current.tv_usec/1000ULL);

	/* queue the packet - store the expiration timestamp in the reserved field */
	if (input){
		input->reserved2 = current_ts + sim->params.latency;
		putq(&sim->latency_q,input);
	}

	if ((output=peekq(&sim->latency_q))!=NULL){
		if (TIME_IS_NEWER_THAN(current_ts, output->reserved2)){
			output->reserved2=0;
			getq(&sim->latency_q);
			/* return the first dequeued packet */
			return output;
		}
	}

	return NULL;
}

228
/*
 * Jitter stage: returns an adjustment (zero or negative) to add to the bit
 * budget in order to simulate a jitter burst.
 *
 * A random draw in [0, 999] is turned into a score. While a jitter event is in
 * progress the score is the draw itself, compared against 100. Otherwise the
 * score grows with the time elapsed since the last jitter event and with
 * params.jitter_burst_density, and is compared against 500.
 * When the score exceeds the threshold a jitter event is (or stays) active and
 * a random fraction of budget_increase, scaled by params.jitter_strength, is
 * returned as a negative adjustment. Otherwise a running event ends, the
 * current time is recorded in last_jitter_event, and 0 is returned.
 */
static int simulate_jitter_by_bit_budget_reduction(OrtpNetworkSimulatorCtx *sim, int budget_increase){
	unsigned int r=ortp_random()%1000;
	float threshold,score;
	int budget_adjust=0;
	uint64_t now=ortp_get_cur_time_ms();

	if (sim->last_jitter_event==0){
		/* Reuse 'now' instead of reading the clock a second time: a later
		 * reading could be greater than 'now', and the unsigned subtraction
		 * below would then wrap around to a huge value and start a spurious
		 * jitter event. */
		sim->last_jitter_event=now;
	}

	if (sim->in_jitter_event){
		threshold=100;
		score=(float)r;
	}else{
		score=1000.0f*(float)r*(now-sim->last_jitter_event)*sim->params.jitter_burst_density*1e-6f;
		threshold=500;
	}
	if (score>threshold){
		int64_t strength_rand=(int64_t)(sim->params.jitter_strength * (float)(ortp_random()%1000));
		/* jitter in progress: withdraw part of the budget that was just granted */
		sim->in_jitter_event=TRUE;
		budget_adjust=(int)-((int64_t)budget_increase*strength_rand/1000LL);
	}else if (sim->in_jitter_event){
		/* jitter ended */
		sim->in_jitter_event=FALSE;
		sim->last_jitter_event=now;
	}
	return budget_adjust;
}

/*
 * Bandwidth and jitter stage of the simulator.
 *
 * A bit budget is credited according to the time elapsed since the previous
 * call and params.max_bandwidth, then possibly reduced by the jitter stage.
 * The input packet, if any, is appended to the queue 'q' and its size (payload
 * plus IP/UDP overhead, in bits) added to qsize. While qsize is at or above
 * params.max_buffer_size, packets are taken from the head of the queue and
 * discarded as congestion drops. Finally, if the budget is not negative, one
 * packet is dequeued, charged against the budget and returned; otherwise NULL
 * is returned.
 */
static mblk_t *simulate_bandwidth_limit_and_jitter(RtpSession *session, mblk_t *input){
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	struct timeval current;
	int64_t elapsed;
	int bits;
	int budget_increase;
	mblk_t *output=NULL;
	int overhead=(session->rtp.gs.sockfamily==AF_INET6) ? IP6_UDP_OVERHEAD : IP_UDP_OVERHEAD;

	ortp_gettimeofday(&current,NULL);

	/* last_check.tv_sec==0 means "no reference time yet": start from now with an empty budget */
	if (sim->last_check.tv_sec==0){
		sim->last_check=current;
		sim->bit_budget=0;
	}
	/* update the budget */
	elapsed=elapsed_us(&sim->last_check,&current);
	budget_increase=(int)((elapsed*(int64_t)sim->params.max_bandwidth)/1000000LL);
	sim->bit_budget+=budget_increase;
	sim->bit_budget+=simulate_jitter_by_bit_budget_reduction(sim,budget_increase);
	sim->last_check=current;
	/* queue the packet for sending */
	if (input){
		putq(&sim->q,input);
		bits=((int)msgdsize(input)+overhead)*8;
		sim->qsize+=bits;
	}
	/* flow control: discard from the head of the queue until below the limit */
	while (sim->qsize>=sim->params.max_buffer_size){
		output=getq(&sim->q);
		if (output==NULL){
			/* Nothing left to discard: an empty queue holds zero bits. Leave
			 * the loop, otherwise it would spin forever (e.g. when
			 * max_buffer_size is 0). */
			sim->qsize=0;
			break;
		}
		bits=((int)msgdsize(output)+overhead)*8;
		sim->qsize-=bits;
		sim->drop_by_congestion++;
		freemsg(output);
	}

	output=NULL;

	/* see if we can output a packet */
	if (sim->bit_budget>=0){
		output=getq(&sim->q);
		if (output){
			bits=((int)msgdsize(output)+overhead)*8;
			sim->bit_budget-=bits;
			sim->qsize-=bits;
		}
	}
	if (output==NULL && input==NULL && sim->bit_budget>=0){
		/* unused budget is lost: forget the reference time so that the next call starts afresh */
		sim->last_check.tv_sec=0;
	}
	return output;
}

315
/*
 * Loss stage of the simulator: decides whether 'input' is kept or dropped.
 *
 * A random draw in [0, 999] is compared with a threshold derived from
 * params.loss_rate, or from params.consecutive_loss_probability when the
 * previous packet(s) were dropped, so that bursts of losses can be simulated.
 * When a burst ends, drops_to_ignore is set so that a number of subsequent
 * drop decisions are skipped.
 *
 * Returns 'input' when the packet is kept. When it is dropped, the packet is
 * freed, drop_by_loss is incremented and NULL is returned.
 */
static mblk_t *simulate_loss_rate(OrtpNetworkSimulatorCtx *net_sim_ctx, mblk_t *input){
	OrtpNetworkSimulatorCtx *sim=net_sim_ctx;
	float drop_threshold;
	int draw;

	/* a different probability applies right after a loss occurred */
	if (sim->consecutive_drops>0){
		drop_threshold=sim->params.consecutive_loss_probability*1000.0f;
	}else{
		drop_threshold=sim->params.loss_rate*10.0f;
	}

	draw = ortp_random() % 1000;

	if (draw >= drop_threshold) {
		/* the packet goes through */
		if (sim->consecutive_drops){
			/* a burst of lost packets just ended */
			int burst_length=sim->consecutive_drops;
			sim->drops_to_ignore=burst_length - (int)(((float)burst_length*sim->params.loss_rate)/100.0f);
			sim->consecutive_drops=0;
		}
		return input;
	}

	/* the draw selected this packet for loss */
	if (sim->drops_to_ignore>0){
		sim->drops_to_ignore--;
		return input;
	}
	if (sim->params.consecutive_loss_probability>0){
		sim->consecutive_drops++;
	}
	sim->drop_by_loss++;
	freemsg(input);
	return NULL;
}

346
/*
 * Runs a packet through the simulator stages enabled in the session's
 * parameters: latency, then loss, then bandwidth limit and jitter.
 *
 * input may be NULL, in which case the stages are only given a chance to
 * release a packet they were holding. On entry *is_rtp_packet describes
 * 'input'; it is stored in the packet's reserved1 field while the packet sits
 * in the simulator queues. On return, if a packet is released, *is_rtp_packet
 * is updated to describe the RETURNED packet (which may differ from 'input')
 * and its reserved1 field is reset to 0.
 *
 * Returns the packet released by the simulator, or NULL.
 */
mblk_t * rtp_session_network_simulate(RtpSession *session, mblk_t *input, bool_t *is_rtp_packet){
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	mblk_t *om=NULL;

	om=input;

	/* while the packet is stored in a network simulator queue, keep its type in the reserved1 space */
	if (om != NULL){
		sim->total_count++;
		om->reserved1 = *is_rtp_packet;
	}

	if (sim->params.latency>0){
		om=simulate_latency(session,om);
	}

	if ((sim->params.loss_rate > 0) && (om != NULL)) {
		/* Use the type recorded in the packet itself: after the latency stage
		 * 'om' may be an older packet than 'input', so *is_rtp_packet (which
		 * describes 'input') must not decide whether 'om' can be dropped. */
		if (sim->params.rtp_only != TRUE || om->reserved1 == TRUE) {
			om = simulate_loss_rate(sim, om);
		}
	}

	if (sim->params.max_bandwidth>0){
		om=simulate_bandwidth_limit_and_jitter(session,om);
	}

	/* finally, when releasing the packet from the simulator, reset the reserved1 space to its default,
	since it will be used by mediastreamer later */
	if (om != NULL){
		*is_rtp_packet = om->reserved1;
		om->reserved1 = 0;
	}
	return om;
}

385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
/* Three-way comparison of two ortpTimeSpec values.
 * Seconds are compared first; nanoseconds are only looked at when the seconds
 * are equal. Returns -1 if s1 is earlier than s2, 0 if both are equal and 1 if
 * s1 is later than s2. */
int ortp_timespec_compare(const ortpTimeSpec *s1, const ortpTimeSpec *s2){
	int64_t delta = s1->tv_sec - s2->tv_sec;

	if (delta == 0){
		delta = s1->tv_nsec - s2->tv_nsec;
	}
	if (delta < 0) return -1;
	if (delta > 0) return 1;
	return 0;
}

402 403 404 405 406
/*
 * Scans the simulator's send_q and returns the packet to handle next, without
 * removing it from the queue.
 *
 * A packet whose timestamp is entirely zero is returned as soon as it is met
 * (such packets are meant to be dropped). Otherwise the packet with the
 * earliest timestamp is returned; among equal timestamps the first one found
 * wins. Returns NULL when the queue is empty.
 * The function does no locking itself; the caller in this file invokes it with
 * the context mutex held.
 */
static mblk_t * rtp_session_netsim_find_next_packet_to_send(RtpSession *session){
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	mblk_t *om;
	ortpTimeSpec min_packet_time = { 0, 0};
	ortpTimeSpec packet_time;
	mblk_t *next_packet = NULL;

	for(om = qbegin(&sim->send_q); !qend(&sim->send_q, om); om = qnext(&sim->send_q, om)){
		packet_time.tv_sec=om->timestamp.tv_sec;
		packet_time.tv_nsec=om->timestamp.tv_usec*1000LL;
		if (packet_time.tv_sec == 0 && packet_time.tv_nsec == 0){
			/* this is a packet to drop */
			return om;
		}
		/* "No candidate yet" is detected through next_packet rather than
		 * through min_packet_time.tv_sec==0: a valid candidate may itself have
		 * tv_sec==0 (with a non-zero sub-second part) and must not be replaced
		 * by a later packet. */
		if (next_packet == NULL || ortp_timespec_compare(&packet_time, &min_packet_time) < 0){
			min_packet_time = packet_time;
			next_packet = om;
		}
	}
	return next_packet;
}

423 424 425 426
/*
 * One scheduling pass of the outbound simulator, called in a loop by the
 * simulator thread. Does nothing when the session has no simulator context or
 * when simulation is not enabled.
 *
 * Outbound mode: every packet waiting in send_q is run through
 * rtp_session_network_simulate() and whatever comes out is sent. When send_q
 * was empty, the simulator is still run once with a NULL input so that packets
 * it holds internally can be released. *sleep_until is left at zero.
 *
 * OutboundControlled mode: packets of send_q are handled in timestamp order.
 * An RTP packet with a zero timestamp is dropped, a packet whose timestamp is
 * not later than the current time is sent, and the first packet that is not
 * due yet stops the pass with *sleep_until set to its timestamp. If
 * *sleep_until is still unset at the end, a wake-up 1 ms from now is requested.
 *
 * The context mutex protects send_q; it is released around the simulation,
 * the send and the free of a packet.
 */
static void rtp_session_schedule_outbound_network_simulator(RtpSession *session, ortpTimeSpec *sleep_until){
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	mblk_t *om;
	int count=0;
	bool_t is_rtp_packet;

	if (!sim)
		return;

	if (!sim->params.enabled)
		return;

	if (sim->params.mode==OrtpNetworkSimulatorOutbound){
		sleep_until->tv_sec=0;
		sleep_until->tv_nsec=0;
		ortp_mutex_lock(&sim->mutex);
		while((om=getq(&sim->send_q))!=NULL){
			count++;
			ortp_mutex_unlock(&sim->mutex);
			is_rtp_packet=om->reserved1; /*it was set by rtp_session_sendto()*/
			om=rtp_session_network_simulate(session,om, &is_rtp_packet);
			if (om){
				_ortp_sendto(rtp_session_get_socket(session, is_rtp_packet), om, 0, (struct sockaddr*)&om->net_addr, om->net_addrlen);
				freemsg(om);
			}
			ortp_mutex_lock(&sim->mutex);
		}
		ortp_mutex_unlock(&sim->mutex);
		if (count==0){
			/*even if no packets were queued, we have to schedule the simulator*/
			is_rtp_packet=TRUE;
			om=rtp_session_network_simulate(session,NULL, &is_rtp_packet);
			if (om){
				_ortp_sendto(rtp_session_get_socket(session, is_rtp_packet), om, 0, (struct sockaddr*)&om->net_addr, om->net_addrlen);
				freemsg(om);
			}
		}
	}else if (sim->params.mode==OrtpNetworkSimulatorOutboundControlled){
		ortpTimeSpec current={0};
		ortpTimeSpec packet_time;
		mblk_t *todrop=NULL;

		ortp_mutex_lock(&sim->mutex);
		while((om = rtp_session_netsim_find_next_packet_to_send(session)) != NULL){
			is_rtp_packet=om->reserved1; /*it was set by rtp_session_sendto()*/
			ortp_mutex_unlock(&sim->mutex);
			if (todrop) {
				freemsg(todrop); /*free the last message while the mutex is not held*/
				todrop = NULL;
			}
			_ortp_get_cur_time(&current,TRUE);
			packet_time.tv_sec=om->timestamp.tv_sec;
			packet_time.tv_nsec=om->timestamp.tv_usec*1000LL;
			if (is_rtp_packet && om->timestamp.tv_sec==0 && om->timestamp.tv_usec==0){
				todrop = om; /*simulate a packet loss, only RTP packets can be dropped. Timestamp is not set for RTCP packets*/
			}else if (ortp_timespec_compare(&packet_time, &current) <= 0){
				/*it is time to send this packet*/
				_ortp_sendto(is_rtp_packet ? session->rtp.gs.socket : session->rtcp.gs.socket, om, 0, (struct sockaddr*)&om->net_addr, om->net_addrlen);
				todrop = om;
			}else {
				/*no packet is to be sent yet; set the time at which we want to be called*/
				*sleep_until=packet_time;
				ortp_mutex_lock(&sim->mutex); /*the mutex is released right after the loop*/
				break;
			}
			ortp_mutex_lock(&sim->mutex);
			if (todrop) remq(&sim->send_q, todrop); /* remove the message while the mutex is held*/
		}
		ortp_mutex_unlock(&sim->mutex);
		if (todrop) freemsg(todrop);
		if (sleep_until->tv_sec==0){
			_ortp_get_cur_time(&current,TRUE);
			/*no pending packet in the queue yet, schedule a wake up not too far*/
			sleep_until->tv_sec=current.tv_sec;
			sleep_until->tv_nsec=current.tv_nsec+1000000LL; /*in 1 ms*/
			/* keep the wake-up time normalized: tv_nsec must stay below one second */
			if (sleep_until->tv_nsec>=1000000000LL){
				sleep_until->tv_sec++;
				sleep_until->tv_nsec-=1000000000LL;
			}
		}
	}
}