Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
view BlockingQueue.c @ 14:447e97a52426
added zeroing out private queue when created
| author | Me |
|---|---|
| date | Thu, 04 Nov 2010 17:56:08 -0700 |
| parents | 88efea74818a |
| children | 1e93e5dbeda1 |
line source
1 /*
2 * Copyright 2009 OpenSourceStewardshipFoundation.org
3 * Licensed under GNU General Public License version 2
4 *
5 * Author: seanhalle@yahoo.com
6 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sched.h>

#include "BlockingQueue.h"
17 #define INC(x) (++x == 1024) ? (x) = 0 : (x)
19 #define SPINLOCK_TRIES 100000
21 //===========================================================================
22 //Normal pthread Q
24 PThdQueueStruc* makePThdQ()
25 {
26 PThdQueueStruc* retQ;
27 int retCode;
28 retQ = (PThdQueueStruc *) VMS__malloc( sizeof( PThdQueueStruc ) );
31 retCode =
32 pthread_mutex_init( &retQ->mutex_t, NULL);
33 if(retCode){perror("Error in creating mutex:"); exit(1);}
35 retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
36 if(retCode){perror("Error in creating cond_var:"); exit(1);}
38 retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
39 if(retCode){perror("Error in creating cond_var:"); exit(1);}
41 retQ->count = 0;
42 retQ->readPos = 0;
43 retQ->writePos = 0;
44 retQ->w_empty = 0;
45 retQ->w_full = 0;
47 return retQ;
48 }
50 void * readPThdQ( PThdQueueStruc *Q )
51 { void *ret;
52 int retCode, wt;
53 pthread_mutex_lock( &Q->mutex_t );
54 {
55 while( Q -> count == 0 )
56 { Q -> w_empty = 1;
57 retCode =
58 pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
59 if( retCode ){ perror("Thread wait error: "); exit(1); }
60 }
61 Q -> w_empty = 0;
62 Q -> count -= 1;
63 ret = Q->data[ Q->readPos ];
64 INC( Q->readPos );
65 wt = Q -> w_full;
66 Q -> w_full = 0;
67 }
68 pthread_mutex_unlock( &Q->mutex_t );
69 if (wt)
70 pthread_cond_signal( &Q->cond_w_t );
72 //printf("Q out: %d\n", ret);
73 return( ret );
74 }
76 void writePThdQ( void * in, PThdQueueStruc* Q )
77 {
78 int status, wt;
79 //printf("Q in: %d\n", in);
81 pthread_mutex_lock( &Q->mutex_t );
82 {
83 while( Q->count >= 1024 )
84 {
85 Q -> w_full = 1;
86 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
87 if (status != 0)
88 { perror("Thread wait error: ");
89 exit(1);
90 }
91 }
93 Q -> w_full = 0;
94 Q->count += 1;
95 Q->data[ Q->writePos ] = in;
96 INC( Q->writePos );
97 wt = Q -> w_empty;
98 Q -> w_empty = 0;
99 }
101 pthread_mutex_unlock( &Q->mutex_t );
102 if( wt ) pthread_cond_signal( &Q->cond_r_t );
103 }
106 //===========================================================================
107 // multi reader multi writer fast Q via CAS
108 #ifndef _GNU_SOURCE
109 #define _GNU_SOURCE
111 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
112 * or full
113 *It uses CAS because it's meant to have more than one reader and more than
114 * one writer.
115 */
/*Allocates and initializes a multi-reader multi-writer CAS-spinlock queue.
 *Empty is encoded as extractPos sitting immediately below insertPos, so
 * the two cursors start side by side at slots 0 and 1; endOfData marks the
 * last slot (index 1023).
 *NOTE(review): this section sits inside `#ifndef _GNU_SOURCE` /
 * `#define _GNU_SOURCE` (see above), which compiles it only when
 * _GNU_SOURCE was NOT already defined, and defines the macro after the
 * #includes where it can no longer affect them -- looks inverted; confirm
 * the intent.
 *NOTE(review): VMS__malloc result is not NULL-checked -- presumably the
 * allocator aborts internally; confirm.
 */
CASQueueStruc* makeCASQ()
{
   CASQueueStruc* retQ;
   retQ = (CASQueueStruc *) VMS__malloc( sizeof( CASQueueStruc ) );

   retQ->insertLock = UNLOCKED;
   retQ->extractLock= UNLOCKED;
   //TODO: check got pointer syntax right
   retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
   retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
   retQ->endOfData = &(retQ->startOfData[1023]);

   return retQ;
}
/*Blocking read for the CAS queue: spins until it both wins the extract
 * spinlock and finds the queue non-empty, then advances the extract cursor
 * (move-before-read) and returns the slot's pointer. Multiple readers are
 * serialized by extractLock.
 *Backoff: after SPINLOCK_TRIES failed attempts, yields on every further
 * attempt -- `tries` is never reset, hence the "not reliable" note below.
 */
void* readCASQ( CASQueueStruc* Q )
{  void *out = 0;
   int tries = 0;
   void **startOfData = Q->startOfData;
   void **endOfData = Q->endOfData;

   int gotLock = FALSE;

   while( TRUE )
    { //this intrinsic returns true if the lock held "UNLOCKED", in which
      // case it now holds "LOCKED" -- if it already held "LOCKED", then
      // gotLock is FALSE
      gotLock =
       __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
      //NOTE: checked assy, and it does lock correctly..
      if( gotLock )
       {
         void **insertPos = Q->insertPos;
         void **extractPos = Q->extractPos;

         //if not empty -- extract just below insert when empty
         if( insertPos - extractPos != 1 &&
             !(extractPos == endOfData && insertPos == startOfData))
          { //move before read
            if( extractPos == endOfData ) //write new pos exactly once, correctly
             { Q->extractPos = startOfData; //can't overrun then fix it 'cause
             }                              // other thread might read bad pos
            else
             { Q->extractPos++;
             }
            out = *(Q->extractPos);
            //NOTE(review): lock released with a plain store, no barrier --
            // presumably relies on x86-style store ordering (the CAS above
            // is a full barrier on acquire); confirm target arch, or use
            // __sync_lock_release here
            Q->extractLock = UNLOCKED;
            return out;
          }
         else //Q is empty
          { Q->extractLock = UNLOCKED;//empty, so release lock for others
          }
       }
      //Q is busy or empty
      tries++;
      if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
    }
}
/*Blocking write for the CAS queue: spins until it both wins the insert
 * spinlock and finds a free slot, then stores the pointer and advances the
 * insert cursor (insert-before-move). Multiple writers are serialized by
 * insertLock.
 *Full test: extract cursor exactly one slot ahead of insert (with wrap) --
 * a count variable can't be used because reader and writer threads would
 * both have to write it.
 */
void writeCASQ( void * in, CASQueueStruc* Q )
{
   int tries = 0;
   //TODO: need to make Q volatile? Want to do this Q in assembly!
   //Have no idea what GCC's going to do to this code
   void **startOfData = Q->startOfData;
   void **endOfData = Q->endOfData;

   int gotLock = FALSE;

   while( TRUE )
    { //this intrinsic returns true if the lock held "UNLOCKED", in which
      // case it now holds "LOCKED" -- if it already held "LOCKED", then
      // gotLock is FALSE
      gotLock =
       __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
      if( gotLock )
       {
         void **insertPos = Q->insertPos;
         void **extractPos = Q->extractPos;

         //check if room to insert.. can't use a count variable
         // 'cause both insertor Thd and extractor Thd would write it
         if( extractPos - insertPos != 1 &&
             !(insertPos == endOfData && extractPos == startOfData))
          { *(Q->insertPos) = in; //insert before move
            if( insertPos == endOfData )
             { Q->insertPos = startOfData;
             }
            else
             { Q->insertPos++;
             }
            //NOTE(review): plain-store lock release, no barrier -- same
            // caveat as in readCASQ; confirm target memory model
            Q->insertLock = UNLOCKED;
            return;
          }
         else //Q is full
          { Q->insertLock = UNLOCKED;//full, so release lock for others
          }
       }
      tries++;
      if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
    }
}
221 #endif //_GNU_SOURCE
224 //===========================================================================
225 //Single reader single writer super fast Q.. no atomic instrs..
228 /*This is a blocking queue, but it uses no atomic instructions, just does
229 * yield() when empty or full
230 *
231 *It doesn't need any atomic instructions because only a single thread
232 * extracts and only a single thread inserts, and it has no locations that
233 * are written by both. It writes before moving and moves before reading,
234 * and never lets write position and read position be the same, so dis-
235 * synchrony can only ever cause an unnecessary call to yield(), never a
236 * wrong value (by monotonicity of movement of pointers, plus single writer
237 * to pointers, plus sequence of write before change pointer, plus
238 * assumptions that if thread A semantically writes X before Y, then thread
239 * B will see the writes in that order.)
240 */
242 SRSWQueueStruc* makeSRSWQ()
243 {
244 SRSWQueueStruc* retQ;
245 retQ = (SRSWQueueStruc *) VMS__malloc( sizeof( SRSWQueueStruc ) );
247 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
248 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
249 retQ->endOfData = &(retQ->startOfData[1023]);
251 return retQ;
252 }
/*Releases a single-reader single-writer queue. The caller must ensure no
 * thread is still reading from or writing to it.
 */
void
freeSRSWQ( SRSWQueueStruc* Q )
{
   VMS__free( Q );
}
/*Blocking read for the single-reader single-writer queue (no atomics).
 *Empty test: insert cursor exactly one slot ahead of extract (with wrap).
 *Safety rests on: only this one reader ever writes extractPos; the cursor
 * is advanced BEFORE the slot is read, while the writer stores the value
 * BEFORE advancing insertPos (see the block comment above this family) --
 * assumes the writer's stores become visible in program order.
 *NOTE(review): pthread_yield() is a non-portable GNU extension; POSIX
 * sched_yield() (sched.h is already included) is the portable equivalent.
 */
void* readSRSWQ( SRSWQueueStruc* Q )
{  void *out = 0;
   int tries = 0;

   while( TRUE )
    {
      if( Q->insertPos - Q->extractPos != 1 &&
          !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
       { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
         else Q->extractPos++; //move before read
         out = *(Q->extractPos);
         return out;
       }
      //Q is empty
      tries++;
      if( tries > SPINLOCK_TRIES ) pthread_yield();
    }
}
/*Non-blocking read: returns NULL (0) if the queue stays empty after a few
 * polls -- which is why 0 may never be stored as a value in these queues.
 *The "tries > 10" loop gives an in-flight writer a brief chance to finish
 * before giving up.
 *NOTE(review): unlike readSRSWQ, this increments extractPos BEFORE the
 * wrap test, so the writer can transiently observe a one-past-end cursor;
 * that appears only to make the writer's full-test conservative (it waits
 * when it needn't), never unsafe -- confirm.
 */
void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
{  void *out = 0;
   int tries = 0;

   while( TRUE )
    {
      if( Q->insertPos - Q->extractPos != 1 &&
          !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
       { Q->extractPos++; //move before read
         if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
         out = *(Q->extractPos);
         return out;
       }
      //Q is empty
      tries++;
      if( tries > 10 ) return NULL; //long enough for writer to finish
    }
}
299 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
300 {
301 int tries = 0;
303 while( TRUE )
304 {
305 if( Q->extractPos - Q->insertPos != 1 &&
306 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
307 { *(Q->insertPos) = in; //insert before move
308 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
309 else Q->insertPos++;
310 return;
311 }
312 //Q is full
313 tries++;
314 if( tries > SPINLOCK_TRIES ) pthread_yield();
315 }
316 }
320 //===========================================================================
321 //Single reader Multiple writer super fast Q.. no atomic instrs..
324 /*This is a blocking queue, but it uses no atomic instructions, just does
325 * yield() when empty or full
326 *
327 *It doesn't need any atomic instructions because only a single thread
328 * extracts and only a single thread inserts, and it has no locations that
329 * are written by both. It writes before moving and moves before reading,
330 * and never lets write position and read position be the same, so dis-
331 * synchrony can only ever cause an unnecessary call to yield(), never a
332 * wrong value (by monotonicity of movement of pointers, plus single writer
333 * to pointers, plus sequence of write before change pointer, plus
334 * assumptions that if thread A semantically writes X before Y, then thread
335 * B will see the writes in that order.)
336 *
337 *The multi-writer version is implemented as a hierarchy. Each writer has
338 * its own single-reader single-writer queue. The reader simply does a
339 * round-robin harvesting from them.
340 *
341 *A writer must first register itself with the queue, and receives an ID back
342 * It then uses that ID on each write operation.
343 *
344 *The implementation is:
345 *Physically:
346 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
347 * -] it also has read-pointer to the last queue a write was taken from.
348 *
349 *Action-Patterns:
350 * -] To add a writer
351 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
352 * --]] internally addWriterToQ does:
353 * ---]]] if needs more room, makes a larger writer-array
354 * ---]]] copies the old writer-array into the new
355 * ---]]] makes a new SRSW queue an puts it into the array
356 * ---]]] returns the index to the new SRSW queue as the ID
357 * -] To write
358 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
359 * --]] this call may block, via repeated yield() calls
360 * --]] internally, writeSRMWQ does:
361 * ---]]] uses the writerID as index to get the SRSW queue for that writer
362 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
363 * -] To Read
364 * --]] reader calls readSRMWQ, passing the Q struc
365 * --]] this call may block, via repeated yield() calls
366 * --]] internally, readSRMWQ does:
367 * ---]]] gets saved index of last SRSW queue read from
368 * ---]]] increments index and gets indexed queue
369 * ---]]] does a non-blocking read of that queue
370 * ---]]] if gets something, saves index and returns that value
371 * ---]]] if gets null, then goes to next queue
372 * ---]]] if got null from all the queues then does yield() then tries again
373 *
374 *Note: "0" is used as the value null, so SRSW queues must only contain
375 * pointers, and cannot use 0 as a valid pointer value.
376 *
377 */
379 SRMWQueueStruc* makeSRMWQ()
380 { SRMWQueueStruc* retQ;
382 retQ = (SRMWQueueStruc *) VMS__malloc( sizeof( SRMWQueueStruc ) );
384 retQ->numInternalQs = 0;
385 retQ->internalQsSz = 10;
386 retQ->internalQs = VMS__malloc( retQ->internalQsSz *
387 sizeof(SRSWQueueStruc *) );
389 retQ->lastQReadFrom = 0;
391 return retQ;
392 }
394 /* ---]]] if needs more room, makes a larger writer-array
395 * ---]]] copies the old writer-array into the new
396 * ---]]] makes a new SRSW queue an puts it into the array
397 * ---]]] returns the index to the new SRSW queue as the ID
398 *
399 *NOTE: assuming all adds are completed before any writes or reads are
400 * performed.. otherwise, this needs to be re-done carefully, probably with
401 * a lock.
402 */
403 int addWriterToSRMWQ( SRMWQueueStruc* Q )
404 { int oldSz, i;
405 SRSWQueueStruc * *oldArray;
407 (Q->numInternalQs)++;
408 if( Q->numInternalQs >= Q->internalQsSz )
409 { //full, so make bigger
410 oldSz = Q->internalQsSz;
411 oldArray = Q->internalQs;
412 Q->internalQsSz *= 2;
413 Q->internalQs = VMS__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
414 for( i = 0; i < oldSz; i++ )
415 { Q->internalQs[i] = oldArray[i];
416 }
417 VMS__free( oldArray );
418 }
419 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
420 return Q->numInternalQs - 1;
421 }
424 /* ---]]] gets saved index of last SRSW queue read-from
425 * ---]]] increments index and gets indexed queue
426 * ---]]] does a non-blocking read of that queue
427 * ---]]] if gets something, saves index and returns that value
428 * ---]]] if gets null, then goes to next queue
429 * ---]]] if got null from all the queues then does yield() then tries again
430 */
/*Round-robin read across all writers' private SRSW queues, starting just
 * after the queue last read from (fairness). Blocks, via yield, until some
 * queue yields a value.
 *A full fruitless sweep is detected when the scan wraps back around to
 * lastQReadFrom and that queue is also empty; only then is a "try" counted
 * toward the yield threshold.
 *NOTE(review): if numInternalQs == 0 this reads internalQs[0]
 * uninitialized -- relies on the addWriterToSRMWQ assumption that all
 * writers register before any read happens; confirm callers obey that.
 */
void* readSRMWQ( SRMWQueueStruc* Q )
{  SRSWQueueStruc *readQ;
   void *readValue = 0;
   int tries = 0;
   int QToReadFrom = 0;

   QToReadFrom = Q->lastQReadFrom;

   while( TRUE )
    { QToReadFrom++;
      if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0; //wrap the scan
      readQ = Q->internalQs[ QToReadFrom ];
      readValue = readSRSWQ_NonBlocking( readQ ); //returns 0 when empty

      if( readValue != 0 ) //got a value, return it
       { Q->lastQReadFrom = QToReadFrom;
         return readValue;
       }
      else //SRSW Q just read is empty
       { //check if all queues have been tried
         if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
          { tries++; //give a writer a chance to finish before yield
            if( tries > SPINLOCK_TRIES ) pthread_yield();
          }
       }
    }
}
460 /*
461 * ---]]] uses the writerID as index to get the SRSW queue for that writer
462 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
463 */
464 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
465 {
466 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
468 writeSRSWQ( in, Q->internalQs[ writerID ] );
469 }
