/*
    Flexisip, a flexible SIP proxy server with media capabilities.
    Copyright (C) 2010-2015  Belledonne Communications SARL, All rights reserved.

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as
    published by the Free Software Foundation, either version 3 of the
    License, or (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#pragma once

#include <flexisip/common.hh>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>

#ifndef SU_WAIT_H
#define SU_WAKEUP_ARG_T redisSofiaEvents
#define SU_ROOT_MAGIC_T redisAsyncContext
#include <sofia-sip/su_wait.h>
#endif

namespace flexisip {

struct redisSofiaEvents;
typedef struct redisSofiaEvents redisSofiaEvents;

36
typedef struct redisSofiaEvents {
37 38 39 40 41
	redisAsyncContext *context;
	su_root_t *root;
	su_wait_t wait;
	int index;
	int eventmask;
42 43 44
} redisSofiaEvents;

/*
 * Wakeup callback registered with the sofia-sip root for the redis socket.
 *
 * Forwards readiness reported in wait->revents to hiredis: SU_WAIT_IN triggers
 * redisAsyncHandleRead(), SU_WAIT_OUT triggers redisAsyncHandleWrite().
 *
 * magic: unused.
 * wait:  wait object carrying the returned events.
 * e:     the redisSofiaEvents instance passed at registration time.
 *
 * Always returns 0.
 *
 * NOTE(review): if the read handler ends up tearing the context down (which
 * would run redisSofiaCleanup() and free `e`), the write branch below would
 * touch freed memory -- confirm whether hiredis can do that from here.
 */
static int redisSofiaEvent(su_root_magic_t *magic, su_wait_t *wait, su_wakeup_arg_t *e) {
	redisSofiaEvents *events = (redisSofiaEvents *)e;

	(void)magic;

	if (wait->revents & SU_WAIT_IN) {
		redisAsyncHandleRead(events->context);
	}
	if (wait->revents & SU_WAIT_OUT) {
		redisAsyncHandleWrite(events->context);
	}

	return 0;
}

static void addWaitMask(void *privdata, int mask) {
53
	redisSofiaEvents *e = (redisSofiaEvents*)privdata;
54
	redisContext *c = &(e->context->c);
55 56
	e->eventmask |= mask;
	su_root_eventmask(e->root, e->index, c->fd, e->eventmask);
57
}
58

59
static void delWaitMask(void *privdata, int mask) {
60
	redisSofiaEvents *e = (redisSofiaEvents*)privdata;
61
	redisContext *c = &(e->context->c);
62 63
	e->eventmask &= ~mask;
	su_root_eventmask(e->root, e->index, c->fd, e->eventmask);
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
}

/* hiredis addRead hook: adds SU_WAIT_IN to the watched event mask. */
static void redisSofiaAddRead(void *privdata) {
	const int readBit = SU_WAIT_IN;

	addWaitMask(privdata, readBit);
}

/* hiredis delRead hook: removes SU_WAIT_IN from the watched event mask. */
static void redisSofiaDelRead(void *privdata) {
	const int readBit = SU_WAIT_IN;

	delWaitMask(privdata, readBit);
}

/* hiredis addWrite hook: adds SU_WAIT_OUT to the watched event mask. */
static void redisSofiaAddWrite(void *privdata) {
	const int writeBit = SU_WAIT_OUT;

	addWaitMask(privdata, writeBit);
}

/* hiredis delWrite hook: removes SU_WAIT_OUT from the watched event mask. */
static void redisSofiaDelWrite(void *privdata) {
	const int writeBit = SU_WAIT_OUT;

	delWaitMask(privdata, writeBit);
}

// Note: async.h requires this method to be idempotent; it is not the case.
static void redisSofiaCleanup(void *privdata) {
84 85 86 87
	redisSofiaEvents *e = (redisSofiaEvents*)privdata;
	su_root_deregister(e->root, e->index);
	LOGI("Redis sofia event cleaned %p", e->context);
	free(e);
88 89 90
}

/*
 * Attach a hiredis async context to a sofia-sip root so that socket readiness
 * is dispatched to hiredis through redisSofiaEvent().
 *
 * ac:   async context to attach; must not already have event data attached.
 * root: sofia-sip root to register the socket with.
 *
 * Returns REDIS_OK on success. Returns REDIS_ERR if something is already
 * attached, if memory allocation fails, or if the wait object cannot be
 * created or registered. On any failure the context is left untouched and
 * nothing is leaked: the hooks and ev.data are only published once the
 * registration has fully succeeded, so the cleanup hook can never run on a
 * half-initialised container.
 *
 * On success the container is owned by the context and is released by
 * redisSofiaCleanup().
 */
static int redisSofiaAttach(redisAsyncContext *ac, su_root_t *root) {
	redisContext *c = &(ac->c);
	redisSofiaEvents *e;

	/* Nothing should be attached when something is already attached */
	if (ac->ev.data != NULL)
		return REDIS_ERR;

	/* Create container for context and r/w events */
	e = (redisSofiaEvents *)malloc(sizeof(*e));
	if (e == NULL)
		return REDIS_ERR;
	e->context = ac;
	e->root = root;
	e->eventmask = 0;

	/* Initialize and install read/write events */
	if (0 != su_wait_create(&e->wait, c->fd, SU_WAIT_IN | SU_WAIT_OUT)) {
		free(e);
		return REDIS_ERR;
	}
	e->index = su_root_register(root, &e->wait, redisSofiaEvent, e, su_pri_normal);
	if (e->index < 0) {
		/* Registration failed: undo the wait object before bailing out. */
		su_wait_destroy(&e->wait);
		free(e);
		return REDIS_ERR;
	}

	/* Register functions to start/stop listening for events */
	ac->ev.addRead = redisSofiaAddRead;
	ac->ev.delRead = redisSofiaDelRead;
	ac->ev.addWrite = redisSofiaAddWrite;
	ac->ev.delWrite = redisSofiaDelWrite;
	ac->ev.cleanup = redisSofiaCleanup;
	ac->ev.data = e;

	return REDIS_OK;
}

}