Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
comparison BlockingQueue.c @ 6:174a7c2ca340
Works with sequential version -- not sure changes, but works
| author | Me |
|---|---|
| date | Wed, 28 Jul 2010 13:13:01 -0700 |
| parents | 8abcca1590b8 |
| children | 08f0b4da7610 |
comparison
equal
deleted
inserted
replaced
| 2:e022be73344f | 3:084cb48da029 |
|---|---|
| 9 #include <stdio.h> | 9 #include <stdio.h> |
| 10 #include <errno.h> | 10 #include <errno.h> |
| 11 #include <pthread.h> | 11 #include <pthread.h> |
| 12 #include <stdlib.h> | 12 #include <stdlib.h> |
| 13 #include <sched.h> | 13 #include <sched.h> |
| 14 #include <windows.h> | |
| 15 | 14 |
| 16 #include "BlockingQueue.h" | 15 #include "BlockingQueue.h" |
| 17 | 16 |
| 18 #define INC(x) (++x == 1024) ? (x) = 0 : (x) | 17 #define INC(x) (++x == 1024) ? (x) = 0 : (x) |
| 19 | 18 |
| 23 //Normal pthread Q | 22 //Normal pthread Q |
| 24 | 23 |
| 25 PThdQueueStruc* makePThdQ() | 24 PThdQueueStruc* makePThdQ() |
| 26 { | 25 { |
| 27 PThdQueueStruc* retQ; | 26 PThdQueueStruc* retQ; |
| 28 int status; | 27 int retCode; |
| 29 retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) ); | 28 retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) ); |
| 30 | 29 |
| 31 | 30 |
| 32 status = pthread_mutex_init( &retQ->mutex_t, NULL); | 31 retCode = |
| 33 if (status < 0) | 32 pthread_mutex_init( &retQ->mutex_t, NULL); |
| 34 { | 33 if(retCode){perror("Error in creating mutex:"); exit(1);} |
| 35 perror("Error in creating mutex:"); | 34 |
| 36 exit(1); | 35 retCode = pthread_cond_init ( &retQ->cond_w_t, NULL); |
| 37 return NULL; | 36 if(retCode){perror("Error in creating cond_var:"); exit(1);} |
| 38 } | 37 |
| 39 | 38 retCode = pthread_cond_init ( &retQ->cond_r_t, NULL); |
| 40 status = pthread_cond_init ( &retQ->cond_w_t, NULL); | 39 if(retCode){perror("Error in creating cond_var:"); exit(1);} |
| 41 if (status < 0) | |
| 42 { | |
| 43 perror("Error in creating cond_var:"); | |
| 44 exit(1); | |
| 45 return NULL; | |
| 46 } | |
| 47 | |
| 48 status = pthread_cond_init ( &retQ->cond_r_t, NULL); | |
| 49 if (status < 0) | |
| 50 { | |
| 51 perror("Error in creating cond_var:"); | |
| 52 exit(1); | |
| 53 return NULL; | |
| 54 } | |
| 55 | 40 |
| 56 retQ->count = 0; | 41 retQ->count = 0; |
| 57 retQ->readPos = 0; | 42 retQ->readPos = 0; |
| 58 retQ->writePos = 0; | 43 retQ->writePos = 0; |
| 59 retQ -> w_empty = retQ -> w_full = 0; | 44 retQ->w_empty = 0; |
| 45 retQ->w_full = 0; | |
| 60 | 46 |
| 61 return retQ; | 47 return retQ; |
| 62 } | 48 } |
| 63 | 49 |
| 64 void * readPThdQ( PThdQueueStruc *Q ) | 50 void * readPThdQ( PThdQueueStruc *Q ) |
| 65 { void *ret; | 51 { void *ret; |
| 66 int status, wt; | 52 int retCode, wt; |
| 67 pthread_mutex_lock( &Q->mutex_t ); | 53 pthread_mutex_lock( &Q->mutex_t ); |
| 68 { | 54 { |
| 69 while( Q -> count == 0 ) | 55 while( Q -> count == 0 ) |
| 70 { Q -> w_empty = 1; | 56 { Q -> w_empty = 1; |
| 71 // pthread_cond_broadcast( &Q->cond_w_t ); | 57 retCode = |
| 72 status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); | 58 pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); |
| 73 if (status != 0) | 59 if( retCode ){ perror("Thread wait error: "); exit(1); } |
| 74 { perror("Thread wait error: "); | |
| 75 exit(1); | |
| 76 } | |
| 77 } | 60 } |
| 78 Q -> w_empty = 0; | 61 Q -> w_empty = 0; |
| 79 Q -> count -= 1; | 62 Q -> count -= 1; |
| 80 ret = Q->data[ Q->readPos ]; | 63 ret = Q->data[ Q->readPos ]; |
| 81 INC( Q->readPos ); | 64 INC( Q->readPos ); |
| 82 wt = Q -> w_full; | 65 wt = Q -> w_full; |
| 83 Q -> w_full = 0; | 66 Q -> w_full = 0; |
| 84 //pthread_cond_broadcast( &Q->cond_w_t ); | |
| 85 } | 67 } |
| 86 pthread_mutex_unlock( &Q->mutex_t ); | 68 pthread_mutex_unlock( &Q->mutex_t ); |
| 87 if (wt) pthread_cond_signal( &Q->cond_w_t ); | 69 if (wt) |
| 88 | 70 pthread_cond_signal( &Q->cond_w_t ); |
| 71 | |
| 72 //printf("Q out: %d\n", ret); | |
| 89 return( ret ); | 73 return( ret ); |
| 90 } | 74 } |
| 91 | 75 |
| 92 void writePThdQ( void * in, PThdQueueStruc* Q ) | 76 void writePThdQ( void * in, PThdQueueStruc* Q ) |
| 93 { | 77 { |
| 94 int status, wt; | 78 int status, wt; |
| 79 //printf("Q in: %d\n", in); | |
| 80 | |
| 95 pthread_mutex_lock( &Q->mutex_t ); | 81 pthread_mutex_lock( &Q->mutex_t ); |
| 96 { | 82 { |
| 97 while( Q->count >= 1024 ) | 83 while( Q->count >= 1024 ) |
| 98 { | 84 { |
| 99 Q -> w_full = 1; | 85 Q -> w_full = 1; |
| 100 // pthread_cond_broadcast( &Q->cond_r_t ); | |
| 101 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); | 86 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); |
| 102 if (status != 0) | 87 if (status != 0) |
| 103 { perror("Thread wait error: "); | 88 { perror("Thread wait error: "); |
| 104 exit(1); | 89 exit(1); |
| 105 } | 90 } |
| 106 } | 91 } |
| 92 | |
| 107 Q -> w_full = 0; | 93 Q -> w_full = 0; |
| 108 Q->count += 1; | 94 Q->count += 1; |
| 109 Q->data[ Q->writePos ] = in; | 95 Q->data[ Q->writePos ] = in; |
| 110 INC( Q->writePos ); | 96 INC( Q->writePos ); |
| 111 wt = Q -> w_empty; | 97 wt = Q -> w_empty; |
| 112 Q -> w_empty = 0; | 98 Q -> w_empty = 0; |
| 113 // pthread_cond_broadcast( &Q->cond_r_t ); | 99 } |
| 114 } | 100 |
| 115 pthread_mutex_unlock( &Q->mutex_t ); | 101 pthread_mutex_unlock( &Q->mutex_t ); |
| 116 if( wt ) pthread_cond_signal( &Q->cond_r_t ); | 102 if( wt ) pthread_cond_signal( &Q->cond_r_t ); |
| 117 } | 103 } |
| 118 | 104 |
| 119 | 105 |
| 180 Q->extractLock = UNLOCKED;//have to try again, release for others | 166 Q->extractLock = UNLOCKED;//have to try again, release for others |
| 181 } | 167 } |
| 182 } | 168 } |
| 183 //Q is busy or empty | 169 //Q is busy or empty |
| 184 tries++; | 170 tries++; |
| 185 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI yield() | 171 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable |
| 186 } | 172 } |
| 187 } | 173 } |
| 188 | 174 |
| 189 void writeCASQ( void * in, CASQueueStruc* Q ) | 175 void writeCASQ( void * in, CASQueueStruc* Q ) |
| 190 { | 176 { |
| 222 { success = FALSE; | 208 { success = FALSE; |
| 223 Q->insertLock = UNLOCKED;//have to try again, release for others | 209 Q->insertLock = UNLOCKED;//have to try again, release for others |
| 224 } | 210 } |
| 225 } | 211 } |
| 226 tries++; | 212 tries++; |
| 227 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() | 213 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable |
| 228 } | 214 } |
| 229 } | 215 } |
| 230 | 216 |
| 231 #endif //_GNU_SOURCE | 217 #endif //_GNU_SOURCE |
| 232 | 218 |
| 275 out = *(Q->extractPos); | 261 out = *(Q->extractPos); |
| 276 return out; | 262 return out; |
| 277 } | 263 } |
| 278 //Q is empty | 264 //Q is empty |
| 279 tries++; | 265 tries++; |
| 280 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() | 266 if( tries > SPINLOCK_TRIES ) pthread_yield(); |
| 281 } | 267 } |
| 282 } | 268 } |
| 283 | 269 |
| 284 | 270 |
| 285 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) | 271 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) |
| 315 else Q->insertPos++; | 301 else Q->insertPos++; |
| 316 return; | 302 return; |
| 317 } | 303 } |
| 318 //Q is full | 304 //Q is full |
| 319 tries++; | 305 tries++; |
| 320 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() | 306 if( tries > SPINLOCK_TRIES ) pthread_yield(); |
| 321 } | 307 } |
| 322 } | 308 } |
| 323 | 309 |
| 324 | 310 |
| 325 | 311 |
| 453 } | 439 } |
| 454 else //SRSW Q just read is empty | 440 else //SRSW Q just read is empty |
| 455 { //check if all queues have been tried | 441 { //check if all queues have been tried |
| 456 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty | 442 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty |
| 457 { tries++; //give a writer a chance to finish before yield | 443 { tries++; //give a writer a chance to finish before yield |
| 458 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() | 444 if( tries > SPINLOCK_TRIES ) pthread_yield(); |
| 459 } | 445 } |
| 460 } | 446 } |
| 461 } | 447 } |
| 462 } | 448 } |
| 463 | 449 |
