Commit f17b2640 authored by Gautier Pelloux-Prayer's avatar Gautier Pelloux-Prayer
Browse files

use packets duplication for the burst period to reduce effictive loss rate (in progress)

parent 9023257c
......@@ -90,6 +90,7 @@ struct _MSQosAnalyserDesc{
bool_t (*process_rtcp)(MSQosAnalyser *obj, mblk_t *rtcp);
void (*suggest_action)(MSQosAnalyser *obj, MSRateControlAction *action);
bool_t (*has_improved)(MSQosAnalyser *obj);
void (*update)(MSQosAnalyser *);
void (*uninit)(MSQosAnalyser *);
};
......
......@@ -240,6 +240,17 @@ static int sort_points(const rtcpstatspoint_t *p1, const rtcpstatspoint_t *p2){
return p1->bandwidth > p2->bandwidth;
}
static double stateful_qos_analyser_upload_bandwidth(MSStatefulQosAnalyser *obj){
double up_bw=rtp_session_get_send_bandwidth(obj->session)/1000.0;
double avg_up_bw=(obj->interval_count)?obj->upload_bandwidth_sum/obj->interval_count:up_bw;
P(GREEN "latest_up_bw=%f vs sum_up_bw=%f\n", up_bw, avg_up_bw);
obj->interval_count=0;
obj->upload_bandwidth_sum=0;
return up_bw;
}
static bool_t stateful_analyser_process_rtcp(MSQosAnalyser *objbase, mblk_t *rtcp){
MSStatefulQosAnalyser *obj=(MSStatefulQosAnalyser*)objbase;
rtpstats_t *cur;
......@@ -250,7 +261,6 @@ static bool_t stateful_analyser_process_rtcp(MSQosAnalyser *objbase, mblk_t *rtc
rb=rtcp_RR_get_report_block(rtcp,0);
}
if (rb && report_block_get_ssrc(rb)==rtp_session_get_send_ssrc(obj->session)){
obj->curindex++;
cur=&obj->stats[obj->curindex % STATS_HISTORY];
......@@ -265,26 +275,45 @@ static bool_t stateful_analyser_process_rtcp(MSQosAnalyser *objbase, mblk_t *rtc
cur->int_jitter=1000.0*(float)report_block_get_interarrival_jitter(rb)/(float)obj->clockrate;
cur->rt_prop=rtp_session_get_round_trip_propagation(obj->session);
ms_message("MSQosAnalyser: lost_percentage=%f, int_jitter=%f ms, rt_prop=%f sec",cur->lost_percentage,cur->int_jitter,cur->rt_prop);
if (obj->curindex>2){
double up_bw = stateful_qos_analyser_upload_bandwidth(obj);
int cum_loss_curr,uniq_emitted;
double loss_rate = cur->lost_percentage/100.0;
int cum_loss=report_block_get_cum_packet_loss(rb);
cum_loss=(cum_loss>>23&1) ? cum_loss-0xFFFFFF-1 : cum_loss;
cum_loss_curr=cum_loss - obj->cum_loss_prev;
uniq_emitted=report_block_get_high_ext_seq(rb) - obj->last_seq;
if (obj->last_seq > 0){
int total_emitted=uniq_emitted * (1 + obj->session->duplication_ratio);
printf("RECEIVE cumloss=%d uniq_emitted=%d total_emitted=%d\n", cum_loss_curr, uniq_emitted, total_emitted);
loss_rate=100 * (1. - (uniq_emitted - cum_loss_curr) * 1.f / total_emitted);
/*printf("RECEIVE expec=%d - received=%d =%d --> cum_loss=%ld\n", expected_packets, stream->hwrcv_since_last_SR, packet_loss, stream->stats.cum_packet_loss);*/
printf("RECEIVE estimated loss rate=%f vs 'real'=%f\n", loss_rate, report_block_get_fraction_lost(rb)/2.56);
}
if (obj->curindex % 10 == 6){
P(YELLOW "SKIPPED first MIN burst %d: %f %f\n", obj->curindex-1, rtp_session_get_send_bandwidth(obj->session)/1000.0, cur->lost_percentage/100.0);
P(YELLOW "SKIPPED first MIN burst %d: %f %f\n", obj->curindex-1, up_bw, loss_rate);
}else{
obj->latest=ms_new0(rtcpstatspoint_t, 1);
obj->latest->timestamp=time(0);
obj->latest->bandwidth=rtp_session_get_send_bandwidth(obj->session)/1000.0;
obj->latest->loss_percent=cur->lost_percentage/100.0;
obj->latest->bandwidth=up_bw;
obj->latest->loss_percent=loss_rate;
obj->latest->rtt=cur->rt_prop;
obj->rtcpstatspoint = ms_list_insert_sorted(obj->rtcpstatspoint, obj->latest, (MSCompareFunc)sort_points);
obj->rtcpstatspoint=ms_list_insert_sorted(obj->rtcpstatspoint, obj->latest, (MSCompareFunc)sort_points);
P(YELLOW "one more %d: %f %f\n", obj->curindex-1, obj->latest->bandwidth, obj->latest->loss_percent);
}
obj->cum_loss_prev=cum_loss;
obj->last_seq=report_block_get_high_ext_seq(rb);
if (ms_list_size(obj->rtcpstatspoint) > ESTIM_HISTORY){
P(RED "Reached list maximum capacity (count=%d)", ms_list_size(obj->rtcpstatspoint));
/*clean everything which occurred 60 sec or more ago*/
time_t now = time(0) - 60;
obj->rtcpstatspoint = ms_list_remove_custom(obj->rtcpstatspoint, (MSCompareFunc)earlier_than, &now);
time_t clear_time = time(0) - 60;
obj->rtcpstatspoint = ms_list_remove_custom(obj->rtcpstatspoint, (MSCompareFunc)earlier_than, &clear_time);
P(RED "--> Cleaned list (count=%d)\n", ms_list_size(obj->rtcpstatspoint));
}
}
......@@ -292,147 +321,6 @@ static bool_t stateful_analyser_process_rtcp(MSQosAnalyser *objbase, mblk_t *rtc
return rb!=NULL;
}
// static float compute_network_state(MSStatefulQosAnalyser *obj){
// int i;
// double x_mean = 0.;
// double y_mean = 0.;
// double x_square_sum = 0.;
// double x_y_sum = 0.;
// int last = obj->curindex - 1;
// int f = 2;//last > 15 ? last - 13 : 2; //always skip the 2 first ones
// int n = (last - f + 1);
// double mean_bw = 0.;
// double mean_diff = 0.;
// int x_min_ind = f;
// int x_max_ind = f;
// bool_t lossy_network = FALSE;
// double diff, m, b;
// uint8_t previous_state = obj->network_state;
// uint8_t unstable_state = MSQosAnalyserNetworkUnstable;
// double rtt = ((rtcpstatspoint_t*)obj->latest->data)->rtt;
// obj->network_state = MSQosAnalyserNetworkFine;
// for (i = f; i <= last; i++){
// double x = obj->rtcpstatspoint[i].bandwidth;
// double y = obj->rtcpstatspoint[i].loss_percent;
// if (x < obj->rtcpstatspoint[x_min_ind].bandwidth) x_min_ind = i;
// if (x > obj->rtcpstatspoint[x_max_ind].bandwidth) x_max_ind = i;
// x_mean += x;
// y_mean += y;
// x_y_sum += x * y;
// x_square_sum += x * x;
// mean_bw += x * (1 - y);
// }
// x_mean /= n;
// y_mean /= n;
// mean_bw /= n;
// P("\tEstimated BW by avg is %f kbit/s\n", mean_bw);
// diff = (obj->rtcpstatspoint[x_max_ind].bandwidth - obj->rtcpstatspoint[x_min_ind].bandwidth) / x_mean;
// if (n > 2 && diff > 0.05) unstable_state = MSQosAnalyserNetworkLossy;
// m = (x_y_sum - x_mean * y_mean * n) /
// (x_square_sum - x_mean * x_mean * n);
// b = (y_mean - m * x_mean);
// for (i = f; i <= last; i++){
// double x = obj->rtcpstatspoint[i].bandwidth;
// double y = obj->rtcpstatspoint[i].loss_percent;
// mean_diff += fabs(m * x + b - y);
// }
// mean_diff /= n;
// lossy_network |= (y_mean > 0.1);
// /*to compute estimated BW, we need a minimum x-axis interval size*/
// if (diff > 0.05){
// double avail_bw = (fabs(m) > 0.0001f) ? -b/m : mean_bw;
// P("\tEstimated BW by interpolation is %f kbit/s:\ty=%f x + %f\n", avail_bw, m, b);
// if (b > 0.1){
// lossy_network = TRUE;
// /*}else if (m > 0.05){*/
// }else if ((obj->rtcpstatspoint[x_max_ind].loss_percent - obj->rtcpstatspoint[x_min_ind].loss_percent) / y_mean > 0.1){
// lossy_network = FALSE;
// if we are below the limitation BW, consider network is now fine
// if (obj->rtcpstatspoint[last].bandwidth < .95*mean_bw){
// obj->network_state = MSQosAnalyserNetworkFine;
// }else{
// obj->network_state = MSQosAnalyserNetworkCongested;
// }
// }
// }
// if (obj->network_state == MSQosAnalyserNetworkFine){
// lossy_network |= (y_mean > .1 && obj->rtcpstatspoint[last].loss_percent > y_mean / 2.);
// if (lossy_network){
// /*since congestion may loss a high number of packets, stay in congested network while
// this is not a bit more stable*/
// if (previous_state == MSQosAnalyserNetworkCongested){
// obj->network_state = MSQosAnalyserNetworkCongested;
// obj->network_state = unstable_state; // !!!
// } else{
// obj->network_state = unstable_state;
// }
// /*another hint for a bad network: packets drop mean difference is high*/
// } else if (mean_diff > .1){
// obj->network_state = (rtt > .5) ? MSQosAnalyserNetworkCongested : unstable_state;
// }
// }
// if (obj->network_state == MSQosAnalyserNetworkLossy){
// if (obj->rtcpstatspoint[x_min_ind].bandwidth / mean_bw > 1.){
// P(RED "Network was suggested lossy, but since we did not try its lower bound capability, "
// "we will consider this is congestion for yet\n");
// obj->network_state = MSQosAnalyserNetworkCongested;
// }
// }
// if (diff > 0.05 && b < -0.05){
// if (obj->network_state != MSQosAnalyserNetworkCongested){
// P("Even if network state is now %s, the congestion limit might be at %f\n",
// ms_qos_analyser_network_state_name(obj->network_state), mean_bw);
// }
// }
// P("I think it is a %s network\n", ms_qos_analyser_network_state_name(obj->network_state));
// if (obj->network_state == MSQosAnalyserNetworkLossy){
// //hack
// if (obj->rtcpstatspoint[last].bandwidth>x_mean && obj->rtcpstatspoint[last].loss_percent>1.5*y_mean){
// P(RED "lossy network and congestion probably too!\n");
// mean_bw = (1 - (obj->rtcpstatspoint[x_max_ind].loss_percent - y_mean)) * obj->rtcpstatspoint[x_max_ind].bandwidth;
// }
// mean_bw = obj->rtcpstatspoint[last].bandwidth * 2;
// }
// return mean_bw;
// }
/*static void sort_array(rtcpstatspoint_t *pts,int start, int end){
int i,j;
for (i = start; i < end; ++i){
for (j = i+1; j <= end; j++){
if (pts[i].bandwidth > pts[j].bandwidth){
rtcpstatspoint_t tmp;
memcpy(&tmp, &pts[i], sizeof(rtcpstatspoint_t));
memcpy(&pts[i], &pts[j], sizeof(rtcpstatspoint_t));
memcpy(&pts[j], &tmp, sizeof(rtcpstatspoint_t));
}
}
}
}*/
static float lerp(float inf, float sup, float v){
return inf + (sup - inf) * v;
}
......@@ -563,11 +451,12 @@ static void stateful_analyser_suggest_action(MSQosAnalyser *objbase, MSRateContr
float curbw = obj->latest ? obj->latest->bandwidth : 0.f;
float bw = /*0; if (FALSE)*/ compute_available_bw(obj);
/*rtp_session_set_duplication_ratio(obj->session, 0);*/
/*try a burst every 50 seconds (10 RTCP packets)*/
if (obj->curindex % 10 < 2){
if (obj->curindex > 10){
P(YELLOW "try burst!\n");
bw *= 3;
/*bw *= 3;*/
rtp_session_set_duplication_ratio(obj->session, 2);
}
/*test a min burst to avoid overestimation of available bandwidth*/
else if (obj->curindex % 10 == 5 || obj->curindex % 10 == 6){
......@@ -644,10 +533,43 @@ static bool_t stateful_analyser_has_improved(MSQosAnalyser *objbase){
return FALSE;
}
// static void stateful_analyser_update(MSQosAnalyser *objbase){
// MSStatefulQosAnalyser *obj=(MSStatefulQosAnalyser*)objbase;
// obj->interval_count++;
// obj->upload_bandwidth_sum+=rtp_session_get_send_bandwidth(obj->session)/1000.0;
// switch (obj->burst_state){
// case MSStatefulQosAnalyserBurstEnable:{
// obj->burst_state=MSStatefulQosAnalyserBurstInProgress;
// obj->start_seq_number=obj->last_seq_number=obj->session->rtp.snd_seq;
// ortp_gettimeofday(&obj->start_time, NULL);
// rtp_session_set_duplication_ratio(obj->session, 2);
// }
// case MSStatefulQosAnalyserBurstInProgress:{
// struct timeval now;
// double elapsed;
// obj->last_seq_number=obj->session->rtp.snd_seq;
// ortp_gettimeofday(&now,NULL);
// elapsed=((now.tv_sec-obj->start_time.tv_sec)*1000.0) + ((now.tv_usec-obj->start_time.tv_usec)/1000.0);
// /*burst should last 1sec*/
// if (elapsed > 1.){
// obj->burst_state=MSStatefulQosAnalyserBurstDisable;
// rtp_session_set_duplication_ratio(obj->session, 0);
// }
// }
// case MSStatefulQosAnalyserBurstDisable:{
// }
// }
// }
static MSQosAnalyserDesc stateful_analyser_desc={
stateful_analyser_process_rtcp,
stateful_analyser_suggest_action,
stateful_analyser_has_improved
stateful_analyser_has_improved,
/*stateful_analyser_update*/
};
MSQosAnalyser * ms_stateful_qos_analyser_new(RtpSession *session){
......
......@@ -59,6 +59,12 @@ extern "C" {
double rtt;
} rtcpstatspoint_t;
typedef enum _MSStatefulQosAnalyserBurstState{
MSStatefulQosAnalyserBurstEnable,
MSStatefulQosAnalyserBurstInProgress,
MSStatefulQosAnalyserBurstDisable,
}MSStatefulQosAnalyserBurstState;
typedef struct _MSStatefulQosAnalyser{
MSQosAnalyser parent;
RtpSession *session;
......@@ -66,14 +72,23 @@ extern "C" {
rtpstats_t stats[STATS_HISTORY];
int curindex;
bool_t rt_prop_doubled;
bool_t pad[3];
MSQosAnalyserNetworkState network_state;
MSList *rtcpstatspoint;
rtcpstatspoint_t *latest;
double network_loss_rate;
double congestion_bandwidth;
int cum_loss_prev;
int last_seq;
MSStatefulQosAnalyserBurstState burst_state;
uint32_t start_seq_number;
uint32_t last_seq_number;
struct timeval start_time;
uint32_t interval_count;
double upload_bandwidth_sum;
}MSStatefulQosAnalyser;
#ifdef __cplusplus
}
......
......@@ -126,7 +126,7 @@ static void handle_queue_events(video_stream_manager_t * stream_mgr, OrtpEvQueue
OrtpEventType evt=ortp_event_get_type(ev);
OrtpEventData *evd=ortp_event_get_data(ev);
if (evt == ORTP_EVENT_RTCP_PACKET_RECEIVED/* || evt == ORTP_EVENT_RTCP_PACKET_EMITTED*/) {
if (evt == ORTP_EVENT_RTCP_PACKET_RECEIVED) {
const report_block_t *rb=NULL;
if (rtcp_is_SR(evd->packet)){
rb=rtcp_SR_get_report_block(evd->packet,0);
......@@ -146,8 +146,7 @@ static void handle_queue_events(video_stream_manager_t * stream_mgr, OrtpEvQueue
stream_mgr->latest_stats.loss=100.0*(float)report_block_get_fraction_lost(rb)/256.0;
stream_mgr->latest_stats.rtt=rtp_session_get_round_trip_propagation(stream_mgr->stream->ms.sessions.rtp_session);
ms_message("mediastreamer2_video_stream_tester: %s RTCP packet: loss=%f, RTT=%f, network_state=%d"
,(evt == ORTP_EVENT_RTCP_PACKET_RECEIVED) ? "RECEIVED" : "EMITTED"
ms_message("mediastreamer2_video_stream_tester: received RTCP packet: loss=%f, RTT=%f, network_state=%d"
,stream_mgr->latest_stats.loss
,stream_mgr->latest_stats.rtt
,stream_mgr->latest_stats.network_state);
......@@ -175,7 +174,7 @@ static void start_adaptive_video_stream(video_stream_manager_t * marielle, video
media_stream_enable_adaptive_bitrate_control(&marielle->stream->ms,TRUE);
video_manager_start(marielle, payload, margaux->local_rtp, initial_bitrate, marielle_webcam);
video_stream_set_direction(margaux->stream,VideoStreamRecvOnly);
video_manager_start(margaux, payload, marielle->local_rtp, -1, margaux_webcam);
rtp_session_enable_network_simulation(margaux->stream->ms.sessions.rtp_session,&params);
......@@ -263,7 +262,7 @@ static void adaptive_vp8() {
video_stream_manager_t * marielle, * margaux;
INIT();
start_adaptive_video_stream(marielle, margaux, VP8_PAYLOAD_TYPE, 300000, 0,25, 500, 16);
start_adaptive_video_stream(marielle, margaux, VP8_PAYLOAD_TYPE, 300000, 0, 25, 50, 16);
CU_ASSERT_IN_RANGE(marielle->loss_estim, 20, 30);
CU_ASSERT_IN_RANGE(marielle->congestion_bw_estim, 200, 1000);
DEINIT();
......@@ -271,7 +270,7 @@ static void adaptive_vp8() {
/*very low bandwidth cause a lot of packets to be dropped since congestion is
always present even if we are below the limit due to encoding variance*/
INIT();
start_adaptive_video_stream(marielle, margaux, VP8_PAYLOAD_TYPE, 300000, 40000,0, 50, 16);
start_adaptive_video_stream(marielle, margaux, VP8_PAYLOAD_TYPE, 300000, 40000, 0, 50, 16);
CU_ASSERT_IN_RANGE(marielle->loss_estim, 0, 15);
CU_ASSERT_IN_RANGE(marielle->congestion_bw_estim, 20, 65);
DEINIT();
......@@ -289,10 +288,72 @@ static void adaptive_vp8() {
DEINIT();
}
static void packet_duplication() {
video_stream_manager_t * marielle, * margaux;
int loss_rate = 42;
int target_bw = 0;
int latency = 0;
int payload = VP8_PAYLOAD_TYPE;
int initial_bitrate = 300000;
int max_recv_rtcp_packet = 6;
float dup_ratio = 1;
MSWebCam * marielle_webcam=ms_web_cam_manager_get_default_cam (ms_web_cam_manager_get());
MSWebCam * margaux_webcam=ms_web_cam_manager_get_cam(ms_web_cam_manager_get(), "StaticImage: Static picture");
OrtpNetworkSimulatorParams params={0};
/*this variable should not be changed, since algorithm results rely on this value
(the bigger it is, the more accurate is bandwidth estimation)*/
int rtcp_interval=2500;
INIT();
params.enabled=TRUE;
params.loss_rate=loss_rate;
params.max_bandwidth=target_bw;
params.latency=latency;
media_stream_enable_adaptive_bitrate_control(&marielle->stream->ms,TRUE);
rtp_session_set_duplication_ratio(marielle->stream->ms.sessions.rtp_session, dup_ratio);
video_manager_start(marielle, payload, margaux->local_rtp, initial_bitrate, marielle_webcam);
video_stream_set_direction(margaux->stream,VideoStreamRecvOnly);
video_manager_start(margaux, payload, marielle->local_rtp, -1, margaux_webcam);
rtp_session_enable_network_simulation(margaux->stream->ms.sessions.rtp_session,&params);
rtp_session_set_rtcp_report_interval(margaux->stream->ms.sessions.rtp_session, rtcp_interval);
OrtpEvQueue * evq=ortp_ev_queue_new();
rtp_session_register_event_queue(marielle->stream->ms.sessions.rtp_session,evq);
/*just wait for timeout*/
int retry=0;
int packets_after_start=max_recv_rtcp_packet - (15000.0/rtcp_interval);
int timeout_ms=((packets_after_start > 0) ? 15000 + (packets_after_start + .5) * 5000 : (max_recv_rtcp_packet + .5) * rtcp_interval);
while (retry++ <timeout_ms/100) {
media_stream_iterate(&marielle->stream->ms);
media_stream_iterate(&margaux->stream->ms);
handle_queue_events(marielle, evq);
if (retry%10==0) {
ms_message("stream [%p] bandwidth usage: [d=%.1f,u=%.1f] kbit/sec" ,
&marielle->stream->ms, media_stream_get_down_bw(&marielle->stream->ms)/1000, media_stream_get_up_bw(&marielle->stream->ms)/1000);
ms_message("stream [%p] bandwidth usage: [d=%.1f,u=%.1f] kbit/sec" ,
&margaux->stream->ms, media_stream_get_down_bw(&margaux->stream->ms)/1000, media_stream_get_up_bw(&margaux->stream->ms)/1000);
}
ms_usleep(100000);
}
rtp_session_unregister_event_queue(marielle->stream->ms.sessions.rtp_session,evq);
ortp_ev_queue_destroy(evq);
const rtp_stats_t *stats = rtp_session_get_stats(margaux->stream->ms.sessions.rtp_session);
CU_ASSERT_EQUAL(stats->duplicated, stats->packet_recv / 2);
CU_ASSERT_EQUAL(stats->cum_packet_loss, -stats->duplicated);
DEINIT();
}
static test_t tests[]={
{ "Lossy network", lossy_network },
{ "Stability detection", stability_network_detection },
{ "Adaptive video stream [VP8]", adaptive_vp8 },
{ "Packet duplication", packet_duplication },
};
test_suite_t video_stream_test_suite={
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment