# HG changeset patch # User Me # Date 1276908578 25200 # Node ID 81f6687d52d15c48f791418b5d937f251281fd96 # Parent 85af604dee9bacb886250f7e951555ea69137be1 Correct SRSW queue and correst CAS queue diff -r 85af604dee9b -r 81f6687d52d1 BlockingQueue.c --- a/BlockingQueue.c Sat May 22 19:51:09 2010 -0700 +++ b/BlockingQueue.c Fri Jun 18 17:49:38 2010 -0700 @@ -1,9 +1,7 @@ /* - * Copyright 2009 OpenSourceCodeStewardshipFoundation.org + * Copyright 2009 OpenSourceStewardshipFoundation.org * Licensed under GNU General Public License version 2 * - * NOTE: this version of SRSW correct as of April 25, 2010 - * * Author: seanhalle@yahoo.com */ @@ -12,11 +10,14 @@ #include #include #include +#include +#include #include "BlockingQueue.h" #define INC(x) (++x == 1024) ? (x) = 0 : (x) +#define SPINLOCK_TRIES 100000 //=========================================================================== //Normal pthread Q @@ -67,6 +68,7 @@ { while( Q -> count == 0 ) { Q -> w_empty = 1; + // pthread_cond_broadcast( &Q->cond_w_t ); status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); if (status != 0) { perror("Thread wait error: "); @@ -79,6 +81,7 @@ INC( Q->readPos ); wt = Q -> w_full; Q -> w_full = 0; + //pthread_cond_broadcast( &Q->cond_w_t ); } pthread_mutex_unlock( &Q->mutex_t ); if (wt) pthread_cond_signal( &Q->cond_w_t ); @@ -142,11 +145,11 @@ void* readCASQ( CASQueueStruc* Q ) - { void *out = 0; - int tries = 0; - int startOfData = Q->startOfData; - int endOfData = Q->endOfData; - + { void *out = 0; + int tries = 0; + void **startOfData = Q->startOfData; + void **endOfData = Q->endOfData; + int success = FALSE; while( !success ) @@ -154,8 +157,8 @@ __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); if( success ) { - volatile int insertPos = Q->insertPos; - volatile int extractPos = Q->extractPos; + void **insertPos = Q->insertPos; + void **extractPos = Q->extractPos; //if not empty -- extract just below insert when empty if( insertPos - extractPos != 1 && @@ -178,16 +181,18 @@ } //Q is busy or empty tries++; - if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock + if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI yield() } } void writeCASQ( void * in, CASQueueStruc* Q ) { int tries = 0; - int startOfData = Q->startOfData; - int endOfData = Q->endOfData; - + //TODO: need to make Q volatile? Want to do this Q in assembly! + //Have no idea what GCC's going to do to this code + void **startOfData = Q->startOfData; + void **endOfData = Q->endOfData; + int success = FALSE; while( !success ) @@ -195,14 +200,14 @@ __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); if( success ) { - volatile int insertPos = Q->insertPos; - volatile int extractPos = Q->extractPos; + void **insertPos = Q->insertPos; + void **extractPos = Q->extractPos; //check if room to insert.. can't use a count variable // 'cause both insertor Thd and extractor Thd would write it if( extractPos - insertPos != 1 && !(insertPos == endOfData && extractPos == startOfData)) - { *(insertPos) = in; //insert before move + { *(Q->insertPos) = in; //insert before move if( insertPos == endOfData ) //write new pos exactly once, correctly { Q->insertPos = startOfData; } @@ -218,18 +223,19 @@ } } tries++; - if( tries > 10000 ) pthread_yield();//yield not guaranteed + if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() } } #endif //_GNU_SOURCE + //=========================================================================== //Single reader single writer super fast Q.. no atomic instrs.. /*This is a blocking queue, but it uses no atomic instructions, just does - * busy-waiting when empty or full (but yield() if waits too long) + * yield() when empty or full * *It doesn't need any atomic instructions because only a single thread * extracts and only a single thread inserts, and it has no locations that @@ -258,57 +264,210 @@ void* readSRSWQ( SRSWQueueStruc* Q ) { void *out = 0; int tries = 0; - int startOfData = Q->startOfData; - int endOfData = Q->endOfData; while( TRUE ) - { //not certain the volatile reads need to be done, but safe.. - volatile int insertPos = Q->insertPos; - volatile int extractPos = Q->extractPos; - - //if not empty -- extract just below insert when empty - if( insertPos - extractPos != 1 && - !(extractPos == endOfData && insertPos == startOfData)) - { //move before read - if( extractPos == endOfData ) //write new pos exactly once, correctly - { Q->extractPos = startOfData; //can't overrun then fix it 'cause - } // other thread might read bad pos - else - { Q->extractPos++; - } + { + if( Q->insertPos - Q->extractPos != 1 && + !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) + { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData; + else Q->extractPos++; //move before read out = *(Q->extractPos); return out; } //Q is empty tries++; - if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock + if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() } } + +void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) + { void *out = 0; + int tries = 0; + + while( TRUE ) + { + if( Q->insertPos - Q->extractPos != 1 && + !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) + { Q->extractPos++; //move before read + if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData; + out = *(Q->extractPos); + return out; + } + //Q is empty + tries++; + if( tries > 2 ) return 0; //long enough for writer to finish + } + } + + void writeSRSWQ( void * in, SRSWQueueStruc* Q ) { int tries = 0; - int startOfData = Q->startOfData; - int endOfData = Q->endOfData; while( TRUE ) - { //not certain the volatile reads need to be done, but safe.. - volatile int insertPos = Q->insertPos; - volatile int extractPos = Q->extractPos; - - if( extractPos - insertPos != 1 && - !(insertPos == endOfData && extractPos == startOfData)) - { *(insertPos) = in; //insert before move - if( insertPos == endOfData ) //write new pos exactly once, correctly - { Q->insertPos = startOfData; - } - else - { Q->insertPos++; - } + { + if( Q->extractPos - Q->insertPos != 1 && + !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData)) + { *(Q->insertPos) = in; //insert before move + if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData; + else Q->insertPos++; return; } //Q is full tries++; - if( tries > 10000 ) pthread_yield();//yield not guaranteed + if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() } } + + + +//=========================================================================== +//Single reader Multiple writer super fast Q.. no atomic instrs.. + + +/*This is a blocking queue, but it uses no atomic instructions, just does + * yield() when empty or full + * + *It doesn't need any atomic instructions because only a single thread + * extracts and only a single thread inserts, and it has no locations that + * are written by both. It writes before moving and moves before reading, + * and never lets write position and read position be the same, so dis- + * synchrony can only ever cause an unnecessary call to yield(), never a + * wrong value (by monotonicity of movement of pointers, plus single writer + * to pointers, plus sequence of write before change pointer, plus + * assumptions that if thread A semantically writes X before Y, then thread + * B will see the writes in that order.) + * + *The multi-writer version is implemented as a hierarchy. Each writer has + * its own single-reader single-writer queue. The reader simply does a + * round-robin harvesting from them. + * + *A writer must first register itself with the queue, and receives an ID back + * It then uses that ID on each write operation. + * + *The implementation is: + *Physically: + * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s + * -] it also has read-pointer to the last queue a write was taken from. + * + *Action-Patterns: + * -] To add a writer + * --]] writer-thread calls addWriterToQ(), remember the ID it returns + * --]] internally addWriterToQ does: + * ---]]] if needs more room, makes a larger writer-array + * ---]]] copies the old writer-array into the new + * ---]]] makes a new SRSW queue an puts it into the array + * ---]]] returns the index to the new SRSW queue as the ID + * -] To write + * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID + * --]] this call may block, via repeated yield() calls + * --]] internally, writeSRMWQ does: + * ---]]] uses the writerID as index to get the SRSW queue for that writer + * ---]]] performs writeQ on that queue (may block via repeated yield calls) + * -] To Read + * --]] reader calls readSRMWQ, passing the Q struc + * --]] this call may block, via repeated yield() calls + * --]] internally, readSRMWQ does: + * ---]]] gets saved index of last SRSW queue read from + * ---]]] increments index and gets indexed queue + * ---]]] does a non-blocking read of that queue + * ---]]] if gets something, saves index and returns that value + * ---]]] if gets null, then goes to next queue + * ---]]] if got null from all the queues then does yield() then tries again + * + *Note: "0" is used as the value null, so SRSW queues must only contain + * pointers, and cannot use 0 as a valid pointer value. + * + */ + +SRMWQueueStruc* makeSRMWQ() + { SRMWQueueStruc* retQ; + + retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) ); + + retQ->numInternalQs = 0; + retQ->internalQsSz = 10; + retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); + + retQ->lastQReadFrom = 0; + + return retQ; + } + +/* ---]]] if needs more room, makes a larger writer-array + * ---]]] copies the old writer-array into the new + * ---]]] makes a new SRSW queue an puts it into the array + * ---]]] returns the index to the new SRSW queue as the ID + * + *NOTE: assuming all adds are completed before any writes or reads are + * performed.. otherwise, this needs to be re-done carefully, probably with + * a lock. + */ +int addWriterToSRMWQ( SRMWQueueStruc* Q ) + { int oldSz, i; + SRSWQueueStruc * *oldArray; + + (Q->numInternalQs)++; + if( Q->numInternalQs >= Q->internalQsSz ) + { //full, so make bigger + oldSz = Q->internalQsSz; + oldArray = Q->internalQs; + Q->internalQsSz *= 2; + Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); + for( i = 0; i < oldSz; i++ ) + { Q->internalQs[i] = oldArray[i]; + } + free( oldArray ); + } + Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); + return Q->numInternalQs - 1; + } + + +/* ---]]] gets saved index of last SRSW queue read-from + * ---]]] increments index and gets indexed queue + * ---]]] does a non-blocking read of that queue + * ---]]] if gets something, saves index and returns that value + * ---]]] if gets null, then goes to next queue + * ---]]] if got null from all the queues then does yield() then tries again + */ +void* readSRMWQ( SRMWQueueStruc* Q ) + { SRSWQueueStruc *readQ; + void *readValue = 0; + int tries = 0; + int QToReadFrom = 0; + + QToReadFrom = Q->lastQReadFrom; + + while( TRUE ) + { QToReadFrom++; + if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0; + readQ = Q->internalQs[ QToReadFrom ]; + readValue = readSRSWQ_NonBlocking( readQ ); + + if( readValue != 0 ) //got a value, return it + { Q->lastQReadFrom = QToReadFrom; + return readValue; + } + else //SRSW Q just read is empty + { //check if all queues have been tried + if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty + { tries++; //give a writer a chance to finish before yield + if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() + } + } + } + } + + +/* + * ---]]] uses the writerID as index to get the SRSW queue for that writer + * ---]]] performs writeQ on that queue (may block via repeated yield calls) + */ +void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID ) + { + if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error + + writeSRSWQ( in, Q->internalQs[ writerID ] ); + }