Commit 6bb52c55 authored by jehan's avatar jehan
Browse files

fix channel read of multiple message at a time

parent b29b7d95
......@@ -121,19 +121,8 @@ static int get_message_start_pos(char *buff, size_t bufflen) {
return -1;
}
static void belle_sip_channel_input_stream_reset(belle_sip_channel_input_stream_t* input_stream,int message_size) {
int message_residu=0;
if (message_size>0 && (input_stream->write_ptr-input_stream->read_ptr)>message_size) {
/*still message available, copy a beginning of stream ?*/
message_residu = input_stream->write_ptr-input_stream->read_ptr - message_size;
memcpy(input_stream->buff
,input_stream->read_ptr+message_size,
message_residu);
}
static void belle_sip_channel_input_stream_reset(belle_sip_channel_input_stream_t* input_stream) {
input_stream->read_ptr=input_stream->write_ptr=input_stream->buff;
input_stream->write_ptr+=message_residu;
input_stream->state=WAITING_MESSAGE_START;
input_stream->msg=NULL;
}
......@@ -145,7 +134,7 @@ int belle_sip_channel_process_data(belle_sip_channel_t *obj,unsigned int revents
int num;
int offset;
int i;
size_t message_size=0;
size_t read_size=0;
belle_sip_header_content_length_t* content_length_header;
int content_length;
......@@ -173,13 +162,13 @@ int belle_sip_channel_process_data(belle_sip_channel_t *obj,unsigned int revents
if ((offset=get_message_start_pos(obj->input_stream.read_ptr,num)) >=0 ) {
/*message found !*/
if (offset>0) {
belle_sip_warning("trashing [%i] bytes in frot of sip message on channel [%p]",offset,obj);
belle_sip_warning("trashing [%i] bytes in front of sip message on channel [%p]",offset,obj);
obj->input_stream.read_ptr+=offset;
}
obj->input_stream.state=MESSAGE_AQUISITION;
} else {
belle_sip_debug("Unexpected [%s] received on channel [%p], trashing",obj->input_stream.read_ptr,obj);
belle_sip_channel_input_stream_reset(&obj->input_stream,0);
belle_sip_channel_input_stream_reset(&obj->input_stream);
}
}
......@@ -188,17 +177,19 @@ int belle_sip_channel_process_data(belle_sip_channel_t *obj,unsigned int revents
for (i=0;i<obj->input_stream.write_ptr-obj->input_stream.read_ptr;i++) {
if (strncmp("\r\n\r\n",&obj->input_stream.read_ptr[i],4)==0) {
/*end of message found*/
belle_sip_message("read message from %s:%i\n%s",obj->peer_name,obj->peer_port,obj->input_stream.read_ptr);
belle_sip_message("channel [%p] read message from %s:%i\n%s",obj, obj->peer_name,obj->peer_port,obj->input_stream.read_ptr);
obj->input_stream.msg=belle_sip_message_parse_raw(obj->input_stream.read_ptr
,obj->input_stream.write_ptr-obj->input_stream.read_ptr
,&message_size);
if (obj->input_stream.msg && message_size > 0){
,&read_size);
obj->input_stream.read_ptr+=read_size;
if (obj->input_stream.msg && read_size > 0){
belle_sip_message("channel [%p] [%i] bytes parsed",obj,read_size);
belle_sip_object_ref(obj->input_stream.msg);
if (belle_sip_message_is_request(obj->input_stream.msg)) fix_incoming_via(BELLE_SIP_REQUEST(obj->input_stream.msg),obj->peer);
/*check for body*/
if ((content_length_header = (belle_sip_header_content_length_t*)belle_sip_message_get_header(obj->input_stream.msg,BELLE_SIP_CONTENT_LENGTH)) != NULL
&& belle_sip_header_content_length_get_content_length(content_length_header)>0) {
obj->input_stream.read_ptr+=message_size;
obj->input_stream.state=BODY_AQUISITION;
break; /*don't avoid to exist from loop, because 2 response can be linked*/
} else {
......@@ -208,7 +199,7 @@ int belle_sip_channel_process_data(belle_sip_channel_t *obj,unsigned int revents
}else{
belle_sip_error("Could not parse [%s], resetting channel [%p]",obj->input_stream.read_ptr,obj);
belle_sip_channel_input_stream_reset(&obj->input_stream,0);
belle_sip_channel_input_stream_reset(&obj->input_stream);
}
}
}
......@@ -218,8 +209,13 @@ int belle_sip_channel_process_data(belle_sip_channel_t *obj,unsigned int revents
content_length=belle_sip_header_content_length_get_content_length((belle_sip_header_content_length_t*)belle_sip_message_get_header(obj->input_stream.msg,BELLE_SIP_CONTENT_LENGTH));
if (content_length <= obj->input_stream.write_ptr-obj->input_stream.read_ptr) {
/*great body completed*/
belle_sip_message("read body from %s:%i\n%s",obj->peer_name,obj->peer_port,obj->input_stream.read_ptr);
belle_sip_message("channel [%p] read [%i] bytes of body from %s:%i\n%s" ,obj
,content_length
,obj->peer_name
,obj->peer_port
,obj->input_stream.read_ptr);
belle_sip_message_set_body(obj->input_stream.msg,obj->input_stream.read_ptr,content_length);
read_size+=content_length; /*read size is used in message ready to compute residu*/
obj->input_stream.read_ptr+=content_length;
goto message_ready;
......@@ -228,11 +224,14 @@ int belle_sip_channel_process_data(belle_sip_channel_t *obj,unsigned int revents
return BELLE_SIP_CONTINUE;
message_ready:
obj->incoming_messages=belle_sip_list_append(obj->incoming_messages,obj->input_stream.msg);
belle_sip_channel_input_stream_reset(&obj->input_stream,message_size);
obj->input_stream.msg=NULL;
obj->input_stream.state=WAITING_MESSAGE_START;
BELLE_SIP_INVOKE_LISTENERS_ARG1_ARG2(obj->listeners,belle_sip_channel_listener_t,on_event,obj,BELLE_SIP_EVENT_READ/*always a read event*/);
if (obj->input_stream.write_ptr-obj->input_stream.read_ptr>0) {
/*process residu*/
belle_sip_channel_process_data(obj,0);
} else {
belle_sip_channel_input_stream_reset(&obj->input_stream); /*end of strem, back to home*/
}
return BELLE_SIP_CONTINUE;
} else if (num == 0) {
......@@ -258,7 +257,7 @@ void belle_sip_channel_init(belle_sip_channel_t *obj, belle_sip_stack_t *stack,c
obj->local_ip=belle_sip_strdup(bindip);
obj->local_port=localport;
obj->recv_error=1;/*not set*/
belle_sip_channel_input_stream_reset(&obj->input_stream,0);
belle_sip_channel_input_stream_reset(&obj->input_stream);
}
void belle_sip_channel_set_socket(belle_sip_channel_t *obj, belle_sip_socket_t sock, belle_sip_source_func_t datafunc){
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment