Commit 47c28a41 authored by Simon Morlat's avatar Simon Morlat

new fork contexts start working quite well

parent 9c4b018b
......@@ -232,6 +232,19 @@ void RequestSipEvent::linkTransactions(){
}
}
void RequestSipEvent::unlinkTransactions() {
shared_ptr<OutgoingTransaction> ot;
shared_ptr<IncomingTransaction> it;
if (mOutgoingAgent && mIncomingAgent &&
(ot=dynamic_pointer_cast<OutgoingTransaction>(mOutgoingAgent))!=NULL &&
(it=dynamic_pointer_cast<IncomingTransaction>(mIncomingAgent))!=NULL){
ot->mIncoming=NULL;
it->mOutgoing=NULL;
}
}
void RequestSipEvent::suspendProcessing() {
SipEvent::suspendProcessing();
......
......@@ -201,6 +201,7 @@ public:
bool findIncomingSubject(const char *searched);
const char *findIncomingSubject(const std::list<std::string> &in);
bool mRecordRouteAdded;
void unlinkTransactions();
private:
void linkTransactions();
};
......
......@@ -47,7 +47,6 @@ void ForkBasicContext::onResponse(const shared_ptr<BranchInfo> &br, const shared
su_timer_destroy(mDecisionTimer);
mDecisionTimer=NULL;
}
setFinished();
}else{
if (allBranchesAnswered()){
finishIncomingTransaction();
......@@ -70,7 +69,6 @@ void ForkBasicContext::finishIncomingTransaction(){
}else{
forwardResponse(best);
}
setFinished();
}
void ForkBasicContext::onDecisionTimer(){
......
......@@ -56,13 +56,11 @@ void ForkCallContext::cancel() {
void ForkCallContext::cancelOthers(const shared_ptr<BranchInfo> & br) {
auto branches=getBranches();
for (auto it = branches.begin(); it != branches.end();) {
for (auto it = branches.begin(); it != branches.end();++it) {
if (*it != br) {
shared_ptr<OutgoingTransaction> tr = (*it)->mTransaction;
removeBranch(*it);
tr->cancel();
} else {
++it;
}
}
}
......
......@@ -42,8 +42,6 @@ protected:
virtual void onResponse(const std::shared_ptr<BranchInfo> &br, const std::shared_ptr<ResponseSipEvent> &event);
virtual bool onNewRegister(const url_t *url, const std::string &uid);
virtual void cancel();
private:
const int *getUrgentCodes();
void onShortTimer();
......
......@@ -32,12 +32,17 @@ void ForkContext::__timer_callback(su_root_magic_t *magic, su_timer_t *t, su_tim
(static_cast<ForkContext*>(arg))->processLateTimeout();
}
void ForkContext::sOnFinished(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg){
(static_cast<ForkContext*>(arg))->onFinished();
}
ForkContext::ForkContext(Agent *agent, const std::shared_ptr<RequestSipEvent> &event, shared_ptr<ForkContextConfig> cfg, ForkContextListener* listener) :
mListener(listener),
mAgent(agent),
mEvent(make_shared<RequestSipEvent>(event)),
mCfg(cfg),
mLateTimer(NULL),
mFinishTimer(NULL),
mLateTimerExpired(false) {
su_home_init(&mHome);
init();
......@@ -48,6 +53,7 @@ void ForkContext::onLateTimeout(){
void ForkContext::processLateTimeout() {
su_timer_destroy(mLateTimer);
mLateTimer=NULL;
mLateTimerExpired=true;
onLateTimeout();
setFinished();
......@@ -61,7 +67,7 @@ struct dest_finder{
// don't care about transport
}
bool operator()(const shared_ptr<BranchInfo> &br){
const url_t *dest=br->mTransaction->getRequestUri();
const url_t *dest=br->mRequest->getMsgSip()->getSip()->sip_request->rq_url;
return 0 == strcmp(url_port(dest), cttport)
&& 0 == strcmp(dest->url_host, ctthost);
}
......@@ -163,20 +169,31 @@ void ForkContext::init() {
void ForkContext::addBranch(const shared_ptr<RequestSipEvent> &ev, const string &uid) {
shared_ptr<OutgoingTransaction> ot=ev->createOutgoingTransaction();
shared_ptr<BranchInfo> br=createBranchInfo();
if (mIncoming && mBranches.size()==0){
/*for some reason shared_from_this() cannot be invoked within the ForkContext constructor, so we do this initialization now*/
mIncoming->setProperty<ForkContext>("ForkContext",shared_from_this());
}
//unlink the incoming and outgoing transactions which is done by default, since now the forkcontext is managing them.
ev->unlinkTransactions();
br->mRequest=ev;
br->mTransaction=ot;
br->mUid=uid;
ot->setProperty("BranchInfo",br);
mBranches.push_back(br);
onNewBranch(br);
mBranches.push_back(br);
}
bool ForkContext::processCancel ( const std::shared_ptr< RequestSipEvent >& ev ) {
shared_ptr<IncomingTransaction> transaction = dynamic_pointer_cast<IncomingTransaction>(ev->getIncomingAgent());
if (ev->getMsgSip()->getSip()->sip_request->rq_method==sip_method_cancel){
shared_ptr<BranchInfo> binfo=transaction->getProperty<BranchInfo>("BranchInfo");
binfo->mForkCtx->cancel();
return true;
shared_ptr<ForkContext> ctx=transaction->getProperty<ForkContext>("ForkContext");
if (ctx) {
ctx->cancel();
ev->terminateProcessing();
return true;
}
}
return false;
}
......@@ -191,11 +208,18 @@ bool ForkContext::processResponse ( const shared_ptr< ResponseSipEvent >& ev ) {
copyEv->suspendProcessing();
binfo->mLastResponse=copyEv;
binfo->mForkCtx->onResponse(binfo,copyEv);
if (copyEv->isSuspended()){
//let the event go through but it will not be sent*/
ev->setIncomingAgent(NULL);
//the event may go through but it will not be sent*/
ev->setIncomingAgent(NULL);
if (!copyEv->isSuspended()){
//LOGD("A response has been submitted");
//copyEv has been resubmited, so stop original event.
ev->terminateProcessing();
}else {
//LOGD("The response has been retained");
}
return true;
}else{
//LOGD("ForkContext: un-processed response");
}
}
return false;
......@@ -212,17 +236,30 @@ ForkContext::~ForkContext() {
su_home_deinit(&mHome);
}
void ForkContext::setFinished(){
if (mLateTimer){
su_timer_destroy(mLateTimer);
mLateTimer=NULL;
}
//force reference to be loosed immediately, to avoid circular dependencies.
void ForkContext::onFinished() {
su_timer_destroy(mFinishTimer);
mFinishTimer=NULL;
//force references to be loosed immediately, to avoid circular dependencies.
mEvent.reset();
mIncoming.reset();
for_each(mBranches.begin(),mBranches.end(),mem_fn(&BranchInfo::clear));
mBranches.clear();
mListener->onForkContextFinished(shared_from_this());
mSelf.reset(); //this must be the last thing to do
}
void ForkContext::setFinished(){
if (mLateTimer){
su_timer_destroy(mLateTimer);
mLateTimer=NULL;
}
mSelf=shared_from_this();//to prevent destruction until finishTimer arrives
mFinishTimer=su_timer_create(su_root_task(mAgent->getRoot()), 0);
su_timer_set_interval(mFinishTimer, &ForkContext::sOnFinished, this, (su_duration_t)0);
}
bool ForkContext::shouldFinish() {
return true;
}
void ForkContext::onNewBranch ( const std::shared_ptr<BranchInfo> &br ) {
......@@ -265,6 +302,9 @@ std::shared_ptr<ResponseSipEvent> ForkContext::forwardResponse(const std::shared
}
if (code>=200){
mIncoming.reset();
if (shouldFinish()){
setFinished();
}
}
return ev;
}
......
......@@ -64,6 +64,7 @@ public:
class ForkContext : public std::enable_shared_from_this<ForkContext>{
private:
static void __timer_callback(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg);
static void sOnFinished(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg);
ForkContextListener * mListener;
std::list<std::shared_ptr<BranchInfo>> mBranches;
void init();
......@@ -75,9 +76,11 @@ protected:
std::shared_ptr<ResponseSipEvent> mLastResponseSent;
std::shared_ptr<IncomingTransaction> mIncoming;
std::shared_ptr<ForkContextConfig> mCfg;
std::shared_ptr<ForkContext> mSelf;
su_timer_t *mLateTimer;
su_timer_t *mFinishTimer;
bool mLateTimerExpired;
//Terminate the fork process. Do not do anything after calling setFinished(), because it might destroy this.
//Mark the fork process as terminated. The real destruction is performed asynchrously, in next main loop iteration.
void setFinished();
//Used by derived class to allocate a derived type of BranchInfo if necessary.
virtual std::shared_ptr<BranchInfo> createBranchInfo();
......@@ -89,6 +92,10 @@ protected:
virtual void onResponse(const std::shared_ptr<BranchInfo> &br, const std::shared_ptr<ResponseSipEvent> &event)=0;
//Notifies the expiry of the final fork timeout.
virtual void onLateTimeout();
//Requests the derived class if the fork context should finish now.
virtual bool shouldFinish();
//Notifies the destruction of the fork context. Implementors should use it to perform their unitialization, but shall never forget to upcall to the parent class !*/
virtual void onFinished();
//Request the forwarding the last response from a given branch
std::shared_ptr<ResponseSipEvent> forwardResponse(const std::shared_ptr<BranchInfo> &br);
//Request the forwarding of a response supplied in argument.
......
......@@ -34,7 +34,7 @@ ForkMessageContext::ForkMessageContext(Agent *agent, const std::shared_ptr<Reque
//start the acceptance timer immediately
if (mCfg->mForkLate && mCfg->mDeliveryTimeout>30){
mAcceptanceTimer=su_timer_create(su_root_task(mAgent->getRoot()), 0);
su_timer_set_interval(mAcceptanceTimer, &ForkMessageContext::sOnAcceptanceTimer, this, (su_duration_t)20000);
su_timer_set_interval(mAcceptanceTimer, &ForkMessageContext::sOnAcceptanceTimer, this, (su_duration_t)16000);
}
mAcceptanceTimer=NULL;
mDeliveredCount=0;
......@@ -46,6 +46,11 @@ ForkMessageContext::~ForkMessageContext() {
LOGD("Destroy ForkMessageContext %p", this);
}
bool ForkMessageContext::shouldFinish() {
return false; //the messaging fork context controls its termination.
}
void ForkMessageContext::checkFinished(){
if (mIncoming==NULL && !mCfg->mForkLate){
setFinished();
......@@ -129,7 +134,7 @@ void ForkMessageContext::sOnAcceptanceTimer(su_root_magic_t* magic, su_timer_t*
void ForkMessageContext::onNewBranch(const shared_ptr<BranchInfo> &br) {
if (br->mUid.size()>0){
/*check for a branch already existing with this uid*/
/*check for a branch already existing with this uid, and eventually clean it*/
shared_ptr<BranchInfo> tmp=findBranchByUid(br->mUid);
if (tmp){
removeBranch(tmp);
......
......@@ -40,6 +40,7 @@ protected:
virtual bool onNewRegister(const url_t *url, const std::string &uid);
virtual void onNewBranch(const std::shared_ptr<BranchInfo> &br);
virtual void onResponse(const std::shared_ptr<BranchInfo> &br, const std::shared_ptr<ResponseSipEvent> &ev);
virtual bool shouldFinish();
private:
static void sOnAcceptanceTimer(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg);
void acceptMessage();
......
......@@ -578,8 +578,8 @@ void ModuleRouter::onRequest(shared_ptr<RequestSipEvent> &ev) {
// Handle SipEvent associated with a Stateful transaction
if (sip->sip_request->rq_method==sip_method_cancel){
if (ForkContext::processCancel(ev))
return;
ForkContext::processCancel(ev);
return;
}
// Don't route registers
......
......@@ -242,18 +242,17 @@ void ModuleToolbox::cleanAndPrependRoute(su_home_t *home, Agent *ag, msg_t *msg,
}
void ModuleToolbox::cleanAndPrependRoutable(su_home_t *home, Agent *ag, msg_t *msg, sip_t *sip, const std::list<std::string> &routes){
// removes top route headers if they matches us
while (sip->sip_route != NULL && ag->isUs(sip->sip_route->r_url)) {
sip_route_remove(msg, sip);
}
SLOGD << "Removed top route headers";
for (auto it=routes.crbegin(); it != routes.crend(); ++it) {
sip_route_t *r = sip_route_format(home, "%s", it->c_str());
if (prependNewRoutable(msg, sip, sip->sip_route, r)) {
SLOGD << "Prepended routable " << *it;
}
}
// removes top route headers if they matches us
while (sip->sip_route != NULL && ag->isUs(sip->sip_route->r_url)) {
sip_route_remove(msg, sip);
}
SLOGD << "Removed top route headers";
}
url_t *ModuleToolbox::urlFromTportName(su_home_t *home, const tp_name_t *name){
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment