Commit 05d0314a authored by Pekka Pessi's avatar Pekka Pessi

sofia-sip/su_wait.h: added su_msg_new(), su_msg_send_to(), su_msg_deinitializer()

Reduce overhead from message passing.

darcs-hash:20071126221227-65a35-4be08d4fccb3c323f3357a87f39914728a36f0a7.gz
parent a33afaef
...@@ -398,6 +398,9 @@ typedef void su_msg_function(su_root_magic_t *magic, ...@@ -398,6 +398,9 @@ typedef void su_msg_function(su_root_magic_t *magic,
su_msg_r msg, su_msg_r msg,
su_msg_arg_t *arg); su_msg_arg_t *arg);
/** Message deinitialize. */
typedef void su_msg_deinit_func(su_msg_arg_t *arg);
/** Message delivery function pointer type. */ /** Message delivery function pointer type. */
typedef su_msg_function *su_msg_f; typedef su_msg_function *su_msg_f;
...@@ -526,6 +529,7 @@ SOFIAPUBFUN int su_msg_create(su_msg_r msg, ...@@ -526,6 +529,7 @@ SOFIAPUBFUN int su_msg_create(su_msg_r msg,
su_task_r const to, su_task_r const from, su_task_r const to, su_task_r const from,
su_msg_f wakeup, isize_t size); su_msg_f wakeup, isize_t size);
SOFIAPUBFUN int su_msg_report(su_msg_r msg, su_msg_f report); SOFIAPUBFUN int su_msg_report(su_msg_r msg, su_msg_f report);
SOFIAPUBFUN int su_msg_deinitializer(su_msg_r msg, su_msg_deinit_func *);
SOFIAPUBFUN int su_msg_reply(su_msg_r reply, su_msg_cr msg, SOFIAPUBFUN int su_msg_reply(su_msg_r reply, su_msg_cr msg,
su_msg_f wakeup, isize_t size); su_msg_f wakeup, isize_t size);
SOFIAPUBFUN void su_msg_destroy(su_msg_r msg); SOFIAPUBFUN void su_msg_destroy(su_msg_r msg);
...@@ -537,6 +541,11 @@ SOFIAPUBFUN _su_task_r su_msg_from(su_msg_cr msg); ...@@ -537,6 +541,11 @@ SOFIAPUBFUN _su_task_r su_msg_from(su_msg_cr msg);
SOFIAPUBFUN _su_task_r su_msg_to(su_msg_cr msg); SOFIAPUBFUN _su_task_r su_msg_to(su_msg_cr msg);
SOFIAPUBFUN int su_msg_send(su_msg_r msg); SOFIAPUBFUN int su_msg_send(su_msg_r msg);
SOFIAPUBFUN int su_msg_new(su_msg_r msg, size_t size);
SOFIAPUBFUN int su_msg_send_to(su_msg_r msg,
su_task_r const to,
su_msg_f wakeup);
/** Does reference contain a message? */ /** Does reference contain a message? */
#if SU_HAVE_INLINE #if SU_HAVE_INLINE
static SU_INLINE static SU_INLINE
......
...@@ -268,8 +268,14 @@ static int su_base_port_execute_msgs(su_msg_t *queue) ...@@ -268,8 +268,14 @@ static int su_base_port_execute_msgs(su_msg_t *queue)
queue = msg->sum_next, msg->sum_next = NULL; queue = msg->sum_next, msg->sum_next = NULL;
if (f) if (f) {
f(SU_ROOT_MAGIC(msg->sum_to->sut_root), &msg, msg->sum_data); su_root_t *root = msg->sum_to->sut_root;
if (msg->sum_to->sut_port == NULL)
msg->sum_to->sut_root = NULL;
f(SU_ROOT_MAGIC(root), &msg, msg->sum_data);
}
su_msg_delivery_report(&msg); su_msg_delivery_report(&msg);
n++; n++;
} }
......
...@@ -61,12 +61,13 @@ SOFIA_BEGIN_DECLS ...@@ -61,12 +61,13 @@ SOFIA_BEGIN_DECLS
/** @internal Message */ /** @internal Message */
struct su_msg_s { struct su_msg_s {
isize_t sum_size; size_t sum_size;
su_msg_t *sum_next; su_msg_t *sum_next;
su_task_r sum_to; su_task_r sum_to;
su_task_r sum_from; su_task_r sum_from;
su_msg_f sum_func; su_msg_f sum_func;
su_msg_f sum_report; su_msg_f sum_report;
su_msg_deinit_func *sum_deinit;
su_msg_arg_t sum_data[1]; /* minimum size, may be extended */ su_msg_arg_t sum_data[1]; /* minimum size, may be extended */
}; };
...@@ -101,11 +102,11 @@ typedef struct su_port_vtable { ...@@ -101,11 +102,11 @@ typedef struct su_port_vtable {
struct _GSource *(*su_port_gsource)(su_port_t *port); struct _GSource *(*su_port_gsource)(su_port_t *port);
int (*su_port_send)(su_port_t *self, su_msg_r rmsg); int (*su_port_send)(su_port_t *self, su_msg_r rmsg);
int (*su_port_register)(su_port_t *self, int (*su_port_register)(su_port_t *self,
su_root_t *root, su_root_t *root,
su_wait_t *wait, su_wait_t *wait,
su_wakeup_f callback, su_wakeup_f callback,
su_wakeup_arg_t *arg, su_wakeup_arg_t *arg,
int priority); int priority);
int (*su_port_unregister)(su_port_t *port, int (*su_port_unregister)(su_port_t *port,
su_root_t *root, su_root_t *root,
su_wait_t *wait, su_wait_t *wait,
......
...@@ -875,6 +875,30 @@ int su_root_has_thread(su_root_t *root) ...@@ -875,6 +875,30 @@ int su_root_has_thread(su_root_t *root)
* Messages * Messages
*/ */
/**
* Allocate a su message of given size.
*
* Allocate a su message with given data size.
*
* @param rmsg handle to the new message (may be uninitialized prior calling)
* @param size size of the message data
*
* @retval 0 if successful,
* @retval -1 if message allocation fails.
*/
int su_msg_new(su_msg_r rmsg, size_t size)
{
su_msg_t *msg;
size_t total = sizeof(*msg) + (size_t)size;
*rmsg = msg = su_zalloc(NULL, (isize_t)total);
if (!*rmsg)
return -1;
msg->sum_size = total;
return 0;
}
/** /**
* Allocates a message of given size. * Allocates a message of given size.
* *
...@@ -896,20 +920,13 @@ int su_msg_create(su_msg_r rmsg, ...@@ -896,20 +920,13 @@ int su_msg_create(su_msg_r rmsg,
su_msg_f wakeup, su_msg_f wakeup,
isize_t size) isize_t size)
{ {
su_msg_t *msg; if (su_msg_new(rmsg, (size_t) size) == 0) {
SU_TASK_COPY(rmsg[0]->sum_to, to, su_msg_create);
msg = su_zalloc(NULL, sizeof(*msg) + size); SU_TASK_COPY(rmsg[0]->sum_from, from, su_msg_create);
rmsg[0]->sum_func = wakeup;
if (msg) {
msg->sum_size = sizeof(*msg) + size;
SU_TASK_COPY(msg->sum_to, to, su_msg_create);
SU_TASK_COPY(msg->sum_from, from, su_msg_create);
msg->sum_func = wakeup;
*rmsg = msg;
return 0; return 0;
} }
*rmsg = NULL;
return -1; return -1;
} }
...@@ -931,11 +948,31 @@ int su_msg_report(su_msg_r msg, ...@@ -931,11 +948,31 @@ int su_msg_report(su_msg_r msg,
return -1; return -1;
} }
/** Add a deinitializer function to a message.
*
* The deinitializer function is called when the message gets destroyed. It
* is called even if the message was never delivered. Note that the thread
* destroying the message and calling the deinit function is not necessarily
* the same that sent the message nor the original recipient.
*
* @param rmsg message reference
* @param deinit pointer to deinitializer function
*/
int su_msg_deinitializer(su_msg_r rmsg,
su_msg_deinit_func *deinit)
{
if (rmsg && rmsg[0]) {
rmsg[0]->sum_deinit = deinit;
return 0;
}
return -1;
}
/** /**
* Allocates a reply message of given size. * Allocates a reply message of given size.
* *
* @param reply handle to the new message (may be uninitialized prior calling) * @param reply handle to the new message (may be uninitialized prior calling)
* @param msg the incoming message * @param rmsg the incoming message
* @param wakeup function that is called when message is delivered * @param wakeup function that is called when message is delivered
* @param size size of the message data * @param size size of the message data
* *
...@@ -943,17 +980,17 @@ int su_msg_report(su_msg_r msg, ...@@ -943,17 +980,17 @@ int su_msg_report(su_msg_r msg,
* @retval -1 otherwise. * @retval -1 otherwise.
*/ */
int su_msg_reply(su_msg_r reply, su_msg_cr msg, int su_msg_reply(su_msg_r reply, su_msg_cr rmsg,
su_msg_f wakeup, isize_t size) su_msg_f wakeup, isize_t size)
{ {
su_msg_r msg0; su_msg_r rmsg0;
assert(msg != reply); assert(rmsg != reply);
*msg0 = *(su_msg_t **) msg; *rmsg0 = *(su_msg_t **) rmsg;
*reply = NULL; *reply = NULL;
return su_msg_create(reply, su_msg_from(msg0), su_msg_to(msg0), wakeup, size); return su_msg_create(reply, su_msg_from(rmsg0), su_msg_to(rmsg0), wakeup, size);
} }
...@@ -964,38 +1001,38 @@ int su_msg_reply(su_msg_r reply, su_msg_cr msg, ...@@ -964,38 +1001,38 @@ int su_msg_reply(su_msg_r reply, su_msg_cr msg,
* sending task. The sending task calls the delivery report function when it * sending task. The sending task calls the delivery report function when it
* has received the message. * has received the message.
*/ */
void su_msg_delivery_report(su_msg_r msg) void su_msg_delivery_report(su_msg_r rmsg)
{ {
su_task_r swap; su_task_r swap;
if (!msg || !msg[0]) if (!rmsg || !rmsg[0])
return; return;
if (!msg[0]->sum_report) { if (!rmsg[0]->sum_report) {
su_msg_destroy(msg); su_msg_destroy(rmsg);
return; return;
} }
*swap = *msg[0]->sum_from; *swap = *rmsg[0]->sum_from;
*msg[0]->sum_from = *msg[0]->sum_to; *rmsg[0]->sum_from = *rmsg[0]->sum_to;
*msg[0]->sum_to = *swap; *rmsg[0]->sum_to = *swap;
msg[0]->sum_func = msg[0]->sum_report; rmsg[0]->sum_func = rmsg[0]->sum_report;
msg[0]->sum_report = NULL; rmsg[0]->sum_report = NULL;
su_msg_send(msg); su_msg_send(rmsg);
} }
/** Save a message. */ /** Save a message. */
void su_msg_save(su_msg_r save, su_msg_r msg) void su_msg_save(su_msg_r save, su_msg_r rmsg)
{ {
if (save) { if (save) {
if (msg) if (rmsg)
save[0] = msg[0]; save[0] = rmsg[0];
else else
save[0] = NULL; save[0] = NULL;
} }
if (msg) if (rmsg)
msg[0] = NULL; rmsg[0] = NULL;
} }
/** /**
...@@ -1011,6 +1048,9 @@ void su_msg_destroy(su_msg_r rmsg) ...@@ -1011,6 +1048,9 @@ void su_msg_destroy(su_msg_r rmsg)
SU_TASK_ZAP(rmsg[0]->sum_to, su_msg_destroy); SU_TASK_ZAP(rmsg[0]->sum_to, su_msg_destroy);
SU_TASK_ZAP(rmsg[0]->sum_from, su_msg_destroy); SU_TASK_ZAP(rmsg[0]->sum_from, su_msg_destroy);
if (rmsg[0]->sum_deinit)
rmsg[0]->sum_deinit(rmsg[0]->sum_data);
su_free(NULL, rmsg[0]); su_free(NULL, rmsg[0]);
} }
...@@ -1048,13 +1088,13 @@ isize_t su_msg_size(su_msg_cr rmsg) ...@@ -1048,13 +1088,13 @@ isize_t su_msg_size(su_msg_cr rmsg)
* If the message handle contains NULL the function @c su_msg_from * If the message handle contains NULL the function @c su_msg_from
* returns NULL. * returns NULL.
* *
* @param msg message handle * @param rmsg message handle
* *
* @return The task handle of the sender is returned. * @return The task handle of the sender is returned.
*/ */
_su_task_r su_msg_from(su_msg_cr msg) _su_task_r su_msg_from(su_msg_cr rmsg)
{ {
return msg[0] ? msg[0]->sum_from : NULL; return rmsg[0] ? rmsg[0]->sum_from : NULL;
} }
/** Get destination task. /** Get destination task.
...@@ -1065,24 +1105,24 @@ _su_task_r su_msg_from(su_msg_cr msg) ...@@ -1065,24 +1105,24 @@ _su_task_r su_msg_from(su_msg_cr msg)
* If the message handle contains NULL the function @c su_msg_to * If the message handle contains NULL the function @c su_msg_to
* returns NULL. * returns NULL.
* *
* @param msg message handle * @param rmsg message handle
* *
* @return The task handle of the recipient is returned. * @return The task handle of the recipient is returned.
*/ */
_su_task_r su_msg_to(su_msg_cr msg) _su_task_r su_msg_to(su_msg_cr rmsg)
{ {
return msg[0] ? msg[0]->sum_to : NULL; return rmsg[0] ? rmsg[0]->sum_to : NULL;
} }
/** Remove references to 'from' and 'to' tasks from a message. /** Remove references to 'from' and 'to' tasks from a message.
* *
* @param msg message handle * @param rmsg message handle
*/ */
void su_msg_remove_refs(su_msg_cr msg) void su_msg_remove_refs(su_msg_cr rmsg)
{ {
if (msg[0]) { if (rmsg[0]) {
su_task_deinit(msg[0]->sum_to); su_task_deinit(rmsg[0]->sum_to);
su_task_deinit(msg[0]->sum_from); su_task_deinit(rmsg[0]->sum_from);
} }
} }
...@@ -1115,3 +1155,36 @@ int su_msg_send(su_msg_r rmsg) ...@@ -1115,3 +1155,36 @@ int su_msg_send(su_msg_r rmsg)
return 0; return 0;
} }
/** Send message to the @a to_task and mark @a from_task as sender */
SOFIAPUBFUN int su_msg_send_to(su_msg_r rmsg,
su_task_r const to_task,
su_msg_f wakeup)
{
assert(rmsg); assert(to_task);
if (rmsg[0]) {
su_msg_t *msg = rmsg[0];
if (wakeup)
msg->sum_func = wakeup;
if (msg->sum_to->sut_port &&
msg->sum_to->sut_port != to_task->sut_port) {
SU_TASK_ZAP(msg->sum_to, "su_msg_send_to");
}
if (to_task->sut_port != NULL) {
msg->sum_to->sut_port = NULL;
msg->sum_to->sut_root = to_task->sut_root;
return su_port_send(to_task->sut_port, rmsg);
}
su_msg_destroy(rmsg);
errno = EINVAL;
return -1;
}
return 0;
}
...@@ -41,6 +41,7 @@ char const *name = "torture_su_root"; ...@@ -41,6 +41,7 @@ char const *name = "torture_su_root";
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include <assert.h>
#define TSTFLAGS rt->rt_flags #define TSTFLAGS rt->rt_flags
#include <sofia-sip/tstdef.h> #include <sofia-sip/tstdef.h>
...@@ -50,6 +51,7 @@ typedef struct test_ep_s test_ep_t; ...@@ -50,6 +51,7 @@ typedef struct test_ep_s test_ep_t;
#define SU_ROOT_MAGIC_T root_test_t #define SU_ROOT_MAGIC_T root_test_t
#define SU_WAKEUP_ARG_T test_ep_t #define SU_WAKEUP_ARG_T test_ep_t
#define SU_MSG_ARG_T root_test_t *
#include <sofia-sip/su_wait.h> #include <sofia-sip/su_wait.h>
#include <sofia-sip/su_alloc.h> #include <sofia-sip/su_alloc.h>
...@@ -83,6 +85,9 @@ struct root_test_s { ...@@ -83,6 +85,9 @@ struct root_test_s {
su_clone_r rt_clone; su_clone_r rt_clone;
unsigned rt_msg_received;
unsigned rt_msg_destroyed;
unsigned rt_fail_init:1; unsigned rt_fail_init:1;
unsigned rt_fail_deinit:1; unsigned rt_fail_deinit:1;
unsigned rt_success_init:1; unsigned rt_success_init:1;
...@@ -591,6 +596,22 @@ static int set_execute_bit_and_return_3(void *void_rt) ...@@ -591,6 +596,22 @@ static int set_execute_bit_and_return_3(void *void_rt)
return 3; return 3;
} }
static void deinit_simple_msg(su_msg_arg_t *arg)
{
root_test_t *rt = *arg;
rt->rt_msg_destroyed++;
}
static void receive_simple_msg(root_test_t *rt,
su_msg_r msg,
su_msg_arg_t *arg)
{
assert(rt == *arg);
rt->rt_msg_received =
su_task_cmp(su_msg_from(msg), su_task_null) == 0 &&
su_task_cmp(su_msg_to(msg), su_task_null) == 0;
}
static int clone_test(root_test_t rt[1]) static int clone_test(root_test_t rt[1])
{ {
BEGIN(); BEGIN();
...@@ -629,7 +650,21 @@ static int clone_test(root_test_t rt[1]) ...@@ -629,7 +650,21 @@ static int clone_test(root_test_t rt[1])
&retval), 0); &retval), 0);
TEST(retval, 3); TEST(retval, 3);
TEST_1(rt->rt_executed); TEST_1(rt->rt_executed);
/* Deliver message with su_msg_send() */
TEST(su_msg_new(m, sizeof &rt), 0);
*su_msg_data(m) = rt;
rt->rt_msg_received = 0;
rt->rt_msg_destroyed = 0;
TEST(su_msg_deinitializer(m, deinit_simple_msg), 0);
TEST(su_msg_send_to(m, su_clone_task(rt->rt_clone), receive_simple_msg), 0);
while (rt->rt_msg_destroyed == 0)
su_root_step(rt->rt_root, 1);
TEST(rt->rt_msg_received, 1);
/* Make sure 3-way handshake is done as expected */ /* Make sure 3-way handshake is done as expected */
TEST(su_msg_create(m, TEST(su_msg_create(m,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment