view BlockingQueue.c @ 10:88efea74818a

Fixed some frees missed before
author Me
date Tue, 02 Nov 2010 16:48:22 -0700
parents 62326cc8e6f4
children 3562716ebdbd
line source
1 /*
2 * Copyright 2009 OpenSourceStewardshipFoundation.org
3 * Licensed under GNU General Public License version 2
4 *
5 * Author: seanhalle@yahoo.com
6 */
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "BlockingQueue.h"
17 #define INC(x) (++x == 1024) ? (x) = 0 : (x)
19 #define SPINLOCK_TRIES 100000
21 //===========================================================================
22 //Normal pthread Q
24 PThdQueueStruc* makePThdQ()
25 {
26 PThdQueueStruc* retQ;
27 int retCode;
28 retQ = (PThdQueueStruc *) VMS__malloc( sizeof( PThdQueueStruc ) );
31 retCode =
32 pthread_mutex_init( &retQ->mutex_t, NULL);
33 if(retCode){perror("Error in creating mutex:"); exit(1);}
35 retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
36 if(retCode){perror("Error in creating cond_var:"); exit(1);}
38 retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
39 if(retCode){perror("Error in creating cond_var:"); exit(1);}
41 retQ->count = 0;
42 retQ->readPos = 0;
43 retQ->writePos = 0;
44 retQ->w_empty = 0;
45 retQ->w_full = 0;
47 return retQ;
48 }
50 void * readPThdQ( PThdQueueStruc *Q )
51 { void *ret;
52 int retCode, wt;
53 pthread_mutex_lock( &Q->mutex_t );
54 {
55 while( Q -> count == 0 )
56 { Q -> w_empty = 1;
57 retCode =
58 pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
59 if( retCode ){ perror("Thread wait error: "); exit(1); }
60 }
61 Q -> w_empty = 0;
62 Q -> count -= 1;
63 ret = Q->data[ Q->readPos ];
64 INC( Q->readPos );
65 wt = Q -> w_full;
66 Q -> w_full = 0;
67 }
68 pthread_mutex_unlock( &Q->mutex_t );
69 if (wt)
70 pthread_cond_signal( &Q->cond_w_t );
72 //printf("Q out: %d\n", ret);
73 return( ret );
74 }
76 void writePThdQ( void * in, PThdQueueStruc* Q )
77 {
78 int status, wt;
79 //printf("Q in: %d\n", in);
81 pthread_mutex_lock( &Q->mutex_t );
82 {
83 while( Q->count >= 1024 )
84 {
85 Q -> w_full = 1;
86 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
87 if (status != 0)
88 { perror("Thread wait error: ");
89 exit(1);
90 }
91 }
93 Q -> w_full = 0;
94 Q->count += 1;
95 Q->data[ Q->writePos ] = in;
96 INC( Q->writePos );
97 wt = Q -> w_empty;
98 Q -> w_empty = 0;
99 }
101 pthread_mutex_unlock( &Q->mutex_t );
102 if( wt ) pthread_cond_signal( &Q->cond_r_t );
103 }
106 //===========================================================================
107 // multi reader multi writer fast Q via CAS
108 #ifndef _GNU_SOURCE
109 #define _GNU_SOURCE
111 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
112 * or full
113 *It uses CAS because it's meant to have more than one reader and more than
114 * one writer.
115 */
117 CASQueueStruc* makeCASQ()
118 {
119 CASQueueStruc* retQ;
120 retQ = (CASQueueStruc *) VMS__malloc( sizeof( CASQueueStruc ) );
122 retQ->insertLock = UNLOCKED;
123 retQ->extractLock= UNLOCKED;
124 //TODO: check got pointer syntax right
125 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
126 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
127 retQ->endOfData = &(retQ->startOfData[1023]);
129 return retQ;
130 }
133 void* readCASQ( CASQueueStruc* Q )
134 { void *out = 0;
135 int tries = 0;
136 void **startOfData = Q->startOfData;
137 void **endOfData = Q->endOfData;
139 int gotLock = FALSE;
141 while( TRUE )
142 { //this intrinsic returns true if the lock held "UNLOCKED", in which
143 // case it now holds "LOCKED" -- if it already held "LOCKED", then
144 // gotLock is FALSE
145 gotLock =
146 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
147 //NOTE: checked assy, and it does lock correctly..
148 if( gotLock )
149 {
150 void **insertPos = Q->insertPos;
151 void **extractPos = Q->extractPos;
153 //if not empty -- extract just below insert when empty
154 if( insertPos - extractPos != 1 &&
155 !(extractPos == endOfData && insertPos == startOfData))
156 { //move before read
157 if( extractPos == endOfData ) //write new pos exactly once, correctly
158 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
159 } // other thread might read bad pos
160 else
161 { Q->extractPos++;
162 }
163 out = *(Q->extractPos);
164 Q->extractLock = UNLOCKED;
165 return out;
166 }
167 else //Q is empty
168 { Q->extractLock = UNLOCKED;//empty, so release lock for others
169 }
170 }
171 //Q is busy or empty
172 tries++;
173 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
174 }
175 }
177 void writeCASQ( void * in, CASQueueStruc* Q )
178 {
179 int tries = 0;
180 //TODO: need to make Q volatile? Want to do this Q in assembly!
181 //Have no idea what GCC's going to do to this code
182 void **startOfData = Q->startOfData;
183 void **endOfData = Q->endOfData;
185 int gotLock = FALSE;
187 while( TRUE )
188 { //this intrinsic returns true if the lock held "UNLOCKED", in which
189 // case it now holds "LOCKED" -- if it already held "LOCKED", then
190 // gotLock is FALSE
191 gotLock =
192 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
193 if( gotLock )
194 {
195 void **insertPos = Q->insertPos;
196 void **extractPos = Q->extractPos;
198 //check if room to insert.. can't use a count variable
199 // 'cause both insertor Thd and extractor Thd would write it
200 if( extractPos - insertPos != 1 &&
201 !(insertPos == endOfData && extractPos == startOfData))
202 { *(Q->insertPos) = in; //insert before move
203 if( insertPos == endOfData )
204 { Q->insertPos = startOfData;
205 }
206 else
207 { Q->insertPos++;
208 }
209 Q->insertLock = UNLOCKED;
210 return;
211 }
212 else //Q is full
213 { Q->insertLock = UNLOCKED;//full, so release lock for others
214 }
215 }
216 tries++;
217 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
218 }
219 }
221 #endif //_GNU_SOURCE
224 //===========================================================================
225 //Single reader single writer super fast Q.. no atomic instrs..
228 /*This is a blocking queue, but it uses no atomic instructions, just does
229 * yield() when empty or full
230 *
231 *It doesn't need any atomic instructions because only a single thread
232 * extracts and only a single thread inserts, and it has no locations that
233 * are written by both. It writes before moving and moves before reading,
234 * and never lets write position and read position be the same, so dis-
235 * synchrony can only ever cause an unnecessary call to yield(), never a
236 * wrong value (by monotonicity of movement of pointers, plus single writer
237 * to pointers, plus sequence of write before change pointer, plus
238 * assumptions that if thread A semantically writes X before Y, then thread
239 * B will see the writes in that order.)
240 */
242 SRSWQueueStruc* makeSRSWQ()
243 {
244 SRSWQueueStruc* retQ;
245 retQ = (SRSWQueueStruc *) VMS__malloc( sizeof( SRSWQueueStruc ) );
247 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
248 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
249 retQ->endOfData = &(retQ->startOfData[1023]);
251 return retQ;
252 }
254 void
255 freeSRSWQ( SRSWQueueStruc* Q )
256 {
257 VMS__free( Q );
258 }
260 void* readSRSWQ( SRSWQueueStruc* Q )
261 { void *out = 0;
262 int tries = 0;
264 while( TRUE )
265 {
266 if( Q->insertPos - Q->extractPos != 1 &&
267 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
268 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
269 else Q->extractPos++; //move before read
270 out = *(Q->extractPos);
271 return out;
272 }
273 //Q is empty
274 tries++;
275 if( tries > SPINLOCK_TRIES ) pthread_yield();
276 }
277 }
280 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
281 { void *out = 0;
282 int tries = 0;
284 while( TRUE )
285 {
286 if( Q->insertPos - Q->extractPos != 1 &&
287 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
288 { Q->extractPos++; //move before read
289 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
290 out = *(Q->extractPos);
291 return out;
292 }
293 //Q is empty
294 tries++;
295 if( tries > 10 ) return NULL; //long enough for writer to finish
296 }
297 }
300 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
301 {
302 int tries = 0;
304 while( TRUE )
305 {
306 if( Q->extractPos - Q->insertPos != 1 &&
307 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
308 { *(Q->insertPos) = in; //insert before move
309 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
310 else Q->insertPos++;
311 return;
312 }
313 //Q is full
314 tries++;
315 if( tries > SPINLOCK_TRIES ) pthread_yield();
316 }
317 }
321 //===========================================================================
322 //Single reader Multiple writer super fast Q.. no atomic instrs..
325 /*This is a blocking queue, but it uses no atomic instructions, just does
326 * yield() when empty or full
327 *
328 *It doesn't need any atomic instructions because only a single thread
329 * extracts and only a single thread inserts, and it has no locations that
330 * are written by both. It writes before moving and moves before reading,
331 * and never lets write position and read position be the same, so dis-
332 * synchrony can only ever cause an unnecessary call to yield(), never a
333 * wrong value (by monotonicity of movement of pointers, plus single writer
334 * to pointers, plus sequence of write before change pointer, plus
335 * assumptions that if thread A semantically writes X before Y, then thread
336 * B will see the writes in that order.)
337 *
338 *The multi-writer version is implemented as a hierarchy. Each writer has
339 * its own single-reader single-writer queue. The reader simply does a
340 * round-robin harvesting from them.
341 *
342 *A writer must first register itself with the queue, and receives an ID back
343 * It then uses that ID on each write operation.
344 *
345 *The implementation is:
346 *Physically:
347 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
348 * -] it also has read-pointer to the last queue a write was taken from.
349 *
350 *Action-Patterns:
351 * -] To add a writer
352 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
353 * --]] internally addWriterToQ does:
354 * ---]]] if needs more room, makes a larger writer-array
355 * ---]]] copies the old writer-array into the new
356 * ---]]] makes a new SRSW queue an puts it into the array
357 * ---]]] returns the index to the new SRSW queue as the ID
358 * -] To write
359 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
360 * --]] this call may block, via repeated yield() calls
361 * --]] internally, writeSRMWQ does:
362 * ---]]] uses the writerID as index to get the SRSW queue for that writer
363 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
364 * -] To Read
365 * --]] reader calls readSRMWQ, passing the Q struc
366 * --]] this call may block, via repeated yield() calls
367 * --]] internally, readSRMWQ does:
368 * ---]]] gets saved index of last SRSW queue read from
369 * ---]]] increments index and gets indexed queue
370 * ---]]] does a non-blocking read of that queue
371 * ---]]] if gets something, saves index and returns that value
372 * ---]]] if gets null, then goes to next queue
373 * ---]]] if got null from all the queues then does yield() then tries again
374 *
375 *Note: "0" is used as the value null, so SRSW queues must only contain
376 * pointers, and cannot use 0 as a valid pointer value.
377 *
378 */
380 SRMWQueueStruc* makeSRMWQ()
381 { SRMWQueueStruc* retQ;
383 retQ = (SRMWQueueStruc *) VMS__malloc( sizeof( SRMWQueueStruc ) );
385 retQ->numInternalQs = 0;
386 retQ->internalQsSz = 10;
387 retQ->internalQs = VMS__malloc( retQ->internalQsSz *
388 sizeof(SRSWQueueStruc *) );
390 retQ->lastQReadFrom = 0;
392 return retQ;
393 }
395 /* ---]]] if needs more room, makes a larger writer-array
396 * ---]]] copies the old writer-array into the new
397 * ---]]] makes a new SRSW queue an puts it into the array
398 * ---]]] returns the index to the new SRSW queue as the ID
399 *
400 *NOTE: assuming all adds are completed before any writes or reads are
401 * performed.. otherwise, this needs to be re-done carefully, probably with
402 * a lock.
403 */
404 int addWriterToSRMWQ( SRMWQueueStruc* Q )
405 { int oldSz, i;
406 SRSWQueueStruc * *oldArray;
408 (Q->numInternalQs)++;
409 if( Q->numInternalQs >= Q->internalQsSz )
410 { //full, so make bigger
411 oldSz = Q->internalQsSz;
412 oldArray = Q->internalQs;
413 Q->internalQsSz *= 2;
414 Q->internalQs = VMS__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
415 for( i = 0; i < oldSz; i++ )
416 { Q->internalQs[i] = oldArray[i];
417 }
418 VMS__free( oldArray );
419 }
420 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
421 return Q->numInternalQs - 1;
422 }
425 /* ---]]] gets saved index of last SRSW queue read-from
426 * ---]]] increments index and gets indexed queue
427 * ---]]] does a non-blocking read of that queue
428 * ---]]] if gets something, saves index and returns that value
429 * ---]]] if gets null, then goes to next queue
430 * ---]]] if got null from all the queues then does yield() then tries again
431 */
432 void* readSRMWQ( SRMWQueueStruc* Q )
433 { SRSWQueueStruc *readQ;
434 void *readValue = 0;
435 int tries = 0;
436 int QToReadFrom = 0;
438 QToReadFrom = Q->lastQReadFrom;
440 while( TRUE )
441 { QToReadFrom++;
442 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
443 readQ = Q->internalQs[ QToReadFrom ];
444 readValue = readSRSWQ_NonBlocking( readQ );
446 if( readValue != 0 ) //got a value, return it
447 { Q->lastQReadFrom = QToReadFrom;
448 return readValue;
449 }
450 else //SRSW Q just read is empty
451 { //check if all queues have been tried
452 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
453 { tries++; //give a writer a chance to finish before yield
454 if( tries > SPINLOCK_TRIES ) pthread_yield();
455 }
456 }
457 }
458 }
461 /*
462 * ---]]] uses the writerID as index to get the SRSW queue for that writer
463 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
464 */
465 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
466 {
467 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
469 writeSRSWQ( in, Q->internalQs[ writerID ] );
470 }