Commit 8b89ebee authored by Simon Morlat's avatar Simon Morlat

fix problem in both jitter buffers with the timestamp rollovers 2**32.

Improve congetion detector.
parent 3ec4c4b2
......@@ -84,9 +84,9 @@ typedef struct _JitterControl
unsigned int count; /* number of packets handled in jitter_control_new_packet. Used internally only. */
int jitt_comp_ts; /* the nominal jitter buffer size converted in rtp time (same unit as timestamp) */
int adapt_jitt_comp_ts;
int64_t clock_offset_ts; /*offset difference between local and distant clock, in timestamp units*/
int64_t prev_clock_offset_ts;
int64_t olddiff;
int32_t clock_offset_ts; /*offset difference between local and distant clock, in timestamp units*/
int32_t prev_clock_offset_ts;
int32_t olddiff;
float jitter;
float inter_jitter; /* interarrival jitter as defined in the RFC */
float jitter_buffer_mean_size; /*effective size (fullness) of jitter buffer*/
......@@ -100,6 +100,8 @@ typedef struct _JitterControl
OrtpKalmanRLS kalman_rls;
double capped_clock_ratio;
uint32_t last_log_ts;
uint32_t local_ts_start;
uint32_t remote_ts_start;
} JitterControl;
typedef struct _WaitPoint
......
......@@ -25,60 +25,102 @@
#include <ortp/rtpsession.h>
static const unsigned int congestion_pending_duration_ms = 5000;
static const float congested_clock_ratio = 0.93;
const char *ortp_congestion_detector_state_to_string(OrtpCongestionState state){
switch (state){
case CongestionStateNormal:
return "CongestionStateNormal";
break;
case CongestionStateSuspected:
return "CongestionStatePending";
break;
case CongestionStateDetected:
return "CongestionStateDetected";
break;
case CongestionStateResolving:
return "CongestionStateResolving";
break;
}
return "invalid state";
}
void ortp_congestion_detector_reset(OrtpCongestionDetector *cd) {
if (cd->state!=CongestionStateNormal) {
ortp_message("Congestion detection from %s canceled"
, cd->state==CongestionStatePending?"TO CONFIRM...":"IN CONGESTION!");
static bool_t ortp_congestion_detector_set_state(OrtpCongestionDetector *cd, OrtpCongestionState state){
bool_t binary_state_changed = FALSE;
if (state == cd->state) return FALSE;
ortp_message("OrtpCongestionDetector: moving from state %s to state %s",
ortp_congestion_detector_state_to_string(cd->state),
ortp_congestion_detector_state_to_string(state));
cd->state = state;
if (state == CongestionStateDetected){
if (!cd->is_in_congestion){
cd->is_in_congestion = TRUE;
binary_state_changed = TRUE;
}
}else if (state == CongestionStateNormal){
cd->start_ms = (uint64_t)-1;
if (cd->is_in_congestion){
cd->is_in_congestion = FALSE;
binary_state_changed = TRUE;
}
}
cd->state = CongestionStateNormal;
cd->start_ms = (uint64_t)-1;
cd->start_jitter_ts = 0;
return binary_state_changed;
}
void ortp_congestion_detector_reset(OrtpCongestionDetector *cd) {
cd->initialized = FALSE;
cd->skip = FALSE;
ortp_congestion_detector_set_state(cd, CongestionStateNormal);
}
OrtpCongestionDetector * ortp_congestion_detector_new(RtpSession *session) {
OrtpCongestionDetector *cd = (OrtpCongestionDetector*)ortp_malloc0(sizeof(OrtpCongestionDetector));
ortp_congestion_detector_reset(cd);
cd->initialized = FALSE;
cd->session = session;
ortp_congestion_detector_reset(cd);
return cd;
}
/*
static uint32_t local_ts_to_remote_ts_rls(double clock_ratio, double offset, uint32_t local_ts){
return (uint32_t)( (int64_t)(clock_ratio*(double)local_ts) + (int64_t)offset);
}
*/
bool_t ortp_congestion_detector_record(OrtpCongestionDetector *cd, uint32_t packet_ts, uint32_t cur_str_ts) {
bool_t state_changed = FALSE;
int64_t diff=(int64_t)packet_ts - (int64_t)cur_str_ts;
bool_t binary_state_changed = FALSE;
bool_t clock_drift;
float jitter;
JitterControl *jitterctl = &cd->session->rtp.jittctl;
bool_t has_jitter;
//float deviation;
if (cd->skip) return FALSE;
packet_ts -= jitterctl->remote_ts_start;
cur_str_ts -= jitterctl->local_ts_start;
if (!cd->initialized) {
cd->initialized = TRUE;
ortp_kalman_rls_init(&cd->rls, 1, (double)diff);
cd->rls.lambda = 0.95f;
ortp_kalman_rls_init(&cd->rls, 1, packet_ts - cur_str_ts);
cd->rls.lambda = 0.99f;
if (jitterctl->params.buffer_algorithm != OrtpJitterBufferRecursiveLeastSquare){
ortp_error("ortp congestion detection requires RLS jitter buffer algorithm.");
cd->skip = TRUE;
}
}
ortp_kalman_rls_record(&cd->rls, cur_str_ts, packet_ts);
clock_drift = cd->rls.m < 0.9;
jitter = labs((long)(diff - jitterctl->clock_offset_ts) /*cd->start_jitter_ts*/) * 1000.f / jitterctl->clock_rate;
has_jitter = jitter > 300.f;
clock_drift = cd->rls.m < congested_clock_ratio || cd->rls.m < congested_clock_ratio * jitterctl->capped_clock_ratio || jitterctl->capped_clock_ratio < congested_clock_ratio ;
//deviation = ((int32_t)(packet_ts - local_ts_to_remote_ts_rls(cd->rls.m, cd->rls.b, cur_str_ts))) / (float)jitterctl->clock_rate;
//deviation = ortp_extremum_get_current(&jitterctl->max_ts_deviation)/(float)jitterctl->clock_rate;
//has_jitter = deviation > acceptable_deviation;
ortp_debug(
"%s clock=%f"
", diff=%ld"
", %f >? 300.f"
", clock_rate=%d"
", jb_slide=%f, jb_clock=%f"
"OrtpCongestionDetector state=%s clock=%f"
", jb->deviation=%f, jb->capped_clock_ratio=%f"
", down_bw=%0.f, up_bw=%0.f kbits/s"
, cd->state==CongestionStateNormal?"":cd->state==CongestionStatePending?"TO CONFIRM...":"IN CONGESTION!"
, ortp_congestion_detector_state_to_string(cd->state)
, cd->rls.m
, (long)diff
, jitter
, jitterctl->clock_rate
, jitterctl->kalman_rls.b, jitterctl->kalman_rls.m
, deviation, jitterctl->capped_clock_ratio
, rtp_session_get_recv_bandwidth_smooth(cd->session)*1e-3, rtp_session_get_send_bandwidth_smooth(cd->session)*1e-3
);
......@@ -86,34 +128,38 @@ bool_t ortp_congestion_detector_record(OrtpCongestionDetector *cd, uint32_t pack
case CongestionStateNormal:
if (clock_drift) {
cd->start_ms = ortp_get_cur_time_ms();
cd->start_jitter_ts = (int64_t)jitterctl->kalman_rls.b;
ortp_message("Congestion detection starts current jitter=%f...", jitterctl->kalman_rls.b);
cd->state = CongestionStatePending;
state_changed = TRUE;
binary_state_changed = ortp_congestion_detector_set_state(cd, CongestionStateSuspected);
}
break;
case CongestionStatePending:
if (!clock_drift && !has_jitter) {
// congestion has stopped - reinit everything
ortp_congestion_detector_reset(cd);
break;
case CongestionStateSuspected:
if (!clock_drift) {
// congestion has maybe stopped
binary_state_changed = ortp_congestion_detector_set_state(cd, CongestionStateNormal);
} else {
// congestion continues - if it has been for longer enough, trigger congestion flag
if (ortp_get_cur_time_ms() - cd->start_ms > congestion_pending_duration_ms) {
ortp_warning("In congestion for more than %d seconds, trigger flag!", congestion_pending_duration_ms / 1000);
cd->state = CongestionStateDetected;
state_changed = TRUE;
binary_state_changed = ortp_congestion_detector_set_state(cd, CongestionStateDetected);
}
}
break;
break;
case CongestionStateDetected:
if (!clock_drift && !has_jitter) {
// congestion has stopped - reinit everything
ortp_congestion_detector_reset(cd);
state_changed = TRUE;
if (!clock_drift) {
// congestion is maybe terminated, go resolving state
binary_state_changed = ortp_congestion_detector_set_state(cd, CongestionStateResolving);
cd->start_ms = ortp_get_cur_time_ms();
}
break;
case CongestionStateResolving:
if (clock_drift) {
binary_state_changed = ortp_congestion_detector_set_state(cd, CongestionStateDetected);
} else {
if (ortp_get_cur_time_ms() - cd->start_ms > congestion_pending_duration_ms) {
binary_state_changed = ortp_congestion_detector_set_state(cd, CongestionStateNormal);
}
}
break;
break;
}
return state_changed;
return binary_state_changed;
}
void ortp_congestion_detector_destroy(OrtpCongestionDetector *obj){
......
......@@ -26,17 +26,19 @@
struct _JitterControl;
typedef enum _OrtpCongestionState {
CongestionStateNormal = 1 << 0,
CongestionStatePending = 1 << 1,
CongestionStateDetected = 1 << 2
CongestionStateNormal,
CongestionStateSuspected,
CongestionStateDetected,
CongestionStateResolving
} OrtpCongestionState;
typedef struct _OrtpCongestionDetector{
OrtpKalmanRLS rls;
uint64_t start_ms;
int64_t start_jitter_ts;
bool_t initialized;
bool_t pad[3];
bool_t is_in_congestion;
bool_t skip;
bool_t pad[1];
OrtpCongestionState state;
struct _RtpSession *session;
}OrtpCongestionDetector;
......
......@@ -187,14 +187,14 @@ void jitter_control_new_packet(JitterControl *ctl, uint32_t packet_ts, uint32_t
static void jitter_control_update_interarrival_jitter(JitterControl *ctl, int64_t diff){
/*compute interarrival jitter*/
int delta;
delta= (int)(diff-ctl->olddiff);
int32_t delta;
delta= (int32_t)(diff-ctl->olddiff);
ctl->inter_jitter=(float) (ctl->inter_jitter+ (( (float)abs(delta) - ctl->inter_jitter)*(1/16.0)));
ctl->olddiff=diff;
}
void jitter_control_new_packet_basic(JitterControl *ctl, uint32_t packet_ts, uint32_t cur_str_ts){
int64_t diff=(int64_t)packet_ts - (int64_t)cur_str_ts;
int32_t diff = packet_ts - cur_str_ts;
double gap,slide;
if (ctl->count==0){
......@@ -231,40 +231,56 @@ static bool_t time_for_log(JitterControl *ctl, uint32_t cur_str_ts){
}
static uint32_t jitter_control_local_ts_to_remote_ts_rls(JitterControl *ctl, uint32_t local_ts){
return (uint32_t)( (int64_t)(ctl->capped_clock_ratio*(double)local_ts) + ctl->clock_offset_ts);
return (uint32_t)( (int64_t)(ctl->capped_clock_ratio*(double)(local_ts - ctl->local_ts_start) + ctl->clock_offset_ts));
}
/**************************** RLS *********************************/
void jitter_control_new_packet_rls(JitterControl *ctl, uint32_t packet_ts, uint32_t cur_str_ts){
int64_t diff=(int64_t)packet_ts - (int64_t)cur_str_ts;
int32_t diff = packet_ts - cur_str_ts;
int deviation;
bool_t jb_size_updated = FALSE;
if (ctl->count==0){
ctl->clock_offset_ts=ctl->prev_clock_offset_ts=diff;
ctl->olddiff=diff;
ctl->jitter=0;
ctl->clock_offset_ts = ctl->prev_clock_offset_ts = (int32_t)packet_ts;
/*
* Offset compensation. In order to avoid managing the rollover of the uint32_t timestamp, the timestamps passed
* to the kalman filter are substracted with their initial value.
* This allows a video stream to run 13hours (clockrate: 90 000), which looks at first sight
* sufficient for a VoIP application.
*/
ctl->local_ts_start = cur_str_ts;
ctl->remote_ts_start = packet_ts;
ctl->olddiff = diff;
ctl->jitter = 0;
ortp_extremum_init(&ctl->max_ts_deviation, (int)(ctl->params.refresh_ms / 1000.f * ctl->clock_rate));
ortp_extremum_record_max(&ctl->max_ts_deviation, cur_str_ts, (float)ctl->jitt_comp_ts);
ortp_extremum_record_max(&ctl->max_ts_deviation, 0, (float)ctl->jitt_comp_ts);
// clocks rate should be the same
ortp_kalman_rls_init(&ctl->kalman_rls, 1, (double)diff);
// clock rates should be the same
ortp_kalman_rls_init(&ctl->kalman_rls, 1.0, 0.0);
ctl->capped_clock_ratio = ctl->kalman_rls.m;
}
/*offset estimation tends to be smaller than reality when
jitter appears since it compensates the jitter */
ortp_kalman_rls_record(&ctl->kalman_rls, cur_str_ts, packet_ts);
/*Compute the deviation from the value predicted by the kalman filter*/
deviation = abs((int32_t)(packet_ts - jitter_control_local_ts_to_remote_ts_rls(ctl, cur_str_ts)));
/*update the kalman filter*/
ortp_kalman_rls_record(&ctl->kalman_rls, cur_str_ts - ctl->local_ts_start, packet_ts - ctl->remote_ts_start);
ctl->capped_clock_ratio=MAX(.5, MIN(ctl->kalman_rls.m, 2));
ctl->clock_offset_ts = (!(.5f<ctl->kalman_rls.m && ctl->kalman_rls.m<2.f))? diff : (int64_t)ctl->kalman_rls.b;
deviation=abs((int32_t)(packet_ts - jitter_control_local_ts_to_remote_ts_rls(ctl, cur_str_ts)));
ctl->capped_clock_ratio = MAX(.5, MIN(ctl->kalman_rls.m, 2));
if (.5f<ctl->kalman_rls.m && ctl->kalman_rls.m<2.f){
/*realistic clock ratio, the filter is well converged*/
ctl->clock_offset_ts = (int32_t)((int32_t)ctl->kalman_rls.b + ctl->remote_ts_start);
}else{
ctl->clock_offset_ts = diff;
}
/*ortp_message("deviation=%g ms", 1000.0*deviation/(double)ctl->clock_rate);*/
jitter_control_update_interarrival_jitter(ctl, diff);
cur_str_ts -= ctl->local_ts_start;
if (ctl->params.adaptive){
bool_t max_updated = ortp_extremum_record_max(&ctl->max_ts_deviation, cur_str_ts, (float)deviation);
float max_deviation = MAX(ortp_extremum_get_previous(&ctl->max_ts_deviation), ortp_extremum_get_current(&ctl->max_ts_deviation));
......
......@@ -281,6 +281,8 @@ void rtp_session_rtp_parse(RtpSession *session, mblk_t *mp, uint32_t local_str_t
return;
}
jitter_control_new_packet(&session->rtp.jittctl,rtp->timestamp,local_str_ts);
if (session->rtp.congdetect){
if (ortp_congestion_detector_record(session->rtp.congdetect,rtp->timestamp,local_str_ts)) {
OrtpEvent *ev=ortp_event_new(ORTP_EVENT_CONGESTION_STATE_CHANGED);
......@@ -289,9 +291,7 @@ void rtp_session_rtp_parse(RtpSession *session, mblk_t *mp, uint32_t local_str_t
rtp_session_dispatch_event(session,ev);
}
}
jitter_control_new_packet(&session->rtp.jittctl,rtp->timestamp,local_str_ts);
update_rtcp_xr_stat_summary(session, mp, local_str_ts);
if (session->flags & RTP_SESSION_FIRST_PACKET_DELIVERED) {
......
......@@ -177,13 +177,13 @@ mblk_t *rtp_getq(queue_t *q,uint32_t timestamp, int *rejected)
break;
}
if (old!=NULL) {
ortp_debug("rtp_getq: discarding too old packet with ts=%i",ts_found);
ortp_debug("rtp_getq: discarding too old packet with ts=%u",ts_found);
(*rejected)++;
freemsg(old);
}
ret=getq(q); /* dequeue the packet, since it has an interesting timestamp*/
ts_found=tmprtp->timestamp;
ortp_debug("rtp_getq: Found packet with ts=%i",tmprtp->timestamp);
ortp_debug("rtp_getq: Found packet with ts=%u",tmprtp->timestamp);
old=ret;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment