changeset 1:81f6687d52d1

Correct SRSW queue and correst CAS queue
author Me
date Fri, 18 Jun 2010 17:49:38 -0700
parents 85af604dee9b
children f4d50d8a1a38
files BlockingQueue.c
diffstat 1 files changed, 211 insertions(+), 52 deletions(-) [+]
line diff
     1.1 --- a/BlockingQueue.c	Sat May 22 19:51:09 2010 -0700
     1.2 +++ b/BlockingQueue.c	Fri Jun 18 17:49:38 2010 -0700
     1.3 @@ -1,9 +1,7 @@
     1.4  /*
     1.5 - *  Copyright 2009 OpenSourceCodeStewardshipFoundation.org
     1.6 + *  Copyright 2009 OpenSourceStewardshipFoundation.org
     1.7   *  Licensed under GNU General Public License version 2
     1.8   *
     1.9 - * NOTE: this version of SRSW correct as of April 25, 2010
    1.10 - *
    1.11   * Author: seanhalle@yahoo.com
    1.12   */
    1.13  
    1.14 @@ -12,11 +10,14 @@
    1.15  #include <errno.h>
    1.16  #include <pthread.h>
    1.17  #include <stdlib.h>
    1.18 +#include <sched.h>
    1.19 +#include <windows.h>
    1.20  
    1.21  #include "BlockingQueue.h"
    1.22  
    1.23  #define INC(x) (++x == 1024) ? (x) = 0 : (x)
    1.24  
    1.25 +#define SPINLOCK_TRIES 100000
    1.26  
    1.27  //===========================================================================
    1.28  //Normal pthread Q
    1.29 @@ -67,6 +68,7 @@
    1.30      {
    1.31        while( Q -> count == 0 )
    1.32         { Q -> w_empty = 1;
    1.33 +         // pthread_cond_broadcast( &Q->cond_w_t );
    1.34           status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
    1.35           if (status != 0)
    1.36            { perror("Thread wait error: ");
    1.37 @@ -79,6 +81,7 @@
    1.38        INC( Q->readPos );
    1.39        wt = Q -> w_full;
    1.40        Q -> w_full = 0;
    1.41 +      //pthread_cond_broadcast( &Q->cond_w_t );
    1.42      }
    1.43     pthread_mutex_unlock( &Q->mutex_t );
    1.44     if (wt)  pthread_cond_signal( &Q->cond_w_t );
    1.45 @@ -142,11 +145,11 @@
    1.46  
    1.47  
    1.48  void* readCASQ( CASQueueStruc* Q )
    1.49 - { void *out    = 0;
    1.50 -   int  tries   = 0;
    1.51 -   int  startOfData = Q->startOfData;
    1.52 -   int  endOfData   = Q->endOfData;
    1.53 -   
    1.54 + { void  *out    = 0;
    1.55 +   int    tries  = 0;
    1.56 +   void **startOfData = Q->startOfData;
    1.57 +   void **endOfData   = Q->endOfData;
    1.58 +
    1.59     int  success = FALSE;
    1.60  
    1.61     while( !success )
    1.62 @@ -154,8 +157,8 @@
    1.63           __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
    1.64        if( success )
    1.65         {
    1.66 -         volatile int insertPos  = Q->insertPos;
    1.67 -         volatile int extractPos = Q->extractPos;
    1.68 +         void **insertPos  = Q->insertPos;
    1.69 +         void **extractPos = Q->extractPos;
    1.70  
    1.71              //if not empty -- extract just below insert when empty
    1.72           if( insertPos - extractPos != 1 &&
    1.73 @@ -178,16 +181,18 @@
    1.74         }
    1.75           //Q is busy or empty
    1.76        tries++;
    1.77 -      if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock
    1.78 +      if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI  yield()
    1.79      }
    1.80   }
    1.81  
    1.82  void writeCASQ( void * in, CASQueueStruc* Q )
    1.83   {
    1.84     int  tries   = 0;
    1.85 -   int  startOfData = Q->startOfData;
    1.86 -   int  endOfData   = Q->endOfData;
    1.87 -   
    1.88 +      //TODO: need to make Q volatile?  Want to do this Q in assembly!
    1.89 +      //Have no idea what GCC's going to do to this code
    1.90 +   void **startOfData = Q->startOfData;
    1.91 +   void **endOfData   = Q->endOfData;
    1.92 +
    1.93     int  success = FALSE;
    1.94  
    1.95     while( !success )
    1.96 @@ -195,14 +200,14 @@
    1.97           __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
    1.98        if( success )
    1.99         {
   1.100 -         volatile int insertPos  = Q->insertPos;
   1.101 -         volatile int extractPos = Q->extractPos;
   1.102 +         void **insertPos  = Q->insertPos;
   1.103 +         void **extractPos = Q->extractPos;
   1.104  
   1.105              //check if room to insert.. can't use a count variable
   1.106              // 'cause both insertor Thd and extractor Thd would write it
   1.107           if( extractPos - insertPos != 1 &&
   1.108               !(insertPos == endOfData && extractPos == startOfData))
   1.109 -          { *(insertPos) = in;   //insert before move
   1.110 +          { *(Q->insertPos) = in;   //insert before move
   1.111              if( insertPos == endOfData ) //write new pos exactly once, correctly
   1.112               { Q->insertPos = startOfData;
   1.113               }
   1.114 @@ -218,18 +223,19 @@
   1.115            }
   1.116         }
   1.117        tries++;
   1.118 -      if( tries > 10000 ) pthread_yield();//yield not guaranteed
   1.119 +      if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
   1.120      }
   1.121   }
   1.122  
   1.123  #endif  //_GNU_SOURCE
   1.124  
   1.125 +
   1.126  //===========================================================================
   1.127  //Single reader single writer super fast Q.. no atomic instrs..
   1.128  
   1.129  
   1.130  /*This is a blocking queue, but it uses no atomic instructions, just does
   1.131 - * busy-waiting when empty or full (but yield() if waits too long)
   1.132 + * yield() when empty or full
   1.133   *
   1.134   *It doesn't need any atomic instructions because only a single thread
   1.135   * extracts and only a single thread inserts, and it has no locations that
   1.136 @@ -258,57 +264,210 @@
   1.137  void* readSRSWQ( SRSWQueueStruc* Q )
   1.138   { void *out    = 0;
   1.139     int  tries   = 0;
   1.140 -   int  startOfData = Q->startOfData;
   1.141 -   int  endOfData   = Q->endOfData;
   1.142  
   1.143     while( TRUE )
   1.144 -    {    //not certain the volatile reads need to be done, but safe..
   1.145 -      volatile int insertPos  = Q->insertPos;
   1.146 -      volatile int extractPos = Q->extractPos;
   1.147 -
   1.148 -         //if not empty -- extract just below insert when empty
   1.149 -      if( insertPos - extractPos != 1 &&
   1.150 -          !(extractPos == endOfData && insertPos == startOfData))
   1.151 -       {    //move before read
   1.152 -         if( extractPos == endOfData ) //write new pos exactly once, correctly
   1.153 -          { Q->extractPos = startOfData; //can't overrun then fix it 'cause
   1.154 -          }                              // other thread might read bad pos
   1.155 -         else
   1.156 -          { Q->extractPos++;
   1.157 -          }
   1.158 +    {
   1.159 +      if( Q->insertPos - Q->extractPos != 1 &&
   1.160 +          !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
   1.161 +       { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
   1.162 +         else Q->extractPos++;           //move before read
   1.163           out = *(Q->extractPos);
   1.164           return out;
   1.165         }
   1.166           //Q is empty
   1.167        tries++;
   1.168 -      if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock
   1.169 +      if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
   1.170      }
   1.171   }
   1.172  
   1.173 +
   1.174 +void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
   1.175 + { void *out    = 0;
   1.176 +   int  tries   = 0;
   1.177 +
   1.178 +   while( TRUE )
   1.179 +    {
   1.180 +      if( Q->insertPos - Q->extractPos != 1 &&
   1.181 +          !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
   1.182 +       { Q->extractPos++;           //move before read
   1.183 +         if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
   1.184 +         out = *(Q->extractPos);
   1.185 +         return out;
   1.186 +       }
   1.187 +         //Q is empty
   1.188 +      tries++;
   1.189 +      if( tries > 2 ) return 0; //long enough for writer to finish
   1.190 +    }
   1.191 + }
   1.192 +
   1.193 +
   1.194  void writeSRSWQ( void * in, SRSWQueueStruc* Q )
   1.195   {
   1.196     int  tries   = 0;
   1.197 -   int  startOfData = Q->startOfData;
   1.198 -   int  endOfData   = Q->endOfData;
   1.199     
   1.200     while( TRUE )
   1.201 -    {    //not certain the volatile reads need to be done, but safe..
   1.202 -      volatile int insertPos  = Q->insertPos;
   1.203 -      volatile int extractPos = Q->extractPos;
   1.204 -      
   1.205 -      if( extractPos - insertPos != 1 &&
   1.206 -          !(insertPos == endOfData && extractPos == startOfData))
   1.207 -       { *(insertPos) = in;   //insert before move
   1.208 -         if( insertPos == endOfData ) //write new pos exactly once, correctly
   1.209 -          { Q->insertPos = startOfData;
   1.210 -          }
   1.211 -         else
   1.212 -          { Q->insertPos++;
   1.213 -          }
   1.214 +    {
   1.215 +      if( Q->extractPos - Q->insertPos != 1 &&
   1.216 +          !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
   1.217 +       { *(Q->insertPos) = in;   //insert before move
   1.218 +         if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
   1.219 +         else Q->insertPos++;
   1.220           return;
   1.221         }
   1.222           //Q is full
   1.223        tries++;
   1.224 -      if( tries > 10000 ) pthread_yield();//yield not guaranteed
   1.225 +      if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
   1.226      }
   1.227   }
   1.228 +
   1.229 +
   1.230 +
   1.231 +//===========================================================================
   1.232 +//Single reader Multiple writer super fast Q.. no atomic instrs..
   1.233 +
   1.234 +
   1.235 +/*This is a blocking queue, but it uses no atomic instructions, just does
   1.236 + * yield() when empty or full
   1.237 + *
   1.238 + *It doesn't need any atomic instructions because only a single thread
   1.239 + * extracts and only a single thread inserts, and it has no locations that
   1.240 + * are written by both.  It writes before moving and moves before reading,
   1.241 + * and never lets write position and read position be the same, so dis-
   1.242 + * synchrony can only ever cause an unnecessary call to yield(), never a
   1.243 + * wrong value (by monotonicity of movement of pointers, plus single writer
   1.244 + * to pointers, plus sequence of write before change pointer, plus
   1.245 + * assumptions that if thread A semantically writes X before Y, then thread
   1.246 + * B will see the writes in that order.)
   1.247 + *
   1.248 + *The multi-writer version is implemented as a hierarchy.  Each writer has
   1.249 + * its own single-reader single-writer queue.  The reader simply does a
   1.250 + * round-robin harvesting from them.
   1.251 + *
   1.252 + *A writer must first register itself with the queue, and receives an ID back
   1.253 + * It then uses that ID on each write operation.
   1.254 + *
   1.255 + *The implementation is:
   1.256 + *Physically:
   1.257 + * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
   1.258 + * -] it also has read-pointer to the last queue a write was taken from.
   1.259 + *
   1.260 + *Action-Patterns:
   1.261 + * -] To add a writer
   1.262 + * --]] writer-thread calls addWriterToQ(), remember the ID it returns
   1.263 + * --]] internally addWriterToQ does:
   1.264 + * ---]]] if needs more room, makes a larger writer-array
   1.265 + * ---]]] copies the old writer-array into the new
   1.266 + * ---]]] makes a new SRSW queue an puts it into the array
   1.267 + * ---]]] returns the index to the new SRSW queue as the ID
   1.268 + * -] To write
   1.269 + * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
   1.270 + * --]] this call may block, via repeated yield() calls
   1.271 + * --]] internally, writeSRMWQ does:
   1.272 + * ---]]] uses the writerID as index to get the SRSW queue for that writer
   1.273 + * ---]]] performs writeQ on that queue (may block via repeated yield calls)
   1.274 + * -] To Read
   1.275 + * --]] reader calls readSRMWQ, passing the Q struc
   1.276 + * --]] this call may block, via repeated yield() calls
   1.277 + * --]] internally, readSRMWQ does:
   1.278 + * ---]]] gets saved index of last SRSW queue read from
   1.279 + * ---]]] increments index and gets indexed queue
   1.280 + * ---]]] does a non-blocking read of that queue
   1.281 + * ---]]] if gets something, saves index and returns that value
   1.282 + * ---]]] if gets null, then goes to next queue
   1.283 + * ---]]] if got null from all the queues then does yield() then tries again
   1.284 + *
   1.285 + *Note: "0" is used as the value null, so SRSW queues must only contain
   1.286 + * pointers, and cannot use 0 as a valid pointer value.
   1.287 + *
   1.288 + */
   1.289 +
   1.290 +SRMWQueueStruc* makeSRMWQ()
   1.291 + { SRMWQueueStruc* retQ;
   1.292 +   
   1.293 +   retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) );
   1.294 +   
   1.295 +   retQ->numInternalQs = 0;
   1.296 +   retQ->internalQsSz  = 10;
   1.297 +   retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
   1.298 +   
   1.299 +   retQ->lastQReadFrom = 0;
   1.300 +   
   1.301 +   return retQ;
   1.302 + }
   1.303 +
   1.304 +/* ---]]] if needs more room, makes a larger writer-array
   1.305 + * ---]]] copies the old writer-array into the new
   1.306 + * ---]]] makes a new SRSW queue an puts it into the array
   1.307 + * ---]]] returns the index to the new SRSW queue as the ID
   1.308 + *
   1.309 + *NOTE: assuming all adds are completed before any writes or reads are
   1.310 + * performed..  otherwise, this needs to be re-done carefully, probably with
   1.311 + * a lock.
   1.312 + */
   1.313 +int addWriterToSRMWQ( SRMWQueueStruc* Q )
   1.314 + { int oldSz, i;
   1.315 +   SRSWQueueStruc * *oldArray;
   1.316 +   
   1.317 +   (Q->numInternalQs)++;
   1.318 +   if( Q->numInternalQs >= Q->internalQsSz )
   1.319 +    { //full, so make bigger
   1.320 +      oldSz            = Q->internalQsSz;
   1.321 +      oldArray         = Q->internalQs;
   1.322 +      Q->internalQsSz *= 2;
   1.323 +      Q->internalQs    = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
   1.324 +      for( i = 0; i < oldSz; i++ )
   1.325 +       { Q->internalQs[i] = oldArray[i];
   1.326 +       }
   1.327 +      free( oldArray );
   1.328 +    }
   1.329 +   Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
   1.330 +   return Q->numInternalQs - 1;
   1.331 + }
   1.332 +
   1.333 +
   1.334 +/* ---]]] gets saved index of last SRSW queue read-from
   1.335 + * ---]]] increments index and gets indexed queue
   1.336 + * ---]]] does a non-blocking read of that queue
   1.337 + * ---]]] if gets something, saves index and returns that value
   1.338 + * ---]]] if gets null, then goes to next queue
   1.339 + * ---]]] if got null from all the queues then does yield() then tries again
   1.340 + */
   1.341 +void* readSRMWQ( SRMWQueueStruc* Q )
   1.342 + { SRSWQueueStruc *readQ;
   1.343 +   void *readValue   = 0;
   1.344 +   int   tries       = 0;
   1.345 +   int   QToReadFrom = 0;
   1.346 +   
   1.347 +   QToReadFrom = Q->lastQReadFrom;
   1.348 +   
   1.349 +   while( TRUE )
   1.350 +    { QToReadFrom++;
   1.351 +      if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
   1.352 +      readQ = Q->internalQs[ QToReadFrom ];
   1.353 +      readValue = readSRSWQ_NonBlocking( readQ );
   1.354 +      
   1.355 +      if( readValue != 0 ) //got a value, return it
   1.356 +       { Q->lastQReadFrom = QToReadFrom;
   1.357 +         return readValue;
   1.358 +       }
   1.359 +      else  //SRSW Q just read is empty
   1.360 +       {    //check if all queues have been tried
   1.361 +         if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
   1.362 +          { tries++; //give a writer a chance to finish before yield
   1.363 +            if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
   1.364 +          }
   1.365 +       }
   1.366 +    }
   1.367 + }
   1.368 +
   1.369 +
   1.370 +/* 
   1.371 + * ---]]] uses the writerID as index to get the SRSW queue for that writer
   1.372 + * ---]]] performs writeQ on that queue (may block via repeated yield calls)
   1.373 + */
   1.374 +void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
   1.375 + { 
   1.376 +   if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
   1.377 +   
   1.378 +   writeSRSWQ( in, Q->internalQs[ writerID ] );
   1.379 + }