# HG changeset patch # User Me # Date 1280347981 25200 # Node ID 174a7c2ca340897ab50d7d82715479d0cce0c028 # Parent 228ca5487d81aaa9760faad57971c6477f01f49d Works with sequential version -- not sure changes, but works diff -r 228ca5487d81 -r 174a7c2ca340 BlockingQueue.c --- a/BlockingQueue.c Wed Jun 30 14:35:04 2010 -0700 +++ b/BlockingQueue.c Wed Jul 28 13:13:01 2010 -0700 @@ -11,7 +11,6 @@ #include #include #include -#include #include "BlockingQueue.h" @@ -25,55 +24,39 @@ PThdQueueStruc* makePThdQ() { PThdQueueStruc* retQ; - int status; + int retCode; retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) ); - status = pthread_mutex_init( &retQ->mutex_t, NULL); - if (status < 0) - { - perror("Error in creating mutex:"); - exit(1); - return NULL; - } + retCode = + pthread_mutex_init( &retQ->mutex_t, NULL); + if(retCode){perror("Error in creating mutex:"); exit(1);} - status = pthread_cond_init ( &retQ->cond_w_t, NULL); - if (status < 0) - { - perror("Error in creating cond_var:"); - exit(1); - return NULL; - } + retCode = pthread_cond_init ( &retQ->cond_w_t, NULL); + if(retCode){perror("Error in creating cond_var:"); exit(1);} - status = pthread_cond_init ( &retQ->cond_r_t, NULL); - if (status < 0) - { - perror("Error in creating cond_var:"); - exit(1); - return NULL; - } + retCode = pthread_cond_init ( &retQ->cond_r_t, NULL); + if(retCode){perror("Error in creating cond_var:"); exit(1);} retQ->count = 0; retQ->readPos = 0; retQ->writePos = 0; - retQ -> w_empty = retQ -> w_full = 0; + retQ->w_empty = 0; + retQ->w_full = 0; return retQ; } void * readPThdQ( PThdQueueStruc *Q ) { void *ret; - int status, wt; + int retCode, wt; pthread_mutex_lock( &Q->mutex_t ); { while( Q -> count == 0 ) { Q -> w_empty = 1; - // pthread_cond_broadcast( &Q->cond_w_t ); - status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); - if (status != 0) - { perror("Thread wait error: "); - exit(1); - } + retCode = + pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); + if( retCode ){ perror("Thread wait error: "); exit(1); } } Q -> w_empty = 0; Q -> count -= 1; @@ -81,37 +64,40 @@ INC( Q->readPos ); wt = Q -> w_full; Q -> w_full = 0; - //pthread_cond_broadcast( &Q->cond_w_t ); } pthread_mutex_unlock( &Q->mutex_t ); - if (wt) pthread_cond_signal( &Q->cond_w_t ); + if (wt) + pthread_cond_signal( &Q->cond_w_t ); + //printf("Q out: %d\n", ret); return( ret ); } void writePThdQ( void * in, PThdQueueStruc* Q ) { int status, wt; + //printf("Q in: %d\n", in); + pthread_mutex_lock( &Q->mutex_t ); { while( Q->count >= 1024 ) { Q -> w_full = 1; - // pthread_cond_broadcast( &Q->cond_r_t ); status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); if (status != 0) { perror("Thread wait error: "); exit(1); } } + Q -> w_full = 0; Q->count += 1; Q->data[ Q->writePos ] = in; INC( Q->writePos ); wt = Q -> w_empty; Q -> w_empty = 0; - // pthread_cond_broadcast( &Q->cond_r_t ); } + pthread_mutex_unlock( &Q->mutex_t ); if( wt ) pthread_cond_signal( &Q->cond_r_t ); } @@ -182,7 +168,7 @@ } //Q is busy or empty tries++; - if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI yield() + if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable } } @@ -224,7 +210,7 @@ } } tries++; - if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() + if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable } } @@ -277,7 +263,7 @@ } //Q is empty tries++; - if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() + if( tries > SPINLOCK_TRIES ) pthread_yield(); } } @@ -317,7 +303,7 @@ } //Q is full tries++; - if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() + if( tries > SPINLOCK_TRIES ) pthread_yield(); } } @@ -455,7 +441,7 @@ { //check if all queues have been tried if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty { tries++; //give a writer a chance to finish before yield - if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() + if( tries > SPINLOCK_TRIES ) pthread_yield(); } } } diff -r 228ca5487d81 -r 174a7c2ca340 PrivateQueue.c --- a/PrivateQueue.c Wed Jun 30 14:35:04 2010 -0700 +++ b/PrivateQueue.c Wed Jul 28 13:13:01 2010 -0700 @@ -45,6 +45,7 @@ oldSize = Q->endOfData - Q->startOfData; newSize = 2 * oldSize; + oldStartOfData = Q->startOfData; Q->startOfData = malloc( newSize * sizeof(void *) ); memcpy(Q->startOfData, oldStartOfData, oldSize * sizeof(void *)); free(oldStartOfData); @@ -65,7 +66,7 @@ void **insertPos = Q->insertPos; void **extractPos = Q->extractPos; - //if not empty -- extract just below insert when empty + //if not empty -- (extract is just below insert when empty) if( insertPos - extractPos != 1 && !(extractPos == endOfData && insertPos == startOfData)) { //move before read @@ -86,7 +87,7 @@ /*Expands the queue size automatically when it's full */ void -writeAndEnlargePrivQ( void * in, PrivQueueStruc* Q ) +writePrivQ( void * in, PrivQueueStruc* Q ) { void **startOfData = Q->startOfData; void **endOfData = Q->endOfData; @@ -95,6 +96,7 @@ void **extractPos = Q->extractPos; tryAgain: + //Full? (insert is just below extract when full) if( extractPos - insertPos != 1 && !(insertPos == endOfData && extractPos == startOfData)) { *(Q->insertPos) = in; //insert before move @@ -115,7 +117,7 @@ /*Returns false when the queue was full. * have option of calling make_larger_PrivQ to make more room, then try again */ -int writeAndFailPrivQ( void * in, PrivQueueStruc* Q ) +int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ) { void **startOfData = Q->startOfData; void **endOfData = Q->endOfData; diff -r 228ca5487d81 -r 174a7c2ca340 PrivateQueue.h --- a/PrivateQueue.h Wed Jun 30 14:35:04 2010 -0700 +++ b/PrivateQueue.h Wed Jul 28 13:13:01 2010 -0700 @@ -29,8 +29,8 @@ PrivQueueStruc* makePrivQ ( ); void* readPrivQ ( PrivQueueStruc *Q ); -void writeAndEnlargePrivQ( void *in, PrivQueueStruc *Q ); -int writeAndFailPrivQ( void * in, PrivQueueStruc* Q ); //return +void writePrivQ( void *in, PrivQueueStruc *Q ); +int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ); //return // false when full #endif /* _PRIVATE_QUEUE_H */