Commit 83ae5ab9 authored by Pekka Pessi's avatar Pekka Pessi
Browse files

su: initial support for epoll in su_root_t/su_port_t.

darcs-hash:20060929131355-65a35-3d3df5d242ac6cbb4b02cd8a06ec89bdd2598ed5.gz
parent e7099da8
......@@ -79,6 +79,9 @@ SOFIA_BEGIN_DECLS
/** Initializer for a wait object. @HI */
#define SU_WAIT_INIT { INVALID_SOCKET, 0, 0 }
/** Maximum number of sources supported by su_wait() */
#define SU_WAIT_MAX (0x7fffffff)
#elif SU_HAVE_WINSOCK
#define SU_WAIT_CMP(x, y) ((intptr_t)(x) - (intptr_t)(y))
......@@ -95,6 +98,8 @@ SOFIA_BEGIN_DECLS
#define SU_WAIT_INIT NULL
#define SU_WAIT_MAX (64)
#else
#define SU_WAIT_CMP(x, y)
#define SU_WAIT_IN
......@@ -362,6 +367,7 @@ SOFIAPUBFUN su_root_magic_t *su_root_magic(su_root_t *root);
SOFIAPUBFUN int su_root_register(su_root_t*, su_wait_t *,
su_wakeup_f, su_wakeup_arg_t *,
int priority);
/* This is slow. Deprecated. */
SOFIAPUBFUN int su_root_unregister(su_root_t*, su_wait_t *,
su_wakeup_f, su_wakeup_arg_t*);
SOFIAPUBFUN int su_root_deregister(su_root_t*, int);
......
......@@ -815,8 +815,8 @@ int su_port_osx_register(su_port_t *self,
else
size = 2 * self->sup_size_waits;
if (size < SU_MIN_WAITS)
size = SU_MIN_WAITS;
if (size < SU_WAIT_MIN)
size = SU_WAIT_MIN;
/* Too large */
if (-3 - size > 0)
......
......@@ -75,10 +75,16 @@
#define MBOX_SEND 0
#endif
#undef HAVE_EPOLL
#if HAVE_EPOLL
#include <sys/epoll.h>
#define POLL2EPOLL_NEEDED \
(POLLIN != EPOLLIN || POLLOUT != EPOLLOUT || POLLPRI != EPOLLPRI || \
POLLERR != EPOLLERR || POLLHUP != EPOLLHUP)
#define POLL2EPOLL(e) (e & (POLLIN|POLLOUT|POLLPRI|POLLERR|POLLHUP))
#define EPOLL2POLL(e) (e & (POLLIN|POLLOUT|POLLPRI|POLLERR|POLLHUP))
#endif
static void su_port_lock(su_port_t *self, char const *who);
......@@ -115,11 +121,6 @@ void su_port_break(su_port_t *self);
static
su_duration_t su_port_step(su_port_t *self, su_duration_t tout);
#if 0
unsigned su_port_query(su_port_t *, su_wait_t *, unsigned n_waits);
void su_port_event(su_port_t *, su_wait_t *waitobj);
#endif
static
int su_port_own_thread(su_port_t const *port);
......@@ -204,7 +205,8 @@ struct su_port_s {
su_wait_t sup_mbox_wait;
#endif
#if HAVE_EPOLL
#if HAVE_EPOLL
/** epoll() fd */
int sup_epoll;
#endif
......@@ -219,8 +221,21 @@ struct su_port_s {
int sup_pri_offset; /**< Offset to prioritized waits */
int sup_free_index; /**< Number of first free index */
int *sup_indices; /** Indices to registrations */
#if !SU_HAVE_WINSOCK
#define INDEX_MAX (0x7fffffff)
#else
/* We use WSAWaitForMultipleEvents() */
#define INDEX_MAX (64)
#endif
/** Indices from index returned by su_root_register() to tables below.
*
* Free elements are negative. Free elements form a list, value of free
* element is (0 - index of next free element).
*
* First element sup_indices[0] points to first free element.
*/
int *sup_indices;
int *sup_reverses; /** Reverse index */
su_wakeup_f *sup_wait_cbs;
......@@ -368,8 +383,6 @@ su_port_t *su_port_create(void)
SU_PORT_INITLOCK(self);
self->sup_tail = &self->sup_head;
self->sup_free_index = -1;
self->sup_multishot = (SU_ENABLE_MULTISHOT_POLL) != 0;
#if SU_HAVE_PTHREADS
......@@ -378,9 +391,11 @@ su_port_t *su_port_create(void)
#if HAVE_EPOLL
self->sup_epoll = epoll_create(su_root_size_hint);
if (self->sup_epoll == -1) {
why = "su_port_init: epoll_create"; goto error;
}
if (self->sup_epoll == -1)
SU_DEBUG_3(("su_port(%p): epoll_create(): %s\n", self, strerror(errno)));
else
SU_DEBUG_0(("su_port(%p): epoll_create() => %u: OK\n",
self, self->sup_epoll));
#endif
#if SU_HAVE_MBOX
......@@ -632,8 +647,9 @@ int su_port_getmsgs(su_port_t *self)
* (0 is normal, 1 important, 2 realtime)
*
* @return
* The function @su_port_register returns nonzero index of the wait object,
* or -1 upon an error. */
* Positive index of the wait object,
* or -1 upon an error.
*/
int su_port_register(su_port_t *self,
su_root_t *root,
su_wait_t *wait,
......@@ -647,6 +663,9 @@ int su_port_register(su_port_t *self,
n = self->sup_n_waits;
if (n >= SU_WAIT_MAX)
return su_seterrno(ENOMEM);
if (n >= self->sup_size_waits) {
/* Reallocate size arrays */
int size;
......@@ -657,31 +676,24 @@ int su_port_register(su_port_t *self,
su_wakeup_arg_t **wait_args;
su_root_t **wait_tasks;
assert(self->sup_free_index == -1);
if (self->sup_size_waits == 0)
size = su_root_size_hint;
else
size = 2 * self->sup_size_waits;
if (size < SU_MIN_WAITS)
size = SU_MIN_WAITS;
if (size < SU_WAIT_MIN)
size = SU_WAIT_MIN;
/* Too large */
if (-3 - size > 0)
return (errno = ENOMEM), -1;
indices = realloc(self->sup_indices, size * sizeof(*indices));
indices = realloc(self->sup_indices, (size + 1) * sizeof(*indices));
if (indices) {
self->sup_indices = indices;
for (i = self->sup_size_waits; i < size - 1; i++)
indices[i] = -3 - i;
if (self->sup_size_waits < size) {
indices[i] = -1;
self->sup_free_index = -2 - self->sup_size_waits;
}
for (i = self->sup_size_waits; i <= size; i++)
indices[i] = -1 - i;
}
reverses = realloc(self->sup_reverses, size * sizeof(*waits));
......@@ -716,25 +728,52 @@ int su_port_register(su_port_t *self,
self->sup_size_waits = size;
}
self->sup_n_waits++;
i = -self->sup_indices[0]; assert(i <= self->sup_size_waits);
#if HAVE_EPOLL
if (self->sup_epoll != -1) {
struct epoll_event ev;
ev.events = POLL2EPOLL(wait->events);
ev.data.u64 = 0;
ev.data.u32 = (uint32_t)i;
if (epoll_ctl(self->sup_epoll, EPOLL_CTL_ADD, wait->fd, &ev) == -1) {
SU_DEBUG_0(("EPOLL_CTL_ADD(%u, %u) failed: %s\n",
wait->fd, ev.events, strerror(errno)));
return -1;
}
}
else
#endif
if (priority > 0) {
/* Insert */
for (; n > 0; n--) {
for (n = self->sup_n_waits; n > 0; n--) {
self->sup_reverses[n] = self->sup_reverses[n-1];
self->sup_waits[n] = self->sup_waits[n-1];
self->sup_wait_cbs[n] = self->sup_wait_cbs[n-1];
self->sup_wait_args[n] = self->sup_wait_args[n-1];
self->sup_wait_roots[n] = self->sup_wait_roots[n-1];
}
if (self->sup_n_waits > 1) {
for (j = 0; j < self->sup_size_waits; j++) {
if (self->sup_indices[j] >= 0)
self->sup_indices[j]++;
}
}
self->sup_pri_offset++;
}
else {
/* Append - no need to move anything */
n = self->sup_n_waits;
}
i = -2 - self->sup_free_index; assert(i < self->sup_size_waits);
self->sup_free_index = self->sup_indices[i];
self->sup_n_waits++;
self->sup_indices[0] = self->sup_indices[i]; /* Free index */
self->sup_indices[i] = n;
self->sup_reverses[n] = i;
self->sup_waits[n] = *wait;
......@@ -742,74 +781,84 @@ int su_port_register(su_port_t *self,
self->sup_wait_args[n] = arg;
self->sup_wait_roots[n] = root;
if (n == 0 && self->sup_n_waits > 1) {
for (j = 0; j < self->sup_size_waits; j++) {
if (self->sup_indices[j] >= 0)
self->sup_indices[j]++;
}
}
self->sup_indices[i] = n;
self->sup_registers++;
return i + !SU_HAVE_MBOX; /* Mailbox has index 0 */
/* Just like epoll, we return -1 or positive integer */
return i;
}
/** Deregister a su_wait_t object. */
static
int su_port_deregister0(su_port_t *self, int i, su_wait_t wait[1])
int su_port_deregister0(su_port_t *self, int i)
{
int n, j, N, *indices, *reverses;
i -= !SU_HAVE_MBOX;
int n, N, *indices, *reverses;
N = self->sup_n_waits;
indices = self->sup_indices;
reverses = self->sup_reverses;
n = indices[i];
n = indices[i]; assert(n >= 0);
if (n < 0)
return -1;
#if HAVE_EPOLL
if (self->sup_epoll != -1) {
su_wait_t *wait = &self->sup_waits[n];
struct epoll_event ev;
assert(i == self->sup_reverses[n]);
ev.events = POLL2EPOLL(wait->events);
ev.data.u64 = (uint64_t)0;
ev.data.u32 = (uint32_t)i;
self->sup_n_waits = N = N - 1;
if (epoll_ctl(self->sup_epoll, EPOLL_CTL_DEL, wait->fd, &ev) == -1) {
SU_DEBUG_1(("su_port(%p): EPOLL_CTL_DEL(%u): %s\n", self,
wait->fd, su_strerror(su_errno())));
}
}
#endif
wait[0] = self->sup_waits[n];
N = --self->sup_n_waits;
if (N > 0) {
if (n < self->sup_pri_offset) {
j = self->sup_pri_offset - 1;
if (n != j) {
assert(reverses[j] >= 0);
indices[reverses[j]] = n;
self->sup_reverses[n] = self->sup_reverses[j];
self->sup_waits[n] = self->sup_waits[j];
self->sup_wait_cbs[n] = self->sup_wait_cbs[j];
self->sup_wait_args[n] = self->sup_wait_args[j];
self->sup_wait_roots[n] = self->sup_wait_roots[j];
n = j;
}
self->sup_pri_offset = j;
}
if (n < N) {
assert(reverses[N] >= 0);
indices[reverses[N]] = n;
self->sup_reverses[n] = self->sup_reverses[N];
self->sup_waits[n] = self->sup_waits[N];
self->sup_wait_cbs[n] = self->sup_wait_cbs[N];
self->sup_wait_args[n] = self->sup_wait_args[N];
self->sup_wait_roots[n] = self->sup_wait_roots[N];
if (n < self->sup_pri_offset) {
int j = --self->sup_pri_offset;
if (n != j) {
assert(reverses[j] > 0);
assert(indices[reverses[j]] == j);
indices[reverses[j]] = n;
reverses[n] = reverses[j];
self->sup_waits[n] = self->sup_waits[j];
self->sup_wait_cbs[n] = self->sup_wait_cbs[j];
self->sup_wait_args[n] = self->sup_wait_args[j];
self->sup_wait_roots[n] = self->sup_wait_roots[j];
n = j;
}
}
indices[i] = self->sup_free_index;
self->sup_free_index = -2 - i;
if (n < N) {
assert(reverses[N] > 0);
assert(indices[reverses[N]] == N);
indices[reverses[N]] = n;
reverses[n] = reverses[N];
self->sup_waits[n] = self->sup_waits[N];
self->sup_wait_cbs[n] = self->sup_wait_cbs[N];
self->sup_wait_args[n] = self->sup_wait_args[N];
self->sup_wait_roots[n] = self->sup_wait_roots[N];
n = N;
}
reverses[n] = -1;
memset(&self->sup_waits[n], 0, sizeof self->sup_waits[n]);
self->sup_wait_cbs[n] = NULL;
self->sup_wait_args[n] = NULL;
self->sup_wait_roots[n] = NULL;
indices[i] = indices[0];
indices[0] = -i;
self->sup_registers++;
return (int)i + !SU_HAVE_MBOX;
return i;
}
......@@ -825,7 +874,9 @@ int su_port_deregister0(su_port_t *self, int i, su_wait_t wait[1])
* @param callback - callback function pointer (may be NULL)
* @param arg - argument given to callback function when it is invoked
* (may be NULL)
*
*
* @deprecated Use su_port_deregister() instead.
*
* @return Nonzero index of the wait object, or -1 upon an error.
*/
int su_port_unregister(su_port_t *self,
......@@ -835,25 +886,15 @@ int su_port_unregister(su_port_t *self,
su_wakeup_arg_t *arg)
{
int n, N;
int i, *indices;
su_wait_t dummy[1];
assert(self);
assert(SU_PORT_OWN_THREAD(self));
i = (unsigned)-1;
N = self->sup_n_waits;
indices = self->sup_indices;
for (n = 0; n < N; n++) {
if (SU_WAIT_CMP(wait[0], self->sup_waits[n]) == 0) {
/* Found - delete it */
if (indices[n] == n)
i = n;
else for (i = 0; i < self->sup_size_waits; i++)
if (indices[i] == n)
break;
return su_port_deregister0(self, i, dummy);
return su_port_deregister0(self, self->sup_reverses[n]);
}
}
......@@ -881,12 +922,13 @@ int su_port_deregister(su_port_t *self, int i)
assert(self);
assert(SU_PORT_OWN_THREAD(self));
if (i <= 0 || i - !SU_HAVE_MBOX >= self->sup_size_waits)
return -1;
assert(i - !SU_HAVE_MBOX < self->sup_size_waits);
if (i <= 0 || i > self->sup_size_waits)
return su_seterrno(EBADF);
retval = su_port_deregister0(self, i, wait);
if (self->sup_indices[i] < 0)
return su_seterrno(EBADF);
retval = su_port_deregister0(self, i);
su_wait_destroy(wait);
......@@ -908,8 +950,7 @@ int su_port_deregister(su_port_t *self, int i)
int su_port_unregister_all(su_port_t *self,
su_root_t *root)
{
unsigned i, j;
unsigned n_waits;
int i, j, index, N;
int *indices, *reverses;
su_wait_t *waits;
su_wakeup_f *wait_cbs;
......@@ -918,7 +959,7 @@ int su_port_unregister_all(su_port_t *self,
assert(SU_PORT_OWN_THREAD(self));
n_waits = self->sup_n_waits;
N = self->sup_n_waits;
indices = self->sup_indices;
reverses = self->sup_reverses;
waits = self->sup_waits;
......@@ -926,28 +967,58 @@ int su_port_unregister_all(su_port_t *self,
wait_args = self->sup_wait_args;
wait_roots = self->sup_wait_roots;
for (i = j = 0; (unsigned)i < n_waits; i++) {
for (i = j = 0; i < N; i++) {
index = reverses[i]; assert(index > 0 && indices[index] == i);
if (wait_roots[i] == root) {
/* XXX - we should free all resources associated with this */
indices[reverses[i]] = self->sup_free_index;
self->sup_free_index = -2 - reverses[i];
/* XXX - we should free all resources associated with this, too */
#if HAVE_EPOLL
if (self->sup_epoll != -1) {
int fd = waits[i].fd;
struct epoll_event ev;
ev.events = POLL2EPOLL(waits[i].events);
ev.data.u64 = (uint64_t)0;
ev.data.u32 = (uint32_t)index;
if (epoll_ctl(self->sup_epoll, EPOLL_CTL_DEL, fd, &ev) == -1) {
SU_DEBUG_1(("EPOLL_CTL_DEL(%u): %s\n",
waits[i].fd, su_strerror(su_errno())));
}
}
#endif
if (i < self->sup_pri_offset)
self->sup_pri_offset--;
indices[index] = indices[0];
indices[0] = -index;
continue;
}
indices[reverses[i]] = j;
reverses[j] = reverses[i];
waits[j] = waits[i];
wait_cbs[j] = wait_cbs[i];
wait_args[j] = wait_args[i];
wait_roots[j] = wait_roots[i];
if (i != j) {
indices[index] = j;
reverses[j] = reverses[i];
waits[j] = waits[i];
wait_cbs[j] = wait_cbs[i];
wait_args[j] = wait_args[i];
wait_roots[j] = wait_roots[i];
}
j++;
}
for (i = j; i < N; i++) {
reverses[i] = -1;
wait_cbs[i] = NULL;
wait_args[i] = NULL;
wait_roots[i] = NULL;
}
memset(&waits[j], 0, (char *)&waits[N] - (char *)&waits[j]);
self->sup_n_waits = j;
self->sup_registers++;
return n_waits - j;
return N - j;
}
/**Set mask for a registered event. @internal
......@@ -966,25 +1037,42 @@ int su_port_unregister_all(su_port_t *self,
int su_port_eventmask(su_port_t *self, int index, int socket, int events)
{
int n;
assert(self);
assert(SU_PORT_OWN_THREAD(self));
if (index <= 0 || index > self->sup_size_waits)
return -1;
return su_seterrno(EBADF);
n = self->sup_indices[index];
if (n < 0)
return su_seterrno(EBADF);
n = self->sup_indices[index - !SU_HAVE_MBOX];
#if HAVE_EPOLL
if (self->sup_epoll != -1) {
su_wait_t *wait = &self->sup_waits[n];
struct epoll_event ev;
if (n < 0)
return -1;
wait->events = events;
ev.events = POLL2EPOLL(events);
ev.data.u64 = (uint64_t)0;
ev.data.u32 = (uint32_t)index;
if (epoll_ctl(self->sup_epoll, EPOLL_CTL_MOD, wait->fd, &ev) == -1) {
SU_DEBUG_1(("su_port(%p): EPOLL_CTL_MOD(%u): %s\n", self,
wait->fd, su_strerror(su_errno())));
return -1;
}
}
#endif
return su_wait_mask(&self->sup_waits[n], socket, events);
}
#if 0
/** @internal
*
* Copies the su_wait_t objects from the port. The number of wait objects
* can be found out by calling su_port_query() with @a n_waits as zero.
* Copies the su_wait_t objects from the port. The number of wait objects
* can be found out by calling su_port_query() with @a n_waits as zero.
*
* @note This function is called only by friends.
*
......@@ -1011,6 +1099,7 @@ unsigned su_port_query(su_port_t *self, su_wait_t *waits, unsigned n_waits)
return n;
}
#endif
/** @internal Enable multishot mode.
*
......@@ -1029,7 +1118,7 @@ unsigned su_port_query(su_port_t *self, su_wait_t *waits, unsigned n_waits)
*/
int su_port_multishot(su_port_t *self, int multishot)
{
if (multishot == -1)
if (multishot < 0)
return self->sup_multishot;
else if (multishot == 0 || multishot == 1)
return self->sup_multishot = multishot;
......@@ -1158,11 +1247,45 @@ int su_port_wait_events(su_port_t *self, su_duration_t tout)
{
int i, events = 0;
su_wait_t *waits = self->sup_waits;
unsigned n = self->sup_n_waits;
int n = self->sup_n_waits;
su_root_t *root;
#if HAVE_POLL
unsigned version = self->sup_registers;
#endif
su_root_t *root;
#if HAVE_EPOLL
if (self->sup_epoll != -1) {
int const M = 4;
struct epoll_event ev[M];
int j, index;
int *indices = self->sup_indices;
n = epoll_wait(self->sup_epoll, ev, self->sup_multishot ? M : 1, tout);
assert(n <= M);
for (j = 0; j < n; j++) {
su_root_t *root;
su_root_magic_t *magic;
if (!ev[j].events || ev[j].data.u32 > INDEX_MAX)
continue;
index = (int)ev[j].data.u32;
assert(index > 0 && index <= self->sup_size_waits);
i = indices[index]; assert(i >= 0 && i <= self->sup_n_waits);
root = self->sup_wait_roots[i];
magic = root ? su_root_magic(root) : NULL;
waits[i].revents = ev[j].events;