Commit 251f445e authored by Martti Mela's avatar Martti Mela
Browse files

restructured RunLoop functionality in su_osx_runloop.c

darcs-hash:20060928154404-1b897-43e2fbcc537846426f199b52016e8f1d0e7fbabe.gz
parent 8860e97b
......@@ -68,7 +68,17 @@
#include "sofia-sip/su_alloc.h"
#if SU_HAVE_PTHREADS
/* Pthread implementation */
#include <pthread.h>
#define SU_HAVE_MBOX 1
#else
#define SU_HAVE_MBOX 0
#endif
#if HAVE_SOCKETPAIR
#define MBOX_SEND 1
#else
#define MBOX_SEND 0
#endif
static su_port_t *su_osx_runloop_create(void) __attribute__((__malloc__));
......@@ -84,27 +94,27 @@ static void su_source_finalize(GSource *source);
static int su_port_osx_getmsgs(su_port_t *self);
/* Callback for CFSocket */
static void su_port_osx_socket_cb(CFSocketRef s,
CFSocketCallBackType callbackType,
CFDataRef address,
const void *data,
void *info);
#if 0
/* Callbacks of Source Context */
/* static const void *su_port_osx_retain(const void *info); */
static void su_port_osx_release(const void *info);
static const void *su_port_osx_copy_description(const void *info);
static CFHashCode su_port_osx_hash(const void *info);
//static const void *su_port_osx_copy_description(const void *info);
//static CFHashCode su_port_osx_hash(const void *info);
static void su_port_osx_schedule(void *info, CFRunLoopRef rl, CFStringRef mode);
static void su_port_osx_cancel(void *info, CFRunLoopRef rl, CFStringRef mode);
#endif
static void su_port_osx_perform(const void *info);
#if 0
static
GSourceFuncs su_port_osx_funcs[1] = {{
su_port_osx_prepare,
su_port_osx_check,
su_port_osx_dispatch,
su_port_osx_finalize,
NULL,
NULL
}};
#else
static
CFRunLoopSourceContext su_port_osx_funcs[1] = {{
0, /* type */
NULL, /* context */
......@@ -119,13 +129,12 @@ CFRunLoopSourceContext su_port_osx_funcs[1] = {{
}};
#endif
static void su_port_osx_lock(su_port_t *self, char const *who);
static void su_port_osx_unlock(su_port_t *self, char const *who);
static void su_port_osx_incref(su_port_t *self, char const *who);
static void su_port_osx_decref(su_port_t *self, int blocking, char const *who);
static CFRunLoopSourceRef su_port_osx_runloop_source(su_port_t *port);
static CFRunLoopRef su_port_osx_runloop(su_port_t *port);
static int su_port_osx_send(su_port_t *self, su_msg_r rmsg);
......@@ -168,7 +177,7 @@ su_port_vtable_t const su_port_osx_vtable[1] =
su_port_osx_incref,
su_port_osx_decref,
su_port_osx_runloop_source, /* XXX - was: gsource, */
su_port_osx_runloop, /* XXX - was: gsource, */
su_port_osx_send,
su_port_osx_register,
......@@ -203,6 +212,11 @@ struct su_port_osx_s {
pthread_rwlock_t sup_ref[1];
#endif
#if SU_HAVE_MBOX
su_socket_t sup_mbox[MBOX_SEND + 1];
su_wait_t sup_mbox_wait;
#endif
CFRunLoopSourceRef sup_source;
CFRunLoopRef sup_main_loop;
......@@ -215,14 +229,20 @@ struct su_port_osx_s {
su_port_register() or
su_port_unregister()
*/
unsigned sup_n_waits;
unsigned sup_size_waits;
unsigned sup_max_index;
unsigned *sup_indices;
su_wait_t *sup_waits;
su_wakeup_f *sup_wait_cbs;
su_wakeup_arg_t**sup_wait_args;
su_root_t **sup_wait_roots;
unsigned sup_n_waits;
unsigned sup_size_waits;
int sup_pri_offset; /**< Offset to prioritized waits */
unsigned sup_max_index;
int sup_free_index; /**< Number of first free index */
int *sup_indices;
su_wait_t *sup_waits;
CFRunLoopSourceRef *sup_sources;
int *sup_reverses; /** Reverse index */
su_wakeup_f *sup_wait_cbs;
su_wakeup_arg_t **sup_wait_args;
su_root_t **sup_wait_roots;
/* Timer list */
su_timer_t *sup_timers;
......@@ -230,7 +250,6 @@ struct su_port_osx_s {
typedef struct _SuSource
{
CFRunLoopSourceRef ss_source[1];
su_port_t ss_port[1];
} SuSource;
......@@ -263,6 +282,12 @@ typedef struct _SuSource
#define enter (void)0
#endif
#if SU_HAVE_MBOX
static int su_port_osx_wakeup(su_root_magic_t *magic,
su_wait_t *w,
su_wakeup_arg_t *arg);
#endif
static void su_port_osx_destroy(su_port_t *self);
/** Create a root that uses GSource as reactor */
......@@ -281,36 +306,105 @@ su_root_t *su_root_osx_runloop_create(su_root_magic_t *magic)
*/
su_port_t *su_osx_runloop_create(void)
{
SuSource *ss;
su_port_t *self = su_home_clone(NULL, sizeof(*self));
SU_DEBUG_9(("su_osx_runloop_create() called\n"));
ss = (SuSource *) CFRunLoopSourceCreate(kCFAllocatorDefault, 0, su_port_osx_funcs);
if (ss) {
su_port_t *self = ss->ss_port;
if (self) {
#if SU_HAVE_MBOX
int af;
su_socket_t mb = INVALID_SOCKET;
char const *why;
#endif
self->sup_vtable = su_port_osx_vtable;
self->sup_source = ss->ss_source;
SU_PORT_OSX_INITREF(self);
SU_PORT_OSX_INITLOCK(self);
self->sup_tail = &self->sup_head;
self->sup_free_index = -1;
//self->sup_multishot = (SU_ENABLE_MULTISHOT_POLL) != 0;
#if SU_HAVE_PTHREADS
self->sup_tid = pthread_self();
#endif
SU_DEBUG_9(("su_port_osx_with_main_context() returns %p\n", self));
#if SU_HAVE_MBOX
#if HAVE_SOCKETPAIR
#if defined(AF_LOCAL)
af = AF_LOCAL;
#else
af = AF_UNIX;
#endif
if (socketpair(af, SOCK_STREAM, 0, self->sup_mbox) == -1) {
why = "su_port_init: socketpair"; goto error;
}
mb = self->sup_mbox[0];
su_setblocking(self->sup_mbox[0], 0);
su_setblocking(self->sup_mbox[1], 0);
#else
{
struct sockaddr_in sin = { sizeof(struct sockaddr_in), 0 };
socklen_t sinsize = sizeof sin;
struct sockaddr *sa = (struct sockaddr *)&sin;
af = PF_INET;
self->sup_mbox[0] = mb = su_socket(af, SOCK_DGRAM, IPPROTO_UDP);
if (mb == INVALID_SOCKET) {
why = "su_port_init: socket"; goto error;
}
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK); /* 127.1 */
/* Get a port for us */
if (bind(mb, sa, sizeof sin) == -1) {
why = "su_port_init: bind"; goto error;
}
if (getsockname(mb, sa, &sinsize) == -1) {
why = "su_port_init: getsockname"; goto error;
}
if (connect(mb, sa, sinsize) == -1) {
why = "su_port_init: connect"; goto error;
}
}
#endif
if (su_wait_create(&self->sup_mbox_wait, mb, SU_WAIT_IN) == -1) {
why = "su_port_init: su_wait_create"; goto error;
}
if (su_port_osx_register(self, NULL, &self->sup_mbox_wait, su_port_osx_wakeup,
(su_wakeup_arg_t *)self->sup_mbox, 0)
== -1) {
why = "su_port_create: su_port_register"; goto error;
}
SU_DEBUG_9(("su_port_create() returns %p\n", self));
return self;
} else {
su_perror("su_port_osx_with_main_context(): su_home_clone");
SU_DEBUG_9(("su_port_osx_with_main_context() fails\n"));
return NULL;
error:
su_perror(why);
su_port_osx_destroy(self), self = NULL;
#endif
}
SU_DEBUG_9(("su_port_create() returns %p\n", self));
return self;
}
#if 0
/** @internal Destroy a port. */
/* XXX -- static void su_port_osx_finalize(GSource *gs) */
static void su_port_osx_release(const void *info)
......@@ -325,21 +419,27 @@ static void su_port_osx_release(const void *info)
if (self->sup_waits)
free(self->sup_waits), self->sup_waits = NULL;
if (self->sup_sources)
free(self->sup_sources), self->sup_sources = NULL;
if (self->sup_wait_cbs)
free(self->sup_wait_cbs), self->sup_wait_cbs = NULL;
if (self->sup_wait_args)
free(self->sup_wait_args), self->sup_wait_args = NULL;
if (self->sup_wait_roots)
free(self->sup_wait_roots), self->sup_wait_roots = NULL;
if (self->sup_reverses)
free(self->sup_reverses), self->sup_reverses = NULL;
if (self->sup_indices)
free(self->sup_indices), self->sup_indices = NULL;
su_home_deinit(self->sup_home);
}
#endif
/* Seconds from 1.1.1900 to 1.1.1970 */
#define NTP_EPOCH 2208988800UL
#if 0
/* gboolean su_port_osx_prepare(void *info, int *return_tout); */
static
void su_port_osx_schedule(void *info, CFRunLoopRef rl, CFStringRef mode)
......@@ -392,7 +492,7 @@ void su_port_osx_cancel(void *info, CFRunLoopRef rl, CFStringRef mode)
return;
}
#endif
#if 0
static
......@@ -423,10 +523,21 @@ gboolean su_port_osx_check(GSource *gs)
#if 0
static
gboolean su_port_osx_dispatch(GSource *gs,
GSourceFunc callback,
gpointer user_data)
GSourceFunc callback,
gpointer user_data);
#endif
static
void su_port_osx_socket_cb(CFSocketRef s,
CFSocketCallBackType callbackType,
CFDataRef address,
const void *data,
void *info)
{
enter;
su_port_osx_perform(info);
}
static void su_port_osx_perform(const void *info)
{
SuSource *ss = (SuSource *) info; /* gs; */
......@@ -453,7 +564,6 @@ static void su_port_osx_perform(const void *info)
timers = su_timer_expire(&self->sup_timers, &tout, now);
}
#if 0 /* XXX -- mela: not needed? */
#if SU_HAVE_POLL
{
su_root_t *root;
......@@ -473,7 +583,6 @@ static void su_port_osx_perform(const void *info)
}
}
}
#endif
#endif
return;
......@@ -488,7 +597,7 @@ void su_port_osx_destroy(su_port_t *self)
#if SU_HAVE_MBOX
if (self->sup_mbox[0] != INVALID_SOCKET) {
su_port_unregister(self, NULL, &self->sup_mbox_wait, NULL,
su_port_osx_unregister(self, NULL, &self->sup_mbox_wait, NULL,
(su_wakeup_arg_t *)self->sup_mbox);
su_wait_destroy(&self->sup_mbox_wait);
su_close(self->sup_mbox[0]); self->sup_mbox[0] = INVALID_SOCKET;
......@@ -500,6 +609,8 @@ void su_port_osx_destroy(su_port_t *self)
#endif
if (self->sup_waits)
free(self->sup_waits), self->sup_waits = NULL;
if (self->sup_sources)
free(self->sup_sources), self->sup_sources = NULL;
if (self->sup_wait_cbs)
free(self->sup_wait_cbs), self->sup_wait_cbs = NULL;
if (self->sup_wait_args)
......@@ -542,11 +653,25 @@ static void su_port_osx_decref(su_port_t *self, int blocking, char const *who)
SU_PORT_OSX_DECREF(self, who);
}
CFRunLoopSourceRef su_port_osx_runloop_source(su_port_t *self)
CFRunLoopRef su_port_osx_runloop(su_port_t *self)
{
return self->sup_source;
return self->sup_main_loop;
}
#if SU_HAVE_MBOX
/** @internal Message box wakeup function. */
static int su_port_osx_wakeup(su_root_magic_t *magic, /* NULL */
su_wait_t *w,
su_wakeup_arg_t *arg)
{
char buf[32];
su_socket_t s = *(su_socket_t *)arg;
su_wait_events(w, s);
recv(s, buf, sizeof(buf), 0);
return 0;
}
#endif
/** @internal Send a message to the port. */
int su_port_osx_send(su_port_t *self, su_msg_r rmsg)
{
......@@ -659,9 +784,14 @@ int su_port_osx_register(su_port_t *self,
su_wakeup_arg_t *arg,
int priority)
{
unsigned i, j, I;
unsigned i, j;
unsigned n;
CFRunLoopRef rl;
CFRunLoopSourceRef *sources;
CFRunLoopSourceRef source;
CFSocketRef cf_socket;
int events = 0;
CFSocketContext cf_socket_cntx[1] = {{0, self, NULL, NULL, NULL}};
enter;
......@@ -672,40 +802,54 @@ int su_port_osx_register(su_port_t *self,
if (n >= self->sup_size_waits) {
/* Reallocate size arrays */
int size;
unsigned *indices;
int *indices;
int *reverses;
su_wait_t *waits;
su_wakeup_f *wait_cbs;
su_wakeup_arg_t **wait_args;
su_root_t **wait_tasks;
assert(self->sup_free_index == -1);
if (self->sup_size_waits == 0)
size = SU_MIN_WAITS;
size = su_root_size_hint;
else
size = 2 * self->sup_size_waits;
if (size < SU_MIN_WAITS)
size = SU_MIN_WAITS;
/* Too large */
if (-3 - size > 0)
return (errno = ENOMEM), -1;
indices = realloc(self->sup_indices, size * sizeof(*indices));
if (indices) {
self->sup_indices = indices;
for (i = self->sup_size_waits; i < size; i++)
indices[i] = UINT_MAX;
}
for (i = self->sup_size_waits; i < size - 1; i++)
indices[i] = -3 - i;
rl = CFRunLoopGetCurrent();
if (self->sup_size_waits < size) {
indices[i] = -1;
self->sup_free_index = -2 - self->sup_size_waits;
}
}
for (i = 0; i < self->sup_n_waits; i++)
CFRunLoopRemoveSource(rl, self->sup_waits[i].w_source,
kCFRunLoopDefaultMode);
/* g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[i]); */
reverses = realloc(self->sup_reverses, size * sizeof(*waits));
if (reverses) {
for (i = self->sup_size_waits; i < size; i++)
reverses[i] = -1;
self->sup_reverses = reverses;
}
waits = realloc(self->sup_waits, size * sizeof(*waits));
if (waits)
self->sup_waits = waits;
for (i = 0; i < self->sup_n_waits; i++)
CFRunLoopAddSource(rl, waits[i].w_source, kCFRunLoopDefaultMode);
/* g_source_add_poll(self->sup_source, (GPollFD*)&waits[i]); */
sources = realloc(self->sup_sources, size * sizeof(*sources));
if (sources)
self->sup_sources = sources;
wait_cbs = realloc(self->sup_wait_cbs, size * sizeof(*wait_cbs));
if (wait_cbs)
self->sup_wait_cbs = wait_cbs;
......@@ -719,7 +863,8 @@ int su_port_osx_register(su_port_t *self,
if (wait_tasks)
self->sup_wait_roots = wait_tasks;
if (!(indices && waits && wait_cbs && wait_args && wait_tasks)) {
if (!(indices &&
reverses && waits && wait_cbs && wait_args && wait_tasks)) {
return -1;
}
......@@ -728,52 +873,123 @@ int su_port_osx_register(su_port_t *self,
self->sup_n_waits++;
events = kCFSocketReadCallBack | kCFSocketAcceptCallBack |
kCFSocketDataCallBack | kCFSocketConnectCallBack |
kCFSocketWriteCallBack;
cf_socket = CFSocketCreateWithNative(kCFAllocatorDefault,
(CFSocketNativeHandle) su_wait_socket(wait),
events, su_port_osx_socket_cb, cf_socket_cntx);
source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, cf_socket, 0);
rl = CFRunLoopGetCurrent();
CFRunLoopAddSource(rl, source, kCFRunLoopDefaultMode);
if (priority > 0) {
/* Insert */
for (; n > 0; n--) {
CFRunLoopRemoveSource(rl, self->sup_waits[n-1].w_source, kCFRunLoopDefaultMode);
/* g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n-1]); */
self->sup_reverses[n] = self->sup_reverses[n-1];
self->sup_waits[n] = self->sup_waits[n-1];
CFRunLoopAddSource(rl, self->sup_waits[n].w_source, kCFRunLoopDefaultMode);
/* g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]); */
self->sup_sources[n] = self->sup_sources[n-1];
self->sup_wait_cbs[n] = self->sup_wait_cbs[n-1];
self->sup_wait_args[n] = self->sup_wait_args[n-1];
self->sup_wait_roots[n] = self->sup_wait_roots[n-1];
}
self->sup_pri_offset++;
}
else {
/* Append - no need to move anything */
}
i = -2 - self->sup_free_index; assert(i < self->sup_size_waits);
self->sup_free_index = self->sup_indices[i];
self->sup_reverses[n] = i;
self->sup_waits[n] = *wait;
CFRunLoopAddSource(rl, self->sup_waits[n].w_source, kCFRunLoopDefaultMode);
/* g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]); */
self->sup_wait_cbs[n] = callback;
self->sup_sources[n] = source;
self->sup_wait_args[n] = arg;
self->sup_wait_roots[n] = root;
I = self->sup_max_index;
if (n == 0 && self->sup_n_waits > 1) {
for (j = 0; j < self->sup_size_waits; j++) {
if (self->sup_indices[j] >= 0)
self->sup_indices[j]++;
}
}
for (i = 0; i < I; i++)
if (self->sup_indices[i] == UINT_MAX)
break;
else if (self->sup_indices[i] >= n)
self->sup_indices[i]++;
self->sup_indices[i] = n;
self->sup_registers++;
if (i == I)
self->sup_max_index++;
return i + !SU_HAVE_MBOX; /* Mailbox has index 0 */
}
if (n + 1 < self->sup_n_waits)
for (j = i; j < I; j++)
if (self->sup_indices[j] != UINT_MAX &&
self->sup_indices[j] >= n)
self->sup_indices[j]++;
/** Deregister a su_wait_t object. */
static
int su_port_osx_deregister0(su_port_t *self, int i, su_wait_t wait[1])
{
int n, j, N, *indices, *reverses;
CFRunLoopRef rl;
self->sup_indices[i] = n;
i -= !SU_HAVE_MBOX;
N = self->sup_n_waits;
indices = self->sup_indices;
reverses = self->sup_reverses;
n = indices[i];
if (n < 0)
return -1;
assert(i == self->sup_reverses[n]);
self->sup_n_waits = N = N - 1;
wait[0] = self->sup_waits[n];
rl = CFRunLoopGetCurrent();
if (N > 0) {
if (n < self->sup_pri_offset) {
j = self->sup_pri_offset - 1;
if (n != j) {
assert(reverses[j] >= 0);
indices[reverses[j]] = n;
self->sup_reverses[n] = self->sup_reverses[j];
self->sup_waits[n] = self->sup_waits[j];
self->sup_wait_cbs[n] = self->sup_wait_cbs[j];
/* XXX -- mela: really? */
CFRunLoopRemoveSource(rl, self->sup_sources[n], kCFRunLoopDefaultMode);
self->sup_sources[n] = self->sup_sources[j];
self->sup_wait_args[n] = self->sup_wait_args[j];
self->sup_wait_roots[n] = self->sup_wait_roots[j];
n = j;
}
self->sup_pri_offset = j;
}
if (n < N) {
assert(reverses[N] >= 0);
indices[reverses[N]] = n;
self->sup_reverses[n] = self->sup_reverses[N];
self->sup_waits[n] = self->sup_waits[N];
self->sup_wait_cbs[n] = self->sup_wait_cbs[N];
/* XXX -- mela: really? */
CFRunLoopRemoveSource(rl, self->sup_sources[n], kCFRunLoopDefaultMode);
self->sup_sources[n] = self->sup_sources[N];
self->sup_wait_args[n] = self->sup_wait_args[N];
self->sup_wait_roots[n] = self->sup_wait_roots[N];
}
}
indices[i] = self->sup_free_index;
self->sup_free_index = -2 - i;
self->sup_registers++;
return i + 1; /* 0 is failure */
return (int)i + !SU_HAVE_MBOX;
}
/** Unregister a su_wait_t object.
......@@ -792,77 +1008,40 @@ int su_port_osx_register(su_port_t *self,
* @return Nonzero index of the wait object, or -1 upon an error.
*/
int su_port_osx_unregister(su_port_t *self,