# HG changeset patch # User Me@portablequad # Date 1328647889 28800 # Node ID 1ed562d601d9b42daaeb0ea22d4ed5d93b2ffa47 # Parent 53c614b781ce12b818d5b81d8804dcec33e6046d Newly created project repository -- commit sub-states diff -r 53c614b781ce -r 1ed562d601d9 BlockingQueue.c --- a/BlockingQueue.c Thu Nov 04 17:50:29 2010 -0700 +++ b/BlockingQueue.c Tue Feb 07 12:51:29 2012 -0800 @@ -1,470 +1,470 @@ -/* - * Copyright 2009 OpenSourceStewardshipFoundation.org - * Licensed under GNU General Public License version 2 - * - * Author: seanhalle@yahoo.com - */ - - -#include -#include -#include -#include -#include - -#include "BlockingQueue.h" - -#define INC(x) (++x == 1024) ? (x) = 0 : (x) - -#define SPINLOCK_TRIES 100000 - -//=========================================================================== -//Normal pthread Q - -PThdQueueStruc* makePThdQ() - { - PThdQueueStruc* retQ; - int retCode; - retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) ); - - - retCode = - pthread_mutex_init( &retQ->mutex_t, NULL); - if(retCode){perror("Error in creating mutex:"); exit(1);} - - retCode = pthread_cond_init ( &retQ->cond_w_t, NULL); - if(retCode){perror("Error in creating cond_var:"); exit(1);} - - retCode = pthread_cond_init ( &retQ->cond_r_t, NULL); - if(retCode){perror("Error in creating cond_var:"); exit(1);} - - retQ->count = 0; - retQ->readPos = 0; - retQ->writePos = 0; - retQ->w_empty = 0; - retQ->w_full = 0; - - return retQ; - } - -void * readPThdQ( PThdQueueStruc *Q ) - { void *ret; - int retCode, wt; - pthread_mutex_lock( &Q->mutex_t ); - { - while( Q -> count == 0 ) - { Q -> w_empty = 1; - retCode = - pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); - if( retCode ){ perror("Thread wait error: "); exit(1); } - } - Q -> w_empty = 0; - Q -> count -= 1; - ret = Q->data[ Q->readPos ]; - INC( Q->readPos ); - wt = Q -> w_full; - Q -> w_full = 0; - } - pthread_mutex_unlock( &Q->mutex_t ); - if (wt) - pthread_cond_signal( &Q->cond_w_t ); - - //printf("Q out: %d\n", ret); - return( ret ); - } - -void writePThdQ( void * in, PThdQueueStruc* Q ) - { - int status, wt; - //printf("Q in: %d\n", in); - - pthread_mutex_lock( &Q->mutex_t ); - { - while( Q->count >= 1024 ) - { - Q -> w_full = 1; - status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); - if (status != 0) - { perror("Thread wait error: "); - exit(1); - } - } - - Q -> w_full = 0; - Q->count += 1; - Q->data[ Q->writePos ] = in; - INC( Q->writePos ); - wt = Q -> w_empty; - Q -> w_empty = 0; - } - - pthread_mutex_unlock( &Q->mutex_t ); - if( wt ) pthread_cond_signal( &Q->cond_r_t ); - } - - -//=========================================================================== -// multi reader multi writer fast Q via CAS -#ifndef _GNU_SOURCE -#define _GNU_SOURCE - -/*This is a blocking queue, but it uses CAS instr plus yield() when empty - * or full - *It uses CAS because it's meant to have more than one reader and more than - * one writer. - */ - -CASQueueStruc* makeCASQ() - { - CASQueueStruc* retQ; - retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) ); - - retQ->insertLock = UNLOCKED; - retQ->extractLock= UNLOCKED; - //TODO: check got pointer syntax right - retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty - retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be - retQ->endOfData = &(retQ->startOfData[1023]); - - return retQ; - } - - -void* readCASQ( CASQueueStruc* Q ) - { void *out = 0; - int tries = 0; - void **startOfData = Q->startOfData; - void **endOfData = Q->endOfData; - - int gotLock = FALSE; - - while( TRUE ) - { //this intrinsic returns true if the lock held "UNLOCKED", in which - // case it now holds "LOCKED" -- if it already held "LOCKED", then - // gotLock is FALSE - gotLock = - __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); - //NOTE: checked assy, and it does lock correctly.. - if( gotLock ) - { - void **insertPos = Q->insertPos; - void **extractPos = Q->extractPos; - - //if not empty -- extract just below insert when empty - if( insertPos - extractPos != 1 && - !(extractPos == endOfData && insertPos == startOfData)) - { //move before read - if( extractPos == endOfData ) //write new pos exactly once, correctly - { Q->extractPos = startOfData; //can't overrun then fix it 'cause - } // other thread might read bad pos - else - { Q->extractPos++; - } - out = *(Q->extractPos); - Q->extractLock = UNLOCKED; - return out; - } - else //Q is empty - { Q->extractLock = UNLOCKED;//empty, so release lock for others - } - } - //Q is busy or empty - tries++; - if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable - } - } - -void writeCASQ( void * in, CASQueueStruc* Q ) - { - int tries = 0; - //TODO: need to make Q volatile? Want to do this Q in assembly! - //Have no idea what GCC's going to do to this code - void **startOfData = Q->startOfData; - void **endOfData = Q->endOfData; - - int gotLock = FALSE; - - while( TRUE ) - { //this intrinsic returns true if the lock held "UNLOCKED", in which - // case it now holds "LOCKED" -- if it already held "LOCKED", then - // gotLock is FALSE - gotLock = - __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); - if( gotLock ) - { - void **insertPos = Q->insertPos; - void **extractPos = Q->extractPos; - - //check if room to insert.. can't use a count variable - // 'cause both insertor Thd and extractor Thd would write it - if( extractPos - insertPos != 1 && - !(insertPos == endOfData && extractPos == startOfData)) - { *(Q->insertPos) = in; //insert before move - if( insertPos == endOfData ) - { Q->insertPos = startOfData; - } - else - { Q->insertPos++; - } - Q->insertLock = UNLOCKED; - return; - } - else //Q is full - { Q->insertLock = UNLOCKED;//full, so release lock for others - } - } - tries++; - if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable - } - } - -#endif //_GNU_SOURCE - - -//=========================================================================== -//Single reader single writer super fast Q.. no atomic instrs.. - - -/*This is a blocking queue, but it uses no atomic instructions, just does - * yield() when empty or full - * - *It doesn't need any atomic instructions because only a single thread - * extracts and only a single thread inserts, and it has no locations that - * are written by both. It writes before moving and moves before reading, - * and never lets write position and read position be the same, so dis- - * synchrony can only ever cause an unnecessary call to yield(), never a - * wrong value (by monotonicity of movement of pointers, plus single writer - * to pointers, plus sequence of write before change pointer, plus - * assumptions that if thread A semantically writes X before Y, then thread - * B will see the writes in that order.) - */ - -SRSWQueueStruc* makeSRSWQ() - { - SRSWQueueStruc* retQ; - retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) ); - memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); - - retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty - retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be - retQ->endOfData = &(retQ->startOfData[1023]); - - return retQ; - } - -void -freeSRSWQ( SRSWQueueStruc* Q ) - { - free( Q ); - } - -void* readSRSWQ( SRSWQueueStruc* Q ) - { void *out = 0; - int tries = 0; - - while( TRUE ) - { - if( Q->insertPos - Q->extractPos != 1 && - !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) - { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData; - else Q->extractPos++; //move before read - out = *(Q->extractPos); - return out; - } - //Q is empty - tries++; - if( tries > SPINLOCK_TRIES ) pthread_yield(); - } - } - - -void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) - { void *out = 0; - int tries = 0; - - while( TRUE ) - { - if( Q->insertPos - Q->extractPos != 1 && - !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) - { Q->extractPos++; //move before read - if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData; - out = *(Q->extractPos); - return out; - } - //Q is empty - tries++; - if( tries > 10 ) return NULL; //long enough for writer to finish - } - } - - -void writeSRSWQ( void * in, SRSWQueueStruc* Q ) - { - int tries = 0; - - while( TRUE ) - { - if( Q->extractPos - Q->insertPos != 1 && - !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData)) - { *(Q->insertPos) = in; //insert before move - if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData; - else Q->insertPos++; - return; - } - //Q is full - tries++; - if( tries > SPINLOCK_TRIES ) pthread_yield(); - } - } - - - -//=========================================================================== -//Single reader Multiple writer super fast Q.. no atomic instrs.. - - -/*This is a blocking queue, but it uses no atomic instructions, just does - * yield() when empty or full - * - *It doesn't need any atomic instructions because only a single thread - * extracts and only a single thread inserts, and it has no locations that - * are written by both. It writes before moving and moves before reading, - * and never lets write position and read position be the same, so dis- - * synchrony can only ever cause an unnecessary call to yield(), never a - * wrong value (by monotonicity of movement of pointers, plus single writer - * to pointers, plus sequence of write before change pointer, plus - * assumptions that if thread A semantically writes X before Y, then thread - * B will see the writes in that order.) - * - *The multi-writer version is implemented as a hierarchy. Each writer has - * its own single-reader single-writer queue. The reader simply does a - * round-robin harvesting from them. - * - *A writer must first register itself with the queue, and receives an ID back - * It then uses that ID on each write operation. - * - *The implementation is: - *Physically: - * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s - * -] it also has read-pointer to the last queue a write was taken from. - * - *Action-Patterns: - * -] To add a writer - * --]] writer-thread calls addWriterToQ(), remember the ID it returns - * --]] internally addWriterToQ does: - * ---]]] if needs more room, makes a larger writer-array - * ---]]] copies the old writer-array into the new - * ---]]] makes a new SRSW queue an puts it into the array - * ---]]] returns the index to the new SRSW queue as the ID - * -] To write - * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID - * --]] this call may block, via repeated yield() calls - * --]] internally, writeSRMWQ does: - * ---]]] uses the writerID as index to get the SRSW queue for that writer - * ---]]] performs writeQ on that queue (may block via repeated yield calls) - * -] To Read - * --]] reader calls readSRMWQ, passing the Q struc - * --]] this call may block, via repeated yield() calls - * --]] internally, readSRMWQ does: - * ---]]] gets saved index of last SRSW queue read from - * ---]]] increments index and gets indexed queue - * ---]]] does a non-blocking read of that queue - * ---]]] if gets something, saves index and returns that value - * ---]]] if gets null, then goes to next queue - * ---]]] if got null from all the queues then does yield() then tries again - * - *Note: "0" is used as the value null, so SRSW queues must only contain - * pointers, and cannot use 0 as a valid pointer value. - * - */ - -SRMWQueueStruc* makeSRMWQ() - { SRMWQueueStruc* retQ; - - retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) ); - - retQ->numInternalQs = 0; - retQ->internalQsSz = 10; - retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); - - retQ->lastQReadFrom = 0; - - return retQ; - } - -/* ---]]] if needs more room, makes a larger writer-array - * ---]]] copies the old writer-array into the new - * ---]]] makes a new SRSW queue an puts it into the array - * ---]]] returns the index to the new SRSW queue as the ID - * - *NOTE: assuming all adds are completed before any writes or reads are - * performed.. otherwise, this needs to be re-done carefully, probably with - * a lock. - */ -int addWriterToSRMWQ( SRMWQueueStruc* Q ) - { int oldSz, i; - SRSWQueueStruc * *oldArray; - - (Q->numInternalQs)++; - if( Q->numInternalQs >= Q->internalQsSz ) - { //full, so make bigger - oldSz = Q->internalQsSz; - oldArray = Q->internalQs; - Q->internalQsSz *= 2; - Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); - for( i = 0; i < oldSz; i++ ) - { Q->internalQs[i] = oldArray[i]; - } - free( oldArray ); - } - Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); - return Q->numInternalQs - 1; - } - - -/* ---]]] gets saved index of last SRSW queue read-from - * ---]]] increments index and gets indexed queue - * ---]]] does a non-blocking read of that queue - * ---]]] if gets something, saves index and returns that value - * ---]]] if gets null, then goes to next queue - * ---]]] if got null from all the queues then does yield() then tries again - */ -void* readSRMWQ( SRMWQueueStruc* Q ) - { SRSWQueueStruc *readQ; - void *readValue = 0; - int tries = 0; - int QToReadFrom = 0; - - QToReadFrom = Q->lastQReadFrom; - - while( TRUE ) - { QToReadFrom++; - if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0; - readQ = Q->internalQs[ QToReadFrom ]; - readValue = readSRSWQ_NonBlocking( readQ ); - - if( readValue != 0 ) //got a value, return it - { Q->lastQReadFrom = QToReadFrom; - return readValue; - } - else //SRSW Q just read is empty - { //check if all queues have been tried - if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty - { tries++; //give a writer a chance to finish before yield - if( tries > SPINLOCK_TRIES ) pthread_yield(); - } - } - } - } - - -/* - * ---]]] uses the writerID as index to get the SRSW queue for that writer - * ---]]] performs writeQ on that queue (may block via repeated yield calls) - */ -void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID ) - { - if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error - - writeSRSWQ( in, Q->internalQs[ writerID ] ); - } +/* + * Copyright 2009 OpenSourceStewardshipFoundation.org + * Licensed under GNU General Public License version 2 + * + * Author: seanhalle@yahoo.com + */ + + +#include +#include +#include +#include +#include + +#include "BlockingQueue.h" + +#define INC(x) (++x == 1024) ? (x) = 0 : (x) + +#define SPINLOCK_TRIES 100000 + +//=========================================================================== +//Normal pthread Q + +PThdQueueStruc* makePThdQ() + { + PThdQueueStruc* retQ; + int retCode; + retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) ); + + + retCode = + pthread_mutex_init( &retQ->mutex_t, NULL); + if(retCode){perror("Error in creating mutex:"); exit(1);} + + retCode = pthread_cond_init ( &retQ->cond_w_t, NULL); + if(retCode){perror("Error in creating cond_var:"); exit(1);} + + retCode = pthread_cond_init ( &retQ->cond_r_t, NULL); + if(retCode){perror("Error in creating cond_var:"); exit(1);} + + retQ->count = 0; + retQ->readPos = 0; + retQ->writePos = 0; + retQ->w_empty = 0; + retQ->w_full = 0; + + return retQ; + } + +void * readPThdQ( PThdQueueStruc *Q ) + { void *ret; + int retCode, wt; + pthread_mutex_lock( &Q->mutex_t ); + { + while( Q -> count == 0 ) + { Q -> w_empty = 1; + retCode = + pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); + if( retCode ){ perror("Thread wait error: "); exit(1); } + } + Q -> w_empty = 0; + Q -> count -= 1; + ret = Q->data[ Q->readPos ]; + INC( Q->readPos ); + wt = Q -> w_full; + Q -> w_full = 0; + } + pthread_mutex_unlock( &Q->mutex_t ); + if (wt) + pthread_cond_signal( &Q->cond_w_t ); + + //printf("Q out: %d\n", ret); + return( ret ); + } + +void writePThdQ( void * in, PThdQueueStruc* Q ) + { + int status, wt; + //printf("Q in: %d\n", in); + + pthread_mutex_lock( &Q->mutex_t ); + { + while( Q->count >= 1024 ) + { + Q -> w_full = 1; + status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); + if (status != 0) + { perror("Thread wait error: "); + exit(1); + } + } + + Q -> w_full = 0; + Q->count += 1; + Q->data[ Q->writePos ] = in; + INC( Q->writePos ); + wt = Q -> w_empty; + Q -> w_empty = 0; + } + + pthread_mutex_unlock( &Q->mutex_t ); + if( wt ) pthread_cond_signal( &Q->cond_r_t ); + } + + +//=========================================================================== +// multi reader multi writer fast Q via CAS +#ifndef _GNU_SOURCE +#define _GNU_SOURCE + +/*This is a blocking queue, but it uses CAS instr plus yield() when empty + * or full + *It uses CAS because it's meant to have more than one reader and more than + * one writer. + */ + +CASQueueStruc* makeCASQ() + { + CASQueueStruc* retQ; + retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) ); + + retQ->insertLock = UNLOCKED; + retQ->extractLock= UNLOCKED; + //TODO: check got pointer syntax right + retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty + retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be + retQ->endOfData = &(retQ->startOfData[1023]); + + return retQ; + } + + +void* readCASQ( CASQueueStruc* Q ) + { void *out = 0; + int tries = 0; + void **startOfData = Q->startOfData; + void **endOfData = Q->endOfData; + + int gotLock = FALSE; + + while( TRUE ) + { //this intrinsic returns true if the lock held "UNLOCKED", in which + // case it now holds "LOCKED" -- if it already held "LOCKED", then + // gotLock is FALSE + gotLock = + __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); + //NOTE: checked assy, and it does lock correctly.. + if( gotLock ) + { + void **insertPos = Q->insertPos; + void **extractPos = Q->extractPos; + + //if not empty -- extract just below insert when empty + if( insertPos - extractPos != 1 && + !(extractPos == endOfData && insertPos == startOfData)) + { //move before read + if( extractPos == endOfData ) //write new pos exactly once, correctly + { Q->extractPos = startOfData; //can't overrun then fix it 'cause + } // other thread might read bad pos + else + { Q->extractPos++; + } + out = *(Q->extractPos); + Q->extractLock = UNLOCKED; + return out; + } + else //Q is empty + { Q->extractLock = UNLOCKED;//empty, so release lock for others + } + } + //Q is busy or empty + tries++; + if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable + } + } + +void writeCASQ( void * in, CASQueueStruc* Q ) + { + int tries = 0; + //TODO: need to make Q volatile? Want to do this Q in assembly! + //Have no idea what GCC's going to do to this code + void **startOfData = Q->startOfData; + void **endOfData = Q->endOfData; + + int gotLock = FALSE; + + while( TRUE ) + { //this intrinsic returns true if the lock held "UNLOCKED", in which + // case it now holds "LOCKED" -- if it already held "LOCKED", then + // gotLock is FALSE + gotLock = + __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); + if( gotLock ) + { + void **insertPos = Q->insertPos; + void **extractPos = Q->extractPos; + + //check if room to insert.. can't use a count variable + // 'cause both insertor Thd and extractor Thd would write it + if( extractPos - insertPos != 1 && + !(insertPos == endOfData && extractPos == startOfData)) + { *(Q->insertPos) = in; //insert before move + if( insertPos == endOfData ) + { Q->insertPos = startOfData; + } + else + { Q->insertPos++; + } + Q->insertLock = UNLOCKED; + return; + } + else //Q is full + { Q->insertLock = UNLOCKED;//full, so release lock for others + } + } + tries++; + if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable + } + } + +#endif //_GNU_SOURCE + + +//=========================================================================== +//Single reader single writer super fast Q.. no atomic instrs.. + + +/*This is a blocking queue, but it uses no atomic instructions, just does + * yield() when empty or full + * + *It doesn't need any atomic instructions because only a single thread + * extracts and only a single thread inserts, and it has no locations that + * are written by both. It writes before moving and moves before reading, + * and never lets write position and read position be the same, so dis- + * synchrony can only ever cause an unnecessary call to yield(), never a + * wrong value (by monotonicity of movement of pointers, plus single writer + * to pointers, plus sequence of write before change pointer, plus + * assumptions that if thread A semantically writes X before Y, then thread + * B will see the writes in that order.) + */ + +SRSWQueueStruc* makeSRSWQ() + { + SRSWQueueStruc* retQ; + retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) ); + memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); + + retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty + retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be + retQ->endOfData = &(retQ->startOfData[1023]); + + return retQ; + } + +void +freeSRSWQ( SRSWQueueStruc* Q ) + { + free( Q ); + } + +void* readSRSWQ( SRSWQueueStruc* Q ) + { void *out = 0; + int tries = 0; + + while( TRUE ) + { + if( Q->insertPos - Q->extractPos != 1 && + !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) + { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData; + else Q->extractPos++; //move before read + out = *(Q->extractPos); + return out; + } + //Q is empty + tries++; + if( tries > SPINLOCK_TRIES ) pthread_yield(); + } + } + + +void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) + { void *out = 0; + int tries = 0; + + while( TRUE ) + { + if( Q->insertPos - Q->extractPos != 1 && + !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) + { Q->extractPos++; //move before read + if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData; + out = *(Q->extractPos); + return out; + } + //Q is empty + tries++; + if( tries > 10 ) return NULL; //long enough for writer to finish + } + } + + +void writeSRSWQ( void * in, SRSWQueueStruc* Q ) + { + int tries = 0; + + while( TRUE ) + { + if( Q->extractPos - Q->insertPos != 1 && + !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData)) + { *(Q->insertPos) = in; //insert before move + if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData; + else Q->insertPos++; + return; + } + //Q is full + tries++; + if( tries > SPINLOCK_TRIES ) pthread_yield(); + } + } + + + +//=========================================================================== +//Single reader Multiple writer super fast Q.. no atomic instrs.. + + +/*This is a blocking queue, but it uses no atomic instructions, just does + * yield() when empty or full + * + *It doesn't need any atomic instructions because only a single thread + * extracts and only a single thread inserts, and it has no locations that + * are written by both. It writes before moving and moves before reading, + * and never lets write position and read position be the same, so dis- + * synchrony can only ever cause an unnecessary call to yield(), never a + * wrong value (by monotonicity of movement of pointers, plus single writer + * to pointers, plus sequence of write before change pointer, plus + * assumptions that if thread A semantically writes X before Y, then thread + * B will see the writes in that order.) + * + *The multi-writer version is implemented as a hierarchy. Each writer has + * its own single-reader single-writer queue. The reader simply does a + * round-robin harvesting from them. + * + *A writer must first register itself with the queue, and receives an ID back + * It then uses that ID on each write operation. + * + *The implementation is: + *Physically: + * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s + * -] it also has read-pointer to the last queue a write was taken from. + * + *Action-Patterns: + * -] To add a writer + * --]] writer-thread calls addWriterToQ(), remember the ID it returns + * --]] internally addWriterToQ does: + * ---]]] if needs more room, makes a larger writer-array + * ---]]] copies the old writer-array into the new + * ---]]] makes a new SRSW queue an puts it into the array + * ---]]] returns the index to the new SRSW queue as the ID + * -] To write + * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID + * --]] this call may block, via repeated yield() calls + * --]] internally, writeSRMWQ does: + * ---]]] uses the writerID as index to get the SRSW queue for that writer + * ---]]] performs writeQ on that queue (may block via repeated yield calls) + * -] To Read + * --]] reader calls readSRMWQ, passing the Q struc + * --]] this call may block, via repeated yield() calls + * --]] internally, readSRMWQ does: + * ---]]] gets saved index of last SRSW queue read from + * ---]]] increments index and gets indexed queue + * ---]]] does a non-blocking read of that queue + * ---]]] if gets something, saves index and returns that value + * ---]]] if gets null, then goes to next queue + * ---]]] if got null from all the queues then does yield() then tries again + * + *Note: "0" is used as the value null, so SRSW queues must only contain + * pointers, and cannot use 0 as a valid pointer value. + * + */ + +SRMWQueueStruc* makeSRMWQ() + { SRMWQueueStruc* retQ; + + retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) ); + + retQ->numInternalQs = 0; + retQ->internalQsSz = 10; + retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); + + retQ->lastQReadFrom = 0; + + return retQ; + } + +/* ---]]] if needs more room, makes a larger writer-array + * ---]]] copies the old writer-array into the new + * ---]]] makes a new SRSW queue an puts it into the array + * ---]]] returns the index to the new SRSW queue as the ID + * + *NOTE: assuming all adds are completed before any writes or reads are + * performed.. otherwise, this needs to be re-done carefully, probably with + * a lock. + */ +int addWriterToSRMWQ( SRMWQueueStruc* Q ) + { int oldSz, i; + SRSWQueueStruc * *oldArray; + + (Q->numInternalQs)++; + if( Q->numInternalQs >= Q->internalQsSz ) + { //full, so make bigger + oldSz = Q->internalQsSz; + oldArray = Q->internalQs; + Q->internalQsSz *= 2; + Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); + for( i = 0; i < oldSz; i++ ) + { Q->internalQs[i] = oldArray[i]; + } + free( oldArray ); + } + Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); + return Q->numInternalQs - 1; + } + + +/* ---]]] gets saved index of last SRSW queue read-from + * ---]]] increments index and gets indexed queue + * ---]]] does a non-blocking read of that queue + * ---]]] if gets something, saves index and returns that value + * ---]]] if gets null, then goes to next queue + * ---]]] if got null from all the queues then does yield() then tries again + */ +void* readSRMWQ( SRMWQueueStruc* Q ) + { SRSWQueueStruc *readQ; + void *readValue = 0; + int tries = 0; + int QToReadFrom = 0; + + QToReadFrom = Q->lastQReadFrom; + + while( TRUE ) + { QToReadFrom++; + if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0; + readQ = Q->internalQs[ QToReadFrom ]; + readValue = readSRSWQ_NonBlocking( readQ ); + + if( readValue != 0 ) //got a value, return it + { Q->lastQReadFrom = QToReadFrom; + return readValue; + } + else //SRSW Q just read is empty + { //check if all queues have been tried + if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty + { tries++; //give a writer a chance to finish before yield + if( tries > SPINLOCK_TRIES ) pthread_yield(); + } + } + } + } + + +/* + * ---]]] uses the writerID as index to get the SRSW queue for that writer + * ---]]] performs writeQ on that queue (may block via repeated yield calls) + */ +void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID ) + { + if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error + + writeSRSWQ( in, Q->internalQs[ writerID ] ); + } diff -r 53c614b781ce -r 1ed562d601d9 PrivateQueue.c --- a/PrivateQueue.c Thu Nov 04 17:50:29 2010 -0700 +++ b/PrivateQueue.c Tue Feb 07 12:51:29 2012 -0800 @@ -1,141 +1,141 @@ -/* - * Copyright 2009 OpenSourceStewardshipFoundation.org - * Licensed under GNU General Public License version 2 - * - * NOTE: this version of SRSW correct as of April 25, 2010 - * - * Author: seanhalle@yahoo.com - */ - - -#include -#include -#include -#include - -#include "PrivateQueue.h" - - - -//=========================================================================== - -/*This kind of queue is private to a single core at a time -- has no - * synchronizations - */ - -PrivQueueStruc* makePrivQ() - { - PrivQueueStruc* retQ; - retQ = (PrivQueueStruc *) malloc( sizeof( PrivQueueStruc ) ); - - retQ->startOfData = malloc( 1024 * sizeof(void *) ); - memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); - retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty - retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be - retQ->endOfData = &(retQ->startOfData[1023]); - - return retQ; - } - - -void -enlargePrivQ( PrivQueueStruc *Q ) - { int oldSize, newSize; - void **oldStartOfData; - - oldSize = Q->endOfData - Q->startOfData; - newSize = 2 * oldSize; - oldStartOfData = Q->startOfData; - Q->startOfData = malloc( newSize * sizeof(void *) ); - memcpy(Q->startOfData, oldStartOfData, oldSize * sizeof(void *)); - free(oldStartOfData); - - Q->extractPos = &(Q->startOfData[0]); //side by side == empty - Q->insertPos = &(Q->startOfData[1]); // so start pos's have to be - Q->endOfData = &(Q->startOfData[newSize - 1]); - } - - -/*Returns NULL when queue is empty - */ -void* readPrivQ( PrivQueueStruc* Q ) - { void *out = 0; - void **startOfData = Q->startOfData; - void **endOfData = Q->endOfData; - - void **insertPos = Q->insertPos; - void **extractPos = Q->extractPos; - - //if not empty -- (extract is just below insert when empty) - if( insertPos - extractPos != 1 && - !(extractPos == endOfData && insertPos == startOfData)) - { //move before read - if( extractPos == endOfData ) //write new pos exactly once, correctly - { Q->extractPos = startOfData; //can't overrun then fix it 'cause - } // other thread might read bad pos - else - { Q->extractPos++; - } - out = *(Q->extractPos); - return out; - } - //Q is empty - return NULL; - } - - -/*Expands the queue size automatically when it's full - */ -void -writePrivQ( void * in, PrivQueueStruc* Q ) - { - void **startOfData = Q->startOfData; - void **endOfData = Q->endOfData; - - void **insertPos = Q->insertPos; - void **extractPos = Q->extractPos; - -tryAgain: - //Full? (insert is just below extract when full) - if( extractPos - insertPos != 1 && - !(insertPos == endOfData && extractPos == startOfData)) - { *(Q->insertPos) = in; //insert before move - if( insertPos == endOfData ) //write new pos exactly once, correctly - { Q->insertPos = startOfData; - } - else - { Q->insertPos++; - } - return; - } - //Q is full - enlargePrivQ( Q ); - goto tryAgain; - } - - -/*Returns false when the queue was full. - * have option of calling make_larger_PrivQ to make more room, then try again - */ -int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ) - { - void **startOfData = Q->startOfData; - void **endOfData = Q->endOfData; - - void **insertPos = Q->insertPos; - void **extractPos = Q->extractPos; - - if( extractPos - insertPos != 1 && - !(insertPos == endOfData && extractPos == startOfData)) - { *(Q->insertPos) = in; //insert before move - if( insertPos == endOfData ) //write new pos exactly once, correctly - { Q->insertPos = startOfData; - } - else - { Q->insertPos++; - } - return TRUE; - } - //Q is full - return FALSE; - } +/* + * Copyright 2009 OpenSourceStewardshipFoundation.org + * Licensed under GNU General Public License version 2 + * + * NOTE: this version of SRSW correct as of April 25, 2010 + * + * Author: seanhalle@yahoo.com + */ + + +#include +#include +#include +#include + +#include "PrivateQueue.h" + + + +//=========================================================================== + +/*This kind of queue is private to a single core at a time -- has no + * synchronizations + */ + +PrivQueueStruc* makePrivQ() + { + PrivQueueStruc* retQ; + retQ = (PrivQueueStruc *) malloc( sizeof( PrivQueueStruc ) ); + + retQ->startOfData = malloc( 1024 * sizeof(void *) ); + memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); + retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty + retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be + retQ->endOfData = &(retQ->startOfData[1023]); + + return retQ; + } + + +void +enlargePrivQ( PrivQueueStruc *Q ) + { int oldSize, newSize; + void **oldStartOfData; + + oldSize = Q->endOfData - Q->startOfData; + newSize = 2 * oldSize; + oldStartOfData = Q->startOfData; + Q->startOfData = malloc( newSize * sizeof(void *) ); + memcpy(Q->startOfData, oldStartOfData, oldSize * sizeof(void *)); + free(oldStartOfData); + + Q->extractPos = &(Q->startOfData[0]); //side by side == empty + Q->insertPos = &(Q->startOfData[1]); // so start pos's have to be + Q->endOfData = &(Q->startOfData[newSize - 1]); + } + + +/*Returns NULL when queue is empty + */ +void* readPrivQ( PrivQueueStruc* Q ) + { void *out = 0; + void **startOfData = Q->startOfData; + void **endOfData = Q->endOfData; + + void **insertPos = Q->insertPos; + void **extractPos = Q->extractPos; + + //if not empty -- (extract is just below insert when empty) + if( insertPos - extractPos != 1 && + !(extractPos == endOfData && insertPos == startOfData)) + { //move before read + if( extractPos == endOfData ) //write new pos exactly once, correctly + { Q->extractPos = startOfData; //can't overrun then fix it 'cause + } // other thread might read bad pos + else + { Q->extractPos++; + } + out = *(Q->extractPos); + return out; + } + //Q is empty + return NULL; + } + + +/*Expands the queue size automatically when it's full + */ +void +writePrivQ( void * in, PrivQueueStruc* Q ) + { + void **startOfData = Q->startOfData; + void **endOfData = Q->endOfData; + + void **insertPos = Q->insertPos; + void **extractPos = Q->extractPos; + +tryAgain: + //Full? (insert is just below extract when full) + if( extractPos - insertPos != 1 && + !(insertPos == endOfData && extractPos == startOfData)) + { *(Q->insertPos) = in; //insert before move + if( insertPos == endOfData ) //write new pos exactly once, correctly + { Q->insertPos = startOfData; + } + else + { Q->insertPos++; + } + return; + } + //Q is full + enlargePrivQ( Q ); + goto tryAgain; + } + + +/*Returns false when the queue was full. + * have option of calling make_larger_PrivQ to make more room, then try again + */ +int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ) + { + void **startOfData = Q->startOfData; + void **endOfData = Q->endOfData; + + void **insertPos = Q->insertPos; + void **extractPos = Q->extractPos; + + if( extractPos - insertPos != 1 && + !(insertPos == endOfData && extractPos == startOfData)) + { *(Q->insertPos) = in; //insert before move + if( insertPos == endOfData ) //write new pos exactly once, correctly + { Q->insertPos = startOfData; + } + else + { Q->insertPos++; + } + return TRUE; + } + //Q is full + return FALSE; + } diff -r 53c614b781ce -r 1ed562d601d9 PrivateQueue.h --- a/PrivateQueue.h Thu Nov 04 17:50:29 2010 -0700 +++ b/PrivateQueue.h Tue Feb 07 12:51:29 2012 -0800 @@ -1,37 +1,37 @@ -/* - * Copyright 2009 OpenSourceStewardshipFoundation.org - * Licensed under GNU General Public License version 2 - * - * Author: seanhalle@yahoo.com - */ - -#ifndef _PRIVATE_QUEUE_H -#define _PRIVATE_QUEUE_H - -#include - -#define TRUE 1 -#define FALSE 0 - -#define LOCKED 1 -#define UNLOCKED 0 - - -/* It is the data that is shared so only need one mutex. */ -typedef struct - { void **insertPos; - void **extractPos; - void **startOfData; //data is pointers - void **endOfData; //set when alloc data - } -PrivQueueStruc; - - -PrivQueueStruc* makePrivQ ( ); -void* readPrivQ ( PrivQueueStruc *Q ); -void writePrivQ( void *in, PrivQueueStruc *Q ); -int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ); //return - // false when full - -#endif /* _PRIVATE_QUEUE_H */ - +/* + * Copyright 2009 OpenSourceStewardshipFoundation.org + * Licensed under GNU General Public License version 2 + * + * Author: seanhalle@yahoo.com + */ + +#ifndef _PRIVATE_QUEUE_H +#define _PRIVATE_QUEUE_H + +#include + +#define TRUE 1 +#define FALSE 0 + +#define LOCKED 1 +#define UNLOCKED 0 + + +/* It is the data that is shared so only need one mutex. */ +typedef struct + { void **insertPos; + void **extractPos; + void **startOfData; //data is pointers + void **endOfData; //set when alloc data + } +PrivQueueStruc; + + +PrivQueueStruc* makePrivQ ( ); +void* readPrivQ ( PrivQueueStruc *Q ); +void writePrivQ( void *in, PrivQueueStruc *Q ); +int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ); //return + // false when full + +#endif /* _PRIVATE_QUEUE_H */ +