/*
 * Copyright 2009 OpenSourceCodeStewardshipFoundation.org
 * Licensed under GNU General Public License version 2
 *
 * Blocking queues of void* payloads, in three flavors:
 *   QueueStruc     - mutex + condition variables; any number of readers/writers.
 *   CASQueueStruc  - CAS spin-locks plus yield;   any number of readers/writers.
 *   SRSWQueueStruc - no atomic instructions;      exactly one reader and one
 *                    writer (safe because no location is written by both).
 *
 * Author: seanhalle@yahoo.com
 */

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sched.h>

#define TRUE     1
#define FALSE    0
#define LOCKED   1
#define UNLOCKED 0

/* Slot count for every queue.  The ring-pointer queues (CAS, SRSW) keep one
 * slot as a permanent gap between extractPos and insertPos, so they hold at
 * most QUEUE_CAPACITY - 1 items; the mutex queue holds QUEUE_CAPACITY. */
enum { QUEUE_CAPACITY = 1024 };

/* Advance a ring index, wrapping at QUEUE_CAPACITY.
 * do/while(0) form so it is a single statement and the argument is
 * evaluated exactly once (the original expression-macro evaluated x twice). */
#define INC(x) do { if (++(x) == QUEUE_CAPACITY) (x) = 0; } while (0)

/* Mutex-protected queue: the data is what is shared, so one mutex guards
 * everything; two condition variables separate "space available" (writers
 * wait on cond_w_t) from "data available" (readers wait on cond_r_t). */
typedef struct
  {
    pthread_mutex_t mutex_t;
    pthread_cond_t  cond_w_t;   /* writers wait here when full  */
    pthread_cond_t  cond_r_t;   /* readers wait here when empty */
    int   count;
    int   readPos;
    int   writePos;
    void *data[QUEUE_CAPACITY]; /* an array of pointers */
    int   w_empty;              /* a reader has waited on empty */
    int   w_full;               /* a writer has waited on full  */
  }
QueueStruc;

/* CAS-guarded ring queue: separate spin-locks for the insert and extract
 * ends, so one reader and one writer can proceed concurrently. */
typedef struct
  { int    insertLock;
    int    extractLock;
    void **insertPos;
    void **extractPos;
    void  *startOfData[QUEUE_CAPACITY]; /* data is pointers */
    void **endOfData;                   /* set when queue is made */
  }
CASQueueStruc;

/* Single-reader single-writer ring queue: no locks at all. */
typedef struct
  { void **insertPos;
    void **extractPos;
    void  *startOfData[QUEUE_CAPACITY]; /* data is pointers */
    void **endOfData;                   /* set when queue is made */
  }
SRSWQueueStruc;


//===========================================================================
//Normal pthread Q

/* Allocate and initialize a mutex/condvar queue.  Exits the process on any
 * initialization failure (matching the original fail-fast policy).
 * NOTE: pthread_*_init return a positive error code and do NOT set errno,
 * so the checks are `status != 0` and the code is printed directly
 * (the original tested `status < 0` and called perror, which never fired). */
QueueStruc* makeQ()
  {
    QueueStruc* retQ;
    int status;

    retQ = malloc( sizeof *retQ );
    if( retQ == NULL )
      { fprintf( stderr, "makeQ: out of memory\n" );
        exit(1);
      }

    status = pthread_mutex_init( &retQ->mutex_t, NULL );
    if( status != 0 )
      { fprintf( stderr, "makeQ: mutex init failed (err %d)\n", status );
        exit(1);
      }

    status = pthread_cond_init( &retQ->cond_w_t, NULL );
    if( status != 0 )
      { fprintf( stderr, "makeQ: cond var init failed (err %d)\n", status );
        exit(1);
      }

    status = pthread_cond_init( &retQ->cond_r_t, NULL );
    if( status != 0 )
      { fprintf( stderr, "makeQ: cond var init failed (err %d)\n", status );
        exit(1);
      }

    retQ->count    = 0;
    retQ->readPos  = 0;
    retQ->writePos = 0;
    retQ->w_empty  = retQ->w_full = 0;

    return retQ;
  }

/* Pop the oldest pointer; blocks while the queue is empty. */
void * readQ( QueueStruc *Q )
  { void *ret;
    int status;

    pthread_mutex_lock( &Q->mutex_t );
    {
      while( Q->count == 0 )
        { Q->w_empty = 1;
          status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
          if( status != 0 )
            { fprintf( stderr, "readQ: cond wait failed (err %d)\n", status );
              exit(1);
            }
        }
      Q->w_empty = 0;
      Q->count  -= 1;
      ret = Q->data[ Q->readPos ];
      INC( Q->readPos );
      Q->w_full = 0;
    }
    pthread_mutex_unlock( &Q->mutex_t );

    /* BUG FIX: the original signalled only when w_full was seen set, but the
     * first woken waiter clears the flag, so with several blocked writers the
     * later ones were never woken (lost wakeup).  Signalling unconditionally
     * is cheap when nobody waits and always correct. */
    pthread_cond_signal( &Q->cond_w_t );

    return ret;
  }

/* Push a pointer; blocks while the queue is full. */
void writeQ( void * in, QueueStruc* Q )
  {
    int status;

    pthread_mutex_lock( &Q->mutex_t );
    {
      while( Q->count >= QUEUE_CAPACITY )
        { Q->w_full = 1;
          status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
          if( status != 0 )
            { fprintf( stderr, "writeQ: cond wait failed (err %d)\n", status );
              exit(1);
            }
        }
      Q->w_full = 0;
      Q->count += 1;
      Q->data[ Q->writePos ] = in;
      INC( Q->writePos );
      Q->w_empty = 0;
    }
    pthread_mutex_unlock( &Q->mutex_t );

    /* Unconditional signal -- see lost-wakeup note in readQ. */
    pthread_cond_signal( &Q->cond_r_t );
  }


//===========================================================================
// multi reader multi writer fast Q via CAS
//
// NOTE: the original wrapped this whole section in
// `#ifndef _GNU_SOURCE ... #endif`, which silently compiled the section
// AWAY whenever _GNU_SOURCE was already defined -- removed.  The GNU-only
// pthread_yield() is replaced by POSIX sched_yield().

/*This is a blocking queue, but it uses CAS instr plus yield() when empty
 * or full.
 *It uses CAS because it's meant to have more than one reader and more than
 * one writer.
 */

CASQueueStruc* makeCASQ()
  {
    CASQueueStruc* retQ;
    retQ = malloc( sizeof *retQ );
    if( retQ == NULL )
      { fprintf( stderr, "makeCASQ: out of memory\n" );
        exit(1);
      }

    retQ->insertLock  = UNLOCKED;
    retQ->extractLock = UNLOCKED;
    retQ->extractPos  = &(retQ->startOfData[0]); //side by side == empty
    retQ->insertPos   = &(retQ->startOfData[1]); // so start pos's have to be
    retQ->endOfData   = &(retQ->startOfData[QUEUE_CAPACITY - 1]);

    return retQ;
  }


/* Pop the oldest pointer; spins (yielding after 10000 tries) while empty.
 * BUG FIX vs original: the position variables were declared `int`, which
 * truncates pointers on 64-bit targets and made `*(extractPos)` ill-formed;
 * they are now void**.  A full barrier is issued before releasing the lock
 * so the position update is visible before another thread can acquire it. */
void* readCASQ( CASQueueStruc* Q )
  { void  *out = 0;
    int    tries = 0;
    void **startOfData = Q->startOfData;
    void **endOfData   = Q->endOfData;

    while( TRUE )
      { if( __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ) )
          {
            void **insertPos  = Q->insertPos;
            void **extractPos = Q->extractPos;

            //if not empty -- extract sits just below insert when empty
            if( insertPos - extractPos != 1 &&
                !(extractPos == endOfData && insertPos == startOfData) )
              { //move before read: write the new pos exactly once, correctly
                extractPos = (extractPos == endOfData) ? startOfData
                                                       : extractPos + 1;
                Q->extractPos = extractPos;
                out = *extractPos;
                __sync_synchronize();          //publish before unlock
                Q->extractLock = UNLOCKED;
                return out;
              }
            //Q is empty -- release for others and try again
            __sync_synchronize();
            Q->extractLock = UNLOCKED;
          }
        //Q is busy or empty
        tries++;
        if( tries > 10000 ) sched_yield(); //not guaranteed, quasi spin-lock
      }
  }

/* Push a pointer; spins (yielding after 10000 tries) while full. */
void writeCASQ( void * in, CASQueueStruc* Q )
  {
    int    tries = 0;
    void **startOfData = Q->startOfData;
    void **endOfData   = Q->endOfData;

    while( TRUE )
      { if( __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ) )
          {
            void **insertPos  = Q->insertPos;
            void **extractPos = Q->extractPos;

            //check if room to insert.. can't use a count variable
            // 'cause both insertor Thd and extractor Thd would write it
            if( extractPos - insertPos != 1 &&
                !(insertPos == endOfData && extractPos == startOfData) )
              { *insertPos = in; //insert before move
                Q->insertPos = (insertPos == endOfData) ? startOfData
                                                        : insertPos + 1;
                __sync_synchronize();          //publish before unlock
                Q->insertLock = UNLOCKED;
                return;
              }
            //Q is full -- release for others and try again
            __sync_synchronize();
            Q->insertLock = UNLOCKED;
          }
        tries++;
        if( tries > 10000 ) sched_yield(); //yield not guaranteed
      }
  }

//===========================================================================
//Single reader single writer super fast Q.. no atomic instrs..


/*This is a blocking queue, but it uses no atomic instructions, just does
 * busy-waiting when empty or full (but yield() if waits too long).
 *
 *It doesn't need any atomic instructions because only a single thread
 * extracts and only a single thread inserts, and it has no locations that
 * are written by both.  It writes before moving and moves before reading,
 * and never lets write position and read position be the same, so dis-
 * synchrony can only ever cause an unnecessary call to yield(), never a
 * wrong value (by monotonicity of movement of pointers, plus single writer
 * to pointers, plus sequence of write before change pointer, plus the
 * assumption that if thread A semantically writes X before Y, then thread
 * B sees the writes in that order -- NOTE(review): that last assumption is
 * NOT guaranteed by weakly-ordered hardware or by the C memory model;
 * C11 atomics with release/acquire would make it sound).
 */

SRSWQueueStruc* makeSRSWQ()
  {
    SRSWQueueStruc* retQ;
    retQ = malloc( sizeof *retQ );
    if( retQ == NULL )
      { fprintf( stderr, "makeSRSWQ: out of memory\n" );
        exit(1);
      }

    retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
    retQ->insertPos  = &(retQ->startOfData[1]); // so start pos's have to be
    retQ->endOfData  = &(retQ->startOfData[QUEUE_CAPACITY - 1]);

    return retQ;
  }


/* Pop the oldest pointer; busy-waits (yielding after 10000 tries) on empty.
 * BUG FIX vs original: position snapshots are void**, not int. */
void* readSRSWQ( SRSWQueueStruc* Q )
  { void  *out = 0;
    int    tries = 0;
    void **startOfData = Q->startOfData;
    void **endOfData   = Q->endOfData;

    while( TRUE )
      { //snapshot both ends once per attempt
        void **insertPos  = Q->insertPos;
        void **extractPos = Q->extractPos;

        //if not empty -- extract sits just below insert when empty
        if( insertPos - extractPos != 1 &&
            !(extractPos == endOfData && insertPos == startOfData) )
          { //move before read: write the new pos exactly once, correctly
            extractPos = (extractPos == endOfData) ? startOfData
                                                   : extractPos + 1;
            Q->extractPos = extractPos;
            out = *extractPos;
            return out;
          }
        //Q is empty
        tries++;
        if( tries > 10000 ) sched_yield(); //not guaranteed, quasi spin-lock
      }
  }

/* Push a pointer; busy-waits (yielding after 10000 tries) on full. */
void writeSRSWQ( void * in, SRSWQueueStruc* Q )
  {
    int    tries = 0;
    void **startOfData = Q->startOfData;
    void **endOfData   = Q->endOfData;

    while( TRUE )
      { //snapshot both ends once per attempt
        void **insertPos  = Q->insertPos;
        void **extractPos = Q->extractPos;

        if( extractPos - insertPos != 1 &&
            !(insertPos == endOfData && extractPos == startOfData) )
          { *insertPos = in; //insert before move
            Q->insertPos = (insertPos == endOfData) ? startOfData
                                                    : insertPos + 1;
            return;
          }
        //Q is full
        tries++;
        if( tries > 10000 ) sched_yield(); //yield not guaranteed
      }
  }
/*
 * Copyright 2009 OpenSourceCodeStewardshipFoundation.org
 * Licensed under GNU General Public License version 2
 *
 * Private (unsynchronized) ring queue of void* payloads.  A queue of this
 * kind is owned by a single core/thread at a time, so it needs no locks or
 * atomic instructions.  The ring keeps one slot as a permanent gap between
 * extractPos and insertPos, so a queue with N slots stores at most N-1 items.
 *
 * Author: seanhalle@yahoo.com
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define TRUE  1
#define FALSE 0

/* The .c returned `bool8` while the header declared `void`; declare the
 * type once here and keep the boolean return. */
typedef int bool8;

enum { PRIV_Q_INITIAL_SLOTS = 1024 };

/* Only one thread touches a given queue, so no mutex is needed. */
typedef struct
  { void **insertPos;
    void **extractPos;
    void **startOfData; //data is pointers
    void **endOfData;   //set when data is alloc'd
  }
PrivQueueStruc;


//===========================================================================

/*This kind of queue is private to a single core at a time -- has no
 * synchronizations.
 *
 *Returns NULL if out of memory.
 */
PrivQueueStruc* makePrivQ()
  {
    PrivQueueStruc* retQ;
    retQ = malloc( sizeof *retQ );
    if( retQ == NULL ) return NULL;

    retQ->startOfData = malloc( PRIV_Q_INITIAL_SLOTS * sizeof(void *) );
    if( retQ->startOfData == NULL )
      { free( retQ );
        return NULL;
      }

    retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
    retQ->insertPos  = &(retQ->startOfData[1]); // so start pos's have to be
    retQ->endOfData  = &(retQ->startOfData[PRIV_Q_INITIAL_SLOTS - 1]);

    return retQ;
  }

/* Double the queue's storage, PRESERVING the queued items in FIFO order.
 *
 * BUG FIX vs original: it (1) read and freed `oldStartOfData` without ever
 * assigning it -- an uninitialized pointer -- and (2) reset extractPos/
 * insertPos to the empty layout after copying, silently discarding every
 * queued item.  This version walks the old ring in extraction order into
 * the front of the new buffer and re-derives the positions from the count.
 * Returns Q (contents unchanged) on success, or NULL if out of memory. */
PrivQueueStruc* make_larger_PrivQ( PrivQueueStruc *Q )
  { int    oldSlots, newSlots, numItems;
    void **oldStart, **newStart, **rd, **wr;

    oldStart = Q->startOfData;
    oldSlots = (int)(Q->endOfData - oldStart) + 1;
    newSlots = 2 * oldSlots;

    newStart = malloc( newSlots * sizeof(void *) );
    if( newStart == NULL ) return NULL;

    /* Copy items oldest-first: an item lives one slot above extractPos
     * (circularly), up to but not including insertPos. */
    numItems = 0;
    rd = Q->extractPos;
    wr = Q->insertPos;
    while( !( wr - rd == 1 ||
              (rd == Q->endOfData && wr == oldStart) ) ) //while not empty
      { rd = (rd == Q->endOfData) ? oldStart : rd + 1;   //move before read
        newStart[ numItems + 1 ] = *rd;
        numItems++;
      }
    free( oldStart );

    Q->startOfData = newStart;
    Q->extractPos  = &newStart[0];            //item i sits at newStart[i+1]
    Q->insertPos   = &newStart[numItems + 1];
    Q->endOfData   = &newStart[newSlots - 1];

    return Q;
  }


/*Returns NULL when queue is empty.
 *
 * BUG FIX vs original: position snapshots were `int`, which truncates
 * pointers on 64-bit targets; they are now void**. */
void* readPrivQ( PrivQueueStruc* Q )
  { void  *out;
    void **startOfData = Q->startOfData;
    void **endOfData   = Q->endOfData;
    void **insertPos   = Q->insertPos;
    void **extractPos  = Q->extractPos;

    //if not empty -- extract sits just below insert when empty
    if( insertPos - extractPos != 1 &&
        !(extractPos == endOfData && insertPos == startOfData) )
      { //move before read: write the new pos exactly once, correctly
        extractPos = (extractPos == endOfData) ? startOfData
                                               : extractPos + 1;
        Q->extractPos = extractPos;
        out = *extractPos;
        return out;
      }
    //Q is empty
    return NULL;
  }

/*Returns FALSE when the queue was full.
 * Caller has the option of calling make_larger_PrivQ to make more room,
 * then trying again.
 */
bool8 writePrivQ( void * in, PrivQueueStruc* Q )
  {
    void **startOfData = Q->startOfData;
    void **endOfData   = Q->endOfData;
    void **insertPos   = Q->insertPos;
    void **extractPos  = Q->extractPos;

    if( extractPos - insertPos != 1 &&
        !(insertPos == endOfData && extractPos == startOfData) )
      { *insertPos = in; //insert before move
        Q->insertPos = (insertPos == endOfData) ? startOfData
                                                : insertPos + 1;
        return TRUE;
      }
    //Q is full
    return FALSE;
  }