Commit 5fcc10b3 authored by Pekka Pessi's avatar Pekka Pessi
Browse files

su/sofia-sip/su_wait.h, su_root.c: added su_root_yield().

su_port.h, su_port.c: added su_port_yield(),
refactored su_port_run() and su_port_step(),
moved su_wait() and call to event callback in su_port_wait_events().

Based on su_yield()/su_port_wiat_events() patch contributed by Colin
Whittaker.

darcs-hash:20060724182340-65a35-99a5cfa9d2b98afd81bd64fb73be89a39ca2fe39.gz
parent ec1c0c11
...@@ -376,6 +376,8 @@ SOFIAPUBFUN int su_root_remove_prepoll(su_root_t *root); ...@@ -376,6 +376,8 @@ SOFIAPUBFUN int su_root_remove_prepoll(su_root_t *root);
SOFIAPUBFUN struct _GSource *su_root_gsource(su_root_t *self); SOFIAPUBFUN struct _GSource *su_root_gsource(su_root_t *self);
SOFIAPUBFUN int su_root_yield(su_root_t *root);
/* Timers */ /* Timers */
SOFIAPUBFUN su_timer_t *su_timer_create(su_task_r const, su_duration_t msec) SOFIAPUBFUN su_timer_t *su_timer_create(su_task_r const, su_duration_t msec)
__attribute__((__malloc__)); __attribute__((__malloc__));
......
...@@ -145,6 +145,9 @@ int su_port_multishot(su_port_t *port, int multishot); ...@@ -145,6 +145,9 @@ int su_port_multishot(su_port_t *port, int multishot);
static static
int su_port_threadsafe(su_port_t *port); int su_port_threadsafe(su_port_t *port);
static
int su_port_yield(su_port_t *port);
su_port_vtable_t const su_port_vtable[1] = su_port_vtable_t const su_port_vtable[1] =
{{ {{
/* su_vtable_size: */ sizeof su_port_vtable, /* su_vtable_size: */ sizeof su_port_vtable,
...@@ -167,9 +170,13 @@ su_port_vtable_t const su_port_vtable[1] = ...@@ -167,9 +170,13 @@ su_port_vtable_t const su_port_vtable[1] =
su_port_remove_prepoll, su_port_remove_prepoll,
su_port_timers, su_port_timers,
su_port_multishot, su_port_multishot,
su_port_threadsafe su_port_threadsafe,
su_port_yield
}}; }};
static int su_port_wait_events(su_port_t *self, su_duration_t tout);
/** /**
* Port is a per-thread reactor. * Port is a per-thread reactor.
* *
...@@ -607,6 +614,9 @@ int su_port_getmsgs(su_port_t *self) ...@@ -607,6 +614,9 @@ int su_port_getmsgs(su_port_t *self)
n++; n++;
} }
/* Check for wait events that may have been generated by this message */
su_port_wait_events(self, 0);
} }
return n; return n;
...@@ -1056,7 +1066,6 @@ int su_port_threadsafe(su_port_t *port) ...@@ -1056,7 +1066,6 @@ int su_port_threadsafe(su_port_t *port)
*/ */
void su_port_run(su_port_t *self) void su_port_run(su_port_t *self)
{ {
int i;
su_duration_t tout = 0; su_duration_t tout = 0;
assert(SU_PORT_OWN_THREAD(self)); assert(SU_PORT_OWN_THREAD(self));
...@@ -1076,44 +1085,10 @@ void su_port_run(su_port_t *self) ...@@ -1076,44 +1085,10 @@ void su_port_run(su_port_t *self)
if (!self->sup_running) if (!self->sup_running)
break; break;
{ if (self->sup_head) /* if there are messages do a quick wait */
su_wait_t *waits = self->sup_waits; tout = 0;
unsigned n = self->sup_n_waits;
unsigned version = self->sup_registers; su_port_wait_events(self, tout);
if (self->sup_head)
tout = 0;
i = su_wait(waits, n, tout);
if (i >= 0 && (unsigned)i < n) {
su_root_t *root;
#if HAVE_POLL
if (self->sup_multishot) {
for (; i < n; i++) {
if (waits[i].revents) {
root = self->sup_wait_roots[i];
self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
&waits[i],
self->sup_wait_args[i]);
/* Callback used su_register()/su_unregister() */
if (version != self->sup_registers)
break;
}
}
}
#else /* !HAVE_POLL */
if (0) {
}
#endif
else {
root = self->sup_wait_roots[i];
self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
&waits[i],
self->sup_wait_args[i]);
}
}
}
} }
} }
...@@ -1142,55 +1117,26 @@ void su_port_run_tune(su_port_t *self) ...@@ -1142,55 +1117,26 @@ void su_port_run_tune(su_port_t *self)
if (self->sup_timers) if (self->sup_timers)
timers = su_timer_expire(&self->sup_timers, &tout, su_now()); timers = su_timer_expire(&self->sup_timers, &tout, su_now());
if (!self->sup_running)
break;
if (self->sup_head) /* if there are messages do a quick wait */
tout = 0;
bedtime = su_now();
events = su_port_wait_events(self, tout);
woken = su_now();
if (messages || timers || events) if (messages || timers || events)
SU_DEBUG_1(("su_port_run(%p): %.6f: %u messages %u timers %u" SU_DEBUG_1(("su_port_run(%p): %.6f: %u messages %u timers %u "
"events slept %.6f/%.3f\n", "events slept %.6f/%.3f\n",
self, su_time_diff(woken, started), messages, timers, events, self, su_time_diff(woken, started), messages, timers, events,
su_time_diff(woken, bedtime), tout0 * 1e-3)); su_time_diff(woken, bedtime), tout0 * 1e-3));
if (!self->sup_running) if (!self->sup_running)
break; break;
{
su_wait_t *waits = self->sup_waits;
unsigned n = self->sup_n_waits;
unsigned version = self->sup_registers;
events = 0;
if (self->sup_head)
tout = 0;
bedtime = su_now();
i = su_wait(waits, n, tout);
woken = su_now();
if (i >= 0 && (unsigned)i < n) {
su_root_t *root;
if (self->sup_multishot) {
for (; i < n; i++) {
if (waits[i].revents) {
root = self->sup_wait_roots[i];
self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
&waits[i],
self->sup_wait_args[i]);
events++;
/* Callback used su_register()/su_unregister() */
if (version != self->sup_registers)
break;
}
}
} else {
root = self->sup_wait_roots[i];
self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
&waits[i],
self->sup_wait_args[i]);
events++;
}
}
}
} }
} }
#endif #endif
...@@ -1207,6 +1153,73 @@ void su_port_break(su_port_t *self) ...@@ -1207,6 +1153,73 @@ void su_port_break(su_port_t *self)
self->sup_running = 0; self->sup_running = 0;
} }
/** @internal
* The function @c su_port_wait_events() is used to poll() for wait objects
*
* @param self pointer to port
* @param tout timeout in milliseconds
*
* @return number of events handled
*/
static
int su_port_wait_events(su_port_t *self, su_duration_t tout)
{
int i, events = 0;
su_wait_t *waits = self->sup_waits;
unsigned n = self->sup_n_waits;
unsigned version = self->sup_registers;
su_root_t *root;
i = su_wait(waits, n, tout);
if (i >= 0 && (unsigned)i < n) {
#if HAVE_POLL
/* poll() can return events for multiple wait objects */
if (self->sup_multishot) {
for (; i < n; i++) {
if (waits[i].revents) {
root = self->sup_wait_roots[i];
self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
&waits[i],
self->sup_wait_args[i]);
events++;
/* Callback function used su_register()/su_unregister() */
if (version != self->sup_registers)
break;
}
}
}
#else /* !HAVE_POLL */
if (0) {
}
#endif
else {
root = self->sup_wait_roots[i];
self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
&self->sup_waits[i],
self->sup_wait_args[i]);
events++;
}
}
return events;
}
/** @internal
* Used to check wait events in callbacks that take lots of time
*
* This function does a timeout 0 poll() and runs wait objects.
*
* @param port pointer to port
*
* @return number of events handled
*/
static
int su_port_yield(su_port_t *port)
{
return su_port_wait_events(port, 0);
}
/** @internal Block until wait object is signaled or timeout. /** @internal Block until wait object is signaled or timeout.
* *
* This function waits for wait objects and the timers associated with * This function waits for wait objects and the timers associated with
...@@ -1219,14 +1232,12 @@ void su_port_break(su_port_t *self) ...@@ -1219,14 +1232,12 @@ void su_port_break(su_port_t *self)
* @param self pointer to port * @param self pointer to port
* @param tout timeout in milliseconds * @param tout timeout in milliseconds
* *
* @Return * @return
* Milliseconds to the next invocation of timer, or @c SU_WAIT_FOREVER if * Milliseconds to the next invocation of timer, or @c SU_WAIT_FOREVER if
* there are no active timers. * there are no active timers.
*/ */
su_duration_t su_port_step(su_port_t *self, su_duration_t tout) su_duration_t su_port_step(su_port_t *self, su_duration_t tout)
{ {
int i;
int timers = 0, messages = 0, events = 0;
su_time_t now = su_now(); su_time_t now = su_now();
assert(SU_PORT_OWN_THREAD(self)); assert(SU_PORT_OWN_THREAD(self));
...@@ -1235,61 +1246,25 @@ su_duration_t su_port_step(su_port_t *self, su_duration_t tout) ...@@ -1235,61 +1246,25 @@ su_duration_t su_port_step(su_port_t *self, su_duration_t tout)
self->sup_prepoll(self->sup_pp_magic, self->sup_pp_root); self->sup_prepoll(self->sup_pp_magic, self->sup_pp_root);
if (self->sup_head) if (self->sup_head)
messages = su_port_getmsgs(self); su_port_getmsgs(self);
if (self->sup_timers) if (self->sup_timers)
timers = su_timer_expire(&self->sup_timers, &tout, now); su_timer_expire(&self->sup_timers, &tout, now);
/* if there are messages do a quick wait */
if (self->sup_head) if (self->sup_head)
tout = 0; tout = 0;
{ if (su_port_wait_events(self, tout))
su_wait_t *waits = self->sup_waits; tout = 0;
unsigned n = self->sup_n_waits; else
unsigned version = self->sup_registers;
su_root_t *root;
i = su_wait(waits, n, tout);
tout = SU_WAIT_FOREVER; tout = SU_WAIT_FOREVER;
if (i >= 0 && (unsigned)i < n) {
tout = 0;
#if HAVE_POLL
if (self->sup_multishot) {
for (; i < n; i++) {
if (waits[i].revents) {
root = self->sup_wait_roots[i];
self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
&waits[i],
self->sup_wait_args[i]);
events++;
/* Callback function used su_register()/su_unregister() */
if (version != self->sup_registers)
break;
}
}
}
#else /* !HAVE_POLL */
if (0) {
}
#endif
else {
root = self->sup_wait_roots[i];
self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
&self->sup_waits[i],
self->sup_wait_args[i]);
events++;
}
}
}
if (self->sup_head) if (self->sup_head)
messages = su_port_getmsgs(self); su_port_getmsgs(self);
if (self->sup_timers) if (self->sup_timers)
timers += su_timer_expire(&self->sup_timers, &tout, su_now()); su_timer_expire(&self->sup_timers, &tout, su_now());
if (self->sup_head) if (self->sup_head)
tout = 0; tout = 0;
......
...@@ -136,6 +136,9 @@ typedef struct { ...@@ -136,6 +136,9 @@ typedef struct {
int (*su_port_multishot)(su_port_t *port, int multishot); int (*su_port_multishot)(su_port_t *port, int multishot);
int (*su_port_threadsafe)(su_port_t *port); int (*su_port_threadsafe)(su_port_t *port);
/* Extension from > 1.12.0 */
int (*su_port_yield)(su_port_t *port);
} su_port_vtable_t; } su_port_vtable_t;
SOFIAPUBFUN su_port_t *su_port_create(void) SOFIAPUBFUN su_port_t *su_port_create(void)
...@@ -296,6 +299,7 @@ su_duration_t su_port_step(su_port_t *self, su_duration_t tout) ...@@ -296,6 +299,7 @@ su_duration_t su_port_step(su_port_t *self, su_duration_t tout)
return (su_duration_t)-1; return (su_duration_t)-1;
} }
static inline static inline
int su_port_own_thread(su_port_t const *self) int su_port_own_thread(su_port_t const *self)
{ {
......
...@@ -821,6 +821,26 @@ su_duration_t su_root_sleep(su_root_t *self, su_duration_t duration) ...@@ -821,6 +821,26 @@ su_duration_t su_root_sleep(su_root_t *self, su_duration_t duration)
return retval; return retval;
} }
/** Check wait events in callbacks that take lots of time
*
* This function does a 0 timeout poll() and runs wait objects
*
* @param self pointer to root object
*/
int su_root_yield(su_root_t *self)
{
if (self && self->sur_task[0].sut_port) {
su_port_t *port = self->sur_task[0].sut_port;
/* Make sure we have su_port_yield extension */
if (port->sup_vtable->su_vtable_size >=
offsetof(su_port_vtable_t, su_port_yield)
&& port->sup_vtable->su_port_yield)
return port->sup_vtable->su_port_yield(port);
}
errno = EINVAL;
return -1;
}
/** Get task reference. /** Get task reference.
* *
* The function su_root_task() is used to retrieve the task reference * The function su_root_task() is used to retrieve the task reference
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment