Me@0: /* Me@1: * Copyright 2009 OpenSourceStewardshipFoundation.org Me@0: * Licensed under GNU General Public License version 2 Me@0: * Me@0: * Author: seanhalle@yahoo.com Me@0: */ Me@0: Me@0: Me@0: #include Me@0: #include Me@0: #include Me@0: #include Me@1: #include Me@1: #include Me@0: Me@0: #include "BlockingQueue.h" Me@0: Me@0: #define INC(x) (++x == 1024) ? (x) = 0 : (x) Me@0: Me@1: #define SPINLOCK_TRIES 100000 Me@0: Me@0: //=========================================================================== Me@0: //Normal pthread Q Me@0: Me@0: QueueStruc* makeQ() Me@0: { Me@0: QueueStruc* retQ; Me@0: int status; Me@0: retQ = (QueueStruc *) malloc( sizeof( QueueStruc ) ); Me@0: Me@0: Me@0: status = pthread_mutex_init( &retQ->mutex_t, NULL); Me@0: if (status < 0) Me@0: { Me@0: perror("Error in creating mutex:"); Me@0: exit(1); Me@0: return NULL; Me@0: } Me@0: Me@0: status = pthread_cond_init ( &retQ->cond_w_t, NULL); Me@0: if (status < 0) Me@0: { Me@0: perror("Error in creating cond_var:"); Me@0: exit(1); Me@0: return NULL; Me@0: } Me@0: Me@0: status = pthread_cond_init ( &retQ->cond_r_t, NULL); Me@0: if (status < 0) Me@0: { Me@0: perror("Error in creating cond_var:"); Me@0: exit(1); Me@0: return NULL; Me@0: } Me@0: Me@0: retQ->count = 0; Me@0: retQ->readPos = 0; Me@0: retQ->writePos = 0; Me@0: retQ -> w_empty = retQ -> w_full = 0; Me@0: Me@0: return retQ; Me@0: } Me@0: Me@0: void * readQ( QueueStruc *Q ) Me@0: { void *ret; Me@0: int status, wt; Me@0: pthread_mutex_lock( &Q->mutex_t ); Me@0: { Me@0: while( Q -> count == 0 ) Me@0: { Q -> w_empty = 1; Me@1: // pthread_cond_broadcast( &Q->cond_w_t ); Me@0: status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); Me@0: if (status != 0) Me@0: { perror("Thread wait error: "); Me@0: exit(1); Me@0: } Me@0: } Me@0: Q -> w_empty = 0; Me@0: Q -> count -= 1; Me@0: ret = Q->data[ Q->readPos ]; Me@0: INC( Q->readPos ); Me@0: wt = Q -> w_full; Me@0: Q -> w_full = 0; Me@1: //pthread_cond_broadcast( &Q->cond_w_t ); Me@0: } Me@0: 
pthread_mutex_unlock( &Q->mutex_t ); Me@0: if (wt) pthread_cond_signal( &Q->cond_w_t ); Me@0: Me@0: return( ret ); Me@0: } Me@0: Me@0: void writeQ( void * in, QueueStruc* Q ) Me@0: { Me@0: int status, wt; Me@0: pthread_mutex_lock( &Q->mutex_t ); Me@0: { Me@0: while( Q->count >= 1024 ) Me@0: { Me@0: Q -> w_full = 1; Me@0: // pthread_cond_broadcast( &Q->cond_r_t ); Me@0: status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); Me@0: if (status != 0) Me@0: { perror("Thread wait error: "); Me@0: exit(1); Me@0: } Me@0: } Me@0: Q -> w_full = 0; Me@0: Q->count += 1; Me@0: Q->data[ Q->writePos ] = in; Me@0: INC( Q->writePos ); Me@0: wt = Q -> w_empty; Me@0: Q -> w_empty = 0; Me@0: // pthread_cond_broadcast( &Q->cond_r_t ); Me@0: } Me@0: pthread_mutex_unlock( &Q->mutex_t ); Me@0: if( wt ) pthread_cond_signal( &Q->cond_r_t ); Me@0: } Me@0: Me@0: Me@0: //=========================================================================== Me@0: // multi reader multi writer fast Q via CAS Me@0: #ifndef _GNU_SOURCE Me@0: #define _GNU_SOURCE Me@0: Me@0: /*This is a blocking queue, but it uses CAS instr plus yield() when empty Me@0: * or full Me@0: *It uses CAS because it's meant to have more than one reader and more than Me@0: * one writer. 
Me@0: */ Me@0: Me@0: CASQueueStruc* makeCASQ() Me@0: { Me@0: CASQueueStruc* retQ; Me@0: retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) ); Me@0: Me@0: retQ->insertLock = UNLOCKED; Me@0: retQ->extractLock= UNLOCKED; Me@0: //TODO: check got pointer syntax right Me@0: retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty Me@0: retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be Me@0: retQ->endOfData = &(retQ->startOfData[1023]); Me@0: Me@0: return retQ; Me@0: } Me@0: Me@0: Me@0: void* readCASQ( CASQueueStruc* Q ) Me@1: { void *out = 0; Me@1: int tries = 0; Me@1: void **startOfData = Q->startOfData; Me@1: void **endOfData = Q->endOfData; Me@1: Me@0: int success = FALSE; Me@0: Me@0: while( !success ) Me@0: { success = Me@0: __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); Me@0: if( success ) Me@0: { Me@1: void **insertPos = Q->insertPos; Me@1: void **extractPos = Q->extractPos; Me@0: Me@0: //if not empty -- extract just below insert when empty Me@0: if( insertPos - extractPos != 1 && Me@0: !(extractPos == endOfData && insertPos == startOfData)) Me@0: { //move before read Me@0: if( extractPos == endOfData ) //write new pos exactly once, correctly Me@0: { Q->extractPos = startOfData; //can't overrun then fix it 'cause Me@0: } // other thread might read bad pos Me@0: else Me@0: { Q->extractPos++; Me@0: } Me@0: out = *(Q->extractPos); Me@0: Q->extractLock = UNLOCKED; Me@0: return out; Me@0: } Me@0: else //Q is empty Me@0: { success = FALSE; Me@0: Q->extractLock = UNLOCKED;//have to try again, release for others Me@0: } Me@0: } Me@0: //Q is busy or empty Me@0: tries++; Me@1: if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI yield() Me@0: } Me@0: } Me@0: Me@0: void writeCASQ( void * in, CASQueueStruc* Q ) Me@0: { Me@0: int tries = 0; Me@1: //TODO: need to make Q volatile? Want to do this Q in assembly! 
Me@1: //Have no idea what GCC's going to do to this code Me@1: void **startOfData = Q->startOfData; Me@1: void **endOfData = Q->endOfData; Me@1: Me@0: int success = FALSE; Me@0: Me@0: while( !success ) Me@0: { success = Me@0: __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); Me@0: if( success ) Me@0: { Me@1: void **insertPos = Q->insertPos; Me@1: void **extractPos = Q->extractPos; Me@0: Me@0: //check if room to insert.. can't use a count variable Me@0: // 'cause both insertor Thd and extractor Thd would write it Me@0: if( extractPos - insertPos != 1 && Me@0: !(insertPos == endOfData && extractPos == startOfData)) Me@1: { *(Q->insertPos) = in; //insert before move Me@0: if( insertPos == endOfData ) //write new pos exactly once, correctly Me@0: { Q->insertPos = startOfData; Me@0: } Me@0: else Me@0: { Q->insertPos++; Me@0: } Me@0: Q->insertLock = UNLOCKED; Me@0: return; Me@0: } Me@0: else //Q is full Me@0: { success = FALSE; Me@0: Q->insertLock = UNLOCKED;//have to try again, release for others Me@0: } Me@0: } Me@0: tries++; Me@1: if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() Me@0: } Me@0: } Me@0: Me@0: #endif //_GNU_SOURCE Me@0: Me@1: Me@0: //=========================================================================== Me@0: //Single reader single writer super fast Q.. no atomic instrs.. Me@0: Me@0: Me@0: /*This is a blocking queue, but it uses no atomic instructions, just does Me@1: * yield() when empty or full Me@0: * Me@0: *It doesn't need any atomic instructions because only a single thread Me@0: * extracts and only a single thread inserts, and it has no locations that Me@0: * are written by both. 
It writes before moving and moves before reading, Me@0: * and never lets write position and read position be the same, so dis- Me@0: * synchrony can only ever cause an unnecessary call to yield(), never a Me@0: * wrong value (by monotonicity of movement of pointers, plus single writer Me@0: * to pointers, plus sequence of write before change pointer, plus Me@0: * assumptions that if thread A semantically writes X before Y, then thread Me@0: * B will see the writes in that order.) Me@0: */ Me@0: Me@0: SRSWQueueStruc* makeSRSWQ() Me@0: { Me@0: SRSWQueueStruc* retQ; Me@0: retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) ); Me@0: Me@0: retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty Me@0: retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be Me@0: retQ->endOfData = &(retQ->startOfData[1023]); Me@0: Me@0: return retQ; Me@0: } Me@0: Me@0: Me@0: void* readSRSWQ( SRSWQueueStruc* Q ) Me@0: { void *out = 0; Me@0: int tries = 0; Me@0: Me@0: while( TRUE ) Me@1: { Me@1: if( Q->insertPos - Q->extractPos != 1 && Me@1: !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) Me@1: { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData; Me@1: else Q->extractPos++; //move before read Me@0: out = *(Q->extractPos); Me@0: return out; Me@0: } Me@0: //Q is empty Me@0: tries++; Me@1: if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() Me@0: } Me@0: } Me@0: Me@1: Me@1: void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) Me@1: { void *out = 0; Me@1: int tries = 0; Me@1: Me@1: while( TRUE ) Me@1: { Me@1: if( Q->insertPos - Q->extractPos != 1 && Me@1: !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData)) Me@1: { Q->extractPos++; //move before read Me@1: if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData; Me@1: out = *(Q->extractPos); Me@1: return out; Me@1: } Me@1: //Q is empty Me@1: tries++; Me@1: if( tries > 2 ) return 0; //long enough for writer to finish Me@1: } Me@1: } Me@1: Me@1: 
Me@0: void writeSRSWQ( void * in, SRSWQueueStruc* Q ) Me@0: { Me@0: int tries = 0; Me@0: Me@0: while( TRUE ) Me@1: { Me@1: if( Q->extractPos - Q->insertPos != 1 && Me@1: !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData)) Me@1: { *(Q->insertPos) = in; //insert before move Me@1: if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData; Me@1: else Q->insertPos++; Me@0: return; Me@0: } Me@0: //Q is full Me@0: tries++; Me@1: if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() Me@0: } Me@0: } Me@1: Me@1: Me@1: Me@1: //=========================================================================== Me@1: //Single reader Multiple writer super fast Q.. no atomic instrs.. Me@1: Me@1: Me@1: /*This is a blocking queue, but it uses no atomic instructions, just does Me@1: * yield() when empty or full Me@1: * Me@1: *It doesn't need any atomic instructions because only a single thread Me@1: * extracts and only a single thread inserts, and it has no locations that Me@1: * are written by both. It writes before moving and moves before reading, Me@1: * and never lets write position and read position be the same, so dis- Me@1: * synchrony can only ever cause an unnecessary call to yield(), never a Me@1: * wrong value (by monotonicity of movement of pointers, plus single writer Me@1: * to pointers, plus sequence of write before change pointer, plus Me@1: * assumptions that if thread A semantically writes X before Y, then thread Me@1: * B will see the writes in that order.) Me@1: * Me@1: *The multi-writer version is implemented as a hierarchy. Each writer has Me@1: * its own single-reader single-writer queue. The reader simply does a Me@1: * round-robin harvesting from them. Me@1: * Me@1: *A writer must first register itself with the queue, and receives an ID back Me@1: * It then uses that ID on each write operation. 
 *
 *The implementation is:
 *Physically:
 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
 * -] it also has a read-pointer to the last queue a write was taken from.
 *
 *Action-Patterns:
 * -] To add a writer
 * --]] writer-thread calls addWriterToSRMWQ() and remembers the ID it returns
 * --]] internally addWriterToSRMWQ does:
 * ---]]] if needs more room, makes a larger writer-array
 * ---]]] copies the old writer-array into the new
 * ---]]] makes a new SRSW queue and puts it into the array
 * ---]]] returns the index to the new SRSW queue as the ID
 * -] To write
 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
 * --]] this call may block, via repeated yield() calls
 * --]] internally, writeSRMWQ does:
 * ---]]] uses the writerID as index to get the SRSW queue for that writer
 * ---]]] performs writeSRSWQ on that queue (may block via repeated yield calls)
 * -] To Read
 * --]] reader calls readSRMWQ, passing the Q struc
 * --]] this call may block, via repeated yield() calls
 * --]] internally, readSRMWQ does:
 * ---]]] gets saved index of last SRSW queue read from
 * ---]]] increments index and gets indexed queue
 * ---]]] does a non-blocking read of that queue
 * ---]]] if gets something, saves index and returns that value
 * ---]]] if gets null, then goes to next queue
 * ---]]] if got null from all the queues then does yield() then tries again
 *
 *Note: "0" is used as the value null, so SRSW queues must only contain
 * pointers, and cannot use 0 as a valid pointer value.
Me@1: * Me@1: */ Me@1: Me@1: SRMWQueueStruc* makeSRMWQ() Me@1: { SRMWQueueStruc* retQ; Me@1: Me@1: retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) ); Me@1: Me@1: retQ->numInternalQs = 0; Me@1: retQ->internalQsSz = 10; Me@1: retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); Me@1: Me@1: retQ->lastQReadFrom = 0; Me@1: Me@1: return retQ; Me@1: } Me@1: Me@1: /* ---]]] if needs more room, makes a larger writer-array Me@1: * ---]]] copies the old writer-array into the new Me@1: * ---]]] makes a new SRSW queue an puts it into the array Me@1: * ---]]] returns the index to the new SRSW queue as the ID Me@1: * Me@1: *NOTE: assuming all adds are completed before any writes or reads are Me@1: * performed.. otherwise, this needs to be re-done carefully, probably with Me@1: * a lock. Me@1: */ Me@1: int addWriterToSRMWQ( SRMWQueueStruc* Q ) Me@1: { int oldSz, i; Me@1: SRSWQueueStruc * *oldArray; Me@1: Me@1: (Q->numInternalQs)++; Me@1: if( Q->numInternalQs >= Q->internalQsSz ) Me@1: { //full, so make bigger Me@1: oldSz = Q->internalQsSz; Me@1: oldArray = Q->internalQs; Me@1: Q->internalQsSz *= 2; Me@1: Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); Me@1: for( i = 0; i < oldSz; i++ ) Me@1: { Q->internalQs[i] = oldArray[i]; Me@1: } Me@1: free( oldArray ); Me@1: } Me@1: Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); Me@1: return Q->numInternalQs - 1; Me@1: } Me@1: Me@1: Me@1: /* ---]]] gets saved index of last SRSW queue read-from Me@1: * ---]]] increments index and gets indexed queue Me@1: * ---]]] does a non-blocking read of that queue Me@1: * ---]]] if gets something, saves index and returns that value Me@1: * ---]]] if gets null, then goes to next queue Me@1: * ---]]] if got null from all the queues then does yield() then tries again Me@1: */ Me@1: void* readSRMWQ( SRMWQueueStruc* Q ) Me@1: { SRSWQueueStruc *readQ; Me@1: void *readValue = 0; Me@1: int tries = 0; Me@1: int QToReadFrom = 0; Me@1: Me@1: QToReadFrom = 
Q->lastQReadFrom; Me@1: Me@1: while( TRUE ) Me@1: { QToReadFrom++; Me@1: if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0; Me@1: readQ = Q->internalQs[ QToReadFrom ]; Me@1: readValue = readSRSWQ_NonBlocking( readQ ); Me@1: Me@1: if( readValue != 0 ) //got a value, return it Me@1: { Q->lastQReadFrom = QToReadFrom; Me@1: return readValue; Me@1: } Me@1: else //SRSW Q just read is empty Me@1: { //check if all queues have been tried Me@1: if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty Me@1: { tries++; //give a writer a chance to finish before yield Me@1: if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() Me@1: } Me@1: } Me@1: } Me@1: } Me@1: Me@1: Me@1: /* Me@1: * ---]]] uses the writerID as index to get the SRSW queue for that writer Me@1: * ---]]] performs writeQ on that queue (may block via repeated yield calls) Me@1: */ Me@1: void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID ) Me@1: { Me@1: if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error Me@1: Me@1: writeSRSWQ( in, Q->internalQs[ writerID ] ); Me@1: }