Skip to content
Snippets Groups Projects
Commit a603208f authored by rswindell's avatar rswindell
Browse files

Message queue implementation (uni or bi-directional) for inter-thread

communications.
parent 3ee7e6bd
No related branches found
No related tags found
No related merge requests found
/* msg_queue.c */
/* Uni or Bi-directional FIFO message queue */
/* $Id$ */
/****************************************************************************
* @format.tab-size 4 (Plain Text/Source Code File Header) *
* @format.use-tabs true (see http://www.synchro.net/ptsc_hdr.html) *
* *
* Copyright 2004 Rob Swindell - http://www.synchro.net/copyright.html *
* *
* This library is free software; you can redistribute it and/or *
* modify it under the terms of the GNU Lesser General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* See the GNU Lesser General Public License for more details: lgpl.txt or *
* http://www.fsf.org/copyleft/lesser.html *
* *
* Anonymous FTP access to the most recent released source is available at *
* ftp://vert.synchro.net, ftp://cvs.synchro.net and ftp://ftp.synchro.net *
* *
* Anonymous CVS access to the development source and modification history *
* is available at cvs.synchro.net:/cvsroot/sbbs, example: *
* cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs login *
* (just hit return, no password is necessary) *
* cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs checkout src *
* *
* For Synchronet coding style and modification guidelines, see *
* http://www.synchro.net/source.html *
* *
* You are encouraged to submit any modifications (preferably in Unix diff *
* format) via e-mail to mods@synchro.net *
* *
* Note: If this box doesn't appear square, then you need to fix your tabs. *
****************************************************************************/
#include <stdlib.h> /* malloc */
#include <string.h> /* memset */
#include "msg_queue.h"
msg_queue_t* msgQueueInit(msg_queue_t* q, long flags)
{
if(q==NULL) {
if((q=(msg_queue_t*)malloc(sizeof(msg_queue_t)))==NULL)
return(NULL);
flags |= MSG_QUEUE_MALLOC;
}
memset(q,0,sizeof(msg_queue_t));
q->flags = flags;
q->refs = 1;
q->owner_thread_id = GetCurrentThreadId();
if(q->flags&MSG_QUEUE_BIDIR)
listInit(&q->in,LINK_LIST_DONT_FREE|LINK_LIST_SEMAPHORE);
listInit(&q->out,LINK_LIST_DONT_FREE|LINK_LIST_SEMAPHORE);
return(q);
}
BOOL msgQueueFree(msg_queue_t* q)
{
if(q==NULL)
return(FALSE);
listFree(&q->in);
listFree(&q->out);
if(q->flags&MSG_QUEUE_MALLOC)
free(q);
return(TRUE);
}
long msgQueueAttach(msg_queue_t* q)
{
if(q==NULL)
return(-1);
q->refs++;
return(q->refs);
}
long msgQueueDetach(msg_queue_t* q)
{
int refs;
if(q==NULL || q->refs<1)
return(-1);
if((refs=--q->refs)==0)
msgQueueFree(q);
return(refs);
}
void* msgQueueSetPrivateData(msg_queue_t* q, void* p)
{
void* old;
if(q==NULL)
return(NULL);
old=q->private_data;
q->private_data=p;
return(old);
}
void* msgQueueGetPrivateData(msg_queue_t* q)
{
if(q==NULL)
return(NULL);
return(q->private_data);
}
static link_list_t* msgQueueReadList(msg_queue_t* q)
{
if(q==NULL)
return(NULL);
if((q->flags&MSG_QUEUE_BIDIR)
&& q->owner_thread_id == GetCurrentThreadId())
return(&q->in);
return(&q->out);
}
static link_list_t* msgQueueWriteList(msg_queue_t* q)
{
if(q==NULL)
return(NULL);
if(!(q->flags&MSG_QUEUE_BIDIR)
|| q->owner_thread_id == GetCurrentThreadId())
return(&q->out);
return(&q->in);
}
long msgQueueReadLevel(msg_queue_t* q)
{
return listCountNodes(msgQueueReadList(q));
}
static BOOL list_wait(link_list_t* list, long timeout)
{
#if defined(LINK_LIST_THREADSAFE)
if(timeout==-1) /* infinite */
return listSemWait(list)==0;
if(timeout==0) /* poll */
return listSemTryWait(list)==0;
return listSemTryWaitBlock(list,timeout)==0);
#else
return(TRUE);
#endif
}
void* msgQueueRead(msg_queue_t* q, long timeout)
{
if(!list_wait(msgQueueReadList(q),timeout))
return(NULL);
return listPopFirstNode(msgQueueReadList(q));
}
void* msgQueuePeek(msg_queue_t* q, long timeout)
{
if(!list_wait(msgQueueReadList(q),timeout))
return(NULL);
return listNodeData(listFirstNode(msgQueueReadList(q)));
}
void* msgQueueFind(msg_queue_t* q, const void* data, size_t length)
{
return listRemoveNode(msgQueueReadList(q)
,listFindNode(msgQueueReadList(q),data,length));
}
list_node_t* msgQueueFirstNode(msg_queue_t* q)
{
return listFirstNode(msgQueueReadList(q));
}
list_node_t* msgQueueLastNode(msg_queue_t* q)
{
return listLastNode(msgQueueReadList(q));
}
long msgQueueWriteLevel(msg_queue_t* q)
{
return listCountNodes(msgQueueWriteList(q));
}
BOOL msgQueueWrite(msg_queue_t* q, const void* data, size_t length)
{
return listPushNodeData(msgQueueWriteList(q),data,length)!=NULL;
}
/* msg_queue.h */
/* Uni or Bi-directional FIFO message queue */
/* $Id$ */
/****************************************************************************
* @format.tab-size 4 (Plain Text/Source Code File Header) *
* @format.use-tabs true (see http://www.synchro.net/ptsc_hdr.html) *
* *
* Copyright 2004 Rob Swindell - http://www.synchro.net/copyright.html *
* *
* This library is free software; you can redistribute it and/or *
* modify it under the terms of the GNU Lesser General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* See the GNU Lesser General Public License for more details: lgpl.txt or *
* http://www.fsf.org/copyleft/lesser.html *
* *
* Anonymous FTP access to the most recent released source is available at *
* ftp://vert.synchro.net, ftp://cvs.synchro.net and ftp://ftp.synchro.net *
* *
* Anonymous CVS access to the development source and modification history *
* is available at cvs.synchro.net:/cvsroot/sbbs, example: *
* cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs login *
* (just hit return, no password is necessary) *
* cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs checkout src *
* *
* For Synchronet coding style and modification guidelines, see *
* http://www.synchro.net/source.html *
* *
* You are encouraged to submit any modifications (preferably in Unix diff *
* format) via e-mail to mods@synchro.net *
* *
* Note: If this box doesn't appear square, then you need to fix your tabs. *
****************************************************************************/
#ifndef _MSG_QUEUE_H
#define _MSG_QUEUE_H
#include "link_list.h"
#if defined(__cplusplus)
extern "C" {
#endif
typedef struct {
link_list_t in;
link_list_t out;
DWORD owner_thread_id; /* reads from in, writes to out */
long refs;
unsigned long flags; /* private use flags */
void* private_data;
} msg_queue_t;
#define MSG_QUEUE_MALLOC (1<<0) /* Queue allocated with malloc() */
#define MSG_QUEUE_BIDIR (1<<1) /* Bi-directional message queue */
msg_queue_t* msgQueueInit(msg_queue_t*, long flags);
BOOL msgQueueFree(msg_queue_t*);
long msgQueueAttach(msg_queue_t*);
long msgQueueDetach(msg_queue_t*);
/* Get/Set queue private data */
void* msgQueueSetPrivateData(msg_queue_t*, void*);
void* msgQueueGetPrivateData(msg_queue_t*);
long msgQueueReadLevel(msg_queue_t*);
void* msgQueueRead(msg_queue_t*, long timeout);
void* msgQueuePeek(msg_queue_t*, long timeout);
void* msgQueueFind(msg_queue_t*, const void*, size_t length);
list_node_t* msgQueueFirstNode(msg_queue_t*);
list_node_t* msgQueueLastNode(msg_queue_t*);
#define msgQueueNextNode(node) listNextNode(node)
#define msgQueuePrevNode(node) listPrevNode(node)
#define msgQueueNodeData(node) listNodeData(node)
long msgQueueWriteLevel(msg_queue_t*);
BOOL msgQueueWrite(msg_queue_t*, const void*, size_t length);
#if defined(__cplusplus)
}
#endif
#endif /* Don't add anything after this line */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment