Commit d65fd637 authored by Pekka Pessi's avatar Pekka Pessi

tport: added ping-pong keepalive on TCP. replaced single tick with connection-specific timer

Now detecting closed connections on TLS, too.

Added tests for idle timeout, receive timeout, ping-pong timeout.

darcs-hash:20070725160906-65a35-0d568677e950abbc55b7ab10ae9a1fc8000155e6.gz
parent cad60e04
This diff is collapsed.
This diff is collapsed.
......@@ -126,7 +126,7 @@ typedef struct {
struct tport_s {
su_home_t tp_home[1]; /**< Memory home */
int tp_refs; /**< Number of references to tport */
ssize_t tp_refs; /**< Number of references to tport */
unsigned tp_black:1; /**< Used by red-black-tree */
......@@ -134,8 +134,13 @@ struct tport_s {
unsigned tp_conn_orient:1; /**< Is connection-oriented */
unsigned tp_has_connection:1; /**< Has real connection */
unsigned tp_reusable:1; /**< Can this connection be reused */
unsigned tp_closed : 1; /**< This transport is closed */
/**< Remote end has sent FIN (2) or we should not just read */
unsigned tp_closed : 1;
/**< This transport is closed.
*
* A closed transport is inserted into pri_closed list.
*/
/** Remote end has sent FIN (2) or we should not just read */
unsigned tp_recv_close:2;
/** We will send FIN (1) or have sent FIN (2) */
unsigned tp_send_close:2;
......@@ -154,10 +159,10 @@ struct tport_s {
tp_magic_t *tp_magic; /**< Context provided by consumer */
msg_t const *tp_rlogged; /**< Last logged when receiving */
msg_t const *tp_slogged; /**< Last logged when sending */
su_timer_t *tp_timer; /**< Timer object */
unsigned tp_time; /**< When this transport was last used */
su_time_t tp_ktime; /**< Keepalive timer updated */
su_time_t tp_ptime; /**< Ping sent */
tp_name_t tp_name[1]; /**< Transport name.
*
......@@ -182,15 +187,17 @@ struct tport_s {
/* ==== Receive queue ================================================== */
msg_t *tp_msg; /**< Message being received */
msg_t const *tp_rlogged; /**< Last logged when receiving */
su_time_t tp_rtime; /**< Last time received data */
unsigned short tp_ping; /**< Whitespace ping being received */
/* ==== Pending messages =============================================== */
tport_pending_t *tp_pending; /**< Pending requests */
tport_pending_t *tp_released; /**< Released pends */
unsigned short tp_reported; /**< Report counter */
unsigned tp_plen; /**< Size of tp_pending */
unsigned tp_pused; /**< Used pends */
unsigned short tp_reported; /**< Report counter */
unsigned short tp_pad;
tport_pending_t *tp_pending; /**< Pending requests */
tport_pending_t *tp_released; /**< Released pends */
/* ==== Send queue ===================================================== */
......@@ -203,6 +210,9 @@ struct tport_s {
msg_iovec_t *tp_iov; /**< Iovecs allocated for sending */
size_t tp_iovlen; /**< Number of allocated iovecs */
msg_t const *tp_slogged; /**< Last logged when sending */
su_time_t tp_stime; /**< Last time sent message */
/* ==== Extensions ===================================================== */
tport_compressor_t *tp_comp;
......@@ -232,10 +242,10 @@ struct tport_primary {
* tport_type_stun, etc.
*/
char pri_ident[16];
tport_primary_t *pri_next; /**< Next primary tport */
tport_t *pri_secondary; /**< Secondary tports */
tport_t *pri_open; /**< Open secondary tports */
tport_t *pri_closed; /**< Closed secondary tports */
unsigned pri_updating:1; /**< Currently updating address */
unsigned pri_natted:1; /**< Using natted address */
......@@ -245,7 +255,6 @@ struct tport_primary {
void *pri_stun_handle;
tport_params_t pri_params[1]; /**< Transport parameters */
};
/** Master structure */
......@@ -311,7 +320,7 @@ struct tport_master {
#endif
};
/** Virtual funtion table for transports */
/** Virtual function table for transports */
struct tport_vtable
{
char const *vtp_name;
......@@ -348,6 +357,9 @@ struct tport_vtable
int (*vtp_stun_response)(tport_t const *self,
void *msg, size_t msglen,
void *addr, socklen_t addrlen);
int (*vtp_next_secondary_timer)(tport_t *self, su_time_t *,
char const **return_why);
void (*vtp_secondary_timer)(tport_t *self, su_time_t);
};
int tport_register_type(tport_vtable_t const *vtp);
......@@ -392,11 +404,16 @@ tport_t *tport_alloc_secondary(tport_primary_t *pri,
int tport_accept(tport_primary_t *pri, int events);
void tport_zap_secondary(tport_t *self);
int tport_set_secondary_timer(tport_t *self);
void tport_base_timer(tport_t *self, su_time_t now);
int tport_bind_socket(int socket,
su_addrinfo_t *ai,
char const **return_culprit);
void tport_close(tport_t *self);
int tport_shutdown0(tport_t *self, int how);
int tport_has_queued(tport_t const *self);
int tport_error_event(tport_t *self);
void tport_recv_event(tport_t *self);
......@@ -418,6 +435,8 @@ int tport_send_msg(tport_t *self, msg_t *msg,
tp_name_t const *tpn,
struct sigcomp_compartment *cc);
void tport_send_queue(tport_t *self);
void tport_deliver(tport_t *self, msg_t *msg, msg_t *next,
tport_compressor_t *comp,
su_time_t now);
......@@ -434,6 +453,9 @@ void tport_dump_iovec(tport_t const *self, msg_t *msg,
size_t n, su_iovec_t const iov[], size_t iovused,
char const *what, char const *how);
int tport_tcp_ping(tport_t *self, su_time_t now);
int tport_tcp_pong(tport_t *self);
extern tport_vtable_t const tport_udp_vtable;
extern tport_vtable_t const tport_udp_client_vtable;
......@@ -465,6 +487,15 @@ int tport_recv_stream(tport_t *self);
ssize_t tport_send_stream(tport_t const *self, msg_t *msg,
msg_iovec_t iov[], size_t iovused);
int tport_tcp_next_timer(tport_t *self, su_time_t *, char const **);
void tport_tcp_timer(tport_t *self, su_time_t);
int tport_next_recv_timeout(tport_t *, su_time_t *, char const **);
void tport_recv_timeout_timer(tport_t *self, su_time_t now);
int tport_next_keepalive(tport_t *self, su_time_t *, char const **);
void tport_keepalive_timer(tport_t *self, su_time_t now);
extern tport_vtable_t const tport_sctp_vtable;
extern tport_vtable_t const tport_sctp_client_vtable;
extern tport_vtable_t const tport_tls_vtable;
......
......@@ -587,6 +587,10 @@ int tls_error(tls_t *tls, int ret, char const *who, char const *operation,
return 0;
case SSL_ERROR_SYSCALL:
if (SSL_get_shutdown(tls->con) & SSL_RECEIVED_SHUTDOWN)
return 0; /* EOS */
if (errno == 0)
return 0; /* EOS */
return -1;
default:
......@@ -665,10 +669,12 @@ int tls_want_read(tls_t *tls, int events)
if (tls && (events & tls->read_events)) {
int ret = tls_read(tls);
if (ret >= 0)
if (ret > 0)
return 1;
else if (errno == EAGAIN)
else if (ret == 0)
return 0;
else if (errno == EAGAIN)
return 2;
else
return -1;
}
......
......@@ -201,6 +201,8 @@ static tport_t *tport_http_connect(tport_primary_t *pri, su_addrinfo_t *ai,
return NULL;
}
tport_set_secondary_timer(tport);
return tport;
}
......
......@@ -37,12 +37,12 @@
#include "config.h"
#if HAVE_SCTP
#include "tport_internal.h"
#if HAVE_NETINET_SCTP_H
#include <netinet/sctp.h>
#undef HAVE_SCTP
#define HAVE_SCTP 1
#endif
#include <stdlib.h>
......@@ -63,7 +63,6 @@
#define SOL_SCTP IPPROTO_SCTP
#endif
enum { MAX_STREAMS = 1 };
typedef struct tport_sctp_t
{
......@@ -81,8 +80,6 @@ typedef struct tport_sctp_t
#define TP_SCTP_MSG_MAX (65536)
#if HAVE_SCTP
static int tport_sctp_init_primary(tport_primary_t *,
tp_name_t tpn[1],
su_addrinfo_t *, tagi_t const *,
......@@ -100,6 +97,9 @@ static int tport_recv_sctp(tport_t *self);
static ssize_t tport_send_sctp(tport_t const *self, msg_t *msg,
msg_iovec_t iov[], size_t iovused);
static int tport_sctp_next_timer(tport_t *self, su_time_t *, char const **);
static void tport_sctp_timer(tport_t *self, su_time_t);
tport_vtable_t const tport_sctp_client_vtable =
{
"sctp", tport_type_client,
......@@ -116,11 +116,14 @@ tport_vtable_t const tport_sctp_client_vtable =
NULL,
tport_recv_sctp,
tport_send_sctp,
NULL,
NULL,
NULL,
NULL,
tport_sctp_next_timer,
tport_sctp_timer,
};
#undef NEXT_VTABLE
#define NEXT_VTABLE &tport_sctp_client_vtable
tport_vtable_t const tport_sctp_vtable =
{
"sctp", tport_type_local,
......@@ -137,11 +140,14 @@ tport_vtable_t const tport_sctp_vtable =
NULL,
tport_recv_sctp,
tport_send_sctp,
NULL,
NULL,
NULL,
NULL,
tport_sctp_next_timer,
tport_sctp_timer,
};
#undef NEXT_VTABLE
#define NEXT_VTABLE &tport_sctp_vtable
static int tport_sctp_init_primary(tport_primary_t *pri,
tp_name_t tpn[1],
su_addrinfo_t *ai,
......@@ -263,4 +269,56 @@ static ssize_t tport_send_sctp(tport_t const *self, msg_t *msg,
return su_vsend(self->tp_socket, iov, iovused, MSG_NOSIGNAL, NULL, 0);
}
/** Calculate tick timer if send is pending. */
int tport_next_sctp_send_tick(tport_t *self,
su_time_t *return_target,
char const **return_why)
{
unsigned timeout = 100; /* Retry 10 times a second... */
if (tport_has_queued(self)) {
su_time_t ntime = su_time_add(self->tp_ktime, timeout);
if (su_time_cmp(ntime, *return_target) < 0)
*return_target = ntime, *return_why = "send tick";
}
return 0;
}
/** Tick timer if send is pending */
void tport_sctp_send_tick_timer(tport_t *self, su_time_t now)
{
unsigned timeout = 100;
/* Send timeout */
if (tport_has_queued(self) &&
su_time_cmp(su_time_add(self->tp_ktime, timeout), now) < 0) {
uint64_t bytes = self->tp_stats.sent_bytes;
su_time_t stime = self->tp_stime;
tport_send_queue(self);
if (self->tp_stats.sent_bytes == bytes)
self->tp_stime = stime; /* Restore send timestamp */
}
}
/** Calculate next timer for SCTP. */
int tport_sctp_next_timer(tport_t *self,
su_time_t *return_target,
char const **return_why)
{
return
tport_next_recv_timeout(self, return_target, return_why) |
tport_next_sctp_send_tick(self, return_target, return_why);
}
/** SCTP timer. */
void tport_sctp_timer(tport_t *self, su_time_t now)
{
tport_sctp_send_tick_timer(self, now);
tport_recv_timeout_timer(self, now);
tport_base_timer(self, now);
}
#endif
......@@ -45,6 +45,7 @@
#endif
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <errno.h>
......@@ -76,6 +77,12 @@ tport_vtable_t const tport_tcp_vtable =
NULL,
tport_recv_stream,
tport_send_stream,
NULL,
NULL,
NULL,
NULL,
tport_tcp_next_timer,
tport_tcp_timer,
};
tport_vtable_t const tport_tcp_client_vtable =
......@@ -84,7 +91,7 @@ tport_vtable_t const tport_tcp_client_vtable =
sizeof (tport_primary_t),
tport_tcp_init_client,
NULL,
tport_accept,
NULL,
NULL,
sizeof (tport_t),
tport_tcp_init_secondary,
......@@ -94,6 +101,12 @@ tport_vtable_t const tport_tcp_client_vtable =
NULL,
tport_recv_stream,
tport_send_stream,
NULL,
NULL,
NULL,
NULL,
tport_tcp_next_timer,
tport_tcp_timer,
};
static int tport_tcp_setsndbuf(int socket, int atleast);
......@@ -205,6 +218,20 @@ static int tport_tcp_setsndbuf(int socket, int atleast)
#endif
}
/** Return span of whitespace from buffer */
static inline size_t ws_span(void *buffer, size_t len)
{
size_t i;
char const *b = buffer;
for (i = 0; i < len; i++) {
if (b[i] != '\r' && b[i] != '\n' && b[i] != ' ' && b[i] != '\t')
break;
}
return i;
}
/** Receive from stream.
*
* @retval -1 error
......@@ -217,7 +244,7 @@ int tport_recv_stream(tport_t *self)
{
msg_t *msg;
ssize_t n, N, veclen;
int err;
int err, initial;
msg_iovec_t iovec[msg_n_fragments] = {{ 0 }};
N = su_getmsgsize(self->tp_socket);
......@@ -233,6 +260,36 @@ int tport_recv_stream(tport_t *self)
return -1;
}
initial = self->tp_msg == NULL;
memset(&self->tp_ptime, 0, sizeof self->tp_ptime);
while (initial && N <= 8) { /* Check for whitespace */
char crlf[9];
size_t i;
n = su_recv(self->tp_socket, crlf, N, MSG_PEEK);
i = ws_span(crlf, n);
if (i == 0)
break;
n = su_recv(self->tp_socket, crlf, i, 0);
if (n <= 0)
return (int)n;
SU_DEBUG_7(("%s(%p): received keepalive\n", __func__, (void *)self));
N -= n, self->tp_ping += n;
if (N == 0) {
/* outbound-10 section 3.5.1 - send pong */
if (self->tp_ping >= 4)
tport_tcp_pong(self);
return 1;
}
}
veclen = tport_recv_iovec(self, &self->tp_msg, iovec, N, 0);
if (veclen == -1)
return -1;
......@@ -242,11 +299,30 @@ int tport_recv_stream(tport_t *self)
msg_set_address(msg, self->tp_addr, (socklen_t)(self->tp_addrlen));
n = su_vrecv(self->tp_socket, iovec, veclen, 0, NULL, NULL);
if (n == SOCKET_ERROR)
return tport_recv_error_report(self);
assert(n <= N);
/* Check if message contains only whitespace */
/* This can happen if multiple PINGs are received at once */
if (initial) {
size_t i = ws_span(iovec->siv_base, iovec->siv_len);
if (i + self->tp_ping >= 4)
tport_tcp_pong(self);
else
self->tp_ping += i;
if (i == iovec->siv_len && veclen == 1) {
SU_DEBUG_7(("%s(%p): received %u bytes of keepalive\n",
__func__, (void *)self, (unsigned)i));
msg_destroy(self->tp_msg), self->tp_msg = NULL;
return 1;
}
}
/* Write the received data to the message dump file */
if (self->tp_master->mr_dump_file)
tport_dump_iovec(self, msg, n, iovec, veclen, "recv", "from");
......@@ -254,9 +330,13 @@ int tport_recv_stream(tport_t *self)
/* Mark buffer as used */
msg_recv_commit(msg, n, n == 0);
if (n > 0)
self->tp_ping = 0;
return n != 0;
}
/** Send to stream */
ssize_t tport_send_stream(tport_t const *self, msg_t *msg,
msg_iovec_t iov[],
size_t iovused)
......@@ -267,3 +347,184 @@ ssize_t tport_send_stream(tport_t const *self, msg_t *msg,
#endif
return su_vsend(self->tp_socket, iov, iovused, MSG_NOSIGNAL, NULL, 0);
}
/** Calculate timeout if receive is incomplete. */
int tport_next_recv_timeout(tport_t *self,
su_time_t *return_target,
char const **return_why)
{
unsigned timeout = self->tp_params->tpp_timeout;
if (timeout < INT_MAX) {
/* Recv timeout */
if (self->tp_msg) {
su_time_t ntime = su_time_add(self->tp_rtime, timeout);
if (su_time_cmp(ntime, *return_target) < 0)
*return_target = ntime, *return_why = "recv timeout";
}
#if 0
/* Send timeout */
if (tport_has_queued(self)) {
su_time_t ntime = su_time_add(self->tp_stime, timeout);
if (su_time_cmp(ntime, *return_target) < 0)
*return_target = ntime, *return_why = "send timeout";
}
#endif
}
return 0;
}
/** Timeout timer if receive is incomplete */
void tport_recv_timeout_timer(tport_t *self, su_time_t now)
{
unsigned timeout = self->tp_params->tpp_timeout;
if (timeout < INT_MAX) {
if (self->tp_msg &&
su_time_cmp(su_time_add(self->tp_rtime, timeout), now) < 0) {
msg_t *msg = self->tp_msg;
msg_set_streaming(msg, 0);
msg_set_flags(msg, MSG_FLG_ERROR | MSG_FLG_TRUNC | MSG_FLG_TIMEOUT);
tport_deliver(self, msg, NULL, NULL, now);
self->tp_msg = NULL;
}
#if 0
/* Send timeout */
if (tport_has_queued(self) &&
su_time_cmp(su_time_add(self->tp_stime, timeout), now) < 0) {
stime = su_time_add(self->tp_stime, self->tp_params->tpp_timeout);
if (su_time_cmp(stime, target) < 0)
target = stime;
}
#endif
}
}
/** Calculate next timeout for keepalive */
int tport_next_keepalive(tport_t *self,
su_time_t *return_target,
char const **return_why)
{
/* Keepalive timer */
unsigned timeout = self->tp_params->tpp_keepalive;
if (timeout != 0 && timeout != UINT_MAX) {
if (!tport_has_queued(self)) {
su_time_t ntime = su_time_add(self->tp_ktime, timeout);
if (su_time_cmp(ntime, *return_target) < 0)
*return_target = ntime, *return_why = "keepalive";
}
}
timeout = self->tp_params->tpp_pingpong;
if (timeout != 0) {
if (self->tp_ptime.tv_sec && !self->tp_recv_close) {
su_time_t ntime = su_time_add(self->tp_ptime, timeout);
if (su_time_cmp(ntime, *return_target) < 0)
*return_target = ntime, *return_why = "waiting for pong";
}
}
return 0;
}
/** Keepalive timer. */
void tport_keepalive_timer(tport_t *self, su_time_t now)
{
unsigned timeout = self->tp_params->tpp_pingpong;
if (timeout != 0) {
if (self->tp_ptime.tv_sec && !self->tp_recv_close &&
su_time_cmp(su_time_add(self->tp_ptime, timeout), now) < 0) {
SU_DEBUG_3(("%s(%p): %s to " TPN_FORMAT "%s\n",
__func__, (void *)self,
"closing connection", TPN_ARGS(self->tp_name),
" because of PONG timeout"));
tport_close(self);
return;
}
}
timeout = self->tp_params->tpp_keepalive;
if (timeout != 0 && timeout != UINT_MAX) {
if (su_time_cmp(su_time_add(self->tp_ktime, timeout), now) < 0) {
tport_tcp_ping(self, now);
}
}
}
/** Send PING */
int tport_tcp_ping(tport_t *self, su_time_t now)
{
ssize_t n;
char *why = "";
if (tport_has_queued(self))
return 0;
n = send(self->tp_socket, "\r\n\r\n", 4, 0);
if (n > 0)
self->tp_ktime = now;
if (n == 4) {
if (self->tp_ptime.tv_sec == 0)
self->tp_ptime = now;
}
else if (n == -1) {
int error = su_errno();
why = " failed";
if (!su_is_blocking(error))
tport_error_report(self, error, NULL);
else
why = " blocking";
return -1;
}
SU_DEBUG_7(("%s(%p): %s to " TPN_FORMAT "%s\n",
__func__, (void *)self,
"sending PING", TPN_ARGS(self->tp_name), why));
return n == -1 ? -1 : 0;
}
/** Send pong */
int tport_tcp_pong(tport_t *self)
{
self->tp_ping = 0;
if (tport_has_queued(self) || !self->tp_params->tpp_pong2ping)
return 0;
SU_DEBUG_7(("%s(%p): %s to " TPN_FORMAT "%s\n",
__func__, (void *)self,
"sending PONG", TPN_ARGS(self->tp_name), ""));
return send(self->tp_socket, "\r\n", 2, 0);
}
/** Calculate next timer for TCP. */
int tport_tcp_next_timer(tport_t *self,
su_time_t *return_target,
char const **return_why)
{
return
tport_next_recv_timeout(self, return_target, return_why) |
tport_next_keepalive(self, return_target, return_why);
}
/** TCP timer. */
void tport_tcp_timer(tport_t *self, su_time_t now)
{
tport_recv_timeout_timer(self, now);
tport_keepalive_timer(self, now);
tport_base_timer(self, now);
}
......@@ -307,6 +307,8 @@ int tport_tls_events(tport_t *self, int events)
ret = tls_want_read(tlstp->tlstp_context, events);
if (ret > 0)
tport_recv_event(self);
else if (ret == 0) /* End-of-stream */
tport_shutdown0(self, 2);
else if (ret < 0)
tport_error_report(self, errno, NULL);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment