Commit 0169e883 authored by Pekka Pessi's avatar Pekka Pessi

su: separated su_poll_port and su_epoll_port implementations.

Added a few test cases and fixed errors in su_port.c.

darcs-hash:20070126184414-65a35-b3d2a1864afa68cf7511c6b02a1dfef4a89dcb0c.gz
parent e40e2640
...@@ -79,7 +79,8 @@ libsu_la_SOURCES = \ ...@@ -79,7 +79,8 @@ libsu_la_SOURCES = \
su_time.c su_time0.c \ su_time.c su_time0.c \
su_wait.c su_root.c su_timer.c \ su_wait.c su_root.c su_timer.c \
su_port.c su_port.h \ su_port.c su_port.h \
su_base_port.c su_pthread_port.c su_poll_port.c su_select_port.c \ su_base_port.c su_pthread_port.c \
su_poll_port.c su_epoll_port.c su_select_port.c \
su_localinfo.c \ su_localinfo.c \
su_os_nw.c \ su_os_nw.c \
su_taglist.c su_tag.c su_tag_io.c \ su_taglist.c su_tag.c su_tag_io.c \
......
This diff is collapsed.
...@@ -38,15 +38,6 @@ ...@@ -38,15 +38,6 @@
#include "config.h" #include "config.h"
/* React to multiple events per one poll() to make sure
* that high-priority events can never completely mask other events.
* Enabled by default on all platforms except WIN32 */
#ifndef WIN32
#define SU_ENABLE_MULTISHOT_POLL 1
#else
#define SU_ENABLE_MULTISHOT_POLL 0
#endif
#include <stdlib.h> #include <stdlib.h>
#include <assert.h> #include <assert.h>
#include <stdarg.h> #include <stdarg.h>
......
...@@ -51,36 +51,13 @@ ...@@ -51,36 +51,13 @@
#include "su_port.h" #include "su_port.h"
#include "sofia-sip/su_alloc.h" #include "sofia-sip/su_alloc.h"
/* React to multiple events per one poll() to make sure /** Port based on poll(). */
* that high-priority events can never completely mask other events.
* Enabled by default on all platforms except WIN32 */
#if !defined(WIN32)
#define SU_ENABLE_MULTISHOT_POLL 1
#else
#define SU_ENABLE_MULTISHOT_POLL 0
#endif
#if HAVE_EPOLL
#include <sys/epoll.h>
#define POLL2EPOLL_NEEDED \
(POLLIN != EPOLLIN || POLLOUT != EPOLLOUT || POLLPRI != EPOLLPRI || \
POLLERR != EPOLLERR || POLLHUP != EPOLLHUP)
#define POLL2EPOLL(e) (e & (POLLIN|POLLOUT|POLLPRI|POLLERR|POLLHUP))
#define EPOLL2POLL(e) (e & (POLLIN|POLLOUT|POLLPRI|POLLERR|POLLHUP))
#endif
/** Port based on poll() or epoll(). */
struct su_poll_port_s { struct su_poll_port_s {
su_pthread_port_t sup_base[1]; su_pthread_port_t sup_base[1];
#if HAVE_EPOLL #define sup_home sup_base->sup_base->sup_home
/** epoll() fd */
int sup_epoll;
#endif
unsigned sup_multishot; /**< Multishot operation? */ unsigned sup_multishot; /**< Multishot operation? */
unsigned sup_registers; /** Counter incremented by unsigned sup_registers; /** Counter incremented by
...@@ -173,21 +150,6 @@ static void su_poll_port_deinit(void *arg) ...@@ -173,21 +150,6 @@ static void su_poll_port_deinit(void *arg)
SU_DEBUG_9(("%s(%p) called\n", "su_poll_port_deinit", self)); SU_DEBUG_9(("%s(%p) called\n", "su_poll_port_deinit", self));
su_pthread_port_deinit(self); su_pthread_port_deinit(self);
if (self->sup_waits)
free(self->sup_waits), self->sup_waits = NULL;
if (self->sup_wait_cbs)
free(self->sup_wait_cbs), self->sup_wait_cbs = NULL;
if (self->sup_wait_args)
free(self->sup_wait_args), self->sup_wait_args = NULL;
if (self->sup_wait_roots)
free(self->sup_wait_roots), self->sup_wait_roots = NULL;
if (self->sup_reverses)
free(self->sup_reverses), self->sup_reverses = NULL;
if (self->sup_indices)
free(self->sup_indices), self->sup_indices = NULL;
SU_DEBUG_9(("%s(%p) freed registrations\n", "su_poll_port_deinit", self));
} }
static void su_poll_port_decref(su_port_t *self, int blocking, char const *who) static void su_poll_port_decref(su_port_t *self, int blocking, char const *who)
...@@ -233,6 +195,7 @@ int su_poll_port_register(su_port_t *self, ...@@ -233,6 +195,7 @@ int su_poll_port_register(su_port_t *self,
return su_seterrno(ENOMEM); return su_seterrno(ENOMEM);
if (n >= self->sup_size_waits) { if (n >= self->sup_size_waits) {
su_home_t *h = self->sup_home;
/* Reallocate size arrays */ /* Reallocate size arrays */
int size; int size;
int *indices; int *indices;
...@@ -254,7 +217,7 @@ int su_poll_port_register(su_port_t *self, ...@@ -254,7 +217,7 @@ int su_poll_port_register(su_port_t *self,
if (-3 - size > 0) if (-3 - size > 0)
return (errno = ENOMEM), -1; return (errno = ENOMEM), -1;
indices = realloc(self->sup_indices, (size + 1) * sizeof(*indices)); indices = su_realloc(h, self->sup_indices, (size + 1) * sizeof(*indices));
if (indices) { if (indices) {
self->sup_indices = indices; self->sup_indices = indices;
...@@ -265,27 +228,27 @@ int su_poll_port_register(su_port_t *self, ...@@ -265,27 +228,27 @@ int su_poll_port_register(su_port_t *self,
indices[i] = -1 - i; indices[i] = -1 - i;
} }
reverses = realloc(self->sup_reverses, size * sizeof(*waits)); reverses = su_realloc(h, self->sup_reverses, size * sizeof(*waits));
if (reverses) { if (reverses) {
for (i = self->sup_size_waits; i < size; i++) for (i = self->sup_size_waits; i < size; i++)
reverses[i] = -1; reverses[i] = -1;
self->sup_reverses = reverses; self->sup_reverses = reverses;
} }
waits = realloc(self->sup_waits, size * sizeof(*waits)); waits = su_realloc(h, self->sup_waits, size * sizeof(*waits));
if (waits) if (waits)
self->sup_waits = waits; self->sup_waits = waits;
wait_cbs = realloc(self->sup_wait_cbs, size * sizeof(*wait_cbs)); wait_cbs = su_realloc(h, self->sup_wait_cbs, size * sizeof(*wait_cbs));
if (wait_cbs) if (wait_cbs)
self->sup_wait_cbs = wait_cbs; self->sup_wait_cbs = wait_cbs;
wait_args = realloc(self->sup_wait_args, size * sizeof(*wait_args)); wait_args = su_realloc(h, self->sup_wait_args, size * sizeof(*wait_args));
if (wait_args) if (wait_args)
self->sup_wait_args = wait_args; self->sup_wait_args = wait_args;
/* Add sup_wait_roots array, if needed */ /* Add sup_wait_roots array, if needed */
wait_tasks = realloc(self->sup_wait_roots, size * sizeof(*wait_tasks)); wait_tasks = su_realloc(h, self->sup_wait_roots, size * sizeof(*wait_tasks));
if (wait_tasks) if (wait_tasks)
self->sup_wait_roots = wait_tasks; self->sup_wait_roots = wait_tasks;
...@@ -299,22 +262,6 @@ int su_poll_port_register(su_port_t *self, ...@@ -299,22 +262,6 @@ int su_poll_port_register(su_port_t *self,
i = -self->sup_indices[0]; assert(i <= self->sup_size_waits); i = -self->sup_indices[0]; assert(i <= self->sup_size_waits);
#if HAVE_EPOLL
if (self->sup_epoll != -1) {
struct epoll_event ev;
ev.events = POLL2EPOLL(wait->events);
ev.data.u64 = 0;
ev.data.u32 = (uint32_t)i;
if (epoll_ctl(self->sup_epoll, EPOLL_CTL_ADD, wait->fd, &ev) == -1) {
SU_DEBUG_0(("EPOLL_CTL_ADD(%u, %u) failed: %s\n",
wait->fd, ev.events, strerror(errno)));
return -1;
}
}
else
#endif
if (priority > 0) { if (priority > 0) {
/* Insert */ /* Insert */
for (n = self->sup_n_waits; n > 0; n--) { for (n = self->sup_n_waits; n > 0; n--) {
...@@ -362,22 +309,6 @@ static int su_poll_port_deregister0(su_port_t *self, int i, int destroy_wait) ...@@ -362,22 +309,6 @@ static int su_poll_port_deregister0(su_port_t *self, int i, int destroy_wait)
n = indices[i]; assert(n >= 0); n = indices[i]; assert(n >= 0);
#if HAVE_EPOLL
if (self->sup_epoll != -1) {
su_wait_t *wait = &self->sup_waits[n];
struct epoll_event ev;
ev.events = POLL2EPOLL(wait->events);
ev.data.u64 = (uint64_t)0;
ev.data.u32 = (uint32_t)i;
if (epoll_ctl(self->sup_epoll, EPOLL_CTL_DEL, wait->fd, &ev) == -1) {
SU_DEBUG_1(("su_port(%p): EPOLL_CTL_DEL(%u): %s\n", self,
wait->fd, su_strerror(su_errno())));
}
}
#endif
if (destroy_wait) if (destroy_wait)
su_wait_destroy(&self->sup_waits[n]); su_wait_destroy(&self->sup_waits[n]);
...@@ -538,21 +469,6 @@ int su_poll_port_unregister_all(su_port_t *self, ...@@ -538,21 +469,6 @@ int su_poll_port_unregister_all(su_port_t *self,
if (wait_roots[i] == root) { if (wait_roots[i] == root) {
/* XXX - we should free all resources associated with this, too */ /* XXX - we should free all resources associated with this, too */
#if HAVE_EPOLL
if (self->sup_epoll != -1) {
int fd = waits[i].fd;
struct epoll_event ev;
ev.events = POLL2EPOLL(waits[i].events);
ev.data.u64 = (uint64_t)0;
ev.data.u32 = (uint32_t)index;
if (epoll_ctl(self->sup_epoll, EPOLL_CTL_DEL, fd, &ev) == -1) {
SU_DEBUG_1(("EPOLL_CTL_DEL(%u): %s\n",
waits[i].fd, su_strerror(su_errno())));
}
}
#endif
if (i < self->sup_pri_offset) if (i < self->sup_pri_offset)
self->sup_pri_offset--; self->sup_pri_offset--;
...@@ -612,25 +528,6 @@ int su_poll_port_eventmask(su_port_t *self, int index, int socket, int events) ...@@ -612,25 +528,6 @@ int su_poll_port_eventmask(su_port_t *self, int index, int socket, int events)
if (n < 0) if (n < 0)
return su_seterrno(EBADF); return su_seterrno(EBADF);
#if HAVE_EPOLL
if (self->sup_epoll != -1) {
su_wait_t *wait = &self->sup_waits[n];
struct epoll_event ev;
wait->events = events;
ev.events = POLL2EPOLL(events);
ev.data.u64 = (uint64_t)0;
ev.data.u32 = (uint32_t)index;
if (epoll_ctl(self->sup_epoll, EPOLL_CTL_MOD, wait->fd, &ev) == -1) {
SU_DEBUG_1(("su_port(%p): EPOLL_CTL_MOD(%u): %s\n", self,
wait->fd, su_strerror(su_errno())));
return -1;
}
}
#endif
return su_wait_mask(&self->sup_waits[n], socket, events); return su_wait_mask(&self->sup_waits[n], socket, events);
} }
...@@ -677,43 +574,6 @@ int su_poll_port_wait_events(su_port_t *self, su_duration_t tout) ...@@ -677,43 +574,6 @@ int su_poll_port_wait_events(su_port_t *self, su_duration_t tout)
su_root_t *root; su_root_t *root;
unsigned version = self->sup_registers; unsigned version = self->sup_registers;
#if HAVE_EPOLL
if (self->sup_epoll != -1) {
int const M = 4;
struct epoll_event ev[M];
int j, index;
int *indices = self->sup_indices;
n = epoll_wait(self->sup_epoll, ev,
self->sup_multishot ? M : 1,
tout);
assert(n <= M);
for (j = 0; j < n; j++) {
su_root_t *root;
su_root_magic_t *magic;
if (!ev[j].events || ev[j].data.u32 > INDEX_MAX)
continue;
index = (int)ev[j].data.u32;
assert(index > 0 && index <= self->sup_size_waits);
i = indices[index]; assert(i >= 0 && i <= self->sup_n_waits);
root = self->sup_wait_roots[i];
magic = root ? su_root_magic(root) : NULL;
waits[i].revents = ev[j].events;
self->sup_wait_cbs[i](magic, &waits[i], self->sup_wait_args[i]);
events++;
/* Callback function used su_register()/su_deregister() */
if (version != self->sup_registers)
break;
}
return n < 0 ? n : events;
}
#endif
i = su_wait(waits, (unsigned)n, tout); i = su_wait(waits, (unsigned)n, tout);
if (i >= 0 && i < n) { if (i >= 0 && i < n) {
...@@ -792,16 +652,6 @@ su_port_t *su_poll_port_create(void) ...@@ -792,16 +652,6 @@ su_port_t *su_poll_port_create(void)
self->sup_multishot = SU_ENABLE_MULTISHOT_POLL; self->sup_multishot = SU_ENABLE_MULTISHOT_POLL;
#if HAVE_EPOLL
self->sup_epoll = epoll_create(su_root_size_hint);
if (self->sup_epoll == -1)
SU_DEBUG_3(("%s(%p): epoll_create() => %u: %s\n",
"su_port_create", self, self->sup_epoll, strerror(errno)));
else
SU_DEBUG_9(("%s(%p): epoll_create() => %u: %s\n",
"su_port_create", self, self->sup_epoll, "OK"));
#endif
if (su_pthread_port_init(self, su_poll_port_vtable) < 0) if (su_pthread_port_init(self, su_poll_port_vtable) < 0)
return su_home_unref(su_port_home(self)), NULL; return su_home_unref(su_port_home(self)), NULL;
......
...@@ -44,50 +44,63 @@ ...@@ -44,50 +44,63 @@
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
static su_port_t *(*create)(void); static su_port_t *(*preferred_su_port_create)(void);
/** Explicitly set the preferred su_port_t implementation.
*
* @sa su_epoll_port_create(), su_poll_port_create(), su_select_port_create()
*/
void su_port_prefer(su_port_t *(*implementation)(void)) void su_port_prefer(su_port_t *(*implementation)(void))
{ {
create = implementation; preferred_su_port_create = implementation;
} }
/** Create the preferred su_port_t implementation. /** Create the preferred su_port_t implementation.
*/ */
su_port_t *su_port_create(void) su_port_t *su_port_create(void)
{ {
if (create == NULL) { if (preferred_su_port_create == NULL) {
char const *SU_PORT = getenv("SU_PORT"); char const *SU_PORT = getenv("SU_PORT");
su_port_t *(*create)(void) = NULL;
if (SU_PORT == NULL) if (SU_PORT == NULL)
; ;
#if HAVE_POLL_PORT #if HAVE_POLL_PORT
#if HAVE_EPOLL #if HAVE_EPOLL
else if (strcmp("SU_PORT", "epoll") == 0) else if (strcmp(SU_PORT, "epoll") == 0)
create = su_epoll_port_create; create = su_epoll_port_create;
#endif #endif
else if (strcmp("SU_PORT", "poll") == 0) else if (strcmp(SU_PORT, "poll") == 0)
create = su_poll_port_create; create = su_poll_port_create;
#else
#error no poll!
#endif #endif
#if HAVE_SELECT #if HAVE_SELECT
else if (strcmp("SU_PORT", "select") == 0) else if (strcmp(SU_PORT, "select") == 0)
create = su_select_port_create; create = su_select_port_create;
#endif #endif
else {
if (create == NULL) {
create = su_epoll_port_create;
#if HAVE_POLL_PORT #if HAVE_POLL_PORT
#if HAVE_EPOLL #if HAVE_EPOLL
create = su_epoll_port_create; create = su_epoll_port_create;
#else #else
create = su_poll_port_create; create = su_poll_port_create;
#endif #endif
#endif #else
#if HAVE_SELECT #if HAVE_SELECT
create = su_select_port_create; create = su_select_port_create;
#endif
#endif #endif
} }
if (create)
preferred_su_port_create = create;
} }
if (create) if (preferred_su_port_create)
return create(); return preferred_su_port_create();
return NULL; return NULL;
} }
...@@ -155,6 +155,17 @@ SOFIAPUBFUN su_root_t *su_root_create_with_port(su_root_magic_t *magic, ...@@ -155,6 +155,17 @@ SOFIAPUBFUN su_root_t *su_root_create_with_port(su_root_magic_t *magic,
su_port_t *port) su_port_t *port)
__attribute__((__malloc__)); __attribute__((__malloc__));
/* ---------------------------------------------------------------------- */
/* React to multiple events per one poll() to make sure
* that high-priority events can never completely mask other events.
* Enabled by default on all platforms except WIN32 */
#if !defined(WIN32)
#define SU_ENABLE_MULTISHOT_POLL 1
#else
#define SU_ENABLE_MULTISHOT_POLL 0
#endif
/* ---------------------------------------------------------------------- */ /* ---------------------------------------------------------------------- */
/* Virtual functions */ /* Virtual functions */
......
...@@ -57,15 +57,6 @@ ...@@ -57,15 +57,6 @@
#include <sys/time.h> #include <sys/time.h>
#endif #endif
/* React to multiple events per one select() to make sure
* that high-priority events can never completely mask other events.
* Enabled by default on all platforms except WIN32 */
#if !defined(WIN32)
#define SU_ENABLE_MULTISHOT_POLL 1
#else
#define SU_ENABLE_MULTISHOT_POLL 0
#endif
/** Port based on select(). */ /** Port based on select(). */
struct su_select_port_s { struct su_select_port_s {
......
...@@ -41,6 +41,9 @@ struct su_root_magic_s; ...@@ -41,6 +41,9 @@ struct su_root_magic_s;
#include "su_poll_port.c" #include "su_poll_port.c"
#undef HAVE_EPOLL
#define HAVE_EPOLL 0
#if HAVE_FUNC #if HAVE_FUNC
#elif HAVE_FUNCTION #elif HAVE_FUNCTION
#define __func__ __FUNCTION__ #define __func__ __FUNCTION__
......
...@@ -48,6 +48,7 @@ char const *name = "su_root_test"; ...@@ -48,6 +48,7 @@ char const *name = "su_root_test";
typedef struct root_test_s root_test_t; typedef struct root_test_s root_test_t;
typedef struct test_ep_s test_ep_t; typedef struct test_ep_s test_ep_t;
typedef struct test_ep_s test_ep_at[1];
#define SU_ROOT_MAGIC_T root_test_t #define SU_ROOT_MAGIC_T root_test_t
#define SU_WAKEUP_ARG_T test_ep_t #define SU_WAKEUP_ARG_T test_ep_t
...@@ -55,14 +56,15 @@ typedef struct test_ep_s test_ep_t; ...@@ -55,14 +56,15 @@ typedef struct test_ep_s test_ep_t;
#include <sofia-sip/su_wait.h> #include <sofia-sip/su_wait.h>
#include <sofia-sip/su_alloc.h> #include <sofia-sip/su_alloc.h>
typedef struct test_ep_s { struct test_ep_s {
test_ep_t *next, **prev, **list;
int i; int i;
int s; int s;
su_wait_t wait[1]; su_wait_t wait[1];
int registered; int registered;
socklen_t addrlen; socklen_t addrlen;
su_sockaddr_t addr[1]; su_sockaddr_t addr[1];
} test_ep_at[1]; };
struct root_test_s { struct root_test_s {
su_home_t rt_home[1]; su_home_t rt_home[1];
...@@ -82,6 +84,8 @@ struct root_test_s { ...@@ -82,6 +84,8 @@ struct root_test_s {
unsigned rt_success_deinit:1; unsigned rt_success_deinit:1;
test_ep_at rt_ep[5]; test_ep_at rt_ep[5];
int rt_sockets, rt_woken;
}; };
/** Test root initialization */ /** Test root initialization */
...@@ -179,7 +183,6 @@ static int wakeup4(root_test_t *rt, su_wait_t *w, test_ep_t *ep) ...@@ -179,7 +183,6 @@ static int wakeup4(root_test_t *rt, su_wait_t *w, test_ep_t *ep)
static static
su_wakeup_f wakeups[5] = { wakeup0, wakeup1, wakeup2, wakeup3, wakeup4 }; su_wakeup_f wakeups[5] = { wakeup0, wakeup1, wakeup2, wakeup3, wakeup4 };
static static
void test_run(root_test_t *rt) void test_run(root_test_t *rt)
{ {
...@@ -271,6 +274,118 @@ static int register_test(root_test_t *rt) ...@@ -271,6 +274,118 @@ static int register_test(root_test_t *rt)
END(); END();
} }
int wakeup_remove(root_test_t *rt, su_wait_t *w, test_ep_t *node)
{
char buffer[64];
ssize_t x;
test_ep_t *n = node->next;
su_wait_events(w, node->s);
x = recv(node->s, buffer, sizeof(buffer), 0);
if (x < 0)
fprintf(stderr, "%s: %s\n", "recv", su_strerror(su_errno()));
if (node->prev) { /* first run */
*node->prev = n;
if (n) {
*node->prev = node->next;
node->next->prev = node->prev;
sendto(n->s, "foo", 3, 0, (void *)n->addr, n->addrlen);
}
node->next = NULL;
node->prev = NULL;
if (!*node->list) {
su_root_break(rt->rt_root);
}
}
else { /* second run */
if (++rt->rt_woken == rt->rt_sockets)
su_root_break(rt->rt_root);
}
return 0;
}
int event_test(root_test_t rt[1])
{
BEGIN();
int i = 0, N = 2048;
test_ep_t *n, *nodes, *list = NULL;
su_sockaddr_t su[1];
socklen_t sulen;
TEST_1(nodes = calloc(N, sizeof *nodes));
memset(su, 0, sulen = sizeof su);
su->su_len = sizeof su->su_sin;
su->su_family = AF_INET;
su->su_sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
for (i = 0; i < N; i++) {
n = nodes + i;
n->s = su_socket(AF_INET, SOCK_DGRAM, 0);
if (n->s == INVALID_SOCKET)
break;
n->addrlen = sizeof n->addr;
if (bind(n->s, (void *)su, sulen) < 0 ||
getsockname(n->s, (void *)n->addr, &n->addrlen) ||
su_wait_create(n->wait, n->s, SU_WAIT_IN)) {
su_close(n->s);
break;
}
n->registered = su_root_register(rt->rt_root, n->wait, wakeup_remove, n, 0);
if (n->registered < 0) {
su_wait_destroy(n->wait);
su_close(n->s);
break;
}
n->list = &list, n->prev = &list;
if ((n->next = list))
n->next->prev = &n->next;
list = n;
}
TEST_1(i >= 1);
N = i;
/* Wake up socket at a time */
n = list; sendto(n->s, "foo", 3, 0, (void *)n->addr, n->addrlen);
su_root_run(rt->rt_root);
for (i = 0; i < N; i++) {
n = nodes + i;
TEST_1(n->prev == NULL);
sendto(n->s, "bar", 3, 0, (void *)n->addr, n->addrlen);
}
rt->rt_sockets = N;
/* Wake up all sockets */
su_root_run(rt->rt_root);
for (i = 0; i < N; i++) {
n = nodes + i;
su_root_deregister(rt->rt_root, n->registered);
TEST_1(su_close(n->s) == 0);
}
END();
}
int fail_init(su_root_t *root, root_test_t *rt) int fail_init(su_root_t *root, root_test_t *rt)
{ {
rt->rt_fail_init = 1; rt->rt_fail_init = 1;
...@@ -353,6 +468,7 @@ int main(int argc, char *argv[]) ...@@ -353,6 +468,7 @@ int main(int argc, char *argv[])
retval |= init_test(rt); retval |= init_test(rt);
retval |= register_test(rt); retval |= register_test(rt);
retval |= event_test(rt);
retval |= clone_test(rt); retval |= clone_test(rt);
su_root_threading(rt->rt_root, 0); su_root_threading(rt->rt_root, 0);
retval |= clone_test(rt); retval |= clone_test(rt);
......