Commit 0099ccfa authored by Pekka Pessi's avatar Pekka Pessi

su_port: added working port using select(). refactored message box stuff into...

su_port: added working port using select(). refactored message box stuff into su_socket_port_t in su_socket_port.c.

darcs-hash:20070208182653-65a35-dcb0a6fe96c73f62c89052f9d1a5e5c10e3dc74b.gz
parent 4ff8b08b
......@@ -79,7 +79,7 @@ libsu_la_SOURCES = \
su_time.c su_time0.c \
su_wait.c su_root.c su_timer.c \
su_port.c su_port.h \
su_base_port.c su_pthread_port.c \
su_base_port.c su_pthread_port.c su_socket_port.c \
su_poll_port.c su_epoll_port.c su_select_port.c \
su_localinfo.c \
su_os_nw.c \
......@@ -94,6 +94,7 @@ EXTRA_libsu_la_SOURCES = \
inet_ntop.c inet_pton.c poll.c getopt.c \
su_tag_ref.c su_win32_port.c
libsu_la_CFLAGS = $(AM_CFLAGS) $(SOFIA_CFLAGS)
libsu_la_LIBADD = $(REPLACE_LIBADD)
libsu_la_DEPENDENCIES = $(REPLACE_LIBADD)
......
......@@ -2,18 +2,33 @@
rc=0
run=no
for SU_PORT in select epoll poll ; do
export SU_PORT
grep -q -i '^#define have_'$SU_PORT ../../config.h ||
continue
run=yes
if $VALGRIND ./test_su ; then
echo PASS: multithread test_su
echo PASS: multithread test_su with $SU_PORT
else
echo FAIL: multithread test_su failed
echo FAIL: multithread test_su with $SU_PORT failed
rc=1
fi
if $VALGRIND ./test_su -s ; then
echo PASS: singlethread test_su
echo PASS: singlethread test_su with $SU_PORT
else
echo FAIL: singlethread test_su failed
echo FAIL: singlethread test_su with $SU_PORT failed
rc=1
fi
done
test $run = no && exit 77
exit $rc
/*
* This file is part of the Sofia-SIP package
*
* Copyright (C) 2005 Nokia Corporation.
* Copyright (C) 2005, 2006, 2007 Nokia Corporation.
*
* Contact: Pekka Pessi <pekka.pessi@nokia.com>
*
......@@ -23,18 +23,21 @@
*/
/**@ingroup su_wait
* @CFILE su_poll_port.c
* @CFILE su_epoll_port.c
*
* Port implementation using poll()
* Port implementation using epoll(7)
*
* @author Pekka Pessi <Pekka.Pessi@nokia.com>
* @author Kai Vehmanen <kai.vehmanen@nokia.com>
*
* @date Created: Tue Sep 14 15:51:04 1999 ppessi
* @date Created: Fri Jan 26 20:44:14 2007 ppessi
* @date Original: Tue Sep 14 15:51:04 1999 ppessi
*/
#include "config.h"
#if HAVE_EPOLL
#include <stdlib.h>
#include <assert.h>
#include <stdarg.h>
......@@ -43,14 +46,12 @@
#include <limits.h>
#include <errno.h>
#define su_port_s su_poll_port_s
#define su_port_s su_epoll_port_s
#include "sofia-sip/su.h"
#include "su_port.h"
#include "sofia-sip/su_alloc.h"
#if HAVE_EPOLL
#include <sys/epoll.h>
#define POLL2EPOLL_NEEDED \
......@@ -62,8 +63,8 @@
/** Port based on epoll(). */
struct su_poll_port_s {
su_pthread_port_t sup_base[1];
struct su_epoll_port_s {
su_socket_port_t sup_base[1];
/** epoll fd */
int sup_epoll;
......@@ -88,7 +89,9 @@ struct su_poll_port_s {
} **sup_indices;
};
static void su_epoll_port_decref(su_port_t *self, int blocking, char const *who);
static void su_epoll_port_decref(su_port_t *self,
int blocking,
char const *who);
static int su_epoll_port_register(su_port_t *self,
su_root_t *root,
su_wait_t *wait,
......@@ -118,7 +121,7 @@ su_port_vtable_t const su_epoll_port_vtable[1] =
su_base_port_incref,
su_epoll_port_decref,
su_base_port_gsource,
su_pthread_port_send,
su_socket_port_send,
su_epoll_port_register,
su_epoll_port_unregister,
su_epoll_port_deregister,
......@@ -159,7 +162,7 @@ static void su_epoll_port_deinit(void *arg)
SU_DEBUG_9(("%s(%p) called\n", "su_epoll_port_deinit", (void* )self));
su_pthread_port_deinit(self);
su_socket_port_deinit(self->sup_base);
close(self->sup_epoll), self->sup_epoll = -1;
}
......@@ -434,9 +437,9 @@ int su_epoll_port_eventmask(su_port_t *self, int index, int socket, int events)
ev.data.u64 = (uint64_t)0;
ev.data.u32 = (uint32_t)index;
if (epoll_ctl(self->sup_epoll, EPOLL_CTL_MOD, ser->ser_wait->fd, &ev) == -1) {
if (epoll_ctl(self->sup_epoll, EPOLL_CTL_MOD, socket, &ev) == -1) {
SU_DEBUG_1(("su_port(%p): EPOLL_CTL_MOD(%u): %s\n", (void *)self,
ser->ser_wait->fd, su_strerror(su_errno())));
socket, su_strerror(su_errno())));
return -1;
}
......@@ -449,7 +452,7 @@ int su_epoll_port_eventmask(su_port_t *self, int index, int socket, int events)
* multishot mode determines how the events are scheduled by port. If
* multishot mode is enabled, port serves all the sockets that have received
* network events. If it is disabled, only first socket event is served.
p *
*
* @param self pointer to port object
* @param multishot multishot mode (0 => disables, 1 => enables, -1 => query)
*
......@@ -547,7 +550,7 @@ su_port_t *su_epoll_port_create(void)
self->sup_epoll = epoll;
self->sup_multishot = SU_ENABLE_MULTISHOT_POLL;
if (su_pthread_port_init(self, su_epoll_port_vtable) < 0)
if (su_socket_port_init(self->sup_base, su_epoll_port_vtable) < 0)
return su_home_unref(su_port_home(self)), NULL;
return self;
......@@ -562,6 +565,7 @@ int su_epoll_clone_start(su_root_t *parent,
return su_pthreaded_port_start(su_epoll_port_create,
parent, return_clone, magic, init, deinit);
}
#else
su_port_t *su_epoll_port_create(void)
......
......@@ -30,7 +30,8 @@
* @author Pekka Pessi <Pekka.Pessi@nokia.com>
* @author Kai Vehmanen <kai.vehmanen@nokia.com>
*
* @date Created: Tue Sep 14 15:51:04 1999 ppessi
* @date Create: Fri Jan 26 20:44:14 2007 ppessi
* @date Original: Tue Sep 14 15:51:04 1999 ppessi
*/
#include "config.h"
......@@ -54,9 +55,9 @@
/** Port based on poll(). */
struct su_poll_port_s {
su_pthread_port_t sup_base[1];
su_socket_port_t sup_base[1];
#define sup_home sup_base->sup_base->sup_home
#define sup_home sup_base->sup_base->sup_base->sup_home
unsigned sup_multishot; /**< Multishot operation? */
......@@ -124,7 +125,7 @@ su_port_vtable_t const su_poll_port_vtable[1] =
su_base_port_incref,
su_poll_port_decref,
su_base_port_gsource,
su_pthread_port_send,
su_socket_port_send,
su_poll_port_register,
su_poll_port_unregister,
su_poll_port_deregister,
......@@ -160,7 +161,7 @@ static void su_poll_port_deinit(void *arg)
SU_DEBUG_9(("%s(%p) called\n", "su_poll_port_deinit", (void *)self));
su_pthread_port_deinit(self);
su_socket_port_deinit(self->sup_base);
}
static void su_poll_port_decref(su_port_t *self, int blocking, char const *who)
......@@ -516,8 +517,8 @@ int su_poll_port_unregister_all(su_port_t *self,
/**Set mask for a registered event. @internal
*
* The function su_poll_port_eventmask() sets the mask describing events that can
* signal the registered callback.
* The function su_poll_port_eventmask() sets the mask describing events
* that can signal the registered callback.
*
* @param port pointer to port object
* @param index registration index
......@@ -663,7 +664,7 @@ su_port_t *su_poll_port_create(void)
self->sup_multishot = SU_ENABLE_MULTISHOT_POLL;
if (su_pthread_port_init(self, su_poll_port_vtable) < 0)
if (su_socket_port_init(self->sup_base, su_poll_port_vtable) < 0)
return su_home_unref(su_port_home(self)), NULL;
return self;
......
......@@ -518,22 +518,18 @@ SOFIAPUBFUN void su_base_port_wait(su_clone_r rclone);
#include <pthread.h>
#define SU_MBOX_SIZE 2
/** Pthread port object */
typedef struct su_pthread_port_s {
su_base_port_t sup_base[1];
struct su_pthread_port_waiting_parent
*sup_waiting_parent;
pthread_t sup_tid;
pthread_mutex_t sup_runlock[1];
#if 0
pthread_mutex_t sup_runlock[1];
pthread_cond_t sup_resume[1];
short sup_paused; /**< True if thread is paused */
#endif
short sup_thread; /**< True if thread is active */
short sup_mbox_index;
su_socket_t sup_mbox[SU_MBOX_SIZE];
} su_pthread_port_t;
/* Pthread methods */
......@@ -546,9 +542,9 @@ SOFIAPUBFUN void su_pthread_port_unlock(su_port_t *self, char const *who);
SOFIAPUBFUN int su_pthread_port_own_thread(su_port_t const *self);
#if 0 /* not yet */
SOFIAPUBFUN int su_pthread_port_send(su_port_t *self, su_msg_r rmsg);
#if 0 /* not yet */
SOFIAPUBFUN su_port_t *su_pthread_port_create(void);
SOFIAPUBFUN su_port_t *su_pthread_port_start(su_root_t *parent,
su_clone_r return_clone,
......@@ -584,12 +580,27 @@ typedef su_base_port_t su_pthread_port_t;
#define su_pthread_port_lock su_base_port_lock
#define su_pthread_port_unlock su_base_port_unlock
#define su_pthread_port_own_thread su_base_port_own_thread
#define su_pthread_port_send su_base_port_send
#define su_pthread_port_wait su_base_port_wait
#define su_pthread_port_execute su_base_port_execute
#endif
/* ====================================================================== */
/* Mailbox port using sockets */
#define SU_MBOX_SIZE 2
typedef struct su_socket_port_s {
su_pthread_port_t sup_base[1];
int sup_mbox_index;
su_socket_t sup_mbox[SU_MBOX_SIZE];
} su_socket_port_t;
SOFIAPUBFUN int su_socket_port_init(su_socket_port_t *,
su_port_vtable_t const *);
SOFIAPUBFUN void su_socket_port_deinit(su_socket_port_t *self);
SOFIAPUBFUN int su_socket_port_send(su_port_t *self, su_msg_r rmsg);
SOFIA_END_DECLS
#endif /* SU_PORT_H */
......@@ -61,24 +61,6 @@
#define SU_TASK_COPY(d, s, by) (void)((d)[0]=(s)[0], \
(s)->sut_port?(void)su_port_incref(s->sut_port, #by):(void)0)
#if HAVE_SOCKETPAIR
#define SU_MBOX_SEND 1
#else
#define SU_MBOX_SEND 0
#endif
/** @internal Message box wakeup function. */
static int su_mbox_port_wakeup(su_root_magic_t *magic, /* NULL */
su_wait_t *w,
su_wakeup_arg_t *arg)
{
char buf[32];
su_socket_t socket = *(su_socket_t*)arg;
su_wait_events(w, socket);
recv(socket, buf, sizeof(buf), 0);
return 0;
}
/**@internal
*
* Initializes a message port. It creates a mailbox used to wake up the
......@@ -90,93 +72,14 @@ int su_pthread_port_init(su_port_t *self, su_port_vtable_t const *vtable)
SU_DEBUG_9(("su_pthread_port_init(%p, %p) called\n",
(void *)self, (void *)vtable));
self->sup_tid = pthread_self();
if (su_base_port_init(self, vtable) == 0 &&
su_base_port_threadsafe(self) == 0) {
int af;
su_socket_t mb = INVALID_SOCKET;
su_wait_t wait[1] = { SU_WAIT_INIT };
char const *why;
self->sup_tid = pthread_self();
#if 0
pthread_mutex_init(self->sup_runlock, NULL);
pthread_mutex_lock(self->sup_runlock);
pthread_cond_init(self->sup_resume, NULL);
#endif
#if HAVE_SOCKETPAIR
#if defined(AF_LOCAL)
af = AF_LOCAL;
#else
af = AF_UNIX;
#endif
if (socketpair(af, SOCK_STREAM, 0, self->sup_mbox) == -1) {
why = "socketpair"; goto error;
}
mb = self->sup_mbox[0];
su_setblocking(self->sup_mbox[1], 0);
#else
{
struct sockaddr_in sin = { sizeof(struct sockaddr_in), 0 };
socklen_t sinsize = sizeof sin;
struct sockaddr *sa = (struct sockaddr *)&sin;
af = PF_INET;
self->sup_mbox[0] = mb = su_socket(af, SOCK_DGRAM, IPPROTO_UDP);
if (mb == INVALID_SOCKET) {
why = "socket"; goto error;
}
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK); /* 127.1 */
/* Get a port for us */
if (bind(mb, sa, sizeof sin) == -1) {
why = "bind"; goto error;
}
if (getsockname(mb, sa, &sinsize) == -1) {
why = "getsockname"; goto error;
}
if (connect(mb, sa, sinsize) == -1) {
why = "connect"; goto error;
}
}
#endif
su_setblocking(mb, 0);
if (su_wait_create(wait, mb, SU_WAIT_IN) == -1) {
why = "su_wait_create";
goto error;
}
self->sup_mbox_index = su_port_register(self, NULL, wait,
su_mbox_port_wakeup,
(void *)self->sup_mbox, 0);
if (self->sup_mbox_index <= 0) {
why = "su_port_register";
su_wait_destroy(wait);
goto error;
}
SU_DEBUG_9(("%s() returns %d\n", "su_pthread_port_init", 0));
return 0;
error:
su_log("%s: %s: %s\n",
"su_pthread_port_init", why, su_strerror(su_errno()));
su_pthread_port_deinit(self);
}
SU_DEBUG_9(("%s() returns %d\n", "su_pthread_port_init", -1));
else
su_base_port_deinit(self);
return -1;
}
......@@ -187,17 +90,6 @@ void su_pthread_port_deinit(su_port_t *self)
{
assert(self);
if (self->sup_mbox_index > 0)
su_port_deregister(self, self->sup_mbox_index);
self->sup_mbox_index = 0;
if (self->sup_mbox[0] && self->sup_mbox[0] != INVALID_SOCKET)
su_close(self->sup_mbox[0]); self->sup_mbox[0] = INVALID_SOCKET;
#if HAVE_SOCKETPAIR
if (self->sup_mbox[1] && self->sup_mbox[1] != INVALID_SOCKET)
su_close(self->sup_mbox[1]); self->sup_mbox[1] = INVALID_SOCKET;
#endif
#if 0
pthread_mutex_destroy(self->sup_runlock);
pthread_cond_destroy(self->sup_resume);
......@@ -225,27 +117,6 @@ void su_pthread_port_unlock(su_port_t *self, char const *who)
(void *)pthread_self(), who, self));
}
/** @internal Send a message to the port. */
int su_pthread_port_send(su_port_t *self, su_msg_r rmsg)
{
int wakeup = su_base_port_send(self, rmsg);
if (wakeup < 0)
return -1;
if (wakeup == 0)
return 0;
assert(self->sup_mbox[SU_MBOX_SEND] != INVALID_SOCKET);
if (send(self->sup_mbox[SU_MBOX_SEND], "X", 1, 0) == -1) {
#if HAVE_SOCKETPAIR
if (su_errno() != EWOULDBLOCK)
#endif
su_perror("su_msg_send: send()");
}
return 0;
}
/** @internal
* Checks if the calling thread owns the port object.
*
......
This diff is collapsed.
/*
* This file is part of the Sofia-SIP package
*
* Copyright (C) 2005 Nokia Corporation.
*
* Contact: Pekka Pessi <pekka.pessi@nokia.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA
*
*/
/**@ingroup su_wait
* @CFILE su_socket_port.c
*
* OS-Independent Syncronization Interface with socket mailbox
*
* This implements wakeup using sockets by su_port_send().
*
* @author Pekka Pessi <Pekka.Pessi@nokia.com>
* @author Kai Vehmanen <kai.vehmanen@nokia.com>
*
* @date Created: Tue Sep 14 15:51:04 1999 ppessi
*/
#include "config.h"
#include <stdlib.h>
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <limits.h>
#include <errno.h>
#define su_socket_port_s su_port_s
#define SU_CLONE_T su_msg_t
#include "sofia-sip/su.h"
#include "su_port.h"
#include "sofia-sip/su_alloc.h"
#if HAVE_SOCKETPAIR
#define SU_MBOX_SEND 1
#else
#define SU_MBOX_SEND 0
#endif
/** @internal Message box wakeup function. */
static int su_mbox_port_wakeup(su_root_magic_t *magic, /* NULL */
su_wait_t *w,
su_wakeup_arg_t *arg)
{
char buf[32];
su_socket_t socket = *(su_socket_t*)arg;
su_wait_events(w, socket);
recv(socket, buf, sizeof(buf), 0);
return 0;
}
/**@internal
*
* Initializes a message port. It creates a mailbox used to wake up the
* thread waiting on the port if needed. Currently, the mailbox is a
* socketpair or an UDP socket connected to itself.
*/
int su_socket_port_init(su_port_t *self, su_port_vtable_t const *vtable)
{
int retval = -1;
int af;
su_socket_t mb = INVALID_SOCKET;
su_wait_t wait[1] = { SU_WAIT_INIT };
char const *why;
SU_DEBUG_9(("su_socket_port_init(%p, %p) called\n", self, vtable));
if (su_pthread_port_init(self, vtable) != 0)
return -1;
#if HAVE_SOCKETPAIR
#if defined(AF_LOCAL)
af = AF_LOCAL;
#else
af = AF_UNIX;
#endif
if (socketpair(af, SOCK_STREAM, 0, self->sup_mbox) == -1) {
why = "socketpair"; goto error;
}
mb = self->sup_mbox[0];
su_setblocking(self->sup_mbox[1], 0);
#else
{
struct sockaddr_in sin = { sizeof(struct sockaddr_in), 0 };
socklen_t sinsize = sizeof sin;
struct sockaddr *sa = (struct sockaddr *)&sin;
af = PF_INET;
self->sup_mbox[0] = mb = su_socket(af, SOCK_DGRAM, IPPROTO_UDP);
if (mb == INVALID_SOCKET) {
why = "socket"; goto error;
}
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK); /* 127.1 */
/* Get a port for us */
if (bind(mb, sa, sizeof sin) == -1) {
why = "bind"; goto error;
}
if (getsockname(mb, sa, &sinsize) == -1) {
why = "getsockname"; goto error;
}
if (connect(mb, sa, sinsize) == -1) {
why = "connect"; goto error;
}
}
#endif
if (su_wait_create(wait, mb, SU_WAIT_IN) == -1) {
why = "su_wait_create";
goto error;
}
self->sup_mbox_index = su_port_register(self, NULL, wait,
su_mbox_port_wakeup,
(void *)self->sup_mbox, 0);
if (self->sup_mbox_index <= 0) {
why = "su_port_register";
su_wait_destroy(wait);
goto error;
}
retval = 0;
if (retval) {
error:
su_log("%s: %s: %s\n", "su_socket_port_init",
why, su_strerror(su_errno()));
}
return retval;
}
/** @internal Deinit a base implementation of port. */
void su_socket_port_deinit(su_port_t *self)
{
assert(self);
if (self->sup_mbox_index > 0)
su_port_deregister(self, self->sup_mbox_index);
self->sup_mbox_index = 0;
if (self->sup_mbox[0] && self->sup_mbox[0] != INVALID_SOCKET)
su_close(self->sup_mbox[0]); self->sup_mbox[0] = INVALID_SOCKET;
#if HAVE_SOCKETPAIR
if (self->sup_mbox[1] && self->sup_mbox[1] != INVALID_SOCKET)
su_close(self->sup_mbox[1]); self->sup_mbox[1] = INVALID_SOCKET;
#endif
su_pthread_port_deinit(self);
}
/** @internal Send a message to the port. */
int su_socket_port_send(su_port_t *self, su_msg_r rmsg)
{
int wakeup = su_base_port_send(self, rmsg);
if (wakeup < 0)
return -1;
if (wakeup) {
assert(self->sup_mbox[SU_MBOX_SEND] != INVALID_SOCKET);
if (send(self->sup_mbox[SU_MBOX_SEND], "X", 1, 0) == -1) {
#if HAVE_SOCKETPAIR
if (su_errno() != EWOULDBLOCK)
#endif
su_perror("su_msg_send: send()");
}
}
return 0;
}
......@@ -45,18 +45,21 @@
#include <limits.h>
#include <errno.h>
#define su_port_s su_poll_port_s
#define su_port_s su_win32_port_s
#include "sofia-sip/su.h"
#include "su_port.h"
#include "sofia-sip/su_alloc.h"
/** Port based on poll(). */
/** Port based on su_wait() aka WSAWaitForMultipleEvents. */
struct su_poll_port_s {
su_pthread_port_t sup_base[1];
/* We use WSAWaitForMultipleEvents() */
#define INDEX_MAX (64)
struct su_win32_port_s {
su_socket_port_t sup_base[1];
#define sup_home sup_base->sup_base->sup_home
#define sup_home sup_base->sup_base->sup_base->sup_home
unsigned sup_multishot; /**< Multishot operation? */
......@@ -69,13 +72,6 @@ struct su_poll_port_s {
int sup_size_waits; /**< Size of allocated su_waits */
int sup_pri_offset; /**< Offset to prioritized waits */
#if !SU_HAVE_WINSOCK
#define INDEX_MAX (0x7fffffff)
#else
/* We use WSAWaitForMultipleEvents() */
#define INDEX_MAX (64)
#endif
/** Indices from index returned by su_root_register() to tables below.
*
* Free elements are negative. Free elements form a list, value of free
......@@ -124,7 +120,7 @@ su_port_vtable_t const su_poll_port_vtable[1] =
su_base_port_incref,
su_poll_port_decref,
su_base_port_gsource,
su_pthread_port_send,
su_socket_port_send,
su_poll_port_register,
su_poll_port_unregister,
su_poll_port_deregister,
......@@ -160,7 +156,7 @@ static void su_poll_port_deinit(void *arg)
SU_DEBUG_9(("%s(%p) called\n", "su_poll_port_deinit", self));
su_pthread_port_deinit(self);
su_socket_port_deinit(self->sup_base);
}
static void su_poll_port_decref(su_port_t *self, int blocking, char const *who)
......@@ -668,7 +664,7 @@ su_port_t *su_poll_port_create(void)
self->sup_multishot = SU_ENABLE_MULTISHOT_POLL;
if (su_pthread_port_init(self, su_poll_port_vtable) < 0)
if (su_socket_port_init(self->sup_base, su_poll_port_vtable) < 0)
return su_home_unref(su_port_home(self)), NULL;
return self;
......
......@@ -100,7 +100,7 @@ int test_sockaddr(void)
TEST(su_setblocking(s, 1), 0);
TEST(su_close(s), 0);
su_freelocalinfo(res);