Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
view BlockingQueue.c @ 6:174a7c2ca340
Works with sequential version -- not sure of the changes, but it works
| author | Me |
|---|---|
| date | Wed, 28 Jul 2010 13:13:01 -0700 |
| parents | 8abcca1590b8 |
| children | 08f0b4da7610 |
line source
1 /*
2 * Copyright 2009 OpenSourceStewardshipFoundation.org
3 * Licensed under GNU General Public License version 2
4 *
5 * Author: seanhalle@yahoo.com
6 */
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "BlockingQueue.h"
17 #define INC(x) (++x == 1024) ? (x) = 0 : (x)
19 #define SPINLOCK_TRIES 100000
21 //===========================================================================
22 //Normal pthread Q
24 PThdQueueStruc* makePThdQ()
25 {
26 PThdQueueStruc* retQ;
27 int retCode;
28 retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) );
31 retCode =
32 pthread_mutex_init( &retQ->mutex_t, NULL);
33 if(retCode){perror("Error in creating mutex:"); exit(1);}
35 retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
36 if(retCode){perror("Error in creating cond_var:"); exit(1);}
38 retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
39 if(retCode){perror("Error in creating cond_var:"); exit(1);}
41 retQ->count = 0;
42 retQ->readPos = 0;
43 retQ->writePos = 0;
44 retQ->w_empty = 0;
45 retQ->w_full = 0;
47 return retQ;
48 }
50 void * readPThdQ( PThdQueueStruc *Q )
51 { void *ret;
52 int retCode, wt;
53 pthread_mutex_lock( &Q->mutex_t );
54 {
55 while( Q -> count == 0 )
56 { Q -> w_empty = 1;
57 retCode =
58 pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
59 if( retCode ){ perror("Thread wait error: "); exit(1); }
60 }
61 Q -> w_empty = 0;
62 Q -> count -= 1;
63 ret = Q->data[ Q->readPos ];
64 INC( Q->readPos );
65 wt = Q -> w_full;
66 Q -> w_full = 0;
67 }
68 pthread_mutex_unlock( &Q->mutex_t );
69 if (wt)
70 pthread_cond_signal( &Q->cond_w_t );
72 //printf("Q out: %d\n", ret);
73 return( ret );
74 }
76 void writePThdQ( void * in, PThdQueueStruc* Q )
77 {
78 int status, wt;
79 //printf("Q in: %d\n", in);
81 pthread_mutex_lock( &Q->mutex_t );
82 {
83 while( Q->count >= 1024 )
84 {
85 Q -> w_full = 1;
86 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
87 if (status != 0)
88 { perror("Thread wait error: ");
89 exit(1);
90 }
91 }
93 Q -> w_full = 0;
94 Q->count += 1;
95 Q->data[ Q->writePos ] = in;
96 INC( Q->writePos );
97 wt = Q -> w_empty;
98 Q -> w_empty = 0;
99 }
101 pthread_mutex_unlock( &Q->mutex_t );
102 if( wt ) pthread_cond_signal( &Q->cond_r_t );
103 }
106 //===========================================================================
107 // multi reader multi writer fast Q via CAS
108 #ifndef _GNU_SOURCE
109 #define _GNU_SOURCE
111 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
112 * or full
113 *It uses CAS because it's meant to have more than one reader and more than
114 * one writer.
115 */
117 CASQueueStruc* makeCASQ()
118 {
119 CASQueueStruc* retQ;
120 retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) );
122 retQ->insertLock = UNLOCKED;
123 retQ->extractLock= UNLOCKED;
124 //TODO: check got pointer syntax right
125 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
126 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
127 retQ->endOfData = &(retQ->startOfData[1023]);
129 return retQ;
130 }
133 void* readCASQ( CASQueueStruc* Q )
134 { void *out = 0;
135 int tries = 0;
136 void **startOfData = Q->startOfData;
137 void **endOfData = Q->endOfData;
139 int success = FALSE;
141 while( TRUE )
142 { success =
143 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
144 //NOTE: checked assy, and it does lock correctly..
145 if( success )
146 {
147 void **insertPos = Q->insertPos;
148 void **extractPos = Q->extractPos;
150 //if not empty -- extract just below insert when empty
151 if( insertPos - extractPos != 1 &&
152 !(extractPos == endOfData && insertPos == startOfData))
153 { //move before read
154 if( extractPos == endOfData ) //write new pos exactly once, correctly
155 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
156 } // other thread might read bad pos
157 else
158 { Q->extractPos++;
159 }
160 out = *(Q->extractPos);
161 Q->extractLock = UNLOCKED;
162 return out;
163 }
164 else //Q is empty
165 { success = FALSE;
166 Q->extractLock = UNLOCKED;//have to try again, release for others
167 }
168 }
169 //Q is busy or empty
170 tries++;
171 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
172 }
173 }
175 void writeCASQ( void * in, CASQueueStruc* Q )
176 {
177 int tries = 0;
178 //TODO: need to make Q volatile? Want to do this Q in assembly!
179 //Have no idea what GCC's going to do to this code
180 void **startOfData = Q->startOfData;
181 void **endOfData = Q->endOfData;
183 int success = FALSE;
185 while( TRUE )
186 { success =
187 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
188 if( success )
189 {
190 void **insertPos = Q->insertPos;
191 void **extractPos = Q->extractPos;
193 //check if room to insert.. can't use a count variable
194 // 'cause both insertor Thd and extractor Thd would write it
195 if( extractPos - insertPos != 1 &&
196 !(insertPos == endOfData && extractPos == startOfData))
197 { *(Q->insertPos) = in; //insert before move
198 if( insertPos == endOfData )
199 { Q->insertPos = startOfData;
200 }
201 else
202 { Q->insertPos++;
203 }
204 Q->insertLock = UNLOCKED;
205 return;
206 }
207 else //Q is full
208 { success = FALSE;
209 Q->insertLock = UNLOCKED;//have to try again, release for others
210 }
211 }
212 tries++;
213 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
214 }
215 }
217 #endif //_GNU_SOURCE
220 //===========================================================================
221 //Single reader single writer super fast Q.. no atomic instrs..
224 /*This is a blocking queue, but it uses no atomic instructions, just does
225 * yield() when empty or full
226 *
227 *It doesn't need any atomic instructions because only a single thread
228 * extracts and only a single thread inserts, and it has no locations that
229 * are written by both. It writes before moving and moves before reading,
230 * and never lets write position and read position be the same, so dis-
231 * synchrony can only ever cause an unnecessary call to yield(), never a
232 * wrong value (by monotonicity of movement of pointers, plus single writer
233 * to pointers, plus sequence of write before change pointer, plus
234 * assumptions that if thread A semantically writes X before Y, then thread
235 * B will see the writes in that order.)
236 */
238 SRSWQueueStruc* makeSRSWQ()
239 {
240 SRSWQueueStruc* retQ;
241 retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) );
243 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
244 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
245 retQ->endOfData = &(retQ->startOfData[1023]);
247 return retQ;
248 }
251 void* readSRSWQ( SRSWQueueStruc* Q )
252 { void *out = 0;
253 int tries = 0;
255 while( TRUE )
256 {
257 if( Q->insertPos - Q->extractPos != 1 &&
258 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
259 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
260 else Q->extractPos++; //move before read
261 out = *(Q->extractPos);
262 return out;
263 }
264 //Q is empty
265 tries++;
266 if( tries > SPINLOCK_TRIES ) pthread_yield();
267 }
268 }
271 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
272 { void *out = 0;
273 int tries = 0;
275 while( TRUE )
276 {
277 if( Q->insertPos - Q->extractPos != 1 &&
278 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
279 { Q->extractPos++; //move before read
280 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
281 out = *(Q->extractPos);
282 return out;
283 }
284 //Q is empty
285 tries++;
286 if( tries > 2 ) return 0; //long enough for writer to finish
287 }
288 }
291 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
292 {
293 int tries = 0;
295 while( TRUE )
296 {
297 if( Q->extractPos - Q->insertPos != 1 &&
298 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
299 { *(Q->insertPos) = in; //insert before move
300 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
301 else Q->insertPos++;
302 return;
303 }
304 //Q is full
305 tries++;
306 if( tries > SPINLOCK_TRIES ) pthread_yield();
307 }
308 }
312 //===========================================================================
313 //Single reader Multiple writer super fast Q.. no atomic instrs..
316 /*This is a blocking queue, but it uses no atomic instructions, just does
317 * yield() when empty or full
318 *
319 *It doesn't need any atomic instructions because only a single thread
320 * extracts and only a single thread inserts, and it has no locations that
321 * are written by both. It writes before moving and moves before reading,
322 * and never lets write position and read position be the same, so dis-
323 * synchrony can only ever cause an unnecessary call to yield(), never a
324 * wrong value (by monotonicity of movement of pointers, plus single writer
325 * to pointers, plus sequence of write before change pointer, plus
326 * assumptions that if thread A semantically writes X before Y, then thread
327 * B will see the writes in that order.)
328 *
329 *The multi-writer version is implemented as a hierarchy. Each writer has
330 * its own single-reader single-writer queue. The reader simply does a
331 * round-robin harvesting from them.
332 *
333 *A writer must first register itself with the queue, and receives an ID back
334 * It then uses that ID on each write operation.
335 *
336 *The implementation is:
337 *Physically:
338 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
339 * -] it also has read-pointer to the last queue a write was taken from.
340 *
341 *Action-Patterns:
342 * -] To add a writer
343 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
344 * --]] internally addWriterToQ does:
345 * ---]]] if needs more room, makes a larger writer-array
346 * ---]]] copies the old writer-array into the new
 * ---]]] makes a new SRSW queue and puts it into the array
348 * ---]]] returns the index to the new SRSW queue as the ID
349 * -] To write
350 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
351 * --]] this call may block, via repeated yield() calls
352 * --]] internally, writeSRMWQ does:
353 * ---]]] uses the writerID as index to get the SRSW queue for that writer
354 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
355 * -] To Read
356 * --]] reader calls readSRMWQ, passing the Q struc
357 * --]] this call may block, via repeated yield() calls
358 * --]] internally, readSRMWQ does:
359 * ---]]] gets saved index of last SRSW queue read from
360 * ---]]] increments index and gets indexed queue
361 * ---]]] does a non-blocking read of that queue
362 * ---]]] if gets something, saves index and returns that value
363 * ---]]] if gets null, then goes to next queue
364 * ---]]] if got null from all the queues then does yield() then tries again
365 *
366 *Note: "0" is used as the value null, so SRSW queues must only contain
367 * pointers, and cannot use 0 as a valid pointer value.
368 *
369 */
371 SRMWQueueStruc* makeSRMWQ()
372 { SRMWQueueStruc* retQ;
374 retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) );
376 retQ->numInternalQs = 0;
377 retQ->internalQsSz = 10;
378 retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
380 retQ->lastQReadFrom = 0;
382 return retQ;
383 }
385 /* ---]]] if needs more room, makes a larger writer-array
386 * ---]]] copies the old writer-array into the new
 * ---]]] makes a new SRSW queue and puts it into the array
388 * ---]]] returns the index to the new SRSW queue as the ID
389 *
390 *NOTE: assuming all adds are completed before any writes or reads are
391 * performed.. otherwise, this needs to be re-done carefully, probably with
392 * a lock.
393 */
394 int addWriterToSRMWQ( SRMWQueueStruc* Q )
395 { int oldSz, i;
396 SRSWQueueStruc * *oldArray;
398 (Q->numInternalQs)++;
399 if( Q->numInternalQs >= Q->internalQsSz )
400 { //full, so make bigger
401 oldSz = Q->internalQsSz;
402 oldArray = Q->internalQs;
403 Q->internalQsSz *= 2;
404 Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
405 for( i = 0; i < oldSz; i++ )
406 { Q->internalQs[i] = oldArray[i];
407 }
408 free( oldArray );
409 }
410 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
411 return Q->numInternalQs - 1;
412 }
415 /* ---]]] gets saved index of last SRSW queue read-from
416 * ---]]] increments index and gets indexed queue
417 * ---]]] does a non-blocking read of that queue
418 * ---]]] if gets something, saves index and returns that value
419 * ---]]] if gets null, then goes to next queue
420 * ---]]] if got null from all the queues then does yield() then tries again
421 */
422 void* readSRMWQ( SRMWQueueStruc* Q )
423 { SRSWQueueStruc *readQ;
424 void *readValue = 0;
425 int tries = 0;
426 int QToReadFrom = 0;
428 QToReadFrom = Q->lastQReadFrom;
430 while( TRUE )
431 { QToReadFrom++;
432 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
433 readQ = Q->internalQs[ QToReadFrom ];
434 readValue = readSRSWQ_NonBlocking( readQ );
436 if( readValue != 0 ) //got a value, return it
437 { Q->lastQReadFrom = QToReadFrom;
438 return readValue;
439 }
440 else //SRSW Q just read is empty
441 { //check if all queues have been tried
442 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
443 { tries++; //give a writer a chance to finish before yield
444 if( tries > SPINLOCK_TRIES ) pthread_yield();
445 }
446 }
447 }
448 }
451 /*
452 * ---]]] uses the writerID as index to get the SRSW queue for that writer
453 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
454 */
455 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
456 {
457 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
459 writeSRSWQ( in, Q->internalQs[ writerID ] );
460 }
