Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
comparison BlockingQueue.c @ 1:81f6687d52d1
Correct SRSW queue and correct CAS queue
| author | Me |
|---|---|
| date | Fri, 18 Jun 2010 17:49:38 -0700 |
| parents | 85af604dee9b |
| children | 8abcca1590b8 |
comparison
equal
deleted
inserted
replaced
| 0:329d0de8e9a4 | 1:949ad36d2c6c |
|---|---|
| 1 /* | 1 /* |
| 2 * Copyright 2009 OpenSourceCodeStewardshipFoundation.org | 2 * Copyright 2009 OpenSourceStewardshipFoundation.org |
| 3 * Licensed under GNU General Public License version 2 | 3 * Licensed under GNU General Public License version 2 |
| 4 * | |
| 5 * NOTE: this version of SRSW correct as of April 25, 2010 | |
| 6 * | 4 * |
| 7 * Author: seanhalle@yahoo.com | 5 * Author: seanhalle@yahoo.com |
| 8 */ | 6 */ |
| 9 | 7 |
| 10 | 8 |
| 11 #include <stdio.h> | 9 #include <stdio.h> |
| 12 #include <errno.h> | 10 #include <errno.h> |
| 13 #include <pthread.h> | 11 #include <pthread.h> |
| 14 #include <stdlib.h> | 12 #include <stdlib.h> |
| 13 #include <sched.h> | |
| 14 #include <windows.h> | |
| 15 | 15 |
| 16 #include "BlockingQueue.h" | 16 #include "BlockingQueue.h" |
| 17 | 17 |
| 18 #define INC(x) (++x == 1024) ? (x) = 0 : (x) | 18 #define INC(x) (++x == 1024) ? (x) = 0 : (x) |
| 19 | 19 |
| 20 #define SPINLOCK_TRIES 100000 | |
| 20 | 21 |
| 21 //=========================================================================== | 22 //=========================================================================== |
| 22 //Normal pthread Q | 23 //Normal pthread Q |
| 23 | 24 |
| 24 QueueStruc* makeQ() | 25 QueueStruc* makeQ() |
| 65 int status, wt; | 66 int status, wt; |
| 66 pthread_mutex_lock( &Q->mutex_t ); | 67 pthread_mutex_lock( &Q->mutex_t ); |
| 67 { | 68 { |
| 68 while( Q -> count == 0 ) | 69 while( Q -> count == 0 ) |
| 69 { Q -> w_empty = 1; | 70 { Q -> w_empty = 1; |
| 71 // pthread_cond_broadcast( &Q->cond_w_t ); | |
| 70 status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); | 72 status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); |
| 71 if (status != 0) | 73 if (status != 0) |
| 72 { perror("Thread wait error: "); | 74 { perror("Thread wait error: "); |
| 73 exit(1); | 75 exit(1); |
| 74 } | 76 } |
| 77 Q -> count -= 1; | 79 Q -> count -= 1; |
| 78 ret = Q->data[ Q->readPos ]; | 80 ret = Q->data[ Q->readPos ]; |
| 79 INC( Q->readPos ); | 81 INC( Q->readPos ); |
| 80 wt = Q -> w_full; | 82 wt = Q -> w_full; |
| 81 Q -> w_full = 0; | 83 Q -> w_full = 0; |
| 84 //pthread_cond_broadcast( &Q->cond_w_t ); | |
| 82 } | 85 } |
| 83 pthread_mutex_unlock( &Q->mutex_t ); | 86 pthread_mutex_unlock( &Q->mutex_t ); |
| 84 if (wt) pthread_cond_signal( &Q->cond_w_t ); | 87 if (wt) pthread_cond_signal( &Q->cond_w_t ); |
| 85 | 88 |
| 86 return( ret ); | 89 return( ret ); |
| 140 return retQ; | 143 return retQ; |
| 141 } | 144 } |
| 142 | 145 |
| 143 | 146 |
| 144 void* readCASQ( CASQueueStruc* Q ) | 147 void* readCASQ( CASQueueStruc* Q ) |
| 145 { void *out = 0; | 148 { void *out = 0; |
| 146 int tries = 0; | 149 int tries = 0; |
| 147 int startOfData = Q->startOfData; | 150 void **startOfData = Q->startOfData; |
| 148 int endOfData = Q->endOfData; | 151 void **endOfData = Q->endOfData; |
| 149 | 152 |
| 150 int success = FALSE; | 153 int success = FALSE; |
| 151 | 154 |
| 152 while( !success ) | 155 while( !success ) |
| 153 { success = | 156 { success = |
| 154 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); | 157 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); |
| 155 if( success ) | 158 if( success ) |
| 156 { | 159 { |
| 157 volatile int insertPos = Q->insertPos; | 160 void **insertPos = Q->insertPos; |
| 158 volatile int extractPos = Q->extractPos; | 161 void **extractPos = Q->extractPos; |
| 159 | 162 |
| 160 //if not empty -- extract just below insert when empty | 163 //if not empty -- extract just below insert when empty |
| 161 if( insertPos - extractPos != 1 && | 164 if( insertPos - extractPos != 1 && |
| 162 !(extractPos == endOfData && insertPos == startOfData)) | 165 !(extractPos == endOfData && insertPos == startOfData)) |
| 163 { //move before read | 166 { //move before read |
| 176 Q->extractLock = UNLOCKED;//have to try again, release for others | 179 Q->extractLock = UNLOCKED;//have to try again, release for others |
| 177 } | 180 } |
| 178 } | 181 } |
| 179 //Q is busy or empty | 182 //Q is busy or empty |
| 180 tries++; | 183 tries++; |
| 181 if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock | 184 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI yield() |
| 182 } | 185 } |
| 183 } | 186 } |
| 184 | 187 |
| 185 void writeCASQ( void * in, CASQueueStruc* Q ) | 188 void writeCASQ( void * in, CASQueueStruc* Q ) |
| 186 { | 189 { |
| 187 int tries = 0; | 190 int tries = 0; |
| 188 int startOfData = Q->startOfData; | 191 //TODO: need to make Q volatile? Want to do this Q in assembly! |
| 189 int endOfData = Q->endOfData; | 192 //Have no idea what GCC's going to do to this code |
| 190 | 193 void **startOfData = Q->startOfData; |
| 194 void **endOfData = Q->endOfData; | |
| 195 | |
| 191 int success = FALSE; | 196 int success = FALSE; |
| 192 | 197 |
| 193 while( !success ) | 198 while( !success ) |
| 194 { success = | 199 { success = |
| 195 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); | 200 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); |
| 196 if( success ) | 201 if( success ) |
| 197 { | 202 { |
| 198 volatile int insertPos = Q->insertPos; | 203 void **insertPos = Q->insertPos; |
| 199 volatile int extractPos = Q->extractPos; | 204 void **extractPos = Q->extractPos; |
| 200 | 205 |
| 201 //check if room to insert.. can't use a count variable | 206 //check if room to insert.. can't use a count variable |
| 202 // 'cause both insertor Thd and extractor Thd would write it | 207 // 'cause both insertor Thd and extractor Thd would write it |
| 203 if( extractPos - insertPos != 1 && | 208 if( extractPos - insertPos != 1 && |
| 204 !(insertPos == endOfData && extractPos == startOfData)) | 209 !(insertPos == endOfData && extractPos == startOfData)) |
| 205 { *(insertPos) = in; //insert before move | 210 { *(Q->insertPos) = in; //insert before move |
| 206 if( insertPos == endOfData ) //write new pos exactly once, correctly | 211 if( insertPos == endOfData ) //write new pos exactly once, correctly |
| 207 { Q->insertPos = startOfData; | 212 { Q->insertPos = startOfData; |
| 208 } | 213 } |
| 209 else | 214 else |
| 210 { Q->insertPos++; | 215 { Q->insertPos++; |
| 216 { success = FALSE; | 221 { success = FALSE; |
| 217 Q->insertLock = UNLOCKED;//have to try again, release for others | 222 Q->insertLock = UNLOCKED;//have to try again, release for others |
| 218 } | 223 } |
| 219 } | 224 } |
| 220 tries++; | 225 tries++; |
| 221 if( tries > 10000 ) pthread_yield();//yield not guaranteed | 226 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() |
| 222 } | 227 } |
| 223 } | 228 } |
| 224 | 229 |
| 225 #endif //_GNU_SOURCE | 230 #endif //_GNU_SOURCE |
| 231 | |
| 226 | 232 |
| 227 //=========================================================================== | 233 //=========================================================================== |
| 228 //Single reader single writer super fast Q.. no atomic instrs.. | 234 //Single reader single writer super fast Q.. no atomic instrs.. |
| 229 | 235 |
| 230 | 236 |
| 231 /*This is a blocking queue, but it uses no atomic instructions, just does | 237 /*This is a blocking queue, but it uses no atomic instructions, just does |
| 232 * busy-waiting when empty or full (but yield() if waits too long) | 238 * yield() when empty or full |
| 233 * | 239 * |
| 234 *It doesn't need any atomic instructions because only a single thread | 240 *It doesn't need any atomic instructions because only a single thread |
| 235 * extracts and only a single thread inserts, and it has no locations that | 241 * extracts and only a single thread inserts, and it has no locations that |
| 236 * are written by both. It writes before moving and moves before reading, | 242 * are written by both. It writes before moving and moves before reading, |
| 237 * and never lets write position and read position be the same, so dis- | 243 * and never lets write position and read position be the same, so dis- |
| 256 | 262 |
| 257 | 263 |
| 258 void* readSRSWQ( SRSWQueueStruc* Q ) | 264 void* readSRSWQ( SRSWQueueStruc* Q ) |
| 259 { void *out = 0; | 265 { void *out = 0; |
| 260 int tries = 0; | 266 int tries = 0; |
| 261 int startOfData = Q->startOfData; | |
| 262 int endOfData = Q->endOfData; | |
| 263 | 267 |
| 264 while( TRUE ) | 268 while( TRUE ) |
| 265 { //not certain the volatile reads need to be done, but safe.. | 269 { |
| 266 volatile int insertPos = Q->insertPos; | 270 if( Q->insertPos - Q->extractPos != 1 && |
| 267 volatile int extractPos = Q->extractPos; | 271 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) |
| 268 | 272 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData; |
| 269 //if not empty -- extract just below insert when empty | 273 else Q->extractPos++; //move before read |
| 270 if( insertPos - extractPos != 1 && | |
| 271 !(extractPos == endOfData && insertPos == startOfData)) | |
| 272 { //move before read | |
| 273 if( extractPos == endOfData ) //write new pos exactly once, correctly | |
| 274 { Q->extractPos = startOfData; //can't overrun then fix it 'cause | |
| 275 } // other thread might read bad pos | |
| 276 else | |
| 277 { Q->extractPos++; | |
| 278 } | |
| 279 out = *(Q->extractPos); | 274 out = *(Q->extractPos); |
| 280 return out; | 275 return out; |
| 281 } | 276 } |
| 282 //Q is empty | 277 //Q is empty |
| 283 tries++; | 278 tries++; |
| 284 if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock | 279 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() |
| 285 } | 280 } |
| 286 } | 281 } |
| 282 | |
| 283 | |
| 284 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) | |
| 285 { void *out = 0; | |
| 286 int tries = 0; | |
| 287 | |
| 288 while( TRUE ) | |
| 289 { | |
| 290 if( Q->insertPos - Q->extractPos != 1 && | |
| 291 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) | |
| 292 { Q->extractPos++; //move before read | |
| 293 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData; | |
| 294 out = *(Q->extractPos); | |
| 295 return out; | |
| 296 } | |
| 297 //Q is empty | |
| 298 tries++; | |
| 299 if( tries > 2 ) return 0; //long enough for writer to finish | |
| 300 } | |
| 301 } | |
| 302 | |
| 287 | 303 |
| 288 void writeSRSWQ( void * in, SRSWQueueStruc* Q ) | 304 void writeSRSWQ( void * in, SRSWQueueStruc* Q ) |
| 289 { | 305 { |
| 290 int tries = 0; | 306 int tries = 0; |
| 291 int startOfData = Q->startOfData; | |
| 292 int endOfData = Q->endOfData; | |
| 293 | 307 |
| 294 while( TRUE ) | 308 while( TRUE ) |
| 295 { //not certain the volatile reads need to be done, but safe.. | 309 { |
| 296 volatile int insertPos = Q->insertPos; | 310 if( Q->extractPos - Q->insertPos != 1 && |
| 297 volatile int extractPos = Q->extractPos; | 311 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData)) |
| 298 | 312 { *(Q->insertPos) = in; //insert before move |
| 299 if( extractPos - insertPos != 1 && | 313 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData; |
| 300 !(insertPos == endOfData && extractPos == startOfData)) | 314 else Q->insertPos++; |
| 301 { *(insertPos) = in; //insert before move | |
| 302 if( insertPos == endOfData ) //write new pos exactly once, correctly | |
| 303 { Q->insertPos = startOfData; | |
| 304 } | |
| 305 else | |
| 306 { Q->insertPos++; | |
| 307 } | |
| 308 return; | 315 return; |
| 309 } | 316 } |
| 310 //Q is full | 317 //Q is full |
| 311 tries++; | 318 tries++; |
| 312 if( tries > 10000 ) pthread_yield();//yield not guaranteed | 319 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() |
| 313 } | 320 } |
| 314 } | 321 } |
| 322 | |
| 323 | |
| 324 | |
| 325 //=========================================================================== | |
| 326 //Single reader Multiple writer super fast Q.. no atomic instrs.. | |
| 327 | |
| 328 | |
| 329 /*This is a blocking queue, but it uses no atomic instructions, just does | |
| 330 * yield() when empty or full | |
| 331 * | |
| 332 *It doesn't need any atomic instructions because only a single thread | |
| 333 * extracts and only a single thread inserts, and it has no locations that | |
| 334 * are written by both. It writes before moving and moves before reading, | |
| 335 * and never lets write position and read position be the same, so dis- | |
| 336 * synchrony can only ever cause an unnecessary call to yield(), never a | |
| 337 * wrong value (by monotonicity of movement of pointers, plus single writer | |
| 338 * to pointers, plus sequence of write before change pointer, plus | |
| 339 * assumptions that if thread A semantically writes X before Y, then thread | |
| 340 * B will see the writes in that order.) | |
| 341 * | |
| 342 *The multi-writer version is implemented as a hierarchy. Each writer has | |
| 343 * its own single-reader single-writer queue. The reader simply does a | |
| 344 * round-robin harvesting from them. | |
| 345 * | |
| 346 *A writer must first register itself with the queue, and receives an ID back | |
| 347 * It then uses that ID on each write operation. | |
| 348 * | |
| 349 *The implementation is: | |
| 350 *Physically: | |
| 351 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s | |
| 352 * -] it also has read-pointer to the last queue a write was taken from. | |
| 353 * | |
| 354 *Action-Patterns: | |
| 355 * -] To add a writer | |
| 356 * --]] writer-thread calls addWriterToQ(), remember the ID it returns | |
| 357 * --]] internally addWriterToQ does: | |
| 358 * ---]]] if needs more room, makes a larger writer-array | |
| 359 * ---]]] copies the old writer-array into the new | |
| 360 * ---]]] makes a new SRSW queue and puts it into the array | 
| 361 * ---]]] returns the index to the new SRSW queue as the ID | |
| 362 * -] To write | |
| 363 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID | |
| 364 * --]] this call may block, via repeated yield() calls | |
| 365 * --]] internally, writeSRMWQ does: | |
| 366 * ---]]] uses the writerID as index to get the SRSW queue for that writer | |
| 367 * ---]]] performs writeQ on that queue (may block via repeated yield calls) | |
| 368 * -] To Read | |
| 369 * --]] reader calls readSRMWQ, passing the Q struc | |
| 370 * --]] this call may block, via repeated yield() calls | |
| 371 * --]] internally, readSRMWQ does: | |
| 372 * ---]]] gets saved index of last SRSW queue read from | |
| 373 * ---]]] increments index and gets indexed queue | |
| 374 * ---]]] does a non-blocking read of that queue | |
| 375 * ---]]] if gets something, saves index and returns that value | |
| 376 * ---]]] if gets null, then goes to next queue | |
| 377 * ---]]] if got null from all the queues then does yield() then tries again | |
| 378 * | |
| 379 *Note: "0" is used as the value null, so SRSW queues must only contain | |
| 380 * pointers, and cannot use 0 as a valid pointer value. | |
| 381 * | |
| 382 */ | |
| 383 | |
| 384 SRMWQueueStruc* makeSRMWQ() | |
| 385 { SRMWQueueStruc* retQ; | |
| 386 | |
| 387 retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) ); | |
| 388 | |
| 389 retQ->numInternalQs = 0; | |
| 390 retQ->internalQsSz = 10; | |
| 391 retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); | |
| 392 | |
| 393 retQ->lastQReadFrom = 0; | |
| 394 | |
| 395 return retQ; | |
| 396 } | |
| 397 | |
| 398 /* ---]]] if needs more room, makes a larger writer-array | |
| 399 * ---]]] copies the old writer-array into the new | |
| 400 * ---]]] makes a new SRSW queue and puts it into the array | 
| 401 * ---]]] returns the index to the new SRSW queue as the ID | |
| 402 * | |
| 403 *NOTE: assuming all adds are completed before any writes or reads are | |
| 404 * performed.. otherwise, this needs to be re-done carefully, probably with | |
| 405 * a lock. | |
| 406 */ | |
| 407 int addWriterToSRMWQ( SRMWQueueStruc* Q ) | |
| 408 { int oldSz, i; | |
| 409 SRSWQueueStruc * *oldArray; | |
| 410 | |
| 411 (Q->numInternalQs)++; | |
| 412 if( Q->numInternalQs >= Q->internalQsSz ) | |
| 413 { //full, so make bigger | |
| 414 oldSz = Q->internalQsSz; | |
| 415 oldArray = Q->internalQs; | |
| 416 Q->internalQsSz *= 2; | |
| 417 Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); | |
| 418 for( i = 0; i < oldSz; i++ ) | |
| 419 { Q->internalQs[i] = oldArray[i]; | |
| 420 } | |
| 421 free( oldArray ); | |
| 422 } | |
| 423 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); | |
| 424 return Q->numInternalQs - 1; | |
| 425 } | |
| 426 | |
| 427 | |
| 428 /* ---]]] gets saved index of last SRSW queue read-from | |
| 429 * ---]]] increments index and gets indexed queue | |
| 430 * ---]]] does a non-blocking read of that queue | |
| 431 * ---]]] if gets something, saves index and returns that value | |
| 432 * ---]]] if gets null, then goes to next queue | |
| 433 * ---]]] if got null from all the queues then does yield() then tries again | |
| 434 */ | |
| 435 void* readSRMWQ( SRMWQueueStruc* Q ) | |
| 436 { SRSWQueueStruc *readQ; | |
| 437 void *readValue = 0; | |
| 438 int tries = 0; | |
| 439 int QToReadFrom = 0; | |
| 440 | |
| 441 QToReadFrom = Q->lastQReadFrom; | |
| 442 | |
| 443 while( TRUE ) | |
| 444 { QToReadFrom++; | |
| 445 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0; | |
| 446 readQ = Q->internalQs[ QToReadFrom ]; | |
| 447 readValue = readSRSWQ_NonBlocking( readQ ); | |
| 448 | |
| 449 if( readValue != 0 ) //got a value, return it | |
| 450 { Q->lastQReadFrom = QToReadFrom; | |
| 451 return readValue; | |
| 452 } | |
| 453 else //SRSW Q just read is empty | |
| 454 { //check if all queues have been tried | |
| 455 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty | |
| 456 { tries++; //give a writer a chance to finish before yield | |
| 457 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() | |
| 458 } | |
| 459 } | |
| 460 } | |
| 461 } | |
| 462 | |
| 463 | |
| 464 /* | |
| 465 * ---]]] uses the writerID as index to get the SRSW queue for that writer | |
| 466 * ---]]] performs writeQ on that queue (may block via repeated yield calls) | |
| 467 */ | |
| 468 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID ) | |
| 469 { | |
| 470 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error | |
| 471 | |
| 472 writeSRSWQ( in, Q->internalQs[ writerID ] ); | |
| 473 } |
