Commit 3db6aaa1 authored by Pekka Pessi's avatar Pekka Pessi

Updated tport_connect() interface.

Calling tport_alloc_seconary() when client socket has been created, making
it possible to set socket options before connecting the socket. Currently,
this benefits SCTP and TLS.

This change affects tport_internal.h, tport.c, tport_type_sctp.c,
tport_type_tcp.c, and tport_type_tls.c.

darcs-hash:20060426165300-65a35-6d262b339a4362e16a2b20777e1d580efff33194.gz
parent 80bb5451
......@@ -760,13 +760,19 @@ int tport_set_events(tport_t *self, int set, int clear)
* object. The new transport initally shares parameters structure with the
* original transport.
*
* @param dad primary transport
* @param pri primary transport
* @param socket socket for transport
* @parma accepted true if the socket was accepted from server socket
*
* @return
* The function tport_alloc_seconary() returns a pointer to the newly
* created transport, or NULL upon an error.
* Pointer to the newly created transport, or NULL upon an error.
*
* @note The socket is always closed upon error.
*/
tport_t *tport_alloc_secondary(tport_primary_t *pri, int socket, int accepted)
tport_t *tport_alloc_secondary(tport_primary_t *pri,
int socket,
int accepted,
char const **return_reason)
{
tport_master_t *mr = pri->pri_master;
tport_t *self;
......@@ -790,13 +796,18 @@ tport_t *tport_alloc_secondary(tport_primary_t *pri, int socket, int accepted)
self->tp_socket = socket;
if (pri->pri_vtable->vtp_init_secondary &&
pri->pri_vtable->vtp_init_secondary(self, socket, accepted) < 0) {
pri->pri_vtable->vtp_init_secondary(self, socket, accepted,
return_reason) < 0) {
if (pri->pri_vtable->vtp_deinit_secondary)
pri->pri_vtable->vtp_deinit_secondary(self);
su_home_zap(self->tp_home);
return NULL;
}
}
else {
su_close(socket);
*return_reason = "malloc";
}
return self;
}
......@@ -817,6 +828,9 @@ tport_t *tport_connect(tport_primary_t *pri,
su_addrinfo_t *ai,
tp_name_t const *tpn)
{
if (ai == NULL || ai->ai_addrlen > sizeof (pri->pri_primary->tp_addr))
return NULL;
if (pri->pri_vtable->vtp_connect)
return pri->pri_vtable->vtp_connect(pri, ai, tpn);
else
......@@ -834,7 +848,7 @@ tport_t *tport_connect(tport_primary_t *pri,
* @param real_ai pointer to addrinfo structure describing real target
* @param tpn canonical name of node
*/
tport_t *tport_base_connect(tport_primary_t *pri,
tport_t *tport_base_connect(tport_primary_t *pri,
su_addrinfo_t *ai,
su_addrinfo_t *real_ai,
tp_name_t const *tpn)
......@@ -842,43 +856,49 @@ tport_t *tport_base_connect(tport_primary_t *pri,
tport_master_t *mr = pri->pri_master;
tport_t *self = NULL;
su_socket_t s = SOCKET_ERROR;
int index = 0, err;
su_socket_t s, server_socket;
su_wait_t wait[1] = { SU_WAIT_INIT };
su_wakeup_f wakeup = tport_wakeup;
int index = 0;
int events = SU_WAIT_IN | SU_WAIT_ERR;
int errlevel = 3;
int err, errlevel = 3;
char buf[TPORT_HOSTPORTSIZE];
char const *what;
if (ai == NULL || ai->ai_addrlen > sizeof (self->tp_addr))
return NULL;
/* Log an error, return error */
#define TPORT_CONNECT_ERROR(errno, what) \
#define TPORT_CONNECT_ERROR(errno, what) \
return \
((void)(err = errno, s != SOCKET_ERROR ? su_close(s) : 0, \
((void)(err = errno, \
su_wait_destroy(wait), \
(SU_LOG_LEVEL >= errlevel ? \
su_llog(tport_log, errlevel, \
"%s(%p): %s(pf=%d %s/%s): %s\n", \
__func__, pri, #what, ai->ai_family, \
tpn->tpn_proto, \
tport_hostport(buf, sizeof(buf), \
(void *)ai->ai_addr, 2), \
tport_hostport(buf, sizeof(buf), \
(void *)ai->ai_addr, 2), \
su_strerror(err)) : (void)0), \
tport_zap_secondary(self), \
su_seterrno(err)), \
(void *)NULL)
s = su_socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
if (s == SOCKET_ERROR)
TPORT_CONNECT_ERROR(su_errno(), "socket");
if (pri->pri_primary->tp_socket != SOCKET_ERROR) {
what = "tport_alloc_secondary";
if ((self = tport_alloc_secondary(pri, s, 0, &what)) == NULL)
TPORT_CONNECT_ERROR(errno, what);
self->tp_conn_orient = 1;
if ((server_socket = pri->pri_primary->tp_socket) != SOCKET_ERROR) {
su_sockaddr_t susa;
socklen_t susalen = sizeof(susa);
int pri_s = pri->pri_primary->tp_socket;
if (getsockname(pri_s, &susa.su_sa, &susalen) < 0) {
/* Bind this socket to same IP address as the primary server socket */
if (getsockname(server_socket, &susa.su_sa, &susalen) < 0) {
SU_DEBUG_3(("tport_connect: getsockname(): %s\n",
su_strerror(su_errno())));
}
......@@ -891,13 +911,9 @@ tport_t *tport_base_connect(tport_primary_t *pri,
}
}
#if SU_HAVE_IN6
if (s == SOCKET_ERROR) {
if (ai->ai_family == AF_INET6 && su_errno() == EAFNOSUPPORT)
errlevel = 7;
TPORT_CONNECT_ERROR(su_errno(), socket);
}
#endif
/* Set sockname for the tport */
if (tport_setname(self, tpn->tpn_proto, real_ai, tpn->tpn_canon) == -1)
TPORT_CONNECT_ERROR(su_errno(), tport_setname);
if (connect(s, ai->ai_addr, ai->ai_addrlen) == SOCKET_ERROR) {
err = su_errno();
......@@ -905,31 +921,32 @@ tport_t *tport_base_connect(tport_primary_t *pri,
TPORT_CONNECT_ERROR(err, connect);
events = SU_WAIT_CONNECT | SU_WAIT_ERR;
wakeup = tport_connected;
what = "connecting";
}
else {
what = "connected";
}
if (su_wait_create(wait, s, events) == -1)
if (su_wait_create(wait, s, self->tp_events = events) == -1)
TPORT_CONNECT_ERROR(su_errno(), su_wait_create);
if ((self = tport_alloc_secondary(pri, s, 0)) == NULL)
TPORT_CONNECT_ERROR(errno, tport_alloc_secondary);
/* Register receiving function with events specified above */
if ((index = su_root_register(mr->mr_root, wait, wakeup, self, 0)) == -1)
TPORT_CONNECT_ERROR(su_errno(), su_root_register);
/* Set sockname for the tport */
if (tport_setname(self, tpn->tpn_proto, real_ai, tpn->tpn_canon) == -1)
TPORT_CONNECT_ERROR(su_errno(), tport_setname);
self->tp_socket = s;
self->tp_index = index;
self->tp_events = events;
self->tp_conn_orient = 1;
SU_DEBUG_5(("%s(%p): %s " TPN_FORMAT "\n",
__func__, self, "connecting to",
TPN_ARGS(self->tp_name)));
self->tp_index = index;
if (ai == real_ai) {
SU_DEBUG_5(("%s(%p): %s to " TPN_FORMAT "\n",
__func__, self, what, TPN_ARGS(self->tp_name)));
}
else {
SU_DEBUG_5(("%s(%p): %s via %s to " TPN_FORMAT "\n",
__func__, self, what,
tport_hostport(buf, sizeof(buf), (void *)ai->ai_addr, 2),
TPN_ARGS(self->tp_name)));
}
tprb_append(&pri->pri_secondary, self);
return self;
......@@ -2313,6 +2330,7 @@ int tport_accept(tport_primary_t *pri, int events)
su_addrinfo_t ai[1];
su_sockaddr_t su[1];
su_socket_t s = SOCKET_ERROR, l = pri->pri_primary->tp_socket;
char const *reason = "accept";
if (events & SU_WAIT_ERR)
tport_error_event(pri->pri_primary);
......@@ -2332,10 +2350,8 @@ int tport_accept(tport_primary_t *pri, int events)
return 0;
}
SU_CANONIZE_SOCKADDR(su);
/* Alloc a new transport object, then register socket events with it */
self = tport_alloc_secondary(pri, s, 1);
self = tport_alloc_secondary(pri, s, 1, &reason);
if (self) {
int i;
......@@ -2343,11 +2359,10 @@ int tport_accept(tport_primary_t *pri, int events)
su_wakeup_f wakeup = tport_wakeup;
int events = SU_WAIT_IN|SU_WAIT_ERR|SU_WAIT_HUP;
su_wait_t wait[1] = { SU_WAIT_INIT };
self->tp_socket = s;
if (
/* Create wait object with appropriate events. */
SU_CANONIZE_SOCKADDR(su);
if (/* Create wait object with appropriate events. */
su_wait_create(wait, s, events) != -1
/* Register socket to root */
&&
......@@ -2374,9 +2389,6 @@ int tport_accept(tport_primary_t *pri, int events)
tport_close(self);
tport_zap_secondary(self);
}
else {
su_close(s);
}
/* XXX - report error ? */
......
......@@ -323,7 +323,8 @@ struct tport_vtable
int vtp_secondary_size; /* Size of secondary tport */
int (*vtp_init_secondary)(tport_t *, int socket, int accepted);
int (*vtp_init_secondary)(tport_t *, int socket, int accepted,
char const **return_reason);
void (*vtp_deinit_secondary)(tport_t *);
void (*vtp_shutdown)(tport_t *, int how);
int (*vtp_set_events)(tport_t const *self);
......@@ -363,11 +364,23 @@ int tport_primary_compression(tport_primary_t *pri,
char const *compression,
tagi_t const *tl);
tport_t *tport_alloc_secondary(tport_primary_t *pri, int socket, int accepted);
tport_t *tport_base_connect(tport_primary_t *pri,
su_addrinfo_t *ai,
su_addrinfo_t *name,
tp_name_t const *tpn);
int tport_stream_init_primary(tport_primary_t *pri,
su_socket_t socket,
tp_name_t tpn[1],
su_addrinfo_t *ai,
tagi_t const *tags,
char const **return_reason);
tport_t *tport_alloc_secondary(tport_primary_t *pri,
int socket,
int accepted,
char const **return_reason);
int tport_accept(tport_primary_t *pri, int events);
void tport_zap_secondary(tport_t *self);
......@@ -439,7 +452,8 @@ int tport_tcp_init_client(tport_primary_t *,
tp_name_t tpn[1],
su_addrinfo_t *, tagi_t const *,
char const **return_culprit);
int tport_tcp_init_secondary(tport_t *self, int socket, int accepted);
int tport_tcp_init_secondary(tport_t *self, int socket, int accepted,
char const **return_reason);
int tport_recv_stream(tport_t *self);
int tport_send_stream(tport_t const *self, msg_t *msg,
msg_iovec_t iov[], int iovused);
......
......@@ -56,6 +56,24 @@
/* ---------------------------------------------------------------------- */
/* SCTP */
#undef MAX_STREAMS
#define MAX_STREAMS MAX_STREAMS
enum { MAX_STREAMS = 1 };
typedef struct tport_sctp_t
{
tport_t sctp_base[1];
msg_t *sctp_recv[MAX_STREAMS];
struct sctp_send {
msg_t *ss_msg;
msg_iovec_t *ss_unsent; /**< Pointer to first unsent iovec */
unsigned ss_unsentlen; /**< Number of unsent iovecs */
msg_iovec_t *ss_iov; /**< Iovecs allocated for sending */
unsigned ss_iovlen; /**< Number of allocated iovecs */
} sctp_send[MAX_STREAMS];
} tport_sctp_t;
#define TP_SCTP_MSG_MAX (65536)
static int tport_sctp_init_primary(tport_primary_t *,
......@@ -66,7 +84,11 @@ static int tport_sctp_init_client(tport_primary_t *,
tp_name_t tpn[1],
su_addrinfo_t *, tagi_t const *,
char const **return_culprit);
static int tport_sctp_init_secondary(tport_t *self, int socket, int accepted);
static int tport_sctp_init_secondary(tport_t *self, int socket, int accepted,
char const **return_reason);
static int tport_sctp_init_socket(tport_primary_t *pri,
int socket,
char const **return_reason);
static int tport_recv_sctp(tport_t *self);
static int tport_send_sctp(tport_t const *self, msg_t *msg,
msg_iovec_t iov[], int iovused);
......@@ -119,17 +141,27 @@ static int tport_sctp_init_primary(tport_primary_t *pri,
tagi_t const *tags,
char const **return_culprit)
{
int socket;
if (pri->pri_params->tpp_mtu > TP_SCTP_MSG_MAX)
pri->pri_params->tpp_mtu = TP_SCTP_MSG_MAX;
return tport_tcp_init_primary(pri, tpn, ai, tags, return_culprit);
socket = su_socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
if (socket == SOCKET_ERROR)
return *return_culprit = "socket", -1;
if (tport_sctp_init_socket(pri, socket, return_culprit) < 0)
return -1;
return tport_stream_init_primary(pri, socket, tpn, ai, tags, return_culprit);
}
static int tport_sctp_init_client(tport_primary_t *pri,
tp_name_t tpn[1],
su_addrinfo_t *ai,
tagi_t const *tags,
char const **return_culprit)
tp_name_t tpn[1],
su_addrinfo_t *ai,
tagi_t const *tags,
char const **return_culprit)
{
if (pri->pri_params->tpp_mtu > TP_SCTP_MSG_MAX)
pri->pri_params->tpp_mtu = TP_SCTP_MSG_MAX;
......@@ -137,13 +169,35 @@ static int tport_sctp_init_client(tport_primary_t *pri,
return tport_tcp_init_client(pri, tpn, ai, tags, return_culprit);
}
static int tport_sctp_init_secondary(tport_t *self, int socket, int accepted)
static int tport_sctp_init_secondary(tport_t *self, int socket, int accepted,
char const **return_reason)
{
self->tp_connected = 1;
if (su_setblocking(socket, 0) < 0)
return -1;
return *return_reason = "su_setblocking", -1;
if (accepted) {
/* Accepted socket inherit the init information from listen socket */
return 0;
}
else {
return tport_sctp_init_socket(self->tp_pri, socket, return_reason);
}
}
/** Initialize a SCTP socket */
static int tport_sctp_init_socket(tport_primary_t *pri,
int socket,
char const **return_reason)
{
struct sctp_initmsg initmsg = { 0 };
initmsg.sinit_num_ostreams = MAX_STREAMS;
initmsg.sinit_max_instreams = MAX_STREAMS;
if (setsockopt(socket, SOL_SCTP, SCTP_INITMSG, &initmsg, sizeof initmsg) < 0)
return *return_reason = "SCTP_INITMSG", -1;
return 0;
}
......
......@@ -102,26 +102,36 @@ int tport_tcp_init_primary(tport_primary_t *pri,
tagi_t const *tags,
char const **return_culprit)
{
int s;
int socket;
s = su_socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
socket = su_socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
if (s == SOCKET_ERROR)
if (socket == SOCKET_ERROR)
return *return_culprit = "socket", -1;
pri->pri_primary->tp_socket = s;
return tport_stream_init_primary(pri, socket, tpn, ai, tags, return_culprit);
}
int tport_stream_init_primary(tport_primary_t *pri,
su_socket_t socket,
tp_name_t tpn[1],
su_addrinfo_t *ai,
tagi_t const *tags,
char const **return_culprit)
{
pri->pri_primary->tp_socket = socket;
#if defined(__linux__)
/* Linux does not allow reusing TCP port while this one is open,
so we can safely call su_setreuseaddr() before bind(). */
su_setreuseaddr(s, 1);
su_setreuseaddr(socket, 1);
#endif
if (tport_bind_socket(s, ai, return_culprit) == SOCKET_ERROR)
return -1;
if (tport_bind_socket(socket, ai, return_culprit) == SOCKET_ERROR)
return su_close(socket), -1;
if (listen(s, pri->pri_params->tpp_qsize) == SOCKET_ERROR)
return *return_culprit = "listen", -1;
if (listen(socket, pri->pri_params->tpp_qsize) == SOCKET_ERROR)
return *return_culprit = "listen", su_close(socket), -1;
#if !defined(__linux__)
/* Allow reusing TCP sockets
......@@ -129,7 +139,7 @@ int tport_tcp_init_primary(tport_primary_t *pri,
* On Solaris & BSD, call setreuseaddr() after bind in order to avoid
* binding to a port owned by an existing server.
*/
su_setreuseaddr(s, 1);
su_setreuseaddr(socket, 1);
#endif
pri->pri_primary->tp_events = SU_WAIT_ACCEPT;
......@@ -149,16 +159,17 @@ int tport_tcp_init_client(tport_primary_t *pri,
return 0;
}
int tport_tcp_init_secondary(tport_t *self, int socket, int accepted)
int tport_tcp_init_secondary(tport_t *self, int socket, int accepted,
char const **return_reason)
{
int one = 1;
self->tp_connected = 1;
if (setsockopt(socket, SOL_TCP, TCP_NODELAY, (void *)&one, sizeof one) == -1)
return -1;
return *return_reason = "TCP_NODELAY", -1;
if (su_setblocking(socket, 0) < 0)
return -1;
return *return_reason = "su_setblocking", -1;
return 0;
}
......
......@@ -65,7 +65,8 @@ static int tport_tls_init_master(tport_primary_t *pri,
tagi_t const *tags,
char const **return_culprit);
static void tport_tls_deinit_primary(tport_primary_t *pri);
static int tport_tls_init_secondary(tport_t *self, int socket, int accepted);
static int tport_tls_init_secondary(tport_t *self, int socket, int accepted,
char const **return_reason);
static void tport_tls_deinit_secondary(tport_t *self);
static void tport_tls_shutdown(tport_t *self, int how);
static int tport_tls_set_events(tport_t const *self);
......@@ -211,23 +212,22 @@ static void tport_tls_deinit_primary(tport_primary_t *pri)
tls_free(tlspri->tlspri_master), tlspri->tlspri_master = NULL;
}
static int tport_tls_init_secondary(tport_t *self, int socket, int accepted)
static int tport_tls_init_secondary(tport_t *self, int socket, int accepted,
char const **return_reason)
{
tport_tls_primary_t *tlspri = (tport_tls_primary_t *)self->tp_pri;
tport_tls_t *tlstp = (tport_tls_t *)self;
tls_t *master = tlspri->tlspri_master;
if (tport_tcp_init_secondary(self, socket, accepted) < 0)
if (tport_tcp_init_secondary(self, socket, accepted, return_reason) < 0)
return -1;
if (accepted)
if (accepted) {
tlstp->tlstp_context = tls_init_slave(master, socket);
else
tlstp->tlstp_context = tls_init_client(master, socket);
if (!tlstp->tlstp_context)
return -1;
if (!tlstp->tlstp_context)
return *return_reason = "tls_init_slave", -1;
}
return 0;
}
......@@ -402,11 +402,19 @@ int tport_tls_send(tport_t const *self,
msg_iovec_t iov[],
int iovlen)
{
tport_tls_primary_t *tlspri = (tport_tls_primary_t *)self->tp_pri;
tport_tls_t *tlstp = (tport_tls_t *)self;
enum { TLSBUFSIZE = 2048 };
int i, j, n, m, size = 0;
int oldmask, mask;
if (tlstp->tlstp_context == NULL) {
tls_t *master = tlspri->tlspri_master;
tlstp->tlstp_context = tls_init_client(master, self->tp_socket);
if (!tlstp->tlstp_context)
return -1;
}
oldmask = tls_events(tlstp->tlstp_context, self->tp_events);
#if 0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment