Commit a13e6fd3 authored by Pekka Pessi's avatar Pekka Pessi

tport: added tport_is_clear_to_send(), allow use of tport_pending() without msg

The error callback from tport can now be registered even if there is no
request pending on transport (e.g., when keeping a transport connection open
for inbound messages).

darcs-hash:20070601191534-65a35-7b3e22bb692673dc70e1630d1e63313ed840dcdc.gz
parent 02734a69
......@@ -279,6 +279,9 @@ TPORT_DLL int tport_is_shutdown(tport_t const *self);
/** Test if transport is connected. @NEW_1_12_5 */
TPORT_DLL int tport_is_connected(tport_t const *self);
/** Test if transport can be used to send message. @NEW_1_12_7 */
TPORT_DLL int tport_is_clear_to_send(tport_t const *self);
/** Set transport magic. */
TPORT_DLL void tport_set_magic(tport_t *self, tp_magic_t *magic);
......
......@@ -403,7 +403,7 @@ TPORT_DLL extern tag_typedef_t tptag_log;
* Use with tport_tcreate(), nua_create(), nta_agent_create(),
* nth_engine_create(), or initial nth_site_create().
*
* @sa #TPORT_DUMP, TPTAG_DUMP()
* @sa #TPORT_LOG environment variable, TPTAG_DUMP()
*
* @NEW_1_12_5
*/
......@@ -418,7 +418,7 @@ TPORT_DLL extern tag_typedef_t tptag_dump;
* Use with tport_tcreate(), nta_agent_create(), nua_create(),
* nth_engine_create(), or initial nth_site_create().
*
* @sa #TPORT_DUMP, TPTAG_LOG().
* @sa #TPORT_DUMP environment variable, TPTAG_LOG().
*
* @NEW_1_12_5
*/
......
......@@ -166,7 +166,7 @@ int tport_is_secondary(tport_t const *self)
self->tp_pri->pri_primary != self;
}
/** Test if transport has been registered */
/** Test if transport has been registered to su_root_t */
int tport_is_registered(tport_t const *self)
{
return self->tp_index != 0;
......@@ -291,6 +291,19 @@ int tport_is_connected(tport_t const *self)
return self->tp_is_connected;
}
/** Test if transport can be used to send message. @NEW_1_12_7 */
int tport_is_clear_to_send(tport_t const *self)
{
return
tport_is_master(self) ||
tport_is_primary(self) ||
(tport_is_secondary(self) &&
tport_is_registered(self) &&
self->tp_reusable &&
!self->tp_closed &&
!self->tp_send_close);
}
/** MTU for transport */
su_inline unsigned tport_mtu(tport_t const *self)
{
......@@ -1964,7 +1977,7 @@ int tport_addrinfo_copy(su_addrinfo_t *dst, void *addr, socklen_t addrlen,
*/
void tport_close(tport_t *self)
{
SU_DEBUG_5(("%s(%p): " TPN_FORMAT "\n", "tport_close", (void *)self,
SU_DEBUG_5(("%s(%p): " TPN_FORMAT "\n", "tport_close", (void *)self,
TPN_ARGS(self->tp_name)));
self->tp_closed = 1;
......@@ -2361,6 +2374,7 @@ void tport_error_report(tport_t *self, int errcode,
else if (errcode > 0)
errmsg = su_strerror(errcode);
else
/* Should be something like ENOTCONN */
errcode = 0, errmsg = "stream closed";
if (addr && addr->su_family == AF_UNSPEC)
......@@ -2370,12 +2384,12 @@ void tport_error_report(tport_t *self, int errcode,
if (errcode > 0 && tport_has_connection(self))
self->tp_reusable = 0;
if (addr == NULL && tport_is_connection_oriented(self))
addr = self->tp_addr;
/* Report error */
if (addr && tport_pending_error(self, addr, errcode))
;
else if (tport_is_secondary(self) &&
tport_pending_error(self, NULL, errcode) > 0)
;
else if (self->tp_master->mr_tpac->tpac_error) {
char *dstname = NULL;
char hp[TPORT_HOSTPORTSIZE];
......@@ -3071,30 +3085,24 @@ tport_t *tport_tsend(tport_t *self,
/* Select a primary protocol, make a fresh connection */
self = primary->pri_primary;
}
else if (tport_is_secondary(self) && tport_is_clear_to_send(self)) {
self = self;
}
/*
* Try to find an already open connection to the destination,
* or get a primary protocol
*/
else {
if (tport_is_secondary(self) &&
tport_is_registered(self) &&
self->tp_reusable &&
!self->tp_closed &&
!self->tp_send_close) {
self = self;
/* If primary, resolve the destination address, store it in the msg */
if (tport_resolve(primary->pri_primary, msg, tpn) < 0) {
return NULL;
}
/*
* Try to find an already open connection to the destination,
* or get a primary protocol
*/
else {
/* If primary, resolve the destination address, store it in the msg */
if (tport_resolve(primary->pri_primary, msg, tpn) < 0) {
return NULL;
}
resolved = 1;
self = tport_by_addrinfo(primary, msg_addrinfo(msg), tpn);
resolved = 1;
self = tport_by_addrinfo(primary, msg_addrinfo(msg), tpn);
if (!self)
self = primary->pri_primary;
}
if (!self)
self = primary->pri_primary;
}
if (tport_is_primary(self)) {
......@@ -3879,9 +3887,12 @@ int tport_pend(tport_t *self,
{
tport_pending_t *pending;
if (self == NULL || msg == NULL || callback == NULL || client == NULL)
if (self == NULL || callback == NULL || client == NULL)
return -1;
if (msg == NULL && tport_is_primary(self))
return -1;
SU_DEBUG_7(("tport_pend(%p): pending %p for %s/%s:%s (already %u)\n",
(void *)self, (void *)msg,
self->tp_protoname, self->tp_host, self->tp_port,
......@@ -3931,7 +3942,7 @@ int tport_release(tport_t *self,
{
tport_pending_t *pending;
if (self == NULL || msg == NULL || pendd <= 0 || pendd > (int)self->tp_plen)
if (self == NULL || pendd <= 0 || pendd > (int)self->tp_plen)
return su_seterrno(EINVAL), -1;
pending = self->tp_pending + (pendd - 1);
......@@ -3969,7 +3980,7 @@ tport_pending_error(tport_t *self, su_sockaddr_t const *dst, int error)
msg_t *msg;
su_addrinfo_t const *ai;
assert(self); assert(dst);
assert(self);
callbacks = 0;
reported = ++self->tp_reported;
......@@ -3980,22 +3991,25 @@ tport_pending_error(tport_t *self, su_sockaddr_t const *dst, int error)
for (i = 0; i < self->tp_plen; i++) {
pending = self->tp_pending + i;
if (!pending->p_callback || !pending->p_msg)
if (!pending->p_callback)
continue;
if (pending->p_reported == reported)
continue;
msg = pending->p_msg;
ai = msg_addrinfo(msg);
if (su_cmp_sockaddr(dst, (su_sockaddr_t *)ai->ai_addr) != 0)
continue;
if (dst && msg) {
ai = msg_addrinfo(msg);
pending->p_reported = reported;
if (su_cmp_sockaddr(dst, (su_sockaddr_t *)ai->ai_addr) != 0)
continue;
}
msg_set_errno(msg, error);
pending->p_reported = reported;
pending->p_callback(self->TP_STACK, pending->p_client, self, msg, error);
callbacks++;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment