Commit ec53c026 authored by Pekka Pessi's avatar Pekka Pessi

su_source.c: refactored, provide latest virtual methods used by clones etc.

darcs-hash:20070202155136-65a35-f03737b64b394b58d46eda119a1461d12b7a2aa8.gz
parent 9cc93657
......@@ -40,12 +40,6 @@
#include "config.h"
#endif
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <limits.h>
#include <glib.h>
#define SU_PORT_IMPLEMENTATION 1
......@@ -61,16 +55,26 @@
#include "su_port.h"
#include "sofia-sip/su_alloc.h"
static su_port_t *su_source_create(void) __attribute__((__malloc__));
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <limits.h>
#if 1
#define PORT_LOCK_DEBUG(x) ((void)0)
#else
#define PORT_LOCK_DEBUG(x) printf x
#endif
static su_port_t *su_source_port_create(void) __attribute__((__malloc__));
static gboolean su_source_prepare(GSource *gs, gint *return_tout);
static gboolean su_source_check(GSource *gs);
static gboolean su_source_dispatch(GSource *gs,
GSourceFunc callback,
gpointer user_data);
GSourceFunc callback,
gpointer user_data);
static void su_source_finalize(GSource *source);
static int su_source_getmsgs(su_port_t *self);
static
GSourceFuncs su_source_funcs[1] = {{
su_source_prepare,
......@@ -110,21 +114,22 @@ static void su_source_break(su_port_t *self);
static su_duration_t su_source_step(su_port_t *self, su_duration_t tout);
static int su_source_own_thread(su_port_t const *port);
static int su_source_add_prepoll(su_port_t *port,
su_root_t *root,
su_prepoll_f *,
su_prepoll_magic_t *);
su_root_t *root,
su_prepoll_f *,
su_prepoll_magic_t *);
static int su_source_remove_prepoll(su_port_t *port,
su_root_t *root);
static su_timer_t **su_source_timers(su_port_t *port);
static int su_source_multishot(su_port_t *self, int multishot);
static int su_source_threadsafe(su_port_t *port);
static
su_port_vtable_t const su_source_vtable[1] =
static char const *su_source_name(su_port_t const *self);
static
su_port_vtable_t const su_source_port_vtable[1] =
{{
/* su_vtable_size: */ sizeof su_source_vtable,
/* su_vtable_size: */ sizeof su_source_port_vtable,
su_source_lock,
su_source_unlock,
su_source_incref,
su_source_decref,
......@@ -142,32 +147,38 @@ su_port_vtable_t const su_source_vtable[1] =
su_source_own_thread,
su_source_add_prepoll,
su_source_remove_prepoll,
su_source_timers,
su_base_port_timers,
su_source_multishot,
su_source_threadsafe
su_base_port_threadsafe,
/*su_source_yield*/ NULL,
/*su_source_wait_events*/ NULL,
su_base_port_getmsgs,
su_base_port_getmsgs_from,
su_source_name,
su_base_port_start_shared,
su_base_port_wait,
NULL,
}};
static char const *su_source_name(su_port_t const *self)
{
return "GSource";
}
/**
* Port is a per-thread reactor.
*
* Multiple root objects executed by single thread share a su_port_t object.
*/
struct su_source_s {
su_home_t sup_home[1];
su_port_vtable_t const *sup_vtable;
su_base_port_t sup_base[1];
GThread *sup_tid;
GStaticMutex sup_mutex[1];
GStaticRWLock sup_ref[1];
GSource *sup_source;
GMainLoop *sup_main_loop;
GSource *sup_source; /**< Backpointer to source */
GMainLoop *sup_main_loop; /**< Reference to mainloop while running */
/* Message list - this is protected by lock */
su_msg_t *sup_head;
su_msg_t **sup_tail;
/* Waits */
unsigned sup_registers; /** Counter incremented by
su_port_register() or
......@@ -181,9 +192,6 @@ struct su_source_s {
su_wakeup_f *sup_wait_cbs;
su_wakeup_arg_t**sup_wait_args;
su_root_t **sup_wait_roots;
/* Timer list */
su_timer_t *sup_timers;
};
typedef struct _SuSource
......@@ -198,10 +206,6 @@ typedef struct _SuSource
#define SU_SOURCE_INCREF(p, f) (g_source_ref(p->sup_source))
#define SU_SOURCE_DECREF(p, f) (g_source_unref(p->sup_source))
#define SU_SOURCE_INITLOCK(p) (g_static_mutex_init((p)->sup_mutex))
#define SU_SOURCE_LOCK(p, f) (g_static_mutex_lock((p)->sup_mutex))
#define SU_SOURCE_UNLOCK(p, f) (g_static_mutex_unlock((p)->sup_mutex))
#else
/* Debugging versions */
......@@ -209,15 +213,6 @@ typedef struct _SuSource
#define SU_SOURCE_DECREF(p, f) do { printf("decref(%p) by %s\n", (p), f), \
g_source_unref(p->sup_source); } while(0)
#define SU_SOURCE_INITLOCK(p) \
(g_static_mutex_init((p)->sup_mutex), printf("init_lock(%p)\n", p))
#define SU_SOURCE_LOCK(p, f) \
(printf("%ld at %s locking(%p)...", g_thread_self(), f, p), g_static_mutex_lock((p)->sup_mutex), printf(" ...%ld at %s locked(%p)...", g_thread_self(), f, p))
#define SU_SOURCE_UNLOCK(p, f) \
(g_static_mutex_unlock((p)->sup_mutex), printf(" ...%ld at %s unlocked(%p)\n", g_thread_self(), f, p))
#endif
#if HAVE_FUNC
......@@ -233,7 +228,7 @@ typedef struct _SuSource
/** Create a root that uses GSource as reactor */
su_root_t *su_glib_root_create(su_root_magic_t *magic)
{
return su_root_create_with_port(magic, su_source_create());
return su_root_create_with_port(magic, su_source_port_create());
}
/** Deprecated */
......@@ -259,41 +254,20 @@ GSource *su_glib_root_gsource(su_root_t *root)
/*=============== Private function definitions ===============*/
/**@internal
*
* Allocates and initializes a reactor and message port object.
*
* @return
* If successful a pointer to the new message port is returned, otherwise
* NULL is returned.
*/
su_port_t *su_source_create(void)
/** Initialize source port */
int su_source_port_init(su_port_t *self,
GSource *gs,
su_port_vtable_t const *vtable)
{
SuSource *ss;
SU_DEBUG_9(("su_source_create() called\n"));
ss = (SuSource *)g_source_new(su_source_funcs, (sizeof *ss));
if (ss) {
su_port_t *self = ss->ss_port;
self->sup_vtable = su_source_vtable;
self->sup_source = ss->ss_source;
SU_SOURCE_INITLOCK(self);
self->sup_tail = &self->sup_head;
self->sup_tid = g_thread_self();
if (su_base_port_init(self, vtable) < 0)
return -1;
SU_DEBUG_9(("su_source_with_main_context() returns %p\n", self));
self->sup_source = gs;
self->sup_tid = g_thread_self();
return self;
} else {
su_perror("su_source_with_main_context(): su_home_clone");
SU_DEBUG_9(("su_source_with_main_context() fails\n"));
return NULL;
}
g_static_mutex_init(self->sup_mutex);
return 0;
}
/** @internal Destroy a port. */
......@@ -307,23 +281,70 @@ void su_source_finalize(GSource *gs)
SU_DEBUG_9(("su_source_finalize() called\n"));
if (self->sup_waits)
free(self->sup_waits), self->sup_waits = NULL;
if (self->sup_wait_cbs)
free(self->sup_wait_cbs), self->sup_wait_cbs = NULL;
if (self->sup_wait_args)
free(self->sup_wait_args), self->sup_wait_args = NULL;
if (self->sup_wait_roots)
free(self->sup_wait_roots), self->sup_wait_roots = NULL;
if (self->sup_indices)
free(self->sup_indices), self->sup_indices = NULL;
su_home_deinit(self->sup_home);
g_static_mutex_free(self->sup_mutex);
su_base_port_deinit(self);
su_home_deinit(self->sup_base->sup_home);
}
void su_source_port_lock(su_port_t *self, char const *who)
{
PORT_LOCK_DEBUG(("%p at %s locking(%p)...",
(void *)g_thread_self(), who, self));
g_static_mutex_lock(self->sup_mutex);
PORT_LOCK_DEBUG((" ...%p at %s locked(%p)...",
(void *)g_thread_self(), who, self));
}
void su_source_port_unlock(su_port_t *self, char const *who)
{
g_static_mutex_unlock(self->sup_mutex);
PORT_LOCK_DEBUG((" ...%p at %s unlocked(%p)\n",
(void *)g_thread_self(), who, self));
}
/** @internal Send a message to the port. */
int su_source_send(su_port_t *self, su_msg_r rmsg)
{
int wakeup = su_base_port_send(self, rmsg);
GMainContext *gmc;
if (wakeup < 0)
return -1;
if (wakeup == 0)
return 0;
gmc = g_source_get_context(self->sup_source);
if (gmc)
g_main_context_wakeup(gmc);
return 0;
}
/** @internal
* Checks if the calling thread owns the port object.
*
* @param self pointer to a port object
*
* @retval true (nonzero) if the calling thread owns the port,
* @retval false (zero) otherwise.
*/
int su_source_own_thread(su_port_t const *self)
{
return self == NULL || SU_SOURCE_OWN_THREAD(self);
}
/* -- Registering and unregistering ------------------------------------- */
/* Seconds from 1.1.1900 to 1.1.1970 */
#define NTP_EPOCH 2208988800UL
/** Prepare to wait - calculate time to next timer */
static
gboolean su_source_prepare(GSource *gs, gint *return_tout)
{
......@@ -332,12 +353,12 @@ gboolean su_source_prepare(GSource *gs, gint *return_tout)
enter;
if (self->sup_head) {
if (self->sup_base->sup_head) {
*return_tout = 0;
return TRUE;
}
if (self->sup_timers) {
if (self->sup_base->sup_timers) {
su_time_t now;
GTimeVal gtimeval;
su_duration_t tout;
......@@ -346,7 +367,7 @@ gboolean su_source_prepare(GSource *gs, gint *return_tout)
now.tv_sec = gtimeval.tv_sec + 2208988800UL;
now.tv_usec = gtimeval.tv_usec;
tout = su_timer_next_expires(self->sup_timers, now);
tout = su_timer_next_expires(self->sup_base->sup_timers, now);
*return_tout = (tout < 0 || tout > (su_duration_t)G_MAXINT)?
-1 : (gint)tout;
......@@ -389,10 +410,10 @@ gboolean su_source_dispatch(GSource *gs,
enter;
if (self->sup_head)
su_source_getmsgs(self);
if (self->sup_base->sup_head)
su_base_port_getmsgs(self);
if (self->sup_timers) {
if (self->sup_base->sup_timers) {
su_time_t now;
GTimeVal gtimeval;
su_duration_t tout;
......@@ -405,7 +426,7 @@ gboolean su_source_dispatch(GSource *gs,
now.tv_sec = gtimeval.tv_sec + 2208988800UL;
now.tv_usec = gtimeval.tv_usec;
timers = su_timer_expire(&self->sup_timers, &tout, now);
timers = su_timer_expire(&self->sup_base->sup_timers, &tout, now);
}
#if SU_HAVE_POLL
......@@ -437,12 +458,20 @@ gboolean su_source_dispatch(GSource *gs,
static void su_source_lock(su_port_t *self, char const *who)
{
SU_SOURCE_LOCK(self, who);
PORT_LOCK_DEBUG(("%p at %s locking(%p)...",
(void *)g_thread_self(), who, self));
g_static_mutex_lock(self->sup_mutex);
PORT_LOCK_DEBUG((" ...%p at %s locked(%p)...",
(void *)g_thread_self(), who, self));
}
static void su_source_unlock(su_port_t *self, char const *who)
{
SU_SOURCE_UNLOCK(self, who);
g_static_mutex_unlock(self->sup_mutex);
PORT_LOCK_DEBUG((" ...%p at %s unlocked(%p)\n",
(void *)g_thread_self(), who, self));
}
static void su_source_incref(su_port_t *self, char const *who)
......@@ -461,81 +490,6 @@ GSource *su_source_gsource(su_port_t *self)
return self->sup_source;
}
/** @internal Send a message to the port. */
int su_source_send(su_port_t *self, su_msg_r rmsg)
{
enter;
if (self) {
su_msg_t *msg;
GMainContext *gmc;
SU_SOURCE_LOCK(self, "su_source_send");
msg = rmsg[0]; rmsg[0] = NULL;
*self->sup_tail = msg;
self->sup_tail = &msg->sum_next;
SU_SOURCE_UNLOCK(self, "su_source_send");
gmc = g_source_get_context(self->sup_source);
if (gmc)
g_main_context_wakeup(gmc);
return 0;
}
else {
su_msg_destroy(rmsg);
return -1;
}
}
/** @internal
* Execute the messages in the incoming queue until the queue is empty..
*
* @param self - pointer to a port object
*
* @retval 0 if there was a signal to handle,
* @retval -1 otherwise.
*/
static
int su_source_getmsgs(su_port_t *self)
{
enter;
if (self && self->sup_head) {
su_root_t *root;
su_msg_f f;
SU_SOURCE_INCREF(self, "su_source_getmsgs");
SU_SOURCE_LOCK(self, "su_source_getmsgs");
while (self->sup_head) {
su_msg_t *msg = self->sup_head;
self->sup_head = msg->sum_next;
if (!self->sup_head) {
assert(self->sup_tail == &msg->sum_next);
self->sup_tail = &self->sup_head;
}
root = msg->sum_to->sut_root;
f = msg->sum_func;
SU_SOURCE_UNLOCK(self, "su_source_getmsgs");
if (f)
f(su_root_magic(root), &msg, msg->sum_data);
su_msg_delivery_report(&msg);
SU_SOURCE_LOCK(self, "su_source_getmsgs");
}
SU_SOURCE_UNLOCK(self, "su_source_getmsgs");
SU_SOURCE_DECREF(self, "su_source_getmsgs");
return 0;
}
else
return -1;
}
/** @internal
*
* Register a @c su_wait_t object. The wait object, a callback function and
......@@ -927,13 +881,6 @@ int su_source_multishot(su_port_t *self, int multishot)
return (errno = EINVAL), -1;
}
/** @internal Enable threadsafe operation. */
static
int su_source_threadsafe(su_port_t *port)
{
return su_home_threadsafe(port->sup_home);
}
/** @internal Main loop.
*
......@@ -1032,18 +979,19 @@ su_duration_t su_source_step(su_port_t *self, su_duration_t tout)
return 0;
}
static int su_source_add_prepoll(su_port_t *port,
su_root_t *root,
su_prepoll_f *prepoll,
su_prepoll_magic_t *magic)
{
/* We could call prepoll in su_source_prepare()?? */
return -1;
}
/** @internal
* Checks if the calling thread owns the port object.
*
* @param self pointer to a port object
*
* @retval true (nonzero) if the calling thread owns the port,
* @retval false (zero) otherwise.
*/
int su_source_own_thread(su_port_t const *self)
static int su_source_remove_prepoll(su_port_t *port,
su_root_t *root)
{
return self == NULL || SU_SOURCE_OWN_THREAD(self);
return -1;
}
#if 0
......@@ -1074,52 +1022,34 @@ void su_source_dump(su_port_t const *self, FILE *f)
#endif
/* =========================================================================
* Pre-poll() callback
/**@internal
*
* Allocates and initializes a reactor and message port object.
*
* @return
* If successful a pointer to the new message port is returned, otherwise
* NULL is returned.
*/
int su_source_add_prepoll(su_port_t *port,
su_root_t *root,
su_prepoll_f *callback,
su_prepoll_magic_t *magic)
static su_port_t *su_source_port_create(void)
{
#if 0
if (port->sup_prepoll)
return -1;
SuSource *ss;
su_port_t *self = NULL;
port->sup_prepoll = callback;
port->sup_pp_magic = magic;
port->sup_pp_root = root;
SU_DEBUG_9(("su_source_port_create() called\n"));
return 0;
#else
return -1;
#endif
}
ss = (SuSource *)g_source_new(su_source_funcs, (sizeof *ss));
int su_source_remove_prepoll(su_port_t *port,
su_root_t *root)
{
#if 0
if (port->sup_pp_root != root)
return -1;
if (ss) {
self = ss->ss_port;
if (su_source_port_init(self, ss->ss_source, su_source_port_vtable) < 0)
g_source_unref(ss->ss_source), self = NULL;
} else {
su_perror("su_source_port_create(): g_source_new");
}
port->sup_prepoll = NULL;
port->sup_pp_magic = NULL;
port->sup_pp_root = NULL;
SU_DEBUG_1(("su_source_port_create() returns %p\n", self));
return 0;
#else
return -1;
#endif
return self;
}
/* =========================================================================
* Timers
*/
static
su_timer_t **su_source_timers(su_port_t *self)
{
return &self->sup_timers;
}
/* No su_source_port_start */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment