/*
 * The oRTP library is an RTP (Realtime Transport Protocol - rfc3550) implementation with additional features.
 * Copyright (C) 2017 Belledonne Communications SARL
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */

#ifdef HAVE_CONFIG_H
#include "ortp-config.h"
#endif

#include "ortp/ortp.h"
#include "jitterctl.h"
#include "utils.h"
#include "rtpsession_priv.h"
#include "congestiondetector.h"

/*
 * Insert a received RTP packet into a reception queue and keep the queue bounded.
 *
 * q          queue receiving the packet.
 * maxrqsz    maximum number of packets allowed in the queue; older packets are
 *            dropped until q->q_mcount no longer exceeds it.
 * mp         the packet; this function takes ownership of it in every case.
 * rtp        RTP header of mp (only the CSRC count is read here).
 * discarded  out: number of packets dropped by this call (an empty packet, or
 *            packets evicted because the queue was full).
 * duplicate  out: 1 if rtp_putq() reported the packet as a duplicate, else 0.
 *
 * Returns FALSE if the packet carried no payload (it is freed and counted as
 * discarded), TRUE otherwise.
 */
static bool_t queue_packet(queue_t *q, int maxrqsz, mblk_t *mp, rtp_header_t *rtp, int *discarded, int *duplicate)
{
	mblk_t *tmp;
	int header_size;
	*discarded=0;
	*duplicate=0;
	header_size=RTP_FIXED_HEADER_SIZE+ (4*rtp->cc);
	if ((mp->b_wptr - mp->b_rptr)==header_size){
		ortp_debug("Rtp packet contains no data.");
		(*discarded)++;
		freemsg(mp);
		return FALSE;
	}

	/* and then add the packet to the queue */
	if (rtp_putq(q,mp) < 0) {
		/* It was a duplicate packet */
		(*duplicate)++;
	}
	/* From here on mp belongs to the queue and must not be dereferenced:
	 * NOTE(review): rtp_putq() presumably releases mp when it is a duplicate,
	 * confirm against its implementation. */

	/* make some checks: the queue size must not exceed maxrqsz */
	while (q->q_mcount > maxrqsz)
	{
		/* remove the oldest mblk_t */
		tmp=getq(q);
		if (tmp==NULL)
		{
			/* nothing left to dequeue: stop instead of spinning forever */
			break;
		}
		/* report the timestamp of the packet that is actually dropped */
		ortp_warning("rtp_putq: Queue is full. Discarding message with ts=%u",((rtp_header_t*)tmp->b_rptr)->timestamp);
		freemsg(tmp);
		(*discarded)++;
	}
	return TRUE;
}
/*
 * One step of a running mean / sum-of-squared-differences update (Welford's
 * recurrence) for the nb-th sample x.
 *
 * On entry *oldm and *olds hold the mean and the accumulated sum of squared
 * differences before x. On return *newm and *news hold the updated values,
 * and *oldm / *olds are overwritten with them so the next call can chain.
 * nb is used as the divisor and is not checked against zero.
 */
static void compute_mean_and_deviation(uint32_t nb, double x, double *olds, double *oldm, double *news, double *newm) {
	const double previous_mean = *oldm;
	const double previous_sum = *olds;
	const double updated_mean = previous_mean + (x - previous_mean) / nb;
	const double updated_sum = previous_sum + ((x - previous_mean) * (x - updated_mean));

	*newm = updated_mean;
	*news = updated_sum;
	/* roll the state forward for the next sample */
	*oldm = updated_mean;
	*olds = updated_sum;
}

/*
 * Update the per-window statistics kept in session->rtcp_xr_stats for one
 * received RTP packet: min / max / running mean and sum of squares of the
 * packet's TTL (or hop limit), and the same for the inter-packet jitter.
 *
 * session       session whose rtcp_xr_stats are updated.
 * mp            received packet; its RTP timestamp is read as stored in the
 *               header (the caller in this file converts it to host order first).
 * local_str_ts  local stream timestamp at reception.
 *
 * A window starts when rcv_since_last_stat_summary == 1, i.e. on the first
 * packet counted since the counter was last reset.
 */
static void update_rtcp_xr_stat_summary(RtpSession *session, mblk_t *mp, uint32_t local_str_ts) {
	rtp_header_t *rtp = (rtp_header_t *)mp->b_rptr;
	/* difference between the sender timestamp and the local one, widened so the subtraction cannot wrap */
	int64_t diff = (int64_t)rtp->timestamp - (int64_t)local_str_ts;

	/* TTL/HL statistics */
	if (session->rtcp_xr_stats.rcv_since_last_stat_summary == 1) {
		/* first packet of the window: restart min/max and the running accumulators */
		session->rtcp_xr_stats.min_ttl_or_hl_since_last_stat_summary = 255;
		session->rtcp_xr_stats.max_ttl_or_hl_since_last_stat_summary = 0;
		session->rtcp_xr_stats.olds_ttl_or_hl_since_last_stat_summary = 0;
		session->rtcp_xr_stats.oldm_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
		session->rtcp_xr_stats.newm_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
	}
	compute_mean_and_deviation(session->rtcp_xr_stats.rcv_since_last_stat_summary,
		(double)mp->ttl_or_hl,
		&session->rtcp_xr_stats.olds_ttl_or_hl_since_last_stat_summary,
		&session->rtcp_xr_stats.oldm_ttl_or_hl_since_last_stat_summary,
		&session->rtcp_xr_stats.news_ttl_or_hl_since_last_stat_summary,
		&session->rtcp_xr_stats.newm_ttl_or_hl_since_last_stat_summary);
	if (mp->ttl_or_hl < session->rtcp_xr_stats.min_ttl_or_hl_since_last_stat_summary) {
		session->rtcp_xr_stats.min_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
	}
	if (mp->ttl_or_hl > session->rtcp_xr_stats.max_ttl_or_hl_since_last_stat_summary) {
		session->rtcp_xr_stats.max_ttl_or_hl_since_last_stat_summary = mp->ttl_or_hl;
	}

	/* Jitter statistics */
	if (session->rtcp_xr_stats.rcv_since_last_stat_summary == 1) {
		/* first packet of the window: no jitter sample yet, only restart the state.
		 * The running accumulators are cleared as well (as is done for TTL/HL above),
		 * otherwise the sum of squares of the previous window would leak into this one. */
		session->rtcp_xr_stats.min_jitter_since_last_stat_summary = 0xFFFFFFFF;
		session->rtcp_xr_stats.max_jitter_since_last_stat_summary = 0;
		session->rtcp_xr_stats.olds_jitter_since_last_stat_summary = 0;
		session->rtcp_xr_stats.oldm_jitter_since_last_stat_summary = 0;
		session->rtcp_xr_stats.news_jitter_since_last_stat_summary = 0;
		session->rtcp_xr_stats.newm_jitter_since_last_stat_summary = 0;
	} else {
		/* jitter sample = absolute variation of diff between this packet and the previous one */
		int64_t signed_jitter = diff - session->rtcp_xr_stats.last_jitter_diff_since_last_stat_summary;
		uint32_t jitter;
		if (signed_jitter < 0) {
			jitter = (uint32_t)(-signed_jitter);
		} else {
			jitter = (uint32_t)(signed_jitter);
		}
		/* the first packet of the window produced no sample, hence the "- 1" */
		compute_mean_and_deviation(session->rtcp_xr_stats.rcv_since_last_stat_summary - 1,
			(double)jitter,
			&session->rtcp_xr_stats.olds_jitter_since_last_stat_summary,
			&session->rtcp_xr_stats.oldm_jitter_since_last_stat_summary,
			&session->rtcp_xr_stats.news_jitter_since_last_stat_summary,
			&session->rtcp_xr_stats.newm_jitter_since_last_stat_summary);
		if (jitter < session->rtcp_xr_stats.min_jitter_since_last_stat_summary) {
			session->rtcp_xr_stats.min_jitter_since_last_stat_summary = jitter;
		}
		if (jitter > session->rtcp_xr_stats.max_jitter_since_last_stat_summary) {
			session->rtcp_xr_stats.max_jitter_since_last_stat_summary = jitter;
		}
	}
	/* remembered for the next packet's jitter sample */
	session->rtcp_xr_stats.last_jitter_diff_since_last_stat_summary = diff;
}

/*
 * Process one packet received on the RTP socket of a session.
 *
 * The packet is validated (minimum size, RTP version, CSRC list length), its
 * header is converted in place to host byte order, the session locks on / checks
 * the incoming SSRC, reception statistics are updated, and the packet is finally
 * queued (telephone-event queue or regular reception queue).
 *
 * session       the session receiving the packet.
 * mp            the received packet. Ownership is taken in every case: it is
 *               freed, attached to a STUN event, or queued.
 * local_str_ts  local stream timestamp at reception, used for jitter control,
 *               congestion detection and RTCP XR statistics.
 * addr/addrlen  source address of the packet; only used to fill the event
 *               emitted when the packet looks like a STUN message.
 */
void rtp_session_rtp_parse(RtpSession *session, mblk_t *mp, uint32_t local_str_ts, struct sockaddr *addr, socklen_t addrlen)
{
	int i;
	int discarded;
	int duplicate;
	rtp_header_t *rtp;
	int msgsize;
	RtpStream *rtpstream=&session->rtp;
	rtp_stats_t *stats=&session->stats;

	msgsize=(int)(mp->b_wptr-mp->b_rptr);

	if (msgsize<RTP_FIXED_HEADER_SIZE){
		ortp_warning("Packet too small to be a rtp packet (%i)!",msgsize);
		stats->bad++;
		ortp_global_stats.bad++;
		freemsg(mp);
		return;
	}
	rtp=(rtp_header_t*)mp->b_rptr;
	if (rtp->version!=2)
	{
		/* try to see if it is a STUN packet: the 16-bit length found at byte
		 * offset 2, plus 20 bytes, must match the size of the datagram.
		 * The field is fetched with memcpy to avoid a type-punned, possibly
		 * misaligned load. */
		uint16_t stunlen;
		memcpy(&stunlen, mp->b_rptr + sizeof(uint16_t), sizeof(stunlen));
		stunlen = ntohs(stunlen);
		if (stunlen+20==mp->b_wptr-mp->b_rptr){
			/* this looks like a stun packet */
			rtp_session_update_remote_sock_addr(session,mp,TRUE,TRUE);
			if (session->eventqs!=NULL){
				OrtpEvent *ev=ortp_event_new(ORTP_EVENT_STUN_PACKET_RECEIVED);
				OrtpEventData *ed=ortp_event_get_data(ev);
				size_t copylen=0;
				ed->packet=mp;
				/* never copy more than the event can hold, whatever the caller passed */
				if (addr!=NULL && addrlen>0){
					copylen=(size_t)addrlen;
					if (copylen>sizeof(ed->source_addr)){
						copylen=sizeof(ed->source_addr);
					}
					memcpy(&ed->source_addr,addr,copylen);
				}
				ed->source_addrlen=(socklen_t)copylen;
				ed->info.socket_type = OrtpRTPSocket;
				rtp_session_dispatch_event(session,ev);
				/* mp now belongs to the event */
				return;
			}
		}
		/* discard in two cases: the packet is not stun OR nobody is interested by STUN (no eventqs) */
		ortp_debug("Receiving rtp packet with version number %d!=2...discarded", rtp->version);
		stats->bad++;
		ortp_global_stats.bad++;
		freemsg(mp);
		return;
	}

	/* only count non-stun packets. */
	ortp_global_stats.packet_recv++;
	stats->packet_recv++;
	ortp_global_stats.hw_recv+=msgsize;
	stats->hw_recv+=msgsize;
	session->rtp.hwrcv_since_last_SR++;
	session->rtcp_xr_stats.rcv_since_last_stat_summary++;

	/* convert all header data from network order to host order */
	rtp->seq_number=ntohs(rtp->seq_number);
	rtp->timestamp=ntohl(rtp->timestamp);
	rtp->ssrc=ntohl(rtp->ssrc);
	/* the CSRC list announced by the header must fit in the packet */
	if (rtp->cc*sizeof(uint32_t) > (uint32_t) (msgsize-RTP_FIXED_HEADER_SIZE)){
		ortp_debug("Receiving too short rtp packet.");
		stats->bad++;
		ortp_global_stats.bad++;
		freemsg(mp);
		return;
	}

#ifndef PERF
	/* Write down the last RTP/RTCP packet reception time. */
	ortp_gettimeofday(&session->last_recv_time, NULL);
#endif

	/* convert csrc if necessary */
	for (i=0;i<rtp->cc;i++)
		rtp->csrc[i]=ntohl(rtp->csrc[i]);
	/*the goal of the following code is to lock on an incoming SSRC to avoid
	receiving "mixed streams"*/
	if (session->ssrc_set){
		/*the ssrc is set, so we must check it */
		if (session->rcv.ssrc!=rtp->ssrc){
			/* count consecutive packets coming from the same new SSRC */
			if (session->inc_ssrc_candidate==rtp->ssrc){
				session->inc_same_ssrc_count++;
			}else{
				session->inc_same_ssrc_count=0;
				session->inc_ssrc_candidate=rtp->ssrc;
			}
			if (session->inc_same_ssrc_count>=session->rtp.ssrc_changed_thres){
				/* store the sender rtp address to do symmetric RTP */
				rtp_session_update_remote_sock_addr(session,mp,TRUE,FALSE);
				session->rtp.rcv_last_ts = rtp->timestamp;
				session->rcv.ssrc=rtp->ssrc;
				rtp_signal_table_emit(&session->on_ssrc_changed);
			}else{
				/*discard the packet*/
				ortp_debug("Receiving packet with unknown ssrc.");
				stats->bad++;
				ortp_global_stats.bad++;
				freemsg(mp);
				return;
			}
		} else{
			/* The SSRC change must not happen if we still receive
			ssrc from the initial source. */
			session->inc_same_ssrc_count=0;
		}
	}else{
		/* first packet: lock on its SSRC */
		session->ssrc_set=TRUE;
		session->rcv.ssrc=rtp->ssrc;
		rtp_session_update_remote_sock_addr(session,mp,TRUE,FALSE);
	}

	/* update some statistics */
	{
		poly32_t *extseq=(poly32_t*)&rtpstream->hwrcv_extseq;
		if (rtp->seq_number>extseq->split.lo){
			extseq->split.lo=rtp->seq_number;
		}else if (rtp->seq_number<200 && extseq->split.lo>((1<<16) - 200)){
			/* this is a check for sequence number looping */
			extseq->split.lo=rtp->seq_number;
			extseq->split.hi++;
		}

		/* the first sequence number received should be initialized at the beginning
		or at any resync, so that the first receiver reports contains valid loss rate*/
		if (!(session->flags & RTP_SESSION_RECV_SEQ_INIT)){
			rtp_session_set_flag(session, RTP_SESSION_RECV_SEQ_INIT);
			rtpstream->hwrcv_seq_at_last_SR=rtp->seq_number-1;
			session->rtcp_xr_stats.rcv_seq_at_last_stat_summary=rtp->seq_number-1;
		}
		if (stats->packet_recv==1){
			session->rtcp_xr_stats.first_rcv_seq=extseq->one;
		}
		session->rtcp_xr_stats.last_rcv_seq=extseq->one;
	}

	/* check for possible telephone events: they go to their own queue */
	if (rtp_profile_is_telephone_event(session->snd.profile, rtp->paytype)){
		queue_packet(&session->rtp.tev_rq,session->rtp.jittctl.params.max_packets,mp,rtp,&discarded,&duplicate);
		stats->discarded+=discarded;
		ortp_global_stats.discarded+=discarded;
		stats->packet_dup_recv+=duplicate;
		ortp_global_stats.packet_dup_recv+=duplicate;
		session->rtcp_xr_stats.discarded_count += discarded;
		session->rtcp_xr_stats.dup_since_last_stat_summary += duplicate;
		return;
	}

	/* check for possible payload type change, in order to update accordingly our clock-rate dependant
	parameters */
	if (session->hw_recv_pt!=rtp->paytype){
		rtp_session_update_payload_type(session,rtp->paytype);
	}

	/* Drop the packets while the RTP_SESSION_FLUSH flag is set. */
	if (session->flags & RTP_SESSION_FLUSH) {
		freemsg(mp);
		return;
	}

	jitter_control_new_packet(&session->rtp.jittctl,rtp->timestamp,local_str_ts);

	if (session->congestion_detector_enabled && session->rtp.congdetect){
		/* an event is emitted only when the detector reports a state change */
		if (ortp_congestion_detector_record(session->rtp.congdetect,rtp->timestamp,local_str_ts)) {
			OrtpEvent *ev=ortp_event_new(ORTP_EVENT_CONGESTION_STATE_CHANGED);
			OrtpEventData *ed=ortp_event_get_data(ev);
			ed->info.congestion_detected = session->rtp.congdetect->state == CongestionStateDetected;
			rtp_session_dispatch_event(session,ev);
		}
	}

	update_rtcp_xr_stat_summary(session, mp, local_str_ts);

	if (session->flags & RTP_SESSION_FIRST_PACKET_DELIVERED) {
		/* detect timestamp important jumps in the future, to workaround stupid rtp senders */
		if (RTP_TIMESTAMP_IS_NEWER_THAN(rtp->timestamp,session->rtp.rcv_last_ts+session->rtp.ts_jump)){
			ortp_warning("rtp_parse: timestamp jump in the future detected.");
			rtp_signal_table_emit2(&session->on_timestamp_jump,&rtp->timestamp);
		}
		else if (RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN(session->rtp.rcv_last_ts,rtp->timestamp)
			|| RTP_SEQ_IS_STRICTLY_GREATER_THAN(session->rtp.rcv_last_seq,rtp->seq_number)){
			/* don't queue packets older than the last returned packet to the application, or whose sequence number
			 is behind the last packet returned to the application*/
			/* Emit the timestamp jump signal in case of
			 * large negative ts jump or if ts is set to 0
			*/

			if ( RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN(session->rtp.rcv_last_ts, rtp->timestamp + session->rtp.ts_jump) ){
				ortp_warning("rtp_parse: negative timestamp jump detected");
				rtp_signal_table_emit2(&session->on_timestamp_jump, &rtp->timestamp);
			}
			ortp_error("rtp_parse: discarding too old packet (seq_num=%i, ts=%u)",rtp->seq_number, rtp->timestamp);
			freemsg(mp);
			stats->outoftime++;
			ortp_global_stats.outoftime++;
			session->rtcp_xr_stats.discarded_count++;
			return;
		}
	}

	if (queue_packet(&session->rtp.rq,session->rtp.jittctl.params.max_packets,mp,rtp,&discarded,&duplicate))
		jitter_control_update_size(&session->rtp.jittctl,&session->rtp.rq);
	stats->discarded+=discarded;
	ortp_global_stats.discarded+=discarded;
	stats->packet_dup_recv+=duplicate;
	ortp_global_stats.packet_dup_recv+=duplicate;
	session->rtcp_xr_stats.discarded_count += discarded;
	session->rtcp_xr_stats.dup_since_last_stat_summary += duplicate;
	/* only packets that were queued without side effect count as received for RTCP XR */
	if ((discarded == 0) && (duplicate == 0)) {
		session->rtcp_xr_stats.rcv_count++;
	}
}