eventqueue.c 6.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
mediastreamer2 library - modular sound and video processing and streaming
Copyright (C) 2010  Simon MORLAT (simon.morlat@linphone.org)

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
*/

Simon Morlat's avatar
Simon Morlat committed
20

21 22 23 24 25 26 27
#include "mediastreamer2/mseventqueue.h"
#include "mediastreamer2/msfilter.h"

#ifndef MS_EVENT_BUF_SIZE
#define MS_EVENT_BUF_SIZE 8192
#endif

28 29 30 31 32 33
/*
 * Selects which of a filter's registered notify callbacks are run by
 * ms_filter_invoke_callbacks(), according to each callback's 'synchronous' flag.
 */
typedef enum {
	/* run only the callbacks whose 'synchronous' flag is set */
	OnlySynchronous,
	/* run only the callbacks whose 'synchronous' flag is not set */
	OnlyAsynchronous,
	/* run every callback, whatever its flag */
	Both
}InvocationMode;

34
/* Forward declaration: the definition comes later in this file, but read_event() calls it first. */
static void ms_filter_invoke_callbacks(MSFilter **f, unsigned int id, void *arg, InvocationMode synchronous_mode);
35 36 37 38 39 40 41 42 43

/*
 * One registered notify callback of a filter; instances are stored as the
 * data of the filter's 'notify_callbacks' list.
 */
struct _MSNotifyContext{
	/* callback function to invoke */
	MSFilterNotifyFunc fn;
	/* user pointer handed back to 'fn' as its first argument */
	void *ud;
	/* non-zero when the callback was registered as synchronous */
	int synchronous;
};

typedef struct _MSNotifyContext MSNotifyContext;

44 45 46 47 48 49 50 51
struct _MSEventQueue{
	ms_mutex_t mutex; /*could be replaced by an atomic counter for freeroom*/
	uint8_t *rptr;
	uint8_t *wptr;
	uint8_t *endptr;
	uint8_t *lim;
	int freeroom;
	int size;
52
	MSFilter *current_notifier;
53 54 55 56
	uint8_t buffer[MS_EVENT_BUF_SIZE];
};

/*
 * Append one event record to the queue.
 *
 * Record layout (must match parse_event()):
 *   offset 0  : emitting filter pointer, stored as a long
 *   offset 8  : event id, stored as a long
 *   offset 16 : payload, whose size is the low 8 bits of the event id
 *
 * q     : destination queue.
 * f     : filter emitting the event.
 * ev_id : event id; its low byte gives the payload size in bytes.
 * arg   : payload to copy; only read when the payload size is non-zero.
 *
 * The event is dropped (with an error log) when 'freeroom' is smaller than the record.
 */
static void write_event(MSEventQueue *q, MSFilter *f, unsigned int ev_id, void *arg){
	int argsize=ev_id & 0xff;
	int size=argsize+16;
	uint8_t *nextpos=q->wptr+size;
	long filter_word=(long)f;
	long id_word=(long)ev_id;

	if (q->freeroom<size){
		ms_error("Dropped event, no more free space in event buffer !");
		return;
	}

	if (nextpos>q->lim){
		/* need to wrap around: the reader will wrap when it reaches endptr */
		/* NOTE(review): the bytes skipped at the tail are not deducted from
		   'freeroom', so the free-space check above does not account for them
		   - confirm this cannot let a wrapped write reach unread records. */
		q->endptr=q->wptr;
		q->wptr=q->buffer;
		nextpos=q->wptr+size;
	}
	/* Records are argsize+16 bytes long, so wptr is not guaranteed to be
	   aligned for a long: store the header words with memcpy instead of
	   dereferencing a casted pointer. */
	memcpy(q->wptr,&filter_word,sizeof(filter_word));
	memcpy(q->wptr+8,&id_word,sizeof(id_word));
	if (argsize>0) memcpy(q->wptr+16,arg,argsize);
	q->wptr=nextpos;
	ms_mutex_lock(&q->mutex);
	q->freeroom-=size;
	ms_mutex_unlock(&q->mutex);
}

81 82 83 84 85 86 87 88 89 90
/*
 * Decode the event record located at 'rptr'.
 *
 * rptr    : start of the record (filter word at offset 0, id word at offset 8,
 *           payload at offset 16).
 * f       : receives the filter pointer stored in the record.
 * id      : receives the event id.
 * data    : receives the address of the payload inside the buffer (rptr+16).
 * argsize : receives the payload size, i.e. the low 8 bits of the event id.
 *
 * Returns the total size of the record in bytes (payload size + 16).
 */
static int parse_event(uint8_t *rptr,MSFilter **f, unsigned int *id, void **data, int *argsize){
	long filter_word;
	long id_word;

	/* Records are not guaranteed to start on a long-aligned address, so the
	   header words are copied out rather than read through a casted pointer. */
	memcpy(&filter_word,rptr,sizeof(filter_word));
	memcpy(&id_word,rptr+8,sizeof(id_word));

	*f=(MSFilter *)filter_word;
	*id=(unsigned int)id_word;
	*argsize=(*id) & 0xff;
	*data=rptr+16;
	return (*argsize)+16;
}

91 92 93 94 95 96
/*
 * Consume at most one pending event record.
 *
 * When the record still carries a filter, its asynchronous callbacks are
 * invoked; a record whose filter word is NULL is skipped silently.
 * Returns TRUE when a record was consumed, FALSE when the queue held none.
 */
static bool_t read_event(MSEventQueue *q){
	MSFilter *notifier;
	unsigned int ev_id;
	void *payload;
	int payload_size;
	int consumed;

	if (q->size-q->freeroom<=0){
		return FALSE;
	}

	consumed=parse_event(q->rptr,&notifier,&ev_id,&payload,&payload_size);
	if (notifier!=NULL){
		/* current_notifier is passed by address so that ms_event_queue_clean()
		   can reset it to NULL while the callbacks are running */
		q->current_notifier=notifier;
		ms_filter_invoke_callbacks(&q->current_notifier,ev_id,payload_size>0 ? payload : NULL, OnlyAsynchronous);
		q->current_notifier=NULL;
	}

	/* the record is released only after the callbacks have run */
	q->rptr+=consumed;
	if (q->rptr>=q->endptr){
		q->rptr=q->buffer;
	}
	ms_mutex_lock(&q->mutex);
	q->freeroom+=consumed;
	ms_mutex_unlock(&q->mutex);
	return TRUE;
}

118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
/*
 * Clean all events belonging to a MSFilter that is about to be destroyed.
 *
 * Walks the pending records without consuming them and overwrites the filter
 * word of every record emitted by 'destroyed' with 0, so that read_event()
 * skips them. If 'destroyed' is the filter whose callbacks are currently
 * being invoked, 'current_notifier' is reset to NULL as well.
 */
void ms_event_queue_clean(MSEventQueue *q, MSFilter *destroyed){
	int freeroom=q->freeroom;
	uint8_t *rptr=q->rptr;

	while(q->size>freeroom){
		MSFilter *f;
		unsigned int id;
		void *data;
		int argsize;
		int evsize;

		evsize=parse_event(rptr,&f,&id,&data,&argsize);
		if (f==destroyed){
			long cleared=0;
			ms_message("Cleaning pending event of MSFilter [%s:%p]",destroyed->desc->name,destroyed);
			/* the record may not be long-aligned: clear the filter word
			   with memcpy rather than through a casted pointer */
			memcpy(rptr,&cleared,sizeof(cleared));
		}
		rptr+=evsize;

		if (rptr>=q->endptr){
			rptr=q->buffer;
		}
		/* local copy only: the records stay in the queue */
		freeroom+=evsize;
	}
	if (q->current_notifier==destroyed){
		q->current_notifier=NULL;
	}
}

147 148 149 150 151 152 153 154 155 156 157 158 159
/*
 * Allocate and initialize an empty event queue.
 *
 * The returned queue has read and write positions at the start of its buffer
 * and its whole capacity (MS_EVENT_BUF_SIZE bytes) free.
 * Release it with ms_event_queue_destroy().
 */
MSEventQueue *ms_event_queue_new(void){
	MSEventQueue *q=ms_new0(MSEventQueue,1);
	int bufsize=MS_EVENT_BUF_SIZE;

	ms_mutex_init(&q->mutex,NULL);
	q->lim=q->buffer+bufsize;
	q->freeroom=bufsize;
	q->wptr=q->rptr=q->buffer;
	/* no wrap-around has happened yet: the reader wraps at the buffer end */
	q->endptr=q->lim;
	q->size=bufsize;
	return q;
}

/*
 * Destroy an event queue: its mutex is destroyed and its memory freed.
 * If the queue is the one currently attached to the fallback factory, it is
 * detached from that factory first.
 */
void ms_event_queue_destroy(MSEventQueue *q){
	/*compatibility code*/
	MSEventQueue *registered=ms_factory_get_event_queue(ms_factory_get_fallback());

	if (registered==q){
		ms_factory_set_event_queue(ms_factory_get_fallback(),NULL);
	}

	ms_mutex_destroy(&q->mutex);
	ms_free(q);
}

/* Attach 'q' as the event queue of the fallback factory. */
void ms_set_global_event_queue(MSEventQueue *q){
	ms_factory_set_event_queue(
		ms_factory_get_fallback(),
		q
	);
}

172 173 174 175 176 177 178 179 180
/*
 * Discard every pending event: the queue is reset to its empty state, with
 * read and write positions back at the start of the buffer.
 */
void ms_event_queue_skip(MSEventQueue *q){
	uint8_t *start=q->buffer;
	uint8_t *end=start+q->size;

	q->rptr=start;
	q->wptr=start;
	q->lim=end;
	q->endptr=end;
	q->freeroom=q->size;
}


181 182 183 184 185
/* Process pending events one by one until read_event() reports none is left. */
void ms_event_queue_pump(MSEventQueue *q){
	for(;;){
		if (!read_event(q)) break;
	}
}

186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
/*
 * Allocate a notify context holding callback 'fn', its user pointer 'ud' and
 * its 'synchronous' flag. Freed with ms_notify_context_destroy().
 */
static MSNotifyContext * ms_notify_context_new(MSFilterNotifyFunc fn, void *ud, bool_t synchronous){
	MSNotifyContext *created=ms_new0(MSNotifyContext,1);

	created->synchronous=synchronous;
	created->ud=ud;
	created->fn=fn;
	return created;
}

/* Free a notify context allocated by ms_notify_context_new(). */
static void ms_notify_context_destroy(MSNotifyContext *ctx){
	ms_free(ctx);
}

/*
 * Register callback 'fn' (with user pointer 'ud') at the end of the filter's
 * notify callback list. 'synchronous' is stored with the callback and decides
 * in which invocation mode it is run.
 */
void ms_filter_add_notify_callback(MSFilter *f, MSFilterNotifyFunc fn, void *ud, bool_t synchronous){
	MSNotifyContext *ctx=ms_notify_context_new(fn,ud,synchronous);

	f->notify_callbacks=ms_list_append(f->notify_callbacks,ctx);
}

/*
 * Unregister the first callback of the filter matching both 'fn' and 'ud'.
 * A warning is logged when no such callback is registered.
 */
void ms_filter_remove_notify_callback(MSFilter *f, MSFilterNotifyFunc fn, void *ud){
	MSList *it=f->notify_callbacks;

	while (it!=NULL){
		MSNotifyContext *ctx=(MSNotifyContext*)it->data;

		if (ctx->fn==fn && ctx->ud==ud){
			/* free the context first, then unlink its list node */
			ms_notify_context_destroy(ctx);
			f->notify_callbacks=ms_list_remove_link(f->notify_callbacks,it);
			return;
		}
		it=it->next;
	}
	ms_warning("ms_filter_remove_notify_callback(filter=%p): no registered callback with fn=%p and ud=%p",f,fn,ud);
}

/* Unregister every notify callback of the filter, freeing their contexts. */
void ms_filter_clear_notify_callback(MSFilter *f){
	void (*release)(void*)=(void (*)(void*))ms_notify_context_destroy;

	f->notify_callbacks=ms_list_free_with_data(f->notify_callbacks,release);
}

222
/*
 * Run the notify callbacks of filter '*f' that match 'synchronous_mode'.
 *
 * f                : address of the filter pointer. It is passed by address
 *                    because a callback may cause the pointer to be reset to
 *                    NULL (see ms_event_queue_clean()); iteration stops as
 *                    soon as that happens.
 * id               : event id handed to each callback.
 * arg              : event payload handed to each callback, may be NULL.
 * synchronous_mode : which callbacks to run, according to their flag.
 */
static void ms_filter_invoke_callbacks(MSFilter **f, unsigned int id, void *arg, InvocationMode synchronous_mode){
	MSList *elem;
	MSList *next;

	for (elem=(*f)->notify_callbacks;elem!=NULL;elem=next){
		MSNotifyContext *ctx=(MSNotifyContext*)elem->data;
		int selected=(synchronous_mode==Both)
			|| (synchronous_mode==OnlyAsynchronous && !ctx->synchronous)
			|| (synchronous_mode==OnlySynchronous && ctx->synchronous);

		/* fetch the successor before calling out: the callback may
		   unregister itself, which frees 'elem' */
		next=elem->next;
		if (selected){
			ctx->fn(ctx->ud,*f,id,arg);
			/* the filter was destroyed by the callback: its callback list
			   is gone, do not touch it anymore */
			if (*f==NULL) break;
		}
	}
}

/* Register 'fn' (with user pointer 'ud') as a non-synchronous notify callback of the filter. */
void ms_filter_set_notify_callback(MSFilter *f, MSFilterNotifyFunc fn, void *ud){
	const bool_t synchronous=FALSE;

	ms_filter_add_notify_callback(f,fn,ud,synchronous);
}

236 237

/*
 * Emit event 'id' with payload 'arg' from filter 'f'.
 *
 * Nothing happens when the filter has no registered callback. When the
 * filter's factory has no event queue, every callback is run immediately.
 * Otherwise only the synchronous callbacks are run immediately, and the event
 * is then written to the factory's event queue.
 */
void ms_filter_notify(MSFilter *f, unsigned int id, void *arg){
	if (f->notify_callbacks==NULL){
		return;
	}

	if (f->factory->evq!=NULL){
		ms_filter_invoke_callbacks(&f,id,arg,OnlySynchronous);
		write_event(f->factory->evq,f,id,arg);
		return;
	}

	/* synchronous notification */
	ms_filter_invoke_callbacks(&f,id,arg,Both);
}

/* Emit event 'id' from filter 'f' with a NULL payload. */
void ms_filter_notify_no_arg(MSFilter *f, unsigned int id){
	void *no_payload=NULL;

	ms_filter_notify(f,id,no_payload);
}
252

253
/*
 * Clean the events emitted by 'f' that are still pending in the event queue
 * of its factory. Does nothing when the factory has no event queue.
 */
void ms_filter_clean_pending_events(MSFilter *f){
	if (f->factory->evq==NULL){
		return;
	}
	ms_event_queue_clean(f->factory->evq,f);
}
257