annotate BlockingQueue.c @ 50:93a5782d064b

adding netbeans project directories to repository
author Sean Halle <seanhalle@yahoo.com>
date Fri, 14 Feb 2014 07:54:22 -0800
parents 1ea30ca7093c
children
rev   line source
Me@19 1 /*
seanhalle@44 2 * Copyright 2009 OpenSourceResearchInstitute.org
Me@19 3 * Licensed under GNU General Public License version 2
Me@19 4 *
Me@19 5 * Author: seanhalle@yahoo.com
Me@19 6 */
Me@19 7
Me@19 8
Me@19 9 #include <stdio.h>
Me@19 10 #include <errno.h>
Me@19 11 #include <pthread.h>
Me@19 12 #include <stdlib.h>
Me@19 13 #include <sched.h>
seanhalle@29 14 #include <string.h>
Me@19 15
Me@19 16 #include "BlockingQueue.h"
seanhalle@49 17 #include <PR__include/prqueue.h>
seanhalle@49 18 #include <PR__include/prmalloc.h>
Me@19 19
Me@19 20 #define INC(x) (++x == 1024) ? (x) = 0 : (x)
Me@19 21
Me@19 22 #define SPINLOCK_TRIES 100000
Me@19 23
Me@19 24
seanhalle@49 25 //========================== pthread based queue =========================
seanhalle@49 26 /*Not currently implemented.. however, copied this code from place where
seanhalle@49 27 * did equivalent.. Idea is to just make a private queue, then protect
seanhalle@49 28 * access with a lock.. copied code snippet below is how access was
seanhalle@49 29 * protected.. just roll this inside a "readBlockingQ()" function.. do
seanhalle@49 30 * equivalent inside writeBlockingQ() function.. to make one, just add
seanhalle@49 31 * the lock to the queue structure..
seanhalle@49 32 */
seanhalle@49 33 /*
seanhalle@49 34 production = NULL;
seanhalle@49 35 while( production == NULL )
seanhalle@49 36 { pthread_mutex_lock( &queueAccessLock );
seanhalle@49 37 production = readPrivQ( commQ );
seanhalle@49 38 pthread_mutex_unlock( &queueAccessLock );
seanhalle@49 39 // If empty, yields and tries again.
seanhalle@49 40 if( production == NULL) sched_yield();
seanhalle@49 41 }
seanhalle@49 42 */
Me@19 43
Me@19 44 //===========================================================================
Me@19 45 // multi reader multi writer fast Q via CAS
Me@19 46 #ifndef _GNU_SOURCE
Me@19 47 #define _GNU_SOURCE
Me@19 48
Me@19 49 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
Me@19 50 * or full
Me@19 51 *It uses CAS because it's meant to have more than one reader and more than
Me@19 52 * one writer.
Me@19 53 */
Me@19 54
Me@19 55 CASQueueStruc* makeCASQ()
Me@19 56 {
Me@19 57 CASQueueStruc* retQ;
seanhalle@49 58 retQ = (CASQueueStruc *) PR__malloc( sizeof( CASQueueStruc ) );
Me@19 59
Me@19 60 retQ->insertLock = UNLOCKED;
Me@19 61 retQ->extractLock= UNLOCKED;
seanhalle@29 62
seanhalle@29 63 retQ->extractPos = (volatile void**)&(retQ->startOfData[0]); //side by side == empty
seanhalle@29 64 retQ->insertPos = (volatile void**)&(retQ->startOfData[1]); // so start pos's have to be
Me@19 65 retQ->endOfData = &(retQ->startOfData[1023]);
Me@19 66
Me@19 67 return retQ;
Me@19 68 }
Me@19 69
Me@19 70
Me@19 71 void* readCASQ( CASQueueStruc* Q )
Me@19 72 { void *out = 0;
seanhalle@44 73 int32 tries = 0;
Me@19 74 void **startOfData = Q->startOfData;
Me@19 75 void **endOfData = Q->endOfData;
Me@19 76
seanhalle@44 77 int32 gotLock = FALSE;
Me@19 78
Me@19 79 while( TRUE )
Me@19 80 { //this intrinsic returns true if the lock held "UNLOCKED", in which
Me@19 81 // case it now holds "LOCKED" -- if it already held "LOCKED", then
Me@19 82 // gotLock is FALSE
Me@19 83 gotLock =
Me@19 84 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
Me@19 85 //NOTE: checked assy, and it does lock correctly..
Me@19 86 if( gotLock )
Me@19 87 {
seanhalle@29 88 void **insertPos = (void **)Q->insertPos;
seanhalle@29 89 void **extractPos = (void **)Q->extractPos;
Me@19 90
Me@19 91 //if not empty -- extract just below insert when empty
Me@19 92 if( insertPos - extractPos != 1 &&
Me@19 93 !(extractPos == endOfData && insertPos == startOfData))
Me@19 94 { //move before read
Me@19 95 if( extractPos == endOfData ) //write new pos exactly once, correctly
Me@19 96 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
Me@19 97 } // other thread might read bad pos
Me@19 98 else
Me@19 99 { Q->extractPos++;
Me@19 100 }
seanhalle@29 101 out = (void *) *(Q->extractPos);
Me@19 102 Q->extractLock = UNLOCKED;
Me@19 103 return out;
Me@19 104 }
Me@19 105 else //Q is empty
Me@19 106 { Q->extractLock = UNLOCKED;//empty, so release lock for others
Me@19 107 }
Me@19 108 }
Me@19 109 //Q is busy or empty
Me@19 110 tries++;
Me@19 111 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
Me@19 112 }
Me@19 113 }
Me@19 114
Me@19 115 void writeCASQ( void * in, CASQueueStruc* Q )
Me@19 116 {
seanhalle@44 117 int32 tries = 0;
Me@19 118 //TODO: need to make Q volatile? Want to do this Q in assembly!
Me@19 119 //Have no idea what GCC's going to do to this code
Me@19 120 void **startOfData = Q->startOfData;
Me@19 121 void **endOfData = Q->endOfData;
Me@19 122
seanhalle@44 123 int32 gotLock = FALSE;
Me@19 124
Me@19 125 while( TRUE )
Me@19 126 { //this intrinsic returns true if the lock held "UNLOCKED", in which
Me@19 127 // case it now holds "LOCKED" -- if it already held "LOCKED", then
Me@19 128 // gotLock is FALSE
Me@19 129 gotLock =
Me@19 130 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
Me@19 131 if( gotLock )
Me@19 132 {
seanhalle@29 133 void **insertPos = (void **)Q->insertPos;
seanhalle@29 134 void **extractPos = (void **)Q->extractPos;
Me@19 135
Me@19 136 //check if room to insert.. can't use a count variable
Me@19 137 // 'cause both insertor Thd and extractor Thd would write it
Me@19 138 if( extractPos - insertPos != 1 &&
Me@19 139 !(insertPos == endOfData && extractPos == startOfData))
Me@19 140 { *(Q->insertPos) = in; //insert before move
Me@19 141 if( insertPos == endOfData )
Me@19 142 { Q->insertPos = startOfData;
Me@19 143 }
Me@19 144 else
Me@19 145 { Q->insertPos++;
Me@19 146 }
Me@19 147 Q->insertLock = UNLOCKED;
Me@19 148 return;
Me@19 149 }
Me@19 150 else //Q is full
Me@19 151 { Q->insertLock = UNLOCKED;//full, so release lock for others
Me@19 152 }
Me@19 153 }
Me@19 154 tries++;
Me@19 155 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
Me@19 156 }
Me@19 157 }
Me@19 158
Me@19 159 #endif //_GNU_SOURCE
Me@19 160
Me@19 161
Me@19 162 //===========================================================================
Me@19 163 //Single reader single writer super fast Q.. no atomic instrs..
Me@19 164
Me@19 165
Me@19 166 /*This is a blocking queue, but it uses no atomic instructions, just does
Me@19 167 * yield() when empty or full
Me@19 168 *
Me@19 169 *It doesn't need any atomic instructions because only a single thread
Me@19 170 * extracts and only a single thread inserts, and it has no locations that
Me@19 171 * are written by both. It writes before moving and moves before reading,
Me@19 172 * and never lets write position and read position be the same, so dis-
Me@19 173 * synchrony can only ever cause an unnecessary call to yield(), never a
Me@19 174 * wrong value (by monotonicity of movement of pointers, plus single writer
Me@19 175 * to pointers, plus sequence of write before change pointer, plus
Me@19 176 * assumptions that if thread A semantically writes X before Y, then thread
Me@19 177 * B will see the writes in that order.)
Me@19 178 */
Me@19 179
Me@19 180 SRSWQueueStruc* makeSRSWQ()
Me@19 181 {
Me@19 182 SRSWQueueStruc* retQ;
seanhalle@49 183 retQ = (SRSWQueueStruc *) PR__malloc( sizeof( SRSWQueueStruc ) );
Me@19 184 memset( retQ->startOfData, 0, 1024 * sizeof(void *) );
Me@19 185
Me@19 186 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
Me@19 187 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
Me@19 188 retQ->endOfData = &(retQ->startOfData[1023]);
Me@19 189
Me@19 190 return retQ;
Me@19 191 }
Me@19 192
Me@19 193 void
Me@19 194 freeSRSWQ( SRSWQueueStruc* Q )
Me@19 195 {
seanhalle@48 196 PR__free( Q );
Me@19 197 }
Me@19 198
Me@19 199 void* readSRSWQ( SRSWQueueStruc* Q )
Me@19 200 { void *out = 0;
seanhalle@44 201 int32 tries = 0;
Me@19 202
Me@19 203 while( TRUE )
Me@19 204 {
Me@19 205 if( Q->insertPos - Q->extractPos != 1 &&
Me@19 206 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
Me@19 207 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
Me@19 208 else Q->extractPos++; //move before read
Me@19 209 out = *(Q->extractPos);
Me@19 210 return out;
Me@19 211 }
Me@19 212 //Q is empty
Me@19 213 tries++;
Me@19 214 if( tries > SPINLOCK_TRIES ) pthread_yield();
Me@19 215 }
Me@19 216 }
Me@19 217
Me@19 218
Me@19 219 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
Me@19 220 { void *out = 0;
seanhalle@44 221 int32 tries = 0;
Me@19 222
Me@19 223 while( TRUE )
Me@19 224 {
Me@19 225 if( Q->insertPos - Q->extractPos != 1 &&
Me@19 226 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
Me@19 227 { Q->extractPos++; //move before read
Me@19 228 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
Me@19 229 out = *(Q->extractPos);
Me@19 230 return out;
Me@19 231 }
Me@19 232 //Q is empty
Me@19 233 tries++;
Me@19 234 if( tries > 10 ) return NULL; //long enough for writer to finish
Me@19 235 }
Me@19 236 }
Me@19 237
Me@19 238
Me@19 239 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
Me@19 240 {
seanhalle@44 241 int32 tries = 0;
Me@19 242
Me@19 243 while( TRUE )
Me@19 244 {
Me@19 245 if( Q->extractPos - Q->insertPos != 1 &&
Me@19 246 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
Me@19 247 { *(Q->insertPos) = in; //insert before move
Me@19 248 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
Me@19 249 else Q->insertPos++;
Me@19 250 return;
Me@19 251 }
Me@19 252 //Q is full
Me@19 253 tries++;
Me@19 254 if( tries > SPINLOCK_TRIES ) pthread_yield();
Me@19 255 }
Me@19 256 }
Me@19 257
Me@19 258
Me@19 259
Me@19 260 //===========================================================================
Me@19 261 //Single reader Multiple writer super fast Q.. no atomic instrs..
Me@19 262
Me@19 263
Me@19 264 /*This is a blocking queue, but it uses no atomic instructions, just does
Me@19 265 * yield() when empty or full
Me@19 266 *
Me@19 267 *It doesn't need any atomic instructions because only a single thread
Me@19 268 * extracts and only a single thread inserts, and it has no locations that
Me@19 269 * are written by both. It writes before moving and moves before reading,
Me@19 270 * and never lets write position and read position be the same, so dis-
Me@19 271 * synchrony can only ever cause an unnecessary call to yield(), never a
Me@19 272 * wrong value (by monotonicity of movement of pointers, plus single writer
Me@19 273 * to pointers, plus sequence of write before change pointer, plus
Me@19 274 * assumptions that if thread A semantically writes X before Y, then thread
Me@19 275 * B will see the writes in that order.)
Me@19 276 *
Me@19 277 *The multi-writer version is implemented as a hierarchy. Each writer has
Me@19 278 * its own single-reader single-writer queue. The reader simply does a
Me@19 279 * round-robin harvesting from them.
Me@19 280 *
Me@19 281 *A writer must first register itself with the queue, and receives an ID back
Me@19 282 * It then uses that ID on each write operation.
Me@19 283 *
Me@19 284 *The implementation is:
Me@19 285 *Physically:
Me@19 286 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
Me@19 287 * -] it also has read-pointer to the last queue a write was taken from.
Me@19 288 *
Me@19 289 *Action-Patterns:
Me@19 290 * -] To add a writer
Me@19 291 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
Me@19 292 * --]] internally addWriterToQ does:
Me@19 293 * ---]]] if needs more room, makes a larger writer-array
Me@19 294 * ---]]] copies the old writer-array into the new
Me@19 295 * ---]]] makes a new SRSW queue an puts it into the array
Me@19 296 * ---]]] returns the index to the new SRSW queue as the ID
Me@19 297 * -] To write
Me@19 298 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
Me@19 299 * --]] this call may block, via repeated yield() calls
Me@19 300 * --]] internally, writeSRMWQ does:
Me@19 301 * ---]]] uses the writerID as index to get the SRSW queue for that writer
Me@19 302 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
Me@19 303 * -] To Read
Me@19 304 * --]] reader calls readSRMWQ, passing the Q struc
Me@19 305 * --]] this call may block, via repeated yield() calls
Me@19 306 * --]] internally, readSRMWQ does:
Me@19 307 * ---]]] gets saved index of last SRSW queue read from
Me@19 308 * ---]]] increments index and gets indexed queue
Me@19 309 * ---]]] does a non-blocking read of that queue
Me@19 310 * ---]]] if gets something, saves index and returns that value
Me@19 311 * ---]]] if gets null, then goes to next queue
Me@19 312 * ---]]] if got null from all the queues then does yield() then tries again
Me@19 313 *
Me@19 314 *Note: "0" is used as the value null, so SRSW queues must only contain
Me@19 315 * pointers, and cannot use 0 as a valid pointer value.
Me@19 316 *
Me@19 317 */
Me@19 318
Me@19 319 SRMWQueueStruc* makeSRMWQ()
Me@19 320 { SRMWQueueStruc* retQ;
Me@19 321
seanhalle@49 322 retQ = (SRMWQueueStruc *) PR__malloc( sizeof( SRMWQueueStruc ) );
Me@19 323
Me@19 324 retQ->numInternalQs = 0;
Me@19 325 retQ->internalQsSz = 10;
seanhalle@49 326 retQ->internalQs = PR__malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
Me@19 327
Me@19 328 retQ->lastQReadFrom = 0;
Me@19 329
Me@19 330 return retQ;
Me@19 331 }
Me@19 332
Me@19 333 /* ---]]] if needs more room, makes a larger writer-array
Me@19 334 * ---]]] copies the old writer-array into the new
Me@19 335 * ---]]] makes a new SRSW queue an puts it into the array
Me@19 336 * ---]]] returns the index to the new SRSW queue as the ID
Me@19 337 *
Me@19 338 *NOTE: assuming all adds are completed before any writes or reads are
Me@19 339 * performed.. otherwise, this needs to be re-done carefully, probably with
Me@19 340 * a lock.
Me@19 341 */
Me@19 342 int addWriterToSRMWQ( SRMWQueueStruc* Q )
Me@19 343 { int oldSz, i;
Me@19 344 SRSWQueueStruc * *oldArray;
Me@19 345
Me@19 346 (Q->numInternalQs)++;
Me@19 347 if( Q->numInternalQs >= Q->internalQsSz )
Me@19 348 { //full, so make bigger
Me@19 349 oldSz = Q->internalQsSz;
Me@19 350 oldArray = Q->internalQs;
Me@19 351 Q->internalQsSz *= 2;
seanhalle@49 352 Q->internalQs = PR__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
Me@19 353 for( i = 0; i < oldSz; i++ )
Me@19 354 { Q->internalQs[i] = oldArray[i];
Me@19 355 }
seanhalle@48 356 PR__free( oldArray );
Me@19 357 }
Me@19 358 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
Me@19 359 return Q->numInternalQs - 1;
Me@19 360 }
Me@19 361
Me@19 362
Me@19 363 /* ---]]] gets saved index of last SRSW queue read-from
Me@19 364 * ---]]] increments index and gets indexed queue
Me@19 365 * ---]]] does a non-blocking read of that queue
Me@19 366 * ---]]] if gets something, saves index and returns that value
Me@19 367 * ---]]] if gets null, then goes to next queue
Me@19 368 * ---]]] if got null from all the queues then does yield() then tries again
Me@19 369 */
Me@19 370 void* readSRMWQ( SRMWQueueStruc* Q )
Me@19 371 { SRSWQueueStruc *readQ;
Me@19 372 void *readValue = 0;
seanhalle@44 373 int32 tries = 0;
seanhalle@44 374 int32 QToReadFrom = 0;
Me@19 375
Me@19 376 QToReadFrom = Q->lastQReadFrom;
Me@19 377
Me@19 378 while( TRUE )
Me@19 379 { QToReadFrom++;
Me@19 380 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
Me@19 381 readQ = Q->internalQs[ QToReadFrom ];
Me@19 382 readValue = readSRSWQ_NonBlocking( readQ );
Me@19 383
Me@19 384 if( readValue != 0 ) //got a value, return it
Me@19 385 { Q->lastQReadFrom = QToReadFrom;
Me@19 386 return readValue;
Me@19 387 }
Me@19 388 else //SRSW Q just read is empty
Me@19 389 { //check if all queues have been tried
Me@19 390 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
Me@19 391 { tries++; //give a writer a chance to finish before yield
Me@19 392 if( tries > SPINLOCK_TRIES ) pthread_yield();
Me@19 393 }
Me@19 394 }
Me@19 395 }
Me@19 396 }
Me@19 397
Me@19 398
Me@19 399 /*
Me@19 400 * ---]]] uses the writerID as index to get the SRSW queue for that writer
Me@19 401 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
Me@19 402 */
Me@19 403 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
Me@19 404 {
Me@19 405 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
Me@19 406
Me@19 407 writeSRSWQ( in, Q->internalQs[ writerID ] );
Me@19 408 }