Commit 4fb20afe authored by Pekka Pessi's avatar Pekka Pessi

su: added su_root_obtain(), su_root_release() and su_root_has_thread()

When root is created with su_root_create() or cloned with su_clone_start(),
the resulting root is obtained by the calling or created thread,
respectively.

The root can be released with su_root_release() and another thread can
obtain it.

The function su_root_has_thread() can be used to check if a thread has
obtained or released the root.

Implementation upgraded the su_port_own_thread() method as su_port_thread().

darcs-hash:20071005112209-55b16-a77833d930ab3e9d57c52cd32b01eef5507476d3.gz
parent 84b933aa
......@@ -118,7 +118,7 @@ static int su_source_eventmask(su_port_t *self,
static void su_source_run(su_port_t *self);
static void su_source_break(su_port_t *self);
static su_duration_t su_source_step(su_port_t *self, su_duration_t tout);
static int su_source_own_thread(su_port_t const *port);
static int su_source_thread(su_port_t *self, enum su_port_thread_op op);
static int su_source_add_prepoll(su_port_t *port,
su_root_t *root,
su_prepoll_f *,
......@@ -150,7 +150,7 @@ su_port_vtable_t const su_source_port_vtable[1] =
su_source_run,
su_source_break,
su_source_step,
su_source_own_thread,
su_source_thread,
su_source_add_prepoll,
su_source_remove_prepoll,
su_base_port_timers,
......@@ -180,6 +180,8 @@ struct su_source_s {
su_base_port_t sup_base[1];
GThread *sup_tid;
GStaticMutex sup_obtained[1];
GStaticMutex sup_mutex[1];
GSource *sup_source; /**< Backpointer to source */
......@@ -267,7 +269,8 @@ static int su_source_port_init(su_port_t *self,
GSource *gs = (GSource *)((char *)self - offsetof(SuSource, ss_port));
self->sup_source = gs;
self->sup_tid = g_thread_self();
g_static_mutex_init(self->sup_obtained);
g_static_mutex_init(self->sup_mutex);
......@@ -280,6 +283,7 @@ static void su_source_port_deinit(su_port_t *self)
su_base_port_deinit(self);
g_static_mutex_free(self->sup_mutex);
g_static_mutex_free(self->sup_obtained);
su_home_deinit(self->sup_base->sup_home);
}
......@@ -334,16 +338,46 @@ int su_source_send(su_port_t *self, su_msg_r rmsg)
}
/** @internal
* Checks if the calling thread owns the port object.
*
* Change or query ownership of the port object.
*
* @param self pointer to a port object
* @param op operation
*
* @retval true (nonzero) if the calling thread owns the port,
* @retval false (zero) otherwise.
* @ERRORS
* @ERROR EALREADY port already has an owner (or has no owner)
*/
int su_source_own_thread(su_port_t const *self)
static int su_source_thread(su_port_t *self, enum su_port_thread_op op)
{
return self == NULL || SU_SOURCE_OWN_THREAD(self);
GThread *me = g_thread_self();
switch (op) {
case su_port_thread_op_is_obtained:
if (self->sup_tid == me)
return 2;
else if (self->sup_tid)
return 1;
else
return 0;
case su_port_thread_op_release:
if (self->sup_tid != me)
return errno = EALREADY, -1;
self->sup_tid = NULL;
g_static_mutex_unlock(self->sup_obtained);
return 0;
case su_port_thread_op_obtain:
if (su_home_threadsafe(su_port_home(self)) == -1)
return -1;
g_static_mutex_lock(self->sup_obtained);
self->sup_tid = me;
return 0;
default:
return errno = ENOSYS, -1;
}
}
/* -- Registering and unregistering ------------------------------------- */
......
......@@ -477,6 +477,10 @@ SOFIAPUBFUN struct _GSource *su_root_gsource(su_root_t *self);
SOFIAPUBFUN int su_root_yield(su_root_t *root);
SOFIAPUBFUN int su_root_release(su_root_t *root);
SOFIAPUBFUN int su_root_obtain(su_root_t *root);
SOFIAPUBFUN int su_root_has_thread(su_root_t *root);
/* Timers */
SOFIAPUBFUN su_timer_t *su_timer_create(su_task_r const, su_duration_t msec)
__attribute__((__malloc__));
......
......@@ -73,8 +73,7 @@ int su_base_port_init(su_port_t *self, su_port_vtable_t const *vtable)
if (self) {
self->sup_vtable = vtable;
self->sup_tail = &self->sup_head;
return 0;
return su_port_obtain(self);
}
return -1;
......@@ -83,6 +82,8 @@ int su_base_port_init(su_port_t *self, su_port_vtable_t const *vtable)
/** @internal Deinit a base implementation of port. */
void su_base_port_deinit(su_port_t *self)
{
if (su_port_own_thread(self))
su_port_release(self);
}
void su_base_port_lock(su_port_t *self, char const *who)
......@@ -93,9 +94,27 @@ void su_base_port_unlock(su_port_t *self, char const *who)
{
}
int su_base_port_own_thread(su_port_t const *self)
/** @internal Dummy implementation of su_port_thread() method.
*
* Currently this is only used if SU_HAVE_PTHREADS is 0.
*/
int su_base_port_thread(su_port_t const *self,
enum su_port_thread_op op)
{
return 1;
switch (op) {
case su_port_thread_op_is_obtained:
return 2; /* Current thread has obtained the port */
case su_port_thread_op_release:
return errno = ENOSYS, -1;
case su_port_thread_op_obtain:
return 0; /* Allow initial obtain */
default:
return errno = ENOSYS, -1;
}
}
void su_base_port_incref(su_port_t *self, char const *who)
......
......@@ -136,7 +136,7 @@ su_port_vtable_t const su_devpoll_port_vtable[1] =
su_base_port_run,
su_base_port_break,
su_base_port_step,
su_pthread_port_own_thread,
su_pthread_port_thread,
su_base_port_add_prepoll,
su_base_port_remove_prepoll,
su_base_port_timers,
......
......@@ -131,7 +131,7 @@ su_port_vtable_t const su_epoll_port_vtable[1] =
su_base_port_run,
su_base_port_break,
su_base_port_step,
su_pthread_port_own_thread,
su_pthread_port_thread,
su_base_port_add_prepoll,
su_base_port_remove_prepoll,
su_base_port_timers,
......
......@@ -128,7 +128,7 @@ su_port_vtable_t const su_kqueue_port_vtable[1] =
su_base_port_run,
su_base_port_break,
su_base_port_step,
su_pthread_port_own_thread,
su_pthread_port_thread,
su_base_port_add_prepoll,
su_base_port_remove_prepoll,
su_base_port_timers,
......
......@@ -192,7 +192,7 @@ su_port_vtable_t const su_osx_port_vtable[1] =
su_osx_port_run,
su_osx_port_break,
su_osx_port_step,
su_pthread_port_own_thread,
su_pthread_port_thread,
su_base_port_add_prepoll,
su_base_port_remove_prepoll,
su_base_port_timers,
......
......@@ -129,7 +129,7 @@ su_port_vtable_t const su_poll_port_vtable[1] =
su_base_port_run,
su_base_port_break,
su_base_port_step,
su_pthread_port_own_thread,
su_pthread_port_thread,
su_base_port_add_prepoll,
su_base_port_remove_prepoll,
su_base_port_timers,
......
......@@ -85,6 +85,12 @@ struct su_root_s {
#define SU_ROOT_MAGIC(r) ((r) ? (r)->sur_magic : NULL)
enum su_port_thread_op {
su_port_thread_op_is_obtained,
su_port_thread_op_release,
su_port_thread_op_obtain
};
/** @internal Virtual function table for port */
typedef struct su_port_vtable {
unsigned su_vtable_size;
......@@ -113,7 +119,8 @@ typedef struct su_port_vtable {
void (*su_port_break)(su_port_t *self);
su_duration_t (*su_port_step)(su_port_t *self, su_duration_t tout);
int (*su_port_own_thread)(su_port_t const *port);
/* Reused slot */
int (*su_port_thread)(su_port_t *port, enum su_port_thread_op op);
int (*su_port_add_prepoll)(su_port_t *port,
su_root_t *root,
......@@ -324,7 +331,26 @@ su_inline
int su_port_own_thread(su_port_t const *self)
{
su_virtual_port_t const *base = (su_virtual_port_t *)self;
return base->sup_vtable->su_port_own_thread(self);
return base->sup_vtable->
su_port_thread((su_port_t *)self, su_port_thread_op_is_obtained) == 2;
}
su_inline int su_port_has_thread(su_port_t *self)
{
su_virtual_port_t *base = (su_virtual_port_t *)self;
return base->sup_vtable->su_port_thread(self, su_port_thread_op_is_obtained);
}
su_inline int su_port_release(su_port_t *self)
{
su_virtual_port_t *base = (su_virtual_port_t *)self;
return base->sup_vtable->su_port_thread(self, su_port_thread_op_release);
}
su_inline int su_port_obtain(su_port_t *self)
{
su_virtual_port_t *base = (su_virtual_port_t *)self;
return base->sup_vtable->su_port_thread(self, su_port_thread_op_obtain);
}
su_inline
......@@ -421,7 +447,8 @@ SOFIAPUBFUN void su_base_port_deinit(su_port_t *self);
SOFIAPUBFUN void su_base_port_lock(su_port_t *self, char const *who);
SOFIAPUBFUN void su_base_port_unlock(su_port_t *self, char const *who);
SOFIAPUBFUN int su_base_port_own_thread(su_port_t const *self);
SOFIAPUBFUN int su_base_port_thread(su_port_t const *self,
enum su_port_thread_op op);
SOFIAPUBFUN void su_base_port_incref(su_port_t *self, char const *who);
SOFIAPUBFUN int su_base_port_decref(su_port_t *self,
......@@ -451,7 +478,9 @@ SOFIAPUBFUN int su_base_port_remove_prepoll(su_port_t *self, su_root_t *root);
SOFIAPUBFUN su_timer_t **su_base_port_timers(su_port_t *self);
SOFIAPUBFUN int su_base_port_multishot(su_port_t *self, int multishot);
SOFIAPUBFUN int su_base_port_threadsafe(su_port_t *self);
SOFIAPUBFUN int su_base_port_yield(su_port_t *self);
SOFIAPUBFUN int su_base_port_start_shared(su_root_t *parent,
......@@ -473,6 +502,8 @@ typedef struct su_pthread_port_s {
struct su_pthread_port_waiting_parent
*sup_waiting_parent;
pthread_t sup_tid;
pthread_mutex_t sup_obtained[1];
#if 0
pthread_mutex_t sup_runlock[1];
pthread_cond_t sup_resume[1];
......@@ -489,7 +520,8 @@ SOFIAPUBFUN void su_pthread_port_deinit(su_port_t *self);
SOFIAPUBFUN void su_pthread_port_lock(su_port_t *self, char const *who);
SOFIAPUBFUN void su_pthread_port_unlock(su_port_t *self, char const *who);
SOFIAPUBFUN int su_pthread_port_own_thread(su_port_t const *self);
SOFIAPUBFUN int su_pthread_port_thread(su_port_t *self,
enum su_port_thread_op op);
#if 0 /* not yet */
SOFIAPUBFUN int su_pthread_port_send(su_port_t *self, su_msg_r rmsg);
......@@ -528,7 +560,7 @@ typedef su_base_port_t su_pthread_port_t;
#define su_pthread_port_deinit su_base_port_deinit
#define su_pthread_port_lock su_base_port_lock
#define su_pthread_port_unlock su_base_port_unlock
#define su_pthread_port_own_thread su_base_port_own_thread
#define su_pthread_port_thread su_base_port_thread
#define su_pthread_port_wait su_base_port_wait
#define su_pthread_port_execute su_base_port_execute
......
......@@ -72,16 +72,9 @@ int su_pthread_port_init(su_port_t *self, su_port_vtable_t const *vtable)
SU_DEBUG_9(("su_pthread_port_init(%p, %p) called\n",
(void *)self, (void *)vtable));
self->sup_tid = pthread_self();
pthread_mutex_init(self->sup_obtained, NULL);
if (su_base_port_init(self, vtable) == 0 &&
su_base_port_threadsafe(self) == 0) {
return 0;
}
else
su_base_port_deinit(self);
return -1;
return su_base_port_init(self, vtable);
}
......@@ -90,12 +83,13 @@ void su_pthread_port_deinit(su_port_t *self)
{
assert(self);
su_base_port_deinit(self);
#if 0
pthread_mutex_destroy(self->sup_runlock);
pthread_cond_destroy(self->sup_resume);
#endif
su_base_port_deinit(self);
pthread_mutex_destroy(self->sup_obtained);
}
void su_pthread_port_lock(su_port_t *self, char const *who)
......@@ -118,17 +112,46 @@ void su_pthread_port_unlock(su_port_t *self, char const *who)
}
/** @internal
* Checks if the calling thread owns the port object.
*
* Change or query ownership of the port object.
*
* @param self pointer to a port object
* @param op operation
*
* @retval true (nonzero) if the calling thread owns the port,
* @retval false (zero) otherwise.
* @ERRORS
* @ERROR EALREADY port already has an owner (or has no owner)
*/
int su_pthread_port_own_thread(su_port_t const *self)
int su_pthread_port_thread(su_port_t *self, enum su_port_thread_op op)
{
return self == NULL ||
pthread_equal(self->sup_tid, pthread_self());
pthread_t me = pthread_self();
switch (op) {
case su_port_thread_op_is_obtained:
if (self->sup_thread == 0)
return 0; /* No thread has obtained the port */
else if (pthread_equal(self->sup_tid, me))
return 2; /* Current thread has obtained the port */
else
return 1; /* A thread has obtained the port */
case su_port_thread_op_release:
if (!self->sup_thread || !pthread_equal(self->sup_tid, me))
return errno = EALREADY, -1;
self->sup_thread = 0;
pthread_mutex_unlock(self->sup_obtained);
return 0;
case su_port_thread_op_obtain:
su_home_threadsafe(su_port_home(self));
pthread_mutex_lock(self->sup_obtained);
self->sup_tid = me;
self->sup_thread = 1;
return 0;
default:
return errno = ENOSYS, -1;
}
}
/* -- Clones ------------------------------------------------------------ */
......
......@@ -837,6 +837,53 @@ int su_root_remove_prepoll(su_root_t *root)
return su_port_remove_prepoll(root->sur_port, root);
}
/** Release the root port for other threads.
*
* @NEW_1_12_7
*/
int su_root_release(su_root_t *root)
{
if (root == NULL || root->sur_port == NULL)
return (void)(errno = EFAULT), -1;
return su_port_release(root->sur_port);
}
/** Obtain the root port from other thread.
*
* @param root pointer to root object
*
* @retval 0 if successful
* @retval -1 upon an error
*
* @ERRORS
* @ERROR EFAULT
* @NEW_1_12_7
*/
int su_root_obtain(su_root_t *root)
{
if (root == NULL || root->sur_port == NULL)
return (void)(errno = EFAULT), -1;
return su_port_obtain(root->sur_port);
}
/**Check if a thread has obtained the root.
*
* @param root a pointer to root object
*
* @retval 2 if current thread has obtained the root
* @retval 1 if an another thread has obtained the root
* @retval 0 if no thread has obtained the root
* @retval -1 upon an error
*
* @NEW_1_12_7
*/
int su_root_has_thread(su_root_t *root)
{
if (root == NULL || root->sur_port == NULL)
return (void)(errno = EFAULT), -1;
return su_port_has_thread(root->sur_port);
}
/* =========================================================================
* Messages
*/
......
......@@ -148,7 +148,7 @@ su_port_vtable_t const su_select_port_vtable[1] =
su_base_port_run,
su_base_port_break,
su_base_port_step,
su_pthread_port_own_thread,
su_pthread_port_thread,
su_base_port_add_prepoll,
su_base_port_remove_prepoll,
su_base_port_timers,
......
......@@ -25,7 +25,7 @@
/**@ingroup su_wait
* @CFILE su_win32_port.c
*
* Port implementation using WSAEVENTs
* Port implementation using WSAEVENTs. Incomplete.
*
* @author Pekka Pessi <Pekka.Pessi@nokia.com>
* @author Kai Vehmanen <kai.vehmanen@nokia.com>
......@@ -127,7 +127,7 @@ su_port_vtable_t const su_wsevent_port_vtable[1] =
su_base_port_run,
su_base_port_break,
su_base_port_step,
su_pthread_port_own_thread,
su_pthread_port_thread,
su_base_port_add_prepoll,
su_base_port_remove_prepoll,
su_base_port_timers,
......
......@@ -55,6 +55,10 @@ typedef struct test_ep_s test_ep_t;
#include <sofia-sip/su_alloc.h>
#include <sofia-sip/su_log.h>
#if SU_HAVE_PTHREADS
#include <pthread.h>
#endif
struct test_ep_s {
test_ep_t *next, **prev, **list;
int i;
......@@ -95,8 +99,88 @@ struct root_test_s {
test_ep_at rt_ep[5];
int rt_sockets, rt_woken;
#if SU_HAVE_PTHREADS
struct {
/* Used to test su_root_obtain()/su_root_release() */
pthread_mutex_t mutex[1];
pthread_cond_t cond[1];
pthread_t slave;
int done;
pthread_mutex_t deinit[1];
} rt_sr;
#endif
};
int test_api(root_test_t *rt)
{
BEGIN();
TEST_1(rt->rt_root = su_root_create(NULL));
TEST_VOID(su_root_destroy(NULL));
TEST_P(su_root_name(NULL), NULL);
TEST_1(su_root_name(rt->rt_root) != NULL);
TEST(su_root_set_magic(NULL, rt), -1);
TEST_P(su_root_magic(rt->rt_root), NULL);
TEST(su_root_set_magic(rt->rt_root, rt), 0);
TEST_P(su_root_magic(rt->rt_root), rt);
TEST_P(su_root_magic(NULL), NULL);
TEST(su_root_register(NULL, NULL, NULL, NULL, 0), -1);
TEST(su_root_unregister(NULL, NULL, NULL, NULL), -1);
TEST(su_root_deregister(NULL, 0), -1);
TEST(su_root_deregister(rt->rt_root, 0), -1);
TEST(su_root_deregister(rt->rt_root, -1), -1);
TEST(su_root_eventmask(NULL, 0, -1, -1), -1);
TEST((long int)su_root_step(NULL, 0), -1L);
TEST((long int)su_root_sleep(NULL, 0), -1L);
TEST(su_root_multishot(NULL, 0), -1);
TEST_VOID((void)su_root_run(NULL));
TEST_VOID((void)su_root_break(NULL));
TEST_M(su_root_task(NULL), su_task_null, sizeof su_task_null);
TEST_M(su_root_parent(NULL), su_task_null, sizeof su_task_null);
TEST(su_root_add_prepoll(NULL, NULL, NULL), -1);
TEST(su_root_remove_prepoll(NULL), -1);
TEST_P(su_root_gsource(NULL), NULL);
TEST_VOID((void)su_root_gsource(rt->rt_root));
TEST(su_root_yield(NULL), -1);
TEST(su_root_release(NULL), -1);
TEST(su_root_obtain(NULL), -1);
TEST(su_root_has_thread(NULL), -1);
TEST(su_root_has_thread(rt->rt_root), 2);
TEST(su_root_release(rt->rt_root), 0);
TEST(su_root_has_thread(rt->rt_root), 0);
TEST(su_root_obtain(rt->rt_root), 0);
TEST(su_root_has_thread(rt->rt_root), 2);
TEST_VOID((void)su_root_destroy(rt->rt_root));
rt->rt_root = NULL;
END();
}
#if SU_HAVE_PTHREADS
#include <pthread.h>
void *suspend_resume_test_thread(void *_rt)
{
root_test_t *rt = _rt;
pthread_mutex_lock(rt->rt_sr.mutex);
rt->rt_root = su_root_create(rt);
rt->rt_sr.done = 1;
pthread_cond_signal(rt->rt_sr.cond);
pthread_mutex_unlock(rt->rt_sr.mutex);
su_root_release(rt->rt_root);
pthread_mutex_lock(rt->rt_sr.deinit);
su_root_obtain(rt->rt_root);
su_root_destroy(rt->rt_root);
rt->rt_root = NULL;
pthread_mutex_unlock(rt->rt_sr.deinit);
return NULL;
}
#endif
/** Test root initialization */
int init_test(root_test_t *rt,
char const *preference,
......@@ -112,7 +196,26 @@ int init_test(root_test_t *rt,
su_port_prefer(create, start);
#if SU_HAVE_PTHREADS
pthread_mutex_init(rt->rt_sr.mutex, NULL);
pthread_cond_init(rt->rt_sr.cond, NULL);
pthread_mutex_init(rt->rt_sr.deinit, NULL);
pthread_mutex_lock(rt->rt_sr.deinit);
pthread_create(&rt->rt_sr.slave, NULL, suspend_resume_test_thread, rt);
pthread_mutex_lock(rt->rt_sr.mutex);
while (rt->rt_sr.done == 0)
pthread_cond_wait(rt->rt_sr.cond, rt->rt_sr.mutex);
pthread_mutex_unlock(rt->rt_sr.mutex);
TEST_1(rt->rt_root);
TEST(su_root_obtain(rt->rt_root), 0);
TEST(su_root_has_thread(rt->rt_root), 2);
#else
TEST_1(rt->rt_root = su_root_create(rt));
#endif
printf("%s: testing %s (%s) implementation\n",
name, preference, su_root_name(rt->rt_root));
......@@ -141,8 +244,18 @@ static int deinit_test(root_test_t *rt)
{
BEGIN();
#if SU_HAVE_PTHREADS
TEST(su_root_has_thread(rt->rt_root), 2);
TEST(su_root_release(rt->rt_root), 0);
TEST(su_root_has_thread(rt->rt_root), 0);
pthread_mutex_unlock(rt->rt_sr.deinit);
pthread_join(rt->rt_sr.slave, NULL);
pthread_mutex_destroy(rt->rt_sr.mutex);
pthread_cond_destroy(rt->rt_sr.cond);
pthread_mutex_destroy(rt->rt_sr.deinit);
#else
TEST_VOID(su_root_destroy(rt->rt_root)); rt->rt_root = NULL;
TEST_VOID(su_root_destroy(NULL));
#endif
su_deinit();
......@@ -614,6 +727,8 @@ int main(int argc, char *argv[])
do {
rt = rt1, *rt = *rt0;
retval |= test_api(rt);
retval |= init_test(rt, prefer[i].name, prefer[i].create, prefer[i].start);
retval |= register_test(rt);
retval |= event_test(rt);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment