annotate BlockingQueue.c @ 18:7c9e00ff1bf4

fixed queue enlarging function
author Merten Sach <msach@mailbox.tu-berlin.de>
date Wed, 22 Jun 2011 18:49:17 +0200
parents 3562716ebdbd
children 677afc259a58
rev   line source
Me@0 1 /*
Me@1 2 * Copyright 2009 OpenSourceStewardshipFoundation.org
Me@0 3 * Licensed under GNU General Public License version 2
Me@0 4 *
Me@0 5 * Author: seanhalle@yahoo.com
Me@0 6 */
Me@0 7
Me@0 8
Me@0 9 #include <stdio.h>
Me@0 10 #include <errno.h>
Me@0 11 #include <pthread.h>
Me@0 12 #include <stdlib.h>
Me@1 13 #include <sched.h>
Me@0 14
Me@0 15 #include "BlockingQueue.h"
msach@17 16 #include "../vmalloc.h"
Me@0 17
/* Advances a ring-buffer index, wrapping from 1023 back to 0.
 * Fully parenthesized so it is safe inside larger expressions; it evaluates
 * its argument more than once, so pass only a side-effect-free lvalue. */
#define INC(x) ((++(x) == 1024) ? ((x) = 0) : (x))

/* Number of failed spin iterations after which a blocked queue operation
 * starts yielding the CPU on every further attempt. */
#define SPINLOCK_TRIES 100000
Me@0 21
Me@0 22 //===========================================================================
Me@0 23 //Normal pthread Q
Me@0 24
Me@4 25 PThdQueueStruc* makePThdQ()
Me@0 26 {
Me@4 27 PThdQueueStruc* retQ;
Me@6 28 int retCode;
Me@9 29 retQ = (PThdQueueStruc *) VMS__malloc( sizeof( PThdQueueStruc ) );
Me@0 30
Me@0 31
Me@6 32 retCode =
Me@6 33 pthread_mutex_init( &retQ->mutex_t, NULL);
Me@6 34 if(retCode){perror("Error in creating mutex:"); exit(1);}
Me@0 35
Me@6 36 retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
Me@6 37 if(retCode){perror("Error in creating cond_var:"); exit(1);}
Me@0 38
Me@6 39 retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
Me@6 40 if(retCode){perror("Error in creating cond_var:"); exit(1);}
Me@0 41
Me@0 42 retQ->count = 0;
Me@0 43 retQ->readPos = 0;
Me@0 44 retQ->writePos = 0;
Me@6 45 retQ->w_empty = 0;
Me@6 46 retQ->w_full = 0;
Me@0 47
Me@0 48 return retQ;
Me@0 49 }
Me@0 50
Me@4 51 void * readPThdQ( PThdQueueStruc *Q )
Me@0 52 { void *ret;
Me@6 53 int retCode, wt;
Me@0 54 pthread_mutex_lock( &Q->mutex_t );
Me@0 55 {
Me@0 56 while( Q -> count == 0 )
Me@0 57 { Q -> w_empty = 1;
Me@6 58 retCode =
Me@6 59 pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
Me@6 60 if( retCode ){ perror("Thread wait error: "); exit(1); }
Me@0 61 }
Me@0 62 Q -> w_empty = 0;
Me@0 63 Q -> count -= 1;
Me@0 64 ret = Q->data[ Q->readPos ];
Me@0 65 INC( Q->readPos );
Me@0 66 wt = Q -> w_full;
Me@0 67 Q -> w_full = 0;
Me@0 68 }
Me@0 69 pthread_mutex_unlock( &Q->mutex_t );
Me@6 70 if (wt)
Me@6 71 pthread_cond_signal( &Q->cond_w_t );
Me@0 72
Me@6 73 //printf("Q out: %d\n", ret);
Me@0 74 return( ret );
Me@0 75 }
Me@0 76
Me@4 77 void writePThdQ( void * in, PThdQueueStruc* Q )
Me@0 78 {
Me@0 79 int status, wt;
Me@6 80 //printf("Q in: %d\n", in);
Me@6 81
Me@0 82 pthread_mutex_lock( &Q->mutex_t );
Me@0 83 {
Me@0 84 while( Q->count >= 1024 )
Me@0 85 {
Me@0 86 Q -> w_full = 1;
Me@0 87 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
Me@0 88 if (status != 0)
Me@0 89 { perror("Thread wait error: ");
Me@0 90 exit(1);
Me@0 91 }
Me@0 92 }
Me@6 93
Me@0 94 Q -> w_full = 0;
Me@0 95 Q->count += 1;
Me@0 96 Q->data[ Q->writePos ] = in;
Me@0 97 INC( Q->writePos );
Me@0 98 wt = Q -> w_empty;
Me@0 99 Q -> w_empty = 0;
Me@0 100 }
Me@6 101
Me@0 102 pthread_mutex_unlock( &Q->mutex_t );
Me@0 103 if( wt ) pthread_cond_signal( &Q->cond_r_t );
Me@0 104 }
Me@0 105
Me@0 106
Me@0 107 //===========================================================================
Me@0 108 // multi reader multi writer fast Q via CAS
Me@0 109 #ifndef _GNU_SOURCE
Me@0 110 #define _GNU_SOURCE
Me@0 111
Me@0 112 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
Me@0 113 * or full
Me@0 114 *It uses CAS because it's meant to have more than one reader and more than
Me@0 115 * one writer.
Me@0 116 */
Me@0 117
Me@0 118 CASQueueStruc* makeCASQ()
Me@0 119 {
Me@0 120 CASQueueStruc* retQ;
Me@9 121 retQ = (CASQueueStruc *) VMS__malloc( sizeof( CASQueueStruc ) );
Me@0 122
Me@0 123 retQ->insertLock = UNLOCKED;
Me@0 124 retQ->extractLock= UNLOCKED;
Me@0 125 //TODO: check got pointer syntax right
Me@0 126 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
Me@0 127 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
Me@0 128 retQ->endOfData = &(retQ->startOfData[1023]);
Me@0 129
Me@0 130 return retQ;
Me@0 131 }
Me@0 132
Me@0 133
Me@0 134 void* readCASQ( CASQueueStruc* Q )
Me@1 135 { void *out = 0;
Me@1 136 int tries = 0;
Me@1 137 void **startOfData = Q->startOfData;
Me@1 138 void **endOfData = Q->endOfData;
Me@1 139
Me@7 140 int gotLock = FALSE;
Me@0 141
Me@4 142 while( TRUE )
Me@7 143 { //this intrinsic returns true if the lock held "UNLOCKED", in which
Me@7 144 // case it now holds "LOCKED" -- if it already held "LOCKED", then
Me@7 145 // gotLock is FALSE
Me@7 146 gotLock =
Me@0 147 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
Me@4 148 //NOTE: checked assy, and it does lock correctly..
Me@7 149 if( gotLock )
Me@0 150 {
Me@1 151 void **insertPos = Q->insertPos;
Me@1 152 void **extractPos = Q->extractPos;
Me@0 153
Me@0 154 //if not empty -- extract just below insert when empty
Me@0 155 if( insertPos - extractPos != 1 &&
Me@0 156 !(extractPos == endOfData && insertPos == startOfData))
Me@0 157 { //move before read
Me@0 158 if( extractPos == endOfData ) //write new pos exactly once, correctly
Me@0 159 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
Me@0 160 } // other thread might read bad pos
Me@0 161 else
Me@0 162 { Q->extractPos++;
Me@0 163 }
Me@0 164 out = *(Q->extractPos);
Me@0 165 Q->extractLock = UNLOCKED;
Me@0 166 return out;
Me@0 167 }
Me@0 168 else //Q is empty
Me@7 169 { Q->extractLock = UNLOCKED;//empty, so release lock for others
Me@0 170 }
Me@0 171 }
Me@0 172 //Q is busy or empty
Me@0 173 tries++;
Me@6 174 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
Me@0 175 }
Me@0 176 }
Me@0 177
Me@0 178 void writeCASQ( void * in, CASQueueStruc* Q )
Me@0 179 {
Me@0 180 int tries = 0;
Me@1 181 //TODO: need to make Q volatile? Want to do this Q in assembly!
Me@1 182 //Have no idea what GCC's going to do to this code
Me@1 183 void **startOfData = Q->startOfData;
Me@1 184 void **endOfData = Q->endOfData;
Me@1 185
Me@7 186 int gotLock = FALSE;
Me@0 187
Me@4 188 while( TRUE )
Me@7 189 { //this intrinsic returns true if the lock held "UNLOCKED", in which
Me@7 190 // case it now holds "LOCKED" -- if it already held "LOCKED", then
Me@7 191 // gotLock is FALSE
Me@7 192 gotLock =
Me@0 193 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
Me@7 194 if( gotLock )
Me@0 195 {
Me@1 196 void **insertPos = Q->insertPos;
Me@1 197 void **extractPos = Q->extractPos;
Me@0 198
Me@0 199 //check if room to insert.. can't use a count variable
Me@0 200 // 'cause both insertor Thd and extractor Thd would write it
Me@0 201 if( extractPos - insertPos != 1 &&
Me@0 202 !(insertPos == endOfData && extractPos == startOfData))
Me@1 203 { *(Q->insertPos) = in; //insert before move
Me@4 204 if( insertPos == endOfData )
Me@0 205 { Q->insertPos = startOfData;
Me@0 206 }
Me@0 207 else
Me@0 208 { Q->insertPos++;
Me@0 209 }
Me@0 210 Q->insertLock = UNLOCKED;
Me@0 211 return;
Me@0 212 }
Me@0 213 else //Q is full
Me@7 214 { Q->insertLock = UNLOCKED;//full, so release lock for others
Me@0 215 }
Me@0 216 }
Me@0 217 tries++;
Me@6 218 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
Me@0 219 }
Me@0 220 }
Me@0 221
Me@0 222 #endif //_GNU_SOURCE
Me@0 223
Me@1 224
Me@0 225 //===========================================================================
Me@0 226 //Single reader single writer super fast Q.. no atomic instrs..
Me@0 227
Me@0 228
Me@0 229 /*This is a blocking queue, but it uses no atomic instructions, just does
Me@1 230 * yield() when empty or full
Me@0 231 *
Me@0 232 *It doesn't need any atomic instructions because only a single thread
Me@0 233 * extracts and only a single thread inserts, and it has no locations that
Me@0 234 * are written by both. It writes before moving and moves before reading,
Me@0 235 * and never lets write position and read position be the same, so dis-
Me@0 236 * synchrony can only ever cause an unnecessary call to yield(), never a
Me@0 237 * wrong value (by monotonicity of movement of pointers, plus single writer
Me@0 238 * to pointers, plus sequence of write before change pointer, plus
Me@0 239 * assumptions that if thread A semantically writes X before Y, then thread
Me@0 240 * B will see the writes in that order.)
Me@0 241 */
Me@0 242
Me@0 243 SRSWQueueStruc* makeSRSWQ()
Me@0 244 {
Me@0 245 SRSWQueueStruc* retQ;
Me@9 246 retQ = (SRSWQueueStruc *) VMS__malloc( sizeof( SRSWQueueStruc ) );
Me@0 247
Me@0 248 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
Me@0 249 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
Me@0 250 retQ->endOfData = &(retQ->startOfData[1023]);
Me@0 251
Me@0 252 return retQ;
Me@0 253 }
Me@0 254
Me@7 255 void
Me@7 256 freeSRSWQ( SRSWQueueStruc* Q )
Me@7 257 {
Me@10 258 VMS__free( Q );
Me@7 259 }
Me@0 260
Me@0 261 void* readSRSWQ( SRSWQueueStruc* Q )
Me@0 262 { void *out = 0;
Me@0 263 int tries = 0;
Me@0 264
Me@0 265 while( TRUE )
Me@1 266 {
Me@1 267 if( Q->insertPos - Q->extractPos != 1 &&
Me@1 268 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
Me@1 269 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
Me@1 270 else Q->extractPos++; //move before read
Me@0 271 out = *(Q->extractPos);
Me@0 272 return out;
Me@0 273 }
Me@0 274 //Q is empty
Me@0 275 tries++;
Me@6 276 if( tries > SPINLOCK_TRIES ) pthread_yield();
Me@0 277 }
Me@0 278 }
Me@0 279
Me@1 280
Me@1 281 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
Me@1 282 { void *out = 0;
Me@1 283 int tries = 0;
Me@1 284
Me@1 285 while( TRUE )
Me@1 286 {
Me@1 287 if( Q->insertPos - Q->extractPos != 1 &&
Me@1 288 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
Me@1 289 { Q->extractPos++; //move before read
Me@1 290 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
Me@1 291 out = *(Q->extractPos);
Me@1 292 return out;
Me@1 293 }
Me@1 294 //Q is empty
Me@1 295 tries++;
Me@7 296 if( tries > 10 ) return NULL; //long enough for writer to finish
Me@1 297 }
Me@1 298 }
Me@1 299
Me@0 300 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
Me@0 301 {
Me@0 302 int tries = 0;
Me@0 303
Me@0 304 while( TRUE )
Me@1 305 {
Me@1 306 if( Q->extractPos - Q->insertPos != 1 &&
Me@1 307 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
Me@1 308 { *(Q->insertPos) = in; //insert before move
Me@1 309 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
Me@1 310 else Q->insertPos++;
Me@0 311 return;
Me@0 312 }
Me@0 313 //Q is full
Me@0 314 tries++;
Me@6 315 if( tries > SPINLOCK_TRIES ) pthread_yield();
Me@0 316 }
Me@0 317 }
Me@1 318
Me@1 319
Me@1 320
Me@1 321 //===========================================================================
Me@1 322 //Single reader Multiple writer super fast Q.. no atomic instrs..
Me@1 323
Me@1 324
Me@1 325 /*This is a blocking queue, but it uses no atomic instructions, just does
Me@1 326 * yield() when empty or full
Me@1 327 *
 *It doesn't need any atomic instructions because every writer gets its own
 * single-reader single-writer queue, so each internal queue still has exactly
 * one extracting thread and one inserting thread, and no location is written
 * by both. It writes before moving and moves before reading,
Me@1 331 * and never lets write position and read position be the same, so dis-
Me@1 332 * synchrony can only ever cause an unnecessary call to yield(), never a
Me@1 333 * wrong value (by monotonicity of movement of pointers, plus single writer
Me@1 334 * to pointers, plus sequence of write before change pointer, plus
Me@1 335 * assumptions that if thread A semantically writes X before Y, then thread
Me@1 336 * B will see the writes in that order.)
Me@1 337 *
Me@1 338 *The multi-writer version is implemented as a hierarchy. Each writer has
Me@1 339 * its own single-reader single-writer queue. The reader simply does a
Me@1 340 * round-robin harvesting from them.
Me@1 341 *
Me@1 342 *A writer must first register itself with the queue, and receives an ID back
Me@1 343 * It then uses that ID on each write operation.
Me@1 344 *
Me@1 345 *The implementation is:
Me@1 346 *Physically:
Me@1 347 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
 * -] it also holds the index of the last internal queue a value was read from.
Me@1 349 *
Me@1 350 *Action-Patterns:
Me@1 351 * -] To add a writer
Me@1 352 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
Me@1 353 * --]] internally addWriterToQ does:
Me@1 354 * ---]]] if needs more room, makes a larger writer-array
Me@1 355 * ---]]] copies the old writer-array into the new
 * ---]]] makes a new SRSW queue and puts it into the array
Me@1 357 * ---]]] returns the index to the new SRSW queue as the ID
Me@1 358 * -] To write
Me@1 359 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
Me@1 360 * --]] this call may block, via repeated yield() calls
Me@1 361 * --]] internally, writeSRMWQ does:
Me@1 362 * ---]]] uses the writerID as index to get the SRSW queue for that writer
Me@1 363 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
Me@1 364 * -] To Read
Me@1 365 * --]] reader calls readSRMWQ, passing the Q struc
Me@1 366 * --]] this call may block, via repeated yield() calls
Me@1 367 * --]] internally, readSRMWQ does:
Me@1 368 * ---]]] gets saved index of last SRSW queue read from
Me@1 369 * ---]]] increments index and gets indexed queue
Me@1 370 * ---]]] does a non-blocking read of that queue
Me@1 371 * ---]]] if gets something, saves index and returns that value
Me@1 372 * ---]]] if gets null, then goes to next queue
Me@1 373 * ---]]] if got null from all the queues then does yield() then tries again
Me@1 374 *
Me@1 375 *Note: "0" is used as the value null, so SRSW queues must only contain
Me@1 376 * pointers, and cannot use 0 as a valid pointer value.
Me@1 377 *
Me@1 378 */
Me@1 379
Me@1 380 SRMWQueueStruc* makeSRMWQ()
Me@1 381 { SRMWQueueStruc* retQ;
Me@1 382
Me@9 383 retQ = (SRMWQueueStruc *) VMS__malloc( sizeof( SRMWQueueStruc ) );
Me@1 384
Me@1 385 retQ->numInternalQs = 0;
Me@1 386 retQ->internalQsSz = 10;
Me@10 387 retQ->internalQs = VMS__malloc( retQ->internalQsSz *
Me@10 388 sizeof(SRSWQueueStruc *) );
Me@1 389
Me@1 390 retQ->lastQReadFrom = 0;
Me@1 391
Me@1 392 return retQ;
Me@1 393 }
Me@1 394
Me@1 395 /* ---]]] if needs more room, makes a larger writer-array
Me@1 396 * ---]]] copies the old writer-array into the new
Me@1 397 * ---]]] makes a new SRSW queue an puts it into the array
Me@1 398 * ---]]] returns the index to the new SRSW queue as the ID
Me@1 399 *
Me@1 400 *NOTE: assuming all adds are completed before any writes or reads are
Me@1 401 * performed.. otherwise, this needs to be re-done carefully, probably with
Me@1 402 * a lock.
Me@1 403 */
Me@1 404 int addWriterToSRMWQ( SRMWQueueStruc* Q )
Me@1 405 { int oldSz, i;
Me@1 406 SRSWQueueStruc * *oldArray;
Me@1 407
Me@1 408 (Q->numInternalQs)++;
Me@1 409 if( Q->numInternalQs >= Q->internalQsSz )
Me@1 410 { //full, so make bigger
Me@1 411 oldSz = Q->internalQsSz;
Me@1 412 oldArray = Q->internalQs;
Me@1 413 Q->internalQsSz *= 2;
Me@9 414 Q->internalQs = VMS__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
Me@1 415 for( i = 0; i < oldSz; i++ )
Me@1 416 { Q->internalQs[i] = oldArray[i];
Me@1 417 }
Me@10 418 VMS__free( oldArray );
Me@1 419 }
Me@1 420 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
Me@1 421 return Q->numInternalQs - 1;
Me@1 422 }
Me@1 423
Me@1 424
Me@1 425 /* ---]]] gets saved index of last SRSW queue read-from
Me@1 426 * ---]]] increments index and gets indexed queue
Me@1 427 * ---]]] does a non-blocking read of that queue
Me@1 428 * ---]]] if gets something, saves index and returns that value
Me@1 429 * ---]]] if gets null, then goes to next queue
Me@1 430 * ---]]] if got null from all the queues then does yield() then tries again
Me@1 431 */
Me@1 432 void* readSRMWQ( SRMWQueueStruc* Q )
Me@1 433 { SRSWQueueStruc *readQ;
Me@1 434 void *readValue = 0;
Me@1 435 int tries = 0;
Me@1 436 int QToReadFrom = 0;
Me@1 437
Me@1 438 QToReadFrom = Q->lastQReadFrom;
Me@1 439
Me@1 440 while( TRUE )
Me@1 441 { QToReadFrom++;
Me@1 442 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
Me@1 443 readQ = Q->internalQs[ QToReadFrom ];
Me@1 444 readValue = readSRSWQ_NonBlocking( readQ );
Me@1 445
Me@1 446 if( readValue != 0 ) //got a value, return it
Me@1 447 { Q->lastQReadFrom = QToReadFrom;
Me@1 448 return readValue;
Me@1 449 }
Me@1 450 else //SRSW Q just read is empty
Me@1 451 { //check if all queues have been tried
Me@1 452 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
Me@1 453 { tries++; //give a writer a chance to finish before yield
Me@6 454 if( tries > SPINLOCK_TRIES ) pthread_yield();
Me@1 455 }
Me@1 456 }
Me@1 457 }
Me@1 458 }
Me@1 459
Me@1 460
Me@1 461 /*
Me@1 462 * ---]]] uses the writerID as index to get the SRSW queue for that writer
Me@1 463 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
Me@1 464 */
Me@1 465 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
Me@1 466 {
Me@1 467 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
Me@1 468
Me@1 469 writeSRSWQ( in, Q->internalQs[ writerID ] );
Me@1 470 }