Commit 9c4b018b authored by Simon Morlat's avatar Simon Morlat

rewrite fork contexts.

parent 9e88cfe4
...@@ -746,14 +746,6 @@ void Agent::injectResponseEvent(shared_ptr<ResponseSipEvent> ev) { ...@@ -746,14 +746,6 @@ void Agent::injectResponseEvent(shared_ptr<ResponseSipEvent> ev) {
doSendEvent(ev, it, mModules.end()); doSendEvent(ev, it, mModules.end());
} }
void Agent::sendTransactionEvent(shared_ptr<TransactionEvent> ev) {
SLOGD << "Propagating new Transaction Event " << ev->transaction.get()
<< " " << ev->getKindName();
list<Module*>::iterator it;
for (it = mModules.begin(); it != mModules.end(); ++it) {
(*it)->processTransactionEvent(ev);
}
}
/** /**
* This is a dangerous function when called at the wrong time. * This is a dangerous function when called at the wrong time.
......
...@@ -156,8 +156,6 @@ class Agent: public IncomingAgent, public OutgoingAgent, public std::enable_shar ...@@ -156,8 +156,6 @@ class Agent: public IncomingAgent, public OutgoingAgent, public std::enable_shar
Module *findModule(const std::string &modname) const; Module *findModule(const std::string &modname) const;
int onIncomingMessage(msg_t *msg, const sip_t *sip); int onIncomingMessage(msg_t *msg, const sip_t *sip);
nth_engine_t* getHttpEngine() {return mHttpEngine; } nth_engine_t* getHttpEngine() {return mHttpEngine; }
protected:
void sendTransactionEvent(std::shared_ptr<TransactionEvent> ev);
private: private:
virtual void send(const std::shared_ptr<MsgSip> &msg, url_string_t const *u, tag_type_t tag, tag_value_t value, ...); virtual void send(const std::shared_ptr<MsgSip> &msg, url_string_t const *u, tag_type_t tag, tag_value_t value, ...);
virtual void reply(const std::shared_ptr<MsgSip> &msg, int status, char const *phrase, tag_type_t tag, tag_value_t value, ...); virtual void reply(const std::shared_ptr<MsgSip> &msg, int status, char const *phrase, tag_type_t tag, tag_value_t value, ...);
......
...@@ -133,6 +133,14 @@ void SipEvent::restartProcessing() { ...@@ -133,6 +133,14 @@ void SipEvent::restartProcessing() {
} }
} }
std::shared_ptr<IncomingTransaction> SipEvent::getIncomingTransaction(){
return dynamic_pointer_cast<IncomingTransaction>(getIncomingAgent());
}
std::shared_ptr<OutgoingTransaction> SipEvent::getOutgoingTransaction(){
return dynamic_pointer_cast<OutgoingTransaction>(getOutgoingAgent());
}
RequestSipEvent::RequestSipEvent(shared_ptr<IncomingAgent> incomingAgent, RequestSipEvent::RequestSipEvent(shared_ptr<IncomingAgent> incomingAgent,
const shared_ptr<MsgSip> &msgSip, const shared_ptr<MsgSip> &msgSip,
shared_ptr<tport_t> inTport shared_ptr<tport_t> inTport
......
...@@ -147,6 +147,8 @@ public: ...@@ -147,6 +147,8 @@ public:
} }
void setEventLog(const std::shared_ptr<EventLog> & log); void setEventLog(const std::shared_ptr<EventLog> & log);
void flushLog();/*to be used exceptionally when an eventlog needs to be flushed immediately, for example because you need to submit a new one.*/ void flushLog();/*to be used exceptionally when an eventlog needs to be flushed immediately, for example because you need to submit a new one.*/
std::shared_ptr<IncomingTransaction> getIncomingTransaction();
std::shared_ptr<OutgoingTransaction> getOutgoingTransaction();
protected: protected:
Module *mCurrModule; Module *mCurrModule;
std::shared_ptr<MsgSip> mMsgSip; std::shared_ptr<MsgSip> mMsgSip;
......
...@@ -24,9 +24,12 @@ ...@@ -24,9 +24,12 @@
using namespace ::std; using namespace ::std;
ForkBasicContext::ForkBasicContext(Agent *agent, const std::shared_ptr<RequestSipEvent> &event, shared_ptr<ForkContextConfig> cfg, ForkContextListener* listener) : ForkBasicContext::ForkBasicContext(Agent *agent, const std::shared_ptr<RequestSipEvent> &event, shared_ptr<ForkContextConfig> cfg, ForkContextListener* listener) :
ForkContext(agent, event,cfg,listener), mDeliveredCount(0) { ForkContext(agent, event,cfg,listener) {
LOGD("New ForkBasicContext %p", this); LOGD("New ForkBasicContext %p", this);
mDecisionTimer=NULL; mDecisionTimer=NULL;
//start the acceptance timer immediately
mDecisionTimer=su_timer_create(su_root_task(mAgent->getRoot()), 0);
su_timer_set_interval(mDecisionTimer, &ForkBasicContext::sOnDecisionTimer, this, (su_duration_t)20000);
} }
ForkBasicContext::~ForkBasicContext() { ForkBasicContext::~ForkBasicContext() {
...@@ -35,78 +38,39 @@ ForkBasicContext::~ForkBasicContext() { ...@@ -35,78 +38,39 @@ ForkBasicContext::~ForkBasicContext() {
LOGD("Destroy ForkBasicContext %p", this); LOGD("Destroy ForkBasicContext %p", this);
} }
void ForkBasicContext::forward(const shared_ptr<ResponseSipEvent> &ev) { void ForkBasicContext::onResponse(const shared_ptr<BranchInfo> &br, const shared_ptr<ResponseSipEvent> &event) {
sip_t *sip = ev->getMsgSip()->getSip(); int code=br->getStatus();
if (code>=200){
if (sip->sip_status->st_status >= 200 ) { if (code<300){
if (mDeliveredCount>1){ forwardResponse(br);
/*should only transfer one response*/ if (mDecisionTimer){
ev->setIncomingAgent(shared_ptr<IncomingAgent>()); su_timer_destroy(mDecisionTimer);
} mDecisionTimer=NULL;
mDeliveredCount++; }
checkFinished(); setFinished();
} }else{
} if (allBranchesAnswered()){
finishIncomingTransaction();
void ForkBasicContext::checkFinished(){ }
return ForkContext::checkFinished();
}
void ForkBasicContext::onRequest(const shared_ptr<IncomingTransaction> &transaction, shared_ptr<RequestSipEvent> &event) {
}
void ForkBasicContext::store(shared_ptr<ResponseSipEvent> &event) {
bool best = true;
if (mBestResponse != NULL) {
if (mBestResponse->getMsgSip()->getSip()->sip_status->st_status < event->getMsgSip()->getSip()->sip_status->st_status) {
best = false;
}
}
// Save
if (best) {
mBestResponse = make_shared<ResponseSipEvent>(event); // Copy event
mBestResponse->suspendProcessing();
}
// Don't forward
event->setIncomingAgent(shared_ptr<IncomingAgent>());
}
void ForkBasicContext::onResponse(const shared_ptr<OutgoingTransaction> &transaction, shared_ptr<ResponseSipEvent> &event) {
event->setIncomingAgent(mIncoming);
const shared_ptr<MsgSip> &ms = event->getMsgSip();
sip_t *sip = ms->getSip();
if (sip != NULL && sip->sip_status != NULL) {
LOGD("Fork: outgoingCallback %d", sip->sip_status->st_status);
if (sip->sip_status->st_status > 100 && sip->sip_status->st_status < 300) {
forward(event);
} else {
store(event);
} }
} }
} }
void ForkBasicContext::finishIncomingTransaction(){ void ForkBasicContext::finishIncomingTransaction(){
if (mIncoming != NULL && mDeliveredCount==0) {
if (mBestResponse == NULL) {
// Create response
shared_ptr<MsgSip> msgsip(mIncoming->createResponse(SIP_408_REQUEST_TIMEOUT));
shared_ptr<ResponseSipEvent> ev(new ResponseSipEvent(dynamic_pointer_cast<OutgoingAgent>(mAgent->shared_from_this()), msgsip));
ev->setIncomingAgent(mIncoming);
mAgent->sendResponseEvent(ev);
} else {
mAgent->injectResponseEvent(mBestResponse);
}
}
if (mDecisionTimer){ if (mDecisionTimer){
su_timer_destroy(mDecisionTimer); su_timer_destroy(mDecisionTimer);
mDecisionTimer=NULL; mDecisionTimer=NULL;
} }
mBestResponse.reset(); shared_ptr<BranchInfo> best=findBestBranch(sUrgentCodes);
mIncoming.reset(); if (best==NULL) {
// Create response
shared_ptr<MsgSip> msgsip(mIncoming->createResponse(SIP_408_REQUEST_TIMEOUT));
shared_ptr<ResponseSipEvent> ev(new ResponseSipEvent(dynamic_pointer_cast<OutgoingAgent>(mAgent->shared_from_this()), msgsip));
forwardResponse(ev);
}else{
forwardResponse(best);
}
setFinished();
} }
void ForkBasicContext::onDecisionTimer(){ void ForkBasicContext::onDecisionTimer(){
...@@ -118,30 +82,7 @@ void ForkBasicContext::sOnDecisionTimer(su_root_magic_t* magic, su_timer_t* t, s ...@@ -118,30 +82,7 @@ void ForkBasicContext::sOnDecisionTimer(su_root_magic_t* magic, su_timer_t* t, s
static_cast<ForkBasicContext*>(arg)->onDecisionTimer(); static_cast<ForkBasicContext*>(arg)->onDecisionTimer();
} }
bool ForkBasicContext::onNewRegister(const url_t *url, const string &uid){
void ForkBasicContext::onNew(const shared_ptr<IncomingTransaction> &transaction) {
ForkContext::onNew(transaction);
//start the acceptance timer immediately
mDecisionTimer=su_timer_create(su_root_task(mAgent->getRoot()), 0);
su_timer_set_interval(mDecisionTimer, &ForkBasicContext::sOnDecisionTimer, this, (su_duration_t)20000);
}
void ForkBasicContext::onDestroy(const shared_ptr<IncomingTransaction> &transaction) {
ForkContext::onDestroy(transaction);
}
void ForkBasicContext::onNew(const shared_ptr<OutgoingTransaction> &transaction) {
ForkContext::onNew(transaction);
}
void ForkBasicContext::onDestroy(const shared_ptr<OutgoingTransaction> &transaction) {
if (mOutgoings.size() == 1) {
finishIncomingTransaction();
}
ForkContext::onDestroy(transaction);
}
bool ForkBasicContext::onNewRegister(const sip_contact_t *ctt){
return false; return false;
} }
...@@ -28,22 +28,13 @@ ...@@ -28,22 +28,13 @@
class ForkBasicContext: public ForkContext { class ForkBasicContext: public ForkContext {
private: private:
int mDeliveredCount;
std::shared_ptr<ResponseSipEvent> mBestResponse;
void forward(const std::shared_ptr<ResponseSipEvent> &ev);
void store(std::shared_ptr<ResponseSipEvent> &event);
su_timer_t *mDecisionTimer; /*timeout after which an answer must be sent through the incoming transaction even if no success response was received on the outgoing transactions*/ su_timer_t *mDecisionTimer; /*timeout after which an answer must be sent through the incoming transaction even if no success response was received on the outgoing transactions*/
public: public:
ForkBasicContext(Agent *agent, const std::shared_ptr<RequestSipEvent> &event, std::shared_ptr<ForkContextConfig> cfg, ForkContextListener* listener); ForkBasicContext(Agent *agent, const std::shared_ptr<RequestSipEvent> &event, std::shared_ptr<ForkContextConfig> cfg, ForkContextListener* listener);
virtual ~ForkBasicContext(); virtual ~ForkBasicContext();
virtual void onNew(const std::shared_ptr<IncomingTransaction> &transaction); protected:
virtual void onRequest(const std::shared_ptr<IncomingTransaction> &transaction, std::shared_ptr<RequestSipEvent> &event); virtual void onResponse(const std::shared_ptr<BranchInfo> &br, const std::shared_ptr<ResponseSipEvent> &event);
virtual void onDestroy(const std::shared_ptr<IncomingTransaction> &transaction); virtual bool onNewRegister(const url_t *url, const std::string &uid);
virtual void onNew(const std::shared_ptr<OutgoingTransaction> &transaction);
virtual void onResponse(const std::shared_ptr<OutgoingTransaction> &transaction, std::shared_ptr<ResponseSipEvent> &event);
virtual void onDestroy(const std::shared_ptr<OutgoingTransaction> &transaction);
virtual bool onNewRegister(const sip_contact_t *ctt);
virtual void checkFinished();
private: private:
void finishIncomingTransaction(); void finishIncomingTransaction();
static void sOnDecisionTimer(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg); static void sOnDecisionTimer(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg);
......
This diff is collapsed.
...@@ -27,42 +27,34 @@ ...@@ -27,42 +27,34 @@
class ForkCallContext: public ForkContext { class ForkCallContext: public ForkContext {
private: private:
std::shared_ptr<ResponseSipEvent> mBestResponse;
su_timer_t *mShortTimer; //optionaly used to send retryable responses su_timer_t *mShortTimer; //optionaly used to send retryable responses
su_timer_t *mPushTimer; //used to track push responses su_timer_t *mPushTimer; //used to track push responses
int mLastResponseCodeSent;
bool mCancelled;
std::list<int> mForwardResponses;
std::shared_ptr<CallLog> mLog; std::shared_ptr<CallLog> mLog;
bool mCancelled;
public: public:
ForkCallContext(Agent *agent, const std::shared_ptr<RequestSipEvent> &event, std::shared_ptr<ForkContextConfig> cfg, ForkContextListener* listener); ForkCallContext(Agent *agent, const std::shared_ptr<RequestSipEvent> &event, std::shared_ptr<ForkContextConfig> cfg, ForkContextListener* listener);
~ForkCallContext(); ~ForkCallContext();
virtual void onNew(const std::shared_ptr<IncomingTransaction> &transaction);
virtual void onRequest(const std::shared_ptr<IncomingTransaction> &transaction, std::shared_ptr<RequestSipEvent> &event);
virtual void onDestroy(const std::shared_ptr<IncomingTransaction> &transaction);
virtual void onNew(const std::shared_ptr<OutgoingTransaction> &transaction);
virtual void onResponse(const std::shared_ptr<OutgoingTransaction> &transaction, std::shared_ptr<ResponseSipEvent> &event);
virtual void onDestroy(const std::shared_ptr<OutgoingTransaction> &transaction);
virtual void checkFinished();
virtual bool onNewRegister(const sip_contact_t *ctt);
void sendRinging(); void sendRinging();
bool isCompleted()const; bool isCompleted()const;
void onPushInitiated(const std::string &key); void onPushInitiated(const std::string &key);
void onPushError(const std::string &key, const std::string &errormsg); void onPushError(const std::string &key, const std::string &errormsg);
protected:
virtual void onResponse(const std::shared_ptr<BranchInfo> &br, const std::shared_ptr<ResponseSipEvent> &event);
virtual bool onNewRegister(const url_t *url, const std::string &uid);
virtual void cancel();
private: private:
bool isRetryableOrUrgent(int code); const int *getUrgentCodes();
void onShortTimer(); void onShortTimer();
void onPushTimer(); void onPushTimer();
void cancel(); void onLateTimeout();
void cancelOthers(const std::shared_ptr<OutgoingTransaction> &transaction = std::shared_ptr<OutgoingTransaction>()); void cancelOthers(const std::shared_ptr<BranchInfo> &br);
void decline(const std::shared_ptr<OutgoingTransaction> &transaction, std::shared_ptr<ResponseSipEvent> &ev);
void forward(const std::shared_ptr<ResponseSipEvent> &ev, bool force = false);
void store(std::shared_ptr<ResponseSipEvent> &ev);
void sendResponse(std::shared_ptr<ResponseSipEvent> ev, bool inject);
void logResponse(const std::shared_ptr<ResponseSipEvent> &ev); void logResponse(const std::shared_ptr<ResponseSipEvent> &ev);
static void sOnShortTimer(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg); static void sOnShortTimer(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg);
static void sOnPushTimer(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg); static void sOnPushTimer(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg);
int mActivePushes; int mActivePushes;
static const int sUrgentCodesWithout603[];
}; };
#endif //forkcallcontext_hh #endif //forkcallcontext_hh
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
using namespace ::std; using namespace ::std;
const int ForkContext::sUrgentCodes[]={401,407,415,420,484,488,606,603,0};
ForkContextConfig::ForkContextConfig() : mDeliveryTimeout(0),mUrgentTimeout(5), ForkContextConfig::ForkContextConfig() : mDeliveryTimeout(0),mUrgentTimeout(5),
mForkLate(false),mForkOneResponse(false), mForkNoGlobalDecline(false), mForkLate(false),mForkOneResponse(false), mForkNoGlobalDecline(false),
...@@ -28,49 +29,39 @@ ForkContextConfig::ForkContextConfig() : mDeliveryTimeout(0),mUrgentTimeout(5), ...@@ -28,49 +29,39 @@ ForkContextConfig::ForkContextConfig() : mDeliveryTimeout(0),mUrgentTimeout(5),
} }
void ForkContext::__timer_callback(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg){ void ForkContext::__timer_callback(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg){
(static_cast<ForkContext*>(arg))->onLateTimeout(); (static_cast<ForkContext*>(arg))->processLateTimeout();
} }
ForkContext::ForkContext(Agent *agent, const std::shared_ptr<RequestSipEvent> &event, shared_ptr<ForkContextConfig> cfg, ForkContextListener* listener) : ForkContext::ForkContext(Agent *agent, const std::shared_ptr<RequestSipEvent> &event, shared_ptr<ForkContextConfig> cfg, ForkContextListener* listener) :
mListener(listener), mListener(listener),
mAgent(agent), mAgent(agent),
mEvent(make_shared<RequestSipEvent>(event)), mEvent(make_shared<RequestSipEvent>(event)),
mIncoming(),
mOutgoings(),
mDestinationUris(),
mCfg(cfg), mCfg(cfg),
mLateTimer(NULL), mLateTimer(NULL),
mLateTimerExpired(false) { mLateTimerExpired(false) {
su_home_init(&mHome); su_home_init(&mHome);
init();
} }
void ForkContext::checkFinished(){ void ForkContext::onLateTimeout(){
bool finished=false;
if (!mIncoming && mOutgoings.empty()){
if (mLateTimer){
finished=mLateTimerExpired;
}else finished=true;
}
if (finished)
setFinished();
} }
void ForkContext::onLateTimeout(){ void ForkContext::processLateTimeout() {
LOGD("ForkContext: late timer expired."); su_timer_destroy(mLateTimer);
shared_ptr<ForkContext> me(shared_from_this()); //this is to keep the object alive at least the time of the timer callback.
//Indeed sofia does not hold a refcount to the ForkContext. During checkFinished() the refcount my drop to zero, but we need to the object
//to be kept alive until checkFinished() returns.
mLateTimerExpired=true; mLateTimerExpired=true;
checkFinished(); onLateTimeout();
setFinished();
} }
struct dest_finder{ struct dest_finder{
dest_finder(const url_t *ctt) { dest_finder(const url_t *ctt) {
cttport = url_port(ctt); cttport = url_port(ctt);
ctthost = ctt->url_host; ctthost = ctt->url_host;
// don't care about transport // don't care about transport
} }
bool operator()(const url_t *dest){ bool operator()(const shared_ptr<BranchInfo> &br){
const url_t *dest=br->mTransaction->getRequestUri();
return 0 == strcmp(url_port(dest), cttport) return 0 == strcmp(url_port(dest), cttport)
&& 0 == strcmp(dest->url_host, ctthost); && 0 == strcmp(dest->url_host, ctthost);
} }
...@@ -78,11 +69,81 @@ struct dest_finder{ ...@@ -78,11 +69,81 @@ struct dest_finder{
const char *cttport; const char *cttport;
}; };
struct uid_finder{
uid_finder(const std::string &uid) : mUid(uid){};
bool operator()(const shared_ptr<BranchInfo> &br){
return mUid==br->mUid;
}
const string mUid;
};
std::shared_ptr<BranchInfo> ForkContext::findBranchByUid(const std::string &uid){
auto it=find_if(mBranches.begin(),mBranches.end(),uid_finder(uid));
if (it!=mBranches.end()) return *it;
return shared_ptr<BranchInfo>();
}
std::shared_ptr<BranchInfo> ForkContext::findBranchByDest(const url_t *dest){
auto it=find_if(mBranches.begin(),mBranches.end(),dest_finder(dest));
if (it!=mBranches.end()) return *it;
return shared_ptr<BranchInfo>();
}
bool ForkContext::isUrgent(int code, const int urgentCodes[]){
int i;
for(i=0;urgentCodes[i]!=0;i++){
if (code==urgentCodes[i]) return true;
}
return false;
}
std::shared_ptr<BranchInfo> ForkContext::findBestBranch(const int urgentCodes[]){
shared_ptr<BranchInfo> best;
for (auto it=mBranches.begin();it!=mBranches.end();++it){
int code=(*it)->getStatus();
if (code>=200){
if (best==NULL){
best=(*it);
}else{
if ((*it)->getStatus()/100 < best->getStatus()/100 )
best=(*it);
}
}
}
if (best==NULL) return shared_ptr<BranchInfo>();
if (urgentCodes){
for (auto it=mBranches.begin();it!=mBranches.end();++it){
int code=(*it)->getStatus();
if (isUrgent(code,urgentCodes)){
best=(*it);
break;
}
}
}
return best;
}
bool ForkContext::allBranchesAnswered()const{
for (auto it=mBranches.begin();it!=mBranches.end();++it){
if ((*it)->getStatus()<200) return false;
}
return true;
}
void ForkContext::removeBranch(const shared_ptr<BranchInfo> &br){
mBranches.remove(br);
br->clear();
}
const std::list<std::shared_ptr<BranchInfo>> & ForkContext::getBranches(){
return mBranches;
}
//this implementation looks for already pending or failed transactions and then rejects handling of a new one that would already been tried. //this implementation looks for already pending or failed transactions and then rejects handling of a new one that would already been tried.
bool ForkContext::onNewRegister(const sip_contact_t* ctt){ bool ForkContext::onNewRegister(const url_t* url, const string & uid){
auto it=find_if(mDestinationUris.begin(),mDestinationUris.end(),dest_finder(ctt->m_url)); shared_ptr<BranchInfo> br=findBranchByDest(url);
if (it!=mDestinationUris.end()){ if (br){
LOGD("ForkContext %p: onNewRegister(): destination already handled.",this); LOGD("ForkContext %p: onNewRegister(): destination already handled.",this);
return false; return false;
} }
...@@ -90,8 +151,8 @@ bool ForkContext::onNewRegister(const sip_contact_t* ctt){ ...@@ -90,8 +151,8 @@ bool ForkContext::onNewRegister(const sip_contact_t* ctt){
} }
void ForkContext::onNew(const shared_ptr<IncomingTransaction> &transaction) { void ForkContext::init() {
mIncoming = transaction; mIncoming = mEvent->createIncomingTransaction();
if (mCfg->mForkLate && mLateTimer==NULL){ if (mCfg->mForkLate && mLateTimer==NULL){
/*this timer is for when outgoing transaction all die prematuraly, we still need to wait that late register arrive.*/ /*this timer is for when outgoing transaction all die prematuraly, we still need to wait that late register arrive.*/
mLateTimer=su_timer_create(su_root_task(mAgent->getRoot()), 0); mLateTimer=su_timer_create(su_root_task(mAgent->getRoot()), 0);
...@@ -99,19 +160,45 @@ void ForkContext::onNew(const shared_ptr<IncomingTransaction> &transaction) { ...@@ -99,19 +160,45 @@ void ForkContext::onNew(const shared_ptr<IncomingTransaction> &transaction) {
} }
} }
void ForkContext::onDestroy(const shared_ptr<IncomingTransaction> &transaction) { void ForkContext::addBranch(const shared_ptr<RequestSipEvent> &ev, const string &uid) {
mIncoming.reset(); shared_ptr<OutgoingTransaction> ot=ev->createOutgoingTransaction();
checkFinished(); shared_ptr<BranchInfo> br=createBranchInfo();
br->mRequest=ev;
br->mTransaction=ot;
br->mUid=uid;
ot->setProperty("BranchInfo",br);
mBranches.push_back(br);
onNewBranch(br);
} }
void ForkContext::onNew(const shared_ptr<OutgoingTransaction> &transaction) { bool ForkContext::processCancel ( const std::shared_ptr< RequestSipEvent >& ev ) {
mOutgoings.push_back(transaction); shared_ptr<IncomingTransaction> transaction = dynamic_pointer_cast<IncomingTransaction>(ev->getIncomingAgent());
mDestinationUris.push_back(url_hdup(&mHome,transaction->getRequestUri())); if (ev->getMsgSip()->getSip()->sip_request->rq_method==sip_method_cancel){
shared_ptr<BranchInfo> binfo=transaction->getProperty<BranchInfo>("BranchInfo");
binfo->mForkCtx->cancel();
return true;
}
return false;
} }
void ForkContext::onDestroy(const shared_ptr<OutgoingTransaction> &transaction) {
mOutgoings.remove(transaction); bool ForkContext::processResponse ( const shared_ptr< ResponseSipEvent >& ev ) {
checkFinished(); shared_ptr<OutgoingTransaction> transaction = dynamic_pointer_cast<OutgoingTransaction>(ev->getOutgoingAgent());
if (transaction != NULL) {
shared_ptr<BranchInfo> binfo=transaction->getProperty<BranchInfo>("BranchInfo");
if (binfo){
auto copyEv=make_shared<ResponseSipEvent>(ev); //make a copy
copyEv->suspendProcessing();
binfo->mLastResponse=copyEv;
binfo->mForkCtx->onResponse(binfo,copyEv);
if (copyEv->isSuspended()){
//let the event go through but it will not be sent*/
ev->setIncomingAgent(NULL);
}
return true;
}
}
return false;
} }
...@@ -133,7 +220,74 @@ void ForkContext::setFinished(){ ...@@ -133,7 +220,74 @@ void ForkContext::setFinished(){
//force reference to be loosed immediately, to avoid circular dependencies. //force reference to be loosed immediately, to avoid circular dependencies.
mEvent.reset(); mEvent.reset();
mIncoming.reset(); mIncoming.reset();
mOutgoings.clear(); for_each(mBranches.begin(),mBranches.end(),mem_fn(&BranchInfo::clear));
mBranches.clear();
mListener->onForkContextFinished(shared_from_this()); mListener->onForkContextFinished(shared_from_this());
} }
void ForkContext::onNewBranch ( const std::shared_ptr<BranchInfo> &br ) {
}
void ForkContext::cancel(){
}
std::shared_ptr< BranchInfo > ForkContext::createBranchInfo() {
return make_shared<BranchInfo>(shared_from_this());
}
//called by implementors to request the forwarding of a response from this branch, regardless of whether it was retained previously or not*/
std::shared_ptr<ResponseSipEvent> ForkContext::forwardResponse(const std::shared_ptr<BranchInfo> &br){
if (br->mLastResponse){
if (mIncoming){
int code=br->mLastResponse->getMsgSip()->getSip()->sip_status->st_status;
forwardResponse(br->mLastResponse);
if (code>=200) {
br->mTransaction.reset();
}
return br->mLastResponse;
}else br->mLastResponse->setIncomingAgent(NULL);
}else{
LOGE("ForkContext::forwardResponse(): no response received on this branch");
}
return std::shared_ptr<ResponseSipEvent>();
}
std::shared_ptr<ResponseSipEvent> ForkContext::forwardResponse(const std::shared_ptr<ResponseSipEvent> &ev){
if (mIncoming){
int code=ev->getMsgSip()->getSip()->sip_status->st_status;
ev->setIncomingAgent(mIncoming);
mLastResponseSent=ev;
if (ev->isSuspended()){
mAgent->injectResponseEvent(ev);
}else{
mAgent->sendResponseEvent(ev);
}
if (code>=200){
mIncoming.reset();
}
return ev;
}
return std::shared_ptr<ResponseSipEvent>();
}
int ForkContext::getLastResponseCode()const{
if (mLastResponseSent) return mLastResponseSent->getMsgSip()->getSip()->sip_status->st_status;
return 0;
}
void BranchInfo::clear(){
if (mTransaction){
mTransaction->removeProperty("BranchInfo");
mTransaction.reset();
}
mRequest.reset();
mLastResponse.reset();
mForkCtx.reset();
}
BranchInfo::~BranchInfo(){
clear();
}
...@@ -43,43 +43,89 @@ public: ...@@ -43,43 +43,89 @@ public:
virtual void onForkContextFinished(std::shared_ptr<ForkContext> ctx)=0; virtual void onForkContextFinished(std::shared_ptr<ForkContext> ctx)=0;
}; };
class BranchInfo{
public:
BranchInfo(std::shared_ptr<ForkContext> ctx) : mForkCtx(ctx){
}
virtual ~BranchInfo();
virtual void clear();
int getStatus(){
if (mLastResponse)
return mLastResponse->getMsgSip()->getSip()->sip_status->st_status;
return 0;
}
std::shared_ptr<ForkContext> mForkCtx;
std::string mUid;
std::shared_ptr<RequestSipEvent> mRequest;
std::shared_ptr<OutgoingTransaction> mTransaction;
std::shared_ptr<ResponseSipEvent> mLastResponse;
};
class ForkContext : public std::enable_shared_from_this<ForkContext>{ class ForkContext : public std::enable_shared_from_this<ForkContext>{
private: private:
static void __timer_callback(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg); static void __timer_callback(su_root_magic_t *magic, su_timer_t *t, su_timer_arg_t *arg);
ForkContextListener * mListener; ForkContextListener * mListener;