Commit 0cc3ab2f authored by Pekka Pessi's avatar Pekka Pessi
Browse files

Fixed tport.c on Win32 port.

Added a separate "connecting" phase.
Checking EWOULDBLOCK in addition to EAGAIN.
Fixed TP_AI_CLOSE and TP_AI_SHUTDOWN flags.
Improved logging.

darcs-hash:20051111173822-65a35-4e3ed06b2975405da108791d755cb1becf5960e3.gz
parent 8f087758
......@@ -209,7 +209,8 @@ struct tport_s {
unsigned tp_connected : 1; /**< Has real connection */
unsigned tp_reusable:1; /**< Can this connection be reused */
unsigned tp_closed : 1; /**< This transport is closed */
unsigned tp_recv_close:1; /**< Remote end has sent FIN */
/**< Remote end has sent FIN (2) or we should not just read */
unsigned tp_recv_close:2;
/** We will send FIN (1) or have sent FIN (2) */
unsigned tp_send_close:2;
......@@ -393,10 +394,10 @@ struct tport_master {
/* Halfclose (shutdown(c, 1)) connection after sending message */
#define TP_AI_SHUTDOWN 0x0020
/* Close connection (shutdown(c, 2)) after sending message */
#define TP_AI_CLOSE 0x0020
#define TP_AI_CLOSE 0x0040
/* Address was inaddr_any */
#define TP_AI_ANY 0x0040
#define TP_AI_ANY 0x0080
typedef long unsigned LU; /* for printf() and friends */
......@@ -781,6 +782,7 @@ static int
tport_setname(tport_t *, char const *, su_sockaddr_t const *, char const *),
tport_recv(su_root_magic_t *m, su_wait_t *w, tport_t *self),
tport_accept(su_root_magic_t *m, su_wait_t *w, tport_t *self),
tport_connected(su_root_magic_t *m, su_wait_t *w, tport_t *self),
tport_resolve(tport_t *self, msg_t *msg, tp_name_t const *tpn),
tport_send_msg(tport_t *, msg_t *, tp_name_t const *tpn,
struct sigcomp_compartment *cc),
......@@ -961,7 +963,7 @@ tport_t *tport_tcreate(tp_stack_t *stack,
if (!mr)
return NULL;
SU_DEBUG_7(("%s(): %p\n", __func__, mr));
SU_DEBUG_7(("%s(): %p\n", "tport_create", mr));
mr->mr_stack = stack;
mr->mr_tpac = tpac;
......@@ -1254,7 +1256,7 @@ tport_primary_t *tport_listen(tport_master_t *mr, su_addrinfo_t const *ai,
return TPORT_LISTEN_ERROR(su_errno(), su_wait_create);
/* Register receiving or accepting function with events specified above */
index = su_root_register(mr->mr_root, wait, wakeup, pri->pri_primary, 0);
index = su_root_register(mr->mr_root, wait, wakeup, pri->pri_primary, 0);
if (index == -1)
return TPORT_LISTEN_ERROR(su_errno(), su_root_register);
if (tport_setname(pri->pri_primary, protoname, su, canon) == -1)
......@@ -1371,6 +1373,8 @@ tport_t *tport_connect(tport_primary_t *pri,
su_socket_t s = SOCKET_ERROR;
int index = 0, err;
su_wait_t wait[1] = { SU_WAIT_INIT };
su_wakeup_f wakeup = tport_recv;
int events = SU_WAIT_IN | SU_WAIT_ERR;
int errlevel = 3;
char buf[TPORT_HOSTPORTSIZE];
......@@ -1429,21 +1433,23 @@ tport_t *tport_connect(tport_primary_t *pri,
if (ai->ai_socktype == SOCK_STREAM) {
int one = 1;
if (setsockopt(s, SOL_TCP, TCP_NODELAY, &one, sizeof one) == -1)
if (setsockopt(s, SOL_TCP, TCP_NODELAY, (void *)&one, sizeof one) == -1)
TPORT_CONNECT_ERROR(su_errno(), setsockopt(TCP_NODELAY));
}
if (connect(s, ai->ai_addr, ai->ai_addrlen) == SOCKET_ERROR) {
err = su_errno();
if (err != EINPROGRESS && err != EAGAIN)
if (err != EINPROGRESS && err != EAGAIN && err != EWOULDBLOCK)
TPORT_CONNECT_ERROR(err, connect);
events = SU_WAIT_CONNECT | SU_WAIT_ERR;
wakeup = tport_connected;
}
if (su_wait_create(wait, s, SU_WAIT_IN | SU_WAIT_HUP) == -1)
if (su_wait_create(wait, s, events) == -1)
TPORT_CONNECT_ERROR(su_errno(), su_wait_create);
/* Register receiving function with events specified above */
if ((index = su_root_register(mr->mr_root, wait, tport_recv, self, 0)) == -1)
if ((index = su_root_register(mr->mr_root, wait, wakeup, self, 0)) == -1)
TPORT_CONNECT_ERROR(su_errno(), su_root_register);
/* */
......@@ -1453,7 +1459,7 @@ tport_t *tport_connect(tport_primary_t *pri,
self->tp_socket = s;
self->tp_index = index;
self->tp_events = SU_WAIT_IN | SU_WAIT_HUP;
self->tp_events = events;
self->tp_connected = ai->ai_socktype != SOCK_DGRAM;
self->tp_conn_orient = 1;
......@@ -2206,12 +2212,12 @@ void tport_close(tport_t *self)
{
int i;
SU_DEBUG_5(("%s(%p): " TPN_FORMAT "\n", __func__, self,
SU_DEBUG_5(("%s(%p): " TPN_FORMAT "\n", "tport_close", self,
TPN_ARGS(self->tp_name)));
self->tp_closed = 1;
self->tp_send_close = 1;
self->tp_recv_close = 1;
self->tp_send_close = 3;
self->tp_recv_close = 3;
#if HAVE_TLS
if (self->tp_tls != NULL) {
......@@ -2229,9 +2235,11 @@ void tport_close(tport_t *self)
if (self->tp_index)
su_root_deregister(self->tp_master->mr_root, self->tp_index);
self->tp_index = 0;
#if SU_HAVE_BSDSOCK
if (self->tp_socket != -1)
su_close(self->tp_socket);
self->tp_socket = -1;
#endif
if (self->tp_params->tpp_sdwn_error && self->tp_pused)
tport_error_report(self, -1, NULL);
......@@ -2269,12 +2277,12 @@ int tport_shutdown(tport_t *self, int how)
if (self == NULL || tport_is_primary(self))
return -1;
SU_DEBUG_7(("%s(%p, %d)\n", __func__, self, how));
SU_DEBUG_7(("%s(%p, %d)\n", "tport_shutdown", self, how));
if (!tport_is_tcp(self) ||
how < 0 ||
(how == 0 && self->tp_send_close) ||
(how == 1 && self->tp_recv_close) ||
(how == 1 && self->tp_recv_close > 1) ||
how >= 2) {
tport_close(self);
return 1;
......@@ -2290,7 +2298,7 @@ int tport_shutdown(tport_t *self, int how)
shutdown(self->tp_socket, how);
if (how == 0) {
self->tp_recv_close = 1;
self->tp_recv_close = 2;
self->tp_events &= ~SU_WAIT_IN;
if (self->tp_params->tpp_sdwn_error && self->tp_pused)
tport_error_report(self, -1, NULL);
......@@ -2601,7 +2609,7 @@ static void tport_error_report(tport_t *self, int errcode,
/* Mark this connection as unusable */
if (errcode > 0 && tport_is_connected(self))
self->tp_send_close = 2, self->tp_recv_close = 1;
self->tp_reusable = 0;
if (addr == NULL && tport_is_connection_oriented(self))
addr = self->tp_addr;
......@@ -2663,7 +2671,7 @@ int tport_accept(su_root_magic_t *m, su_wait_t *w, tport_t *_pri)
return 0;
}
setsockopt(s, SOL_TCP, TCP_NODELAY, &one, sizeof one);
setsockopt(s, SOL_TCP, TCP_NODELAY, (void *)&one, sizeof one);
SU_CANONIZE_SOCKADDR(su);
......@@ -2698,6 +2706,7 @@ int tport_accept(su_root_magic_t *m, su_wait_t *w, tport_t *_pri)
self->tp_index = i;
self->tp_connected = 1;
self->tp_conn_orient = 1;
self->tp_events = events;
self->tp_addr[0] = su[0];
self->tp_addrlen = sulen;
......@@ -2757,6 +2766,54 @@ msg_t *tport_msg_alloc(tport_t const *self, unsigned size)
}
}
/** Process events for socket waiting to be connected
*/
static int tport_connected(su_root_magic_t *magic, su_wait_t *w, tport_t *self)
{
int events = su_wait_events(w, self->tp_socket);
tport_master_t *mr = self->tp_master;
tport_primary_t *pri = self->tp_pri;
su_wait_t wait[1] = { SU_WAIT_INIT };
int error;
SU_DEBUG_7(("tport_connected(%p): events%s%s\n", self,
events & SU_WAIT_CONNECT ? " CONNECTED" : "",
events & SU_WAIT_ERR ? " ERR" : ""));
#if HAVE_POLL
assert(w->fd == self->tp_socket);
#endif
if (events & SU_WAIT_ERR)
tport_error_event(self, events);
if (!(events & SU_WAIT_CONNECT) || self->tp_closed) {
return 0;
}
error = su_soerror(self->tp_socket);
if (error) {
tport_error_report(self, error, NULL);
return 0;
}
su_root_deregister(mr->mr_root, self->tp_index);
self->tp_index = -1;
self->tp_events = SU_WAIT_IN | SU_WAIT_ERR;
if (su_wait_create(wait, self->tp_socket, self->tp_events) == -1 ||
(self->tp_index = su_root_register(mr->mr_root,
wait, tport_recv, self, 0))
== -1) {
tport_close(self);
}
else if (self->tp_queue && self->tp_queue[self->tp_qhead]) {
tport_send_event(self, events);
}
return 0;
}
/** Process events for connected socket
*/
static int tport_recv(su_root_magic_t *magic, su_wait_t *w, tport_t *self)
......@@ -2769,11 +2826,12 @@ static int tport_recv(su_root_magic_t *magic, su_wait_t *w, tport_t *self)
old_mask = tls_events(self->tp_tls, self->tp_events);
#endif
SU_DEBUG_7(("tport_recv(%p): events%s%s%s%s\n", self,
SU_DEBUG_7(("tport_recv(%p): events%s%s%s%s%s\n", self,
events & SU_WAIT_IN ? " IN" : "",
events & SU_WAIT_HUP ? " HUP" : "",
events & SU_WAIT_OUT ? " OUT" : "",
events & SU_WAIT_ERR ? " ERR" : ""));
events & SU_WAIT_ERR ? " ERR" : "",
self->tp_closed ? " (closed)" : ""));
#if HAVE_POLL
assert(w->fd == self->tp_socket);
......@@ -2886,7 +2944,7 @@ static void tport_recv_event(tport_t *self, int event)
su_time_t now;
int again;
SU_DEBUG_7(("%s(%p)\n", __func__, self));
SU_DEBUG_7(("%s(%p)\n", "tport_recv_event", self));
do {
now = su_now();
......@@ -2895,7 +2953,7 @@ static void tport_recv_event(tport_t *self, int event)
if ((again = tport_recv_data(self)) < 0) {
int error = su_errno();
if (error != EAGAIN) {
if (error != EAGAIN && error != EWOULDBLOCK) {
tport_error_report(self, error, NULL);
/* Failure: shutdown socket */
if (tport_is_connected(self))
......@@ -3134,7 +3192,7 @@ int tport_recv_data(tport_t *self)
self->tp_params->tpp_drop &&
su_randint(0, 1000) < self->tp_params->tpp_drop) {
char sample[1];
recv(self->tp_socket, &sample, 1, 0);
recv(self->tp_socket, sample, 1, 0);
SU_DEBUG_3(("tport(%p): simulated packet loss!\n", self));
return 0;
}
......@@ -3223,7 +3281,7 @@ static int tport_recv_iovec(tport_t const *self,
static int tport_recv_error_report(tport_t *self)
{
if (su_errno() == EAGAIN)
if (su_errno() == EAGAIN && su_errno() != EWOULDBLOCK)
return 1;
/* Report error */
......@@ -3491,7 +3549,7 @@ static int tport_recv_sigcomp_r(tport_t *self,
char const *pn = self->tp_protoname;
int err = su_errno();
if (err == EAGAIN) {
if (err == EAGAIN || err == EWOULDBLOCK) {
SU_DEBUG_7(("%s(%p): recv from %s: EAGAIN\n", __func__, self, pn));
return 1;
}
......@@ -3775,7 +3833,8 @@ int tport_recv_tls(tport_t *self)
if (N == 0) /* End-of-stream */
return 0;
else if (N == -1) {
if (su_errno() == EAGAIN) {
int err = su_errno();
if (err == EAGAIN || err == EWOULDBLOCK) {
tport_events(self);
return 1;
}
......@@ -4070,7 +4129,9 @@ tport_t *tport_tsend(tport_t *self,
* If there is already an queued message,
* put this message straight in the queue
*/
if (self->tp_queue && self->tp_queue[self->tp_qhead]) {
if (self->tp_queue && self->tp_queue[self->tp_qhead] ||
/* ...or we are connecting */
(self->tp_events & (SU_WAIT_CONNECT | SU_WAIT_OUT))) {
if (tport_queue(self, msg) < 0) {
SU_DEBUG_9(("tport_queue failed in tsend\n"));
return NULL;
......@@ -4224,7 +4285,7 @@ int tport_vsend(tport_t *self,
n = tport_vsend_iovec(self, msg, iov, iovused);
}
if (tpn == NULL)
if (tpn == NULL || tport_is_connection_oriented(self))
tpn = self->tp_name;
if (n == -1)
......@@ -4292,7 +4353,7 @@ int tport_send_error(tport_t *self, msg_t *msg,
/*Xyzzy*/
}
if (error == EAGAIN) {
if (error == EAGAIN || error == EWOULDBLOCK) {
SU_DEBUG_5(("tport_vsend(%p): %s with (s=%d %s/%s:%s%s)\n",
self, "EAGAIN", self->tp_socket,
tpn->tpn_proto, tpn->tpn_host, tpn->tpn_port, comp));
......@@ -4486,7 +4547,7 @@ int tport_sigcomp_vsend(tport_t const *self,
if (m == -1) {
int error = su_errno();
if (error != EAGAIN) {
if (error != EAGAIN && error != EWOULDBLOCK) {
sigcomp_compressor_free(c);
sc->sc_compressor = NULL;
sc->sc_output = NULL; sc->sc_input = NULL;
......@@ -4580,7 +4641,8 @@ int tport_tls_writevec(tport_t *self,
self->tp_tls, iov[i].siv_base, (LU)iov[i].siv_len, n));
if (n < 0) {
if (su_errno() == EAGAIN)
int err = su_errno();
if (err == EAGAIN || err == EWOULDBLOCK)
break;
SU_DEBUG_3(("tls_write: %s\n", strerror(errno)));
return -1;
......@@ -5090,8 +5152,9 @@ tport_udp_error(tport_t const *self, su_sockaddr_t name[1])
n = recvmsg(self->tp_socket, msg, MSG_ERRQUEUE);
if (n < 0) {
if (su_errno() != EAGAIN)
SU_DEBUG_1(("%s: recvmsg: %s\n", __func__, su_strerror(su_errno())));
int err = su_errno();
if (err != EAGAIN && err != EWOULDBLOCK)
SU_DEBUG_1(("%s: recvmsg: %s\n", __func__, su_strerror(err)));
return 0;
}
......@@ -6296,7 +6359,7 @@ static int thrp_udp_recv_deliver(tport_threadpool_t *thrp,
if (thrp_udp_recv(thrp, tpd) < 0) {
tpd->tpd_errorcode = su_errno();
assert(tpd->tpd_errorcode);
if (tpd->tpd_errorcode == EAGAIN)
if (tpd->tpd_errorcode == EAGAIN || tpd->tpd_errorcode == EWOULDBLOCK)
return 0;
} else if (tpd->tpd_msg) {
int n = msg_extract(tpd->tpd_msg); (void)n;
......@@ -6359,7 +6422,7 @@ int thrp_udp_recv(tport_threadpool_t *thrp, thrp_udp_deliver_t *tpd)
N = recv(tp->tp_socket, &sample, 1, MSG_PEEK);
if (N < 0) {
if (su_errno() == EAGAIN)
if (su_errno() == EAGAIN || su_errno() == EWOULDBLOCK)
N = 0;
} else if (N == 0) {
SU_DEBUG_1(("thrp_udp_recv(%p): zero len packet\n", thrp));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment