changeset 19:1ed562d601d9

Newly created project repository -- commit sub-states
author Me@portablequad
date Tue, 07 Feb 2012 12:51:29 -0800
parents 53c614b781ce
children b5ae7fbb1f01 27b341a31a21
files BlockingQueue.c PrivateQueue.c PrivateQueue.h
diffstat 3 files changed, 648 insertions(+), 648 deletions(-) [+]
line diff
     1.1 --- a/BlockingQueue.c	Thu Nov 04 17:50:29 2010 -0700
     1.2 +++ b/BlockingQueue.c	Tue Feb 07 12:51:29 2012 -0800
     1.3 @@ -1,470 +1,470 @@
     1.4 -/*
     1.5 - *  Copyright 2009 OpenSourceStewardshipFoundation.org
     1.6 - *  Licensed under GNU General Public License version 2
     1.7 - *
     1.8 - * Author: seanhalle@yahoo.com
     1.9 - */
    1.10 -
    1.11 -
    1.12 -#include <stdio.h>
    1.13 -#include <errno.h>
    1.14 -#include <pthread.h>
    1.15 -#include <stdlib.h>
    1.16 -#include <sched.h>
    1.17 -
    1.18 -#include "BlockingQueue.h"
    1.19 -
    1.20 -#define INC(x) (++x == 1024) ? (x) = 0 : (x)
    1.21 -
    1.22 -#define SPINLOCK_TRIES 100000
    1.23 -
    1.24 -//===========================================================================
    1.25 -//Normal pthread Q
    1.26 -
    1.27 -PThdQueueStruc* makePThdQ()
    1.28 - {
    1.29 -   PThdQueueStruc* retQ;
    1.30 -   int retCode;
    1.31 -   retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) );
    1.32 -
    1.33 -
    1.34 -   retCode =
    1.35 -   pthread_mutex_init( &retQ->mutex_t,  NULL);
    1.36 -   if(retCode){perror("Error in creating mutex:"); exit(1);}
    1.37 -
    1.38 -   retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
    1.39 -   if(retCode){perror("Error in creating cond_var:"); exit(1);}
    1.40 -
    1.41 -   retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
    1.42 -   if(retCode){perror("Error in creating cond_var:"); exit(1);}
    1.43 -
    1.44 -   retQ->count    = 0;
    1.45 -   retQ->readPos  = 0;
    1.46 -   retQ->writePos = 0;
    1.47 -   retQ->w_empty  = 0;
    1.48 -   retQ->w_full   = 0;
    1.49 -
    1.50 -   return retQ;
    1.51 - }
    1.52 -
    1.53 -void * readPThdQ( PThdQueueStruc *Q )
    1.54 - { void *ret;
    1.55 -   int retCode, wt;
    1.56 -   pthread_mutex_lock( &Q->mutex_t );
    1.57 -    {
    1.58 -      while( Q -> count == 0 )
    1.59 -       { Q -> w_empty = 1;
    1.60 -         retCode =
    1.61 -         pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
    1.62 -         if( retCode ){ perror("Thread wait error: "); exit(1); }
    1.63 -       }
    1.64 -      Q -> w_empty = 0;
    1.65 -      Q -> count -= 1;
    1.66 -      ret = Q->data[ Q->readPos ];
    1.67 -      INC( Q->readPos );
    1.68 -      wt = Q -> w_full;
    1.69 -      Q -> w_full = 0;
    1.70 -    }
    1.71 -   pthread_mutex_unlock( &Q->mutex_t );
    1.72 -   if (wt)
    1.73 -      pthread_cond_signal( &Q->cond_w_t );
    1.74 -
    1.75 -         //printf("Q out: %d\n", ret);
    1.76 -   return( ret );
    1.77 - }
    1.78 -
    1.79 -void writePThdQ( void * in, PThdQueueStruc* Q )
    1.80 - {
    1.81 -   int status, wt;
    1.82 -         //printf("Q in: %d\n", in);
    1.83 -
    1.84 -   pthread_mutex_lock( &Q->mutex_t );
    1.85 -    {
    1.86 -      while( Q->count >= 1024 )
    1.87 -       {
    1.88 -         Q -> w_full = 1;
    1.89 -         status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
    1.90 -         if (status != 0)
    1.91 -          { perror("Thread wait error: ");
    1.92 -            exit(1);
    1.93 -          }
    1.94 -       }
    1.95 -
    1.96 -      Q -> w_full = 0;
    1.97 -      Q->count += 1;
    1.98 -      Q->data[ Q->writePos ] = in;
    1.99 -      INC( Q->writePos );
   1.100 -      wt = Q -> w_empty;
   1.101 -      Q -> w_empty = 0;
   1.102 -    }
   1.103 -
   1.104 -   pthread_mutex_unlock( &Q->mutex_t );
   1.105 -   if( wt )  pthread_cond_signal( &Q->cond_r_t );
   1.106 - }
   1.107 -
   1.108 -
   1.109 -//===========================================================================
   1.110 -// multi reader  multi writer fast Q   via CAS
   1.111 -#ifndef _GNU_SOURCE
   1.112 -#define _GNU_SOURCE
   1.113 -
   1.114 -/*This is a blocking queue, but it uses CAS instr plus yield() when empty
   1.115 - * or full
   1.116 - *It uses CAS because it's meant to have more than one reader and more than
   1.117 - * one writer.
   1.118 - */
   1.119 -
   1.120 -CASQueueStruc* makeCASQ()
   1.121 - {
   1.122 -   CASQueueStruc* retQ;
   1.123 -   retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) );
   1.124 -
   1.125 -   retQ->insertLock = UNLOCKED;
   1.126 -   retQ->extractLock= UNLOCKED;
   1.127 -   //TODO: check got pointer syntax right
   1.128 -   retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
   1.129 -   retQ->insertPos  = &(retQ->startOfData[1]); // so start pos's have to be
   1.130 -   retQ->endOfData  = &(retQ->startOfData[1023]);
   1.131 -
   1.132 -   return retQ;
   1.133 - }
   1.134 -
   1.135 -
   1.136 -void* readCASQ( CASQueueStruc* Q )
   1.137 - { void  *out    = 0;
   1.138 -   int    tries  = 0;
   1.139 -   void **startOfData = Q->startOfData;
   1.140 -   void **endOfData   = Q->endOfData;
   1.141 -
   1.142 -   int  gotLock = FALSE;
   1.143 -
   1.144 -   while( TRUE )
   1.145 -    {    //this intrinsic returns true if the lock held "UNLOCKED", in which
   1.146 -         // case it now holds "LOCKED" -- if it already held "LOCKED", then
   1.147 -         // gotLock is FALSE
   1.148 -      gotLock =
   1.149 -         __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
   1.150 -            //NOTE: checked assy, and it does lock correctly..
   1.151 -      if( gotLock )
   1.152 -       {
   1.153 -         void **insertPos  = Q->insertPos;
   1.154 -         void **extractPos = Q->extractPos;
   1.155 -
   1.156 -            //if not empty -- extract just below insert when empty
   1.157 -         if( insertPos - extractPos != 1 &&
   1.158 -             !(extractPos == endOfData && insertPos == startOfData))
   1.159 -          {    //move before read
   1.160 -            if( extractPos == endOfData ) //write new pos exactly once, correctly
   1.161 -             { Q->extractPos = startOfData; //can't overrun then fix it 'cause
   1.162 -             }                              // other thread might read bad pos
   1.163 -            else
   1.164 -             { Q->extractPos++;
   1.165 -             }
   1.166 -            out = *(Q->extractPos);
   1.167 -            Q->extractLock = UNLOCKED;
   1.168 -            return out;
   1.169 -          }
   1.170 -         else //Q is empty
   1.171 -          { Q->extractLock = UNLOCKED;//empty, so release lock for others
   1.172 -          }
   1.173 -       }
   1.174 -         //Q is busy or empty
   1.175 -      tries++;
   1.176 -      if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
   1.177 -    }
   1.178 - }
   1.179 -
   1.180 -void writeCASQ( void * in, CASQueueStruc* Q )
   1.181 - {
   1.182 -   int  tries   = 0;
   1.183 -      //TODO: need to make Q volatile?  Want to do this Q in assembly!
   1.184 -      //Have no idea what GCC's going to do to this code
   1.185 -   void **startOfData = Q->startOfData;
   1.186 -   void **endOfData   = Q->endOfData;
   1.187 -
   1.188 -   int  gotLock = FALSE;
   1.189 -
   1.190 -   while( TRUE )
   1.191 -    {    //this intrinsic returns true if the lock held "UNLOCKED", in which
   1.192 -         // case it now holds "LOCKED" -- if it already held "LOCKED", then
   1.193 -         // gotLock is FALSE
   1.194 -      gotLock =
   1.195 -         __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
   1.196 -      if( gotLock )
   1.197 -       {
   1.198 -         void **insertPos  = Q->insertPos;
   1.199 -         void **extractPos = Q->extractPos;
   1.200 -
   1.201 -            //check if room to insert.. can't use a count variable
   1.202 -            // 'cause both insertor Thd and extractor Thd would write it
   1.203 -         if( extractPos - insertPos != 1 &&
   1.204 -             !(insertPos == endOfData && extractPos == startOfData))
   1.205 -          { *(Q->insertPos) = in;   //insert before move
   1.206 -            if( insertPos == endOfData )
   1.207 -             { Q->insertPos = startOfData;
   1.208 -             }
   1.209 -            else
   1.210 -             { Q->insertPos++;
   1.211 -             }
   1.212 -            Q->insertLock = UNLOCKED;
   1.213 -            return;
   1.214 -          }
   1.215 -         else //Q is full
   1.216 -          { Q->insertLock = UNLOCKED;//full, so release lock for others
   1.217 -          }
   1.218 -       }
   1.219 -      tries++;
   1.220 -      if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
   1.221 -    }
   1.222 - }
   1.223 -
   1.224 -#endif  //_GNU_SOURCE
   1.225 -
   1.226 -
   1.227 -//===========================================================================
   1.228 -//Single reader single writer super fast Q.. no atomic instrs..
   1.229 -
   1.230 -
   1.231 -/*This is a blocking queue, but it uses no atomic instructions, just does
   1.232 - * yield() when empty or full
   1.233 - *
   1.234 - *It doesn't need any atomic instructions because only a single thread
   1.235 - * extracts and only a single thread inserts, and it has no locations that
   1.236 - * are written by both.  It writes before moving and moves before reading,
   1.237 - * and never lets write position and read position be the same, so dis-
   1.238 - * synchrony can only ever cause an unnecessary call to yield(), never a
   1.239 - * wrong value (by monotonicity of movement of pointers, plus single writer
   1.240 - * to pointers, plus sequence of write before change pointer, plus
   1.241 - * assumptions that if thread A semantically writes X before Y, then thread
   1.242 - * B will see the writes in that order.)
   1.243 - */
   1.244 -
   1.245 -SRSWQueueStruc* makeSRSWQ()
   1.246 - {
   1.247 -   SRSWQueueStruc* retQ;
   1.248 -   retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) );
   1.249 -   memset( retQ->startOfData, 0, 1024 * sizeof(void *) );
   1.250 -   
   1.251 -   retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
   1.252 -   retQ->insertPos  = &(retQ->startOfData[1]); // so start pos's have to be
   1.253 -   retQ->endOfData  = &(retQ->startOfData[1023]);
   1.254 -   
   1.255 -   return retQ;
   1.256 - }
   1.257 -
   1.258 -void
   1.259 -freeSRSWQ( SRSWQueueStruc* Q )
   1.260 - {
   1.261 -   free( Q );
   1.262 - }
   1.263 -
   1.264 -void* readSRSWQ( SRSWQueueStruc* Q )
   1.265 - { void *out    = 0;
   1.266 -   int  tries   = 0;
   1.267 -
   1.268 -   while( TRUE )
   1.269 -    {
   1.270 -      if( Q->insertPos - Q->extractPos != 1 &&
   1.271 -          !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
   1.272 -       { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
   1.273 -         else Q->extractPos++;           //move before read
   1.274 -         out = *(Q->extractPos);
   1.275 -         return out;
   1.276 -       }
   1.277 -         //Q is empty
   1.278 -      tries++;
   1.279 -      if( tries > SPINLOCK_TRIES ) pthread_yield();
   1.280 -    }
   1.281 - }
   1.282 -
   1.283 -
   1.284 -void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
   1.285 - { void *out    = 0;
   1.286 -   int  tries   = 0;
   1.287 -
   1.288 -   while( TRUE )
   1.289 -    {
   1.290 -      if( Q->insertPos - Q->extractPos != 1 &&
   1.291 -          !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
   1.292 -       { Q->extractPos++;           //move before read
   1.293 -         if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
   1.294 -         out = *(Q->extractPos);
   1.295 -         return out;
   1.296 -       }
   1.297 -         //Q is empty
   1.298 -      tries++;
   1.299 -      if( tries > 10 ) return NULL; //long enough for writer to finish
   1.300 -    }
   1.301 - }
   1.302 -
   1.303 -
   1.304 -void writeSRSWQ( void * in, SRSWQueueStruc* Q )
   1.305 - {
   1.306 -   int  tries   = 0;
   1.307 -   
   1.308 -   while( TRUE )
   1.309 -    {
   1.310 -      if( Q->extractPos - Q->insertPos != 1 &&
   1.311 -          !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
   1.312 -       { *(Q->insertPos) = in;   //insert before move
   1.313 -         if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
   1.314 -         else Q->insertPos++;
   1.315 -         return;
   1.316 -       }
   1.317 -         //Q is full
   1.318 -      tries++;
   1.319 -      if( tries > SPINLOCK_TRIES ) pthread_yield();
   1.320 -    }
   1.321 - }
   1.322 -
   1.323 -
   1.324 -
   1.325 -//===========================================================================
   1.326 -//Single reader Multiple writer super fast Q.. no atomic instrs..
   1.327 -
   1.328 -
   1.329 -/*This is a blocking queue, but it uses no atomic instructions, just does
   1.330 - * yield() when empty or full
   1.331 - *
   1.332 - *It doesn't need any atomic instructions because only a single thread
   1.333 - * extracts and only a single thread inserts, and it has no locations that
   1.334 - * are written by both.  It writes before moving and moves before reading,
   1.335 - * and never lets write position and read position be the same, so dis-
   1.336 - * synchrony can only ever cause an unnecessary call to yield(), never a
   1.337 - * wrong value (by monotonicity of movement of pointers, plus single writer
   1.338 - * to pointers, plus sequence of write before change pointer, plus
   1.339 - * assumptions that if thread A semantically writes X before Y, then thread
   1.340 - * B will see the writes in that order.)
   1.341 - *
   1.342 - *The multi-writer version is implemented as a hierarchy.  Each writer has
   1.343 - * its own single-reader single-writer queue.  The reader simply does a
   1.344 - * round-robin harvesting from them.
   1.345 - *
   1.346 - *A writer must first register itself with the queue, and receives an ID back
   1.347 - * It then uses that ID on each write operation.
   1.348 - *
   1.349 - *The implementation is:
   1.350 - *Physically:
   1.351 - * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
   1.352 - * -] it also has read-pointer to the last queue a write was taken from.
   1.353 - *
   1.354 - *Action-Patterns:
   1.355 - * -] To add a writer
   1.356 - * --]] writer-thread calls addWriterToQ(), remember the ID it returns
   1.357 - * --]] internally addWriterToQ does:
   1.358 - * ---]]] if needs more room, makes a larger writer-array
   1.359 - * ---]]] copies the old writer-array into the new
   1.360 - * ---]]] makes a new SRSW queue an puts it into the array
   1.361 - * ---]]] returns the index to the new SRSW queue as the ID
   1.362 - * -] To write
   1.363 - * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
   1.364 - * --]] this call may block, via repeated yield() calls
   1.365 - * --]] internally, writeSRMWQ does:
   1.366 - * ---]]] uses the writerID as index to get the SRSW queue for that writer
   1.367 - * ---]]] performs writeQ on that queue (may block via repeated yield calls)
   1.368 - * -] To Read
   1.369 - * --]] reader calls readSRMWQ, passing the Q struc
   1.370 - * --]] this call may block, via repeated yield() calls
   1.371 - * --]] internally, readSRMWQ does:
   1.372 - * ---]]] gets saved index of last SRSW queue read from
   1.373 - * ---]]] increments index and gets indexed queue
   1.374 - * ---]]] does a non-blocking read of that queue
   1.375 - * ---]]] if gets something, saves index and returns that value
   1.376 - * ---]]] if gets null, then goes to next queue
   1.377 - * ---]]] if got null from all the queues then does yield() then tries again
   1.378 - *
   1.379 - *Note: "0" is used as the value null, so SRSW queues must only contain
   1.380 - * pointers, and cannot use 0 as a valid pointer value.
   1.381 - *
   1.382 - */
   1.383 -
   1.384 -SRMWQueueStruc* makeSRMWQ()
   1.385 - { SRMWQueueStruc* retQ;
   1.386 -   
   1.387 -   retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) );
   1.388 -   
   1.389 -   retQ->numInternalQs = 0;
   1.390 -   retQ->internalQsSz  = 10;
   1.391 -   retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
   1.392 -   
   1.393 -   retQ->lastQReadFrom = 0;
   1.394 -   
   1.395 -   return retQ;
   1.396 - }
   1.397 -
   1.398 -/* ---]]] if needs more room, makes a larger writer-array
   1.399 - * ---]]] copies the old writer-array into the new
   1.400 - * ---]]] makes a new SRSW queue an puts it into the array
   1.401 - * ---]]] returns the index to the new SRSW queue as the ID
   1.402 - *
   1.403 - *NOTE: assuming all adds are completed before any writes or reads are
   1.404 - * performed..  otherwise, this needs to be re-done carefully, probably with
   1.405 - * a lock.
   1.406 - */
   1.407 -int addWriterToSRMWQ( SRMWQueueStruc* Q )
   1.408 - { int oldSz, i;
   1.409 -   SRSWQueueStruc * *oldArray;
   1.410 -   
   1.411 -   (Q->numInternalQs)++;
   1.412 -   if( Q->numInternalQs >= Q->internalQsSz )
   1.413 -    { //full, so make bigger
   1.414 -      oldSz            = Q->internalQsSz;
   1.415 -      oldArray         = Q->internalQs;
   1.416 -      Q->internalQsSz *= 2;
   1.417 -      Q->internalQs    = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
   1.418 -      for( i = 0; i < oldSz; i++ )
   1.419 -       { Q->internalQs[i] = oldArray[i];
   1.420 -       }
   1.421 -      free( oldArray );
   1.422 -    }
   1.423 -   Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
   1.424 -   return Q->numInternalQs - 1;
   1.425 - }
   1.426 -
   1.427 -
   1.428 -/* ---]]] gets saved index of last SRSW queue read-from
   1.429 - * ---]]] increments index and gets indexed queue
   1.430 - * ---]]] does a non-blocking read of that queue
   1.431 - * ---]]] if gets something, saves index and returns that value
   1.432 - * ---]]] if gets null, then goes to next queue
   1.433 - * ---]]] if got null from all the queues then does yield() then tries again
   1.434 - */
   1.435 -void* readSRMWQ( SRMWQueueStruc* Q )
   1.436 - { SRSWQueueStruc *readQ;
   1.437 -   void *readValue   = 0;
   1.438 -   int   tries       = 0;
   1.439 -   int   QToReadFrom = 0;
   1.440 -   
   1.441 -   QToReadFrom = Q->lastQReadFrom;
   1.442 -   
   1.443 -   while( TRUE )
   1.444 -    { QToReadFrom++;
   1.445 -      if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
   1.446 -      readQ = Q->internalQs[ QToReadFrom ];
   1.447 -      readValue = readSRSWQ_NonBlocking( readQ );
   1.448 -      
   1.449 -      if( readValue != 0 ) //got a value, return it
   1.450 -       { Q->lastQReadFrom = QToReadFrom;
   1.451 -         return readValue;
   1.452 -       }
   1.453 -      else  //SRSW Q just read is empty
   1.454 -       {    //check if all queues have been tried
   1.455 -         if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
   1.456 -          { tries++; //give a writer a chance to finish before yield
   1.457 -            if( tries > SPINLOCK_TRIES ) pthread_yield();
   1.458 -          }
   1.459 -       }
   1.460 -    }
   1.461 - }
   1.462 -
   1.463 -
   1.464 -/* 
   1.465 - * ---]]] uses the writerID as index to get the SRSW queue for that writer
   1.466 - * ---]]] performs writeQ on that queue (may block via repeated yield calls)
   1.467 - */
   1.468 -void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
   1.469 - { 
   1.470 -   if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
   1.471 -   
   1.472 -   writeSRSWQ( in, Q->internalQs[ writerID ] );
   1.473 - }
   1.474 +/*
   1.475 + *  Copyright 2009 OpenSourceStewardshipFoundation.org
   1.476 + *  Licensed under GNU General Public License version 2
   1.477 + *
   1.478 + * Author: seanhalle@yahoo.com
   1.479 + */
   1.480 +
   1.481 +
   1.482 +#include <stdio.h>
   1.483 +#include <errno.h>
   1.484 +#include <pthread.h>
   1.485 +#include <stdlib.h>
   1.486 +#include <sched.h>
   1.487 +
   1.488 +#include "BlockingQueue.h"
   1.489 +
   1.490 +#define INC(x) (++x == 1024) ? (x) = 0 : (x)
   1.491 +
   1.492 +#define SPINLOCK_TRIES 100000
   1.493 +
   1.494 +//===========================================================================
   1.495 +//Normal pthread Q
   1.496 +
   1.497 +PThdQueueStruc* makePThdQ()
   1.498 + {
   1.499 +   PThdQueueStruc* retQ;
   1.500 +   int retCode;
   1.501 +   retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) );
   1.502 +
   1.503 +
   1.504 +   retCode =
   1.505 +   pthread_mutex_init( &retQ->mutex_t,  NULL);
   1.506 +   if(retCode){perror("Error in creating mutex:"); exit(1);}
   1.507 +
   1.508 +   retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
   1.509 +   if(retCode){perror("Error in creating cond_var:"); exit(1);}
   1.510 +
   1.511 +   retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
   1.512 +   if(retCode){perror("Error in creating cond_var:"); exit(1);}
   1.513 +
   1.514 +   retQ->count    = 0;
   1.515 +   retQ->readPos  = 0;
   1.516 +   retQ->writePos = 0;
   1.517 +   retQ->w_empty  = 0;
   1.518 +   retQ->w_full   = 0;
   1.519 +
   1.520 +   return retQ;
   1.521 + }
   1.522 +
   1.523 +void * readPThdQ( PThdQueueStruc *Q )
   1.524 + { void *ret;
   1.525 +   int retCode, wt;
   1.526 +   pthread_mutex_lock( &Q->mutex_t );
   1.527 +    {
   1.528 +      while( Q -> count == 0 )
   1.529 +       { Q -> w_empty = 1;
   1.530 +         retCode =
   1.531 +         pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
   1.532 +         if( retCode ){ perror("Thread wait error: "); exit(1); }
   1.533 +       }
   1.534 +      Q -> w_empty = 0;
   1.535 +      Q -> count -= 1;
   1.536 +      ret = Q->data[ Q->readPos ];
   1.537 +      INC( Q->readPos );
   1.538 +      wt = Q -> w_full;
   1.539 +      Q -> w_full = 0;
   1.540 +    }
   1.541 +   pthread_mutex_unlock( &Q->mutex_t );
   1.542 +   if (wt)
   1.543 +      pthread_cond_signal( &Q->cond_w_t );
   1.544 +
   1.545 +         //printf("Q out: %d\n", ret);
   1.546 +   return( ret );
   1.547 + }
   1.548 +
   1.549 +void writePThdQ( void * in, PThdQueueStruc* Q )
   1.550 + {
   1.551 +   int status, wt;
   1.552 +         //printf("Q in: %d\n", in);
   1.553 +
   1.554 +   pthread_mutex_lock( &Q->mutex_t );
   1.555 +    {
   1.556 +      while( Q->count >= 1024 )
   1.557 +       {
   1.558 +         Q -> w_full = 1;
   1.559 +         status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
   1.560 +         if (status != 0)
   1.561 +          { perror("Thread wait error: ");
   1.562 +            exit(1);
   1.563 +          }
   1.564 +       }
   1.565 +
   1.566 +      Q -> w_full = 0;
   1.567 +      Q->count += 1;
   1.568 +      Q->data[ Q->writePos ] = in;
   1.569 +      INC( Q->writePos );
   1.570 +      wt = Q -> w_empty;
   1.571 +      Q -> w_empty = 0;
   1.572 +    }
   1.573 +
   1.574 +   pthread_mutex_unlock( &Q->mutex_t );
   1.575 +   if( wt )  pthread_cond_signal( &Q->cond_r_t );
   1.576 + }
   1.577 +
   1.578 +
   1.579 +//===========================================================================
   1.580 +// multi reader  multi writer fast Q   via CAS
   1.581 +#ifndef _GNU_SOURCE
   1.582 +#define _GNU_SOURCE
   1.583 +
   1.584 +/*This is a blocking queue, but it uses CAS instr plus yield() when empty
   1.585 + * or full
   1.586 + *It uses CAS because it's meant to have more than one reader and more than
   1.587 + * one writer.
   1.588 + */
   1.589 +
   1.590 +CASQueueStruc* makeCASQ()
   1.591 + {
   1.592 +   CASQueueStruc* retQ;
   1.593 +   retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) );
   1.594 +
   1.595 +   retQ->insertLock = UNLOCKED;
   1.596 +   retQ->extractLock= UNLOCKED;
   1.597 +   //TODO: check got pointer syntax right
   1.598 +   retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
   1.599 +   retQ->insertPos  = &(retQ->startOfData[1]); // so start pos's have to be
   1.600 +   retQ->endOfData  = &(retQ->startOfData[1023]);
   1.601 +
   1.602 +   return retQ;
   1.603 + }
   1.604 +
   1.605 +
   1.606 +void* readCASQ( CASQueueStruc* Q )
   1.607 + { void  *out    = 0;
   1.608 +   int    tries  = 0;
   1.609 +   void **startOfData = Q->startOfData;
   1.610 +   void **endOfData   = Q->endOfData;
   1.611 +
   1.612 +   int  gotLock = FALSE;
   1.613 +
   1.614 +   while( TRUE )
   1.615 +    {    //this intrinsic returns true if the lock held "UNLOCKED", in which
   1.616 +         // case it now holds "LOCKED" -- if it already held "LOCKED", then
   1.617 +         // gotLock is FALSE
   1.618 +      gotLock =
   1.619 +         __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
   1.620 +            //NOTE: checked assy, and it does lock correctly..
   1.621 +      if( gotLock )
   1.622 +       {
   1.623 +         void **insertPos  = Q->insertPos;
   1.624 +         void **extractPos = Q->extractPos;
   1.625 +
   1.626 +            //if not empty -- extract just below insert when empty
   1.627 +         if( insertPos - extractPos != 1 &&
   1.628 +             !(extractPos == endOfData && insertPos == startOfData))
   1.629 +          {    //move before read
   1.630 +            if( extractPos == endOfData ) //write new pos exactly once, correctly
   1.631 +             { Q->extractPos = startOfData; //can't overrun then fix it 'cause
   1.632 +             }                              // other thread might read bad pos
   1.633 +            else
   1.634 +             { Q->extractPos++;
   1.635 +             }
   1.636 +            out = *(Q->extractPos);
   1.637 +            Q->extractLock = UNLOCKED;
   1.638 +            return out;
   1.639 +          }
   1.640 +         else //Q is empty
   1.641 +          { Q->extractLock = UNLOCKED;//empty, so release lock for others
   1.642 +          }
   1.643 +       }
   1.644 +         //Q is busy or empty
   1.645 +      tries++;
   1.646 +      if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
   1.647 +    }
   1.648 + }
   1.649 +
   1.650 +void writeCASQ( void * in, CASQueueStruc* Q )
   1.651 + {
   1.652 +   int  tries   = 0;
   1.653 +      //TODO: need to make Q volatile?  Want to do this Q in assembly!
   1.654 +      //Have no idea what GCC's going to do to this code
   1.655 +   void **startOfData = Q->startOfData;
   1.656 +   void **endOfData   = Q->endOfData;
   1.657 +
   1.658 +   int  gotLock = FALSE;
   1.659 +
   1.660 +   while( TRUE )
   1.661 +    {    //this intrinsic returns true if the lock held "UNLOCKED", in which
   1.662 +         // case it now holds "LOCKED" -- if it already held "LOCKED", then
   1.663 +         // gotLock is FALSE
   1.664 +      gotLock =
   1.665 +         __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
   1.666 +      if( gotLock )
   1.667 +       {
   1.668 +         void **insertPos  = Q->insertPos;
   1.669 +         void **extractPos = Q->extractPos;
   1.670 +
   1.671 +            //check if room to insert.. can't use a count variable
   1.672 +            // 'cause both insertor Thd and extractor Thd would write it
   1.673 +         if( extractPos - insertPos != 1 &&
   1.674 +             !(insertPos == endOfData && extractPos == startOfData))
   1.675 +          { *(Q->insertPos) = in;   //insert before move
   1.676 +            if( insertPos == endOfData )
   1.677 +             { Q->insertPos = startOfData;
   1.678 +             }
   1.679 +            else
   1.680 +             { Q->insertPos++;
   1.681 +             }
   1.682 +            Q->insertLock = UNLOCKED;
   1.683 +            return;
   1.684 +          }
   1.685 +         else //Q is full
   1.686 +          { Q->insertLock = UNLOCKED;//full, so release lock for others
   1.687 +          }
   1.688 +       }
   1.689 +      tries++;
   1.690 +      if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
   1.691 +    }
   1.692 + }
   1.693 +
   1.694 +#endif  //_GNU_SOURCE
   1.695 +
   1.696 +
   1.697 +//===========================================================================
   1.698 +//Single reader single writer super fast Q.. no atomic instrs..
   1.699 +
   1.700 +
   1.701 +/*This is a blocking queue, but it uses no atomic instructions, just does
   1.702 + * yield() when empty or full
   1.703 + *
   1.704 + *It doesn't need any atomic instructions because only a single thread
   1.705 + * extracts and only a single thread inserts, and it has no locations that
   1.706 + * are written by both.  It writes before moving and moves before reading,
   1.707 + * and never lets write position and read position be the same, so dis-
   1.708 + * synchrony can only ever cause an unnecessary call to yield(), never a
   1.709 + * wrong value (by monotonicity of movement of pointers, plus single writer
   1.710 + * to pointers, plus sequence of write before change pointer, plus
   1.711 + * assumptions that if thread A semantically writes X before Y, then thread
   1.712 + * B will see the writes in that order.)
   1.713 + */
   1.714 +
   1.715 +SRSWQueueStruc* makeSRSWQ()
   1.716 + {
   1.717 +   SRSWQueueStruc* retQ;
   1.718 +   retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) );
   1.719 +   memset( retQ->startOfData, 0, 1024 * sizeof(void *) );
   1.720 +   
   1.721 +   retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
   1.722 +   retQ->insertPos  = &(retQ->startOfData[1]); // so start pos's have to be
   1.723 +   retQ->endOfData  = &(retQ->startOfData[1023]);
   1.724 +   
   1.725 +   return retQ;
   1.726 + }
   1.727 +
   1.728 +void
   1.729 +freeSRSWQ( SRSWQueueStruc* Q )
   1.730 + {
   1.731 +   free( Q );
   1.732 + }
   1.733 +
   1.734 +void* readSRSWQ( SRSWQueueStruc* Q )
   1.735 + { void *out    = 0;
   1.736 +   int  tries   = 0;
   1.737 +
   1.738 +   while( TRUE )
   1.739 +    {
   1.740 +      if( Q->insertPos - Q->extractPos != 1 &&
   1.741 +          !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
   1.742 +       { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
   1.743 +         else Q->extractPos++;           //move before read
   1.744 +         out = *(Q->extractPos);
   1.745 +         return out;
   1.746 +       }
   1.747 +         //Q is empty
   1.748 +      tries++;
   1.749 +      if( tries > SPINLOCK_TRIES ) pthread_yield();
   1.750 +    }
   1.751 + }
   1.752 +
   1.753 +
   1.754 +void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
   1.755 + { void *out    = 0;
   1.756 +   int  tries   = 0;
   1.757 +
   1.758 +   while( TRUE )
   1.759 +    {
   1.760 +      if( Q->insertPos - Q->extractPos != 1 &&
   1.761 +          !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
   1.762 +       { Q->extractPos++;           //move before read
   1.763 +         if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
   1.764 +         out = *(Q->extractPos);
   1.765 +         return out;
   1.766 +       }
   1.767 +         //Q is empty
   1.768 +      tries++;
   1.769 +      if( tries > 10 ) return NULL; //long enough for writer to finish
   1.770 +    }
   1.771 + }
   1.772 +
   1.773 +
   1.774 +void writeSRSWQ( void * in, SRSWQueueStruc* Q )
   1.775 + {
   1.776 +   int  tries   = 0;
   1.777 +   
   1.778 +   while( TRUE )
   1.779 +    {
   1.780 +      if( Q->extractPos - Q->insertPos != 1 &&
   1.781 +          !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
   1.782 +       { *(Q->insertPos) = in;   //insert before move
   1.783 +         if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
   1.784 +         else Q->insertPos++;
   1.785 +         return;
   1.786 +       }
   1.787 +         //Q is full
   1.788 +      tries++;
   1.789 +      if( tries > SPINLOCK_TRIES ) pthread_yield();
   1.790 +    }
   1.791 + }
   1.792 +
   1.793 +
   1.794 +
   1.795 +//===========================================================================
   1.796 +//Single reader Multiple writer super fast Q.. no atomic instrs..
   1.797 +
   1.798 +
   1.799 +/*This is a blocking queue, but it uses no atomic instructions, just does
   1.800 + * yield() when empty or full
   1.801 + *
   1.802 + *It doesn't need any atomic instructions because only a single thread
   1.803 + * extracts and only a single thread inserts, and it has no locations that
   1.804 + * are written by both.  It writes before moving and moves before reading,
   1.805 + * and never lets write position and read position be the same, so dis-
   1.806 + * synchrony can only ever cause an unnecessary call to yield(), never a
   1.807 + * wrong value (by monotonicity of movement of pointers, plus single writer
   1.808 + * to pointers, plus sequence of write before change pointer, plus
   1.809 + * assumptions that if thread A semantically writes X before Y, then thread
   1.810 + * B will see the writes in that order.)
   1.811 + *
   1.812 + *The multi-writer version is implemented as a hierarchy.  Each writer has
   1.813 + * its own single-reader single-writer queue.  The reader simply does a
   1.814 + * round-robin harvesting from them.
   1.815 + *
   1.816 + *A writer must first register itself with the queue, and receives an ID back
   1.817 + * It then uses that ID on each write operation.
   1.818 + *
   1.819 + *The implementation is:
   1.820 + *Physically:
   1.821 + * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
   1.822 + * -] it also has read-pointer to the last queue a write was taken from.
   1.823 + *
   1.824 + *Action-Patterns:
   1.825 + * -] To add a writer
   1.826 + * --]] writer-thread calls addWriterToQ(), remember the ID it returns
   1.827 + * --]] internally addWriterToQ does:
   1.828 + * ---]]] if needs more room, makes a larger writer-array
   1.829 + * ---]]] copies the old writer-array into the new
   1.830 + * ---]]] makes a new SRSW queue an puts it into the array
   1.831 + * ---]]] returns the index to the new SRSW queue as the ID
   1.832 + * -] To write
   1.833 + * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
   1.834 + * --]] this call may block, via repeated yield() calls
   1.835 + * --]] internally, writeSRMWQ does:
   1.836 + * ---]]] uses the writerID as index to get the SRSW queue for that writer
   1.837 + * ---]]] performs writeQ on that queue (may block via repeated yield calls)
   1.838 + * -] To Read
   1.839 + * --]] reader calls readSRMWQ, passing the Q struc
   1.840 + * --]] this call may block, via repeated yield() calls
   1.841 + * --]] internally, readSRMWQ does:
   1.842 + * ---]]] gets saved index of last SRSW queue read from
   1.843 + * ---]]] increments index and gets indexed queue
   1.844 + * ---]]] does a non-blocking read of that queue
   1.845 + * ---]]] if gets something, saves index and returns that value
   1.846 + * ---]]] if gets null, then goes to next queue
   1.847 + * ---]]] if got null from all the queues then does yield() then tries again
   1.848 + *
   1.849 + *Note: "0" is used as the value null, so SRSW queues must only contain
   1.850 + * pointers, and cannot use 0 as a valid pointer value.
   1.851 + *
   1.852 + */
   1.853 +
   1.854 +SRMWQueueStruc* makeSRMWQ()
   1.855 + { SRMWQueueStruc* retQ;
   1.856 +   
   1.857 +   retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) );
   1.858 +   
   1.859 +   retQ->numInternalQs = 0;
   1.860 +   retQ->internalQsSz  = 10;
   1.861 +   retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
   1.862 +   
   1.863 +   retQ->lastQReadFrom = 0;
   1.864 +   
   1.865 +   return retQ;
   1.866 + }
   1.867 +
   1.868 +/* ---]]] if needs more room, makes a larger writer-array
   1.869 + * ---]]] copies the old writer-array into the new
   1.870 + * ---]]] makes a new SRSW queue an puts it into the array
   1.871 + * ---]]] returns the index to the new SRSW queue as the ID
   1.872 + *
   1.873 + *NOTE: assuming all adds are completed before any writes or reads are
   1.874 + * performed..  otherwise, this needs to be re-done carefully, probably with
   1.875 + * a lock.
   1.876 + */
   1.877 +int addWriterToSRMWQ( SRMWQueueStruc* Q )
   1.878 + { int oldSz, i;
   1.879 +   SRSWQueueStruc * *oldArray;
   1.880 +   
   1.881 +   (Q->numInternalQs)++;
   1.882 +   if( Q->numInternalQs >= Q->internalQsSz )
   1.883 +    { //full, so make bigger
   1.884 +      oldSz            = Q->internalQsSz;
   1.885 +      oldArray         = Q->internalQs;
   1.886 +      Q->internalQsSz *= 2;
   1.887 +      Q->internalQs    = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
   1.888 +      for( i = 0; i < oldSz; i++ )
   1.889 +       { Q->internalQs[i] = oldArray[i];
   1.890 +       }
   1.891 +      free( oldArray );
   1.892 +    }
   1.893 +   Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
   1.894 +   return Q->numInternalQs - 1;
   1.895 + }
   1.896 +
   1.897 +
   1.898 +/* ---]]] gets saved index of last SRSW queue read-from
   1.899 + * ---]]] increments index and gets indexed queue
   1.900 + * ---]]] does a non-blocking read of that queue
   1.901 + * ---]]] if gets something, saves index and returns that value
   1.902 + * ---]]] if gets null, then goes to next queue
   1.903 + * ---]]] if got null from all the queues then does yield() then tries again
   1.904 + */
   1.905 +void* readSRMWQ( SRMWQueueStruc* Q )
   1.906 + { SRSWQueueStruc *readQ;
   1.907 +   void *readValue   = 0;
   1.908 +   int   tries       = 0;
   1.909 +   int   QToReadFrom = 0;
   1.910 +   
   1.911 +   QToReadFrom = Q->lastQReadFrom;
   1.912 +   
   1.913 +   while( TRUE )
   1.914 +    { QToReadFrom++;
   1.915 +      if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
   1.916 +      readQ = Q->internalQs[ QToReadFrom ];
   1.917 +      readValue = readSRSWQ_NonBlocking( readQ );
   1.918 +      
   1.919 +      if( readValue != 0 ) //got a value, return it
   1.920 +       { Q->lastQReadFrom = QToReadFrom;
   1.921 +         return readValue;
   1.922 +       }
   1.923 +      else  //SRSW Q just read is empty
   1.924 +       {    //check if all queues have been tried
   1.925 +         if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
   1.926 +          { tries++; //give a writer a chance to finish before yield
   1.927 +            if( tries > SPINLOCK_TRIES ) pthread_yield();
   1.928 +          }
   1.929 +       }
   1.930 +    }
   1.931 + }
   1.932 +
   1.933 +
   1.934 +/* 
   1.935 + * ---]]] uses the writerID as index to get the SRSW queue for that writer
   1.936 + * ---]]] performs writeQ on that queue (may block via repeated yield calls)
   1.937 + */
   1.938 +void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
   1.939 + { 
   1.940 +   if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
   1.941 +   
   1.942 +   writeSRSWQ( in, Q->internalQs[ writerID ] );
   1.943 + }
     2.1 --- a/PrivateQueue.c	Thu Nov 04 17:50:29 2010 -0700
     2.2 +++ b/PrivateQueue.c	Tue Feb 07 12:51:29 2012 -0800
     2.3 @@ -1,141 +1,141 @@
     2.4 -/*
     2.5 - *  Copyright 2009 OpenSourceStewardshipFoundation.org
     2.6 - *  Licensed under GNU General Public License version 2
     2.7 - *
     2.8 - * NOTE: this version of SRSW correct as of April 25, 2010
     2.9 - *
    2.10 - * Author: seanhalle@yahoo.com
    2.11 - */
    2.12 -
    2.13 -
    2.14 -#include <stdio.h>
    2.15 -#include <string.h>
    2.16 -#include <errno.h>
    2.17 -#include <stdlib.h>
    2.18 -
    2.19 -#include "PrivateQueue.h"
    2.20 -
    2.21 -
    2.22 -
    2.23 -//===========================================================================
    2.24 -
    2.25 -/*This kind of queue is private to a single core at a time -- has no
    2.26 - * synchronizations 
    2.27 - */
    2.28 -
    2.29 -PrivQueueStruc* makePrivQ()
    2.30 - {
    2.31 -   PrivQueueStruc* retQ;
    2.32 -   retQ = (PrivQueueStruc *) malloc( sizeof( PrivQueueStruc ) );
    2.33 -
    2.34 -   retQ->startOfData = malloc( 1024 * sizeof(void *) );
    2.35 -   memset( retQ->startOfData, 0, 1024 * sizeof(void *) );
    2.36 -   retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
    2.37 -   retQ->insertPos  = &(retQ->startOfData[1]); // so start pos's have to be
    2.38 -   retQ->endOfData  = &(retQ->startOfData[1023]);
    2.39 -
    2.40 -   return retQ;
    2.41 - }
    2.42 -
    2.43 -
    2.44 -void
    2.45 -enlargePrivQ( PrivQueueStruc *Q )
    2.46 - { int    oldSize, newSize;
    2.47 -   void **oldStartOfData;
    2.48 -
    2.49 -   oldSize           = Q->endOfData - Q->startOfData;
    2.50 -   newSize           = 2 * oldSize;
    2.51 -   oldStartOfData = Q->startOfData;
    2.52 -   Q->startOfData = malloc( newSize * sizeof(void *) );
    2.53 -   memcpy(Q->startOfData, oldStartOfData, oldSize * sizeof(void *));
    2.54 -   free(oldStartOfData);
    2.55 -   
    2.56 -   Q->extractPos  = &(Q->startOfData[0]); //side by side == empty
    2.57 -   Q->insertPos   = &(Q->startOfData[1]); // so start pos's have to be
    2.58 -   Q->endOfData   = &(Q->startOfData[newSize - 1]);
    2.59 - }
    2.60 -
    2.61 -
    2.62 -/*Returns NULL when queue is empty
    2.63 - */
    2.64 -void* readPrivQ( PrivQueueStruc* Q )
    2.65 - { void *out    = 0;
    2.66 -   void **startOfData = Q->startOfData;
    2.67 -   void **endOfData   = Q->endOfData;
    2.68 -
    2.69 -   void **insertPos  = Q->insertPos;
    2.70 -   void **extractPos = Q->extractPos;
    2.71 -
    2.72 -      //if not empty -- (extract is just below insert when empty)
    2.73 -   if( insertPos - extractPos != 1 &&
    2.74 -       !(extractPos == endOfData && insertPos == startOfData))
    2.75 -    {    //move before read
    2.76 -      if( extractPos == endOfData ) //write new pos exactly once, correctly
    2.77 -       { Q->extractPos = startOfData; //can't overrun then fix it 'cause
    2.78 -       }                              // other thread might read bad pos
    2.79 -      else
    2.80 -       { Q->extractPos++;
    2.81 -       }
    2.82 -      out = *(Q->extractPos);
    2.83 -      return out;
    2.84 -    }
    2.85 -      //Q is empty
    2.86 -   return NULL;
    2.87 - }
    2.88 -
    2.89 -
    2.90 -/*Expands the queue size automatically when it's full
    2.91 - */
    2.92 -void
    2.93 -writePrivQ( void * in, PrivQueueStruc* Q )
    2.94 - {
    2.95 -   void **startOfData = Q->startOfData;
    2.96 -   void **endOfData   = Q->endOfData;
    2.97 -   
    2.98 -   void **insertPos  = Q->insertPos;
    2.99 -   void **extractPos = Q->extractPos;
   2.100 -
   2.101 -tryAgain:
   2.102 -      //Full? (insert is just below extract when full)
   2.103 -   if( extractPos - insertPos != 1 &&
   2.104 -       !(insertPos == endOfData && extractPos == startOfData))
   2.105 -    { *(Q->insertPos) = in;   //insert before move
   2.106 -      if( insertPos == endOfData ) //write new pos exactly once, correctly
   2.107 -       { Q->insertPos = startOfData;
   2.108 -       }
   2.109 -      else
   2.110 -       { Q->insertPos++;
   2.111 -       }
   2.112 -      return;
   2.113 -    }
   2.114 -      //Q is full
   2.115 -   enlargePrivQ( Q );
   2.116 -   goto tryAgain;
   2.117 - }
   2.118 -
   2.119 -
   2.120 -/*Returns false when the queue was full.
   2.121 - * have option of calling make_larger_PrivQ to make more room, then try again
   2.122 - */
   2.123 -int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q )
   2.124 - {
   2.125 -   void **startOfData = Q->startOfData;
   2.126 -   void **endOfData   = Q->endOfData;
   2.127 -
   2.128 -   void **insertPos  = Q->insertPos;
   2.129 -   void **extractPos = Q->extractPos;
   2.130 -
   2.131 -   if( extractPos - insertPos != 1 &&
   2.132 -       !(insertPos == endOfData && extractPos == startOfData))
   2.133 -    { *(Q->insertPos) = in;   //insert before move
   2.134 -      if( insertPos == endOfData ) //write new pos exactly once, correctly
   2.135 -       { Q->insertPos = startOfData;
   2.136 -       }
   2.137 -      else
   2.138 -       { Q->insertPos++;
   2.139 -       }
   2.140 -      return TRUE;
   2.141 -    }
   2.142 -      //Q is full
   2.143 -   return FALSE;
   2.144 - }
   2.145 +/*
   2.146 + *  Copyright 2009 OpenSourceStewardshipFoundation.org
   2.147 + *  Licensed under GNU General Public License version 2
   2.148 + *
   2.149 + * NOTE: this version of SRSW correct as of April 25, 2010
   2.150 + *
   2.151 + * Author: seanhalle@yahoo.com
   2.152 + */
   2.153 +
   2.154 +
   2.155 +#include <stdio.h>
   2.156 +#include <string.h>
   2.157 +#include <errno.h>
   2.158 +#include <stdlib.h>
   2.159 +
   2.160 +#include "PrivateQueue.h"
   2.161 +
   2.162 +
   2.163 +
   2.164 +//===========================================================================
   2.165 +
   2.166 +/*This kind of queue is private to a single core at a time -- has no
   2.167 + * synchronizations 
   2.168 + */
   2.169 +
   2.170 +PrivQueueStruc* makePrivQ()
   2.171 + {
   2.172 +   PrivQueueStruc* retQ;
   2.173 +   retQ = (PrivQueueStruc *) malloc( sizeof( PrivQueueStruc ) );
   2.174 +
   2.175 +   retQ->startOfData = malloc( 1024 * sizeof(void *) );
   2.176 +   memset( retQ->startOfData, 0, 1024 * sizeof(void *) );
   2.177 +   retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
   2.178 +   retQ->insertPos  = &(retQ->startOfData[1]); // so start pos's have to be
   2.179 +   retQ->endOfData  = &(retQ->startOfData[1023]);
   2.180 +
   2.181 +   return retQ;
   2.182 + }
   2.183 +
   2.184 +
   2.185 +void
   2.186 +enlargePrivQ( PrivQueueStruc *Q )
   2.187 + { int    oldSize, newSize;
   2.188 +   void **oldStartOfData;
   2.189 +
   2.190 +   oldSize           = Q->endOfData - Q->startOfData;
   2.191 +   newSize           = 2 * oldSize;
   2.192 +   oldStartOfData = Q->startOfData;
   2.193 +   Q->startOfData = malloc( newSize * sizeof(void *) );
   2.194 +   memcpy(Q->startOfData, oldStartOfData, oldSize * sizeof(void *));
   2.195 +   free(oldStartOfData);
   2.196 +   
   2.197 +   Q->extractPos  = &(Q->startOfData[0]); //side by side == empty
   2.198 +   Q->insertPos   = &(Q->startOfData[1]); // so start pos's have to be
   2.199 +   Q->endOfData   = &(Q->startOfData[newSize - 1]);
   2.200 + }
   2.201 +
   2.202 +
   2.203 +/*Returns NULL when queue is empty
   2.204 + */
   2.205 +void* readPrivQ( PrivQueueStruc* Q )
   2.206 + { void *out    = 0;
   2.207 +   void **startOfData = Q->startOfData;
   2.208 +   void **endOfData   = Q->endOfData;
   2.209 +
   2.210 +   void **insertPos  = Q->insertPos;
   2.211 +   void **extractPos = Q->extractPos;
   2.212 +
   2.213 +      //if not empty -- (extract is just below insert when empty)
   2.214 +   if( insertPos - extractPos != 1 &&
   2.215 +       !(extractPos == endOfData && insertPos == startOfData))
   2.216 +    {    //move before read
   2.217 +      if( extractPos == endOfData ) //write new pos exactly once, correctly
   2.218 +       { Q->extractPos = startOfData; //can't overrun then fix it 'cause
   2.219 +       }                              // other thread might read bad pos
   2.220 +      else
   2.221 +       { Q->extractPos++;
   2.222 +       }
   2.223 +      out = *(Q->extractPos);
   2.224 +      return out;
   2.225 +    }
   2.226 +      //Q is empty
   2.227 +   return NULL;
   2.228 + }
   2.229 +
   2.230 +
   2.231 +/*Expands the queue size automatically when it's full
   2.232 + */
   2.233 +void
   2.234 +writePrivQ( void * in, PrivQueueStruc* Q )
   2.235 + {
   2.236 +   void **startOfData = Q->startOfData;
   2.237 +   void **endOfData   = Q->endOfData;
   2.238 +   
   2.239 +   void **insertPos  = Q->insertPos;
   2.240 +   void **extractPos = Q->extractPos;
   2.241 +
   2.242 +tryAgain:
   2.243 +      //Full? (insert is just below extract when full)
   2.244 +   if( extractPos - insertPos != 1 &&
   2.245 +       !(insertPos == endOfData && extractPos == startOfData))
   2.246 +    { *(Q->insertPos) = in;   //insert before move
   2.247 +      if( insertPos == endOfData ) //write new pos exactly once, correctly
   2.248 +       { Q->insertPos = startOfData;
   2.249 +       }
   2.250 +      else
   2.251 +       { Q->insertPos++;
   2.252 +       }
   2.253 +      return;
   2.254 +    }
   2.255 +      //Q is full
   2.256 +   enlargePrivQ( Q );
   2.257 +   goto tryAgain;
   2.258 + }
   2.259 +
   2.260 +
   2.261 +/*Returns false when the queue was full.
   2.262 + * have option of calling make_larger_PrivQ to make more room, then try again
   2.263 + */
   2.264 +int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q )
   2.265 + {
   2.266 +   void **startOfData = Q->startOfData;
   2.267 +   void **endOfData   = Q->endOfData;
   2.268 +
   2.269 +   void **insertPos  = Q->insertPos;
   2.270 +   void **extractPos = Q->extractPos;
   2.271 +
   2.272 +   if( extractPos - insertPos != 1 &&
   2.273 +       !(insertPos == endOfData && extractPos == startOfData))
   2.274 +    { *(Q->insertPos) = in;   //insert before move
   2.275 +      if( insertPos == endOfData ) //write new pos exactly once, correctly
   2.276 +       { Q->insertPos = startOfData;
   2.277 +       }
   2.278 +      else
   2.279 +       { Q->insertPos++;
   2.280 +       }
   2.281 +      return TRUE;
   2.282 +    }
   2.283 +      //Q is full
   2.284 +   return FALSE;
   2.285 + }
     3.1 --- a/PrivateQueue.h	Thu Nov 04 17:50:29 2010 -0700
     3.2 +++ b/PrivateQueue.h	Tue Feb 07 12:51:29 2012 -0800
     3.3 @@ -1,37 +1,37 @@
     3.4 -/*
     3.5 - *  Copyright 2009 OpenSourceStewardshipFoundation.org
     3.6 - *  Licensed under GNU General Public License version 2
     3.7 - *
     3.8 - * Author: seanhalle@yahoo.com
     3.9 - */
    3.10 -
    3.11 -#ifndef _PRIVATE_QUEUE_H
    3.12 -#define	_PRIVATE_QUEUE_H
    3.13 -
    3.14 -#include <pthread.h>
    3.15 -
    3.16 -#define TRUE     1
    3.17 -#define FALSE    0
    3.18 -
    3.19 -#define LOCKED   1
    3.20 -#define UNLOCKED 0
    3.21 -
    3.22 -
    3.23 -/* It is the data that is shared so only need one mutex. */
    3.24 -typedef struct
    3.25 - { void      **insertPos;
    3.26 -   void      **extractPos;
    3.27 -   void      **startOfData;  //data is pointers
    3.28 -   void      **endOfData;    //set when alloc data
    3.29 - }
    3.30 -PrivQueueStruc;
    3.31 -
    3.32 -
    3.33 -PrivQueueStruc*  makePrivQ ( );
    3.34 -void*            readPrivQ ( PrivQueueStruc *Q );
    3.35 -void             writePrivQ( void *in, PrivQueueStruc *Q );
    3.36 -int              writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ); //return
    3.37 -                    // false when full
    3.38 -
    3.39 -#endif	/* _PRIVATE_QUEUE_H */
    3.40 -
    3.41 +/*
    3.42 + *  Copyright 2009 OpenSourceStewardshipFoundation.org
    3.43 + *  Licensed under GNU General Public License version 2
    3.44 + *
    3.45 + * Author: seanhalle@yahoo.com
    3.46 + */
    3.47 +
    3.48 +#ifndef _PRIVATE_QUEUE_H
    3.49 +#define	_PRIVATE_QUEUE_H
    3.50 +
    3.51 +#include <pthread.h>
    3.52 +
    3.53 +#define TRUE     1
    3.54 +#define FALSE    0
    3.55 +
    3.56 +#define LOCKED   1
    3.57 +#define UNLOCKED 0
    3.58 +
    3.59 +
    3.60 +/* It is the data that is shared so only need one mutex. */
    3.61 +typedef struct
    3.62 + { void      **insertPos;
    3.63 +   void      **extractPos;
    3.64 +   void      **startOfData;  //data is pointers
    3.65 +   void      **endOfData;    //set when alloc data
    3.66 + }
    3.67 +PrivQueueStruc;
    3.68 +
    3.69 +
    3.70 +PrivQueueStruc*  makePrivQ ( );
    3.71 +void*            readPrivQ ( PrivQueueStruc *Q );
    3.72 +void             writePrivQ( void *in, PrivQueueStruc *Q );
    3.73 +int              writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ); //return
    3.74 +                    // false when full
    3.75 +
    3.76 +#endif	/* _PRIVATE_QUEUE_H */
    3.77 +