Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
view BlockingQueue.c @ 4:8abcca1590b8
Tested and working on full VMS test
| author | Me |
|---|---|
| date | Wed, 30 Jun 2010 14:34:56 -0700 |
| parents | 81f6687d52d1 |
| children | 174a7c2ca340 |
line source
1 /*
2 * Copyright 2009 OpenSourceStewardshipFoundation.org
3 * Licensed under GNU General Public License version 2
4 *
5 * Author: seanhalle@yahoo.com
6 */
9 #include <stdio.h>
10 #include <errno.h>
11 #include <pthread.h>
12 #include <stdlib.h>
13 #include <sched.h>
14 #include <windows.h>
16 #include "BlockingQueue.h"
/*Advance a ring index by one, wrapping back to 0 when it reaches 1024.
 *
 *The argument is evaluated more than once, so it must be an lvalue without
 * side effects.  The argument and the whole expansion are parenthesized so
 * the macro keeps its meaning when used inside a larger expression.
 */
#define INC(x) ((++(x) == 1024) ? ((x) = 0) : (x))

/*Number of failed attempts a spinning queue operation makes before it
 * starts yielding the processor on every further attempt.
 */
#define SPINLOCK_TRIES 100000
22 //===========================================================================
23 //Normal pthread Q
25 PThdQueueStruc* makePThdQ()
26 {
27 PThdQueueStruc* retQ;
28 int status;
29 retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) );
32 status = pthread_mutex_init( &retQ->mutex_t, NULL);
33 if (status < 0)
34 {
35 perror("Error in creating mutex:");
36 exit(1);
37 return NULL;
38 }
40 status = pthread_cond_init ( &retQ->cond_w_t, NULL);
41 if (status < 0)
42 {
43 perror("Error in creating cond_var:");
44 exit(1);
45 return NULL;
46 }
48 status = pthread_cond_init ( &retQ->cond_r_t, NULL);
49 if (status < 0)
50 {
51 perror("Error in creating cond_var:");
52 exit(1);
53 return NULL;
54 }
56 retQ->count = 0;
57 retQ->readPos = 0;
58 retQ->writePos = 0;
59 retQ -> w_empty = retQ -> w_full = 0;
61 return retQ;
62 }
64 void * readPThdQ( PThdQueueStruc *Q )
65 { void *ret;
66 int status, wt;
67 pthread_mutex_lock( &Q->mutex_t );
68 {
69 while( Q -> count == 0 )
70 { Q -> w_empty = 1;
71 // pthread_cond_broadcast( &Q->cond_w_t );
72 status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
73 if (status != 0)
74 { perror("Thread wait error: ");
75 exit(1);
76 }
77 }
78 Q -> w_empty = 0;
79 Q -> count -= 1;
80 ret = Q->data[ Q->readPos ];
81 INC( Q->readPos );
82 wt = Q -> w_full;
83 Q -> w_full = 0;
84 //pthread_cond_broadcast( &Q->cond_w_t );
85 }
86 pthread_mutex_unlock( &Q->mutex_t );
87 if (wt) pthread_cond_signal( &Q->cond_w_t );
89 return( ret );
90 }
92 void writePThdQ( void * in, PThdQueueStruc* Q )
93 {
94 int status, wt;
95 pthread_mutex_lock( &Q->mutex_t );
96 {
97 while( Q->count >= 1024 )
98 {
99 Q -> w_full = 1;
100 // pthread_cond_broadcast( &Q->cond_r_t );
101 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
102 if (status != 0)
103 { perror("Thread wait error: ");
104 exit(1);
105 }
106 }
107 Q -> w_full = 0;
108 Q->count += 1;
109 Q->data[ Q->writePos ] = in;
110 INC( Q->writePos );
111 wt = Q -> w_empty;
112 Q -> w_empty = 0;
113 // pthread_cond_broadcast( &Q->cond_r_t );
114 }
115 pthread_mutex_unlock( &Q->mutex_t );
116 if( wt ) pthread_cond_signal( &Q->cond_r_t );
117 }
120 //===========================================================================
121 // multi reader multi writer fast Q via CAS
122 #ifndef _GNU_SOURCE
123 #define _GNU_SOURCE
125 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
126 * or full
127 *It uses CAS because it's meant to have more than one reader and more than
128 * one writer.
129 */
131 CASQueueStruc* makeCASQ()
132 {
133 CASQueueStruc* retQ;
134 retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) );
136 retQ->insertLock = UNLOCKED;
137 retQ->extractLock= UNLOCKED;
138 //TODO: check got pointer syntax right
139 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
140 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
141 retQ->endOfData = &(retQ->startOfData[1023]);
143 return retQ;
144 }
147 void* readCASQ( CASQueueStruc* Q )
148 { void *out = 0;
149 int tries = 0;
150 void **startOfData = Q->startOfData;
151 void **endOfData = Q->endOfData;
153 int success = FALSE;
155 while( TRUE )
156 { success =
157 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
158 //NOTE: checked assy, and it does lock correctly..
159 if( success )
160 {
161 void **insertPos = Q->insertPos;
162 void **extractPos = Q->extractPos;
164 //if not empty -- extract just below insert when empty
165 if( insertPos - extractPos != 1 &&
166 !(extractPos == endOfData && insertPos == startOfData))
167 { //move before read
168 if( extractPos == endOfData ) //write new pos exactly once, correctly
169 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
170 } // other thread might read bad pos
171 else
172 { Q->extractPos++;
173 }
174 out = *(Q->extractPos);
175 Q->extractLock = UNLOCKED;
176 return out;
177 }
178 else //Q is empty
179 { success = FALSE;
180 Q->extractLock = UNLOCKED;//have to try again, release for others
181 }
182 }
183 //Q is busy or empty
184 tries++;
185 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI yield()
186 }
187 }
189 void writeCASQ( void * in, CASQueueStruc* Q )
190 {
191 int tries = 0;
192 //TODO: need to make Q volatile? Want to do this Q in assembly!
193 //Have no idea what GCC's going to do to this code
194 void **startOfData = Q->startOfData;
195 void **endOfData = Q->endOfData;
197 int success = FALSE;
199 while( TRUE )
200 { success =
201 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
202 if( success )
203 {
204 void **insertPos = Q->insertPos;
205 void **extractPos = Q->extractPos;
207 //check if room to insert.. can't use a count variable
208 // 'cause both insertor Thd and extractor Thd would write it
209 if( extractPos - insertPos != 1 &&
210 !(insertPos == endOfData && extractPos == startOfData))
211 { *(Q->insertPos) = in; //insert before move
212 if( insertPos == endOfData )
213 { Q->insertPos = startOfData;
214 }
215 else
216 { Q->insertPos++;
217 }
218 Q->insertLock = UNLOCKED;
219 return;
220 }
221 else //Q is full
222 { success = FALSE;
223 Q->insertLock = UNLOCKED;//have to try again, release for others
224 }
225 }
226 tries++;
227 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
228 }
229 }
231 #endif //_GNU_SOURCE
234 //===========================================================================
235 //Single reader single writer super fast Q.. no atomic instrs..
238 /*This is a blocking queue, but it uses no atomic instructions, just does
239 * yield() when empty or full
240 *
241 *It doesn't need any atomic instructions because only a single thread
242 * extracts and only a single thread inserts, and it has no locations that
243 * are written by both. It writes before moving and moves before reading,
244 * and never lets write position and read position be the same, so dis-
245 * synchrony can only ever cause an unnecessary call to yield(), never a
246 * wrong value (by monotonicity of movement of pointers, plus single writer
247 * to pointers, plus sequence of write before change pointer, plus
248 * assumptions that if thread A semantically writes X before Y, then thread
249 * B will see the writes in that order.)
250 */
252 SRSWQueueStruc* makeSRSWQ()
253 {
254 SRSWQueueStruc* retQ;
255 retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) );
257 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
258 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
259 retQ->endOfData = &(retQ->startOfData[1023]);
261 return retQ;
262 }
265 void* readSRSWQ( SRSWQueueStruc* Q )
266 { void *out = 0;
267 int tries = 0;
269 while( TRUE )
270 {
271 if( Q->insertPos - Q->extractPos != 1 &&
272 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
273 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
274 else Q->extractPos++; //move before read
275 out = *(Q->extractPos);
276 return out;
277 }
278 //Q is empty
279 tries++;
280 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
281 }
282 }
285 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
286 { void *out = 0;
287 int tries = 0;
289 while( TRUE )
290 {
291 if( Q->insertPos - Q->extractPos != 1 &&
292 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
293 { Q->extractPos++; //move before read
294 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
295 out = *(Q->extractPos);
296 return out;
297 }
298 //Q is empty
299 tries++;
300 if( tries > 2 ) return 0; //long enough for writer to finish
301 }
302 }
305 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
306 {
307 int tries = 0;
309 while( TRUE )
310 {
311 if( Q->extractPos - Q->insertPos != 1 &&
312 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
313 { *(Q->insertPos) = in; //insert before move
314 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
315 else Q->insertPos++;
316 return;
317 }
318 //Q is full
319 tries++;
320 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
321 }
322 }
326 //===========================================================================
327 //Single reader Multiple writer super fast Q.. no atomic instrs..
330 /*This is a blocking queue, but it uses no atomic instructions, just does
331 * yield() when empty or full
332 *
333 *It doesn't need any atomic instructions because each internal queue has a
334 * single extracting thread and a single inserting thread, and no locations
335 * are written by both. It writes before moving and moves before reading,
336 * and never lets write position and read position be the same, so dis-
337 * synchrony can only ever cause an unnecessary call to yield(), never a
338 * wrong value (by monotonicity of movement of pointers, plus single writer
339 * to pointers, plus sequence of write before change pointer, plus
340 * assumptions that if thread A semantically writes X before Y, then thread
341 * B will see the writes in that order.)
342 *
343 *The multi-writer version is implemented as a hierarchy. Each writer has
344 * its own single-reader single-writer queue. The reader simply does a
345 * round-robin harvesting from them.
346 *
347 *A writer must first register itself with the queue, and receives an ID back
348 * It then uses that ID on each write operation.
349 *
350 *The implementation is:
351 *Physically:
352 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
353 * -] it also has read-pointer to the last queue a write was taken from.
354 *
355 *Action-Patterns:
356 * -] To add a writer
357 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
358 * --]] internally addWriterToQ does:
359 * ---]]] if needs more room, makes a larger writer-array
360 * ---]]] copies the old writer-array into the new
361 * ---]]] makes a new SRSW queue and puts it into the array
362 * ---]]] returns the index to the new SRSW queue as the ID
363 * -] To write
364 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
365 * --]] this call may block, via repeated yield() calls
366 * --]] internally, writeSRMWQ does:
367 * ---]]] uses the writerID as index to get the SRSW queue for that writer
368 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
369 * -] To Read
370 * --]] reader calls readSRMWQ, passing the Q struc
371 * --]] this call may block, via repeated yield() calls
372 * --]] internally, readSRMWQ does:
373 * ---]]] gets saved index of last SRSW queue read from
374 * ---]]] increments index and gets indexed queue
375 * ---]]] does a non-blocking read of that queue
376 * ---]]] if gets something, saves index and returns that value
377 * ---]]] if gets null, then goes to next queue
378 * ---]]] if got null from all the queues then does yield() then tries again
379 *
380 *Note: "0" is used as the value null, so SRSW queues must only contain
381 * pointers, and cannot use 0 as a valid pointer value.
382 *
383 */
385 SRMWQueueStruc* makeSRMWQ()
386 { SRMWQueueStruc* retQ;
388 retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) );
390 retQ->numInternalQs = 0;
391 retQ->internalQsSz = 10;
392 retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
394 retQ->lastQReadFrom = 0;
396 return retQ;
397 }
399 /* ---]]] if needs more room, makes a larger writer-array
400 * ---]]] copies the old writer-array into the new
401 * ---]]] makes a new SRSW queue and puts it into the array
402 * ---]]] returns the index to the new SRSW queue as the ID
403 *
404 *NOTE: assuming all adds are completed before any writes or reads are
405 * performed.. otherwise, this needs to be re-done carefully, probably with
406 * a lock.
407 */
408 int addWriterToSRMWQ( SRMWQueueStruc* Q )
409 { int oldSz, i;
410 SRSWQueueStruc * *oldArray;
412 (Q->numInternalQs)++;
413 if( Q->numInternalQs >= Q->internalQsSz )
414 { //full, so make bigger
415 oldSz = Q->internalQsSz;
416 oldArray = Q->internalQs;
417 Q->internalQsSz *= 2;
418 Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
419 for( i = 0; i < oldSz; i++ )
420 { Q->internalQs[i] = oldArray[i];
421 }
422 free( oldArray );
423 }
424 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
425 return Q->numInternalQs - 1;
426 }
429 /* ---]]] gets saved index of last SRSW queue read-from
430 * ---]]] increments index and gets indexed queue
431 * ---]]] does a non-blocking read of that queue
432 * ---]]] if gets something, saves index and returns that value
433 * ---]]] if gets null, then goes to next queue
434 * ---]]] if got null from all the queues then does yield() then tries again
435 */
436 void* readSRMWQ( SRMWQueueStruc* Q )
437 { SRSWQueueStruc *readQ;
438 void *readValue = 0;
439 int tries = 0;
440 int QToReadFrom = 0;
442 QToReadFrom = Q->lastQReadFrom;
444 while( TRUE )
445 { QToReadFrom++;
446 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
447 readQ = Q->internalQs[ QToReadFrom ];
448 readValue = readSRSWQ_NonBlocking( readQ );
450 if( readValue != 0 ) //got a value, return it
451 { Q->lastQReadFrom = QToReadFrom;
452 return readValue;
453 }
454 else //SRSW Q just read is empty
455 { //check if all queues have been tried
456 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
457 { tries++; //give a writer a chance to finish before yield
458 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
459 }
460 }
461 }
462 }
465 /*
466 * ---]]] uses the writerID as index to get the SRSW queue for that writer
467 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
468 */
469 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
470 {
471 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
473 writeSRSWQ( in, Q->internalQs[ writerID ] );
474 }
