Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
changeset 6:174a7c2ca340
Works with sequential version -- not sure changes, but works
| author | Me |
|---|---|
| date | Wed, 28 Jul 2010 13:13:01 -0700 |
| parents | 228ca5487d81 |
| children | 08f0b4da7610 |
| files | BlockingQueue.c PrivateQueue.c PrivateQueue.h |
| diffstat | 3 files changed, 33 insertions(+), 45 deletions(-) [+] |
line diff
1.1 --- a/BlockingQueue.c Wed Jun 30 14:35:04 2010 -0700 1.2 +++ b/BlockingQueue.c Wed Jul 28 13:13:01 2010 -0700 1.3 @@ -11,7 +11,6 @@ 1.4 #include <pthread.h> 1.5 #include <stdlib.h> 1.6 #include <sched.h> 1.7 -#include <windows.h> 1.8 1.9 #include "BlockingQueue.h" 1.10 1.11 @@ -25,55 +24,39 @@ 1.12 PThdQueueStruc* makePThdQ() 1.13 { 1.14 PThdQueueStruc* retQ; 1.15 - int status; 1.16 + int retCode; 1.17 retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) ); 1.18 1.19 1.20 - status = pthread_mutex_init( &retQ->mutex_t, NULL); 1.21 - if (status < 0) 1.22 - { 1.23 - perror("Error in creating mutex:"); 1.24 - exit(1); 1.25 - return NULL; 1.26 - } 1.27 + retCode = 1.28 + pthread_mutex_init( &retQ->mutex_t, NULL); 1.29 + if(retCode){perror("Error in creating mutex:"); exit(1);} 1.30 1.31 - status = pthread_cond_init ( &retQ->cond_w_t, NULL); 1.32 - if (status < 0) 1.33 - { 1.34 - perror("Error in creating cond_var:"); 1.35 - exit(1); 1.36 - return NULL; 1.37 - } 1.38 + retCode = pthread_cond_init ( &retQ->cond_w_t, NULL); 1.39 + if(retCode){perror("Error in creating cond_var:"); exit(1);} 1.40 1.41 - status = pthread_cond_init ( &retQ->cond_r_t, NULL); 1.42 - if (status < 0) 1.43 - { 1.44 - perror("Error in creating cond_var:"); 1.45 - exit(1); 1.46 - return NULL; 1.47 - } 1.48 + retCode = pthread_cond_init ( &retQ->cond_r_t, NULL); 1.49 + if(retCode){perror("Error in creating cond_var:"); exit(1);} 1.50 1.51 retQ->count = 0; 1.52 retQ->readPos = 0; 1.53 retQ->writePos = 0; 1.54 - retQ -> w_empty = retQ -> w_full = 0; 1.55 + retQ->w_empty = 0; 1.56 + retQ->w_full = 0; 1.57 1.58 return retQ; 1.59 } 1.60 1.61 void * readPThdQ( PThdQueueStruc *Q ) 1.62 { void *ret; 1.63 - int status, wt; 1.64 + int retCode, wt; 1.65 pthread_mutex_lock( &Q->mutex_t ); 1.66 { 1.67 while( Q -> count == 0 ) 1.68 { Q -> w_empty = 1; 1.69 - // pthread_cond_broadcast( &Q->cond_w_t ); 1.70 - status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); 1.71 - if (status != 0) 1.72 - { perror("Thread wait error: "); 1.73 - exit(1); 1.74 - } 1.75 + retCode = 1.76 + pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); 1.77 + if( retCode ){ perror("Thread wait error: "); exit(1); } 1.78 } 1.79 Q -> w_empty = 0; 1.80 Q -> count -= 1; 1.81 @@ -81,37 +64,40 @@ 1.82 INC( Q->readPos ); 1.83 wt = Q -> w_full; 1.84 Q -> w_full = 0; 1.85 - //pthread_cond_broadcast( &Q->cond_w_t ); 1.86 } 1.87 pthread_mutex_unlock( &Q->mutex_t ); 1.88 - if (wt) pthread_cond_signal( &Q->cond_w_t ); 1.89 + if (wt) 1.90 + pthread_cond_signal( &Q->cond_w_t ); 1.91 1.92 + //printf("Q out: %d\n", ret); 1.93 return( ret ); 1.94 } 1.95 1.96 void writePThdQ( void * in, PThdQueueStruc* Q ) 1.97 { 1.98 int status, wt; 1.99 + //printf("Q in: %d\n", in); 1.100 + 1.101 pthread_mutex_lock( &Q->mutex_t ); 1.102 { 1.103 while( Q->count >= 1024 ) 1.104 { 1.105 Q -> w_full = 1; 1.106 - // pthread_cond_broadcast( &Q->cond_r_t ); 1.107 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); 1.108 if (status != 0) 1.109 { perror("Thread wait error: "); 1.110 exit(1); 1.111 } 1.112 } 1.113 + 1.114 Q -> w_full = 0; 1.115 Q->count += 1; 1.116 Q->data[ Q->writePos ] = in; 1.117 INC( Q->writePos ); 1.118 wt = Q -> w_empty; 1.119 Q -> w_empty = 0; 1.120 - // pthread_cond_broadcast( &Q->cond_r_t ); 1.121 } 1.122 + 1.123 pthread_mutex_unlock( &Q->mutex_t ); 1.124 if( wt ) pthread_cond_signal( &Q->cond_r_t ); 1.125 } 1.126 @@ -182,7 +168,7 @@ 1.127 } 1.128 //Q is busy or empty 1.129 tries++; 1.130 - if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI yield() 1.131 + if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable 1.132 } 1.133 } 1.134 1.135 @@ -224,7 +210,7 @@ 1.136 } 1.137 } 1.138 tries++; 1.139 - if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 1.140 + if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable 1.141 } 1.142 } 1.143 1.144 @@ -277,7 +263,7 @@ 1.145 } 1.146 //Q is empty 1.147 tries++; 1.148 - if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 1.149 + if( tries > SPINLOCK_TRIES ) pthread_yield(); 1.150 } 1.151 } 1.152 1.153 @@ -317,7 +303,7 @@ 1.154 } 1.155 //Q is full 1.156 tries++; 1.157 - if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 1.158 + if( tries > SPINLOCK_TRIES ) pthread_yield(); 1.159 } 1.160 } 1.161 1.162 @@ -455,7 +441,7 @@ 1.163 { //check if all queues have been tried 1.164 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty 1.165 { tries++; //give a writer a chance to finish before yield 1.166 - if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 1.167 + if( tries > SPINLOCK_TRIES ) pthread_yield(); 1.168 } 1.169 } 1.170 }
2.1 --- a/PrivateQueue.c Wed Jun 30 14:35:04 2010 -0700 2.2 +++ b/PrivateQueue.c Wed Jul 28 13:13:01 2010 -0700 2.3 @@ -45,6 +45,7 @@ 2.4 2.5 oldSize = Q->endOfData - Q->startOfData; 2.6 newSize = 2 * oldSize; 2.7 + oldStartOfData = Q->startOfData; 2.8 Q->startOfData = malloc( newSize * sizeof(void *) ); 2.9 memcpy(Q->startOfData, oldStartOfData, oldSize * sizeof(void *)); 2.10 free(oldStartOfData); 2.11 @@ -65,7 +66,7 @@ 2.12 void **insertPos = Q->insertPos; 2.13 void **extractPos = Q->extractPos; 2.14 2.15 - //if not empty -- extract just below insert when empty 2.16 + //if not empty -- (extract is just below insert when empty) 2.17 if( insertPos - extractPos != 1 && 2.18 !(extractPos == endOfData && insertPos == startOfData)) 2.19 { //move before read 2.20 @@ -86,7 +87,7 @@ 2.21 /*Expands the queue size automatically when it's full 2.22 */ 2.23 void 2.24 -writeAndEnlargePrivQ( void * in, PrivQueueStruc* Q ) 2.25 +writePrivQ( void * in, PrivQueueStruc* Q ) 2.26 { 2.27 void **startOfData = Q->startOfData; 2.28 void **endOfData = Q->endOfData; 2.29 @@ -95,6 +96,7 @@ 2.30 void **extractPos = Q->extractPos; 2.31 2.32 tryAgain: 2.33 + //Full? (insert is just below extract when full) 2.34 if( extractPos - insertPos != 1 && 2.35 !(insertPos == endOfData && extractPos == startOfData)) 2.36 { *(Q->insertPos) = in; //insert before move 2.37 @@ -115,7 +117,7 @@ 2.38 /*Returns false when the queue was full. 2.39 * have option of calling make_larger_PrivQ to make more room, then try again 2.40 */ 2.41 -int writeAndFailPrivQ( void * in, PrivQueueStruc* Q ) 2.42 +int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ) 2.43 { 2.44 void **startOfData = Q->startOfData; 2.45 void **endOfData = Q->endOfData;
3.1 --- a/PrivateQueue.h Wed Jun 30 14:35:04 2010 -0700 3.2 +++ b/PrivateQueue.h Wed Jul 28 13:13:01 2010 -0700 3.3 @@ -29,8 +29,8 @@ 3.4 3.5 PrivQueueStruc* makePrivQ ( ); 3.6 void* readPrivQ ( PrivQueueStruc *Q ); 3.7 -void writeAndEnlargePrivQ( void *in, PrivQueueStruc *Q ); 3.8 -int writeAndFailPrivQ( void * in, PrivQueueStruc* Q ); //return 3.9 +void writePrivQ( void *in, PrivQueueStruc *Q ); 3.10 +int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ); //return 3.11 // false when full 3.12 3.13 #endif /* _PRIVATE_QUEUE_H */
