Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
changeset 0:85af604dee9b
initial add
| author | Me |
|---|---|
| date | Sat, 22 May 2010 19:51:09 -0700 |
| parents | |
| children | 81f6687d52d1 |
| files | BlockingQueue.c BlockingQueue.h PrivateQueue.c PrivateQueue.h |
| diffstat | 4 files changed, 531 insertions(+), 0 deletions(-) [+] |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/BlockingQueue.c Sat May 22 19:51:09 2010 -0700 1.3 @@ -0,0 +1,314 @@ 1.4 +/* 1.5 + * Copyright 2009 OpenSourceCodeStewardshipFoundation.org 1.6 + * Licensed under GNU General Public License version 2 1.7 + * 1.8 + * NOTE: this version of SRSW correct as of April 25, 2010 1.9 + * 1.10 + * Author: seanhalle@yahoo.com 1.11 + */ 1.12 + 1.13 + 1.14 +#include <stdio.h> 1.15 +#include <errno.h> 1.16 +#include <pthread.h> 1.17 +#include <stdlib.h> 1.18 + 1.19 +#include "BlockingQueue.h" 1.20 + 1.21 +#define INC(x) (++x == 1024) ? (x) = 0 : (x) 1.22 + 1.23 + 1.24 +//=========================================================================== 1.25 +//Normal pthread Q 1.26 + 1.27 +QueueStruc* makeQ() 1.28 + { 1.29 + QueueStruc* retQ; 1.30 + int status; 1.31 + retQ = (QueueStruc *) malloc( sizeof( QueueStruc ) ); 1.32 + 1.33 + 1.34 + status = pthread_mutex_init( &retQ->mutex_t, NULL); 1.35 + if (status < 0) 1.36 + { 1.37 + perror("Error in creating mutex:"); 1.38 + exit(1); 1.39 + return NULL; 1.40 + } 1.41 + 1.42 + status = pthread_cond_init ( &retQ->cond_w_t, NULL); 1.43 + if (status < 0) 1.44 + { 1.45 + perror("Error in creating cond_var:"); 1.46 + exit(1); 1.47 + return NULL; 1.48 + } 1.49 + 1.50 + status = pthread_cond_init ( &retQ->cond_r_t, NULL); 1.51 + if (status < 0) 1.52 + { 1.53 + perror("Error in creating cond_var:"); 1.54 + exit(1); 1.55 + return NULL; 1.56 + } 1.57 + 1.58 + retQ->count = 0; 1.59 + retQ->readPos = 0; 1.60 + retQ->writePos = 0; 1.61 + retQ -> w_empty = retQ -> w_full = 0; 1.62 + 1.63 + return retQ; 1.64 + } 1.65 + 1.66 +void * readQ( QueueStruc *Q ) 1.67 + { void *ret; 1.68 + int status, wt; 1.69 + pthread_mutex_lock( &Q->mutex_t ); 1.70 + { 1.71 + while( Q -> count == 0 ) 1.72 + { Q -> w_empty = 1; 1.73 + status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); 1.74 + if (status != 0) 1.75 + { perror("Thread wait error: "); 1.76 + exit(1); 1.77 + } 1.78 + } 1.79 + Q -> w_empty = 0; 1.80 + Q -> count -= 1; 1.81 + ret = Q->data[ Q->readPos ]; 1.82 + INC( Q->readPos ); 1.83 + wt = Q -> w_full; 1.84 + Q -> w_full = 0; 1.85 + } 1.86 + pthread_mutex_unlock( &Q->mutex_t ); 1.87 + if (wt) pthread_cond_signal( &Q->cond_w_t ); 1.88 + 1.89 + return( ret ); 1.90 + } 1.91 + 1.92 +void writeQ( void * in, QueueStruc* Q ) 1.93 + { 1.94 + int status, wt; 1.95 + pthread_mutex_lock( &Q->mutex_t ); 1.96 + { 1.97 + while( Q->count >= 1024 ) 1.98 + { 1.99 + Q -> w_full = 1; 1.100 + // pthread_cond_broadcast( &Q->cond_r_t ); 1.101 + status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); 1.102 + if (status != 0) 1.103 + { perror("Thread wait error: "); 1.104 + exit(1); 1.105 + } 1.106 + } 1.107 + Q -> w_full = 0; 1.108 + Q->count += 1; 1.109 + Q->data[ Q->writePos ] = in; 1.110 + INC( Q->writePos ); 1.111 + wt = Q -> w_empty; 1.112 + Q -> w_empty = 0; 1.113 + // pthread_cond_broadcast( &Q->cond_r_t ); 1.114 + } 1.115 + pthread_mutex_unlock( &Q->mutex_t ); 1.116 + if( wt ) pthread_cond_signal( &Q->cond_r_t ); 1.117 + } 1.118 + 1.119 + 1.120 +//=========================================================================== 1.121 +// multi reader multi writer fast Q via CAS 1.122 +#ifndef _GNU_SOURCE 1.123 +#define _GNU_SOURCE 1.124 + 1.125 +/*This is a blocking queue, but it uses CAS instr plus yield() when empty 1.126 + * or full 1.127 + *It uses CAS because it's meant to have more than one reader and more than 1.128 + * one writer. 1.129 + */ 1.130 + 1.131 +CASQueueStruc* makeCASQ() 1.132 + { 1.133 + CASQueueStruc* retQ; 1.134 + retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) ); 1.135 + 1.136 + retQ->insertLock = UNLOCKED; 1.137 + retQ->extractLock= UNLOCKED; 1.138 + //TODO: check got pointer syntax right 1.139 + retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 1.140 + retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 1.141 + retQ->endOfData = &(retQ->startOfData[1023]); 1.142 + 1.143 + return retQ; 1.144 + } 1.145 + 1.146 + 1.147 +void* readCASQ( CASQueueStruc* Q ) 1.148 + { void *out = 0; 1.149 + int tries = 0; 1.150 + int startOfData = Q->startOfData; 1.151 + int endOfData = Q->endOfData; 1.152 + 1.153 + int success = FALSE; 1.154 + 1.155 + while( !success ) 1.156 + { success = 1.157 + __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); 1.158 + if( success ) 1.159 + { 1.160 + volatile int insertPos = Q->insertPos; 1.161 + volatile int extractPos = Q->extractPos; 1.162 + 1.163 + //if not empty -- extract just below insert when empty 1.164 + if( insertPos - extractPos != 1 && 1.165 + !(extractPos == endOfData && insertPos == startOfData)) 1.166 + { //move before read 1.167 + if( extractPos == endOfData ) //write new pos exactly once, correctly 1.168 + { Q->extractPos = startOfData; //can't overrun then fix it 'cause 1.169 + } // other thread might read bad pos 1.170 + else 1.171 + { Q->extractPos++; 1.172 + } 1.173 + out = *(Q->extractPos); 1.174 + Q->extractLock = UNLOCKED; 1.175 + return out; 1.176 + } 1.177 + else //Q is empty 1.178 + { success = FALSE; 1.179 + Q->extractLock = UNLOCKED;//have to try again, release for others 1.180 + } 1.181 + } 1.182 + //Q is busy or empty 1.183 + tries++; 1.184 + if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock 1.185 + } 1.186 + } 1.187 + 1.188 +void writeCASQ( void * in, CASQueueStruc* Q ) 1.189 + { 1.190 + int tries = 0; 1.191 + int startOfData = Q->startOfData; 1.192 + int endOfData = Q->endOfData; 1.193 + 1.194 + int success = FALSE; 1.195 + 1.196 + while( !success ) 1.197 + { success = 1.198 + __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); 1.199 + if( success ) 1.200 + { 1.201 + volatile int insertPos = Q->insertPos; 1.202 + volatile int extractPos = Q->extractPos; 1.203 + 1.204 + //check if room to insert.. can't use a count variable 1.205 + // 'cause both insertor Thd and extractor Thd would write it 1.206 + if( extractPos - insertPos != 1 && 1.207 + !(insertPos == endOfData && extractPos == startOfData)) 1.208 + { *(insertPos) = in; //insert before move 1.209 + if( insertPos == endOfData ) //write new pos exactly once, correctly 1.210 + { Q->insertPos = startOfData; 1.211 + } 1.212 + else 1.213 + { Q->insertPos++; 1.214 + } 1.215 + Q->insertLock = UNLOCKED; 1.216 + return; 1.217 + } 1.218 + else //Q is full 1.219 + { success = FALSE; 1.220 + Q->insertLock = UNLOCKED;//have to try again, release for others 1.221 + } 1.222 + } 1.223 + tries++; 1.224 + if( tries > 10000 ) pthread_yield();//yield not guaranteed 1.225 + } 1.226 + } 1.227 + 1.228 +#endif //_GNU_SOURCE 1.229 + 1.230 +//=========================================================================== 1.231 +//Single reader single writer super fast Q.. no atomic instrs.. 1.232 + 1.233 + 1.234 +/*This is a blocking queue, but it uses no atomic instructions, just does 1.235 + * busy-waiting when empty or full (but yield() if waits too long) 1.236 + * 1.237 + *It doesn't need any atomic instructions because only a single thread 1.238 + * extracts and only a single thread inserts, and it has no locations that 1.239 + * are written by both. It writes before moving and moves before reading, 1.240 + * and never lets write position and read position be the same, so dis- 1.241 + * synchrony can only ever cause an unnecessary call to yield(), never a 1.242 + * wrong value (by monotonicity of movement of pointers, plus single writer 1.243 + * to pointers, plus sequence of write before change pointer, plus 1.244 + * assumptions that if thread A semantically writes X before Y, then thread 1.245 + * B will see the writes in that order.) 1.246 + */ 1.247 + 1.248 +SRSWQueueStruc* makeSRSWQ() 1.249 + { 1.250 + SRSWQueueStruc* retQ; 1.251 + retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) ); 1.252 + 1.253 + retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 1.254 + retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 1.255 + retQ->endOfData = &(retQ->startOfData[1023]); 1.256 + 1.257 + return retQ; 1.258 + } 1.259 + 1.260 + 1.261 +void* readSRSWQ( SRSWQueueStruc* Q ) 1.262 + { void *out = 0; 1.263 + int tries = 0; 1.264 + int startOfData = Q->startOfData; 1.265 + int endOfData = Q->endOfData; 1.266 + 1.267 + while( TRUE ) 1.268 + { //not certain the volatile reads need to be done, but safe.. 1.269 + volatile int insertPos = Q->insertPos; 1.270 + volatile int extractPos = Q->extractPos; 1.271 + 1.272 + //if not empty -- extract just below insert when empty 1.273 + if( insertPos - extractPos != 1 && 1.274 + !(extractPos == endOfData && insertPos == startOfData)) 1.275 + { //move before read 1.276 + if( extractPos == endOfData ) //write new pos exactly once, correctly 1.277 + { Q->extractPos = startOfData; //can't overrun then fix it 'cause 1.278 + } // other thread might read bad pos 1.279 + else 1.280 + { Q->extractPos++; 1.281 + } 1.282 + out = *(Q->extractPos); 1.283 + return out; 1.284 + } 1.285 + //Q is empty 1.286 + tries++; 1.287 + if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock 1.288 + } 1.289 + } 1.290 + 1.291 +void writeSRSWQ( void * in, SRSWQueueStruc* Q ) 1.292 + { 1.293 + int tries = 0; 1.294 + int startOfData = Q->startOfData; 1.295 + int endOfData = Q->endOfData; 1.296 + 1.297 + while( TRUE ) 1.298 + { //not certain the volatile reads need to be done, but safe.. 1.299 + volatile int insertPos = Q->insertPos; 1.300 + volatile int extractPos = Q->extractPos; 1.301 + 1.302 + if( extractPos - insertPos != 1 && 1.303 + !(insertPos == endOfData && extractPos == startOfData)) 1.304 + { *(insertPos) = in; //insert before move 1.305 + if( insertPos == endOfData ) //write new pos exactly once, correctly 1.306 + { Q->insertPos = startOfData; 1.307 + } 1.308 + else 1.309 + { Q->insertPos++; 1.310 + } 1.311 + return; 1.312 + } 1.313 + //Q is full 1.314 + tries++; 1.315 + if( tries > 10000 ) pthread_yield();//yield not guaranteed 1.316 + } 1.317 + }
2.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 2.2 +++ b/BlockingQueue.h Sat May 22 19:51:09 2010 -0700 2.3 @@ -0,0 +1,72 @@ 2.4 +/* 2.5 + * Copyright 2009 OpenSourceStewardshipFoundation.org 2.6 + * Licensed under GNU General Public License version 2 2.7 + * 2.8 + * Author: seanhalle@yahoo.com 2.9 + */ 2.10 + 2.11 +#ifndef _BLOCKING_QUEUE_H 2.12 +#define _BLOCKING_QUEUE_H 2.13 + 2.14 +#include <pthread.h> 2.15 + 2.16 +#define TRUE 1 2.17 +#define FALSE 0 2.18 + 2.19 +#define LOCKED 1 2.20 +#define UNLOCKED 0 2.21 + 2.22 + 2.23 +/* It is the data that is shared so only need one mutex. */ 2.24 +typedef 2.25 +struct 2.26 + { 2.27 + pthread_mutex_t mutex_t; 2.28 + pthread_cond_t cond_w_t; 2.29 + pthread_cond_t cond_r_t; 2.30 + int count; 2.31 + int readPos; 2.32 + int writePos; 2.33 + void* data[1024]; //an array of pointers 2.34 + int w_empty; 2.35 + int w_full; 2.36 + } 2.37 +QueueStruc; 2.38 + 2.39 + 2.40 +typedef 2.41 +struct 2.42 + { int insertLock; 2.43 + int extractLock; 2.44 + void* *insertPos; 2.45 + void* *extractPos; 2.46 + void* startOfData[1024]; //data is pointers 2.47 + void* *endOfData; //set when make queue 2.48 + } 2.49 +CASQueueStruc; 2.50 + 2.51 + 2.52 +typedef 2.53 +struct 2.54 + { void* *insertPos; 2.55 + void* *extractPos; 2.56 + void* startOfData[1024]; //data is pointers 2.57 + void* *endOfData; //set when make queue 2.58 + } 2.59 +SRSWQueueStruc; 2.60 + 2.61 + 2.62 +QueueStruc* makeQ(); 2.63 +void* readQ( QueueStruc *Q ); 2.64 +void writeQ( void *in, QueueStruc *Q ); 2.65 + 2.66 +CASQueueStruc* makeCASQ(); 2.67 +void* readCASQ( CASQueueStruc *Q ); 2.68 +void writeCASQ( void *in, CASQueueStruc *Q ); 2.69 + 2.70 +SRSWQueueStruc* makeSRSWQ(); 2.71 +void* readSRSWQ( SRSWQueueStruc *Q ); 2.72 +void writeSRSWQ( void *in, SRSWQueueStruc *Q ); 2.73 + 2.74 +#endif /* _BLOCKING_QUEUE_H */ 2.75 +
3.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 3.2 +++ b/PrivateQueue.c Sat May 22 19:51:09 2010 -0700 3.3 @@ -0,0 +1,110 @@ 3.4 +/* 3.5 + * Copyright 2009 OpenSourceCodeStewardshipFoundation.org 3.6 + * Licensed under GNU General Public License version 2 3.7 + * 3.8 + * NOTE: this version of SRSW correct as of April 25, 2010 3.9 + * 3.10 + * Author: seanhalle@yahoo.com 3.11 + */ 3.12 + 3.13 + 3.14 +#include <stdio.h> 3.15 +#include <string.h> 3.16 +#include <errno.h> 3.17 +#include <stdlib.h> 3.18 + 3.19 +#include "PrivateQueue.h" 3.20 + 3.21 + 3.22 + 3.23 +//=========================================================================== 3.24 + 3.25 +/*This kind of queue is private to a single core at a time -- has no 3.26 + * synchronizations 3.27 + */ 3.28 + 3.29 +PrivQueueStruc* makePrivQ() 3.30 + { 3.31 + PrivQueueStruc* retQ; 3.32 + retQ = (PrivQueueStruc *) malloc( sizeof( PrivQueueStruc ) ); 3.33 + 3.34 + retQ->startOfData = malloc( 1024 * sizeof(void *) ); 3.35 + 3.36 + retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 3.37 + retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 3.38 + retQ->endOfData = &(retQ->startOfData[1023]); 3.39 + 3.40 + return retQ; 3.41 + } 3.42 + 3.43 +PrivQueueStruc* make_larger_PrivQ( PrivQueueStruc *Q ) 3.44 + { int oldSize, newSize; 3.45 + void **oldStartOfData; 3.46 + 3.47 + oldSize = Q->endOfData - Q->startOfData; 3.48 + newSize = 2 * oldSize; 3.49 + Q->startOfData = malloc( newSize * sizeof(void *) ); 3.50 + memcpy(Q->startOfData, oldStartOfData, oldSize * sizeof(void *)); 3.51 + free(oldStartOfData); 3.52 + 3.53 + Q->extractPos = &(Q->startOfData[0]); //side by side == empty 3.54 + Q->insertPos = &(Q->startOfData[1]); // so start pos's have to be 3.55 + Q->endOfData = &(Q->startOfData[newSize - 1]); 3.56 + 3.57 + return Q; 3.58 + } 3.59 + 3.60 + 3.61 +/*Returns NULL when queue is empty 3.62 + */ 3.63 +void* readPrivQ( PrivQueueStruc* Q ) 3.64 + { void *out = 0; 3.65 + int startOfData = Q->startOfData; 3.66 + int endOfData = Q->endOfData; 3.67 + 3.68 + volatile int insertPos = Q->insertPos; 3.69 + volatile int extractPos = Q->extractPos; 3.70 + 3.71 + //if not empty -- extract just below insert when empty 3.72 + if( insertPos - extractPos != 1 && 3.73 + !(extractPos == endOfData && insertPos == startOfData)) 3.74 + { //move before read 3.75 + if( extractPos == endOfData ) //write new pos exactly once, correctly 3.76 + { Q->extractPos = startOfData; //can't overrun then fix it 'cause 3.77 + } // other thread might read bad pos 3.78 + else 3.79 + { Q->extractPos++; 3.80 + } 3.81 + out = *(Q->extractPos); 3.82 + return out; 3.83 + } 3.84 + //Q is empty 3.85 + return NULL; 3.86 + } 3.87 + 3.88 +/*Returns false when the queue was full. 3.89 + * have option of calling make_larger_PrivQ to make more room, then try again 3.90 + */ 3.91 +bool8 writePrivQ( void * in, PrivQueueStruc* Q ) 3.92 + { 3.93 + int startOfData = Q->startOfData; 3.94 + int endOfData = Q->endOfData; 3.95 + 3.96 + volatile int insertPos = Q->insertPos; 3.97 + volatile int extractPos = Q->extractPos; 3.98 + 3.99 + if( extractPos - insertPos != 1 && 3.100 + !(insertPos == endOfData && extractPos == startOfData)) 3.101 + { *(insertPos) = in; //insert before move 3.102 + if( insertPos == endOfData ) //write new pos exactly once, correctly 3.103 + { Q->insertPos = startOfData; 3.104 + } 3.105 + else 3.106 + { Q->insertPos++; 3.107 + } 3.108 + return TRUE; 3.109 + } 3.110 + //Q is full 3.111 + return FALSE; 3.112 + } 3.113 +
4.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 4.2 +++ b/PrivateQueue.h Sat May 22 19:51:09 2010 -0700 4.3 @@ -0,0 +1,35 @@ 4.4 +/* 4.5 + * Copyright 2009 OpenSourceStewardshipFoundation.org 4.6 + * Licensed under GNU General Public License version 2 4.7 + * 4.8 + * Author: seanhalle@yahoo.com 4.9 + */ 4.10 + 4.11 +#ifndef _PRIVATE_QUEUE_H 4.12 +#define _PRIVATE_QUEUE_H 4.13 + 4.14 +#include <pthread.h> 4.15 + 4.16 +#define TRUE 1 4.17 +#define FALSE 0 4.18 + 4.19 +#define LOCKED 1 4.20 +#define UNLOCKED 0 4.21 + 4.22 + 4.23 +/* It is the data that is shared so only need one mutex. */ 4.24 +typedef struct 4.25 + { void **insertPos; 4.26 + void **extractPos; 4.27 + void **startOfData; //data is pointers 4.28 + void **endOfData; //set when alloc data 4.29 + } 4.30 +PrivQueueStruc; 4.31 + 4.32 + 4.33 +PrivQueueStruc* makePrivQ ( ); 4.34 +void* readPrivQ ( PrivQueueStruc *Q ); 4.35 +void writePrivQ( void *in, PrivQueueStruc *Q ); 4.36 + 4.37 +#endif /* _PRIVATE_QUEUE_H */ 4.38 +
