Me@0: /* Me@0: * Copyright 2009 OpenSourceCodeStewardshipFoundation.org Me@0: * Licensed under GNU General Public License version 2 Me@0: * Me@0: * NOTE: this version of SRSW correct as of April 25, 2010 Me@0: * Me@0: * Author: seanhalle@yahoo.com Me@0: */ Me@0: Me@0: Me@0: #include Me@0: #include Me@0: #include Me@0: #include Me@0: Me@0: #include "BlockingQueue.h" Me@0: Me@0: #define INC(x) (++x == 1024) ? (x) = 0 : (x) Me@0: Me@0: Me@0: //=========================================================================== Me@0: //Normal pthread Q Me@0: Me@0: QueueStruc* makeQ() Me@0: { Me@0: QueueStruc* retQ; Me@0: int status; Me@0: retQ = (QueueStruc *) malloc( sizeof( QueueStruc ) ); Me@0: Me@0: Me@0: status = pthread_mutex_init( &retQ->mutex_t, NULL); Me@0: if (status < 0) Me@0: { Me@0: perror("Error in creating mutex:"); Me@0: exit(1); Me@0: return NULL; Me@0: } Me@0: Me@0: status = pthread_cond_init ( &retQ->cond_w_t, NULL); Me@0: if (status < 0) Me@0: { Me@0: perror("Error in creating cond_var:"); Me@0: exit(1); Me@0: return NULL; Me@0: } Me@0: Me@0: status = pthread_cond_init ( &retQ->cond_r_t, NULL); Me@0: if (status < 0) Me@0: { Me@0: perror("Error in creating cond_var:"); Me@0: exit(1); Me@0: return NULL; Me@0: } Me@0: Me@0: retQ->count = 0; Me@0: retQ->readPos = 0; Me@0: retQ->writePos = 0; Me@0: retQ -> w_empty = retQ -> w_full = 0; Me@0: Me@0: return retQ; Me@0: } Me@0: Me@0: void * readQ( QueueStruc *Q ) Me@0: { void *ret; Me@0: int status, wt; Me@0: pthread_mutex_lock( &Q->mutex_t ); Me@0: { Me@0: while( Q -> count == 0 ) Me@0: { Q -> w_empty = 1; Me@0: status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); Me@0: if (status != 0) Me@0: { perror("Thread wait error: "); Me@0: exit(1); Me@0: } Me@0: } Me@0: Q -> w_empty = 0; Me@0: Q -> count -= 1; Me@0: ret = Q->data[ Q->readPos ]; Me@0: INC( Q->readPos ); Me@0: wt = Q -> w_full; Me@0: Q -> w_full = 0; Me@0: } Me@0: pthread_mutex_unlock( &Q->mutex_t ); Me@0: if (wt) pthread_cond_signal( &Q->cond_w_t ); Me@0: Me@0: return( ret ); Me@0: } Me@0: Me@0: void writeQ( void * in, QueueStruc* Q ) Me@0: { Me@0: int status, wt; Me@0: pthread_mutex_lock( &Q->mutex_t ); Me@0: { Me@0: while( Q->count >= 1024 ) Me@0: { Me@0: Q -> w_full = 1; Me@0: // pthread_cond_broadcast( &Q->cond_r_t ); Me@0: status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); Me@0: if (status != 0) Me@0: { perror("Thread wait error: "); Me@0: exit(1); Me@0: } Me@0: } Me@0: Q -> w_full = 0; Me@0: Q->count += 1; Me@0: Q->data[ Q->writePos ] = in; Me@0: INC( Q->writePos ); Me@0: wt = Q -> w_empty; Me@0: Q -> w_empty = 0; Me@0: // pthread_cond_broadcast( &Q->cond_r_t ); Me@0: } Me@0: pthread_mutex_unlock( &Q->mutex_t ); Me@0: if( wt ) pthread_cond_signal( &Q->cond_r_t ); Me@0: } Me@0: Me@0: Me@0: //=========================================================================== Me@0: // multi reader multi writer fast Q via CAS Me@0: #ifndef _GNU_SOURCE Me@0: #define _GNU_SOURCE Me@0: Me@0: /*This is a blocking queue, but it uses CAS instr plus yield() when empty Me@0: * or full Me@0: *It uses CAS because it's meant to have more than one reader and more than Me@0: * one writer. Me@0: */ Me@0: Me@0: CASQueueStruc* makeCASQ() Me@0: { Me@0: CASQueueStruc* retQ; Me@0: retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) ); Me@0: Me@0: retQ->insertLock = UNLOCKED; Me@0: retQ->extractLock= UNLOCKED; Me@0: //TODO: check got pointer syntax right Me@0: retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty Me@0: retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be Me@0: retQ->endOfData = &(retQ->startOfData[1023]); Me@0: Me@0: return retQ; Me@0: } Me@0: Me@0: Me@0: void* readCASQ( CASQueueStruc* Q ) Me@0: { void *out = 0; Me@0: int tries = 0; Me@0: int startOfData = Q->startOfData; Me@0: int endOfData = Q->endOfData; Me@0: Me@0: int success = FALSE; Me@0: Me@0: while( !success ) Me@0: { success = Me@0: __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); Me@0: if( success ) Me@0: { Me@0: volatile int insertPos = Q->insertPos; Me@0: volatile int extractPos = Q->extractPos; Me@0: Me@0: //if not empty -- extract just below insert when empty Me@0: if( insertPos - extractPos != 1 && Me@0: !(extractPos == endOfData && insertPos == startOfData)) Me@0: { //move before read Me@0: if( extractPos == endOfData ) //write new pos exactly once, correctly Me@0: { Q->extractPos = startOfData; //can't overrun then fix it 'cause Me@0: } // other thread might read bad pos Me@0: else Me@0: { Q->extractPos++; Me@0: } Me@0: out = *(Q->extractPos); Me@0: Q->extractLock = UNLOCKED; Me@0: return out; Me@0: } Me@0: else //Q is empty Me@0: { success = FALSE; Me@0: Q->extractLock = UNLOCKED;//have to try again, release for others Me@0: } Me@0: } Me@0: //Q is busy or empty Me@0: tries++; Me@0: if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock Me@0: } Me@0: } Me@0: Me@0: void writeCASQ( void * in, CASQueueStruc* Q ) Me@0: { Me@0: int tries = 0; Me@0: int startOfData = Q->startOfData; Me@0: int endOfData = Q->endOfData; Me@0: Me@0: int success = FALSE; Me@0: Me@0: while( !success ) Me@0: { success = Me@0: __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); Me@0: if( success ) Me@0: { Me@0: volatile int insertPos = Q->insertPos; Me@0: volatile int extractPos = Q->extractPos; Me@0: Me@0: //check if room to insert.. can't use a count variable Me@0: // 'cause both insertor Thd and extractor Thd would write it Me@0: if( extractPos - insertPos != 1 && Me@0: !(insertPos == endOfData && extractPos == startOfData)) Me@0: { *(insertPos) = in; //insert before move Me@0: if( insertPos == endOfData ) //write new pos exactly once, correctly Me@0: { Q->insertPos = startOfData; Me@0: } Me@0: else Me@0: { Q->insertPos++; Me@0: } Me@0: Q->insertLock = UNLOCKED; Me@0: return; Me@0: } Me@0: else //Q is full Me@0: { success = FALSE; Me@0: Q->insertLock = UNLOCKED;//have to try again, release for others Me@0: } Me@0: } Me@0: tries++; Me@0: if( tries > 10000 ) pthread_yield();//yield not guaranteed Me@0: } Me@0: } Me@0: Me@0: #endif //_GNU_SOURCE Me@0: Me@0: //=========================================================================== Me@0: //Single reader single writer super fast Q.. no atomic instrs.. Me@0: Me@0: Me@0: /*This is a blocking queue, but it uses no atomic instructions, just does Me@0: * busy-waiting when empty or full (but yield() if waits too long) Me@0: * Me@0: *It doesn't need any atomic instructions because only a single thread Me@0: * extracts and only a single thread inserts, and it has no locations that Me@0: * are written by both. It writes before moving and moves before reading, Me@0: * and never lets write position and read position be the same, so dis- Me@0: * synchrony can only ever cause an unnecessary call to yield(), never a Me@0: * wrong value (by monotonicity of movement of pointers, plus single writer Me@0: * to pointers, plus sequence of write before change pointer, plus Me@0: * assumptions that if thread A semantically writes X before Y, then thread Me@0: * B will see the writes in that order.) Me@0: */ Me@0: Me@0: SRSWQueueStruc* makeSRSWQ() Me@0: { Me@0: SRSWQueueStruc* retQ; Me@0: retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) ); Me@0: Me@0: retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty Me@0: retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be Me@0: retQ->endOfData = &(retQ->startOfData[1023]); Me@0: Me@0: return retQ; Me@0: } Me@0: Me@0: Me@0: void* readSRSWQ( SRSWQueueStruc* Q ) Me@0: { void *out = 0; Me@0: int tries = 0; Me@0: int startOfData = Q->startOfData; Me@0: int endOfData = Q->endOfData; Me@0: Me@0: while( TRUE ) Me@0: { //not certain the volatile reads need to be done, but safe.. Me@0: volatile int insertPos = Q->insertPos; Me@0: volatile int extractPos = Q->extractPos; Me@0: Me@0: //if not empty -- extract just below insert when empty Me@0: if( insertPos - extractPos != 1 && Me@0: !(extractPos == endOfData && insertPos == startOfData)) Me@0: { //move before read Me@0: if( extractPos == endOfData ) //write new pos exactly once, correctly Me@0: { Q->extractPos = startOfData; //can't overrun then fix it 'cause Me@0: } // other thread might read bad pos Me@0: else Me@0: { Q->extractPos++; Me@0: } Me@0: out = *(Q->extractPos); Me@0: return out; Me@0: } Me@0: //Q is empty Me@0: tries++; Me@0: if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock Me@0: } Me@0: } Me@0: Me@0: void writeSRSWQ( void * in, SRSWQueueStruc* Q ) Me@0: { Me@0: int tries = 0; Me@0: int startOfData = Q->startOfData; Me@0: int endOfData = Q->endOfData; Me@0: Me@0: while( TRUE ) Me@0: { //not certain the volatile reads need to be done, but safe.. Me@0: volatile int insertPos = Q->insertPos; Me@0: volatile int extractPos = Q->extractPos; Me@0: Me@0: if( extractPos - insertPos != 1 && Me@0: !(insertPos == endOfData && extractPos == startOfData)) Me@0: { *(insertPos) = in; //insert before move Me@0: if( insertPos == endOfData ) //write new pos exactly once, correctly Me@0: { Q->insertPos = startOfData; Me@0: } Me@0: else Me@0: { Q->insertPos++; Me@0: } Me@0: return; Me@0: } Me@0: //Q is full Me@0: tries++; Me@0: if( tries > 10000 ) pthread_yield();//yield not guaranteed Me@0: } Me@0: }