view BlockingQueue.c @ 30:e59d39874274

Compiles, but does not yet run properly
author Some Random Person <seanhalle@yahoo.com>
date Tue, 13 Mar 2012 10:07:00 -0700
parents bd38feb38c80
children b66352de717e d01d48b023ca
line source
1 /*
2 * Copyright 2009 OpenSourceStewardshipFoundation.org
3 * Licensed under GNU General Public License version 2
4 *
5 * Author: seanhalle@yahoo.com
6 */
9 #include <stdio.h>
10 #include <errno.h>
11 #include <pthread.h>
12 #include <stdlib.h>
13 #include <sched.h>
14 #include <string.h>
16 #include "BlockingQueue.h"
18 #define INC(x) (++x == 1024) ? (x) = 0 : (x)
20 #define SPINLOCK_TRIES 100000
24 //===========================================================================
25 // multi reader multi writer fast Q via CAS
26 #ifndef _GNU_SOURCE
27 #define _GNU_SOURCE
29 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
30 * or full
31 *It uses CAS because it's meant to have more than one reader and more than
32 * one writer.
33 */
35 CASQueueStruc* makeCASQ()
36 {
37 CASQueueStruc* retQ;
38 retQ = (CASQueueStruc *) VMS_WL__malloc( sizeof( CASQueueStruc ) );
40 retQ->insertLock = UNLOCKED;
41 retQ->extractLock= UNLOCKED;
43 retQ->extractPos = (volatile void**)&(retQ->startOfData[0]); //side by side == empty
44 retQ->insertPos = (volatile void**)&(retQ->startOfData[1]); // so start pos's have to be
45 retQ->endOfData = &(retQ->startOfData[1023]);
47 return retQ;
48 }
51 void* readCASQ( CASQueueStruc* Q )
52 { void *out = 0;
53 int tries = 0;
54 void **startOfData = Q->startOfData;
55 void **endOfData = Q->endOfData;
57 int gotLock = FALSE;
59 while( TRUE )
60 { //this intrinsic returns true if the lock held "UNLOCKED", in which
61 // case it now holds "LOCKED" -- if it already held "LOCKED", then
62 // gotLock is FALSE
63 gotLock =
64 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
65 //NOTE: checked assy, and it does lock correctly..
66 if( gotLock )
67 {
68 void **insertPos = (void **)Q->insertPos;
69 void **extractPos = (void **)Q->extractPos;
71 //if not empty -- extract just below insert when empty
72 if( insertPos - extractPos != 1 &&
73 !(extractPos == endOfData && insertPos == startOfData))
74 { //move before read
75 if( extractPos == endOfData ) //write new pos exactly once, correctly
76 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
77 } // other thread might read bad pos
78 else
79 { Q->extractPos++;
80 }
81 out = (void *) *(Q->extractPos);
82 Q->extractLock = UNLOCKED;
83 return out;
84 }
85 else //Q is empty
86 { Q->extractLock = UNLOCKED;//empty, so release lock for others
87 }
88 }
89 //Q is busy or empty
90 tries++;
91 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
92 }
93 }
95 void writeCASQ( void * in, CASQueueStruc* Q )
96 {
97 int tries = 0;
98 //TODO: need to make Q volatile? Want to do this Q in assembly!
99 //Have no idea what GCC's going to do to this code
100 void **startOfData = Q->startOfData;
101 void **endOfData = Q->endOfData;
103 int gotLock = FALSE;
105 while( TRUE )
106 { //this intrinsic returns true if the lock held "UNLOCKED", in which
107 // case it now holds "LOCKED" -- if it already held "LOCKED", then
108 // gotLock is FALSE
109 gotLock =
110 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
111 if( gotLock )
112 {
113 void **insertPos = (void **)Q->insertPos;
114 void **extractPos = (void **)Q->extractPos;
116 //check if room to insert.. can't use a count variable
117 // 'cause both insertor Thd and extractor Thd would write it
118 if( extractPos - insertPos != 1 &&
119 !(insertPos == endOfData && extractPos == startOfData))
120 { *(Q->insertPos) = in; //insert before move
121 if( insertPos == endOfData )
122 { Q->insertPos = startOfData;
123 }
124 else
125 { Q->insertPos++;
126 }
127 Q->insertLock = UNLOCKED;
128 return;
129 }
130 else //Q is full
131 { Q->insertLock = UNLOCKED;//full, so release lock for others
132 }
133 }
134 tries++;
135 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
136 }
137 }
139 #endif //_GNU_SOURCE
142 //===========================================================================
143 //Single reader single writer super fast Q.. no atomic instrs..
146 /*This is a blocking queue, but it uses no atomic instructions, just does
147 * yield() when empty or full
148 *
149 *It doesn't need any atomic instructions because only a single thread
150 * extracts and only a single thread inserts, and it has no locations that
151 * are written by both. It writes before moving and moves before reading,
152 * and never lets write position and read position be the same, so dis-
153 * synchrony can only ever cause an unnecessary call to yield(), never a
154 * wrong value (by monotonicity of movement of pointers, plus single writer
155 * to pointers, plus sequence of write before change pointer, plus
156 * assumptions that if thread A semantically writes X before Y, then thread
157 * B will see the writes in that order.)
158 */
160 SRSWQueueStruc* makeSRSWQ()
161 {
162 SRSWQueueStruc* retQ;
163 retQ = (SRSWQueueStruc *) VMS_WL__malloc( sizeof( SRSWQueueStruc ) );
164 memset( retQ->startOfData, 0, 1024 * sizeof(void *) );
166 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
167 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
168 retQ->endOfData = &(retQ->startOfData[1023]);
170 return retQ;
171 }
/*Release a queue created by makeSRSWQ.
 *
 *NOTE(review): allocation uses VMS_WL__malloc but release uses
 * VMS_int__free -- confirm these are the matching allocator pair.
 */
void
freeSRSWQ( SRSWQueueStruc* Q )
 {
   VMS_int__free( Q );
 }
179 void* readSRSWQ( SRSWQueueStruc* Q )
180 { void *out = 0;
181 int tries = 0;
183 while( TRUE )
184 {
185 if( Q->insertPos - Q->extractPos != 1 &&
186 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
187 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
188 else Q->extractPos++; //move before read
189 out = *(Q->extractPos);
190 return out;
191 }
192 //Q is empty
193 tries++;
194 if( tries > SPINLOCK_TRIES ) pthread_yield();
195 }
196 }
199 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
200 { void *out = 0;
201 int tries = 0;
203 while( TRUE )
204 {
205 if( Q->insertPos - Q->extractPos != 1 &&
206 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
207 { Q->extractPos++; //move before read
208 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
209 out = *(Q->extractPos);
210 return out;
211 }
212 //Q is empty
213 tries++;
214 if( tries > 10 ) return NULL; //long enough for writer to finish
215 }
216 }
219 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
220 {
221 int tries = 0;
223 while( TRUE )
224 {
225 if( Q->extractPos - Q->insertPos != 1 &&
226 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
227 { *(Q->insertPos) = in; //insert before move
228 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
229 else Q->insertPos++;
230 return;
231 }
232 //Q is full
233 tries++;
234 if( tries > SPINLOCK_TRIES ) pthread_yield();
235 }
236 }
240 //===========================================================================
241 //Single reader Multiple writer super fast Q.. no atomic instrs..
244 /*This is a blocking queue, but it uses no atomic instructions, just does
245 * yield() when empty or full
246 *
247 *It doesn't need any atomic instructions because only a single thread
248 * extracts and only a single thread inserts, and it has no locations that
249 * are written by both. It writes before moving and moves before reading,
250 * and never lets write position and read position be the same, so dis-
251 * synchrony can only ever cause an unnecessary call to yield(), never a
252 * wrong value (by monotonicity of movement of pointers, plus single writer
253 * to pointers, plus sequence of write before change pointer, plus
254 * assumptions that if thread A semantically writes X before Y, then thread
255 * B will see the writes in that order.)
256 *
257 *The multi-writer version is implemented as a hierarchy. Each writer has
258 * its own single-reader single-writer queue. The reader simply does a
259 * round-robin harvesting from them.
260 *
261 *A writer must first register itself with the queue, and receives an ID back
262 * It then uses that ID on each write operation.
263 *
264 *The implementation is:
265 *Physically:
266 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
267 * -] it also has read-pointer to the last queue a write was taken from.
268 *
269 *Action-Patterns:
270 * -] To add a writer
271 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
272 * --]] internally addWriterToQ does:
273 * ---]]] if needs more room, makes a larger writer-array
274 * ---]]] copies the old writer-array into the new
 * ---]]] makes a new SRSW queue and puts it into the array
276 * ---]]] returns the index to the new SRSW queue as the ID
277 * -] To write
278 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
279 * --]] this call may block, via repeated yield() calls
280 * --]] internally, writeSRMWQ does:
281 * ---]]] uses the writerID as index to get the SRSW queue for that writer
282 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
283 * -] To Read
284 * --]] reader calls readSRMWQ, passing the Q struc
285 * --]] this call may block, via repeated yield() calls
286 * --]] internally, readSRMWQ does:
287 * ---]]] gets saved index of last SRSW queue read from
288 * ---]]] increments index and gets indexed queue
289 * ---]]] does a non-blocking read of that queue
290 * ---]]] if gets something, saves index and returns that value
291 * ---]]] if gets null, then goes to next queue
292 * ---]]] if got null from all the queues then does yield() then tries again
293 *
294 *Note: "0" is used as the value null, so SRSW queues must only contain
295 * pointers, and cannot use 0 as a valid pointer value.
296 *
297 */
299 SRMWQueueStruc* makeSRMWQ()
300 { SRMWQueueStruc* retQ;
302 retQ = (SRMWQueueStruc *) VMS_WL__malloc( sizeof( SRMWQueueStruc ) );
304 retQ->numInternalQs = 0;
305 retQ->internalQsSz = 10;
306 retQ->internalQs = VMS_WL__malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
308 retQ->lastQReadFrom = 0;
310 return retQ;
311 }
313 /* ---]]] if needs more room, makes a larger writer-array
314 * ---]]] copies the old writer-array into the new
 * ---]]] makes a new SRSW queue and puts it into the array
316 * ---]]] returns the index to the new SRSW queue as the ID
317 *
318 *NOTE: assuming all adds are completed before any writes or reads are
319 * performed.. otherwise, this needs to be re-done carefully, probably with
320 * a lock.
321 */
322 int addWriterToSRMWQ( SRMWQueueStruc* Q )
323 { int oldSz, i;
324 SRSWQueueStruc * *oldArray;
326 (Q->numInternalQs)++;
327 if( Q->numInternalQs >= Q->internalQsSz )
328 { //full, so make bigger
329 oldSz = Q->internalQsSz;
330 oldArray = Q->internalQs;
331 Q->internalQsSz *= 2;
332 Q->internalQs = VMS_WL__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
333 for( i = 0; i < oldSz; i++ )
334 { Q->internalQs[i] = oldArray[i];
335 }
336 VMS_int__free( oldArray );
337 }
338 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
339 return Q->numInternalQs - 1;
340 }
343 /* ---]]] gets saved index of last SRSW queue read-from
344 * ---]]] increments index and gets indexed queue
345 * ---]]] does a non-blocking read of that queue
346 * ---]]] if gets something, saves index and returns that value
347 * ---]]] if gets null, then goes to next queue
348 * ---]]] if got null from all the queues then does yield() then tries again
349 */
350 void* readSRMWQ( SRMWQueueStruc* Q )
351 { SRSWQueueStruc *readQ;
352 void *readValue = 0;
353 int tries = 0;
354 int QToReadFrom = 0;
356 QToReadFrom = Q->lastQReadFrom;
358 while( TRUE )
359 { QToReadFrom++;
360 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
361 readQ = Q->internalQs[ QToReadFrom ];
362 readValue = readSRSWQ_NonBlocking( readQ );
364 if( readValue != 0 ) //got a value, return it
365 { Q->lastQReadFrom = QToReadFrom;
366 return readValue;
367 }
368 else //SRSW Q just read is empty
369 { //check if all queues have been tried
370 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
371 { tries++; //give a writer a chance to finish before yield
372 if( tries > SPINLOCK_TRIES ) pthread_yield();
373 }
374 }
375 }
376 }
379 /*
380 * ---]]] uses the writerID as index to get the SRSW queue for that writer
381 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
382 */
383 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
384 {
385 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
387 writeSRSWQ( in, Q->internalQs[ writerID ] );
388 }