Commit 910bee0c authored by Pekka Pessi's avatar Pekka Pessi

tport_threadpool.c does not work at the moment, disable it.

darcs-hash:20060426171941-65a35-2a2230c2de6151e26859765351ddf53bd684c252.gz
parent 37072e21
......@@ -41,11 +41,14 @@ libtport_la_SOURCES = tport.c tport_logging.c \
tport_stub_stun.c tport_stub_sigcomp.c \
tport_type_udp.c tport_type_tcp.c tport_type_sctp.c \
tport_type_connect.c tport_type_stun.c \
tport_threadpool.c tport_internal.h \
tport_internal.h \
tport_tag.c tport_tag_ref.c $(USE_TLS_SRC)
# to make sure all files end up in the dist package
EXTRA_libtport_la_SOURCES = $(TLS_SRC) tport_sigcomp.c
EXTRA_libtport_la_SOURCES = $(TLS_SRC)
# Disable for now
EXTRA_libtport_la_SOURCES += tport_sigcomp.c tport_threadpool.c
BUILT_SOURCES = tport_tag_ref.c
......
......@@ -1185,7 +1185,9 @@ tport_vtable_t const *tport_vtables[TPORT_NUMBER_OF_TYPES + 1] =
&tport_tcp_vtable,
&tport_udp_client_vtable,
&tport_udp_vtable,
#if 0
&tport_threadpool_vtable,
#endif
#if HAVE_SOFIA_STUN
&tport_stun_vtable,
#endif
......
......@@ -57,13 +57,19 @@ static char const __func__[] = "tport_threadpool";
/* ==== Thread pools =================================================== */
/* typedef struct tport_threadpool tport_threadpool_t; */
typedef struct threadpool threadpool_t;
struct tport_threadpool
typedef struct {
tport_primary_t tptp_primary;
threadpool_t *tptp_pool; /**< Worker threads */
unsigned tptp_poolsize;
} tport_threadpool_t;
struct threadpool
{
/* Shared */
su_clone_r thrp_clone;
tport_primary_t *thrp_tport;
tport_threadpool_t *thrp_tport;
int thrp_killing; /* Threadpool is being killed */
......@@ -89,7 +95,7 @@ struct tport_threadpool
typedef struct
{
tport_threadpool_t *tpd_thrp;
threadpool_t *tpd_thrp;
int tpd_errorcode;
msg_t *tpd_msg;
su_time_t tpd_when;
......@@ -104,7 +110,7 @@ typedef struct
union tport_su_msg_arg
{
tport_threadpool_t *thrp;
threadpool_t *thrp;
thrp_udp_deliver_t thrp_udp_deliver[1];
};
......@@ -124,12 +130,12 @@ static int tport_thread_send(tport_t *tp,
tport_vtable_t const tport_threadpool_vtable =
{
"udp", tport_type_local,
sizeof (tport_primary_t),
sizeof (tport_threadpool_t),
tport_threadpool_init_primary,
tport_threadpool_deinit_primary,
NULL,
NULL,
sizeof (tport_t),
0, /* No secondary transports! */
NULL,
NULL,
NULL,
......@@ -141,30 +147,30 @@ tport_vtable_t const tport_threadpool_vtable =
tport_thread_send
};
static int thrp_udp_init(su_root_t *, tport_threadpool_t *);
static void thrp_udp_deinit(su_root_t *, tport_threadpool_t *);
static int thrp_udp_event(tport_threadpool_t *thrp,
static int thrp_udp_init(su_root_t *, threadpool_t *);
static void thrp_udp_deinit(su_root_t *, threadpool_t *);
static int thrp_udp_event(threadpool_t *thrp,
su_wait_t *w,
tport_t *_tp);
static int thrp_udp_recv_deliver(tport_threadpool_t *thrp,
static int thrp_udp_recv_deliver(threadpool_t *thrp,
tport_t const *tp,
thrp_udp_deliver_t *tpd,
int events);
static int thrp_udp_recv(tport_threadpool_t *thrp, thrp_udp_deliver_t *tpd);
static int thrp_udp_recv(threadpool_t *thrp, thrp_udp_deliver_t *tpd);
#if HAVE_SIGCOMP
static int thrp_udvm_decompress(tport_threadpool_t *thrp,
static int thrp_udvm_decompress(threadpool_t *thrp,
thrp_udp_deliver_t *tpd);
#endif
static void thrp_udp_deliver(tport_threadpool_t *thrp,
static void thrp_udp_deliver(threadpool_t *thrp,
su_msg_r msg,
union tport_su_msg_arg *arg);
static void thrp_udp_deliver_report(tport_threadpool_t *thrp,
static void thrp_udp_deliver_report(threadpool_t *thrp,
su_msg_r m,
union tport_su_msg_arg *arg);
static void thrp_udp_send(tport_threadpool_t *thrp,
static void thrp_udp_send(threadpool_t *thrp,
su_msg_r msg,
union tport_su_msg_arg *arg);
static void thrp_udp_send_report(tport_threadpool_t *thrp,
static void thrp_udp_send_report(threadpool_t *thrp,
su_msg_r msg,
union tport_su_msg_arg *arg);
......@@ -176,8 +182,9 @@ int tport_threadpool_init_primary(tport_primary_t *pri,
tagi_t const *tags,
char const **return_culprit)
{
tport_threadpool_t *tptp = (tport_threadpool_t *)pri;
tport_t *tp = pri->pri_primary;
tport_threadpool_t *thrp = pri->pri_threadpool;
threadpool_t *thrp;
int i, N = tp->tp_params->tpp_thrpsize;
assert(ai->ai_socktype == SOCK_DGRAM);
......@@ -185,7 +192,7 @@ int tport_threadpool_init_primary(tport_primary_t *pri,
if (tport_udp_init_primary(pri, tpn, ai, tags, return_culprit) < 0)
return -1;
if (N == 0 || thrp != NULL)
if (N == 0)
return 0;
thrp = su_zalloc(tp->tp_home, (sizeof *thrp) * N);
......@@ -194,15 +201,15 @@ int tport_threadpool_init_primary(tport_primary_t *pri,
su_setblocking(tp->tp_socket, 0);
pri->pri_threadpool = thrp;
pri->pri_thrpsize = N;
tptp->tptp_pool = thrp;
tptp->tptp_poolsize = N;
for (i = 0; i < N; i++) {
#if HAVE_SIGCOMP
if (tport_has_sigcomp(tp))
thrp[i].thrp_compartment = tport_primary_compartment(tp->tp_master);
#endif
thrp[i].thrp_tport = pri;
thrp[i].thrp_tport = tptp;
if (su_clone_start(pri->pri_master->mr_root,
thrp[i].thrp_clone,
thrp + i,
......@@ -227,8 +234,9 @@ int tport_threadpool_init_primary(tport_primary_t *pri,
static
void tport_threadpool_deinit_primary(tport_primary_t *pri)
{
tport_threadpool_t *thrp = pri->pri_threadpool;
int i, N = pri->pri_thrpsize;
tport_threadpool_t *tptp = (tport_threadpool_t *)pri;
threadpool_t *thrp = tptp->tptp_pool;
int i, N = pri->tptp_poolsize;
if (!thrp)
return;
......@@ -241,14 +249,15 @@ void tport_threadpool_deinit_primary(tport_primary_t *pri)
for (i = 0; i < N; i++)
su_clone_wait(pri->pri_master->mr_root, thrp[i].thrp_clone);
su_free(pri->pri_home, thrp), pri->pri_threadpool = NULL;
su_free(pri->pri_home, tptp), tptp->tptp_pool = NULL;
tptp->tptp_poolsize = 0;
SU_DEBUG_3(("%s(%p): zapped threadpool\n", __func__, pri));
}
static int thrp_udp_init(su_root_t *root, tport_threadpool_t *thrp)
static int thrp_udp_init(su_root_t *root, threadpool_t *thrp)
{
tport_t *tp = thrp->thrp_tport->pri_primary;
tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
su_wait_t wait[1];
assert(tp);
......@@ -266,7 +275,7 @@ static int thrp_udp_init(su_root_t *root, tport_threadpool_t *thrp)
return 0;
}
static void thrp_udp_deinit(su_root_t *root, tport_threadpool_t *thrp)
static void thrp_udp_deinit(su_root_t *root, threadpool_t *thrp)
{
if (thrp->thrp_reg)
su_root_deregister(root, thrp->thrp_reg), thrp->thrp_reg = 0;
......@@ -274,23 +283,23 @@ static void thrp_udp_deinit(su_root_t *root, tport_threadpool_t *thrp)
}
static inline void
thrp_yield(tport_threadpool_t *thrp)
thrp_yield(threadpool_t *thrp)
{
su_root_eventmask(thrp->thrp_root, thrp->thrp_reg,
thrp->thrp_tport->pri_primary->tp_socket, 0);
tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
su_root_eventmask(thrp->thrp_root, thrp->thrp_reg, tp->tp_socket, 0);
thrp->thrp_yield = 1;
}
static inline void
thrp_gain(tport_threadpool_t *thrp)
thrp_gain(threadpool_t *thrp)
{
su_root_eventmask(thrp->thrp_root, thrp->thrp_reg,
thrp->thrp_tport->pri_primary->tp_socket,
SU_WAIT_IN | SU_WAIT_ERR);
tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
int events = SU_WAIT_IN | SU_WAIT_ERR;
su_root_eventmask(thrp->thrp_root, thrp->thrp_reg, tp->tp_socket, events);
thrp->thrp_yield = 0;
}
static int thrp_udp_event(tport_threadpool_t *thrp,
static int thrp_udp_event(threadpool_t *thrp,
su_wait_t *w,
tport_t *tp)
{
......@@ -337,7 +346,7 @@ static int thrp_udp_event(tport_threadpool_t *thrp,
}
}
static int thrp_udp_recv_deliver(tport_threadpool_t *thrp,
static int thrp_udp_recv_deliver(threadpool_t *thrp,
tport_t const *tp,
thrp_udp_deliver_t *tpd,
int events)
......@@ -369,7 +378,8 @@ static int thrp_udp_recv_deliver(tport_threadpool_t *thrp,
assert(tpd->tpd_errorcode);
if (tpd->tpd_errorcode == EAGAIN || tpd->tpd_errorcode == EWOULDBLOCK)
return 0;
} else if (tpd->tpd_msg) {
}
else if (tpd->tpd_msg) {
int n = msg_extract(tpd->tpd_msg); (void)n;
thrp->thrp_rcvd_msgs++;
......@@ -386,7 +396,7 @@ static int thrp_udp_recv_deliver(tport_threadpool_t *thrp,
if (tpd->tpd_msg || tpd->tpd_errorcode) {
if (qlen >= tp->tp_params->tpp_thrprqsize) {
SU_DEBUG_7(("tport recv queue %i: %u\n",
(int)(thrp - tp->tp_pri->pri_threadpool), qlen));
(int)(thrp - tp->tp_pri->tptp_pool), qlen));
thrp_yield(thrp);
}
......@@ -409,7 +419,7 @@ static pthread_mutex_t mutex[1] = { PTHREAD_MUTEX_INITIALIZER };
/** Receive a UDP packet by threadpool. */
static
int thrp_udp_recv(tport_threadpool_t *thrp, thrp_udp_deliver_t *tpd)
int thrp_udp_recv(threadpool_t *thrp, thrp_udp_deliver_t *tpd)
{
tport_t const *tp = thrp->thrp_tport->pri_primary;
unsigned char sample[2];
......@@ -510,7 +520,7 @@ int thrp_udp_recv(tport_threadpool_t *thrp, thrp_udp_deliver_t *tpd)
#if HAVE_SIGCOMP
static
int thrp_udvm_decompress(tport_threadpool_t *thrp, thrp_udp_deliver_t *tpd)
int thrp_udvm_decompress(threadpool_t *thrp, thrp_udp_deliver_t *tpd)
{
struct sigcomp_udvm *udvm = tpd->tpd_udvm;
struct sigcomp_buffer *output;
......@@ -575,7 +585,7 @@ void thrp_udp_deliver(su_root_magic_t *magic,
union tport_su_msg_arg *arg)
{
thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver;
tport_threadpool_t *thrp = tpd->tpd_thrp;
threadpool_t *thrp = tpd->tpd_thrp;
tport_t *tp = thrp->thrp_tport->pri_primary;
su_time_t now = su_now();
......@@ -609,7 +619,7 @@ void thrp_udp_deliver(su_root_magic_t *magic,
}
static
void thrp_udp_deliver_report(tport_threadpool_t *thrp,
void thrp_udp_deliver_report(threadpool_t *thrp,
su_msg_r m,
union tport_su_msg_arg *arg)
{
......@@ -632,14 +642,15 @@ int tport_thread_send(tport_t *tp,
struct sigcomp_compartment *cc,
unsigned mtu)
{
tport_threadpool_t *thrp = tp->tp_pri->pri_threadpool;
threadpool_t *thrp = tp->tp_pri->tptp_pool;
thrp_udp_deliver_t *tpd;
int i, N = tp->tp_pri->pri_thrpsize;
int i, N = tp->tp_pri->tptp_poolsize;
su_msg_r m;
unsigned totalqlen = 0;
unsigned qlen;
if (!tp->tp_pri->pri_threadpool)
if (!tp->tp_pri->tptp_pool)
return tport_prepare_and_send(tp, msg, tpn, cc, mtu);
SU_DEBUG_9(("tport_thread_send()\n"));
......@@ -651,7 +662,7 @@ int tport_thread_send(tport_t *tp,
/* Select thread with shortest queue */
for (i = 1; i < N; i++) {
tport_threadpool_t *other = tp->tp_pri->pri_threadpool + i;
threadpool_t *other = tp->tp_pri->tptp_pool + i;
unsigned len = other->thrp_s_sent - other->thrp_s_recv;
if (len < qlen ||
......@@ -697,7 +708,7 @@ int tport_thread_send(tport_t *tp,
/** thrp_udp_send() is run by threadpool to send the message. */
static
void thrp_udp_send(tport_threadpool_t *thrp,
void thrp_udp_send(threadpool_t *thrp,
su_msg_r m,
union tport_su_msg_arg *arg)
{
......@@ -782,7 +793,7 @@ void thrp_udp_send_report(su_root_magic_t *magic,
union tport_su_msg_arg *arg)
{
thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver;
tport_threadpool_t *thrp = tpd->tpd_thrp;
threadpool_t *thrp = tpd->tpd_thrp;
tport_t *tp = thrp->thrp_tport->pri_primary;
assert(magic != thrp);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment