view BlockingQueue.c @ 50:93a5782d064b

adding netbeans project directories to repository
author Sean Halle <seanhalle@yahoo.com>
date Fri, 14 Feb 2014 07:54:22 -0800
parents 1ea30ca7093c
children
line source
1 /*
2 * Copyright 2009 OpenSourceResearchInstitute.org
3 * Licensed under GNU General Public License version 2
4 *
5 * Author: seanhalle@yahoo.com
6 */
9 #include <stdio.h>
10 #include <errno.h>
11 #include <pthread.h>
12 #include <stdlib.h>
13 #include <sched.h>
14 #include <string.h>
16 #include "BlockingQueue.h"
17 #include <PR__include/prqueue.h>
18 #include <PR__include/prmalloc.h>
/*Advance a circular index by one, wrapping back to 0 when it reaches 1024.
 *
 *The argument and the whole expansion are parenthesized so the macro can be
 * used safely inside a larger expression.  The argument is still evaluated
 * more than once, so it must be a plain lvalue without side effects.
 */
#define INC(x) ( (++(x) == 1024) ? ((x) = 0) : (x) )

/*Number of failed attempts a queue operation spins through before it
 * starts giving up the processor on further failures.
 */
#define SPINLOCK_TRIES 100000
25 //========================== pthread based queue =========================
26 /*Not currently implemented.. however, copied this code from place where
27 * did equivalent.. Idea is to just make a private queue, then protect
28 * access with a lock.. copied code snippet below is how access was
29 * protected.. just roll this inside a "readBlockingQ()" function.. do
30 * equivalent inside writeBlockingQ() function.. to make one, just add
31 * the lock to the queue structure..
32 */
33 /*
34 production = NULL;
35 while( production == NULL )
36 { pthread_mutex_lock( &queueAccessLock );
37 production = readPrivQ( commQ );
38 pthread_mutex_unlock( &queueAccessLock );
39 // If empty, yields and tries again.
40 if( production == NULL) sched_yield();
41 }
42 */
44 //===========================================================================
45 // multi reader multi writer fast Q via CAS
46 #ifndef _GNU_SOURCE
47 #define _GNU_SOURCE
49 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
50 * or full
51 *It uses CAS because it's meant to have more than one reader and more than
52 * one writer.
53 */
55 CASQueueStruc* makeCASQ()
56 {
57 CASQueueStruc* retQ;
58 retQ = (CASQueueStruc *) PR__malloc( sizeof( CASQueueStruc ) );
60 retQ->insertLock = UNLOCKED;
61 retQ->extractLock= UNLOCKED;
63 retQ->extractPos = (volatile void**)&(retQ->startOfData[0]); //side by side == empty
64 retQ->insertPos = (volatile void**)&(retQ->startOfData[1]); // so start pos's have to be
65 retQ->endOfData = &(retQ->startOfData[1023]);
67 return retQ;
68 }
71 void* readCASQ( CASQueueStruc* Q )
72 { void *out = 0;
73 int32 tries = 0;
74 void **startOfData = Q->startOfData;
75 void **endOfData = Q->endOfData;
77 int32 gotLock = FALSE;
79 while( TRUE )
80 { //this intrinsic returns true if the lock held "UNLOCKED", in which
81 // case it now holds "LOCKED" -- if it already held "LOCKED", then
82 // gotLock is FALSE
83 gotLock =
84 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
85 //NOTE: checked assy, and it does lock correctly..
86 if( gotLock )
87 {
88 void **insertPos = (void **)Q->insertPos;
89 void **extractPos = (void **)Q->extractPos;
91 //if not empty -- extract just below insert when empty
92 if( insertPos - extractPos != 1 &&
93 !(extractPos == endOfData && insertPos == startOfData))
94 { //move before read
95 if( extractPos == endOfData ) //write new pos exactly once, correctly
96 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
97 } // other thread might read bad pos
98 else
99 { Q->extractPos++;
100 }
101 out = (void *) *(Q->extractPos);
102 Q->extractLock = UNLOCKED;
103 return out;
104 }
105 else //Q is empty
106 { Q->extractLock = UNLOCKED;//empty, so release lock for others
107 }
108 }
109 //Q is busy or empty
110 tries++;
111 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
112 }
113 }
115 void writeCASQ( void * in, CASQueueStruc* Q )
116 {
117 int32 tries = 0;
118 //TODO: need to make Q volatile? Want to do this Q in assembly!
119 //Have no idea what GCC's going to do to this code
120 void **startOfData = Q->startOfData;
121 void **endOfData = Q->endOfData;
123 int32 gotLock = FALSE;
125 while( TRUE )
126 { //this intrinsic returns true if the lock held "UNLOCKED", in which
127 // case it now holds "LOCKED" -- if it already held "LOCKED", then
128 // gotLock is FALSE
129 gotLock =
130 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
131 if( gotLock )
132 {
133 void **insertPos = (void **)Q->insertPos;
134 void **extractPos = (void **)Q->extractPos;
136 //check if room to insert.. can't use a count variable
137 // 'cause both insertor Thd and extractor Thd would write it
138 if( extractPos - insertPos != 1 &&
139 !(insertPos == endOfData && extractPos == startOfData))
140 { *(Q->insertPos) = in; //insert before move
141 if( insertPos == endOfData )
142 { Q->insertPos = startOfData;
143 }
144 else
145 { Q->insertPos++;
146 }
147 Q->insertLock = UNLOCKED;
148 return;
149 }
150 else //Q is full
151 { Q->insertLock = UNLOCKED;//full, so release lock for others
152 }
153 }
154 tries++;
155 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
156 }
157 }
159 #endif //_GNU_SOURCE
162 //===========================================================================
163 //Single reader single writer super fast Q.. no atomic instrs..
166 /*This is a blocking queue, but it uses no atomic instructions, just does
167 * yield() when empty or full
168 *
169 *It doesn't need any atomic instructions because only a single thread
170 * extracts and only a single thread inserts, and it has no locations that
171 * are written by both. It writes before moving and moves before reading,
172 * and never lets write position and read position be the same, so dis-
173 * synchrony can only ever cause an unnecessary call to yield(), never a
174 * wrong value (by monotonicity of movement of pointers, plus single writer
175 * to pointers, plus sequence of write before change pointer, plus
176 * assumptions that if thread A semantically writes X before Y, then thread
177 * B will see the writes in that order.)
178 */
180 SRSWQueueStruc* makeSRSWQ()
181 {
182 SRSWQueueStruc* retQ;
183 retQ = (SRSWQueueStruc *) PR__malloc( sizeof( SRSWQueueStruc ) );
184 memset( retQ->startOfData, 0, 1024 * sizeof(void *) );
186 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
187 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
188 retQ->endOfData = &(retQ->startOfData[1023]);
190 return retQ;
191 }
193 void
194 freeSRSWQ( SRSWQueueStruc* Q )
195 {
196 PR__free( Q );
197 }
199 void* readSRSWQ( SRSWQueueStruc* Q )
200 { void *out = 0;
201 int32 tries = 0;
203 while( TRUE )
204 {
205 if( Q->insertPos - Q->extractPos != 1 &&
206 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
207 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
208 else Q->extractPos++; //move before read
209 out = *(Q->extractPos);
210 return out;
211 }
212 //Q is empty
213 tries++;
214 if( tries > SPINLOCK_TRIES ) pthread_yield();
215 }
216 }
219 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
220 { void *out = 0;
221 int32 tries = 0;
223 while( TRUE )
224 {
225 if( Q->insertPos - Q->extractPos != 1 &&
226 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
227 { Q->extractPos++; //move before read
228 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
229 out = *(Q->extractPos);
230 return out;
231 }
232 //Q is empty
233 tries++;
234 if( tries > 10 ) return NULL; //long enough for writer to finish
235 }
236 }
239 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
240 {
241 int32 tries = 0;
243 while( TRUE )
244 {
245 if( Q->extractPos - Q->insertPos != 1 &&
246 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
247 { *(Q->insertPos) = in; //insert before move
248 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
249 else Q->insertPos++;
250 return;
251 }
252 //Q is full
253 tries++;
254 if( tries > SPINLOCK_TRIES ) pthread_yield();
255 }
256 }
260 //===========================================================================
261 //Single reader Multiple writer super fast Q.. no atomic instrs..
264 /*This is a blocking queue, but it uses no atomic instructions, just does
265 * yield() when empty or full
266 *
267 *It doesn't need any atomic instructions because only a single thread
268 * extracts and only a single thread inserts, and it has no locations that
269 * are written by both. It writes before moving and moves before reading,
270 * and never lets write position and read position be the same, so dis-
271 * synchrony can only ever cause an unnecessary call to yield(), never a
272 * wrong value (by monotonicity of movement of pointers, plus single writer
273 * to pointers, plus sequence of write before change pointer, plus
274 * assumptions that if thread A semantically writes X before Y, then thread
275 * B will see the writes in that order.)
276 *
277 *The multi-writer version is implemented as a hierarchy. Each writer has
278 * its own single-reader single-writer queue. The reader simply does a
279 * round-robin harvesting from them.
280 *
281 *A writer must first register itself with the queue, and receives an ID back
282 * It then uses that ID on each write operation.
283 *
284 *The implementation is:
285 *Physically:
286 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
287 * -] it also has read-pointer to the last queue a write was taken from.
288 *
289 *Action-Patterns:
290 * -] To add a writer
291 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
292 * --]] internally addWriterToQ does:
293 * ---]]] if needs more room, makes a larger writer-array
294 * ---]]] copies the old writer-array into the new
295 * ---]]] makes a new SRSW queue and puts it into the array
296 * ---]]] returns the index to the new SRSW queue as the ID
297 * -] To write
298 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
299 * --]] this call may block, via repeated yield() calls
300 * --]] internally, writeSRMWQ does:
301 * ---]]] uses the writerID as index to get the SRSW queue for that writer
302 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
303 * -] To Read
304 * --]] reader calls readSRMWQ, passing the Q struc
305 * --]] this call may block, via repeated yield() calls
306 * --]] internally, readSRMWQ does:
307 * ---]]] gets saved index of last SRSW queue read from
308 * ---]]] increments index and gets indexed queue
309 * ---]]] does a non-blocking read of that queue
310 * ---]]] if gets something, saves index and returns that value
311 * ---]]] if gets null, then goes to next queue
312 * ---]]] if got null from all the queues then does yield() then tries again
313 *
314 *Note: "0" is used as the value null, so SRSW queues must only contain
315 * pointers, and cannot use 0 as a valid pointer value.
316 *
317 */
319 SRMWQueueStruc* makeSRMWQ()
320 { SRMWQueueStruc* retQ;
322 retQ = (SRMWQueueStruc *) PR__malloc( sizeof( SRMWQueueStruc ) );
324 retQ->numInternalQs = 0;
325 retQ->internalQsSz = 10;
326 retQ->internalQs = PR__malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
328 retQ->lastQReadFrom = 0;
330 return retQ;
331 }
333 /* ---]]] if needs more room, makes a larger writer-array
334 * ---]]] copies the old writer-array into the new
335 * ---]]] makes a new SRSW queue an puts it into the array
336 * ---]]] returns the index to the new SRSW queue as the ID
337 *
338 *NOTE: assuming all adds are completed before any writes or reads are
339 * performed.. otherwise, this needs to be re-done carefully, probably with
340 * a lock.
341 */
342 int addWriterToSRMWQ( SRMWQueueStruc* Q )
343 { int oldSz, i;
344 SRSWQueueStruc * *oldArray;
346 (Q->numInternalQs)++;
347 if( Q->numInternalQs >= Q->internalQsSz )
348 { //full, so make bigger
349 oldSz = Q->internalQsSz;
350 oldArray = Q->internalQs;
351 Q->internalQsSz *= 2;
352 Q->internalQs = PR__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
353 for( i = 0; i < oldSz; i++ )
354 { Q->internalQs[i] = oldArray[i];
355 }
356 PR__free( oldArray );
357 }
358 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
359 return Q->numInternalQs - 1;
360 }
363 /* ---]]] gets saved index of last SRSW queue read-from
364 * ---]]] increments index and gets indexed queue
365 * ---]]] does a non-blocking read of that queue
366 * ---]]] if gets something, saves index and returns that value
367 * ---]]] if gets null, then goes to next queue
368 * ---]]] if got null from all the queues then does yield() then tries again
369 */
370 void* readSRMWQ( SRMWQueueStruc* Q )
371 { SRSWQueueStruc *readQ;
372 void *readValue = 0;
373 int32 tries = 0;
374 int32 QToReadFrom = 0;
376 QToReadFrom = Q->lastQReadFrom;
378 while( TRUE )
379 { QToReadFrom++;
380 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
381 readQ = Q->internalQs[ QToReadFrom ];
382 readValue = readSRSWQ_NonBlocking( readQ );
384 if( readValue != 0 ) //got a value, return it
385 { Q->lastQReadFrom = QToReadFrom;
386 return readValue;
387 }
388 else //SRSW Q just read is empty
389 { //check if all queues have been tried
390 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
391 { tries++; //give a writer a chance to finish before yield
392 if( tries > SPINLOCK_TRIES ) pthread_yield();
393 }
394 }
395 }
396 }
399 /*
400 * ---]]] uses the writerID as index to get the SRSW queue for that writer
401 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
402 */
403 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
404 {
405 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
407 writeSRSWQ( in, Q->internalQs[ writerID ] );
408 }