Commit c9f5fc93 authored by Simon Morlat's avatar Simon Morlat

enhance channel so that messages can be queued.

parent 632120c7
...@@ -19,6 +19,9 @@ ...@@ -19,6 +19,9 @@
#include "belle_sip_internal.h" #include "belle_sip_internal.h"
static void channel_prepare_continue(belle_sip_channel_t *obj);
static void channel_process_queue(belle_sip_channel_t *obj);
const char *belle_sip_channel_state_to_string(belle_sip_channel_state_t state){ const char *belle_sip_channel_state_to_string(belle_sip_channel_state_t state){
switch(state){ switch(state){
case BELLE_SIP_CHANNEL_INIT: case BELLE_SIP_CHANNEL_INIT:
...@@ -416,32 +419,47 @@ static void send_message(belle_sip_channel_t *obj, belle_sip_message_t *msg){ ...@@ -416,32 +419,47 @@ static void send_message(belle_sip_channel_t *obj, belle_sip_message_t *msg){
} }
void belle_sip_channel_prepare(belle_sip_channel_t *obj){ void belle_sip_channel_prepare(belle_sip_channel_t *obj){
obj->prepare=1; channel_prepare_continue(obj);
channel_process_queue(obj);
} }
void channel_process_queue(belle_sip_channel_t *obj){ static void channel_push_outgoing(belle_sip_channel_t *obj, belle_sip_message_t *msg){
belle_sip_object_ref(obj);/* we need to ref ourself because code below may trigger our destruction*/ obj->outgoing_messages=belle_sip_list_append(obj->outgoing_messages,msg);
}
static belle_sip_message_t *channel_pop_outgoing(belle_sip_channel_t *obj){
belle_sip_message_t *msg=NULL;
if (obj->outgoing_messages){
msg=(belle_sip_message_t*)obj->outgoing_messages->data;
obj->outgoing_messages=belle_sip_list_delete_link(obj->outgoing_messages,obj->outgoing_messages);
}
return msg;
}
static void channel_prepare_continue(belle_sip_channel_t *obj){
switch(obj->state){ switch(obj->state){
case BELLE_SIP_CHANNEL_INIT: case BELLE_SIP_CHANNEL_INIT:
if (obj->prepare) belle_sip_channel_resolve(obj); belle_sip_channel_resolve(obj);
break; break;
case BELLE_SIP_CHANNEL_RES_DONE: case BELLE_SIP_CHANNEL_RES_DONE:
if (obj->prepare) belle_sip_channel_connect(obj); belle_sip_channel_connect(obj);
break; break;
case BELLE_SIP_CHANNEL_READY: case BELLE_SIP_CHANNEL_READY:
if (obj->msg) { channel_process_queue(obj);
send_message(obj, obj->msg);
belle_sip_object_unref(obj->msg);
obj->msg=NULL;
}
break;
case BELLE_SIP_CHANNEL_ERROR:
if (obj->msg) belle_sip_error("channel %p: trying to send a message over a broken channel ???",obj);
break; break;
default: default:
break; break;
} }
}
static void channel_process_queue(belle_sip_channel_t *obj){
belle_sip_message_t *msg;
belle_sip_object_ref(obj);/* we need to ref ourself because code below may trigger our destruction*/
while((msg=channel_pop_outgoing(obj))!=NULL) {
send_message(obj, msg);
belle_sip_object_unref(msg);
}
belle_sip_object_unref(obj); belle_sip_object_unref(obj);
} }
...@@ -460,7 +478,6 @@ void belle_sip_channel_set_ready(belle_sip_channel_t *obj, const struct sockaddr ...@@ -460,7 +478,6 @@ void belle_sip_channel_set_ready(belle_sip_channel_t *obj, const struct sockaddr
} }
} }
channel_set_state(obj,BELLE_SIP_CHANNEL_READY); channel_set_state(obj,BELLE_SIP_CHANNEL_READY);
obj->prepare=0;
channel_process_queue(obj); channel_process_queue(obj);
} }
...@@ -470,10 +487,10 @@ static void channel_res_done(void *data, const char *name, struct addrinfo *res) ...@@ -470,10 +487,10 @@ static void channel_res_done(void *data, const char *name, struct addrinfo *res)
if (res){ if (res){
obj->peer=res; obj->peer=res;
channel_set_state(obj,BELLE_SIP_CHANNEL_RES_DONE); channel_set_state(obj,BELLE_SIP_CHANNEL_RES_DONE);
channel_prepare_continue(obj);
}else{ }else{
channel_set_state(obj,BELLE_SIP_CHANNEL_ERROR); channel_set_state(obj,BELLE_SIP_CHANNEL_ERROR);
} }
channel_process_queue(obj);
} }
void belle_sip_channel_resolve(belle_sip_channel_t *obj){ void belle_sip_channel_resolve(belle_sip_channel_t *obj){
...@@ -487,20 +504,18 @@ void belle_sip_channel_connect(belle_sip_channel_t *obj){ ...@@ -487,20 +504,18 @@ void belle_sip_channel_connect(belle_sip_channel_t *obj){
if(BELLE_SIP_OBJECT_VPTR(obj,belle_sip_channel_t)->connect(obj,obj->peer)) { if(BELLE_SIP_OBJECT_VPTR(obj,belle_sip_channel_t)->connect(obj,obj->peer)) {
belle_sip_error("Cannot connect to [%s://%s:%i]",belle_sip_channel_get_transport_name(obj),obj->peer_name,obj->peer_port); belle_sip_error("Cannot connect to [%s://%s:%i]",belle_sip_channel_get_transport_name(obj),obj->peer_name,obj->peer_port);
channel_set_state(obj,BELLE_SIP_CHANNEL_ERROR); channel_set_state(obj,BELLE_SIP_CHANNEL_ERROR);
channel_process_queue(obj);
} }
return; return;
} }
int belle_sip_channel_queue_message(belle_sip_channel_t *obj, belle_sip_message_t *msg){ int belle_sip_channel_queue_message(belle_sip_channel_t *obj, belle_sip_message_t *msg){
if (obj->msg!=NULL){ belle_sip_object_ref(msg);
belle_sip_error("Queue is not a queue, state=%s", belle_sip_channel_state_to_string(obj->state)); channel_push_outgoing(obj,msg);
return -1; if (obj->state==BELLE_SIP_CHANNEL_INIT){
}
obj->msg=(belle_sip_message_t*)belle_sip_object_ref(msg);
if (obj->state==BELLE_SIP_CHANNEL_INIT)
belle_sip_channel_prepare(obj); belle_sip_channel_prepare(obj);
channel_process_queue(obj); }else if (obj->state==BELLE_SIP_CHANNEL_READY) {
channel_process_queue(obj);
}
return 0; return 0;
} }
......
...@@ -82,10 +82,9 @@ struct belle_sip_channel{ ...@@ -82,10 +82,9 @@ struct belle_sip_channel{
int peer_port; int peer_port;
char *local_ip; char *local_ip;
int local_port; int local_port;
int prepare;
unsigned long resolver_id; unsigned long resolver_id;
struct addrinfo *peer; struct addrinfo *peer;
belle_sip_message_t *msg; belle_sip_list_t *outgoing_messages;
belle_sip_list_t* incoming_messages; belle_sip_list_t* incoming_messages;
belle_sip_channel_input_stream_t input_stream; belle_sip_channel_input_stream_t input_stream;
unsigned int recv_error:1; /* used to simulate network error. if <=0, channel_recv will return this value*/ unsigned int recv_error:1; /* used to simulate network error. if <=0, channel_recv will return this value*/
...@@ -140,9 +139,6 @@ const char *belle_sip_channel_get_local_address(belle_sip_channel_t *obj, int *p ...@@ -140,9 +139,6 @@ const char *belle_sip_channel_get_local_address(belle_sip_channel_t *obj, int *p
void channel_set_state(belle_sip_channel_t *obj, belle_sip_channel_state_t state); void channel_set_state(belle_sip_channel_t *obj, belle_sip_channel_state_t state);
/*remember that channel_process_queue() might trigger the destruction of the channel*/
void channel_process_queue(belle_sip_channel_t *obj);
/*just invokes the listeners to process data*/ /*just invokes the listeners to process data*/
int belle_sip_channel_process_data(belle_sip_channel_t *obj,unsigned int revents); int belle_sip_channel_process_data(belle_sip_channel_t *obj,unsigned int revents);
......
...@@ -69,7 +69,7 @@ void belle_sip_listening_point_clean_channels(belle_sip_listening_point_t *lp){ ...@@ -69,7 +69,7 @@ void belle_sip_listening_point_clean_channels(belle_sip_listening_point_t *lp){
belle_sip_list_t* iterator; belle_sip_list_t* iterator;
if ((existing_channels=belle_sip_list_size(lp->channels)) > 0) { if ((existing_channels=belle_sip_list_size(lp->channels)) > 0) {
belle_sip_warning("Listening point destroying [%i] channels",existing_channels); belle_sip_message("Listening point destroying [%i] channels",existing_channels);
} }
for (iterator=lp->channels;iterator!=NULL;iterator=iterator->next) { for (iterator=lp->channels;iterator!=NULL;iterator=iterator->next) {
belle_sip_channel_t *chan=(belle_sip_channel_t*)iterator->data; belle_sip_channel_t *chan=(belle_sip_channel_t*)iterator->data;
......
...@@ -146,7 +146,6 @@ static int stream_channel_process_data(belle_sip_channel_t *obj,unsigned int rev ...@@ -146,7 +146,6 @@ static int stream_channel_process_data(belle_sip_channel_t *obj,unsigned int rev
if (finalize_stream_connection(fd,(struct sockaddr*)&ss,&addrlen)) { if (finalize_stream_connection(fd,(struct sockaddr*)&ss,&addrlen)) {
belle_sip_error("Cannot connect to [%s://%s:%s]",belle_sip_channel_get_transport_name(obj),obj->peer_name,obj->peer_port); belle_sip_error("Cannot connect to [%s://%s:%s]",belle_sip_channel_get_transport_name(obj),obj->peer_name,obj->peer_port);
channel_set_state(obj,BELLE_SIP_CHANNEL_ERROR); channel_set_state(obj,BELLE_SIP_CHANNEL_ERROR);
channel_process_queue(obj);
return BELLE_SIP_STOP; return BELLE_SIP_STOP;
} }
belle_sip_source_set_events((belle_sip_source_t*)obj,BELLE_SIP_EVENT_READ|BELLE_SIP_EVENT_ERROR); belle_sip_source_set_events((belle_sip_source_t*)obj,BELLE_SIP_EVENT_READ|BELLE_SIP_EVENT_ERROR);
......
...@@ -200,7 +200,6 @@ static int tls_process_data(belle_sip_channel_t *obj,unsigned int revents){ ...@@ -200,7 +200,6 @@ static int tls_process_data(belle_sip_channel_t *obj,unsigned int revents){
process_error: process_error:
belle_sip_error("Cannot connect to [%s://%s:%i]",belle_sip_channel_get_transport_name(obj),obj->peer_name,obj->peer_port); belle_sip_error("Cannot connect to [%s://%s:%i]",belle_sip_channel_get_transport_name(obj),obj->peer_name,obj->peer_port);
channel_set_state(obj,BELLE_SIP_CHANNEL_ERROR); channel_set_state(obj,BELLE_SIP_CHANNEL_ERROR);
channel_process_queue(obj);
return BELLE_SIP_STOP; return BELLE_SIP_STOP;
} }
......
...@@ -47,7 +47,7 @@ static void process_dialog_terminated(void *user_ctx, const belle_sip_dialog_ter ...@@ -47,7 +47,7 @@ static void process_dialog_terminated(void *user_ctx, const belle_sip_dialog_ter
static void process_io_error(void *user_ctx, const belle_sip_io_error_event_t *event){ static void process_io_error(void *user_ctx, const belle_sip_io_error_event_t *event){
BELLESIP_UNUSED(user_ctx); BELLESIP_UNUSED(user_ctx);
BELLESIP_UNUSED(event); BELLESIP_UNUSED(event);
belle_sip_warning("process_io_error"); belle_sip_message("process_io_error");
belle_sip_main_loop_quit(belle_sip_stack_get_main_loop(stack)); belle_sip_main_loop_quit(belle_sip_stack_get_main_loop(stack));
/*CU_ASSERT(CU_FALSE);*/ /*CU_ASSERT(CU_FALSE);*/
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment