annotate BlockingQueue.c @ 25:7742a5e0d92e

Created pure_C brch
author kshalle
date Mon, 13 Feb 2012 13:29:51 -0800
parents 53c614b781ce
children b5ae7fbb1f01
rev   line source
Me@19 1 /*
Me@19 2 * Copyright 2009 OpenSourceStewardshipFoundation.org
Me@19 3 * Licensed under GNU General Public License version 2
Me@19 4 *
Me@19 5 * Author: seanhalle@yahoo.com
Me@19 6 */
Me@19 7
Me@19 8
Me@19 9 #include <stdio.h>
Me@19 10 #include <errno.h>
Me@19 11 #include <pthread.h>
Me@19 12 #include <stdlib.h>
Me@19 13 #include <sched.h>
Me@19 14
Me@19 15 #include "BlockingQueue.h"
Me@19 16
//Advances a ring index of the pthread queue by one slot, wrapping from 1023
// back to 0 (1024 is the capacity readPThdQ/writePThdQ assume: writePThdQ
// treats count >= 1024 as full).  x must be a modifiable lvalue such as a
// struct field; it is evaluated more than once, so it must not have side
// effects of its own.
#define INC(x) ((++(x) == 1024) ? ((x) = 0) : (x))

//Number of failed attempts a spinning queue operation makes before it starts
// yielding the processor on every further attempt.
#define SPINLOCK_TRIES 100000
Me@19 20
Me@19 21 //===========================================================================
Me@19 22 //Normal pthread Q
Me@19 23
Me@19 24 PThdQueueStruc* makePThdQ()
Me@19 25 {
Me@19 26 PThdQueueStruc* retQ;
Me@19 27 int retCode;
Me@19 28 retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) );
Me@19 29
Me@19 30
Me@19 31 retCode =
Me@19 32 pthread_mutex_init( &retQ->mutex_t, NULL);
Me@19 33 if(retCode){perror("Error in creating mutex:"); exit(1);}
Me@19 34
Me@19 35 retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
Me@19 36 if(retCode){perror("Error in creating cond_var:"); exit(1);}
Me@19 37
Me@19 38 retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
Me@19 39 if(retCode){perror("Error in creating cond_var:"); exit(1);}
Me@19 40
Me@19 41 retQ->count = 0;
Me@19 42 retQ->readPos = 0;
Me@19 43 retQ->writePos = 0;
Me@19 44 retQ->w_empty = 0;
Me@19 45 retQ->w_full = 0;
Me@19 46
Me@19 47 return retQ;
Me@19 48 }
Me@19 49
Me@19 50 void * readPThdQ( PThdQueueStruc *Q )
Me@19 51 { void *ret;
Me@19 52 int retCode, wt;
Me@19 53 pthread_mutex_lock( &Q->mutex_t );
Me@19 54 {
Me@19 55 while( Q -> count == 0 )
Me@19 56 { Q -> w_empty = 1;
Me@19 57 retCode =
Me@19 58 pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
Me@19 59 if( retCode ){ perror("Thread wait error: "); exit(1); }
Me@19 60 }
Me@19 61 Q -> w_empty = 0;
Me@19 62 Q -> count -= 1;
Me@19 63 ret = Q->data[ Q->readPos ];
Me@19 64 INC( Q->readPos );
Me@19 65 wt = Q -> w_full;
Me@19 66 Q -> w_full = 0;
Me@19 67 }
Me@19 68 pthread_mutex_unlock( &Q->mutex_t );
Me@19 69 if (wt)
Me@19 70 pthread_cond_signal( &Q->cond_w_t );
Me@19 71
Me@19 72 //printf("Q out: %d\n", ret);
Me@19 73 return( ret );
Me@19 74 }
Me@19 75
Me@19 76 void writePThdQ( void * in, PThdQueueStruc* Q )
Me@19 77 {
Me@19 78 int status, wt;
Me@19 79 //printf("Q in: %d\n", in);
Me@19 80
Me@19 81 pthread_mutex_lock( &Q->mutex_t );
Me@19 82 {
Me@19 83 while( Q->count >= 1024 )
Me@19 84 {
Me@19 85 Q -> w_full = 1;
Me@19 86 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
Me@19 87 if (status != 0)
Me@19 88 { perror("Thread wait error: ");
Me@19 89 exit(1);
Me@19 90 }
Me@19 91 }
Me@19 92
Me@19 93 Q -> w_full = 0;
Me@19 94 Q->count += 1;
Me@19 95 Q->data[ Q->writePos ] = in;
Me@19 96 INC( Q->writePos );
Me@19 97 wt = Q -> w_empty;
Me@19 98 Q -> w_empty = 0;
Me@19 99 }
Me@19 100
Me@19 101 pthread_mutex_unlock( &Q->mutex_t );
Me@19 102 if( wt ) pthread_cond_signal( &Q->cond_r_t );
Me@19 103 }
Me@19 104
Me@19 105
Me@19 106 //===========================================================================
Me@19 107 // multi reader multi writer fast Q via CAS
Me@19 108 #ifndef _GNU_SOURCE
Me@19 109 #define _GNU_SOURCE
Me@19 110
Me@19 111 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
Me@19 112 * or full
Me@19 113 *It uses CAS because it's meant to have more than one reader and more than
Me@19 114 * one writer.
Me@19 115 */
Me@19 116
Me@19 117 CASQueueStruc* makeCASQ()
Me@19 118 {
Me@19 119 CASQueueStruc* retQ;
Me@19 120 retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) );
Me@19 121
Me@19 122 retQ->insertLock = UNLOCKED;
Me@19 123 retQ->extractLock= UNLOCKED;
Me@19 124 //TODO: check got pointer syntax right
Me@19 125 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
Me@19 126 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
Me@19 127 retQ->endOfData = &(retQ->startOfData[1023]);
Me@19 128
Me@19 129 return retQ;
Me@19 130 }
Me@19 131
Me@19 132
Me@19 133 void* readCASQ( CASQueueStruc* Q )
Me@19 134 { void *out = 0;
Me@19 135 int tries = 0;
Me@19 136 void **startOfData = Q->startOfData;
Me@19 137 void **endOfData = Q->endOfData;
Me@19 138
Me@19 139 int gotLock = FALSE;
Me@19 140
Me@19 141 while( TRUE )
Me@19 142 { //this intrinsic returns true if the lock held "UNLOCKED", in which
Me@19 143 // case it now holds "LOCKED" -- if it already held "LOCKED", then
Me@19 144 // gotLock is FALSE
Me@19 145 gotLock =
Me@19 146 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
Me@19 147 //NOTE: checked assy, and it does lock correctly..
Me@19 148 if( gotLock )
Me@19 149 {
Me@19 150 void **insertPos = Q->insertPos;
Me@19 151 void **extractPos = Q->extractPos;
Me@19 152
Me@19 153 //if not empty -- extract just below insert when empty
Me@19 154 if( insertPos - extractPos != 1 &&
Me@19 155 !(extractPos == endOfData && insertPos == startOfData))
Me@19 156 { //move before read
Me@19 157 if( extractPos == endOfData ) //write new pos exactly once, correctly
Me@19 158 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
Me@19 159 } // other thread might read bad pos
Me@19 160 else
Me@19 161 { Q->extractPos++;
Me@19 162 }
Me@19 163 out = *(Q->extractPos);
Me@19 164 Q->extractLock = UNLOCKED;
Me@19 165 return out;
Me@19 166 }
Me@19 167 else //Q is empty
Me@19 168 { Q->extractLock = UNLOCKED;//empty, so release lock for others
Me@19 169 }
Me@19 170 }
Me@19 171 //Q is busy or empty
Me@19 172 tries++;
Me@19 173 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
Me@19 174 }
Me@19 175 }
Me@19 176
Me@19 177 void writeCASQ( void * in, CASQueueStruc* Q )
Me@19 178 {
Me@19 179 int tries = 0;
Me@19 180 //TODO: need to make Q volatile? Want to do this Q in assembly!
Me@19 181 //Have no idea what GCC's going to do to this code
Me@19 182 void **startOfData = Q->startOfData;
Me@19 183 void **endOfData = Q->endOfData;
Me@19 184
Me@19 185 int gotLock = FALSE;
Me@19 186
Me@19 187 while( TRUE )
Me@19 188 { //this intrinsic returns true if the lock held "UNLOCKED", in which
Me@19 189 // case it now holds "LOCKED" -- if it already held "LOCKED", then
Me@19 190 // gotLock is FALSE
Me@19 191 gotLock =
Me@19 192 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
Me@19 193 if( gotLock )
Me@19 194 {
Me@19 195 void **insertPos = Q->insertPos;
Me@19 196 void **extractPos = Q->extractPos;
Me@19 197
Me@19 198 //check if room to insert.. can't use a count variable
Me@19 199 // 'cause both insertor Thd and extractor Thd would write it
Me@19 200 if( extractPos - insertPos != 1 &&
Me@19 201 !(insertPos == endOfData && extractPos == startOfData))
Me@19 202 { *(Q->insertPos) = in; //insert before move
Me@19 203 if( insertPos == endOfData )
Me@19 204 { Q->insertPos = startOfData;
Me@19 205 }
Me@19 206 else
Me@19 207 { Q->insertPos++;
Me@19 208 }
Me@19 209 Q->insertLock = UNLOCKED;
Me@19 210 return;
Me@19 211 }
Me@19 212 else //Q is full
Me@19 213 { Q->insertLock = UNLOCKED;//full, so release lock for others
Me@19 214 }
Me@19 215 }
Me@19 216 tries++;
Me@19 217 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
Me@19 218 }
Me@19 219 }
Me@19 220
Me@19 221 #endif //_GNU_SOURCE
Me@19 222
Me@19 223
Me@19 224 //===========================================================================
Me@19 225 //Single reader single writer super fast Q.. no atomic instrs..
Me@19 226
Me@19 227
Me@19 228 /*This is a blocking queue, but it uses no atomic instructions, just does
Me@19 229 * yield() when empty or full
Me@19 230 *
Me@19 231 *It doesn't need any atomic instructions because only a single thread
Me@19 232 * extracts and only a single thread inserts, and it has no locations that
Me@19 233 * are written by both. It writes before moving and moves before reading,
Me@19 234 * and never lets write position and read position be the same, so dis-
Me@19 235 * synchrony can only ever cause an unnecessary call to yield(), never a
Me@19 236 * wrong value (by monotonicity of movement of pointers, plus single writer
Me@19 237 * to pointers, plus sequence of write before change pointer, plus
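Me@19 238 * the assumption that if thread A semantically writes X before Y, then thread
Me@19 239 * B will see the writes in that order -- C does not guarantee that by itself, so compiler and CPU reordering has to be ruled out with memory barriers.)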
Me@19 240 */
Me@19 241
Me@19 242 SRSWQueueStruc* makeSRSWQ()
Me@19 243 {
Me@19 244 SRSWQueueStruc* retQ;
Me@19 245 retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) );
Me@19 246 memset( retQ->startOfData, 0, 1024 * sizeof(void *) );
Me@19 247
Me@19 248 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
Me@19 249 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
Me@19 250 retQ->endOfData = &(retQ->startOfData[1023]);
Me@19 251
Me@19 252 return retQ;
Me@19 253 }
Me@19 254
Me@19 255 void
Me@19 256 freeSRSWQ( SRSWQueueStruc* Q )
Me@19 257 {
Me@19 258 free( Q );
Me@19 259 }
Me@19 260
Me@19 261 void* readSRSWQ( SRSWQueueStruc* Q )
Me@19 262 { void *out = 0;
Me@19 263 int tries = 0;
Me@19 264
Me@19 265 while( TRUE )
Me@19 266 {
Me@19 267 if( Q->insertPos - Q->extractPos != 1 &&
Me@19 268 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
Me@19 269 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
Me@19 270 else Q->extractPos++; //move before read
Me@19 271 out = *(Q->extractPos);
Me@19 272 return out;
Me@19 273 }
Me@19 274 //Q is empty
Me@19 275 tries++;
Me@19 276 if( tries > SPINLOCK_TRIES ) pthread_yield();
Me@19 277 }
Me@19 278 }
Me@19 279
Me@19 280
Me@19 281 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
Me@19 282 { void *out = 0;
Me@19 283 int tries = 0;
Me@19 284
Me@19 285 while( TRUE )
Me@19 286 {
Me@19 287 if( Q->insertPos - Q->extractPos != 1 &&
Me@19 288 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
Me@19 289 { Q->extractPos++; //move before read
Me@19 290 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
Me@19 291 out = *(Q->extractPos);
Me@19 292 return out;
Me@19 293 }
Me@19 294 //Q is empty
Me@19 295 tries++;
Me@19 296 if( tries > 10 ) return NULL; //long enough for writer to finish
Me@19 297 }
Me@19 298 }
Me@19 299
Me@19 300
Me@19 301 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
Me@19 302 {
Me@19 303 int tries = 0;
Me@19 304
Me@19 305 while( TRUE )
Me@19 306 {
Me@19 307 if( Q->extractPos - Q->insertPos != 1 &&
Me@19 308 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
Me@19 309 { *(Q->insertPos) = in; //insert before move
Me@19 310 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
Me@19 311 else Q->insertPos++;
Me@19 312 return;
Me@19 313 }
Me@19 314 //Q is full
Me@19 315 tries++;
Me@19 316 if( tries > SPINLOCK_TRIES ) pthread_yield();
Me@19 317 }
Me@19 318 }
Me@19 319
Me@19 320
Me@19 321
Me@19 322 //===========================================================================
Me@19 323 //Single reader Multiple writer super fast Q.. no atomic instrs..
Me@19 324
Me@19 325
Me@19 326 /*This is a blocking queue, but it uses no atomic instructions, just does
Me@19 327 * yield() when empty or full
Me@19 328 *
Me@19 329 *It doesn't need any atomic instructions because each internal SRSW queue has
Me@19 330 * only a single extracting thread and a single inserting thread, and it has no locations that
Me@19 331 * are written by both. It writes before moving and moves before reading,
Me@19 332 * and never lets write position and read position be the same, so dis-
Me@19 333 * synchrony can only ever cause an unnecessary call to yield(), never a
Me@19 334 * wrong value (by monotonicity of movement of pointers, plus single writer
Me@19 335 * to pointers, plus sequence of write before change pointer, plus
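Me@19 336 * the assumption that if thread A semantically writes X before Y, then thread
Me@19 337 * B will see the writes in that order -- C does not guarantee that by itself, so compiler and CPU reordering has to be ruled out with memory barriers.)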
Me@19 338 *
Me@19 339 *The multi-writer version is implemented as a hierarchy. Each writer has
Me@19 340 * its own single-reader single-writer queue. The reader simply does a
Me@19 341 * round-robin harvesting from them.
Me@19 342 *
Me@19 343 *A writer must first register itself with the queue, and receives an ID back
Me@19 344 * It then uses that ID on each write operation.
Me@19 345 *
Me@19 346 *The implementation is:
Me@19 347 *Physically:
Me@19 348 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
Me@19 349 * -] it also records the index of the last queue a value was read from.
Me@19 350 *
Me@19 351 *Action-Patterns:
Me@19 352 * -] To add a writer
Me@19 353 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
Me@19 354 * --]] internally addWriterToQ does:
Me@19 355 * ---]]] if needs more room, makes a larger writer-array
Me@19 356 * ---]]] copies the old writer-array into the new
Me@19 357 * ---]]] makes a new SRSW queue and puts it into the array
Me@19 358 * ---]]] returns the index to the new SRSW queue as the ID
Me@19 359 * -] To write
Me@19 360 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
Me@19 361 * --]] this call may block, via repeated yield() calls
Me@19 362 * --]] internally, writeSRMWQ does:
Me@19 363 * ---]]] uses the writerID as index to get the SRSW queue for that writer
Me@19 364 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
Me@19 365 * -] To Read
Me@19 366 * --]] reader calls readSRMWQ, passing the Q struc
Me@19 367 * --]] this call may block, via repeated yield() calls
Me@19 368 * --]] internally, readSRMWQ does:
Me@19 369 * ---]]] gets saved index of last SRSW queue read from
Me@19 370 * ---]]] increments index and gets indexed queue
Me@19 371 * ---]]] does a non-blocking read of that queue
Me@19 372 * ---]]] if gets something, saves index and returns that value
Me@19 373 * ---]]] if gets null, then goes to next queue
Me@19 374 * ---]]] if got null from all the queues then does yield() then tries again
Me@19 375 *
Me@19 376 *Note: "0" is used as the value null, and readSRMWQ treats a null read as "queue empty", so
Me@19 377 * values written to an SRMW queue must be non-null pointers; a 0 can never be delivered.
Me@19 378 *
Me@19 379 */
Me@19 380
Me@19 381 SRMWQueueStruc* makeSRMWQ()
Me@19 382 { SRMWQueueStruc* retQ;
Me@19 383
Me@19 384 retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) );
Me@19 385
Me@19 386 retQ->numInternalQs = 0;
Me@19 387 retQ->internalQsSz = 10;
Me@19 388 retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
Me@19 389
Me@19 390 retQ->lastQReadFrom = 0;
Me@19 391
Me@19 392 return retQ;
Me@19 393 }
Me@19 394
Me@19 395 /* ---]]] if needs more room, makes a larger writer-array
Me@19 396 * ---]]] copies the old writer-array into the new
Me@19 397 * ---]]] makes a new SRSW queue an puts it into the array
Me@19 398 * ---]]] returns the index to the new SRSW queue as the ID
Me@19 399 *
Me@19 400 *NOTE: assuming all adds are completed before any writes or reads are
Me@19 401 * performed.. otherwise, this needs to be re-done carefully, probably with
Me@19 402 * a lock.
Me@19 403 */
Me@19 404 int addWriterToSRMWQ( SRMWQueueStruc* Q )
Me@19 405 { int oldSz, i;
Me@19 406 SRSWQueueStruc * *oldArray;
Me@19 407
Me@19 408 (Q->numInternalQs)++;
Me@19 409 if( Q->numInternalQs >= Q->internalQsSz )
Me@19 410 { //full, so make bigger
Me@19 411 oldSz = Q->internalQsSz;
Me@19 412 oldArray = Q->internalQs;
Me@19 413 Q->internalQsSz *= 2;
Me@19 414 Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
Me@19 415 for( i = 0; i < oldSz; i++ )
Me@19 416 { Q->internalQs[i] = oldArray[i];
Me@19 417 }
Me@19 418 free( oldArray );
Me@19 419 }
Me@19 420 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
Me@19 421 return Q->numInternalQs - 1;
Me@19 422 }
Me@19 423
Me@19 424
Me@19 425 /* ---]]] gets saved index of last SRSW queue read-from
Me@19 426 * ---]]] increments index and gets indexed queue
Me@19 427 * ---]]] does a non-blocking read of that queue
Me@19 428 * ---]]] if gets something, saves index and returns that value
Me@19 429 * ---]]] if gets null, then goes to next queue
Me@19 430 * ---]]] if got null from all the queues then does yield() then tries again
Me@19 431 */
Me@19 432 void* readSRMWQ( SRMWQueueStruc* Q )
Me@19 433 { SRSWQueueStruc *readQ;
Me@19 434 void *readValue = 0;
Me@19 435 int tries = 0;
Me@19 436 int QToReadFrom = 0;
Me@19 437
Me@19 438 QToReadFrom = Q->lastQReadFrom;
Me@19 439
Me@19 440 while( TRUE )
Me@19 441 { QToReadFrom++;
Me@19 442 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
Me@19 443 readQ = Q->internalQs[ QToReadFrom ];
Me@19 444 readValue = readSRSWQ_NonBlocking( readQ );
Me@19 445
Me@19 446 if( readValue != 0 ) //got a value, return it
Me@19 447 { Q->lastQReadFrom = QToReadFrom;
Me@19 448 return readValue;
Me@19 449 }
Me@19 450 else //SRSW Q just read is empty
Me@19 451 { //check if all queues have been tried
Me@19 452 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
Me@19 453 { tries++; //give a writer a chance to finish before yield
Me@19 454 if( tries > SPINLOCK_TRIES ) pthread_yield();
Me@19 455 }
Me@19 456 }
Me@19 457 }
Me@19 458 }
Me@19 459
Me@19 460
Me@19 461 /*
Me@19 462 * ---]]] uses the writerID as index to get the SRSW queue for that writer
Me@19 463 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
Me@19 464 */
Me@19 465 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
Me@19 466 {
Me@19 467 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
Me@19 468
Me@19 469 writeSRSWQ( in, Q->internalQs[ writerID ] );
Me@19 470 }