comparison BlockingQueue.c @ 20:b5ae7fbb1f01

Created MC_shared brch
author Me@portablequad
date Sat, 11 Feb 2012 20:37:52 -0800
parents 1ed562d601d9
children 59781a4c9cf1
comparison
equal deleted inserted replaced
10:d0d1a516f04d 11:ebeb186ecf5d
16 16
17 #define INC(x) (++x == 1024) ? (x) = 0 : (x) 17 #define INC(x) (++x == 1024) ? (x) = 0 : (x)
18 18
19 #define SPINLOCK_TRIES 100000 19 #define SPINLOCK_TRIES 100000
20 20
21 //===========================================================================
22 //Normal pthread Q
23
24 PThdQueueStruc* makePThdQ()
25 {
26 PThdQueueStruc* retQ;
27 int retCode;
28 retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) );
29
30
31 retCode =
32 pthread_mutex_init( &retQ->mutex_t, NULL);
33 if(retCode){perror("Error in creating mutex:"); exit(1);}
34
35 retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
36 if(retCode){perror("Error in creating cond_var:"); exit(1);}
37
38 retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
39 if(retCode){perror("Error in creating cond_var:"); exit(1);}
40
41 retQ->count = 0;
42 retQ->readPos = 0;
43 retQ->writePos = 0;
44 retQ->w_empty = 0;
45 retQ->w_full = 0;
46
47 return retQ;
48 }
49
50 void * readPThdQ( PThdQueueStruc *Q )
51 { void *ret;
52 int retCode, wt;
53 pthread_mutex_lock( &Q->mutex_t );
54 {
55 while( Q -> count == 0 )
56 { Q -> w_empty = 1;
57 retCode =
58 pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
59 if( retCode ){ perror("Thread wait error: "); exit(1); }
60 }
61 Q -> w_empty = 0;
62 Q -> count -= 1;
63 ret = Q->data[ Q->readPos ];
64 INC( Q->readPos );
65 wt = Q -> w_full;
66 Q -> w_full = 0;
67 }
68 pthread_mutex_unlock( &Q->mutex_t );
69 if (wt)
70 pthread_cond_signal( &Q->cond_w_t );
71
72 //printf("Q out: %d\n", ret);
73 return( ret );
74 }
75
76 void writePThdQ( void * in, PThdQueueStruc* Q )
77 {
78 int status, wt;
79 //printf("Q in: %d\n", in);
80
81 pthread_mutex_lock( &Q->mutex_t );
82 {
83 while( Q->count >= 1024 )
84 {
85 Q -> w_full = 1;
86 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
87 if (status != 0)
88 { perror("Thread wait error: ");
89 exit(1);
90 }
91 }
92
93 Q -> w_full = 0;
94 Q->count += 1;
95 Q->data[ Q->writePos ] = in;
96 INC( Q->writePos );
97 wt = Q -> w_empty;
98 Q -> w_empty = 0;
99 }
100
101 pthread_mutex_unlock( &Q->mutex_t );
102 if( wt ) pthread_cond_signal( &Q->cond_r_t );
103 }
104 21
105 22
106 //=========================================================================== 23 //===========================================================================
107 // multi reader multi writer fast Q via CAS 24 // multi reader multi writer fast Q via CAS
108 #ifndef _GNU_SOURCE 25 #ifndef _GNU_SOURCE
115 */ 32 */
116 33
117 CASQueueStruc* makeCASQ() 34 CASQueueStruc* makeCASQ()
118 { 35 {
119 CASQueueStruc* retQ; 36 CASQueueStruc* retQ;
120 retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) ); 37 retQ = (CASQueueStruc *) VMS__malloc( sizeof( CASQueueStruc ) );
121 38
122 retQ->insertLock = UNLOCKED; 39 retQ->insertLock = UNLOCKED;
123 retQ->extractLock= UNLOCKED; 40 retQ->extractLock= UNLOCKED;
124 //TODO: check got pointer syntax right 41 //TODO: check got pointer syntax right
125 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 42 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
240 */ 157 */
241 158
242 SRSWQueueStruc* makeSRSWQ() 159 SRSWQueueStruc* makeSRSWQ()
243 { 160 {
244 SRSWQueueStruc* retQ; 161 SRSWQueueStruc* retQ;
245 retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) ); 162 retQ = (SRSWQueueStruc *) VMS__malloc( sizeof( SRSWQueueStruc ) );
246 memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); 163 memset( retQ->startOfData, 0, 1024 * sizeof(void *) );
247 164
248 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 165 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
249 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 166 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
250 retQ->endOfData = &(retQ->startOfData[1023]); 167 retQ->endOfData = &(retQ->startOfData[1023]);
253 } 170 }
254 171
255 void 172 void
256 freeSRSWQ( SRSWQueueStruc* Q ) 173 freeSRSWQ( SRSWQueueStruc* Q )
257 { 174 {
258 free( Q ); 175 VMS__free( Q );
259 } 176 }
260 177
261 void* readSRSWQ( SRSWQueueStruc* Q ) 178 void* readSRSWQ( SRSWQueueStruc* Q )
262 { void *out = 0; 179 { void *out = 0;
263 int tries = 0; 180 int tries = 0;
379 */ 296 */
380 297
381 SRMWQueueStruc* makeSRMWQ() 298 SRMWQueueStruc* makeSRMWQ()
382 { SRMWQueueStruc* retQ; 299 { SRMWQueueStruc* retQ;
383 300
384 retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) ); 301 retQ = (SRMWQueueStruc *) VMS__malloc( sizeof( SRMWQueueStruc ) );
385 302
386 retQ->numInternalQs = 0; 303 retQ->numInternalQs = 0;
387 retQ->internalQsSz = 10; 304 retQ->internalQsSz = 10;
388 retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); 305 retQ->internalQs = VMS__malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
389 306
390 retQ->lastQReadFrom = 0; 307 retQ->lastQReadFrom = 0;
391 308
392 return retQ; 309 return retQ;
393 } 310 }
409 if( Q->numInternalQs >= Q->internalQsSz ) 326 if( Q->numInternalQs >= Q->internalQsSz )
410 { //full, so make bigger 327 { //full, so make bigger
411 oldSz = Q->internalQsSz; 328 oldSz = Q->internalQsSz;
412 oldArray = Q->internalQs; 329 oldArray = Q->internalQs;
413 Q->internalQsSz *= 2; 330 Q->internalQsSz *= 2;
414 Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); 331 Q->internalQs = VMS__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
415 for( i = 0; i < oldSz; i++ ) 332 for( i = 0; i < oldSz; i++ )
416 { Q->internalQs[i] = oldArray[i]; 333 { Q->internalQs[i] = oldArray[i];
417 } 334 }
418 free( oldArray ); 335 VMS__free( oldArray );
419 } 336 }
420 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ(); 337 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
421 return Q->numInternalQs - 1; 338 return Q->numInternalQs - 1;
422 } 339 }
423 340