view BlockingQueue.c @ 48:1ea30ca7093c

changed headers
author Sean Halle <seanhalle@yahoo.com>
date Tue, 23 Jul 2013 07:28:22 -0700
parents 67c7f5a0308b
children 083298a6f7b6
line source
1 /*
2 * Copyright 2009 OpenSourceResearchInstitute.org
3 * Licensed under GNU General Public License version 2
4 *
5 * Author: seanhalle@yahoo.com
6 */
9 #include <stdio.h>
10 #include <errno.h>
11 #include <pthread.h>
12 #include <stdlib.h>
13 #include <sched.h>
14 #include <string.h>
16 #include "BlockingQueue.h"
17 #include "PR__common_includes/Services_offered_by_PR/Memory_Handling/vmalloc__wrapper_library.h"
19 #define INC(x) (++x == 1024) ? (x) = 0 : (x)
21 #define SPINLOCK_TRIES 100000
25 //===========================================================================
26 // multi reader multi writer fast Q via CAS
27 #ifndef _GNU_SOURCE
28 #define _GNU_SOURCE
30 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
31 * or full
32 *It uses CAS because it's meant to have more than one reader and more than
33 * one writer.
34 */
36 CASQueueStruc* makeCASQ()
37 {
38 CASQueueStruc* retQ;
39 retQ = (CASQueueStruc *) PR_WL__malloc( sizeof( CASQueueStruc ) );
41 retQ->insertLock = UNLOCKED;
42 retQ->extractLock= UNLOCKED;
44 retQ->extractPos = (volatile void**)&(retQ->startOfData[0]); //side by side == empty
45 retQ->insertPos = (volatile void**)&(retQ->startOfData[1]); // so start pos's have to be
46 retQ->endOfData = &(retQ->startOfData[1023]);
48 return retQ;
49 }
52 void* readCASQ( CASQueueStruc* Q )
53 { void *out = 0;
54 int32 tries = 0;
55 void **startOfData = Q->startOfData;
56 void **endOfData = Q->endOfData;
58 int32 gotLock = FALSE;
60 while( TRUE )
61 { //this intrinsic returns true if the lock held "UNLOCKED", in which
62 // case it now holds "LOCKED" -- if it already held "LOCKED", then
63 // gotLock is FALSE
64 gotLock =
65 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
66 //NOTE: checked assy, and it does lock correctly..
67 if( gotLock )
68 {
69 void **insertPos = (void **)Q->insertPos;
70 void **extractPos = (void **)Q->extractPos;
72 //if not empty -- extract just below insert when empty
73 if( insertPos - extractPos != 1 &&
74 !(extractPos == endOfData && insertPos == startOfData))
75 { //move before read
76 if( extractPos == endOfData ) //write new pos exactly once, correctly
77 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
78 } // other thread might read bad pos
79 else
80 { Q->extractPos++;
81 }
82 out = (void *) *(Q->extractPos);
83 Q->extractLock = UNLOCKED;
84 return out;
85 }
86 else //Q is empty
87 { Q->extractLock = UNLOCKED;//empty, so release lock for others
88 }
89 }
90 //Q is busy or empty
91 tries++;
92 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
93 }
94 }
96 void writeCASQ( void * in, CASQueueStruc* Q )
97 {
98 int32 tries = 0;
99 //TODO: need to make Q volatile? Want to do this Q in assembly!
100 //Have no idea what GCC's going to do to this code
101 void **startOfData = Q->startOfData;
102 void **endOfData = Q->endOfData;
104 int32 gotLock = FALSE;
106 while( TRUE )
107 { //this intrinsic returns true if the lock held "UNLOCKED", in which
108 // case it now holds "LOCKED" -- if it already held "LOCKED", then
109 // gotLock is FALSE
110 gotLock =
111 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
112 if( gotLock )
113 {
114 void **insertPos = (void **)Q->insertPos;
115 void **extractPos = (void **)Q->extractPos;
117 //check if room to insert.. can't use a count variable
118 // 'cause both insertor Thd and extractor Thd would write it
119 if( extractPos - insertPos != 1 &&
120 !(insertPos == endOfData && extractPos == startOfData))
121 { *(Q->insertPos) = in; //insert before move
122 if( insertPos == endOfData )
123 { Q->insertPos = startOfData;
124 }
125 else
126 { Q->insertPos++;
127 }
128 Q->insertLock = UNLOCKED;
129 return;
130 }
131 else //Q is full
132 { Q->insertLock = UNLOCKED;//full, so release lock for others
133 }
134 }
135 tries++;
136 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
137 }
138 }
140 #endif //_GNU_SOURCE
143 //===========================================================================
144 //Single reader single writer super fast Q.. no atomic instrs..
147 /*This is a blocking queue, but it uses no atomic instructions, just does
148 * yield() when empty or full
149 *
150 *It doesn't need any atomic instructions because only a single thread
151 * extracts and only a single thread inserts, and it has no locations that
152 * are written by both. It writes before moving and moves before reading,
153 * and never lets write position and read position be the same, so dis-
154 * synchrony can only ever cause an unnecessary call to yield(), never a
155 * wrong value (by monotonicity of movement of pointers, plus single writer
156 * to pointers, plus sequence of write before change pointer, plus
157 * assumptions that if thread A semantically writes X before Y, then thread
158 * B will see the writes in that order.)
159 */
161 SRSWQueueStruc* makeSRSWQ()
162 {
163 SRSWQueueStruc* retQ;
164 retQ = (SRSWQueueStruc *) PR_WL__malloc( sizeof( SRSWQueueStruc ) );
165 memset( retQ->startOfData, 0, 1024 * sizeof(void *) );
167 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
168 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
169 retQ->endOfData = &(retQ->startOfData[1023]);
171 return retQ;
172 }
174 void
175 freeSRSWQ( SRSWQueueStruc* Q )
176 {
177 PR__free( Q );
178 }
180 void* readSRSWQ( SRSWQueueStruc* Q )
181 { void *out = 0;
182 int32 tries = 0;
184 while( TRUE )
185 {
186 if( Q->insertPos - Q->extractPos != 1 &&
187 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
188 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
189 else Q->extractPos++; //move before read
190 out = *(Q->extractPos);
191 return out;
192 }
193 //Q is empty
194 tries++;
195 if( tries > SPINLOCK_TRIES ) pthread_yield();
196 }
197 }
200 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
201 { void *out = 0;
202 int32 tries = 0;
204 while( TRUE )
205 {
206 if( Q->insertPos - Q->extractPos != 1 &&
207 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
208 { Q->extractPos++; //move before read
209 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
210 out = *(Q->extractPos);
211 return out;
212 }
213 //Q is empty
214 tries++;
215 if( tries > 10 ) return NULL; //long enough for writer to finish
216 }
217 }
220 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
221 {
222 int32 tries = 0;
224 while( TRUE )
225 {
226 if( Q->extractPos - Q->insertPos != 1 &&
227 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
228 { *(Q->insertPos) = in; //insert before move
229 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
230 else Q->insertPos++;
231 return;
232 }
233 //Q is full
234 tries++;
235 if( tries > SPINLOCK_TRIES ) pthread_yield();
236 }
237 }
241 //===========================================================================
242 //Single reader Multiple writer super fast Q.. no atomic instrs..
245 /*This is a blocking queue, but it uses no atomic instructions, just does
246 * yield() when empty or full
247 *
 *It doesn't need any atomic instructions because each writer has its own
 * internal single-reader single-writer queue.  Every internal queue
 * therefore still has exactly one inserting thread and one extracting
 * thread, and no location is written by both.  Each internal queue writes
 * before moving and moves before reading, and never lets write position and
 * read position be the same.  So dis-synchrony can only ever cause an
 * unnecessary call to yield(), never a wrong value.  This relies on:
 * monotonic movement of the pointers; a single writer to each pointer;
 * writing the slot before changing the pointer; and the assumption that if
 * thread A semantically writes X before Y, then thread B will see the
 * writes in that order.
257 *
258 *The multi-writer version is implemented as a hierarchy. Each writer has
259 * its own single-reader single-writer queue. The reader simply does a
260 * round-robin harvesting from them.
261 *
 *A writer must first register itself with the queue, and receives an ID
 * back.  It then uses that ID on each write operation.
264 *
265 *The implementation is:
266 *Physically:
267 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
 * -] it also holds the index of the internal queue the last value was read from.
269 *
270 *Action-Patterns:
271 * -] To add a writer
272 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
273 * --]] internally addWriterToQ does:
274 * ---]]] if needs more room, makes a larger writer-array
275 * ---]]] copies the old writer-array into the new
 * ---]]] makes a new SRSW queue and puts it into the array
277 * ---]]] returns the index to the new SRSW queue as the ID
278 * -] To write
279 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
280 * --]] this call may block, via repeated yield() calls
281 * --]] internally, writeSRMWQ does:
282 * ---]]] uses the writerID as index to get the SRSW queue for that writer
283 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
284 * -] To Read
285 * --]] reader calls readSRMWQ, passing the Q struc
286 * --]] this call may block, via repeated yield() calls
287 * --]] internally, readSRMWQ does:
288 * ---]]] gets saved index of last SRSW queue read from
289 * ---]]] increments index and gets indexed queue
290 * ---]]] does a non-blocking read of that queue
291 * ---]]] if gets something, saves index and returns that value
292 * ---]]] if gets null, then goes to next queue
293 * ---]]] if got null from all the queues then does yield() then tries again
294 *
295 *Note: "0" is used as the value null, so SRSW queues must only contain
296 * pointers, and cannot use 0 as a valid pointer value.
297 *
298 */
300 SRMWQueueStruc* makeSRMWQ()
301 { SRMWQueueStruc* retQ;
303 retQ = (SRMWQueueStruc *) PR_WL__malloc( sizeof( SRMWQueueStruc ) );
305 retQ->numInternalQs = 0;
306 retQ->internalQsSz = 10;
307 retQ->internalQs = PR_WL__malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
309 retQ->lastQReadFrom = 0;
311 return retQ;
312 }
314 /* ---]]] if needs more room, makes a larger writer-array
315 * ---]]] copies the old writer-array into the new
316 * ---]]] makes a new SRSW queue an puts it into the array
317 * ---]]] returns the index to the new SRSW queue as the ID
318 *
319 *NOTE: assuming all adds are completed before any writes or reads are
320 * performed.. otherwise, this needs to be re-done carefully, probably with
321 * a lock.
322 */
323 int addWriterToSRMWQ( SRMWQueueStruc* Q )
324 { int oldSz, i;
325 SRSWQueueStruc * *oldArray;
327 (Q->numInternalQs)++;
328 if( Q->numInternalQs >= Q->internalQsSz )
329 { //full, so make bigger
330 oldSz = Q->internalQsSz;
331 oldArray = Q->internalQs;
332 Q->internalQsSz *= 2;
333 Q->internalQs = PR_WL__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
334 for( i = 0; i < oldSz; i++ )
335 { Q->internalQs[i] = oldArray[i];
336 }
337 PR__free( oldArray );
338 }
339 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
340 return Q->numInternalQs - 1;
341 }
344 /* ---]]] gets saved index of last SRSW queue read-from
345 * ---]]] increments index and gets indexed queue
346 * ---]]] does a non-blocking read of that queue
347 * ---]]] if gets something, saves index and returns that value
348 * ---]]] if gets null, then goes to next queue
349 * ---]]] if got null from all the queues then does yield() then tries again
350 */
351 void* readSRMWQ( SRMWQueueStruc* Q )
352 { SRSWQueueStruc *readQ;
353 void *readValue = 0;
354 int32 tries = 0;
355 int32 QToReadFrom = 0;
357 QToReadFrom = Q->lastQReadFrom;
359 while( TRUE )
360 { QToReadFrom++;
361 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
362 readQ = Q->internalQs[ QToReadFrom ];
363 readValue = readSRSWQ_NonBlocking( readQ );
365 if( readValue != 0 ) //got a value, return it
366 { Q->lastQReadFrom = QToReadFrom;
367 return readValue;
368 }
369 else //SRSW Q just read is empty
370 { //check if all queues have been tried
371 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
372 { tries++; //give a writer a chance to finish before yield
373 if( tries > SPINLOCK_TRIES ) pthread_yield();
374 }
375 }
376 }
377 }
380 /*
381 * ---]]] uses the writerID as index to get the SRSW queue for that writer
382 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
383 */
384 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
385 {
386 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
388 writeSRSWQ( in, Q->internalQs[ writerID ] );
389 }