comparison BlockingQueue.c @ 6:174a7c2ca340

Works with sequential version -- not sure what changed, but works
author Me
date Wed, 28 Jul 2010 13:13:01 -0700
parents 8abcca1590b8
children 08f0b4da7610
comparison
equal deleted inserted replaced
2:e022be73344f 3:084cb48da029
9 #include <stdio.h> 9 #include <stdio.h>
10 #include <errno.h> 10 #include <errno.h>
11 #include <pthread.h> 11 #include <pthread.h>
12 #include <stdlib.h> 12 #include <stdlib.h>
13 #include <sched.h> 13 #include <sched.h>
14 #include <windows.h>
15 14
16 #include "BlockingQueue.h" 15 #include "BlockingQueue.h"
17 16
18 #define INC(x) (++x == 1024) ? (x) = 0 : (x) 17 #define INC(x) (++x == 1024) ? (x) = 0 : (x)
19 18
23 //Normal pthread Q 22 //Normal pthread Q
24 23
25 PThdQueueStruc* makePThdQ() 24 PThdQueueStruc* makePThdQ()
26 { 25 {
27 PThdQueueStruc* retQ; 26 PThdQueueStruc* retQ;
28 int status; 27 int retCode;
29 retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) ); 28 retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) );
30 29
31 30
32 status = pthread_mutex_init( &retQ->mutex_t, NULL); 31 retCode =
33 if (status < 0) 32 pthread_mutex_init( &retQ->mutex_t, NULL);
34 { 33 if(retCode){perror("Error in creating mutex:"); exit(1);}
35 perror("Error in creating mutex:"); 34
36 exit(1); 35 retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
37 return NULL; 36 if(retCode){perror("Error in creating cond_var:"); exit(1);}
38 } 37
39 38 retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
40 status = pthread_cond_init ( &retQ->cond_w_t, NULL); 39 if(retCode){perror("Error in creating cond_var:"); exit(1);}
41 if (status < 0)
42 {
43 perror("Error in creating cond_var:");
44 exit(1);
45 return NULL;
46 }
47
48 status = pthread_cond_init ( &retQ->cond_r_t, NULL);
49 if (status < 0)
50 {
51 perror("Error in creating cond_var:");
52 exit(1);
53 return NULL;
54 }
55 40
56 retQ->count = 0; 41 retQ->count = 0;
57 retQ->readPos = 0; 42 retQ->readPos = 0;
58 retQ->writePos = 0; 43 retQ->writePos = 0;
59 retQ -> w_empty = retQ -> w_full = 0; 44 retQ->w_empty = 0;
45 retQ->w_full = 0;
60 46
61 return retQ; 47 return retQ;
62 } 48 }
63 49
64 void * readPThdQ( PThdQueueStruc *Q ) 50 void * readPThdQ( PThdQueueStruc *Q )
65 { void *ret; 51 { void *ret;
66 int status, wt; 52 int retCode, wt;
67 pthread_mutex_lock( &Q->mutex_t ); 53 pthread_mutex_lock( &Q->mutex_t );
68 { 54 {
69 while( Q -> count == 0 ) 55 while( Q -> count == 0 )
70 { Q -> w_empty = 1; 56 { Q -> w_empty = 1;
71 // pthread_cond_broadcast( &Q->cond_w_t ); 57 retCode =
72 status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); 58 pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
73 if (status != 0) 59 if( retCode ){ perror("Thread wait error: "); exit(1); }
74 { perror("Thread wait error: ");
75 exit(1);
76 }
77 } 60 }
78 Q -> w_empty = 0; 61 Q -> w_empty = 0;
79 Q -> count -= 1; 62 Q -> count -= 1;
80 ret = Q->data[ Q->readPos ]; 63 ret = Q->data[ Q->readPos ];
81 INC( Q->readPos ); 64 INC( Q->readPos );
82 wt = Q -> w_full; 65 wt = Q -> w_full;
83 Q -> w_full = 0; 66 Q -> w_full = 0;
84 //pthread_cond_broadcast( &Q->cond_w_t );
85 } 67 }
86 pthread_mutex_unlock( &Q->mutex_t ); 68 pthread_mutex_unlock( &Q->mutex_t );
87 if (wt) pthread_cond_signal( &Q->cond_w_t ); 69 if (wt)
88 70 pthread_cond_signal( &Q->cond_w_t );
71
72 //printf("Q out: %d\n", ret);
89 return( ret ); 73 return( ret );
90 } 74 }
91 75
92 void writePThdQ( void * in, PThdQueueStruc* Q ) 76 void writePThdQ( void * in, PThdQueueStruc* Q )
93 { 77 {
94 int status, wt; 78 int status, wt;
79 //printf("Q in: %d\n", in);
80
95 pthread_mutex_lock( &Q->mutex_t ); 81 pthread_mutex_lock( &Q->mutex_t );
96 { 82 {
97 while( Q->count >= 1024 ) 83 while( Q->count >= 1024 )
98 { 84 {
99 Q -> w_full = 1; 85 Q -> w_full = 1;
100 // pthread_cond_broadcast( &Q->cond_r_t );
101 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); 86 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
102 if (status != 0) 87 if (status != 0)
103 { perror("Thread wait error: "); 88 { perror("Thread wait error: ");
104 exit(1); 89 exit(1);
105 } 90 }
106 } 91 }
92
107 Q -> w_full = 0; 93 Q -> w_full = 0;
108 Q->count += 1; 94 Q->count += 1;
109 Q->data[ Q->writePos ] = in; 95 Q->data[ Q->writePos ] = in;
110 INC( Q->writePos ); 96 INC( Q->writePos );
111 wt = Q -> w_empty; 97 wt = Q -> w_empty;
112 Q -> w_empty = 0; 98 Q -> w_empty = 0;
113 // pthread_cond_broadcast( &Q->cond_r_t ); 99 }
114 } 100
115 pthread_mutex_unlock( &Q->mutex_t ); 101 pthread_mutex_unlock( &Q->mutex_t );
116 if( wt ) pthread_cond_signal( &Q->cond_r_t ); 102 if( wt ) pthread_cond_signal( &Q->cond_r_t );
117 } 103 }
118 104
119 105
180 Q->extractLock = UNLOCKED;//have to try again, release for others 166 Q->extractLock = UNLOCKED;//have to try again, release for others
181 } 167 }
182 } 168 }
183 //Q is busy or empty 169 //Q is busy or empty
184 tries++; 170 tries++;
185 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI yield() 171 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
186 } 172 }
187 } 173 }
188 174
189 void writeCASQ( void * in, CASQueueStruc* Q ) 175 void writeCASQ( void * in, CASQueueStruc* Q )
190 { 176 {
222 { success = FALSE; 208 { success = FALSE;
223 Q->insertLock = UNLOCKED;//have to try again, release for others 209 Q->insertLock = UNLOCKED;//have to try again, release for others
224 } 210 }
225 } 211 }
226 tries++; 212 tries++;
227 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 213 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
228 } 214 }
229 } 215 }
230 216
231 #endif //_GNU_SOURCE 217 #endif //_GNU_SOURCE
232 218
275 out = *(Q->extractPos); 261 out = *(Q->extractPos);
276 return out; 262 return out;
277 } 263 }
278 //Q is empty 264 //Q is empty
279 tries++; 265 tries++;
280 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 266 if( tries > SPINLOCK_TRIES ) pthread_yield();
281 } 267 }
282 } 268 }
283 269
284 270
285 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q ) 271 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
315 else Q->insertPos++; 301 else Q->insertPos++;
316 return; 302 return;
317 } 303 }
318 //Q is full 304 //Q is full
319 tries++; 305 tries++;
320 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 306 if( tries > SPINLOCK_TRIES ) pthread_yield();
321 } 307 }
322 } 308 }
323 309
324 310
325 311
453 } 439 }
454 else //SRSW Q just read is empty 440 else //SRSW Q just read is empty
455 { //check if all queues have been tried 441 { //check if all queues have been tried
456 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty 442 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
457 { tries++; //give a writer a chance to finish before yield 443 { tries++; //give a writer a chance to finish before yield
458 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield() 444 if( tries > SPINLOCK_TRIES ) pthread_yield();
459 } 445 }
460 } 446 }
461 } 447 }
462 } 448 }
463 449