Commit e40e2640 authored by Pekka Pessi's avatar Pekka Pessi

su_port.c etc: refactored su_port implementation.

Basic su_port.c implementation using pthreads and poll() is now divided into
three modules: su_base_port.c contains base implementation with su_base_*()
functions, su_pthread_port.c contains pthread-specific implementation and
su_poll_port() contains poll()/epoll()-specific parts. The decomposed
su_port allows implementations on different platforms and usages to share
code as far as possible.

This patch also introduces configure option --disable-poll-port which should
be used on systems with emulated poll, like older *BSD derivatives such as
OS X. Note however that su_select_poll.c is not completed yet.

darcs-hash:20070126155634-65a35-09612536f51f7cd14c33ba42278bc9ed2cee4144.gz
parent 8a5472ff
......@@ -29,6 +29,11 @@ libsofia-sip-ua:
- Added SIP header Refer-Sub and related functions
- Added <sofia-sip/sip_extra.h> include file
- Added auc_info() function (sofia-sip/auth_client.h)
- Added alternative implementations to event reactor object (su_port_t,
referenced by su_root_t) that can be changed at runtime
- Internal semantics of su_port_t reference counting have changed:
now su_port_create() has one reference, and su_root_create_with_port()
uses it reference
- This release is ABI/API compatible with applications linked against
any 1.12.x release. However, applications built against this release won't
work against an older library. The ABI has been tested with the nua module
......
......@@ -77,7 +77,9 @@ libsu_la_SOURCES = \
su_alloc.c su_alloc_lock.c su_strdup.c su_sprintf.c \
su_strlst.c su_vector.c \
su_time.c su_time0.c \
su_wait.c su_root.c su_timer.c su_port.c su_port.h \
su_wait.c su_root.c su_timer.c \
su_port.c su_port.h \
su_base_port.c su_pthread_port.c su_poll_port.c su_select_port.c \
su_localinfo.c \
su_os_nw.c \
su_taglist.c su_tag.c su_tag_io.c \
......
/*
* This file is part of the Sofia-SIP package
*
* Copyright (C) 2005 Nokia Corporation.
*
* Contact: Pekka Pessi <pekka.pessi@nokia.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA
*
*/
/**@ingroup su_wait
* @CFILE su_port.c
*
* OS-Independent Socket Syncronization Interface.
*
* This looks like nth reincarnation of "reactor". It implements the
* poll/select/WaitForMultipleObjects and message passing functionality.
*
* @author Pekka Pessi <Pekka.Pessi@nokia.com>
* @author Kai Vehmanen <kai.vehmanen@nokia.com>
*
* @date Created: Tue Sep 14 15:51:04 1999 ppessi
*/
#include "config.h"
#include <stdlib.h>
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <limits.h>
#include <errno.h>
#define su_base_port_s su_port_s
#include "sofia-sip/su.h"
#include "su_port.h"
#include "sofia-sip/su_alloc.h"
#if 1
#define PORT_REFCOUNT_DEBUG(x) ((void)0)
#else
#define PORT_REFCOUNT_DEBUG(x) printf x
#endif
/**@internal
*
* Initialize a message port.
*
* @retval 0 when successful
* @retval -1 upon an error
*/
int su_base_port_init(su_port_t *self, su_port_vtable_t const *vtable)
{
if (self) {
self->sup_vtable = vtable;
self->sup_tail = &self->sup_head;
return 0;
}
return -1;
}
/** @internal Deinit a base implementation of port. */
void su_base_port_deinit(su_port_t *self)
{
}
void su_base_port_lock(su_port_t *self, char const *who)
{
}
void su_base_port_unlock(su_port_t *self, char const *who)
{
}
int su_base_port_own_thread(su_port_t const *self)
{
return 1;
}
void su_base_port_incref(su_port_t *self, char const *who)
{
su_home_ref(self->sup_home);
PORT_REFCOUNT_DEBUG(("incref(%p) to %u by %s\n", self,
su_home_refcount(self->sup_home), who));
}
int su_base_port_decref(su_port_t *self, int blocking, char const *who)
{
int zapped = su_home_unref(self->sup_home);
PORT_REFCOUNT_DEBUG(("%s(%p) to %u%s by %s\n",
blocking ? "zapref" : "decref",
self, zapped ? 0 : su_home_refcount(self->sup_home),
blocking && !zapped ? " FAILED" :"",
who));
/* We should block until all references are destroyed */
if (blocking)
/* ...but we just abort() */
assert(zapped);
return zapped;
}
struct _GSource *su_base_port_gsource(su_port_t *self)
{
return NULL;
}
/** @internal Send a message to the port.
*
* @retval 1 if port thread needs to be woken
* @retval 0 if there are other messages in queue, too
* @retval -1 upon an error
*/
int su_base_port_send(su_port_t *self, su_msg_r rmsg)
{
if (self) {
int wakeup;
su_port_lock(self, "su_port_send");
wakeup = self->sup_head == NULL;
*self->sup_tail = rmsg[0]; rmsg[0] = NULL;
self->sup_tail = &(*self->sup_tail)->sum_next;
su_port_unlock(self, "su_port_send");
return wakeup;
}
else {
su_msg_destroy(rmsg);
return -1;
}
}
/** @internal
* Execute the messages in the incoming queue until the queue is empty..
*
* @param self - pointer to a port object
*
* @retval Number of messages sent
*/
int su_base_port_getmsgs(su_port_t *self)
{
int n = 0;
if (self->sup_head) {
su_msg_f f;
su_msg_t *msg, *queue;
su_port_lock(self, "su_base_port_getmsgs");
queue = self->sup_head;
self->sup_tail = &self->sup_head;
self->sup_head = NULL;
su_port_unlock(self, "su_base_port_getmsgs");
for (msg = queue; msg; msg = queue) {
queue = msg->sum_next;
msg->sum_next = NULL;
f = msg->sum_func;
if (f)
f(SU_ROOT_MAGIC(msg->sum_to->sut_root), &msg, msg->sum_data);
su_msg_delivery_report(&msg);
n++;
}
/* Check for wait events that may have been generated by messages */
self->sup_vtable->su_port_wait_events(self, 0);
}
return n;
}
/** @internal Enable multishot mode.
*
* The function su_port_multishot() enables, disables or queries the
* multishot mode for the port. The multishot mode determines how the events
* are scheduled by port. If multishot mode is enabled, port serves all the
* sockets that have received network events. If it is disables, only first
* socket event is served.
*
* @param self pointer to port object
* @param multishot multishot mode (0 => disables, 1 => enables, -1 => query)
*
* @retval 0 multishot mode is disabled
* @retval 1 multishot mode is enabled
* @retval -1 an error occurred
*/
int su_base_port_multishot(su_port_t *self, int multishot)
{
return 0;
}
/** @internal Enable threadsafe operation. */
int su_base_port_threadsafe(su_port_t *self)
{
return su_home_threadsafe(self->sup_home);
}
/** @internal Main loop.
*
* The function @c su_port_run() waits for wait objects and the timers
* associated with the port object. When any wait object is signaled or
* timer is expired, it invokes the callbacks, and returns waiting.
*
* The function @c su_port_run() runs until @c su_port_break() is called
* from a callback.
*
* @param self pointer to port object
*
*/
void su_base_port_run(su_port_t *self)
{
su_duration_t tout = 0;
assert(su_port_own_thread(self));
for (self->sup_running = 1; self->sup_running;) {
tout = 2000;
if (self->sup_prepoll)
self->sup_prepoll(self->sup_pp_magic, self->sup_pp_root);
if (self->sup_head)
self->sup_vtable->su_port_getmsgs(self);
if (self->sup_timers)
su_timer_expire(&self->sup_timers, &tout, su_now());
if (!self->sup_running)
break;
if (self->sup_head) /* if there are messages do a quick wait */
tout = 0;
self->sup_vtable->su_port_wait_events(self, tout);
}
}
#if tuning
/* This version can help tuning... */
void su_base_port_run_tune(su_port_t *self)
{
int i;
int timers = 0, messages = 0, events = 0;
su_duration_t tout = 0, tout0;
su_time_t started = su_now(), woken = started, bedtime = woken;
assert(su_port_own_thread(self));
for (self->sup_running = 1; self->sup_running;) {
tout = 2000;
timers = 0, messages = 0;
if (self->sup_prepoll)
self->sup_prepoll(self->sup_pp_magic, self->sup_pp_root);
if (self->sup_head)
messages = self->sup_vtable->su_port_getmsgs(self);
if (self->sup_timers)
timers = su_timer_expire(&self->sup_timers, &tout, su_now());
if (!self->sup_running)
break;
if (self->sup_head) /* if there are messages do a quick wait */
tout = 0;
bedtime = su_now();
events = self->sup_vtable->su_port_wait_events(self, tout);
woken = su_now();
if (messages || timers || events)
SU_DEBUG_1(("su_port_run(%p): %.6f: %u messages %u timers %u "
"events slept %.6f/%.3f\n",
self, su_time_diff(woken, started), messages, timers, events,
su_time_diff(woken, bedtime), tout * 1e-3));
if (!self->sup_running)
break;
}
}
#endif
/** @internal
* The function @c su_port_break() is used to terminate execution of @c
* su_port_run(). It can be called from a callback function.
*
* @param self pointer to port
*
*/
void su_base_port_break(su_port_t *self)
{
self->sup_running = 0;
}
/** @internal Block until wait object is signaled or timeout.
*
* This function waits for wait objects and the timers associated with
* the root object. When any wait object is signaled or timer is
* expired, it invokes the callbacks.
*
* This function returns when a callback has been invoked or @c tout
* milliseconds is elapsed.
*
* @param self pointer to port
* @param tout timeout in milliseconds
*
* @return
* Milliseconds to the next invocation of timer, or @c SU_WAIT_FOREVER if
* there are no active timers.
*/
su_duration_t su_base_port_step(su_port_t *self, su_duration_t tout)
{
su_time_t now = su_now();
assert(su_port_own_thread(self));
if (self->sup_prepoll)
self->sup_prepoll(self->sup_pp_magic, self->sup_pp_root);
if (self->sup_head)
self->sup_vtable->su_port_getmsgs(self);
if (self->sup_timers)
su_timer_expire(&self->sup_timers, &tout, now);
/* if there are messages do a quick wait */
if (self->sup_head)
tout = 0;
if (self->sup_vtable->su_port_wait_events(self, tout))
tout = 0;
else
tout = SU_WAIT_FOREVER;
if (self->sup_head)
self->sup_vtable->su_port_getmsgs(self);
if (self->sup_timers)
su_timer_expire(&self->sup_timers, &tout, su_now());
if (self->sup_head)
tout = 0;
return tout;
}
/* =========================================================================
* Pre-poll() callback
*/
int su_base_port_add_prepoll(su_port_t *self,
su_root_t *root,
su_prepoll_f *callback,
su_prepoll_magic_t *magic)
{
if (self->sup_prepoll)
return -1;
self->sup_prepoll = callback;
self->sup_pp_magic = magic;
self->sup_pp_root = root;
return 0;
}
int su_base_port_remove_prepoll(su_port_t *self,
su_root_t *root)
{
if (self->sup_pp_root != root)
return -1;
self->sup_prepoll = NULL;
self->sup_pp_magic = NULL;
self->sup_pp_root = NULL;
return 0;
}
/* =========================================================================
* Timers
*/
su_timer_t **su_base_port_timers(su_port_t *self)
{
return &self->sup_timers;
}
/* ====================================================================== */
/** @internal
* Used to check wait events in callbacks that take lots of time
*
* This function does a timeout 0 poll() and runs wait objects.
*
* @param port pointer to port
*
* @return number of events handled
*/
int su_base_port_yield(su_port_t *self)
{
return self->sup_vtable->su_port_wait_events(self, 0);
}
/*
* This file is part of the Sofia-SIP package
*
* Copyright (C) 2005 Nokia Corporation.
*
* Contact: Pekka Pessi <pekka.pessi@nokia.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA
*
*/
/**@ingroup su_wait
* @CFILE su_poll_port.c
*
* Port implementation using poll()
*
* @author Pekka Pessi <Pekka.Pessi@nokia.com>
* @author Kai Vehmanen <kai.vehmanen@nokia.com>
*
* @date Created: Tue Sep 14 15:51:04 1999 ppessi
*/
#include "config.h"
#if HAVE_POLL
#include <stdlib.h>
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <limits.h>
#include <errno.h>
#define su_port_s su_poll_port_s
#include "sofia-sip/su.h"
#include "su_port.h"
#include "sofia-sip/su_alloc.h"
/* React to multiple events per one poll() to make sure
* that high-priority events can never completely mask other events.
* Enabled by default on all platforms except WIN32 */
#if !defined(WIN32)
#define SU_ENABLE_MULTISHOT_POLL 1
#else
#define SU_ENABLE_MULTISHOT_POLL 0
#endif
#if HAVE_EPOLL
#include <sys/epoll.h>
#define POLL2EPOLL_NEEDED \
(POLLIN != EPOLLIN || POLLOUT != EPOLLOUT || POLLPRI != EPOLLPRI || \
POLLERR != EPOLLERR || POLLHUP != EPOLLHUP)
#define POLL2EPOLL(e) (e & (POLLIN|POLLOUT|POLLPRI|POLLERR|POLLHUP))
#define EPOLL2POLL(e) (e & (POLLIN|POLLOUT|POLLPRI|POLLERR|POLLHUP))
#endif
/** Port based on poll() or epoll(). */
struct su_poll_port_s {
su_pthread_port_t sup_base[1];
#if HAVE_EPOLL
/** epoll() fd */
int sup_epoll;
#endif
unsigned sup_multishot; /**< Multishot operation? */
unsigned sup_registers; /** Counter incremented by
su_port_register() or
su_port_unregister()
*/
int sup_n_waits; /**< Active su_wait_t in su_waits */
int sup_size_waits; /**< Size of allocated su_waits */
int sup_pri_offset; /**< Offset to prioritized waits */
#if !SU_HAVE_WINSOCK
#define INDEX_MAX (0x7fffffff)
#else
/* We use WSAWaitForMultipleEvents() */
#define INDEX_MAX (64)
#endif
/** Indices from index returned by su_root_register() to tables below.
*
* Free elements are negative. Free elements form a list, value of free
* element is (0 - index of next free element).
*
* First element sup_indices[0] points to first free element.
*/
int *sup_indices;
int *sup_reverses; /** Reverse index */
su_wakeup_f *sup_wait_cbs;
su_wakeup_arg_t**sup_wait_args;
su_root_t **sup_wait_roots;
su_wait_t *sup_waits;
};
static void su_poll_port_decref(su_port_t *, int blocking, char const *who);
static int su_poll_port_register(su_port_t *self,
su_root_t *root,
su_wait_t *wait,
su_wakeup_f callback,
su_wakeup_arg_t *arg,
int priority);
static int su_poll_port_unregister(su_port_t *port,
su_root_t *root,
su_wait_t *wait,
su_wakeup_f callback,
su_wakeup_arg_t *arg);
static int su_poll_port_deregister(su_port_t *self, int i);
static int su_poll_port_unregister_all(su_port_t *self, su_root_t *root);
static int su_poll_port_eventmask(su_port_t *self,
int index,
int socket,
int events);
static int su_poll_port_multishot(su_port_t *self, int multishot);
static int su_poll_port_wait_events(su_port_t *self, su_duration_t tout);
su_port_vtable_t const su_poll_port_vtable[1] =
{{
/* su_vtable_size: */ sizeof su_poll_port_vtable,
su_pthread_port_lock,
su_pthread_port_unlock,
su_base_port_incref,
su_poll_port_decref,
su_base_port_gsource,
su_pthread_port_send,
su_poll_port_register,
su_poll_port_unregister,
su_poll_port_deregister,
su_poll_port_unregister_all,
su_poll_port_eventmask,
su_base_port_run,
su_base_port_break,
su_base_port_step,
su_pthread_port_own_thread,
su_base_port_add_prepoll,
su_base_port_remove_prepoll,
su_base_port_timers,
su_poll_port_multishot,
su_base_port_threadsafe,
su_base_port_yield,
su_poll_port_wait_events,
su_base_port_getmsgs
}};
static void su_poll_port_deinit(void *arg)
{
su_port_t *self = arg;
SU_DEBUG_9(("%s(%p) called\n", "su_poll_port_deinit", self));
su_pthread_port_deinit(self);
if (self->sup_waits)
free(self->sup_waits), self->sup_waits = NULL;
if (self->sup_wait_cbs)
free(self->sup_wait_cbs), self->sup_wait_cbs = NULL;
if (self->sup_wait_args)
free(self->sup_wait_args), self->sup_wait_args = NULL;
if (self->sup_wait_roots)
free(self->sup_wait_roots), self->sup_wait_roots = NULL;
if (self->sup_reverses)
free(self->sup_reverses), self->sup_reverses = NULL;
if (self->sup_indices)
free(self->sup_indices), self->sup_indices = NULL;
SU_DEBUG_9(("%s(%p) freed registrations\n", "su_poll_port_deinit", self));
}
static void su_poll_port_decref(su_port_t *self, int blocking, char const *who)
{
su_base_port_decref(self, blocking, who);
}
/** @internal
*
* Register a @c su_wait_t object. The wait object, a callback function and
* an argument pointer is stored in the port object. The callback function
* will be called when the wait object is signaled.
*
* Please note if identical wait objects are inserted, only first one is
* ever signalled.
*
* @param self pointer to port
* @param root pointer to root object
* @param waits pointer to wait object
* @param callback callback function pointer
* @param arg argument given to callback function when it is invoked
* @param priority relative priority of the wait object
* (0 is normal, 1 important, 2 realtime)
*
* @return
* Positive index of the wait object,
* or -1 upon an error.
*/
int su_poll_port_register(su_port_t *self,
su_root_t *root,
su_wait_t *wait,
su_wakeup_f callback,
su_wakeup_arg_t *arg,
int priority)
{
int i, j, n;
assert(su_port_own_thread(self));
n = self->sup_n_waits;
if (n >= SU_WAIT_MAX)
return su_seterrno(ENOMEM);
if (n >= self->sup_size_waits) {
/* Reallocate size arrays */
int size;
int *indices;
int *reverses;
su_wait_t *waits;
su_wakeup_f *wait_cbs;
su_wakeup_arg_t **wait_args;
su_root_t **wait_tasks;
if (self->sup_size_waits == 0)
size = su_root_size_hint;
else
size = 2 * self->sup_size_waits;
if (size < SU_WAIT_MIN)
size = SU_WAIT_MIN;
/* Too large */
if (-3 - size > 0)
return (errno = ENOMEM), -1;
indices = realloc(self->sup_indices, (size + 1) * sizeof(*indices));
if (indices) {
self->sup_indices = indices;
if (self->sup_size_waits == 0)
indices[0] = -1;
for (i = self->sup_size_waits + 1; i <= size; i++)
indices[i] = -1 - i;
}
reverses = realloc(self->sup_reverses, size * sizeof(*waits));
if (reverses) {
for (i = self->sup_size_waits; i < size; i++)
reverses[i] = -1;
self->sup_reverses = reverses;
}
waits = realloc(self->sup_waits, size * sizeof(*waits));
if (waits)
self->sup_waits = waits;
wait_cbs = realloc(self->sup_wait_cbs, size * sizeof(*wait_cbs));
if (wait_cbs)
self->sup_wait_cbs = wait_cbs;
wait_args = realloc(self->sup_wait_args, size * sizeof(*wait_args));
if (wait_args)