Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
changeset 1:81f6687d52d1
Correct SRSW queue and correct CAS queue
| author | Me |
|---|---|
| date | Fri, 18 Jun 2010 17:49:38 -0700 |
| parents | 85af604dee9b |
| children | f4d50d8a1a38 |
| files | BlockingQueue.c |
| diffstat | 1 files changed, 211 insertions(+), 52 deletions(-) [+] |
line diff
1.1 --- a/BlockingQueue.c Sat May 22 19:51:09 2010 -0700 1.2 +++ b/BlockingQueue.c Fri Jun 18 17:49:38 2010 -0700 1.3 @@ -1,9 +1,7 @@ 1.4 /* 1.5 - * Copyright 2009 OpenSourceCodeStewardshipFoundation.org 1.6 + * Copyright 2009 OpenSourceStewardshipFoundation.org 1.7 * Licensed under GNU General Public License version 2 1.8 * 1.9 - * NOTE: this version of SRSW correct as of April 25, 2010 1.10 - * 1.11 * Author: seanhalle@yahoo.com 1.12 */ 1.13 1.14 @@ -12,11 +10,14 @@ 1.15 #include <errno.h> 1.16 #include <pthread.h> 1.17 #include <stdlib.h> 1.18 +#include <sched.h> 1.19 +#include <windows.h> 1.20 1.21 #include "BlockingQueue.h" 1.22 1.23 #define INC(x) (++x == 1024) ? (x) = 0 : (x) 1.24 1.25 +#define SPINLOCK_TRIES 100000 1.26 1.27 //=========================================================================== 1.28 //Normal pthread Q 1.29 @@ -67,6 +68,7 @@ 1.30 { 1.31 while( Q -> count == 0 ) 1.32 { Q -> w_empty = 1; 1.33 + // pthread_cond_broadcast( &Q->cond_w_t ); 1.34 status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); 1.35 if (status != 0) 1.36 { perror("Thread wait error: "); 1.37 @@ -79,6 +81,7 @@ 1.38 INC( Q->readPos ); 1.39 wt = Q -> w_full; 1.40 Q -> w_full = 0; 1.41 + //pthread_cond_broadcast( &Q->cond_w_t ); 1.42 } 1.43 pthread_mutex_unlock( &Q->mutex_t ); 1.44 if (wt) pthread_cond_signal( &Q->cond_w_t ); 1.45 @@ -142,11 +145,11 @@ 1.46 1.47 1.48 void* readCASQ( CASQueueStruc* Q ) 1.49 - { void *out = 0; 1.50 - int tries = 0; 1.51 - int startOfData = Q->startOfData; 1.52 - int endOfData = Q->endOfData; 1.53 - 1.54 + { void *out = 0; 1.55 + int tries = 0; 1.56 + void **startOfData = Q->startOfData; 1.57 + void **endOfData = Q->endOfData; 1.58 + 1.59 int success = FALSE; 1.60 1.61 while( !success ) 1.62 @@ -154,8 +157,8 @@ 1.63 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); 1.64 if( success ) 1.65 { 1.66 - volatile int insertPos = Q->insertPos; 1.67 - volatile int extractPos = Q->extractPos; 1.68 + void **insertPos = Q->insertPos; 
1.69 + void **extractPos = Q->extractPos; 1.70 1.71 //if not empty -- extract just below insert when empty 1.72 if( insertPos - extractPos != 1 && 1.73 @@ -178,16 +181,18 @@ 1.74 } 1.75 //Q is busy or empty 1.76 tries++; 1.77 - if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock 1.78 + if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI yield() 1.79 } 1.80 } 1.81 1.82 void writeCASQ( void * in, CASQueueStruc* Q ) 1.83 { 1.84 int tries = 0; 1.85 - int startOfData = Q->startOfData; 1.86 - int endOfData = Q->endOfData; 1.87 - 1.88 + //TODO: need to make Q volatile? Want to do this Q in assembly! 1.89 + //Have no idea what GCC's going to do to this code 1.90 + void **startOfData = Q->startOfData; 1.91 + void **endOfData = Q->endOfData; 1.92 + 1.93 int success = FALSE; 1.94 1.95 while( !success ) 1.96 @@ -195,14 +200,14 @@ 1.97 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); 1.98 if( success ) 1.99 { 1.100 - volatile int insertPos = Q->insertPos; 1.101 - volatile int extractPos = Q->extractPos; 1.102 + void **insertPos = Q->insertPos; 1.103 + void **extractPos = Q->extractPos; 1.104 1.105 //check if room to insert.. can't use a count variable 1.106 // 'cause both insertor Thd and extractor Thd would write it 1.107 if( extractPos - insertPos != 1 && 1.108 !(insertPos == endOfData && extractPos == startOfData)) 1.109 - { *(insertPos) = in; //insert before move 1.110 + { *(Q->insertPos) = in; //insert before move 1.111 if( insertPos == endOfData ) //write new pos exactly once, correctly 1.112 { Q->insertPos = startOfData; 1.113 } 1.114 @@ -218,18 +223,19 @@ 1.115 } 1.116 } 1.117 tries++; 1.118 - if( tries > 10000 ) pthread_yield();//yield not guaranteed 1.119 + if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 1.120 } 1.121 } 1.122 1.123 #endif //_GNU_SOURCE 1.124 1.125 + 1.126 //=========================================================================== 1.127 //Single reader single writer super fast Q.. 
no atomic instrs.. 1.128 1.129 1.130 /*This is a blocking queue, but it uses no atomic instructions, just does 1.131 - * busy-waiting when empty or full (but yield() if waits too long) 1.132 + * yield() when empty or full 1.133 * 1.134 *It doesn't need any atomic instructions because only a single thread 1.135 * extracts and only a single thread inserts, and it has no locations that 1.136 @@ -258,57 +264,210 @@ 1.137 void* readSRSWQ( SRSWQueueStruc* Q ) 1.138 { void *out = 0; 1.139 int tries = 0; 1.140 - int startOfData = Q->startOfData; 1.141 - int endOfData = Q->endOfData; 1.142 1.143 while( TRUE ) 1.144 - { //not certain the volatile reads need to be done, but safe.. 1.145 - volatile int insertPos = Q->insertPos; 1.146 - volatile int extractPos = Q->extractPos; 1.147 - 1.148 - //if not empty -- extract just below insert when empty 1.149 - if( insertPos - extractPos != 1 && 1.150 - !(extractPos == endOfData && insertPos == startOfData)) 1.151 - { //move before read 1.152 - if( extractPos == endOfData ) //write new pos exactly once, correctly 1.153 - { Q->extractPos = startOfData; //can't overrun then fix it 'cause 1.154 - } // other thread might read bad pos 1.155 - else 1.156 - { Q->extractPos++; 1.157 - } 1.158 + { 1.159 + if( Q->insertPos - Q->extractPos != 1 && 1.160 + !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) 1.161 + { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData; 1.162 + else Q->extractPos++; //move before read 1.163 out = *(Q->extractPos); 1.164 return out; 1.165 } 1.166 //Q is empty 1.167 tries++; 1.168 - if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock 1.169 + if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 1.170 } 1.171 } 1.172 1.173 + 1.174 +void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) 1.175 + { void *out = 0; 1.176 + int tries = 0; 1.177 + 1.178 + while( TRUE ) 1.179 + { 1.180 + if( Q->insertPos - Q->extractPos != 1 && 1.181 + !(Q->extractPos == Q->endOfData && 
Q->insertPos == Q->startOfData)) 1.182 + { Q->extractPos++; //move before read 1.183 + if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData; 1.184 + out = *(Q->extractPos); 1.185 + return out; 1.186 + } 1.187 + //Q is empty 1.188 + tries++; 1.189 + if( tries > 2 ) return 0; //long enough for writer to finish 1.190 + } 1.191 + } 1.192 + 1.193 + 1.194 void writeSRSWQ( void * in, SRSWQueueStruc* Q ) 1.195 { 1.196 int tries = 0; 1.197 - int startOfData = Q->startOfData; 1.198 - int endOfData = Q->endOfData; 1.199 1.200 while( TRUE ) 1.201 - { //not certain the volatile reads need to be done, but safe.. 1.202 - volatile int insertPos = Q->insertPos; 1.203 - volatile int extractPos = Q->extractPos; 1.204 - 1.205 - if( extractPos - insertPos != 1 && 1.206 - !(insertPos == endOfData && extractPos == startOfData)) 1.207 - { *(insertPos) = in; //insert before move 1.208 - if( insertPos == endOfData ) //write new pos exactly once, correctly 1.209 - { Q->insertPos = startOfData; 1.210 - } 1.211 - else 1.212 - { Q->insertPos++; 1.213 - } 1.214 + { 1.215 + if( Q->extractPos - Q->insertPos != 1 && 1.216 + !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData)) 1.217 + { *(Q->insertPos) = in; //insert before move 1.218 + if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData; 1.219 + else Q->insertPos++; 1.220 return; 1.221 } 1.222 //Q is full 1.223 tries++; 1.224 - if( tries > 10000 ) pthread_yield();//yield not guaranteed 1.225 + if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 1.226 } 1.227 } 1.228 + 1.229 + 1.230 + 1.231 +//=========================================================================== 1.232 +//Single reader Multiple writer super fast Q.. no atomic instrs.. 
1.233 + 1.234 + 1.235 +/*This is a blocking queue, but it uses no atomic instructions, just does 1.236 + * yield() when empty or full 1.237 + * 1.238 + *It doesn't need any atomic instructions because only a single thread 1.239 + * extracts and only a single thread inserts, and it has no locations that 1.240 + * are written by both. It writes before moving and moves before reading, 1.241 + * and never lets write position and read position be the same, so dis- 1.242 + * synchrony can only ever cause an unnecessary call to yield(), never a 1.243 + * wrong value (by monotonicity of movement of pointers, plus single writer 1.244 + * to pointers, plus sequence of write before change pointer, plus 1.245 + * assumptions that if thread A semantically writes X before Y, then thread 1.246 + * B will see the writes in that order.) 1.247 + * 1.248 + *The multi-writer version is implemented as a hierarchy. Each writer has 1.249 + * its own single-reader single-writer queue. The reader simply does a 1.250 + * round-robin harvesting from them. 1.251 + * 1.252 + *A writer must first register itself with the queue, and receives an ID back 1.253 + * It then uses that ID on each write operation. 1.254 + * 1.255 + *The implementation is: 1.256 + *Physically: 1.257 + * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s 1.258 + * -] it also has read-pointer to the last queue a write was taken from. 
1.259 + * 1.260 + *Action-Patterns: 1.261 + * -] To add a writer 1.262 + * --]] writer-thread calls addWriterToQ(), remember the ID it returns 1.263 + * --]] internally addWriterToQ does: 1.264 + * ---]]] if needs more room, makes a larger writer-array 1.265 + * ---]]] copies the old writer-array into the new 1.266 + * ---]]] makes a new SRSW queue an puts it into the array 1.267 + * ---]]] returns the index to the new SRSW queue as the ID 1.268 + * -] To write 1.269 + * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID 1.270 + * --]] this call may block, via repeated yield() calls 1.271 + * --]] internally, writeSRMWQ does: 1.272 + * ---]]] uses the writerID as index to get the SRSW queue for that writer 1.273 + * ---]]] performs writeQ on that queue (may block via repeated yield calls) 1.274 + * -] To Read 1.275 + * --]] reader calls readSRMWQ, passing the Q struc 1.276 + * --]] this call may block, via repeated yield() calls 1.277 + * --]] internally, readSRMWQ does: 1.278 + * ---]]] gets saved index of last SRSW queue read from 1.279 + * ---]]] increments index and gets indexed queue 1.280 + * ---]]] does a non-blocking read of that queue 1.281 + * ---]]] if gets something, saves index and returns that value 1.282 + * ---]]] if gets null, then goes to next queue 1.283 + * ---]]] if got null from all the queues then does yield() then tries again 1.284 + * 1.285 + *Note: "0" is used as the value null, so SRSW queues must only contain 1.286 + * pointers, and cannot use 0 as a valid pointer value. 
1.287 + * 1.288 + */ 1.289 + 1.290 +SRMWQueueStruc* makeSRMWQ() 1.291 + { SRMWQueueStruc* retQ; 1.292 + 1.293 + retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) ); 1.294 + 1.295 + retQ->numInternalQs = 0; 1.296 + retQ->internalQsSz = 10; 1.297 + retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); 1.298 + 1.299 + retQ->lastQReadFrom = 0; 1.300 + 1.301 + return retQ; 1.302 + } 1.303 + 1.304 +/* ---]]] if needs more room, makes a larger writer-array 1.305 + * ---]]] copies the old writer-array into the new 1.306 + * ---]]] makes a new SRSW queue an puts it into the array 1.307 + * ---]]] returns the index to the new SRSW queue as the ID 1.308 + * 1.309 + *NOTE: assuming all adds are completed before any writes or reads are 1.310 + * performed.. otherwise, this needs to be re-done carefully, probably with 1.311 + * a lock. 1.312 + */ 1.313 +int addWriterToSRMWQ( SRMWQueueStruc* Q ) 1.314 + { int oldSz, i; 1.315 + SRSWQueueStruc * *oldArray; 1.316 + 1.317 + (Q->numInternalQs)++; 1.318 + if( Q->numInternalQs >= Q->internalQsSz ) 1.319 + { //full, so make bigger 1.320 + oldSz = Q->internalQsSz; 1.321 + oldArray = Q->internalQs; 1.322 + Q->internalQsSz *= 2; 1.323 + Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); 1.324 + for( i = 0; i < oldSz; i++ ) 1.325 + { Q->internalQs[i] = oldArray[i]; 1.326 + } 1.327 + free( oldArray ); 1.328 + } 1.329 + Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); 1.330 + return Q->numInternalQs - 1; 1.331 + } 1.332 + 1.333 + 1.334 +/* ---]]] gets saved index of last SRSW queue read-from 1.335 + * ---]]] increments index and gets indexed queue 1.336 + * ---]]] does a non-blocking read of that queue 1.337 + * ---]]] if gets something, saves index and returns that value 1.338 + * ---]]] if gets null, then goes to next queue 1.339 + * ---]]] if got null from all the queues then does yield() then tries again 1.340 + */ 1.341 +void* readSRMWQ( SRMWQueueStruc* Q ) 1.342 + { SRSWQueueStruc 
*readQ; 1.343 + void *readValue = 0; 1.344 + int tries = 0; 1.345 + int QToReadFrom = 0; 1.346 + 1.347 + QToReadFrom = Q->lastQReadFrom; 1.348 + 1.349 + while( TRUE ) 1.350 + { QToReadFrom++; 1.351 + if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0; 1.352 + readQ = Q->internalQs[ QToReadFrom ]; 1.353 + readValue = readSRSWQ_NonBlocking( readQ ); 1.354 + 1.355 + if( readValue != 0 ) //got a value, return it 1.356 + { Q->lastQReadFrom = QToReadFrom; 1.357 + return readValue; 1.358 + } 1.359 + else //SRSW Q just read is empty 1.360 + { //check if all queues have been tried 1.361 + if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty 1.362 + { tries++; //give a writer a chance to finish before yield 1.363 + if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 1.364 + } 1.365 + } 1.366 + } 1.367 + } 1.368 + 1.369 + 1.370 +/* 1.371 + * ---]]] uses the writerID as index to get the SRSW queue for that writer 1.372 + * ---]]] performs writeQ on that queue (may block via repeated yield calls) 1.373 + */ 1.374 +void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID ) 1.375 + { 1.376 + if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error 1.377 + 1.378 + writeSRSWQ( in, Q->internalQs[ writerID ] ); 1.379 + }
