From a603208f205d8b06f9e6eefcaba30633cf5e1c6d Mon Sep 17 00:00:00 2001 From: rswindell <> Date: Wed, 10 Nov 2004 00:00:27 +0000 Subject: [PATCH] Message queue implementation (uni or bi-directional) for inter-thread communications. --- src/xpdev/msg_queue.c | 202 ++++++++++++++++++++++++++++++++++++++++++ src/xpdev/msg_queue.h | 86 ++++++++++++++++++ 2 files changed, 288 insertions(+) create mode 100644 src/xpdev/msg_queue.c create mode 100644 src/xpdev/msg_queue.h diff --git a/src/xpdev/msg_queue.c b/src/xpdev/msg_queue.c new file mode 100644 index 0000000000..52e3c67610 --- /dev/null +++ b/src/xpdev/msg_queue.c @@ -0,0 +1,202 @@ +/* msg_queue.c */ + +/* Uni or Bi-directional FIFO message queue */ + +/* $Id$ */ + +/**************************************************************************** + * @format.tab-size 4 (Plain Text/Source Code File Header) * + * @format.use-tabs true (see http://www.synchro.net/ptsc_hdr.html) * + * * + * Copyright 2004 Rob Swindell - http://www.synchro.net/copyright.html * + * * + * This library is free software; you can redistribute it and/or * + * modify it under the terms of the GNU Lesser General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * See the GNU Lesser General Public License for more details: lgpl.txt or * + * http://www.fsf.org/copyleft/lesser.html * + * * + * Anonymous FTP access to the most recent released source is available at * + * ftp://vert.synchro.net, ftp://cvs.synchro.net and ftp://ftp.synchro.net * + * * + * Anonymous CVS access to the development source and modification history * + * is available at cvs.synchro.net:/cvsroot/sbbs, example: * + * cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs login * + * (just hit return, no password is necessary) * + * cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs checkout src * + * * + * For Synchronet coding style and modification guidelines, see * + * http://www.synchro.net/source.html * + * * + * You are encouraged to submit any modifications (preferably in Unix diff * + * format) via e-mail to mods@synchro.net * + * * + * Note: If this box doesn't appear square, then you need to fix your tabs. * + ****************************************************************************/ + +#include <stdlib.h> /* malloc */ +#include <string.h> /* memset */ + +#include "msg_queue.h" + +msg_queue_t* msgQueueInit(msg_queue_t* q, long flags) +{ + if(q==NULL) { + if((q=(msg_queue_t*)malloc(sizeof(msg_queue_t)))==NULL) + return(NULL); + flags |= MSG_QUEUE_MALLOC; + } + + memset(q,0,sizeof(msg_queue_t)); + + q->flags = flags; + q->refs = 1; + q->owner_thread_id = GetCurrentThreadId(); + + if(q->flags&MSG_QUEUE_BIDIR) + listInit(&q->in,LINK_LIST_DONT_FREE|LINK_LIST_SEMAPHORE); + listInit(&q->out,LINK_LIST_DONT_FREE|LINK_LIST_SEMAPHORE); + + return(q); +} + +BOOL msgQueueFree(msg_queue_t* q) +{ + if(q==NULL) + return(FALSE); + + listFree(&q->in); + listFree(&q->out); + + if(q->flags&MSG_QUEUE_MALLOC) + free(q); + + return(TRUE); +} + +long msgQueueAttach(msg_queue_t* q) +{ + if(q==NULL) + return(-1); + + q->refs++; + + return(q->refs); +} + +long msgQueueDetach(msg_queue_t* q) +{ + int refs; + + if(q==NULL || q->refs<1) + return(-1); + + if((refs=--q->refs)==0) + msgQueueFree(q); + + return(refs); +} + +void* msgQueueSetPrivateData(msg_queue_t* q, void* p) +{ + void* old; + + if(q==NULL) + return(NULL); + + old=q->private_data; + q->private_data=p; + return(old); +} + +void* msgQueueGetPrivateData(msg_queue_t* q) +{ + if(q==NULL) + return(NULL); + return(q->private_data); +} + +static link_list_t* msgQueueReadList(msg_queue_t* q) +{ + if(q==NULL) + return(NULL); + + if((q->flags&MSG_QUEUE_BIDIR) + && q->owner_thread_id == GetCurrentThreadId()) + return(&q->in); + return(&q->out); +} + +static link_list_t* msgQueueWriteList(msg_queue_t* q) +{ + if(q==NULL) + return(NULL); + + if(!(q->flags&MSG_QUEUE_BIDIR) + || q->owner_thread_id == GetCurrentThreadId()) + return(&q->out); + return(&q->in); +} + +long msgQueueReadLevel(msg_queue_t* q) +{ + return listCountNodes(msgQueueReadList(q)); +} + +static BOOL list_wait(link_list_t* list, long timeout) +{ +#if defined(LINK_LIST_THREADSAFE) + if(timeout==-1) /* infinite */ + return listSemWait(list)==0; + if(timeout==0) /* poll */ + return listSemTryWait(list)==0; + + return listSemTryWaitBlock(list,timeout)==0); +#else + return(TRUE); +#endif +} + +void* msgQueueRead(msg_queue_t* q, long timeout) +{ + if(!list_wait(msgQueueReadList(q),timeout)) + return(NULL); + + return listPopFirstNode(msgQueueReadList(q)); +} + +void* msgQueuePeek(msg_queue_t* q, long timeout) +{ + if(!list_wait(msgQueueReadList(q),timeout)) + return(NULL); + + return listNodeData(listFirstNode(msgQueueReadList(q))); +} + +void* msgQueueFind(msg_queue_t* q, const void* data, size_t length) +{ + return listRemoveNode(msgQueueReadList(q) + ,listFindNode(msgQueueReadList(q),data,length)); +} + +list_node_t* msgQueueFirstNode(msg_queue_t* q) +{ + return listFirstNode(msgQueueReadList(q)); +} + +list_node_t* msgQueueLastNode(msg_queue_t* q) +{ + return listLastNode(msgQueueReadList(q)); +} + +long msgQueueWriteLevel(msg_queue_t* q) +{ + return listCountNodes(msgQueueWriteList(q)); +} + +BOOL msgQueueWrite(msg_queue_t* q, const void* data, size_t length) +{ + return listPushNodeData(msgQueueWriteList(q),data,length)!=NULL; +} + diff --git a/src/xpdev/msg_queue.h b/src/xpdev/msg_queue.h new file mode 100644 index 0000000000..2f76c22cb0 --- /dev/null +++ b/src/xpdev/msg_queue.h @@ -0,0 +1,86 @@ +/* msg_queue.h */ + +/* Uni or Bi-directional FIFO message queue */ + +/* $Id$ */ + +/**************************************************************************** + * @format.tab-size 4 (Plain Text/Source Code File Header) * + * @format.use-tabs true (see http://www.synchro.net/ptsc_hdr.html) * + * * + * Copyright 2004 Rob Swindell - http://www.synchro.net/copyright.html * + * * + * This library is free software; you can redistribute it and/or * + * modify it under the terms of the GNU Lesser General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * See the GNU Lesser General Public License for more details: lgpl.txt or * + * http://www.fsf.org/copyleft/lesser.html * + * * + * Anonymous FTP access to the most recent released source is available at * + * ftp://vert.synchro.net, ftp://cvs.synchro.net and ftp://ftp.synchro.net * + * * + * Anonymous CVS access to the development source and modification history * + * is available at cvs.synchro.net:/cvsroot/sbbs, example: * + * cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs login * + * (just hit return, no password is necessary) * + * cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs checkout src * + * * + * For Synchronet coding style and modification guidelines, see * + * http://www.synchro.net/source.html * + * * + * You are encouraged to submit any modifications (preferably in Unix diff * + * format) via e-mail to mods@synchro.net * + * * + * Note: If this box doesn't appear square, then you need to fix your tabs. * + ****************************************************************************/ + +#ifndef _MSG_QUEUE_H +#define _MSG_QUEUE_H + +#include "link_list.h" + +#if defined(__cplusplus) +extern "C" { +#endif + +typedef struct { + link_list_t in; + link_list_t out; + DWORD owner_thread_id; /* reads from in, writes to out */ + long refs; + unsigned long flags; /* private use flags */ + void* private_data; +} msg_queue_t; + +#define MSG_QUEUE_MALLOC (1<<0) /* Queue allocated with malloc() */ +#define MSG_QUEUE_BIDIR (1<<1) /* Bi-directional message queue */ + +msg_queue_t* msgQueueInit(msg_queue_t*, long flags); +BOOL msgQueueFree(msg_queue_t*); + +long msgQueueAttach(msg_queue_t*); +long msgQueueDetach(msg_queue_t*); + +/* Get/Set queue private data */ +void* msgQueueSetPrivateData(msg_queue_t*, void*); +void* msgQueueGetPrivateData(msg_queue_t*); + +long msgQueueReadLevel(msg_queue_t*); +void* msgQueueRead(msg_queue_t*, long timeout); +void* msgQueuePeek(msg_queue_t*, long timeout); +void* msgQueueFind(msg_queue_t*, const void*, size_t length); +list_node_t* msgQueueFirstNode(msg_queue_t*); +list_node_t* msgQueueLastNode(msg_queue_t*); +#define msgQueueNextNode(node) listNextNode(node) +#define msgQueuePrevNode(node) listPrevNode(node) +#define msgQueueNodeData(node) listNodeData(node) + +long msgQueueWriteLevel(msg_queue_t*); +BOOL msgQueueWrite(msg_queue_t*, const void*, size_t length); + +#if defined(__cplusplus) +} +#endif + +#endif /* Don't add anything after this line */ -- GitLab