Commit d797bca0 authored by Pekka Pessi's avatar Pekka Pessi

Added HTTP CONNECT.

darcs-hash:20060324053838-88462-1e179e06a88541140cd10ce89b4404b2ba9bb4d2.gz
parent 8970cb2b
......@@ -418,6 +418,7 @@ struct tport_master {
su_sockaddr_t sockaddr;
#endif
} mr_nat[1];
};
/** Virtual funtion table */
......@@ -429,7 +430,6 @@ struct tport_vtable
int vtp_pri_size; /* Size of primary tport */
int (*vtp_init_primary)(tport_primary_t *pri,
int socket,
tp_name_t const tpn[1],
su_addrinfo_t *ai, tagi_t const *,
char const **return_culprit);
......@@ -449,6 +449,7 @@ struct tport_vtable
int (*vtp_recv)(tport_t *self);
int (*vtp_send)(tport_t const *self, msg_t *msg,
msg_iovec_t iov[], int iovused);
void (*vtp_deliver)(tport_t *self, msg_t *msg, su_time_t now);
};
#define STACK_RECV(tp, msg, now) \
......@@ -816,8 +817,10 @@ int tport_nat_finish(tport_primary_t *self);
static
tport_t *tport_connect(tport_primary_t *pri, su_addrinfo_t *ai,
tp_name_t const *tpn);
static tport_t *tport_base_connect(tport_primary_t *pri, su_addrinfo_t *ai,
tp_name_t const *tpn);
static tport_t *tport_base_connect(tport_primary_t *pri,
su_addrinfo_t *ai,
su_addrinfo_t *name,
tp_name_t const *tpn);
static int bind6only_check(void);
......@@ -842,6 +845,10 @@ int tport_getaddrinfo(char const *node, char const *service,
static void tport_freeaddrinfo(su_addrinfo_t *ai);
static
int tport_addrinfo_copy(su_addrinfo_t *dst, void *addr, socklen_t addrlen,
su_addrinfo_t const *src);
static int
tport_bind_client(tport_master_t *self, tp_name_t const *tpn,
char const * const transports[], enum tport_via public,
......@@ -902,9 +909,10 @@ static void tport_parse(tport_t *self, int complete, su_time_t now);
static void tport_deliver(tport_t *, msg_t *msg, msg_t *next,
struct sigcomp_udvm **udvm, su_time_t now);
void tport_base_deliver(tport_t *self, msg_t *msg, su_time_t now);
static tport_primary_t *tport_alloc_primary(tport_master_t *mr,
tport_vtable_t const *vtable,
int socket,
tp_name_t const tpn[1],
su_addrinfo_t *ai,
tagi_t const *tags,
......@@ -1127,7 +1135,6 @@ void tport_destroy(tport_t *self)
static
tport_primary_t *tport_alloc_primary(tport_master_t *mr,
tport_vtable_t const *vtable,
int socket,
tp_name_t const tpn[1],
su_addrinfo_t *ai,
tagi_t const *tags,
......@@ -1156,7 +1163,7 @@ tport_primary_t *tport_alloc_primary(tport_master_t *mr,
memcpy(tp->tp_params, mr->mr_params, sizeof (*tp->tp_params));
tp->tp_reusable = mr->mr_master->tp_reusable;
if (socket != SOCKET_ERROR)
if (!pri->pri_public)
tp->tp_addrinfo->ai_addr = &tp->tp_addr->su_sa;
SU_DEBUG_5(("%s(%p): new primary tport %p\n", __func__, mr, pri));
......@@ -1169,8 +1176,8 @@ tport_primary_t *tport_alloc_primary(tport_master_t *mr,
*return_culprit = "alloc";
else if (tport_set_params(tp, TAG_NEXT(tags)) < 0)
*return_culprit = "tport_set_params";
else if (vtable->vtp_init_primary(pri, socket, tpn, ai, tags,
return_culprit) < 0)
else if (vtable->vtp_init_primary &&
vtable->vtp_init_primary(pri, tpn, ai, tags, return_culprit) < 0)
;
else if (tport_setname(tp, vtable->vtp_name, ai, tpn->tpn_canon) == -1)
*return_culprit = "tport_setname";
......@@ -1188,119 +1195,6 @@ tport_primary_t *tport_alloc_primary(tport_master_t *mr,
return NULL;
}
#if 0
/** Process events for socket waiting to be connected
*/
static int tport_http_connected(su_root_magic_t *magic, su_wait_t *w, tport_t *self)
{
int events = su_wait_events(w, self->tp_socket);
tport_master_t *mr = self->tp_master;
su_wait_t wait[1] = { SU_WAIT_INIT };
int error;
SU_DEBUG_7(("%s(%p): events%s%s\n", __func__, self,
events & SU_WAIT_CONNECT ? " CONNECTED" : "",
events & SU_WAIT_ERR ? " ERR" : ""));
#if HAVE_POLL
assert(w->fd == self->tp_socket);
#endif
if (events & SU_WAIT_ERR)
tport_error_event(self);
if (!(events & SU_WAIT_CONNECT) || self->tp_closed) {
return 0;
}
error = su_soerror(self->tp_socket);
if (error) {
tport_error_report(self, error, NULL);
return 0;
}
#if 0
str = su_sprintf(autohome, "CONNECT %s HTTP/1.1\nUser-agent: %s\n\n",
url_string->us_str, ua);
#endif
su_root_deregister(mr->mr_root, self->tp_index);
self->tp_index = -1;
tport_set_events(self, SU_WAIT_IN | SU_WAIT_ERR,
SU_WAIT_CONNECT | );
self->tp_events =
if (su_wait_create(wait, self->tp_socket, self->tp_events) == -1 ||
(self->tp_index = su_root_register(mr->mr_root,
wait, tport_wakeup, self, 0))
== -1) {
tport_close(self);
}
else if (self->tp_queue && self->tp_queue[self->tp_qhead]) {
tport_send_event(self, events);
}
return 0;
}
int tport_tcp_socket_http_connect(su_socket_t s, const char *httpd,
tagi_t *tags)
{
char ua[] = "sofia-sip-1.11.6";
char *str;
int len, err;
url_string_t *url_string;
url_t const *url;
su_addrinfo_t *res = NULL, *ai, hints[1] = {{ 0 }};
su_sockaddr_t sa[1];
su_home_t autohome[SU_HOME_AUTO_SIZE(2048)];
su_home_auto(autohome, sizeof autohome);
#if a0
/* Get the SIP proxy address */
tl_gets(tags,
URLTAG_URL_REF(url_string),
TAG_END());
#endif
if ((err = su_getaddrinfo(httpd, NULL, hints, &res)) != 0) {
/* STUN_ERROR(err, su_getaddrinfo); */
return -1;
}
memset(sa, 0, sizeof(*sa));
for (ai = res; ai; ai = ai->ai_next) {
if (ai->ai_family != AF_INET)
continue;
/* info->ai_flags = ai->ai_flags; */
sa->su_family = ai->ai_family;
sa->su_len = ai->ai_addrlen;
memcpy(&sa->su_sa, ai->ai_addr, sizeof(su_sockaddr_t));
break;
}
/* If no port specified, use default */
if (!sa->su_port)
sa->su_port = htons(443);
if (res)
su_freeaddrinfo(res);
if (connect(s, &sa->su_sa, sa->su_len) == SOCKET_ERROR) {
err = su_errno();
if (err != EINPROGRESS && err != EAGAIN && err != EWOULDBLOCK)
/* TPORT_CONNECT_ERROR(err, connect) */;
}
su_home_deinit(autohome);
return 0;
}
#endif
/** Destroy a primary transport and its secondary transports. @internal */
static
......@@ -1343,7 +1237,7 @@ void tport_zap_primary(tport_primary_t *pri)
* registers the socket with suitable events to the root.
*
* @param dad parent (master or primary) transport object
* @param ainfo pointer to addrinfo structure
* @param ai pointer to addrinfo structure
* @param canon canonical name of node
* @param protoname name of the protocol
*/
......@@ -1356,10 +1250,6 @@ tport_primary_t *tport_listen(tport_master_t *mr,
{
tport_primary_t *pri = NULL;
su_socket_t s = SOCKET_ERROR;
int index = 0;
su_wait_t wait[1] = { SU_WAIT_INIT };
int err;
int errlevel = 3;
char buf[TPORT_HOSTPORTSIZE];
......@@ -1370,8 +1260,8 @@ tport_primary_t *tport_listen(tport_master_t *mr,
su_sockaddr_t *su = (void *)ai->ai_addr;
/* Log an error, return error */
#define TPORT_LISTEN_ERROR(errno, what) \
((void)(err = errno, s != SOCKET_ERROR ? su_close(s) : 0, \
#define TPORT_LISTEN_ERROR(errno, what) \
((void)(err = errno, \
((err == EADDRINUSE || err == EAFNOSUPPORT || \
err == ESOCKTNOSUPPORT || err == EPROTONOSUPPORT || \
err == ENOPROTOOPT ? 7 : 3) < SU_LOG_LEVEL ? \
......@@ -1385,18 +1275,8 @@ tport_primary_t *tport_listen(tport_master_t *mr,
su_seterrno(err)), \
(void *)NULL)
s = su_socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
#if SU_HAVE_IN6
if (s == SOCKET_ERROR) {
if (ai->ai_family == AF_INET6 && su_errno() == EAFNOSUPPORT)
errlevel = 7;
return TPORT_LISTEN_ERROR(su_errno(), "socket");
}
#endif
/* Create a primary transport object for another transport. */
pri = tport_alloc_primary(mr, vtable, s, tpn, ai, tags, &culprit);
pri = tport_alloc_primary(mr, vtable, tpn, ai, tags, &culprit);
if (pri == NULL)
return TPORT_LISTEN_ERROR(errno, culprit);
......@@ -1414,19 +1294,25 @@ tport_primary_t *tport_listen(tport_master_t *mr,
}
#endif
if (su_wait_create(wait, s, pri->pri_primary->tp_events) == -1)
return TPORT_LISTEN_ERROR(su_errno(), "su_wait_create");
if (pri->pri_primary->tp_socket != SOCKET_ERROR) {
int index = 0;
tport_t *tp = pri->pri_primary;
su_wait_t wait[1] = { SU_WAIT_INIT };
if (su_wait_create(wait, tp->tp_socket, tp->tp_events) == -1)
return TPORT_LISTEN_ERROR(su_errno(), "su_wait_create");
/* Register receiving or accepting function with events specified above */
index = su_root_register(mr->mr_root, wait, tport_wakeup_pri, tp, 0);
if (index == -1) {
su_wait_destroy(wait);
return TPORT_LISTEN_ERROR(su_errno(), "su_root_register");
}
/* Register receiving or accepting function with events specified above */
index = su_root_register(mr->mr_root, wait, tport_wakeup_pri,
pri->pri_primary, 0);
if (index == -1)
return TPORT_LISTEN_ERROR(su_errno(), "su_root_register");
tp->tp_index = index;
}
pri->pri_primary->tp_socket = s;
pri->pri_primary->tp_index = index;
pri->pri_primary->tp_connected = 0;
pri->pri_primary->tp_conn_orient = ai->ai_socktype != SOCK_DGRAM;
#if 0
if (nat_bound) {
......@@ -1444,17 +1330,19 @@ tport_primary_t *tport_listen(tport_master_t *mr,
}
static
int bind_socket(int socket,
su_addrinfo_t *ai,
char const **return_culprit)
int tport_bind_socket(int socket,
su_addrinfo_t *ai,
char const **return_culprit)
{
su_sockaddr_t *su = (su_sockaddr_t *)ai->ai_addr;
if (bind(socket, ai->ai_addr, ai->ai_addrlen) == -1)
if (bind(socket, ai->ai_addr, ai->ai_addrlen) == -1) {
return *return_culprit = "bind", -1;
}
if (getsockname(socket, ai->ai_addr, &ai->ai_addrlen) == SOCKET_ERROR)
if (getsockname(socket, ai->ai_addr, &ai->ai_addrlen) == SOCKET_ERROR) {
return *return_culprit = "getsockname", -1;
}
#if defined (__linux__) && defined (SU_HAVE_IN6)
if (ai->ai_family == AF_INET6) {
......@@ -1526,7 +1414,7 @@ tport_t *tport_alloc_secondary(tport_primary_t *pri, int socket, int accepted)
tport_master_t *mr = pri->pri_master;
tport_t *self;
self = su_home_clone(mr->mr_home, sizeof *self);
self = su_home_clone(mr->mr_home, pri->pri_vtable->vtp_secondary_size);
if (self) {
SU_DEBUG_7(("%s(%p): new secondary tport %p\n", __func__, pri, self));
......@@ -1575,7 +1463,7 @@ tport_t *tport_connect(tport_primary_t *pri,
if (pri->pri_vtable->vtp_connect)
return pri->pri_vtable->vtp_connect(pri, ai, tpn);
else
return tport_base_connect(pri, ai, tpn);
return tport_base_connect(pri, ai, ai, tpn);
}
/**Create a connected transport object with socket.
......@@ -1585,12 +1473,13 @@ tport_t *tport_connect(tport_primary_t *pri,
* root.
*
* @param pri primary transport object
* @param ai pointer to addrinfo structure
* @param name pointer to addrinfo structure
* @param tpn canonical name of node
*/
static
tport_t *tport_base_connect(tport_primary_t *pri,
su_addrinfo_t *ai,
su_addrinfo_t *name,
tp_name_t const *tpn)
{
tport_master_t *mr = pri->pri_master;
......@@ -1672,13 +1561,12 @@ tport_t *tport_base_connect(tport_primary_t *pri,
TPORT_CONNECT_ERROR(su_errno(), su_root_register);
/* Set sockname for the tport */
if (tport_setname(self, tpn->tpn_proto, ai, tpn->tpn_canon) == -1)
if (tport_setname(self, tpn->tpn_proto, name, tpn->tpn_canon) == -1)
TPORT_CONNECT_ERROR(su_errno(), tport_setname);
self->tp_socket = s;
self->tp_index = index;
self->tp_events = events;
self->tp_connected = ai->ai_socktype != SOCK_DGRAM;
self->tp_conn_orient = 1;
#if HAVE_SIGCOMP
......@@ -1919,6 +1807,10 @@ tport_vtable_t const *tport_vtable_by_name(char const *protoname,
continue;
if (strcasecmp(vtable->vtp_name, protoname))
continue;
assert(vtable->vtp_pri_size >= sizeof (tport_primary_t));
assert(vtable->vtp_secondary_size >= sizeof (tport_t));
return vtable;
}
......@@ -1989,6 +1881,9 @@ int tport_bind_client(tport_master_t *mr,
tport_vtable_t const *vtable;
if (public == tport_type_local)
public = tport_type_client;
SU_DEBUG_5(("%s(%p) to " TPN_FORMAT "\n", __func__, mr, TPN_ARGS(tpn)));
memset(tpn0, 0, sizeof(tpn0));
......@@ -2018,7 +1913,7 @@ int tport_bind_client(tport_master_t *mr,
hints->ai_canonname = "*";
if (!(pri = tport_alloc_primary(mr, vtable, -1, tpn0, hints, tags, &why)))
if (!(pri = tport_alloc_primary(mr, vtable, tpn0, hints, tags, &why)))
break;
pri->pri_public = tport_type_client; /* XXX */
......@@ -2127,12 +2022,8 @@ int tport_bind_server(tport_master_t *mr,
if (!vtable)
continue;
memcpy(ainfo, ai, sizeof ainfo);
tport_addrinfo_copy(ainfo, su, sizeof su, ai);
ainfo->ai_canonname = (char *)canon;
ainfo->ai_addr = memcpy(su, ai->ai_addr, ai->ai_addrlen);
ainfo->ai_next = NULL;
su->su_port = htons(port);
SU_DEBUG_9(("%s(%p): calling tport_listen for %s\n",
......@@ -2154,8 +2045,8 @@ int tport_bind_server(tport_master_t *mr,
not_supported = 0;
if (port0 == 0 && port == 0) {
port = port1 = ntohs(pri->pri_primary->tp_addr->su_port);
assert(port != 0);
port = port1 = ntohs(su->su_port);
assert(public != tport_type_server || port != 0);
}
}
......@@ -2513,6 +2404,24 @@ void tport_freeaddrinfo(su_addrinfo_t *ai)
}
}
static
int tport_addrinfo_copy(su_addrinfo_t *dst, void *addr, socklen_t addrlen,
su_addrinfo_t const *src)
{
if (addrlen < src->ai_addrlen)
return -1;
memcpy(dst, src, sizeof *dst);
if (src->ai_addrlen < addrlen)
memset(addr, 0, addrlen);
dst->ai_addr = memcpy(addr, src->ai_addr, src->ai_addrlen);
dst->ai_next = NULL;
return 0;
}
static
int tport_init_compression(tport_primary_t *pri,
char const *compression,
......@@ -2829,7 +2738,7 @@ int tport_setname(tport_t *self,
if (ai->ai_addr) {
assert(ai->ai_family), assert(ai->ai_socktype), assert(ai->ai_protocol);
memcpy(selfai->ai_addr, ai->ai_addr, selfai->ai_addrlen = ai->ai_addrlen);
memcpy(self->tp_addr, ai->ai_addr, selfai->ai_addrlen = ai->ai_addrlen);
}
return 0;
......@@ -3048,7 +2957,6 @@ int tport_accept(tport_primary_t *pri, int events)
(i = su_root_register(root, wait, wakeup, self, 0)) != -1) {
self->tp_index = i;
self->tp_connected = 1;
self->tp_conn_orient = 1;
self->tp_events = events;
......@@ -3162,6 +3070,17 @@ static int tport_wakeup_pri(su_root_magic_t *m, su_wait_t *w, tport_t *self)
assert(w->fd == self->tp_socket);
#endif
SU_DEBUG_7(("%s(%p): events%s%s%s%s%s%s\n",
"tport_wakeup_pri", self,
events & SU_WAIT_IN ? " IN" : "",
SU_WAIT_ACCEPT != SU_WAIT_IN &&
(events & SU_WAIT_ACCEPT) ? " ACCEPT" : "",
events & SU_WAIT_OUT ? " OUT" : "",
events & SU_WAIT_HUP ? " HUP" : "",
events & SU_WAIT_ERR ? " ERR" : "",
self->tp_closed ? " (closed)" : ""));
if (pri->pri_vtable->vtp_wakeup_pri)
return pri->pri_vtable->vtp_wakeup_pri(pri, events);
else
......@@ -3177,6 +3096,14 @@ static int tport_wakeup(su_root_magic_t *magic, su_wait_t *w, tport_t *self)
assert(w->fd == self->tp_socket);
#endif
SU_DEBUG_7(("%s(%p): events%s%s%s%s%s\n",
"tport_wakeup", self,
events & SU_WAIT_IN ? " IN" : "",
events & SU_WAIT_OUT ? " OUT" : "",
events & SU_WAIT_HUP ? " HUP" : "",
events & SU_WAIT_ERR ? " ERR" : "",
self->tp_closed ? " (closed)" : ""));
if (self->tp_pri->pri_vtable->vtp_wakeup)
return self->tp_pri->pri_vtable->vtp_wakeup(self, events);
else
......@@ -3187,14 +3114,6 @@ static int tport_base_wakeup(tport_t *self, int events)
{
int error = 0;
SU_DEBUG_7(("%s(%p): events%s%s%s%s%s\n",
"tport_base_wakeup", self,
events & SU_WAIT_IN ? " IN" : "",
events & SU_WAIT_OUT ? " OUT" : "",
events & SU_WAIT_HUP ? " HUP" : "",
events & SU_WAIT_ERR ? " ERR" : "",
self->tp_closed ? " (closed)" : ""));
if (events & SU_WAIT_ERR)
error = tport_error_event(self);
......@@ -3272,7 +3191,14 @@ static void tport_recv_event(tport_t *self)
now = su_now();
/* Receive data from socket */
if ((again = tport_recv_data(self)) < 0) {
again = tport_recv_data(self);
self->tp_time = su_time_ms(now);
if (again == 3) /* STUN keepalive */
return;
if (again < 0) {
int error = su_errno();
if (error != EAGAIN && error != EWOULDBLOCK) {
......@@ -3281,15 +3207,12 @@ static void tport_recv_event(tport_t *self)
if (tport_is_connected(self))
tport_close(self);
return;
} else {
}
else {
SU_DEBUG_3(("%s: recvfrom(): %s (%d)\n", __func__,
su_strerror(EAGAIN), EAGAIN));
}
}
else if (again == 3) /* STUN keepalive */
return;
self->tp_time = su_time_ms(now);
if (again >= 0)
tport_parse(self, !again, now);
......@@ -3421,17 +3344,21 @@ void tport_deliver(tport_t *self, msg_t *msg, msg_t *next,
}
SU_DEBUG_7(("%s(%p): %smsg %p (%u bytes)"
" to stack from " TPN_FORMAT " next=%p\n",
" from " TPN_FORMAT " next=%p\n",
__func__, self, error ? "bad " : "", msg, msg_size(msg),
TPN_ARGS(d->d_from), next));
ref = tport_incref(self);
/* Pass message to the protocol stack */
STACK_RECV(self, msg, now);
if (self->tp_pri->pri_vtable->vtp_deliver) {
self->tp_pri->pri_vtable->vtp_deliver(self, msg, now);
}
else
tport_base_deliver(self, msg, now);
#if HAVE_SIGCOMP
if (d->d_udvm && *d->d_udvm)
sigcomp_udvm_accept(*d->d_udvm, NULL);
sigcomp_udvm_accept(*d->d_udvm, NULL); /* reject */
#endif
tport_decref(&ref);
......@@ -3439,6 +3366,13 @@ void tport_deliver(tport_t *self, msg_t *msg, msg_t *next,
d->d_msg = NULL;
}
/** Pass message to the protocol stack */
void
tport_base_deliver(tport_t *self, msg_t *msg, su_time_t now)
{
STACK_RECV(self, msg, now);
}
/** Return source transport object for delivered message */
tport_t *tport_delivered_by(tport_t const *tp, msg_t const *msg)
{
......@@ -3489,13 +3423,12 @@ tport_delivered_using_udvm(tport_t *tp, msg_t const *msg,
}
static int tport_recv_dgram_r(tport_t const *self, msg_t **mmsg, int N);
static int tport_recv_stun_dgram(tport_t const *self, int N);
static int tport_recv_sigcomp_dgram(tport_t *self, int N);
#if HAVE_SIGCOMP
static int tport_recv_sigcomp_stream(tport_t *self);
#endif
static int tport_recv_sigcomp_dgram(tport_t *self, int N);
static int tport_recv_sctp(tport_t *self);
/** Allocate message for N bytes,
......@@ -3571,334 +3504,41 @@ static int tport_recv_error_report(tport_t *self)
return -1;
}
/** Receive datagram.
*
* @retval -1 error
* @retval 0 end-of-stream
*/
static
int tport_recv_dgram(tport_t *self)
#if HAVE_SIGCOMP
/** Try to receive stream data using SigComp. */
static int tport_recv_sigcomp_stream(tport_t *self)
{
int N;
int s = self->tp_socket;
unsigned char sample[2];
struct sigcomp_udvm *udvm;
int retval;
/* Simulate packet loss */
if (self->tp_params->tpp_drop &&
su_randint(0, 1000) < self->tp_params->tpp_drop) {
recv(self->tp_socket, sample, 1, 0);
SU_DEBUG_3(("tport(%p): simulated packet loss!\n", self));
return 0;
}
SU_DEBUG_7(("%s(%p)\n", __func__, self));
/* Peek for first two bytes in message:
determine if this is stun, sigcomp or sip
*/
N = recv(s, sample, sizeof sample, MSG_PEEK | MSG_TRUNC);
/* Peek for first byte in stream,
determine if this is a compressed stream or not */
if (self->tp_sigcomp->sc_infmt == format_is_unknown) {
unsigned char sample;
int n;
if (N < 0) {
if (su_errno() == EAGAIN || su_errno() == EWOULDBLOCK)
N = 0;
}
else if (N <= 1) {
SU_DEBUG_1(("%s(%p): runt of %u bytes\n", "tport_recv_dgram", self, N));
recv(s, sample, sizeof sample, 0);
N = 0;
}
#if MSG_TRUNC
else if ((N = su_getmsgsize(s)) < 0)
SU_DEBUG_1(("%s: su_getmsgsize(): %s (%d)\n", __func__,
su_strerror(su_errno()), su_errno()));
#endif
else if ((sample[0] & 0xf8) == 0xf8) {
return tport_recv_sigcomp_dgram(self, N); /* SigComp */
}
else if (sample[0] == 0 || sample[0] == 1) {
return tport_recv_stun_dgram(self, N); /* STUN */
}
else
return tport_recv_dgram_r(self, &self->tp_msg, N);
n = recv(self->tp_socket, &sample, 1, MSG_PEEK);
if (n < 0)
return n;
return N;
}
if (n == 0) {
recv(self->tp_socket, &sample, 1, 0);
return 0; /* E-o-S from first message */
}
/** Receive datagram statelessly.
*
* @retval -1 error
* @retval 0 end-of-stream
* @retval 1 normal receive (should never happen)
* @retval 2 incomplete recv, recv again (should never happen)
* @retval 3 STUN keepalive, ignore
*/
static
int tport_recv_dgram_r(tport_t const *self, msg_t **mmsg, int N)
{
msg_t *msg;
int n, veclen;
su_sockaddr_t *from;
socklen_t *fromlen;
msg_iovec_t iovec[msg_n_fragments] = {{ 0 }};
if ((sample & 0xf8) != 0xf8) {
/* Not SigComp, receive as usual */
if (tport_is_primary(self)) {
SU_DEBUG_1(("%s(%p): receive semantics not implemented\n",
__func__, self));
su_seterrno(EINVAL); /* Internal error */
return -1;
}
assert(*mmsg == NULL);
veclen = tport_recv_iovec(self, mmsg, iovec, N, 1);
if (veclen < 0)
return -1;
msg = *mmsg;
n = su_vrecv(self->tp_socket, iovec, veclen, 0,
from = msg_addr(msg), fromlen = msg_addrlen(msg));
if (n == SOCKET_ERROR) {
int error = su_errno();
msg_destroy(msg); *mmsg = NULL;
su_seterrno(error);
return -1;
}
SU_CANONIZE_SOCKADDR(from);
assert(n <= N); /* FIONREAD tells the size of all messages.. */
if (self->tp_master->mr_dump_file && !self->tp_pri->pri_threadpool)
tport_dump_iovec(self, msg, n, iovec, veclen, "recv", "from");
msg_recv_commit(msg, n, 1); /* Mark buffer as used */
return 0;
}
/** Receive data from datagram using SigComp. */
static int tport_recv_sigcomp_dgram(tport_t *self, int N)
{
char dummy[1];
int error = EBADMSG;
#if HAVE_SIGCOMP
struct sigcomp_udvm *udvm;
if (self->tp_sigcomp->sc_udvm == 0)
self->tp_sigcomp->sc_udvm = tport_init_udvm(self);
udvm = self->tp_sigcomp->sc_udvm;
if (udvm) {
retval = tport_recv_sigcomp_r(self, &self->tp_msg, udvm, N);
if (retval < 0)
sigcomp_udvm_reject(udvm);
return retval;
}
error = su_errno();
#endif
recv(self->tp_socket, dummy, 1, 0); /* remove msg from socket */
/* XXX - send NACK ? */
return su_seterrno(error);