Me@19: /* seanhalle@44: * Copyright 2009 OpenSourceResearchInstitute.org Me@19: * Licensed under GNU General Public License version 2 Me@19: * Me@19: * Author: seanhalle@yahoo.com Me@19: */ Me@19: Me@19: Me@19: #include Me@19: #include Me@19: #include Me@19: #include Me@19: #include seanhalle@29: #include Me@19: Me@19: #include "BlockingQueue.h" Me@19: Me@19: #define INC(x) (++x == 1024) ? (x) = 0 : (x) Me@19: Me@19: #define SPINLOCK_TRIES 100000 Me@19: Me@19: Me@19: Me@19: //=========================================================================== Me@19: // multi reader multi writer fast Q via CAS Me@19: #ifndef _GNU_SOURCE Me@19: #define _GNU_SOURCE Me@19: Me@19: /*This is a blocking queue, but it uses CAS instr plus yield() when empty Me@19: * or full Me@19: *It uses CAS because it's meant to have more than one reader and more than Me@19: * one writer. Me@19: */ Me@19: Me@19: CASQueueStruc* makeCASQ() Me@19: { Me@19: CASQueueStruc* retQ; seanhalle@43: retQ = (CASQueueStruc *) PR_WL__malloc( sizeof( CASQueueStruc ) ); Me@19: Me@19: retQ->insertLock = UNLOCKED; Me@19: retQ->extractLock= UNLOCKED; seanhalle@29: seanhalle@29: retQ->extractPos = (volatile void**)&(retQ->startOfData[0]); //side by side == empty seanhalle@29: retQ->insertPos = (volatile void**)&(retQ->startOfData[1]); // so start pos's have to be Me@19: retQ->endOfData = &(retQ->startOfData[1023]); Me@19: Me@19: return retQ; Me@19: } Me@19: Me@19: Me@19: void* readCASQ( CASQueueStruc* Q ) Me@19: { void *out = 0; seanhalle@44: int32 tries = 0; Me@19: void **startOfData = Q->startOfData; Me@19: void **endOfData = Q->endOfData; Me@19: seanhalle@44: int32 gotLock = FALSE; Me@19: Me@19: while( TRUE ) Me@19: { //this intrinsic returns true if the lock held "UNLOCKED", in which Me@19: // case it now holds "LOCKED" -- if it already held "LOCKED", then Me@19: // gotLock is FALSE Me@19: gotLock = Me@19: __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); Me@19: //NOTE: checked assy, and it does lock correctly.. Me@19: if( gotLock ) Me@19: { seanhalle@29: void **insertPos = (void **)Q->insertPos; seanhalle@29: void **extractPos = (void **)Q->extractPos; Me@19: Me@19: //if not empty -- extract just below insert when empty Me@19: if( insertPos - extractPos != 1 && Me@19: !(extractPos == endOfData && insertPos == startOfData)) Me@19: { //move before read Me@19: if( extractPos == endOfData ) //write new pos exactly once, correctly Me@19: { Q->extractPos = startOfData; //can't overrun then fix it 'cause Me@19: } // other thread might read bad pos Me@19: else Me@19: { Q->extractPos++; Me@19: } seanhalle@29: out = (void *) *(Q->extractPos); Me@19: Q->extractLock = UNLOCKED; Me@19: return out; Me@19: } Me@19: else //Q is empty Me@19: { Q->extractLock = UNLOCKED;//empty, so release lock for others Me@19: } Me@19: } Me@19: //Q is busy or empty Me@19: tries++; Me@19: if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable Me@19: } Me@19: } Me@19: Me@19: void writeCASQ( void * in, CASQueueStruc* Q ) Me@19: { seanhalle@44: int32 tries = 0; Me@19: //TODO: need to make Q volatile? Want to do this Q in assembly! Me@19: //Have no idea what GCC's going to do to this code Me@19: void **startOfData = Q->startOfData; Me@19: void **endOfData = Q->endOfData; Me@19: seanhalle@44: int32 gotLock = FALSE; Me@19: Me@19: while( TRUE ) Me@19: { //this intrinsic returns true if the lock held "UNLOCKED", in which Me@19: // case it now holds "LOCKED" -- if it already held "LOCKED", then Me@19: // gotLock is FALSE Me@19: gotLock = Me@19: __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); Me@19: if( gotLock ) Me@19: { seanhalle@29: void **insertPos = (void **)Q->insertPos; seanhalle@29: void **extractPos = (void **)Q->extractPos; Me@19: Me@19: //check if room to insert.. can't use a count variable Me@19: // 'cause both insertor Thd and extractor Thd would write it Me@19: if( extractPos - insertPos != 1 && Me@19: !(insertPos == endOfData && extractPos == startOfData)) Me@19: { *(Q->insertPos) = in; //insert before move Me@19: if( insertPos == endOfData ) Me@19: { Q->insertPos = startOfData; Me@19: } Me@19: else Me@19: { Q->insertPos++; Me@19: } Me@19: Q->insertLock = UNLOCKED; Me@19: return; Me@19: } Me@19: else //Q is full Me@19: { Q->insertLock = UNLOCKED;//full, so release lock for others Me@19: } Me@19: } Me@19: tries++; Me@19: if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable Me@19: } Me@19: } Me@19: Me@19: #endif //_GNU_SOURCE Me@19: Me@19: Me@19: //=========================================================================== Me@19: //Single reader single writer super fast Q.. no atomic instrs.. Me@19: Me@19: Me@19: /*This is a blocking queue, but it uses no atomic instructions, just does Me@19: * yield() when empty or full Me@19: * Me@19: *It doesn't need any atomic instructions because only a single thread Me@19: * extracts and only a single thread inserts, and it has no locations that Me@19: * are written by both. It writes before moving and moves before reading, Me@19: * and never lets write position and read position be the same, so dis- Me@19: * synchrony can only ever cause an unnecessary call to yield(), never a Me@19: * wrong value (by monotonicity of movement of pointers, plus single writer Me@19: * to pointers, plus sequence of write before change pointer, plus Me@19: * assumptions that if thread A semantically writes X before Y, then thread Me@19: * B will see the writes in that order.) Me@19: */ Me@19: Me@19: SRSWQueueStruc* makeSRSWQ() Me@19: { Me@19: SRSWQueueStruc* retQ; seanhalle@43: retQ = (SRSWQueueStruc *) PR_WL__malloc( sizeof( SRSWQueueStruc ) ); Me@19: memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); Me@19: Me@19: retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty Me@19: retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be Me@19: retQ->endOfData = &(retQ->startOfData[1023]); Me@19: Me@19: return retQ; Me@19: } Me@19: Me@19: void Me@19: freeSRSWQ( SRSWQueueStruc* Q ) Me@19: { seanhalle@43: PR_int__free( Q ); Me@19: } Me@19: Me@19: void* readSRSWQ( SRSWQueueStruc* Q ) Me@19: { void *out = 0; seanhalle@44: int32 tries = 0; Me@19: Me@19: while( TRUE ) Me@19: { Me@19: if( Q->insertPos - Q->extractPos != 1 && Me@19: !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) Me@19: { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData; Me@19: else Q->extractPos++; //move before read Me@19: out = *(Q->extractPos); Me@19: return out; Me@19: } Me@19: //Q is empty Me@19: tries++; Me@19: if( tries > SPINLOCK_TRIES ) pthread_yield(); Me@19: } Me@19: } Me@19: Me@19: Me@19: void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) Me@19: { void *out = 0; seanhalle@44: int32 tries = 0; Me@19: Me@19: while( TRUE ) Me@19: { Me@19: if( Q->insertPos - Q->extractPos != 1 && Me@19: !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) Me@19: { Q->extractPos++; //move before read Me@19: if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData; Me@19: out = *(Q->extractPos); Me@19: return out; Me@19: } Me@19: //Q is empty Me@19: tries++; Me@19: if( tries > 10 ) return NULL; //long enough for writer to finish Me@19: } Me@19: } Me@19: Me@19: Me@19: void writeSRSWQ( void * in, SRSWQueueStruc* Q ) Me@19: { seanhalle@44: int32 tries = 0; Me@19: Me@19: while( TRUE ) Me@19: { Me@19: if( Q->extractPos - Q->insertPos != 1 && Me@19: !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData)) Me@19: { *(Q->insertPos) = in; //insert before move Me@19: if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData; Me@19: else Q->insertPos++; Me@19: return; Me@19: } Me@19: //Q is full Me@19: tries++; Me@19: if( tries > SPINLOCK_TRIES ) pthread_yield(); Me@19: } Me@19: } Me@19: Me@19: Me@19: Me@19: //=========================================================================== Me@19: //Single reader Multiple writer super fast Q.. no atomic instrs.. Me@19: Me@19: Me@19: /*This is a blocking queue, but it uses no atomic instructions, just does Me@19: * yield() when empty or full Me@19: * Me@19: *It doesn't need any atomic instructions because only a single thread Me@19: * extracts and only a single thread inserts, and it has no locations that Me@19: * are written by both. It writes before moving and moves before reading, Me@19: * and never lets write position and read position be the same, so dis- Me@19: * synchrony can only ever cause an unnecessary call to yield(), never a Me@19: * wrong value (by monotonicity of movement of pointers, plus single writer Me@19: * to pointers, plus sequence of write before change pointer, plus Me@19: * assumptions that if thread A semantically writes X before Y, then thread Me@19: * B will see the writes in that order.) Me@19: * Me@19: *The multi-writer version is implemented as a hierarchy. Each writer has Me@19: * its own single-reader single-writer queue. The reader simply does a Me@19: * round-robin harvesting from them. Me@19: * Me@19: *A writer must first register itself with the queue, and receives an ID back Me@19: * It then uses that ID on each write operation. Me@19: * Me@19: *The implementation is: Me@19: *Physically: Me@19: * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s Me@19: * -] it also has read-pointer to the last queue a write was taken from. Me@19: * Me@19: *Action-Patterns: Me@19: * -] To add a writer Me@19: * --]] writer-thread calls addWriterToQ(), remember the ID it returns Me@19: * --]] internally addWriterToQ does: Me@19: * ---]]] if needs more room, makes a larger writer-array Me@19: * ---]]] copies the old writer-array into the new Me@19: * ---]]] makes a new SRSW queue an puts it into the array Me@19: * ---]]] returns the index to the new SRSW queue as the ID Me@19: * -] To write Me@19: * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID Me@19: * --]] this call may block, via repeated yield() calls Me@19: * --]] internally, writeSRMWQ does: Me@19: * ---]]] uses the writerID as index to get the SRSW queue for that writer Me@19: * ---]]] performs writeQ on that queue (may block via repeated yield calls) Me@19: * -] To Read Me@19: * --]] reader calls readSRMWQ, passing the Q struc Me@19: * --]] this call may block, via repeated yield() calls Me@19: * --]] internally, readSRMWQ does: Me@19: * ---]]] gets saved index of last SRSW queue read from Me@19: * ---]]] increments index and gets indexed queue Me@19: * ---]]] does a non-blocking read of that queue Me@19: * ---]]] if gets something, saves index and returns that value Me@19: * ---]]] if gets null, then goes to next queue Me@19: * ---]]] if got null from all the queues then does yield() then tries again Me@19: * Me@19: *Note: "0" is used as the value null, so SRSW queues must only contain Me@19: * pointers, and cannot use 0 as a valid pointer value. Me@19: * Me@19: */ Me@19: Me@19: SRMWQueueStruc* makeSRMWQ() Me@19: { SRMWQueueStruc* retQ; Me@19: seanhalle@43: retQ = (SRMWQueueStruc *) PR_WL__malloc( sizeof( SRMWQueueStruc ) ); Me@19: Me@19: retQ->numInternalQs = 0; Me@19: retQ->internalQsSz = 10; seanhalle@43: retQ->internalQs = PR_WL__malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); Me@19: Me@19: retQ->lastQReadFrom = 0; Me@19: Me@19: return retQ; Me@19: } Me@19: Me@19: /* ---]]] if needs more room, makes a larger writer-array Me@19: * ---]]] copies the old writer-array into the new Me@19: * ---]]] makes a new SRSW queue an puts it into the array Me@19: * ---]]] returns the index to the new SRSW queue as the ID Me@19: * Me@19: *NOTE: assuming all adds are completed before any writes or reads are Me@19: * performed.. otherwise, this needs to be re-done carefully, probably with Me@19: * a lock. Me@19: */ Me@19: int addWriterToSRMWQ( SRMWQueueStruc* Q ) Me@19: { int oldSz, i; Me@19: SRSWQueueStruc * *oldArray; Me@19: Me@19: (Q->numInternalQs)++; Me@19: if( Q->numInternalQs >= Q->internalQsSz ) Me@19: { //full, so make bigger Me@19: oldSz = Q->internalQsSz; Me@19: oldArray = Q->internalQs; Me@19: Q->internalQsSz *= 2; seanhalle@43: Q->internalQs = PR_WL__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); Me@19: for( i = 0; i < oldSz; i++ ) Me@19: { Q->internalQs[i] = oldArray[i]; Me@19: } seanhalle@43: PR_int__free( oldArray ); Me@19: } Me@19: Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); Me@19: return Q->numInternalQs - 1; Me@19: } Me@19: Me@19: Me@19: /* ---]]] gets saved index of last SRSW queue read-from Me@19: * ---]]] increments index and gets indexed queue Me@19: * ---]]] does a non-blocking read of that queue Me@19: * ---]]] if gets something, saves index and returns that value Me@19: * ---]]] if gets null, then goes to next queue Me@19: * ---]]] if got null from all the queues then does yield() then tries again Me@19: */ Me@19: void* readSRMWQ( SRMWQueueStruc* Q ) Me@19: { SRSWQueueStruc *readQ; Me@19: void *readValue = 0; seanhalle@44: int32 tries = 0; seanhalle@44: int32 QToReadFrom = 0; Me@19: Me@19: QToReadFrom = Q->lastQReadFrom; Me@19: Me@19: while( TRUE ) Me@19: { QToReadFrom++; Me@19: if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0; Me@19: readQ = Q->internalQs[ QToReadFrom ]; Me@19: readValue = readSRSWQ_NonBlocking( readQ ); Me@19: Me@19: if( readValue != 0 ) //got a value, return it Me@19: { Q->lastQReadFrom = QToReadFrom; Me@19: return readValue; Me@19: } Me@19: else //SRSW Q just read is empty Me@19: { //check if all queues have been tried Me@19: if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty Me@19: { tries++; //give a writer a chance to finish before yield Me@19: if( tries > SPINLOCK_TRIES ) pthread_yield(); Me@19: } Me@19: } Me@19: } Me@19: } Me@19: Me@19: Me@19: /* Me@19: * ---]]] uses the writerID as index to get the SRSW queue for that writer Me@19: * ---]]] performs writeQ on that queue (may block via repeated yield calls) Me@19: */ Me@19: void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID ) Me@19: { Me@19: if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error Me@19: Me@19: writeSRSWQ( in, Q->internalQs[ writerID ] ); Me@19: }