Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
changeset 19:1ed562d601d9
Newly created project repository -- commit sub-states
| author | Me@portablequad |
|---|---|
| date | Tue, 07 Feb 2012 12:51:29 -0800 |
| parents | 53c614b781ce |
| children | b5ae7fbb1f01 27b341a31a21 |
| files | BlockingQueue.c PrivateQueue.c PrivateQueue.h |
| diffstat | 3 files changed, 648 insertions(+), 648 deletions(-) [+] |
line diff
1.1 --- a/BlockingQueue.c Thu Nov 04 17:50:29 2010 -0700 1.2 +++ b/BlockingQueue.c Tue Feb 07 12:51:29 2012 -0800 1.3 @@ -1,470 +1,470 @@ 1.4 -/* 1.5 - * Copyright 2009 OpenSourceStewardshipFoundation.org 1.6 - * Licensed under GNU General Public License version 2 1.7 - * 1.8 - * Author: seanhalle@yahoo.com 1.9 - */ 1.10 - 1.11 - 1.12 -#include <stdio.h> 1.13 -#include <errno.h> 1.14 -#include <pthread.h> 1.15 -#include <stdlib.h> 1.16 -#include <sched.h> 1.17 - 1.18 -#include "BlockingQueue.h" 1.19 - 1.20 -#define INC(x) (++x == 1024) ? (x) = 0 : (x) 1.21 - 1.22 -#define SPINLOCK_TRIES 100000 1.23 - 1.24 -//=========================================================================== 1.25 -//Normal pthread Q 1.26 - 1.27 -PThdQueueStruc* makePThdQ() 1.28 - { 1.29 - PThdQueueStruc* retQ; 1.30 - int retCode; 1.31 - retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) ); 1.32 - 1.33 - 1.34 - retCode = 1.35 - pthread_mutex_init( &retQ->mutex_t, NULL); 1.36 - if(retCode){perror("Error in creating mutex:"); exit(1);} 1.37 - 1.38 - retCode = pthread_cond_init ( &retQ->cond_w_t, NULL); 1.39 - if(retCode){perror("Error in creating cond_var:"); exit(1);} 1.40 - 1.41 - retCode = pthread_cond_init ( &retQ->cond_r_t, NULL); 1.42 - if(retCode){perror("Error in creating cond_var:"); exit(1);} 1.43 - 1.44 - retQ->count = 0; 1.45 - retQ->readPos = 0; 1.46 - retQ->writePos = 0; 1.47 - retQ->w_empty = 0; 1.48 - retQ->w_full = 0; 1.49 - 1.50 - return retQ; 1.51 - } 1.52 - 1.53 -void * readPThdQ( PThdQueueStruc *Q ) 1.54 - { void *ret; 1.55 - int retCode, wt; 1.56 - pthread_mutex_lock( &Q->mutex_t ); 1.57 - { 1.58 - while( Q -> count == 0 ) 1.59 - { Q -> w_empty = 1; 1.60 - retCode = 1.61 - pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); 1.62 - if( retCode ){ perror("Thread wait error: "); exit(1); } 1.63 - } 1.64 - Q -> w_empty = 0; 1.65 - Q -> count -= 1; 1.66 - ret = Q->data[ Q->readPos ]; 1.67 - INC( Q->readPos ); 1.68 - wt = Q -> w_full; 1.69 - Q -> w_full = 0; 1.70 - } 1.71 - pthread_mutex_unlock( &Q->mutex_t ); 1.72 - if (wt) 1.73 - pthread_cond_signal( &Q->cond_w_t ); 1.74 - 1.75 - //printf("Q out: %d\n", ret); 1.76 - return( ret ); 1.77 - } 1.78 - 1.79 -void writePThdQ( void * in, PThdQueueStruc* Q ) 1.80 - { 1.81 - int status, wt; 1.82 - //printf("Q in: %d\n", in); 1.83 - 1.84 - pthread_mutex_lock( &Q->mutex_t ); 1.85 - { 1.86 - while( Q->count >= 1024 ) 1.87 - { 1.88 - Q -> w_full = 1; 1.89 - status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); 1.90 - if (status != 0) 1.91 - { perror("Thread wait error: "); 1.92 - exit(1); 1.93 - } 1.94 - } 1.95 - 1.96 - Q -> w_full = 0; 1.97 - Q->count += 1; 1.98 - Q->data[ Q->writePos ] = in; 1.99 - INC( Q->writePos ); 1.100 - wt = Q -> w_empty; 1.101 - Q -> w_empty = 0; 1.102 - } 1.103 - 1.104 - pthread_mutex_unlock( &Q->mutex_t ); 1.105 - if( wt ) pthread_cond_signal( &Q->cond_r_t ); 1.106 - } 1.107 - 1.108 - 1.109 -//=========================================================================== 1.110 -// multi reader multi writer fast Q via CAS 1.111 -#ifndef _GNU_SOURCE 1.112 -#define _GNU_SOURCE 1.113 - 1.114 -/*This is a blocking queue, but it uses CAS instr plus yield() when empty 1.115 - * or full 1.116 - *It uses CAS because it's meant to have more than one reader and more than 1.117 - * one writer. 1.118 - */ 1.119 - 1.120 -CASQueueStruc* makeCASQ() 1.121 - { 1.122 - CASQueueStruc* retQ; 1.123 - retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) ); 1.124 - 1.125 - retQ->insertLock = UNLOCKED; 1.126 - retQ->extractLock= UNLOCKED; 1.127 - //TODO: check got pointer syntax right 1.128 - retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 1.129 - retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 1.130 - retQ->endOfData = &(retQ->startOfData[1023]); 1.131 - 1.132 - return retQ; 1.133 - } 1.134 - 1.135 - 1.136 -void* readCASQ( CASQueueStruc* Q ) 1.137 - { void *out = 0; 1.138 - int tries = 0; 1.139 - void **startOfData = Q->startOfData; 1.140 - void **endOfData = Q->endOfData; 1.141 - 1.142 - int gotLock = FALSE; 1.143 - 1.144 - while( TRUE ) 1.145 - { //this intrinsic returns true if the lock held "UNLOCKED", in which 1.146 - // case it now holds "LOCKED" -- if it already held "LOCKED", then 1.147 - // gotLock is FALSE 1.148 - gotLock = 1.149 - __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); 1.150 - //NOTE: checked assy, and it does lock correctly.. 1.151 - if( gotLock ) 1.152 - { 1.153 - void **insertPos = Q->insertPos; 1.154 - void **extractPos = Q->extractPos; 1.155 - 1.156 - //if not empty -- extract just below insert when empty 1.157 - if( insertPos - extractPos != 1 && 1.158 - !(extractPos == endOfData && insertPos == startOfData)) 1.159 - { //move before read 1.160 - if( extractPos == endOfData ) //write new pos exactly once, correctly 1.161 - { Q->extractPos = startOfData; //can't overrun then fix it 'cause 1.162 - } // other thread might read bad pos 1.163 - else 1.164 - { Q->extractPos++; 1.165 - } 1.166 - out = *(Q->extractPos); 1.167 - Q->extractLock = UNLOCKED; 1.168 - return out; 1.169 - } 1.170 - else //Q is empty 1.171 - { Q->extractLock = UNLOCKED;//empty, so release lock for others 1.172 - } 1.173 - } 1.174 - //Q is busy or empty 1.175 - tries++; 1.176 - if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable 1.177 - } 1.178 - } 1.179 - 1.180 -void writeCASQ( void * in, CASQueueStruc* Q ) 1.181 - { 1.182 - int tries = 0; 1.183 - //TODO: need to make Q volatile? Want to do this Q in assembly! 1.184 - //Have no idea what GCC's going to do to this code 1.185 - void **startOfData = Q->startOfData; 1.186 - void **endOfData = Q->endOfData; 1.187 - 1.188 - int gotLock = FALSE; 1.189 - 1.190 - while( TRUE ) 1.191 - { //this intrinsic returns true if the lock held "UNLOCKED", in which 1.192 - // case it now holds "LOCKED" -- if it already held "LOCKED", then 1.193 - // gotLock is FALSE 1.194 - gotLock = 1.195 - __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); 1.196 - if( gotLock ) 1.197 - { 1.198 - void **insertPos = Q->insertPos; 1.199 - void **extractPos = Q->extractPos; 1.200 - 1.201 - //check if room to insert.. can't use a count variable 1.202 - // 'cause both insertor Thd and extractor Thd would write it 1.203 - if( extractPos - insertPos != 1 && 1.204 - !(insertPos == endOfData && extractPos == startOfData)) 1.205 - { *(Q->insertPos) = in; //insert before move 1.206 - if( insertPos == endOfData ) 1.207 - { Q->insertPos = startOfData; 1.208 - } 1.209 - else 1.210 - { Q->insertPos++; 1.211 - } 1.212 - Q->insertLock = UNLOCKED; 1.213 - return; 1.214 - } 1.215 - else //Q is full 1.216 - { Q->insertLock = UNLOCKED;//full, so release lock for others 1.217 - } 1.218 - } 1.219 - tries++; 1.220 - if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable 1.221 - } 1.222 - } 1.223 - 1.224 -#endif //_GNU_SOURCE 1.225 - 1.226 - 1.227 -//=========================================================================== 1.228 -//Single reader single writer super fast Q.. no atomic instrs.. 1.229 - 1.230 - 1.231 -/*This is a blocking queue, but it uses no atomic instructions, just does 1.232 - * yield() when empty or full 1.233 - * 1.234 - *It doesn't need any atomic instructions because only a single thread 1.235 - * extracts and only a single thread inserts, and it has no locations that 1.236 - * are written by both. It writes before moving and moves before reading, 1.237 - * and never lets write position and read position be the same, so dis- 1.238 - * synchrony can only ever cause an unnecessary call to yield(), never a 1.239 - * wrong value (by monotonicity of movement of pointers, plus single writer 1.240 - * to pointers, plus sequence of write before change pointer, plus 1.241 - * assumptions that if thread A semantically writes X before Y, then thread 1.242 - * B will see the writes in that order.) 1.243 - */ 1.244 - 1.245 -SRSWQueueStruc* makeSRSWQ() 1.246 - { 1.247 - SRSWQueueStruc* retQ; 1.248 - retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) ); 1.249 - memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); 1.250 - 1.251 - retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 1.252 - retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 1.253 - retQ->endOfData = &(retQ->startOfData[1023]); 1.254 - 1.255 - return retQ; 1.256 - } 1.257 - 1.258 -void 1.259 -freeSRSWQ( SRSWQueueStruc* Q ) 1.260 - { 1.261 - free( Q ); 1.262 - } 1.263 - 1.264 -void* readSRSWQ( SRSWQueueStruc* Q ) 1.265 - { void *out = 0; 1.266 - int tries = 0; 1.267 - 1.268 - while( TRUE ) 1.269 - { 1.270 - if( Q->insertPos - Q->extractPos != 1 && 1.271 - !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) 1.272 - { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData; 1.273 - else Q->extractPos++; //move before read 1.274 - out = *(Q->extractPos); 1.275 - return out; 1.276 - } 1.277 - //Q is empty 1.278 - tries++; 1.279 - if( tries > SPINLOCK_TRIES ) pthread_yield(); 1.280 - } 1.281 - } 1.282 - 1.283 - 1.284 -void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) 1.285 - { void *out = 0; 1.286 - int tries = 0; 1.287 - 1.288 - while( TRUE ) 1.289 - { 1.290 - if( Q->insertPos - Q->extractPos != 1 && 1.291 - !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) 1.292 - { Q->extractPos++; //move before read 1.293 - if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData; 1.294 - out = *(Q->extractPos); 1.295 - return out; 1.296 - } 1.297 - //Q is empty 1.298 - tries++; 1.299 - if( tries > 10 ) return NULL; //long enough for writer to finish 1.300 - } 1.301 - } 1.302 - 1.303 - 1.304 -void writeSRSWQ( void * in, SRSWQueueStruc* Q ) 1.305 - { 1.306 - int tries = 0; 1.307 - 1.308 - while( TRUE ) 1.309 - { 1.310 - if( Q->extractPos - Q->insertPos != 1 && 1.311 - !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData)) 1.312 - { *(Q->insertPos) = in; //insert before move 1.313 - if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData; 1.314 - else Q->insertPos++; 1.315 - return; 1.316 - } 1.317 - //Q is full 1.318 - tries++; 1.319 - if( tries > SPINLOCK_TRIES ) pthread_yield(); 1.320 - } 1.321 - } 1.322 - 1.323 - 1.324 - 1.325 -//=========================================================================== 1.326 -//Single reader Multiple writer super fast Q.. no atomic instrs.. 1.327 - 1.328 - 1.329 -/*This is a blocking queue, but it uses no atomic instructions, just does 1.330 - * yield() when empty or full 1.331 - * 1.332 - *It doesn't need any atomic instructions because only a single thread 1.333 - * extracts and only a single thread inserts, and it has no locations that 1.334 - * are written by both. It writes before moving and moves before reading, 1.335 - * and never lets write position and read position be the same, so dis- 1.336 - * synchrony can only ever cause an unnecessary call to yield(), never a 1.337 - * wrong value (by monotonicity of movement of pointers, plus single writer 1.338 - * to pointers, plus sequence of write before change pointer, plus 1.339 - * assumptions that if thread A semantically writes X before Y, then thread 1.340 - * B will see the writes in that order.) 1.341 - * 1.342 - *The multi-writer version is implemented as a hierarchy. Each writer has 1.343 - * its own single-reader single-writer queue. The reader simply does a 1.344 - * round-robin harvesting from them. 1.345 - * 1.346 - *A writer must first register itself with the queue, and receives an ID back 1.347 - * It then uses that ID on each write operation. 1.348 - * 1.349 - *The implementation is: 1.350 - *Physically: 1.351 - * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s 1.352 - * -] it also has read-pointer to the last queue a write was taken from. 1.353 - * 1.354 - *Action-Patterns: 1.355 - * -] To add a writer 1.356 - * --]] writer-thread calls addWriterToQ(), remember the ID it returns 1.357 - * --]] internally addWriterToQ does: 1.358 - * ---]]] if needs more room, makes a larger writer-array 1.359 - * ---]]] copies the old writer-array into the new 1.360 - * ---]]] makes a new SRSW queue an puts it into the array 1.361 - * ---]]] returns the index to the new SRSW queue as the ID 1.362 - * -] To write 1.363 - * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID 1.364 - * --]] this call may block, via repeated yield() calls 1.365 - * --]] internally, writeSRMWQ does: 1.366 - * ---]]] uses the writerID as index to get the SRSW queue for that writer 1.367 - * ---]]] performs writeQ on that queue (may block via repeated yield calls) 1.368 - * -] To Read 1.369 - * --]] reader calls readSRMWQ, passing the Q struc 1.370 - * --]] this call may block, via repeated yield() calls 1.371 - * --]] internally, readSRMWQ does: 1.372 - * ---]]] gets saved index of last SRSW queue read from 1.373 - * ---]]] increments index and gets indexed queue 1.374 - * ---]]] does a non-blocking read of that queue 1.375 - * ---]]] if gets something, saves index and returns that value 1.376 - * ---]]] if gets null, then goes to next queue 1.377 - * ---]]] if got null from all the queues then does yield() then tries again 1.378 - * 1.379 - *Note: "0" is used as the value null, so SRSW queues must only contain 1.380 - * pointers, and cannot use 0 as a valid pointer value. 1.381 - * 1.382 - */ 1.383 - 1.384 -SRMWQueueStruc* makeSRMWQ() 1.385 - { SRMWQueueStruc* retQ; 1.386 - 1.387 - retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) ); 1.388 - 1.389 - retQ->numInternalQs = 0; 1.390 - retQ->internalQsSz = 10; 1.391 - retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); 1.392 - 1.393 - retQ->lastQReadFrom = 0; 1.394 - 1.395 - return retQ; 1.396 - } 1.397 - 1.398 -/* ---]]] if needs more room, makes a larger writer-array 1.399 - * ---]]] copies the old writer-array into the new 1.400 - * ---]]] makes a new SRSW queue an puts it into the array 1.401 - * ---]]] returns the index to the new SRSW queue as the ID 1.402 - * 1.403 - *NOTE: assuming all adds are completed before any writes or reads are 1.404 - * performed.. otherwise, this needs to be re-done carefully, probably with 1.405 - * a lock. 1.406 - */ 1.407 -int addWriterToSRMWQ( SRMWQueueStruc* Q ) 1.408 - { int oldSz, i; 1.409 - SRSWQueueStruc * *oldArray; 1.410 - 1.411 - (Q->numInternalQs)++; 1.412 - if( Q->numInternalQs >= Q->internalQsSz ) 1.413 - { //full, so make bigger 1.414 - oldSz = Q->internalQsSz; 1.415 - oldArray = Q->internalQs; 1.416 - Q->internalQsSz *= 2; 1.417 - Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); 1.418 - for( i = 0; i < oldSz; i++ ) 1.419 - { Q->internalQs[i] = oldArray[i]; 1.420 - } 1.421 - free( oldArray ); 1.422 - } 1.423 - Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); 1.424 - return Q->numInternalQs - 1; 1.425 - } 1.426 - 1.427 - 1.428 -/* ---]]] gets saved index of last SRSW queue read-from 1.429 - * ---]]] increments index and gets indexed queue 1.430 - * ---]]] does a non-blocking read of that queue 1.431 - * ---]]] if gets something, saves index and returns that value 1.432 - * ---]]] if gets null, then goes to next queue 1.433 - * ---]]] if got null from all the queues then does yield() then tries again 1.434 - */ 1.435 -void* readSRMWQ( SRMWQueueStruc* Q ) 1.436 - { SRSWQueueStruc *readQ; 1.437 - void *readValue = 0; 1.438 - int tries = 0; 1.439 - int QToReadFrom = 0; 1.440 - 1.441 - QToReadFrom = Q->lastQReadFrom; 1.442 - 1.443 - while( TRUE ) 1.444 - { QToReadFrom++; 1.445 - if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0; 1.446 - readQ = Q->internalQs[ QToReadFrom ]; 1.447 - readValue = readSRSWQ_NonBlocking( readQ ); 1.448 - 1.449 - if( readValue != 0 ) //got a value, return it 1.450 - { Q->lastQReadFrom = QToReadFrom; 1.451 - return readValue; 1.452 - } 1.453 - else //SRSW Q just read is empty 1.454 - { //check if all queues have been tried 1.455 - if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty 1.456 - { tries++; //give a writer a chance to finish before yield 1.457 - if( tries > SPINLOCK_TRIES ) pthread_yield(); 1.458 - } 1.459 - } 1.460 - } 1.461 - } 1.462 - 1.463 - 1.464 -/* 1.465 - * ---]]] uses the writerID as index to get the SRSW queue for that writer 1.466 - * ---]]] performs writeQ on that queue (may block via repeated yield calls) 1.467 - */ 1.468 -void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID ) 1.469 - { 1.470 - if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error 1.471 - 1.472 - writeSRSWQ( in, Q->internalQs[ writerID ] ); 1.473 - } 1.474 +/* 1.475 + * Copyright 2009 OpenSourceStewardshipFoundation.org 1.476 + * Licensed under GNU General Public License version 2 1.477 + * 1.478 + * Author: seanhalle@yahoo.com 1.479 + */ 1.480 + 1.481 + 1.482 +#include <stdio.h> 1.483 +#include <errno.h> 1.484 +#include <pthread.h> 1.485 +#include <stdlib.h> 1.486 +#include <sched.h> 1.487 + 1.488 +#include "BlockingQueue.h" 1.489 + 1.490 +#define INC(x) (++x == 1024) ? (x) = 0 : (x) 1.491 + 1.492 +#define SPINLOCK_TRIES 100000 1.493 + 1.494 +//=========================================================================== 1.495 +//Normal pthread Q 1.496 + 1.497 +PThdQueueStruc* makePThdQ() 1.498 + { 1.499 + PThdQueueStruc* retQ; 1.500 + int retCode; 1.501 + retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) ); 1.502 + 1.503 + 1.504 + retCode = 1.505 + pthread_mutex_init( &retQ->mutex_t, NULL); 1.506 + if(retCode){perror("Error in creating mutex:"); exit(1);} 1.507 + 1.508 + retCode = pthread_cond_init ( &retQ->cond_w_t, NULL); 1.509 + if(retCode){perror("Error in creating cond_var:"); exit(1);} 1.510 + 1.511 + retCode = pthread_cond_init ( &retQ->cond_r_t, NULL); 1.512 + if(retCode){perror("Error in creating cond_var:"); exit(1);} 1.513 + 1.514 + retQ->count = 0; 1.515 + retQ->readPos = 0; 1.516 + retQ->writePos = 0; 1.517 + retQ->w_empty = 0; 1.518 + retQ->w_full = 0; 1.519 + 1.520 + return retQ; 1.521 + } 1.522 + 1.523 +void * readPThdQ( PThdQueueStruc *Q ) 1.524 + { void *ret; 1.525 + int retCode, wt; 1.526 + pthread_mutex_lock( &Q->mutex_t ); 1.527 + { 1.528 + while( Q -> count == 0 ) 1.529 + { Q -> w_empty = 1; 1.530 + retCode = 1.531 + pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); 1.532 + if( retCode ){ perror("Thread wait error: "); exit(1); } 1.533 + } 1.534 + Q -> w_empty = 0; 1.535 + Q -> count -= 1; 1.536 + ret = Q->data[ Q->readPos ]; 1.537 + INC( Q->readPos ); 1.538 + wt = Q -> w_full; 1.539 + Q -> w_full = 0; 1.540 + } 1.541 + pthread_mutex_unlock( &Q->mutex_t ); 1.542 + if (wt) 1.543 + pthread_cond_signal( &Q->cond_w_t ); 1.544 + 1.545 + //printf("Q out: %d\n", ret); 1.546 + return( ret ); 1.547 + } 1.548 + 1.549 +void writePThdQ( void * in, PThdQueueStruc* Q ) 1.550 + { 1.551 + int status, wt; 1.552 + //printf("Q in: %d\n", in); 1.553 + 1.554 + pthread_mutex_lock( &Q->mutex_t ); 1.555 + { 1.556 + while( Q->count >= 1024 ) 1.557 + { 1.558 + Q -> w_full = 1; 1.559 + status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); 1.560 + if (status != 0) 1.561 + { perror("Thread wait error: "); 1.562 + exit(1); 1.563 + } 1.564 + } 1.565 + 1.566 + Q -> w_full = 0; 1.567 + Q->count += 1; 1.568 + Q->data[ Q->writePos ] = in; 1.569 + INC( Q->writePos ); 1.570 + wt = Q -> w_empty; 1.571 + Q -> w_empty = 0; 1.572 + } 1.573 + 1.574 + pthread_mutex_unlock( &Q->mutex_t ); 1.575 + if( wt ) pthread_cond_signal( &Q->cond_r_t ); 1.576 + } 1.577 + 1.578 + 1.579 +//=========================================================================== 1.580 +// multi reader multi writer fast Q via CAS 1.581 +#ifndef _GNU_SOURCE 1.582 +#define _GNU_SOURCE 1.583 + 1.584 +/*This is a blocking queue, but it uses CAS instr plus yield() when empty 1.585 + * or full 1.586 + *It uses CAS because it's meant to have more than one reader and more than 1.587 + * one writer. 1.588 + */ 1.589 + 1.590 +CASQueueStruc* makeCASQ() 1.591 + { 1.592 + CASQueueStruc* retQ; 1.593 + retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) ); 1.594 + 1.595 + retQ->insertLock = UNLOCKED; 1.596 + retQ->extractLock= UNLOCKED; 1.597 + //TODO: check got pointer syntax right 1.598 + retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 1.599 + retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 1.600 + retQ->endOfData = &(retQ->startOfData[1023]); 1.601 + 1.602 + return retQ; 1.603 + } 1.604 + 1.605 + 1.606 +void* readCASQ( CASQueueStruc* Q ) 1.607 + { void *out = 0; 1.608 + int tries = 0; 1.609 + void **startOfData = Q->startOfData; 1.610 + void **endOfData = Q->endOfData; 1.611 + 1.612 + int gotLock = FALSE; 1.613 + 1.614 + while( TRUE ) 1.615 + { //this intrinsic returns true if the lock held "UNLOCKED", in which 1.616 + // case it now holds "LOCKED" -- if it already held "LOCKED", then 1.617 + // gotLock is FALSE 1.618 + gotLock = 1.619 + __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); 1.620 + //NOTE: checked assy, and it does lock correctly.. 1.621 + if( gotLock ) 1.622 + { 1.623 + void **insertPos = Q->insertPos; 1.624 + void **extractPos = Q->extractPos; 1.625 + 1.626 + //if not empty -- extract just below insert when empty 1.627 + if( insertPos - extractPos != 1 && 1.628 + !(extractPos == endOfData && insertPos == startOfData)) 1.629 + { //move before read 1.630 + if( extractPos == endOfData ) //write new pos exactly once, correctly 1.631 + { Q->extractPos = startOfData; //can't overrun then fix it 'cause 1.632 + } // other thread might read bad pos 1.633 + else 1.634 + { Q->extractPos++; 1.635 + } 1.636 + out = *(Q->extractPos); 1.637 + Q->extractLock = UNLOCKED; 1.638 + return out; 1.639 + } 1.640 + else //Q is empty 1.641 + { Q->extractLock = UNLOCKED;//empty, so release lock for others 1.642 + } 1.643 + } 1.644 + //Q is busy or empty 1.645 + tries++; 1.646 + if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable 1.647 + } 1.648 + } 1.649 + 1.650 +void writeCASQ( void * in, CASQueueStruc* Q ) 1.651 + { 1.652 + int tries = 0; 1.653 + //TODO: need to make Q volatile? Want to do this Q in assembly! 1.654 + //Have no idea what GCC's going to do to this code 1.655 + void **startOfData = Q->startOfData; 1.656 + void **endOfData = Q->endOfData; 1.657 + 1.658 + int gotLock = FALSE; 1.659 + 1.660 + while( TRUE ) 1.661 + { //this intrinsic returns true if the lock held "UNLOCKED", in which 1.662 + // case it now holds "LOCKED" -- if it already held "LOCKED", then 1.663 + // gotLock is FALSE 1.664 + gotLock = 1.665 + __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); 1.666 + if( gotLock ) 1.667 + { 1.668 + void **insertPos = Q->insertPos; 1.669 + void **extractPos = Q->extractPos; 1.670 + 1.671 + //check if room to insert.. can't use a count variable 1.672 + // 'cause both insertor Thd and extractor Thd would write it 1.673 + if( extractPos - insertPos != 1 && 1.674 + !(insertPos == endOfData && extractPos == startOfData)) 1.675 + { *(Q->insertPos) = in; //insert before move 1.676 + if( insertPos == endOfData ) 1.677 + { Q->insertPos = startOfData; 1.678 + } 1.679 + else 1.680 + { Q->insertPos++; 1.681 + } 1.682 + Q->insertLock = UNLOCKED; 1.683 + return; 1.684 + } 1.685 + else //Q is full 1.686 + { Q->insertLock = UNLOCKED;//full, so release lock for others 1.687 + } 1.688 + } 1.689 + tries++; 1.690 + if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable 1.691 + } 1.692 + } 1.693 + 1.694 +#endif //_GNU_SOURCE 1.695 + 1.696 + 1.697 +//=========================================================================== 1.698 +//Single reader single writer super fast Q.. no atomic instrs.. 1.699 + 1.700 + 1.701 +/*This is a blocking queue, but it uses no atomic instructions, just does 1.702 + * yield() when empty or full 1.703 + * 1.704 + *It doesn't need any atomic instructions because only a single thread 1.705 + * extracts and only a single thread inserts, and it has no locations that 1.706 + * are written by both. It writes before moving and moves before reading, 1.707 + * and never lets write position and read position be the same, so dis- 1.708 + * synchrony can only ever cause an unnecessary call to yield(), never a 1.709 + * wrong value (by monotonicity of movement of pointers, plus single writer 1.710 + * to pointers, plus sequence of write before change pointer, plus 1.711 + * assumptions that if thread A semantically writes X before Y, then thread 1.712 + * B will see the writes in that order.) 1.713 + */ 1.714 + 1.715 +SRSWQueueStruc* makeSRSWQ() 1.716 + { 1.717 + SRSWQueueStruc* retQ; 1.718 + retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) ); 1.719 + memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); 1.720 + 1.721 + retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 1.722 + retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 1.723 + retQ->endOfData = &(retQ->startOfData[1023]); 1.724 + 1.725 + return retQ; 1.726 + } 1.727 + 1.728 +void 1.729 +freeSRSWQ( SRSWQueueStruc* Q ) 1.730 + { 1.731 + free( Q ); 1.732 + } 1.733 + 1.734 +void* readSRSWQ( SRSWQueueStruc* Q ) 1.735 + { void *out = 0; 1.736 + int tries = 0; 1.737 + 1.738 + while( TRUE ) 1.739 + { 1.740 + if( Q->insertPos - Q->extractPos != 1 && 1.741 + !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) 1.742 + { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData; 1.743 + else Q->extractPos++; //move before read 1.744 + out = *(Q->extractPos); 1.745 + return out; 1.746 + } 1.747 + //Q is empty 1.748 + tries++; 1.749 + if( tries > SPINLOCK_TRIES ) pthread_yield(); 1.750 + } 1.751 + } 1.752 + 1.753 + 1.754 +void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) 1.755 + { void *out = 0; 1.756 + int tries = 0; 1.757 + 1.758 + while( TRUE ) 1.759 + { 1.760 + if( Q->insertPos - Q->extractPos != 1 && 1.761 + !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) 1.762 + { Q->extractPos++; //move before read 1.763 + if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData; 1.764 + out = *(Q->extractPos); 1.765 + return out; 1.766 + } 1.767 + //Q is empty 1.768 + tries++; 1.769 + if( tries > 10 ) return NULL; //long enough for writer to finish 1.770 + } 1.771 + } 1.772 + 1.773 + 1.774 +void writeSRSWQ( void * in, SRSWQueueStruc* Q ) 1.775 + { 1.776 + int tries = 0; 1.777 + 1.778 + while( TRUE ) 1.779 + { 1.780 + if( Q->extractPos - Q->insertPos != 1 && 1.781 + !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData)) 1.782 + { *(Q->insertPos) = in; //insert before move 1.783 + if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData; 1.784 + else Q->insertPos++; 1.785 + return; 1.786 + } 1.787 + //Q is full 1.788 + tries++; 1.789 + if( tries > SPINLOCK_TRIES ) pthread_yield(); 1.790 + } 1.791 + } 1.792 + 1.793 + 1.794 + 1.795 +//=========================================================================== 1.796 +//Single reader Multiple writer super fast Q.. no atomic instrs.. 1.797 + 1.798 + 1.799 +/*This is a blocking queue, but it uses no atomic instructions, just does 1.800 + * yield() when empty or full 1.801 + * 1.802 + *It doesn't need any atomic instructions because only a single thread 1.803 + * extracts and only a single thread inserts, and it has no locations that 1.804 + * are written by both. It writes before moving and moves before reading, 1.805 + * and never lets write position and read position be the same, so dis- 1.806 + * synchrony can only ever cause an unnecessary call to yield(), never a 1.807 + * wrong value (by monotonicity of movement of pointers, plus single writer 1.808 + * to pointers, plus sequence of write before change pointer, plus 1.809 + * assumptions that if thread A semantically writes X before Y, then thread 1.810 + * B will see the writes in that order.) 1.811 + * 1.812 + *The multi-writer version is implemented as a hierarchy. Each writer has 1.813 + * its own single-reader single-writer queue. The reader simply does a 1.814 + * round-robin harvesting from them. 1.815 + * 1.816 + *A writer must first register itself with the queue, and receives an ID back 1.817 + * It then uses that ID on each write operation. 1.818 + * 1.819 + *The implementation is: 1.820 + *Physically: 1.821 + * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s 1.822 + * -] it also has read-pointer to the last queue a write was taken from. 1.823 + * 1.824 + *Action-Patterns: 1.825 + * -] To add a writer 1.826 + * --]] writer-thread calls addWriterToQ(), remember the ID it returns 1.827 + * --]] internally addWriterToQ does: 1.828 + * ---]]] if needs more room, makes a larger writer-array 1.829 + * ---]]] copies the old writer-array into the new 1.830 + * ---]]] makes a new SRSW queue an puts it into the array 1.831 + * ---]]] returns the index to the new SRSW queue as the ID 1.832 + * -] To write 1.833 + * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID 1.834 + * --]] this call may block, via repeated yield() calls 1.835 + * --]] internally, writeSRMWQ does: 1.836 + * ---]]] uses the writerID as index to get the SRSW queue for that writer 1.837 + * ---]]] performs writeQ on that queue (may block via repeated yield calls) 1.838 + * -] To Read 1.839 + * --]] reader calls readSRMWQ, passing the Q struc 1.840 + * --]] this call may block, via repeated yield() calls 1.841 + * --]] internally, readSRMWQ does: 1.842 + * ---]]] gets saved index of last SRSW queue read from 1.843 + * ---]]] increments index and gets indexed queue 1.844 + * ---]]] does a non-blocking read of that queue 1.845 + * ---]]] if gets something, saves index and returns that value 1.846 + * ---]]] if gets null, then goes to next queue 1.847 + * ---]]] if got null from all the queues then does yield() then tries again 1.848 + * 1.849 + *Note: "0" is used as the value null, so SRSW queues must only contain 1.850 + * pointers, and cannot use 0 as a valid pointer value. 1.851 + * 1.852 + */ 1.853 + 1.854 +SRMWQueueStruc* makeSRMWQ() 1.855 + { SRMWQueueStruc* retQ; 1.856 + 1.857 + retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) ); 1.858 + 1.859 + retQ->numInternalQs = 0; 1.860 + retQ->internalQsSz = 10; 1.861 + retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); 1.862 + 1.863 + retQ->lastQReadFrom = 0; 1.864 + 1.865 + return retQ; 1.866 + } 1.867 + 1.868 +/* ---]]] if needs more room, makes a larger writer-array 1.869 + * ---]]] copies the old writer-array into the new 1.870 + * ---]]] makes a new SRSW queue an puts it into the array 1.871 + * ---]]] returns the index to the new SRSW queue as the ID 1.872 + * 1.873 + *NOTE: assuming all adds are completed before any writes or reads are 1.874 + * performed.. otherwise, this needs to be re-done carefully, probably with 1.875 + * a lock. 1.876 + */ 1.877 +int addWriterToSRMWQ( SRMWQueueStruc* Q ) 1.878 + { int oldSz, i; 1.879 + SRSWQueueStruc * *oldArray; 1.880 + 1.881 + (Q->numInternalQs)++; 1.882 + if( Q->numInternalQs >= Q->internalQsSz ) 1.883 + { //full, so make bigger 1.884 + oldSz = Q->internalQsSz; 1.885 + oldArray = Q->internalQs; 1.886 + Q->internalQsSz *= 2; 1.887 + Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); 1.888 + for( i = 0; i < oldSz; i++ ) 1.889 + { Q->internalQs[i] = oldArray[i]; 1.890 + } 1.891 + free( oldArray ); 1.892 + } 1.893 + Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); 1.894 + return Q->numInternalQs - 1; 1.895 + } 1.896 + 1.897 + 1.898 +/* ---]]] gets saved index of last SRSW queue read-from 1.899 + * ---]]] increments index and gets indexed queue 1.900 + * ---]]] does a non-blocking read of that queue 1.901 + * ---]]] if gets something, saves index and returns that value 1.902 + * ---]]] if gets null, then goes to next queue 1.903 + * ---]]] if got null from all the queues then does yield() then tries again 1.904 + */ 1.905 +void* readSRMWQ( SRMWQueueStruc* Q ) 1.906 + { SRSWQueueStruc *readQ; 1.907 + void *readValue = 0; 1.908 + int tries = 0; 1.909 + int QToReadFrom = 0; 1.910 + 1.911 + QToReadFrom = Q->lastQReadFrom; 1.912 + 1.913 + while( TRUE ) 1.914 + { QToReadFrom++; 1.915 + if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0; 1.916 + readQ = Q->internalQs[ QToReadFrom ]; 1.917 + readValue = readSRSWQ_NonBlocking( readQ ); 1.918 + 1.919 + if( readValue != 0 ) //got a value, return it 1.920 + { Q->lastQReadFrom = QToReadFrom; 1.921 + return readValue; 1.922 + } 1.923 + else //SRSW Q just read is empty 1.924 + { //check if all queues have been tried 1.925 + if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty 1.926 + { tries++; //give a writer a chance to finish before yield 1.927 + if( tries > SPINLOCK_TRIES ) pthread_yield(); 1.928 + } 1.929 + } 1.930 + } 1.931 + } 1.932 + 1.933 + 1.934 +/* 1.935 + * ---]]] uses the writerID as index to get the SRSW queue for that writer 1.936 + * ---]]] performs writeQ on that queue (may block via repeated yield calls) 1.937 + */ 1.938 +void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID ) 1.939 + { 1.940 + if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error 1.941 + 1.942 + writeSRSWQ( in, Q->internalQs[ writerID ] ); 1.943 + }
2.1 --- a/PrivateQueue.c Thu Nov 04 17:50:29 2010 -0700 2.2 +++ b/PrivateQueue.c Tue Feb 07 12:51:29 2012 -0800 2.3 @@ -1,141 +1,141 @@ 2.4 -/* 2.5 - * Copyright 2009 OpenSourceStewardshipFoundation.org 2.6 - * Licensed under GNU General Public License version 2 2.7 - * 2.8 - * NOTE: this version of SRSW correct as of April 25, 2010 2.9 - * 2.10 - * Author: seanhalle@yahoo.com 2.11 - */ 2.12 - 2.13 - 2.14 -#include <stdio.h> 2.15 -#include <string.h> 2.16 -#include <errno.h> 2.17 -#include <stdlib.h> 2.18 - 2.19 -#include "PrivateQueue.h" 2.20 - 2.21 - 2.22 - 2.23 -//=========================================================================== 2.24 - 2.25 -/*This kind of queue is private to a single core at a time -- has no 2.26 - * synchronizations 2.27 - */ 2.28 - 2.29 -PrivQueueStruc* makePrivQ() 2.30 - { 2.31 - PrivQueueStruc* retQ; 2.32 - retQ = (PrivQueueStruc *) malloc( sizeof( PrivQueueStruc ) ); 2.33 - 2.34 - retQ->startOfData = malloc( 1024 * sizeof(void *) ); 2.35 - memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); 2.36 - retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 2.37 - retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 2.38 - retQ->endOfData = &(retQ->startOfData[1023]); 2.39 - 2.40 - return retQ; 2.41 - } 2.42 - 2.43 - 2.44 -void 2.45 -enlargePrivQ( PrivQueueStruc *Q ) 2.46 - { int oldSize, newSize; 2.47 - void **oldStartOfData; 2.48 - 2.49 - oldSize = Q->endOfData - Q->startOfData; 2.50 - newSize = 2 * oldSize; 2.51 - oldStartOfData = Q->startOfData; 2.52 - Q->startOfData = malloc( newSize * sizeof(void *) ); 2.53 - memcpy(Q->startOfData, oldStartOfData, oldSize * sizeof(void *)); 2.54 - free(oldStartOfData); 2.55 - 2.56 - Q->extractPos = &(Q->startOfData[0]); //side by side == empty 2.57 - Q->insertPos = &(Q->startOfData[1]); // so start pos's have to be 2.58 - Q->endOfData = &(Q->startOfData[newSize - 1]); 2.59 - } 2.60 - 2.61 - 2.62 -/*Returns NULL when queue is empty 2.63 - */ 2.64 -void* readPrivQ( PrivQueueStruc* Q ) 2.65 - { void *out = 0; 2.66 - void **startOfData = Q->startOfData; 2.67 - void **endOfData = Q->endOfData; 2.68 - 2.69 - void **insertPos = Q->insertPos; 2.70 - void **extractPos = Q->extractPos; 2.71 - 2.72 - //if not empty -- (extract is just below insert when empty) 2.73 - if( insertPos - extractPos != 1 && 2.74 - !(extractPos == endOfData && insertPos == startOfData)) 2.75 - { //move before read 2.76 - if( extractPos == endOfData ) //write new pos exactly once, correctly 2.77 - { Q->extractPos = startOfData; //can't overrun then fix it 'cause 2.78 - } // other thread might read bad pos 2.79 - else 2.80 - { Q->extractPos++; 2.81 - } 2.82 - out = *(Q->extractPos); 2.83 - return out; 2.84 - } 2.85 - //Q is empty 2.86 - return NULL; 2.87 - } 2.88 - 2.89 - 2.90 -/*Expands the queue size automatically when it's full 2.91 - */ 2.92 -void 2.93 -writePrivQ( void * in, PrivQueueStruc* Q ) 2.94 - { 2.95 - void **startOfData = Q->startOfData; 2.96 - void **endOfData = Q->endOfData; 2.97 - 2.98 - void **insertPos = Q->insertPos; 2.99 - void **extractPos = Q->extractPos; 2.100 - 2.101 -tryAgain: 2.102 - //Full? (insert is just below extract when full) 2.103 - if( extractPos - insertPos != 1 && 2.104 - !(insertPos == endOfData && extractPos == startOfData)) 2.105 - { *(Q->insertPos) = in; //insert before move 2.106 - if( insertPos == endOfData ) //write new pos exactly once, correctly 2.107 - { Q->insertPos = startOfData; 2.108 - } 2.109 - else 2.110 - { Q->insertPos++; 2.111 - } 2.112 - return; 2.113 - } 2.114 - //Q is full 2.115 - enlargePrivQ( Q ); 2.116 - goto tryAgain; 2.117 - } 2.118 - 2.119 - 2.120 -/*Returns false when the queue was full. 2.121 - * have option of calling make_larger_PrivQ to make more room, then try again 2.122 - */ 2.123 -int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ) 2.124 - { 2.125 - void **startOfData = Q->startOfData; 2.126 - void **endOfData = Q->endOfData; 2.127 - 2.128 - void **insertPos = Q->insertPos; 2.129 - void **extractPos = Q->extractPos; 2.130 - 2.131 - if( extractPos - insertPos != 1 && 2.132 - !(insertPos == endOfData && extractPos == startOfData)) 2.133 - { *(Q->insertPos) = in; //insert before move 2.134 - if( insertPos == endOfData ) //write new pos exactly once, correctly 2.135 - { Q->insertPos = startOfData; 2.136 - } 2.137 - else 2.138 - { Q->insertPos++; 2.139 - } 2.140 - return TRUE; 2.141 - } 2.142 - //Q is full 2.143 - return FALSE; 2.144 - } 2.145 +/* 2.146 + * Copyright 2009 OpenSourceStewardshipFoundation.org 2.147 + * Licensed under GNU General Public License version 2 2.148 + * 2.149 + * NOTE: this version of SRSW correct as of April 25, 2010 2.150 + * 2.151 + * Author: seanhalle@yahoo.com 2.152 + */ 2.153 + 2.154 + 2.155 +#include <stdio.h> 2.156 +#include <string.h> 2.157 +#include <errno.h> 2.158 +#include <stdlib.h> 2.159 + 2.160 +#include "PrivateQueue.h" 2.161 + 2.162 + 2.163 + 2.164 +//=========================================================================== 2.165 + 2.166 +/*This kind of queue is private to a single core at a time -- has no 2.167 + * synchronizations 2.168 + */ 2.169 + 2.170 +PrivQueueStruc* makePrivQ() 2.171 + { 2.172 + PrivQueueStruc* retQ; 2.173 + retQ = (PrivQueueStruc *) malloc( sizeof( PrivQueueStruc ) ); 2.174 + 2.175 + retQ->startOfData = malloc( 1024 * sizeof(void *) ); 2.176 + memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); 2.177 + retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 2.178 + retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 2.179 + retQ->endOfData = &(retQ->startOfData[1023]); 2.180 + 2.181 + return retQ; 2.182 + } 2.183 + 2.184 + 2.185 +void 2.186 +enlargePrivQ( PrivQueueStruc *Q ) 2.187 + { int oldSize, newSize; 2.188 + void **oldStartOfData; 2.189 + 2.190 + oldSize = Q->endOfData - Q->startOfData; 2.191 + newSize = 2 * oldSize; 2.192 + oldStartOfData = Q->startOfData; 2.193 + Q->startOfData = malloc( newSize * sizeof(void *) ); 2.194 + memcpy(Q->startOfData, oldStartOfData, oldSize * sizeof(void *)); 2.195 + free(oldStartOfData); 2.196 + 2.197 + Q->extractPos = &(Q->startOfData[0]); //side by side == empty 2.198 + Q->insertPos = &(Q->startOfData[1]); // so start pos's have to be 2.199 + Q->endOfData = &(Q->startOfData[newSize - 1]); 2.200 + } 2.201 + 2.202 + 2.203 +/*Returns NULL when queue is empty 2.204 + */ 2.205 +void* readPrivQ( PrivQueueStruc* Q ) 2.206 + { void *out = 0; 2.207 + void **startOfData = Q->startOfData; 2.208 + void **endOfData = Q->endOfData; 2.209 + 2.210 + void **insertPos = Q->insertPos; 2.211 + void **extractPos = Q->extractPos; 2.212 + 2.213 + //if not empty -- (extract is just below insert when empty) 2.214 + if( insertPos - extractPos != 1 && 2.215 + !(extractPos == endOfData && insertPos == startOfData)) 2.216 + { //move before read 2.217 + if( extractPos == endOfData ) //write new pos exactly once, correctly 2.218 + { Q->extractPos = startOfData; //can't overrun then fix it 'cause 2.219 + } // other thread might read bad pos 2.220 + else 2.221 + { Q->extractPos++; 2.222 + } 2.223 + out = *(Q->extractPos); 2.224 + return out; 2.225 + } 2.226 + //Q is empty 2.227 + return NULL; 2.228 + } 2.229 + 2.230 + 2.231 +/*Expands the queue size automatically when it's full 2.232 + */ 2.233 +void 2.234 +writePrivQ( void * in, PrivQueueStruc* Q ) 2.235 + { 2.236 + void **startOfData = Q->startOfData; 2.237 + void **endOfData = Q->endOfData; 2.238 + 2.239 + void **insertPos = Q->insertPos; 2.240 + void **extractPos = Q->extractPos; 2.241 + 2.242 +tryAgain: 2.243 + //Full? (insert is just below extract when full) 2.244 + if( extractPos - insertPos != 1 && 2.245 + !(insertPos == endOfData && extractPos == startOfData)) 2.246 + { *(Q->insertPos) = in; //insert before move 2.247 + if( insertPos == endOfData ) //write new pos exactly once, correctly 2.248 + { Q->insertPos = startOfData; 2.249 + } 2.250 + else 2.251 + { Q->insertPos++; 2.252 + } 2.253 + return; 2.254 + } 2.255 + //Q is full 2.256 + enlargePrivQ( Q ); 2.257 + goto tryAgain; 2.258 + } 2.259 + 2.260 + 2.261 +/*Returns false when the queue was full. 2.262 + * have option of calling make_larger_PrivQ to make more room, then try again 2.263 + */ 2.264 +int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ) 2.265 + { 2.266 + void **startOfData = Q->startOfData; 2.267 + void **endOfData = Q->endOfData; 2.268 + 2.269 + void **insertPos = Q->insertPos; 2.270 + void **extractPos = Q->extractPos; 2.271 + 2.272 + if( extractPos - insertPos != 1 && 2.273 + !(insertPos == endOfData && extractPos == startOfData)) 2.274 + { *(Q->insertPos) = in; //insert before move 2.275 + if( insertPos == endOfData ) //write new pos exactly once, correctly 2.276 + { Q->insertPos = startOfData; 2.277 + } 2.278 + else 2.279 + { Q->insertPos++; 2.280 + } 2.281 + return TRUE; 2.282 + } 2.283 + //Q is full 2.284 + return FALSE; 2.285 + }
3.1 --- a/PrivateQueue.h Thu Nov 04 17:50:29 2010 -0700 3.2 +++ b/PrivateQueue.h Tue Feb 07 12:51:29 2012 -0800 3.3 @@ -1,37 +1,37 @@ 3.4 -/* 3.5 - * Copyright 2009 OpenSourceStewardshipFoundation.org 3.6 - * Licensed under GNU General Public License version 2 3.7 - * 3.8 - * Author: seanhalle@yahoo.com 3.9 - */ 3.10 - 3.11 -#ifndef _PRIVATE_QUEUE_H 3.12 -#define _PRIVATE_QUEUE_H 3.13 - 3.14 -#include <pthread.h> 3.15 - 3.16 -#define TRUE 1 3.17 -#define FALSE 0 3.18 - 3.19 -#define LOCKED 1 3.20 -#define UNLOCKED 0 3.21 - 3.22 - 3.23 -/* It is the data that is shared so only need one mutex. */ 3.24 -typedef struct 3.25 - { void **insertPos; 3.26 - void **extractPos; 3.27 - void **startOfData; //data is pointers 3.28 - void **endOfData; //set when alloc data 3.29 - } 3.30 -PrivQueueStruc; 3.31 - 3.32 - 3.33 -PrivQueueStruc* makePrivQ ( ); 3.34 -void* readPrivQ ( PrivQueueStruc *Q ); 3.35 -void writePrivQ( void *in, PrivQueueStruc *Q ); 3.36 -int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ); //return 3.37 - // false when full 3.38 - 3.39 -#endif /* _PRIVATE_QUEUE_H */ 3.40 - 3.41 +/* 3.42 + * Copyright 2009 OpenSourceStewardshipFoundation.org 3.43 + * Licensed under GNU General Public License version 2 3.44 + * 3.45 + * Author: seanhalle@yahoo.com 3.46 + */ 3.47 + 3.48 +#ifndef _PRIVATE_QUEUE_H 3.49 +#define _PRIVATE_QUEUE_H 3.50 + 3.51 +#include <pthread.h> 3.52 + 3.53 +#define TRUE 1 3.54 +#define FALSE 0 3.55 + 3.56 +#define LOCKED 1 3.57 +#define UNLOCKED 0 3.58 + 3.59 + 3.60 +/* It is the data that is shared so only need one mutex. */ 3.61 +typedef struct 3.62 + { void **insertPos; 3.63 + void **extractPos; 3.64 + void **startOfData; //data is pointers 3.65 + void **endOfData; //set when alloc data 3.66 + } 3.67 +PrivQueueStruc; 3.68 + 3.69 + 3.70 +PrivQueueStruc* makePrivQ ( ); 3.71 +void* readPrivQ ( PrivQueueStruc *Q ); 3.72 +void writePrivQ( void *in, PrivQueueStruc *Q ); 3.73 +int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ); //return 3.74 + // false when full 3.75 + 3.76 +#endif /* _PRIVATE_QUEUE_H */ 3.77 +
