netsim.c 14.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
/*
  The oRTP library is an RTP (Realtime Transport Protocol - rfc3550) stack.
  Copyright (C) 2011 Belledonne Communications SARL
  Author: Simon MORLAT simon.morlat@linphone.org

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

#include "ortp/ortp.h"
#include "utils.h"
#include "ortp/rtpsession.h"
#include "rtpsession_priv.h"

26 27
static void rtp_session_schedule_outbound_network_simulator(RtpSession *session, ortpTimeSpec *sleep_until);

28 29
static OrtpNetworkSimulatorCtx* simulator_ctx_new(void){
	OrtpNetworkSimulatorCtx *ctx=(OrtpNetworkSimulatorCtx*)ortp_malloc0(sizeof(OrtpNetworkSimulatorCtx));
30
	qinit(&ctx->latency_q);
31
	qinit(&ctx->q);
32 33
	qinit(&ctx->send_q);
	ortp_mutex_init(&ctx->mutex,NULL);
34 35 36 37
	return ctx;
}

void ortp_network_simulator_destroy(OrtpNetworkSimulatorCtx *sim){
38
	int drop_by_flush=sim->latency_q.q_mcount+sim->q.q_mcount;
39 40 41 42 43 44 45 46 47 48
	if (sim->total_count>0){
		ortp_message("Network simulation: destroyed. Statistics are:"
			"%d/%d(%.1f%%, param=%.1f) packets dropped by loss, "
			"%d/%d(%.1f%%) packets dropped by congestion, "
			"%d/%d(%.1f%%) packets flushed."
			, sim->drop_by_loss, sim->total_count, sim->drop_by_loss*100.f/sim->total_count, sim->params.loss_rate
			, sim->drop_by_congestion, sim->total_count, sim->drop_by_congestion*100.f/sim->total_count
			, drop_by_flush, sim->total_count, drop_by_flush*100.f/sim->total_count
		);
	}
49
	flushq(&sim->latency_q,0);
50
	flushq(&sim->q,0);
51 52 53 54 55 56
	flushq(&sim->send_q,0);
	if (sim->thread_started){
		sim->thread_started=FALSE;
		ortp_thread_join(sim->thread, NULL);
	}
	ortp_mutex_destroy(&sim->mutex);
57 58 59
	ortp_free(sim);
}

60
#ifndef _WIN32
Simon Morlat's avatar
Simon Morlat committed
61 62 63 64 65 66 67 68 69 70
static const char *sched_policy_to_string(int policy){
	switch(policy){
		case SCHED_OTHER: return "SCHED_OTHER";
		case SCHED_RR: return "SCHED_RR";
		case SCHED_FIFO: return "SCHED_FIFO";
	}
	return "SCHED_INVALID";
}
#endif

71
static void set_high_prio(){
72
#ifndef _WIN32
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
	const char *sched_pref=getenv("ORTP_SIMULATOR_SCHED_POLICY");
	int policy=SCHED_OTHER;
	struct sched_param param;
	int result=0;
	char* env_prio_c=NULL;
	int min_prio, max_prio, env_prio;
		
	if (sched_pref && strcasecmp(sched_pref,"SCHED_RR")==0){
		policy=SCHED_RR;
	}else if (sched_pref && strcasecmp(sched_pref,"SCHED_FIFO")==0){
		policy=SCHED_FIFO;
	}

	memset(&param,0,sizeof(param));

	min_prio = sched_get_priority_min(policy);
	max_prio = sched_get_priority_max(policy);
	env_prio_c = getenv("ORTP_SIMULATOR_SCHED_PRIO");

	env_prio = (env_prio_c == NULL)?max_prio:atoi(env_prio_c);

	env_prio = MAX(MIN(env_prio, max_prio), min_prio);

	param.sched_priority=env_prio;
	if((result=pthread_setschedparam(pthread_self(),policy, &param))) {
		ortp_warning("Ortp simulator: set pthread_setschedparam failed: %s",strerror(result));
	} else {
Simon Morlat's avatar
Simon Morlat committed
100 101
		ortp_message("ortp network simulator: sched policy set to %s and priority value (%i)",
				sched_policy_to_string(policy), param.sched_priority);
102 103 104 105 106 107 108 109 110 111 112 113 114 115
	}
#endif
}

static void * outboud_simulator_thread(void *ctx){
	RtpSession *session=(RtpSession*)ctx;
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	ortpTimeSpec sleep_until;
	set_high_prio();
	
	while(sim->thread_started){
		sleep_until.tv_sec=0;
		sleep_until.tv_nsec=0;
		rtp_session_schedule_outbound_network_simulator(session, &sleep_until);
116
		if (sleep_until.tv_sec!=0) ortp_sleep_until(&sleep_until);
117 118 119 120 121
		else ortp_sleep_ms(1);
	}
	return NULL;
}

122
const char *ortp_network_simulator_mode_to_string(OrtpNetworkSimulatorMode mode){
123 124 125 126 127 128 129
	switch(mode){
		case OrtpNetworkSimulatorInbound:
			return "Inbound";
		case OrtpNetworkSimulatorOutbound:
			return "Outbound";
		case OrtpNetworkSimulatorOutboundControlled:
			return "OutboundControlled";
130 131
		case OrtpNetworkSimulatorInvalid:
			return "Invalid";
132 133 134 135
	}
	return "invalid";
}

136 137 138 139 140 141 142
OrtpNetworkSimulatorMode ortp_network_simulator_mode_from_string(const char *str){
	if (strcasecmp(str,"Inbound")==0) return OrtpNetworkSimulatorInbound;
	if (strcasecmp(str,"Outbound")==0) return OrtpNetworkSimulatorOutbound;
	if (strcasecmp(str,"OutboundControlled")==0) return OrtpNetworkSimulatorOutboundControlled;
	return OrtpNetworkSimulatorInvalid;
}

143 144 145
void rtp_session_enable_network_simulation(RtpSession *session, const OrtpNetworkSimulatorParams *params){
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	if (params->enabled){
146 147
		if (sim==NULL)
			sim=simulator_ctx_new();
148
		sim->drop_by_congestion=sim->drop_by_loss=sim->total_count=0;
149
		sim->params=*params;
150
		if (sim->params.jitter_burst_density>0 && sim->params.jitter_strength>0 && sim->params.max_bandwidth==0){
151 152 153 154
			sim->params.max_bandwidth=1024000;
			ortp_message("Network simulation: jitter requested but max_bandwidth is not set. Using default value of %f bits/s.",
				sim->params.max_bandwidth);
		}
jehan's avatar
jehan committed
155 156
		if (sim->params.max_bandwidth && sim->params.max_buffer_size==0) {
			sim->params.max_buffer_size=sim->params.max_bandwidth;
157
			ortp_message("Network simulation: Max buffer size not set for RTP session [%p], using [%i]",session,sim->params.max_buffer_size);
jehan's avatar
jehan committed
158
		}
159
		session->net_sim_ctx=sim;
160
		if ((params->mode==OrtpNetworkSimulatorOutbound || params->mode==OrtpNetworkSimulatorOutboundControlled) && !sim->thread_started){
161 162 163
			sim->thread_started=TRUE;
			ortp_thread_create(&sim->thread, NULL, outboud_simulator_thread, session);
		}
164

165
		ortp_message("Network simulation: enabled with the following parameters:\n"
166
				"\tlatency=%d\n"
167 168 169
				"\tloss_rate=%.1f\n"
				"\tconsecutive_loss_probability=%.1f\n"
				"\tmax_bandwidth=%.1f\n"
170
				"\tmax_buffer_size=%d\n"
171
				"\tjitter_density=%.1f\n"
172 173
				"\tjitter_strength=%.1f\n"
				"\tmode=%s\n",
174 175 176 177 178
				params->latency,
				params->loss_rate,
				params->consecutive_loss_probability,
				params->max_bandwidth,
				params->max_buffer_size,
179
				params->jitter_burst_density,
180
				params->jitter_strength,
181
				ortp_network_simulator_mode_to_string(params->mode)
182
    			);
183 184
	}else{
		session->net_sim_ctx=NULL;
185
		if (sim!=NULL) ortp_network_simulator_destroy(sim);
186 187 188 189 190 191 192
	}
}

static int64_t elapsed_us(struct timeval *tv1, struct timeval *tv2){
	return ((tv2->tv_sec-tv1->tv_sec)*1000000LL)+((tv2->tv_usec-tv1->tv_usec));
}

193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
static mblk_t * simulate_latency(RtpSession *session, mblk_t *input){
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	struct timeval current;
	mblk_t *output=NULL;
	uint32_t current_ts;
	ortp_gettimeofday(&current,NULL);
	/*since we must store expiration date in reserved2(32bits) only(reserved1
	already used), we need to reduce time stamp to milliseconds only*/
	current_ts = 1000*current.tv_sec + current.tv_usec/1000;

	/*queue the packet - store expiration timestamps in reserved fields*/
	if (input){
		input->reserved2 = current_ts + sim->params.latency;
		putq(&sim->latency_q,input);
	}

	if ((output=peekq(&sim->latency_q))!=NULL){
		if (TIME_IS_NEWER_THAN(current_ts, output->reserved2)){
			output->reserved2=0;
			getq(&sim->latency_q);
			/*return the first dequeued packet*/
			return output;
		}
	}

	return NULL;
}

221
static int simulate_jitter_by_bit_budget_reduction(OrtpNetworkSimulatorCtx *sim, int budget_increase){
222
	unsigned int r=ortp_random()%1000;
223
	float threshold,score;
224
	int budget_adjust=0;
225
	uint64_t now=ortp_get_cur_time_ms();
226

227 228 229
	if (sim->last_jitter_event==0){
		sim->last_jitter_event=ortp_get_cur_time_ms();
	}
230

231
	if (sim->in_jitter_event){
232 233 234 235
		threshold=100;
		score=(float)r;
	}else{
		score=1000.0*(float)r*(now-sim->last_jitter_event)*sim->params.jitter_burst_density*1e-6;
236 237
		threshold=500;
	}
238 239
	if (score>(int)threshold){
		int64_t strength_rand=sim->params.jitter_strength * (float)(ortp_random()%1000);
240
		sim->in_jitter_event=TRUE;
241 242
		budget_adjust=-((int64_t)budget_increase*strength_rand/1000LL);
		/*ortp_message("jitter in progress... bit_budget_adjustement=%i, bit_budget=%i",budget_adjust,sim->bit_budget);*/
243
	}else if (sim->in_jitter_event){
244
		/*ortp_message("jitter ended.");*/
245
		sim->in_jitter_event=FALSE;
246
		sim->last_jitter_event=ortp_get_cur_time_ms();
247 248 249 250 251
	}
	return budget_adjust;
}

static mblk_t *simulate_bandwidth_limit_and_jitter(RtpSession *session, mblk_t *input){
252 253 254 255
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	struct timeval current;
	int64_t elapsed;
	int bits;
256
	int budget_increase;
257
	mblk_t *output=NULL;
258
	int overhead=(session->rtp.gs.sockfamily==AF_INET6) ? IP6_UDP_OVERHEAD : IP_UDP_OVERHEAD;
259

260
	ortp_gettimeofday(&current,NULL);
261

262 263
	if (sim->last_check.tv_sec==0){
		sim->last_check=current;
Simon Morlat's avatar
Simon Morlat committed
264
		sim->bit_budget=0;
265 266 267
	}
	/*update the budget */
	elapsed=elapsed_us(&sim->last_check,&current);
268 269 270
	budget_increase=(elapsed*(int64_t)sim->params.max_bandwidth)/1000000LL;
	sim->bit_budget+=budget_increase;
	sim->bit_budget+=simulate_jitter_by_bit_budget_reduction(sim,budget_increase);
271 272 273 274 275 276 277 278
	sim->last_check=current;
	/* queue the packet for sending*/
	if (input){
		putq(&sim->q,input);
		bits=(msgdsize(input)+overhead)*8;
		sim->qsize+=bits;
	}
	/*flow control*/
jehan's avatar
jehan committed
279
	while (sim->qsize>=sim->params.max_buffer_size){
280
		// ortp_message("rtp_session_network_simulate(): discarding packets.");
281 282 283 284
		output=getq(&sim->q);
		if (output){
			bits=(msgdsize(output)+overhead)*8;
			sim->qsize-=bits;
285
			sim->drop_by_congestion++;
286
			freemsg(output);
287 288
		}
	}
289

290
	output=NULL;
291

292 293 294 295 296 297 298 299 300
	/*see if we can output a packet*/
	if (sim->bit_budget>=0){
		output=getq(&sim->q);
		if (output){
			bits=(msgdsize(output)+overhead)*8;
			sim->bit_budget-=bits;
			sim->qsize-=bits;
		}
	}
Simon Morlat's avatar
Simon Morlat committed
301 302 303 304
	if (output==NULL && input==NULL && sim->bit_budget>=0){
		/* unused budget is lost...*/
		sim->last_check.tv_sec=0;
	}
305 306 307
	return output;
}

308
static mblk_t *simulate_loss_rate(OrtpNetworkSimulatorCtx *net_sim_ctx, mblk_t *input){
309
	int rrate;
310
	float loss_rate=net_sim_ctx->params.loss_rate*10.0;
311 312 313

	/*in order to simulate bursts of dropped packets, take into account a different probability after a loss occured*/
	if (net_sim_ctx->consecutive_drops>0){
314
		loss_rate=net_sim_ctx->params.consecutive_loss_probability*1000.0;
315
	}
316

317 318
	rrate = ortp_random() % 1000;

319
	if (rrate >= loss_rate) {
320 321
		if (net_sim_ctx->consecutive_drops){
			/*after a burst of lost packets*/
322
			net_sim_ctx->drops_to_ignore=net_sim_ctx->consecutive_drops - ((net_sim_ctx->consecutive_drops*net_sim_ctx->params.loss_rate)/100);
323 324
			net_sim_ctx->consecutive_drops=0;
		}
325 326 327 328
		return input;
	}
	if (net_sim_ctx->drops_to_ignore>0){
		net_sim_ctx->drops_to_ignore--;
Yann Diorcet's avatar
Yann Diorcet committed
329 330
		return input;
	}
331 332 333
	if (net_sim_ctx->params.consecutive_loss_probability>0){
		net_sim_ctx->consecutive_drops++;
	}
334
	net_sim_ctx->drop_by_loss++;
335
	freemsg(input);
Yann Diorcet's avatar
Yann Diorcet committed
336 337 338
	return NULL;
}

339
mblk_t * rtp_session_network_simulate(RtpSession *session, mblk_t *input, bool_t *is_rtp_packet){
340 341
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	mblk_t *om=NULL;
342

343
	om=input;
344 345

	/*while packet is stored in network simulator queue, keep its type in reserved1 space*/
346
	if (om != NULL){
347
		sim->total_count++;
348 349 350 351 352
		om->reserved1 = *is_rtp_packet;
	}

	if (sim->params.latency>0){
		om=simulate_latency(session,om);
353 354
	}

355
	if (sim->params.max_bandwidth>0){
356
		om=simulate_bandwidth_limit_and_jitter(session,om);
357
	}
358
	if (sim->params.loss_rate>0 && om){
359
		om=simulate_loss_rate(sim,om);
360
	}
361 362 363 364 365 366
	/*finally when releasing the packet from the simulator, reset the reserved1 space to default,
	since it will be used by mediastreamer later*/
	if (om != NULL){
		*is_rtp_packet = om->reserved1;
		om->reserved1 = 0;
	}
367 368 369
	return om;
}

370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
static void rtp_session_schedule_outbound_network_simulator(RtpSession *session, ortpTimeSpec *sleep_until){
	mblk_t *om;
	int count=0;
	bool_t is_rtp_packet;
	
	if (!session->net_sim_ctx)
		return;
	
	if (!session->net_sim_ctx->params.enabled)
		return;
	
	if (session->net_sim_ctx->params.mode==OrtpNetworkSimulatorOutbound){
		sleep_until->tv_sec=0;
		sleep_until->tv_nsec=0;
		ortp_mutex_lock(&session->net_sim_ctx->mutex);
		while((om=getq(&session->net_sim_ctx->send_q))!=NULL){
			count++;
			ortp_mutex_unlock(&session->net_sim_ctx->mutex);
			is_rtp_packet=om->reserved1; /*it was set by _rtp_session_sendto()*/
			om=rtp_session_network_simulate(session,om, &is_rtp_packet);
			if (om){
				_ortp_sendto(is_rtp_packet ? session->rtp.gs.socket : session->rtcp.gs.socket, om, 0, (struct sockaddr*)&om->net_addr, om->net_addrlen);
				freemsg(om);
			}
			ortp_mutex_lock(&session->net_sim_ctx->mutex);
		}
		ortp_mutex_unlock(&session->net_sim_ctx->mutex);
		if (count==0){
			/*even if no packets were queued, we have to schedule the simulator*/
			is_rtp_packet=TRUE;
			om=rtp_session_network_simulate(session,NULL, &is_rtp_packet);
			if (om){
				_ortp_sendto(is_rtp_packet ? session->rtp.gs.socket : session->rtcp.gs.socket, om, 0, (struct sockaddr*)&om->net_addr, om->net_addrlen);
				freemsg(om);
			}
		}
	}else if (session->net_sim_ctx->params.mode==OrtpNetworkSimulatorOutboundControlled){
#if defined(ORTP_TIMESTAMP)
408
		ortpTimeSpec current={0};
409 410 411 412 413 414 415 416 417
		ortpTimeSpec packet_time;
		mblk_t *todrop=NULL;
		ortp_mutex_lock(&session->net_sim_ctx->mutex);
		while((om=peekq(&session->net_sim_ctx->send_q))!=NULL){
			ortp_mutex_unlock(&session->net_sim_ctx->mutex);
			if (todrop) {
				freemsg(todrop); /*free the last message while the mutex is not held*/
				todrop=NULL;
			}
418
			_ortp_get_cur_time(&current,TRUE);
419 420
			packet_time.tv_sec=om->timestamp.tv_sec;
			packet_time.tv_nsec=om->timestamp.tv_usec*1000LL;
Simon Morlat's avatar
Simon Morlat committed
421
			if (om->timestamp.tv_sec==0 && om->timestamp.tv_usec==0){
Simon Morlat's avatar
Simon Morlat committed
422 423
				todrop=om; /*simulate a packet loss*/
			}else if (packet_time.tv_sec<=current.tv_sec && packet_time.tv_nsec<=current.tv_nsec){
424 425 426 427 428 429
				is_rtp_packet=om->reserved1; /*it was set by _rtp_session_sendto()*/
				_ortp_sendto(is_rtp_packet ? session->rtp.gs.socket : session->rtcp.gs.socket, om, 0, (struct sockaddr*)&om->net_addr, om->net_addrlen);
				todrop=om;
			}else {
				/*no packet is to be sent yet; set the time at which we want to be called*/
				*sleep_until=packet_time;
430
				ortp_mutex_lock(&session->net_sim_ctx->mutex);
431 432 433 434 435 436 437 438
				break; 
			}
			ortp_mutex_lock(&session->net_sim_ctx->mutex);
			if (todrop) getq(&session->net_sim_ctx->send_q); /* pop the message while the mutex is held*/
		}
		ortp_mutex_unlock(&session->net_sim_ctx->mutex);
		if (todrop) freemsg(todrop);
		if (sleep_until->tv_sec==0){
439 440
			_ortp_get_cur_time(&current,TRUE);
			/*no pending packet in the queue yet, schedule a wake up not too far*/
441 442 443
			sleep_until->tv_sec=current.tv_sec;
			sleep_until->tv_nsec=current.tv_nsec+1000000LL; /*in 1 ms*/
		}
444
		
445 446 447 448 449 450 451 452
#else
		ortp_mutex_lock(&session->net_sim_ctx->mutex);
		while((om=getq(&session->net_sim_ctx->send_q))!=NULL){
			ortp_mutex_unlock(&session->net_sim_ctx->mutex);
			freemsg(om);
			ortp_error("Network simulator is in mode OrtpNetworkSimulatorOutboundControlled but oRTP wasn't compiled with --enable-ntp-timestamp.");
			ortp_mutex_lock(&session->net_sim_ctx->mutex);
		}
453
		ortp_mutex_unlock(&session->net_sim_ctx->mutex);
454 455 456 457
#endif
	}
}