Commit 3e418f2b authored by Pekka Pessi's avatar Pekka Pessi

su: moved pthread-specific stuff into su_pthread_port.c from su_root.c.

Virtualized su_clone_start(), su_clone_wait(), su_task_execute(). Added them
to su_poll_port.c, su_epoll_port.c and su_select_port.c.

darcs-hash:20070201193758-65a35-79d8040b37d1ae02a99a938b5955d1a4a20172c4.gz
parent f4fa4dfc
......@@ -197,7 +197,7 @@ typedef SU_ROOT_MAGIC_T su_root_magic_t;
*/
typedef SU_WAKEUP_ARG_T su_wakeup_arg_t;
/** Wakeup callback function prototype.
/** Wakeup callback function pointer type.
*
* Whenever a registered wait object receives an event, the @link
* ::su_wakeup_f callback function @endlink is invoked.
......@@ -322,9 +322,13 @@ typedef su_msg_t * const su_msg_cr[1];
#define SU_MSG_R_INIT { NULL }
/** Message delivery function type. */
typedef void (*su_msg_f)(su_root_magic_t *magic,
su_msg_r msg,
su_msg_arg_t *arg);
typedef void su_msg_function(su_root_magic_t *magic,
su_msg_r msg,
su_msg_arg_t *arg);
/** Message delivery function pointer type. */
typedef su_msg_function *su_msg_f;
/* ---------------------------------------------------------------------- */
......
......@@ -38,6 +38,13 @@
#include "config.h"
#define su_base_port_s su_port_s
#define SU_CLONE_T su_msg_t
#include "sofia-sip/su.h"
#include "su_port.h"
#include "sofia-sip/su_alloc.h"
#include <stdlib.h>
#include <assert.h>
#include <stdarg.h>
......@@ -46,18 +53,14 @@
#include <limits.h>
#include <errno.h>
#define su_base_port_s su_port_s
#include "sofia-sip/su.h"
#include "su_port.h"
#include "sofia-sip/su_alloc.h"
#if 1
#define PORT_REFCOUNT_DEBUG(x) ((void)0)
#else
#define PORT_REFCOUNT_DEBUG(x) printf x
#endif
static int su_base_port_execute_msgs(su_msg_t *queue);
/**@internal
*
* Initialize a message port.
......@@ -154,19 +157,16 @@ int su_base_port_send(su_port_t *self, su_msg_r rmsg)
}
/** @internal
* Execute the messages in the incoming queue until the queue is empty..
* Execute the messages in the incoming queue.
*
* @param self - pointer to a port object
*
* @retval Number of messages sent
* @retval Number of messages executed
*/
int su_base_port_getmsgs(su_port_t *self)
{
int n = 0;
if (self->sup_head) {
su_msg_f f;
su_msg_t *msg, *queue;
su_msg_t *queue;
su_port_lock(self, "su_base_port_getmsgs");
......@@ -176,19 +176,83 @@ int su_base_port_getmsgs(su_port_t *self)
su_port_unlock(self, "su_base_port_getmsgs");
for (msg = queue; msg; msg = queue) {
queue = msg->sum_next;
msg->sum_next = NULL;
return su_base_port_execute_msgs(queue);
}
return 0;
}
int su_base_port_getmsgs_from_port(su_port_t *self, su_port_t *from)
{
su_msg_t *msg, *selected;
su_msg_t **next = &self->sup_head, **tail= &selected;
if (!*next)
return 0;
su_port_lock(self, "su_base_port_getmsgs_from_port");
while (*next) {
msg = *next;
if (msg->sum_from->sut_port == from) {
*tail = msg, *next = msg->sum_next, tail = &msg->sum_next;
}
else
next = &msg->sum_next;
}
*tail = NULL, self->sup_tail = next;
su_port_unlock(self, "su_base_port_getmsgs_from_port");
return su_base_port_execute_msgs(selected);
}
static
int su_base_port_getmsgs_of_root(su_port_t *self, su_root_t *root)
{
su_msg_t *msg, *selected;
su_msg_t **next = &self->sup_head, **tail= &selected;
if (!*next)
return 0;
su_port_lock(self, "su_base_port_getmsgs_of_root");
while (*next) {
msg = *next;
f = msg->sum_func;
if (f)
f(SU_ROOT_MAGIC(msg->sum_to->sut_root), &msg, msg->sum_data);
su_msg_delivery_report(&msg);
n++;
if (msg->sum_from->sut_root == root ||
msg->sum_to->sut_root == root) {
*tail = msg, *next = msg->sum_next, tail = &msg->sum_next;
}
else
next = &msg->sum_next;
}
*tail = NULL, self->sup_tail = next;
su_port_unlock(self, "su_base_port_getmsgs_of_root");
return su_base_port_execute_msgs(selected);
}
static int su_base_port_execute_msgs(su_msg_t *queue)
{
su_msg_t *msg;
int n = 0;
for (msg = queue; msg; msg = queue) {
su_msg_f f = msg->sum_func;
/* Check for wait events that may have been generated by messages */
self->sup_vtable->su_port_wait_events(self, 0);
queue = msg->sum_next, msg->sum_next = NULL;
if (f)
f(SU_ROOT_MAGIC(msg->sum_to->sut_root), &msg, msg->sum_data);
su_msg_delivery_report(&msg);
n++;
}
return n;
......@@ -199,8 +263,8 @@ int su_base_port_getmsgs(su_port_t *self)
* The function su_port_multishot() enables, disables or queries the
* multishot mode for the port. The multishot mode determines how the events
* are scheduled by port. If multishot mode is enabled, port serves all the
* sockets that have received network events. If it is disables, only first
* socket event is served.
* sockets that have received network events. If it is disabled, the
* socket events are server one at a time.
*
* @param self pointer to port object
* @param multishot multishot mode (0 => disables, 1 => enables, -1 => query)
......@@ -362,7 +426,11 @@ su_duration_t su_base_port_step(su_port_t *self, su_duration_t tout)
tout = SU_WAIT_FOREVER;
if (self->sup_head)
self->sup_vtable->su_port_getmsgs(self);
if (self->sup_vtable->su_port_getmsgs(self)) {
/* Check for wait events that may have been generated by messages */
if (self->sup_vtable->su_port_wait_events(self, 0))
tout = 0;
}
if (self->sup_timers)
su_timer_expire(&self->sup_timers, &tout, su_now());
......@@ -429,3 +497,81 @@ int su_base_port_yield(su_port_t *self)
{
return self->sup_vtable->su_port_wait_events(self, 0);
}
/* ======================================================================
* Clones
*/
#define SU_TASK_COPY(d, s, by) (void)((d)[0]=(s)[0], \
(s)->sut_port?(void)su_port_incref(s->sut_port, #by):(void)0)
static void su_base_port_clone_break(su_root_magic_t *m,
su_msg_r msg,
su_msg_arg_t *arg);
int su_base_port_start(su_root_t *parent,
su_clone_r return_clone,
su_root_magic_t *magic,
su_root_init_f init,
su_root_deinit_f deinit)
{
su_port_t *self = parent->sur_task->sut_port;
su_root_t *child;
child = su_salloc(su_port_home(self), sizeof *child);
if (!child)
return -1;
child->sur_magic = magic;
child->sur_deinit = deinit;
child->sur_threading = parent->sur_threading;
SU_TASK_COPY(child->sur_parent, su_root_task(parent),
su_base_port_clone_start);
SU_TASK_COPY(child->sur_task, child->sur_parent,
su_base_port_clone_start);
child->sur_task->sut_root = child;
if (su_msg_create(return_clone,
child->sur_task, su_root_task(parent),
su_base_port_clone_break,
0) == 0 &&
init(child, magic) == 0)
return 0;
deinit(child, magic);
su_msg_destroy(return_clone);
su_root_destroy(child);
return -1;
}
static void su_base_port_clone_break(su_root_magic_t *m,
su_msg_r msg,
su_msg_arg_t *arg)
{
_su_task_t const *task = su_msg_to(msg);
while (su_base_port_getmsgs_of_root(task->sut_port, task->sut_root))
;
su_root_destroy(task->sut_root);
}
void su_base_port_wait(su_clone_r rclone)
{
su_port_t *self;
su_root_t *root_to_wait;
self = su_msg_from(rclone)->sut_port;
assert(self == su_msg_to(rclone)->sut_port);
root_to_wait = su_msg_to(rclone)->sut_root;
assert(rclone[0]->sum_func == su_base_port_clone_break);
while (su_base_port_getmsgs_of_root(self, root_to_wait))
;
su_root_destroy(root_to_wait);
su_msg_destroy(rclone);
}
......@@ -35,9 +35,6 @@
#include "config.h"
#undef HAVE_EPOLL
#define HAVE_EPOLL 1
#include <stdlib.h>
#include <assert.h>
#include <stdarg.h>
......@@ -137,7 +134,11 @@ su_port_vtable_t const su_epoll_port_vtable[1] =
su_base_port_threadsafe,
su_base_port_yield,
su_epoll_port_wait_events,
su_base_port_getmsgs
su_base_port_getmsgs,
su_epoll_port_create,
su_pthread_port_start,
su_pthread_port_wait,
su_pthread_port_execute,
}};
......
......@@ -140,7 +140,11 @@ su_port_vtable_t const su_poll_port_vtable[1] =
su_base_port_threadsafe,
su_base_port_yield,
su_poll_port_wait_events,
su_base_port_getmsgs
su_base_port_getmsgs,
su_poll_port_create,
su_pthread_port_start,
su_pthread_port_wait,
su_pthread_port_execute,
}};
static void su_poll_port_deinit(void *arg)
......
......@@ -39,6 +39,10 @@
#include "config.h"
#define SU_CLONE_T su_msg_t
#define su_port_s su_virtual_port_s
#include "su_port.h"
#include <string.h>
......@@ -104,3 +108,59 @@ su_port_t *su_port_create(void)
return NULL;
}
/** Create a su-task (su_clone) using its own thread. */
int su_port_start(su_root_t *parent,
su_clone_r return_clone,
su_root_magic_t *magic,
su_root_init_f init,
su_root_deinit_f deinit)
{
su_port_vtable_t const *svp;
if (parent == NULL)
return su_seterrno(EINVAL); /* For now */
svp = parent->sur_task->sut_port->sup_vtable;
if (svp->su_port_start == NULL)
return su_seterrno(EINVAL);
return svp->su_port_start(parent, return_clone, magic, init, deinit);
}
void su_port_wait(su_clone_r rclone)
{
su_port_t *parentport;
assert(su_msg_to(rclone)->sut_port);
parentport = su_msg_from(rclone)->sut_port;
assert(parentport);
assert(parentport->sup_vtable->su_port_wait);
parentport->sup_vtable->su_port_wait(rclone);
}
int su_port_execute(su_task_r const task,
int (*function)(void *), void *arg,
int *return_value)
{
assert(task->sut_port->sup_vtable->su_port_execute);
return task->sut_port->sup_vtable->
su_port_execute(task, function, arg, return_value);
}
#if notyet
int su_port_pause(su_port_t *self)
{
assert(self->sup_vtable->su_port_pause);
return self->sup_vtable->su_port_pause(self);
}
int su_port_resume(su_port_t *self)
{
assert(self->sup_vtable->su_port_resume);
return self->sup_vtable->su_port_resume(self);
}
#endif
......@@ -86,7 +86,7 @@ struct su_root_s {
#define SU_ROOT_MAGIC(r) ((r) ? (r)->sur_magic : NULL)
/** Virtual function table for port */
typedef struct {
typedef struct su_port_vtable {
unsigned su_vtable_size;
void (*su_port_lock)(su_port_t *port, char const *who);
void (*su_port_unlock)(su_port_t *port, char const *who);
......@@ -133,6 +133,17 @@ typedef struct {
/* Extension from >= 1.12.4 */
int (*su_port_wait_events)(su_port_t *port, su_duration_t timeout);
int (*su_port_getmsgs)(su_port_t *port);
/* Extension from >= 1.12.5 - create a cloned port */
su_port_t *(*su_port_create)(void);
int (*su_port_start)(su_root_t *parent,
su_clone_r return_clone,
su_root_magic_t *magic,
su_root_init_f init,
su_root_deinit_f deinit);
void (*su_port_wait)(su_clone_r rclone);
int (*su_port_execute)(su_task_r const task,
int (*function)(void *), void *arg,
int *return_value);
} su_port_vtable_t;
SOFIAPUBFUN su_port_t *su_port_create(void)
......@@ -391,6 +402,25 @@ int su_port_threadsafe(su_port_t *self)
return -1;
}
static inline
int su_port_getmsgs(su_port_t *self)
{
su_virtual_port_t *base = (su_virtual_port_t *)self;
return base->sup_vtable->su_port_getmsgs(self);
}
SOFIAPUBFUN int su_port_start(su_root_t *parent,
su_clone_r return_clone,
su_root_magic_t *magic,
su_root_init_f init,
su_root_deinit_f deinit);
SOFIAPUBFUN void su_port_wait(su_clone_r rclone);
SOFIAPUBFUN int su_port_execute(su_task_r const task,
int (*function)(void *), void *arg,
int *return_value);
/* ---------------------------------------------------------------------- */
/** Base port object.
......@@ -438,6 +468,8 @@ SOFIAPUBFUN struct _GSource *su_base_port_gsource(su_port_t *self);
SOFIAPUBFUN su_socket_t su_base_port_mbox(su_port_t *self);
SOFIAPUBFUN int su_base_port_send(su_port_t *self, su_msg_r rmsg);
SOFIAPUBFUN int su_base_port_getmsgs(su_port_t *self);
SOFIAPUBFUN int su_base_port_getmsgs_from_port(su_port_t *self,
su_port_t *from);
SOFIAPUBFUN void su_base_port_run(su_port_t *self);
SOFIAPUBFUN void su_base_port_break(su_port_t *self);
......@@ -457,6 +489,13 @@ SOFIAPUBFUN int su_base_port_multishot(su_port_t *self, int multishot);
SOFIAPUBFUN int su_base_port_threadsafe(su_port_t *self);
SOFIAPUBFUN int su_base_port_yield(su_port_t *self);
SOFIAPUBFUN int su_base_port_start(su_root_t *parent,
su_clone_r return_clone,
su_root_magic_t *magic,
su_root_init_f init,
su_root_deinit_f deinit);
SOFIAPUBFUN void su_base_port_wait(su_clone_r rclone);
/* ---------------------------------------------------------------------- */
#if SU_HAVE_PTHREADS
......@@ -468,8 +507,16 @@ SOFIAPUBFUN int su_base_port_yield(su_port_t *self);
/** Pthread port object */
typedef struct su_pthread_port_s {
su_base_port_t sup_base[1];
struct su_pthread_port_waiting_parent
*sup_waiting_parent;
pthread_t sup_tid;
int sup_mbox_index;
pthread_mutex_t sup_runlock[1];
#if 0
pthread_cond_t sup_resume[1];
short sup_paused; /**< True if thread is paused */
#endif
short sup_thread; /**< True if thread is active */
short sup_mbox_index;
su_socket_t sup_mbox[SU_MBOX_SIZE];
} su_pthread_port_t;
......@@ -485,6 +532,27 @@ SOFIAPUBFUN int su_pthread_port_own_thread(su_port_t const *self);
SOFIAPUBFUN int su_pthread_port_send(su_port_t *self, su_msg_r rmsg);
SOFIAPUBFUN su_port_t *su_pthread_port_create(void);
SOFIAPUBFUN int su_pthread_port_start(su_root_t *parent,
su_clone_r return_clone,
su_root_magic_t *magic,
su_root_init_f init,
su_root_deinit_f deinit);
SOFIAPUBFUN void su_pthread_port_wait(su_clone_r rclone);
SOFIAPUBFUN int su_pthread_port_execute(su_task_r const task,
int (*function)(void *), void *arg,
int *return_value);
#if 0
SOFIAPUBFUN int su_pthread_port_pause(su_port_t *self);
SOFIAPUBFUN int su_pthread_port_resume(su_port_t *self);
#endif
SOFIAPUBFUN int su_pthread_port_execute(su_task_r const task,
int (*function)(void *), void *arg,
int *return_value);
#else
typedef su_base_port_t su_pthread_port_t;
......@@ -495,6 +563,9 @@ typedef su_base_port_t su_pthread_port_t;
#define su_pthread_port_unlock su_base_port_unlock
#define su_pthread_port_own_thread su_base_port_own_thread
#define su_pthread_port_send su_base_port_send
#define su_pthread_port_start su_base_port_start
#define su_pthread_port_wait su_base_port_wait
#define su_pthread_port_execute su_base_port_execute
#endif
......
......@@ -46,6 +46,7 @@
#include <errno.h>
#define su_pthread_port_s su_port_s
#define SU_CLONE_T su_msg_t
#include "sofia-sip/su.h"
#include "su_port.h"
......@@ -57,6 +58,9 @@
#define PORT_LOCK_DEBUG(x) printf x
#endif
#define SU_TASK_COPY(d, s, by) (void)((d)[0]=(s)[0], \
(s)->sut_port?(void)su_port_incref(s->sut_port, #by):(void)0)
#if HAVE_SOCKETPAIR
#define SU_MBOX_SEND 1
#else
......@@ -94,6 +98,12 @@ int su_pthread_port_init(su_port_t *self, su_port_vtable_t const *vtable)
self->sup_tid = pthread_self();
#if 0
pthread_mutex_init(self->sup_runlock, NULL);
pthread_mutex_lock(self->sup_runlock);
pthread_cond_init(self->sup_resume, NULL);
#endif
#if HAVE_SOCKETPAIR
#if defined(AF_LOCAL)
af = AF_LOCAL;
......@@ -170,6 +180,7 @@ int su_pthread_port_init(su_port_t *self, su_port_vtable_t const *vtable)
return -1;
}
/** @internal Deinit a base implementation of port. */
void su_pthread_port_deinit(su_port_t *self)
{
......@@ -186,9 +197,15 @@ void su_pthread_port_deinit(su_port_t *self)
su_close(self->sup_mbox[1]); self->sup_mbox[1] = INVALID_SOCKET;
#endif
#if 0
pthread_mutex_destroy(self->sup_runlock);
pthread_cond_destroy(self->sup_resume);
#endif
su_base_port_deinit(self);
}
void su_pthread_port_lock(su_port_t *self, char const *who)
{
PORT_LOCK_DEBUG(("%p at %s locking(%p)...",
......@@ -242,3 +259,433 @@ int su_pthread_port_own_thread(su_port_t const *self)
return self == NULL ||
pthread_equal(self->sup_tid, pthread_self());
}
/* -- Clones ------------------------------------------------------------ */
struct clone_args
{
su_root_t *parent;
su_root_magic_t *magic;
su_root_init_f init;
su_root_deinit_f deinit;
pthread_mutex_t mutex[1];
pthread_cond_t cv[1];
int retval;
su_msg_r clone;
};
static void *su_pthread_port_clone_main(void *varg);
static void su_pthread_port_return_to_parent(struct clone_args *arg,
int retval);
static su_msg_function su_pthread_port_clone_break;
/* Structure used to synchronize parent and clone in su_clone_wait() */
struct su_pthread_port_waiting_parent {
pthread_mutex_t deinit[1];
pthread_mutex_t mutex[1];
pthread_cond_t cv[1];
int waiting;
};
/** Start a clone task by a pthread.
*
* @internal
*
* Allocates and initializes a sub-task with its own thread. The sub-task is
* represented by clone handle to the rest of the application. The function
* su_clone_start() returns the clone handle in @a return_clone. The clone
* handle is used to communicate with the newly created clone task using
* messages.
*
* A new #su_root_t object is created for the sub-task with the @a magic as
* the root context pointer. Because the sub-task may or may not have its
* own thread, all its activity must be scheduled via this root object. In
* other words, the sub-task can be schedule
* -# I/O events with su_root_register()
* -# timers with su_timer_set(), su_timer_set_at() or su_timer_run()
* -# messages with su_msg_send().
*
* Messages can also be used to pass information between tasks or threads.
*
* After the new thread has been launched, the initialization routine is
* executed by the newly created thread. The calling thread blocks until
* the initialization routine completes. If the initialization routine
* returns #su_success (0), the sub-task is considered to be created
* successfully. After the successful initialization, the sub-task continues
* to execeute the function su_root_run().
*
* If the initalization function @a init fails, the sub-task (either the
* newly created thread or the current thread executing the su_clone_start()
* function) calls the deinitialization function, and su_clone_start()
* returns NULL.
*
* @param parent root to be cloned (may be NULL if multi-threaded)
* @param return_clone reference to a clone [OUT]
* @param magic pointer to user data
* @param init initialization function
* @param deinit deinitialization function
*
* @return 0 if successfull, -1 upon an error.
*
* @sa su_root_threading(), su_clone_task(), su_clone_stop(), su_clone_wait(),
* su_clone_forget().
*
*/
int su_pthread_port_start(su_root_t *parent,
su_clone_r return_clone,
su_root_magic_t *magic,
su_root_init_f init,
su_root_deinit_f deinit)
{
struct clone_args arg = {
/* parent: */ NULL,
/* magic: */ NULL,
/* init: */ NULL,
/* deinit: */ NULL,
/* mutex: */ { PTHREAD_MUTEX_INITIALIZER },
/* cv: */ { PTHREAD_COND_INITIALIZER },
/* retval: */ -1,
/* clone: */ SU_MSG_R_INIT,
};
int thread_created = 0;
pthread_t tid;
if (parent && !parent->sur_threading)
return su_base_port_start(parent, return_clone, magic, init, deinit);
arg.parent = parent;
arg.magic = magic;
arg.init = init;
arg.deinit = deinit;
pthread_mutex_lock(arg.mutex);
if (pthread_create(&tid, NULL, su_pthread_port_clone_main, &arg) == 0) {
pthread_cond_wait(arg.cv, arg.mutex);
thread_created = 1;
}
pthread_mutex_unlock(arg.mutex);
pthread_mutex_destroy(arg.mutex);
pthread_cond_destroy(arg.cv);
if (arg.retval != 0) {
if (thread_created)
pthread_join(tid, NULL);
return -1;
}
*return_clone = *arg.clone;
return 0;
}
/** Main function for clone thread.
*
* @internal
*/
static void *su_pthread_port_clone_main(void *varg)
{
struct clone_args *arg = (struct clone_args *)varg;
su_task_r task;
int zap;
#if SU_HAVE_WINSOCK
su_init();
#endif
task->sut_port =
arg->parent->sur_task->sut_port->sup_base->sup_vtable->
su_port_create();
if (task->sut_port) {
task->sut_port->sup_thread = 1;
task->sut_root = su_salloc(su_port_home(task->sut_port),
sizeof *task->sut_root);
if (task->sut_root) {
task->sut_root->sur_threading = 1; /* By default */
SU_TASK_COPY(task->sut_root->sur_parent, su_root_task(arg->parent),
su_pthread_port_clone_main);
SU_TASK_COPY(task->sut_root->sur_task, task,
su_pthread_port_clone_main);
if (su_msg_create(arg->clone,
task,
su_root_task(arg->parent),
su_pthread_port_clone_break,
0) == 0) {
task->sut_root->sur_magic = arg->magic;
task->sut_root->sur_deinit = arg->deinit;
if (arg->init(task->sut_root, arg->magic) == 0) {
su_pthread_port_return_to_parent(arg, 0), arg = NULL;
su_root_run(task->sut_root); /* Do the work */