/*
  The oRTP library is an RTP (Realtime Transport Protocol - rfc3550) stack.
  Copyright (C) 2011 Belledonne Communications SARL
  Author: Simon MORLAT simon.morlat@linphone.org

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

#include "ortp/ortp.h"
#include "utils.h"
#include "ortp/rtpsession.h"
#include "rtpsession_priv.h"

/* Forward declaration: the definition is further down in this file; it is needed here
   because outboud_simulator_thread() calls it before it is defined. */
static void rtp_session_schedule_outbound_network_simulator(RtpSession *session, ortpTimeSpec *sleep_until);

static OrtpNetworkSimulatorCtx* simulator_ctx_new(void){
	OrtpNetworkSimulatorCtx *ctx=(OrtpNetworkSimulatorCtx*)ortp_malloc0(sizeof(OrtpNetworkSimulatorCtx));
30
	qinit(&ctx->latency_q);
31
	qinit(&ctx->q);
32 33
	qinit(&ctx->send_q);
	ortp_mutex_init(&ctx->mutex,NULL);
34 35 36 37
	return ctx;
}

/*
 * Destroys a network simulator context.
 *
 * Steps, in order:
 *  1. stop and join the outbound simulator thread, if it was started;
 *  2. log the drop statistics (only when at least one packet was counted);
 *  3. free every packet still held in latency_q, q and send_q;
 *  4. destroy the mutex and free the context.
 *
 * The thread is joined BEFORE the queues are counted and flushed: while it is
 * running it may still move packets through these queues, so flushing first
 * would race with it and could leave packets behind that are never freed.
 *
 * @param sim the context to destroy; NULL is accepted and ignored.
 */
void ortp_network_simulator_destroy(OrtpNetworkSimulatorCtx *sim){
	int drop_by_flush;

	if (sim==NULL) return;

	if (sim->thread_started){
		/*the thread loop polls this flag and exits once it is cleared*/
		sim->thread_started=FALSE;
		ortp_thread_join(sim->thread, NULL);
	}

	/*packets still waiting in the simulator are reported as "flushed"*/
	drop_by_flush=sim->latency_q.q_mcount+sim->q.q_mcount;
	if (sim->total_count>0){
		ortp_message("Network simulation: destroyed. Statistics are:"
			"%d/%d(%.1f%%, param=%.1f) packets dropped by loss, "
			"%d/%d(%.1f%%) packets dropped by congestion, "
			"%d/%d(%.1f%%) packets flushed."
			, sim->drop_by_loss, sim->total_count, sim->drop_by_loss*100.f/sim->total_count, sim->params.loss_rate
			, sim->drop_by_congestion, sim->total_count, sim->drop_by_congestion*100.f/sim->total_count
			, drop_by_flush, sim->total_count, drop_by_flush*100.f/sim->total_count
		);
	}
	flushq(&sim->latency_q,0);
	flushq(&sim->q,0);
	flushq(&sim->send_q,0);
	ortp_mutex_destroy(&sim->mutex);
	ortp_free(sim);
}

/*
 * Applies a scheduling policy and priority to the calling thread (no-op when
 * WIN32 is defined).
 *
 * Environment variables read:
 *  - ORTP_SIMULATOR_SCHED_POLICY: "SCHED_RR" or "SCHED_FIFO" (case-insensitive);
 *    any other value, or no value, selects SCHED_OTHER.
 *  - ORTP_SIMULATOR_SCHED_PRIO: requested priority; when unset the maximum
 *    priority of the selected policy is used. The value is clamped to the
 *    [min, max] range reported for the policy.
 *
 * The outcome of pthread_setschedparam() is only logged; failure is not fatal.
 */
static void set_high_prio(void){
#ifndef WIN32
	const char *sched_pref=getenv("ORTP_SIMULATOR_SCHED_POLICY");
	const char *env_prio_c=getenv("ORTP_SIMULATOR_SCHED_PRIO");
	struct sched_param param;
	int policy=SCHED_OTHER;
	int min_prio, max_prio;
	long prio;
	int result;

	if (sched_pref!=NULL){
		if (strcasecmp(sched_pref,"SCHED_RR")==0){
			policy=SCHED_RR;
		}else if (strcasecmp(sched_pref,"SCHED_FIFO")==0){
			policy=SCHED_FIFO;
		}
	}

	min_prio=sched_get_priority_min(policy);
	max_prio=sched_get_priority_max(policy);

	/* strtol() instead of atoi(): same result for ordinary input (including 0 for
	 * non-numeric text), but well defined when the value does not fit; the clamp
	 * below is done on the long so that no narrowing happens before it.*/
	prio=(env_prio_c==NULL) ? (long)max_prio : strtol(env_prio_c,NULL,10);
	if (prio>max_prio) prio=max_prio;
	if (prio<min_prio) prio=min_prio;

	memset(&param,0,sizeof(param));
	param.sched_priority=(int)prio;
	result=pthread_setschedparam(pthread_self(),policy,&param);
	if (result!=0){
		/*pthread functions return the error number instead of setting errno*/
		ortp_warning("Ortp simulator: set pthread_setschedparam failed: %s",strerror(result));
	}else{
		ortp_message("Priority set to %i and value (%i)",
				policy, param.sched_priority);
	}
#endif
}

static void * outboud_simulator_thread(void *ctx){
	RtpSession *session=(RtpSession*)ctx;
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	ortpTimeSpec sleep_until;
	set_high_prio();
	
	while(sim->thread_started){
		sleep_until.tv_sec=0;
		sleep_until.tv_nsec=0;
		rtp_session_schedule_outbound_network_simulator(session, &sleep_until);
		if (sleep_until.tv_sec!=0 || sleep_until.tv_nsec!=0 ) ortp_sleep_until(&sleep_until);
		else ortp_sleep_ms(1);
	}
	return NULL;
}

/*
 * Returns a constant, human readable name for a simulator mode.
 * The four known enumerators map to "Inbound", "Outbound", "OutboundControlled"
 * and "Invalid"; any other value yields "invalid" (lower case).
 */
const char *ortp_network_simulator_mode_to_string(OrtpNetworkSimulatorMode mode){
	const char *name="invalid"; /*fallback for values outside the enum*/

	switch(mode){
		case OrtpNetworkSimulatorInvalid:
			name="Invalid";
			break;
		case OrtpNetworkSimulatorInbound:
			name="Inbound";
			break;
		case OrtpNetworkSimulatorOutbound:
			name="Outbound";
			break;
		case OrtpNetworkSimulatorOutboundControlled:
			name="OutboundControlled";
			break;
	}
	return name;
}

/*
 * Parses a simulator mode name (case-insensitive).
 * Recognised names are "Inbound", "Outbound" and "OutboundControlled".
 * Returns OrtpNetworkSimulatorInvalid for any other string, and also for a NULL
 * pointer (passing NULL to strcasecmp() would be undefined behaviour).
 */
OrtpNetworkSimulatorMode ortp_network_simulator_mode_from_string(const char *str){
	if (str==NULL) return OrtpNetworkSimulatorInvalid;
	if (strcasecmp(str,"Inbound")==0) return OrtpNetworkSimulatorInbound;
	if (strcasecmp(str,"Outbound")==0) return OrtpNetworkSimulatorOutbound;
	if (strcasecmp(str,"OutboundControlled")==0) return OrtpNetworkSimulatorOutboundControlled;
	return OrtpNetworkSimulatorInvalid;
}

/*
 * Enables, reconfigures or disables the network simulator of a session.
 *
 * @param session the RTP session to configure.
 * @param params  the requested parameters. When params->enabled is false the
 *                session's simulator context (if any) is detached and destroyed.
 *                Otherwise a context is created if needed, its counters are reset
 *                and a copy of *params becomes the active configuration.
 *
 * Defaults applied to the copied configuration:
 *  - jitter requested (burst density and strength both > 0) without a bandwidth
 *    limit: max_bandwidth is set to 1024000 bits/s;
 *  - a bandwidth limit without max_buffer_size: max_buffer_size takes the value
 *    of max_bandwidth.
 *
 * The outbound simulator thread is started (once) for both outbound modes.
 */
void rtp_session_enable_network_simulation(RtpSession *session, const OrtpNetworkSimulatorParams *params){
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;

	if (!params->enabled){
		session->net_sim_ctx=NULL;
		if (sim!=NULL) ortp_network_simulator_destroy(sim);
		return;
	}

	if (sim==NULL)
		sim=simulator_ctx_new();
	sim->drop_by_congestion=sim->drop_by_loss=sim->total_count=0;
	sim->params=*params;

	if (sim->params.jitter_burst_density>0 && sim->params.jitter_strength>0 && sim->params.max_bandwidth==0){
		/*jitter is produced by the bandwidth limiter, so a limit is required*/
		sim->params.max_bandwidth=1024000;
		ortp_message("Network simulation: jitter requested but max_bandwidth is not set. Using default value of %f bits/s.",
			sim->params.max_bandwidth);
	}
	if (sim->params.max_bandwidth && sim->params.max_buffer_size==0) {
		sim->params.max_buffer_size=sim->params.max_bandwidth;
		ortp_message("Network simulation: Max buffer size not set for RTP session [%p], using [%i]",session,sim->params.max_buffer_size);
	}

	/* Publish the context BEFORE starting the thread: the thread reads
	 * session->net_sim_ctx as its very first action and would otherwise be able
	 * to see a NULL pointer.*/
	session->net_sim_ctx=sim;

	/* The scheduler that drains send_q is only driven by this thread, and it
	 * handles both outbound modes, so both of them need the thread.*/
	if ((sim->params.mode==OrtpNetworkSimulatorOutbound || sim->params.mode==OrtpNetworkSimulatorOutboundControlled)
		&& !sim->thread_started){
		sim->thread_started=TRUE;
		ortp_thread_create(&sim->thread, NULL, outboud_simulator_thread, session);
	}

	/*log the effective configuration, i.e. after the defaults above were applied*/
	ortp_message("Network simulation: enabled with the following parameters:\n"
			"\tlatency=%d\n"
			"\tloss_rate=%.1f\n"
			"\tconsecutive_loss_probability=%.1f\n"
			"\tmax_bandwidth=%.1f\n"
			"\tmax_buffer_size=%d\n"
			"\tjitter_density=%.1f\n"
			"\tjitter_strength=%.1f\n"
			"\tmode=%s\n",
			sim->params.latency,
			sim->params.loss_rate,
			sim->params.consecutive_loss_probability,
			sim->params.max_bandwidth,
			sim->params.max_buffer_size,
			sim->params.jitter_burst_density,
			sim->params.jitter_strength,
			ortp_network_simulator_mode_to_string(sim->params.mode)
			);
}

/*
 * Returns the time elapsed from *tv1 to *tv2, in microseconds.
 * The result is negative when *tv2 is earlier than *tv1.
 */
static int64_t elapsed_us(struct timeval *tv1, struct timeval *tv2){
	int64_t whole_seconds=tv2->tv_sec-tv1->tv_sec;
	int64_t micro_part=tv2->tv_usec-tv1->tv_usec;

	return whole_seconds*1000000LL+micro_part;
}

/*
 * Latency stage of the simulator.
 *
 * @param session the session owning the simulator context.
 * @param input   a packet entering the stage, or NULL to only poll the queue.
 * @return the packet at the head of the latency queue if its release date has
 *         been reached, otherwise NULL. At most one packet is released per call.
 *
 * The release date (current time + params.latency, in milliseconds) is stored
 * in the packet's reserved2 field while it waits, and reset to 0 on release.
 */
static mblk_t * simulate_latency(RtpSession *session, mblk_t *input){
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	struct timeval current;
	mblk_t *head;
	uint32_t current_ts;

	ortp_gettimeofday(&current,NULL);
	/* since we must store expiration date in reserved2(32bits) only(reserved1
	 * already used), we need to reduce time stamp to milliseconds only.
	 * The sum is computed on 64 unsigned bits and then truncated, so that the
	 * multiplication cannot overflow a signed 32 bit time_t.*/
	current_ts=(uint32_t)((uint64_t)current.tv_sec*1000u+(uint64_t)(current.tv_usec/1000));

	/*queue the packet - store expiration timestamps in reserved fields*/
	if (input){
		input->reserved2=current_ts+sim->params.latency;
		putq(&sim->latency_q,input);
	}

	head=peekq(&sim->latency_q);
	if (head!=NULL && TIME_IS_NEWER_THAN(current_ts, head->reserved2)){
		head->reserved2=0;
		getq(&sim->latency_q);
		/*return the first dequeued packet*/
		return head;
	}
	return NULL;
}

/*
 * Simulates jitter by taking back part of the bit budget that was just granted.
 *
 * @param sim             the simulator context (in_jitter_event and
 *                        last_jitter_event are read and updated).
 * @param budget_increase the number of bits just added to the budget.
 * @return a value <= 0 to add to the bit budget: 0 when no jitter is applied,
 *         otherwise minus a random fraction of budget_increase scaled by
 *         params.jitter_strength.
 *
 * A random draw r in [0,999] is turned into a score and compared to a threshold:
 *  - during a jitter event the score is r and the threshold is 100;
 *  - otherwise the score grows with r, with the time elapsed since the last
 *    jitter event and with params.jitter_burst_density; the threshold is 500.
 *
 * A single clock sample (now) is used for the whole call. Sampling the clock a
 * second time to initialise last_jitter_event could give a value later than
 * 'now', making the unsigned difference wrap to a huge number and triggering a
 * spurious jitter event.
 */
static int simulate_jitter_by_bit_budget_reduction(OrtpNetworkSimulatorCtx *sim, int budget_increase){
	unsigned int r=ortp_random()%1000;
	float threshold,score;
	int budget_adjust=0;
	uint64_t now=ortp_get_cur_time_ms();

	if (sim->last_jitter_event==0){
		/*first call: start measuring from now*/
		sim->last_jitter_event=now;
	}

	if (sim->in_jitter_event){
		threshold=100;
		score=(float)r;
	}else{
		score=1000.0*(float)r*(now-sim->last_jitter_event)*sim->params.jitter_burst_density*1e-6;
		threshold=500;
	}
	if (score>threshold){
		int64_t strength_rand=sim->params.jitter_strength * (float)(ortp_random()%1000);
		sim->in_jitter_event=TRUE;
		budget_adjust=-((int64_t)budget_increase*strength_rand/1000LL);
		/*ortp_message("jitter in progress... bit_budget_adjustement=%i, bit_budget=%i",budget_adjust,sim->bit_budget);*/
	}else if (sim->in_jitter_event){
		/*ortp_message("jitter ended.");*/
		sim->in_jitter_event=FALSE;
		sim->last_jitter_event=now;
	}
	return budget_adjust;
}

/*
 * Bandwidth limitation (and jitter) stage of the simulator.
 *
 * @param session the session owning the simulator context.
 * @param input   a packet entering the stage, or NULL to only poll the queue.
 * @return one packet allowed to leave the stage, or NULL.
 *
 * A bit budget is credited with max_bandwidth * elapsed time on every call and
 * debited by the jitter simulation and by each packet that leaves. Packet sizes
 * are counted in bits and include the IP/UDP overhead of the session's socket
 * family. When the queued size reaches params.max_buffer_size, packets are
 * dropped from the head of the queue and counted in drop_by_congestion.
 */
static mblk_t *simulate_bandwidth_limit_and_jitter(RtpSession *session, mblk_t *input){
	OrtpNetworkSimulatorCtx *sim=session->net_sim_ctx;
	struct timeval current;
	int64_t elapsed;
	int bits;
	int budget_increase;
	mblk_t *output=NULL;
	int overhead=(session->rtp.gs.sockfamily==AF_INET6) ? IP6_UDP_OVERHEAD : IP_UDP_OVERHEAD;

	ortp_gettimeofday(&current,NULL);

	/*last_check.tv_sec==0 marks a (re)start of the accounting period*/
	if (sim->last_check.tv_sec==0){
		sim->last_check=current;
		sim->bit_budget=0;
	}
	/*update the budget */
	elapsed=elapsed_us(&sim->last_check,&current);
	budget_increase=(elapsed*(int64_t)sim->params.max_bandwidth)/1000000LL;
	sim->bit_budget+=budget_increase;
	sim->bit_budget+=simulate_jitter_by_bit_budget_reduction(sim,budget_increase);
	sim->last_check=current;
	/* queue the packet for sending*/
	if (input){
		putq(&sim->q,input);
		bits=(msgdsize(input)+overhead)*8;
		sim->qsize+=bits;
	}
	/*flow control*/
	while (sim->qsize>=sim->params.max_buffer_size){
		// ortp_message("rtp_session_network_simulate(): discarding packets.");
		output=getq(&sim->q);
		if (output==NULL){
			/* nothing left to discard: leave the loop instead of spinning forever
			 * (happens e.g. with a non-positive max_buffer_size)*/
			break;
		}
		bits=(msgdsize(output)+overhead)*8;
		sim->qsize-=bits;
		sim->drop_by_congestion++;
		freemsg(output);
	}

	output=NULL;

	/*see if we can output a packet*/
	if (sim->bit_budget>=0){
		output=getq(&sim->q);
		if (output){
			bits=(msgdsize(output)+overhead)*8;
			sim->bit_budget-=bits;
			sim->qsize-=bits;
		}
	}
	if (output==NULL && input==NULL && sim->bit_budget>=0){
		/* unused budget is lost...*/
		sim->last_check.tv_sec=0;
	}
	return output;
}

/*
 * Packet loss stage of the simulator.
 *
 * @param net_sim_ctx the simulator context.
 * @param input       the packet to keep or drop.
 * @return input when the packet is kept, NULL when it was dropped (the packet is
 *         then freed and drop_by_loss is incremented).
 *
 * A random draw in [0,999] is compared to a threshold derived from
 * params.loss_rate (x10), or from params.consecutive_loss_probability (x1000)
 * when the previous packet(s) were dropped, in order to simulate loss bursts.
 * After a burst, a number of would-be drops (drops_to_ignore) are skipped.
 */
static mblk_t *simulate_loss_rate(OrtpNetworkSimulatorCtx *net_sim_ctx, mblk_t *input){
	float drop_threshold;
	int draw;

	/*in order to simulate bursts of dropped packets, take into account a different probability after a loss occured*/
	if (net_sim_ctx->consecutive_drops>0){
		drop_threshold=net_sim_ctx->params.consecutive_loss_probability*1000.0;
	}else{
		drop_threshold=net_sim_ctx->params.loss_rate*10.0;
	}

	draw = ortp_random() % 1000;

	if (draw >= drop_threshold) {
		/*the packet is kept*/
		if (net_sim_ctx->consecutive_drops!=0){
			/*after a burst of lost packets*/
			net_sim_ctx->drops_to_ignore=net_sim_ctx->consecutive_drops - ((net_sim_ctx->consecutive_drops*net_sim_ctx->params.loss_rate)/100);
			net_sim_ctx->consecutive_drops=0;
		}
		return input;
	}

	/*the draw asks for a drop, unless one is still owed from a previous burst*/
	if (net_sim_ctx->drops_to_ignore>0){
		net_sim_ctx->drops_to_ignore--;
		return input;
	}

	if (net_sim_ctx->params.consecutive_loss_probability>0){
		net_sim_ctx->consecutive_drops++;
	}
	net_sim_ctx->drop_by_loss++;
	freemsg(input);
	return NULL;
}

/*
 * Runs one packet (or none) through the simulator stages of a session.
 *
 * @param session       the session; its net_sim_ctx is dereferenced unconditionally.
 * @param input         the packet entering the simulator, or NULL to only let
 *                      the simulator release a previously queued packet.
 * @param is_rtp_packet in: type of 'input' (read only when input is not NULL);
 *                      out: type of the returned packet (written only when a
 *                      packet is returned).
 * @return the packet leaving the simulator, or NULL if none leaves on this call.
 *
 * Stages are applied in this order, each only when its parameter is > 0:
 * latency, bandwidth limit/jitter, loss.
 */
mblk_t * rtp_session_network_simulate(RtpSession *session, mblk_t *input, bool_t *is_rtp_packet){
	OrtpNetworkSimulatorCtx *ctx=session->net_sim_ctx;
	mblk_t *pkt=input;

	/*while packet is stored in network simulator queue, keep its type in reserved1 space*/
	if (pkt!=NULL){
		ctx->total_count++;
		pkt->reserved1 = *is_rtp_packet;
	}

	if (ctx->params.latency>0)
		pkt=simulate_latency(session,pkt);

	if (ctx->params.max_bandwidth>0)
		pkt=simulate_bandwidth_limit_and_jitter(session,pkt);

	if (pkt!=NULL && ctx->params.loss_rate>0)
		pkt=simulate_loss_rate(ctx,pkt);

	if (pkt==NULL)
		return NULL;

	/*finally when releasing the packet from the simulator, reset the reserved1 space to default,
	since it will be used by mediastreamer later*/
	*is_rtp_packet = pkt->reserved1;
	pkt->reserved1 = 0;
	return pkt;
}

static void rtp_session_schedule_outbound_network_simulator(RtpSession *session, ortpTimeSpec *sleep_until){
	mblk_t *om;
	int count=0;
	bool_t is_rtp_packet;
	
	if (!session->net_sim_ctx)
		return;
	
	if (!session->net_sim_ctx->params.enabled)
		return;
	
	if (session->net_sim_ctx->params.mode==OrtpNetworkSimulatorOutbound){
		sleep_until->tv_sec=0;
		sleep_until->tv_nsec=0;
		ortp_mutex_lock(&session->net_sim_ctx->mutex);
		while((om=getq(&session->net_sim_ctx->send_q))!=NULL){
			count++;
			ortp_mutex_unlock(&session->net_sim_ctx->mutex);
			is_rtp_packet=om->reserved1; /*it was set by _rtp_session_sendto()*/
			om=rtp_session_network_simulate(session,om, &is_rtp_packet);
			if (om){
				_ortp_sendto(is_rtp_packet ? session->rtp.gs.socket : session->rtcp.gs.socket, om, 0, (struct sockaddr*)&om->net_addr, om->net_addrlen);
				freemsg(om);
			}
			ortp_mutex_lock(&session->net_sim_ctx->mutex);
		}
		ortp_mutex_unlock(&session->net_sim_ctx->mutex);
		if (count==0){
			/*even if no packets were queued, we have to schedule the simulator*/
			is_rtp_packet=TRUE;
			om=rtp_session_network_simulate(session,NULL, &is_rtp_packet);
			if (om){
				_ortp_sendto(is_rtp_packet ? session->rtp.gs.socket : session->rtcp.gs.socket, om, 0, (struct sockaddr*)&om->net_addr, om->net_addrlen);
				freemsg(om);
			}
		}
	}else if (session->net_sim_ctx->params.mode==OrtpNetworkSimulatorOutboundControlled){
#if defined(ORTP_TIMESTAMP)
		ortpTimeSpec current;
		ortpTimeSpec packet_time;
		mblk_t *todrop=NULL;
		ortp_mutex_lock(&session->net_sim_ctx->mutex);
		while((om=peekq(&session->net_sim_ctx->send_q))!=NULL){
			ortp_mutex_unlock(&session->net_sim_ctx->mutex);
			if (todrop) {
				freemsg(todrop); /*free the last message while the mutex is not held*/
				todrop=NULL;
			}
			ortp_get_cur_time(&current);
			packet_time.tv_sec=om->timestamp.tv_sec;
			packet_time.tv_nsec=om->timestamp.tv_usec*1000LL;
Simon Morlat's avatar
Simon Morlat committed
410 411 412
			else if (om->timestamp.tv_sec==0 && om->timestamp.tv_usec==0){
				todrop=om; /*simulate a packet loss*/
			}else if (packet_time.tv_sec<=current.tv_sec && packet_time.tv_nsec<=current.tv_nsec){
413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
				is_rtp_packet=om->reserved1; /*it was set by _rtp_session_sendto()*/
				_ortp_sendto(is_rtp_packet ? session->rtp.gs.socket : session->rtcp.gs.socket, om, 0, (struct sockaddr*)&om->net_addr, om->net_addrlen);
				todrop=om;
			}else {
				/*no packet is to be sent yet; set the time at which we want to be called*/
				*sleep_until=packet_time;
				break; 
			}
			ortp_mutex_lock(&session->net_sim_ctx->mutex);
			if (todrop) getq(&session->net_sim_ctx->send_q); /* pop the message while the mutex is held*/
		}
		ortp_mutex_unlock(&session->net_sim_ctx->mutex);
		if (todrop) freemsg(todrop);
		if (sleep_until->tv_sec==0){
			/*no packet in the queue yet, schedule a wake up not too far*/
			sleep_until->tv_sec=current.tv_sec;
			sleep_until->tv_nsec=current.tv_nsec+1000000LL; /*in 1 ms*/
		}
#else
		ortp_mutex_lock(&session->net_sim_ctx->mutex);
		while((om=getq(&session->net_sim_ctx->send_q))!=NULL){
			ortp_mutex_unlock(&session->net_sim_ctx->mutex);
			freemsg(om);
			ortp_error("Network simulator is in mode OrtpNetworkSimulatorOutboundControlled but oRTP wasn't compiled with --enable-ntp-timestamp.");
			ortp_mutex_lock(&session->net_sim_ctx->mutex);
		}
#endif
	}
}