Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
comparison BlockingQueue.c @ 20:b5ae7fbb1f01
Created MC_shared brch
| author | Me@portablequad |
|---|---|
| date | Sat, 11 Feb 2012 20:37:52 -0800 |
| parents | 1ed562d601d9 |
| children | 59781a4c9cf1 |
comparison
equal
deleted
inserted
replaced
| 10:d0d1a516f04d | 11:ebeb186ecf5d |
|---|---|
| 16 | 16 |
| 17 #define INC(x) (++x == 1024) ? (x) = 0 : (x) | 17 #define INC(x) (++x == 1024) ? (x) = 0 : (x) |
| 18 | 18 |
| 19 #define SPINLOCK_TRIES 100000 | 19 #define SPINLOCK_TRIES 100000 |
| 20 | 20 |
| 21 //=========================================================================== | |
| 22 //Normal pthread Q | |
| 23 | |
| 24 PThdQueueStruc* makePThdQ() | |
| 25 { | |
| 26 PThdQueueStruc* retQ; | |
| 27 int retCode; | |
| 28 retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) ); | |
| 29 | |
| 30 | |
| 31 retCode = | |
| 32 pthread_mutex_init( &retQ->mutex_t, NULL); | |
| 33 if(retCode){perror("Error in creating mutex:"); exit(1);} | |
| 34 | |
| 35 retCode = pthread_cond_init ( &retQ->cond_w_t, NULL); | |
| 36 if(retCode){perror("Error in creating cond_var:"); exit(1);} | |
| 37 | |
| 38 retCode = pthread_cond_init ( &retQ->cond_r_t, NULL); | |
| 39 if(retCode){perror("Error in creating cond_var:"); exit(1);} | |
| 40 | |
| 41 retQ->count = 0; | |
| 42 retQ->readPos = 0; | |
| 43 retQ->writePos = 0; | |
| 44 retQ->w_empty = 0; | |
| 45 retQ->w_full = 0; | |
| 46 | |
| 47 return retQ; | |
| 48 } | |
| 49 | |
| 50 void * readPThdQ( PThdQueueStruc *Q ) | |
| 51 { void *ret; | |
| 52 int retCode, wt; | |
| 53 pthread_mutex_lock( &Q->mutex_t ); | |
| 54 { | |
| 55 while( Q -> count == 0 ) | |
| 56 { Q -> w_empty = 1; | |
| 57 retCode = | |
| 58 pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); | |
| 59 if( retCode ){ perror("Thread wait error: "); exit(1); } | |
| 60 } | |
| 61 Q -> w_empty = 0; | |
| 62 Q -> count -= 1; | |
| 63 ret = Q->data[ Q->readPos ]; | |
| 64 INC( Q->readPos ); | |
| 65 wt = Q -> w_full; | |
| 66 Q -> w_full = 0; | |
| 67 } | |
| 68 pthread_mutex_unlock( &Q->mutex_t ); | |
| 69 if (wt) | |
| 70 pthread_cond_signal( &Q->cond_w_t ); | |
| 71 | |
| 72 //printf("Q out: %d\n", ret); | |
| 73 return( ret ); | |
| 74 } | |
| 75 | |
| 76 void writePThdQ( void * in, PThdQueueStruc* Q ) | |
| 77 { | |
| 78 int status, wt; | |
| 79 //printf("Q in: %d\n", in); | |
| 80 | |
| 81 pthread_mutex_lock( &Q->mutex_t ); | |
| 82 { | |
| 83 while( Q->count >= 1024 ) | |
| 84 { | |
| 85 Q -> w_full = 1; | |
| 86 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); | |
| 87 if (status != 0) | |
| 88 { perror("Thread wait error: "); | |
| 89 exit(1); | |
| 90 } | |
| 91 } | |
| 92 | |
| 93 Q -> w_full = 0; | |
| 94 Q->count += 1; | |
| 95 Q->data[ Q->writePos ] = in; | |
| 96 INC( Q->writePos ); | |
| 97 wt = Q -> w_empty; | |
| 98 Q -> w_empty = 0; | |
| 99 } | |
| 100 | |
| 101 pthread_mutex_unlock( &Q->mutex_t ); | |
| 102 if( wt ) pthread_cond_signal( &Q->cond_r_t ); | |
| 103 } | |
| 104 | 21 |
| 105 | 22 |
| 106 //=========================================================================== | 23 //=========================================================================== |
| 107 // multi reader multi writer fast Q via CAS | 24 // multi reader multi writer fast Q via CAS |
| 108 #ifndef _GNU_SOURCE | 25 #ifndef _GNU_SOURCE |
| 115 */ | 32 */ |
| 116 | 33 |
| 117 CASQueueStruc* makeCASQ() | 34 CASQueueStruc* makeCASQ() |
| 118 { | 35 { |
| 119 CASQueueStruc* retQ; | 36 CASQueueStruc* retQ; |
| 120 retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) ); | 37 retQ = (CASQueueStruc *) VMS__malloc( sizeof( CASQueueStruc ) ); |
| 121 | 38 |
| 122 retQ->insertLock = UNLOCKED; | 39 retQ->insertLock = UNLOCKED; |
| 123 retQ->extractLock= UNLOCKED; | 40 retQ->extractLock= UNLOCKED; |
| 124 //TODO: check got pointer syntax right | 41 //TODO: check got pointer syntax right |
| 125 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty | 42 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty |
| 240 */ | 157 */ |
| 241 | 158 |
| 242 SRSWQueueStruc* makeSRSWQ() | 159 SRSWQueueStruc* makeSRSWQ() |
| 243 { | 160 { |
| 244 SRSWQueueStruc* retQ; | 161 SRSWQueueStruc* retQ; |
| 245 retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) ); | 162 retQ = (SRSWQueueStruc *) VMS__malloc( sizeof( SRSWQueueStruc ) ); |
| 246 memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); | 163 memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); |
| 247 | 164 |
| 248 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty | 165 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty |
| 249 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be | 166 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be |
| 250 retQ->endOfData = &(retQ->startOfData[1023]); | 167 retQ->endOfData = &(retQ->startOfData[1023]); |
| 253 } | 170 } |
| 254 | 171 |
| 255 void | 172 void |
| 256 freeSRSWQ( SRSWQueueStruc* Q ) | 173 freeSRSWQ( SRSWQueueStruc* Q ) |
| 257 { | 174 { |
| 258 free( Q ); | 175 VMS__free( Q ); |
| 259 } | 176 } |
| 260 | 177 |
| 261 void* readSRSWQ( SRSWQueueStruc* Q ) | 178 void* readSRSWQ( SRSWQueueStruc* Q ) |
| 262 { void *out = 0; | 179 { void *out = 0; |
| 263 int tries = 0; | 180 int tries = 0; |
| 379 */ | 296 */ |
| 380 | 297 |
| 381 SRMWQueueStruc* makeSRMWQ() | 298 SRMWQueueStruc* makeSRMWQ() |
| 382 { SRMWQueueStruc* retQ; | 299 { SRMWQueueStruc* retQ; |
| 383 | 300 |
| 384 retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) ); | 301 retQ = (SRMWQueueStruc *) VMS__malloc( sizeof( SRMWQueueStruc ) ); |
| 385 | 302 |
| 386 retQ->numInternalQs = 0; | 303 retQ->numInternalQs = 0; |
| 387 retQ->internalQsSz = 10; | 304 retQ->internalQsSz = 10; |
| 388 retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); | 305 retQ->internalQs = VMS__malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); |
| 389 | 306 |
| 390 retQ->lastQReadFrom = 0; | 307 retQ->lastQReadFrom = 0; |
| 391 | 308 |
| 392 return retQ; | 309 return retQ; |
| 393 } | 310 } |
| 409 if( Q->numInternalQs >= Q->internalQsSz ) | 326 if( Q->numInternalQs >= Q->internalQsSz ) |
| 410 { //full, so make bigger | 327 { //full, so make bigger |
| 411 oldSz = Q->internalQsSz; | 328 oldSz = Q->internalQsSz; |
| 412 oldArray = Q->internalQs; | 329 oldArray = Q->internalQs; |
| 413 Q->internalQsSz *= 2; | 330 Q->internalQsSz *= 2; |
| 414 Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); | 331 Q->internalQs = VMS__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); |
| 415 for( i = 0; i < oldSz; i++ ) | 332 for( i = 0; i < oldSz; i++ ) |
| 416 { Q->internalQs[i] = oldArray[i]; | 333 { Q->internalQs[i] = oldArray[i]; |
| 417 } | 334 } |
| 418 free( oldArray ); | 335 VMS__free( oldArray ); |
| 419 } | 336 } |
| 420 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); | 337 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); |
| 421 return Q->numInternalQs - 1; | 338 return Q->numInternalQs - 1; |
| 422 } | 339 } |
| 423 | 340 |
