view BlockingQueue.c @ 22:59781a4c9cf1
updated VMS__malloc to VMS_int__malloc
| author | Me@portablequad |
|---|---|
| date | Sun, 12 Feb 2012 01:43:39 -0800 |
| parents | b5ae7fbb1f01 |
| children | bd38feb38c80 |
/*
 * Copyright 2009 OpenSourceStewardshipFoundation.org
 * Licensed under GNU General Public License version 2
 *
 * Author: seanhalle@yahoo.com
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE  //must be defined before pthread.h so pthread_yield() is declared
#endif

#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>  //for memset()
#include <sched.h>

#include "BlockingQueue.h"

#define INC(x) (++(x) == 1024) ? ((x) = 0) : (x)

#define SPINLOCK_TRIES 100000
//===========================================================================
// multi reader multi writer fast Q via CAS

/*This is a blocking queue, but it uses CAS instructions plus yield() when
 * empty or full.
 *It uses CAS because it's meant to have more than one reader and more than
 * one writer.
 */
CASQueueStruc* makeCASQ()
{
   CASQueueStruc* retQ;
   retQ = (CASQueueStruc *) VMS_int__malloc( sizeof( CASQueueStruc ) );

   retQ->insertLock  = UNLOCKED;
   retQ->extractLock = UNLOCKED;
   //TODO: check got pointer syntax right
   retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
   retQ->insertPos  = &(retQ->startOfData[1]); // so start pos's have to be adjacent
   retQ->endOfData  = &(retQ->startOfData[1023]);

   return retQ;
}
void* readCASQ( CASQueueStruc* Q )
 { void *out = 0;
   int tries = 0;
   void **startOfData = Q->startOfData;
   void **endOfData   = Q->endOfData;

   int gotLock = FALSE;

   while( TRUE )
    { //this intrinsic returns true if the lock held "UNLOCKED", in which
      // case it now holds "LOCKED" -- if it already held "LOCKED", then
      // gotLock is FALSE
      gotLock =
         __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
      //NOTE: checked assy, and it does lock correctly..
      if( gotLock )
       {
         void **insertPos  = Q->insertPos;
         void **extractPos = Q->extractPos;

         //if not empty -- extract just below insert when empty
         if( insertPos - extractPos != 1 &&
             !(extractPos == endOfData && insertPos == startOfData) )
          { //move before read
            if( extractPos == endOfData ) //write new pos exactly once, correctly
             { Q->extractPos = startOfData; //can't overrun then fix it 'cause
             }                              // other thread might read bad pos
            else
             { Q->extractPos++;
             }
            out = *(Q->extractPos);
            Q->extractLock = UNLOCKED;
            return out;
          }
         else //Q is empty
          { Q->extractLock = UNLOCKED; //empty, so release lock for others
          }
       }
      //Q is busy or empty
      tries++;
      if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
    }
 }
void writeCASQ( void * in, CASQueueStruc* Q )
 {
   int tries = 0;
   //TODO: need to make Q volatile? Want to do this Q in assembly!
   //Have no idea what GCC's going to do to this code
   void **startOfData = Q->startOfData;
   void **endOfData   = Q->endOfData;

   int gotLock = FALSE;

   while( TRUE )
    { //this intrinsic returns true if the lock held "UNLOCKED", in which
      // case it now holds "LOCKED" -- if it already held "LOCKED", then
      // gotLock is FALSE
      gotLock =
         __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
      if( gotLock )
       {
         void **insertPos  = Q->insertPos;
         void **extractPos = Q->extractPos;

         //check if room to insert.. can't use a count variable
         // 'cause both insertor Thd and extractor Thd would write it
         if( extractPos - insertPos != 1 &&
             !(insertPos == endOfData && extractPos == startOfData) )
          { *(Q->insertPos) = in; //insert before move
            if( insertPos == endOfData )
             { Q->insertPos = startOfData;
             }
            else
             { Q->insertPos++;
             }
            Q->insertLock = UNLOCKED;
            return;
          }
         else //Q is full
          { Q->insertLock = UNLOCKED; //full, so release lock for others
          }
       }
      tries++;
      if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
    }
 }
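
/*Usage sketch (illustrative only): two writer threads and a single reader
 * thread sharing one CAS queue. The payload values, thread counts, and the
 * example function names are invented for the sketch; the only API assumed
 * is makeCASQ()/readCASQ()/writeCASQ() above.
 */
static void *
exampleCASWriter( void *arg )
 { CASQueueStruc *Q = (CASQueueStruc *)arg;
   long i;
   for( i = 1; i <= 100; i++ )
    { writeCASQ( (void *)i, Q );   //blocks (spin, then yield) while the Q is full
    }
   return NULL;
 }

static void
exampleCASUsage()
 { CASQueueStruc *Q = makeCASQ();
   pthread_t w1, w2;
   long i;
   pthread_create( &w1, NULL, &exampleCASWriter, Q );
   pthread_create( &w2, NULL, &exampleCASWriter, Q );
   for( i = 0; i < 200; i++ )      //this thread acts as one of the readers
    { void *item = readCASQ( Q );  //blocks (spin, then yield) while the Q is empty
      printf( "got %ld\n", (long)item );
    }
   pthread_join( w1, NULL );
   pthread_join( w2, NULL );
 }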
//===========================================================================
//Single reader single writer super fast Q.. no atomic instrs..

/*This is a blocking queue, but it uses no atomic instructions; it just does
 * yield() when empty or full.
 *
 *It doesn't need any atomic instructions because only a single thread
 * extracts and only a single thread inserts, and it has no locations that
 * are written by both. It writes before moving and moves before reading,
 * and never lets the write position and read position be the same, so any
 * skew between the two threads can only ever cause an unnecessary call to
 * yield(), never a wrong value (by monotonicity of movement of the pointers,
 * plus a single writer to each pointer, plus the sequence of write before
 * changing the pointer, plus the assumption that if thread A semantically
 * writes X before Y, then thread B will see the writes in that order.)
 */
SRSWQueueStruc* makeSRSWQ()
{
   SRSWQueueStruc* retQ;
   retQ = (SRSWQueueStruc *) VMS_int__malloc( sizeof( SRSWQueueStruc ) );
   memset( retQ->startOfData, 0, 1024 * sizeof(void *) );

   retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
   retQ->insertPos  = &(retQ->startOfData[1]); // so start pos's have to be adjacent
   retQ->endOfData  = &(retQ->startOfData[1023]);

   return retQ;
}
void
freeSRSWQ( SRSWQueueStruc* Q )
{
   VMS_int__free( Q );
}
void* readSRSWQ( SRSWQueueStruc* Q )
 { void *out = 0;
   int tries = 0;

   while( TRUE )
    {
      if( Q->insertPos - Q->extractPos != 1 &&
          !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData) )
       { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
         else                                Q->extractPos++; //move before read
         out = *(Q->extractPos);
         return out;
       }
      //Q is empty
      tries++;
      if( tries > SPINLOCK_TRIES ) pthread_yield();
    }
 }
void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
 { void *out = 0;
   int tries = 0;

   while( TRUE )
    {
      if( Q->insertPos - Q->extractPos != 1 &&
          !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData) )
       { Q->extractPos++; //move before read
         if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
         out = *(Q->extractPos);
         return out;
       }
      //Q is empty
      tries++;
      if( tries > 10 ) return NULL; //long enough for writer to finish
    }
 }
void writeSRSWQ( void * in, SRSWQueueStruc* Q )
 {
   int tries = 0;

   while( TRUE )
    {
      if( Q->extractPos - Q->insertPos != 1 &&
          !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData) )
       { *(Q->insertPos) = in; //insert before move
         if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
         else                               Q->insertPos++;
         return;
       }
      //Q is full
      tries++;
      if( tries > SPINLOCK_TRIES ) pthread_yield();
    }
 }
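
/*Usage sketch (illustrative only): one producer thread paired with one
 * consumer thread over a single SRSW queue. The payload values, counts, and
 * example function names are invented for the sketch. Note that the
 * 1024-slot buffer holds at most 1022 items, because the insert and extract
 * positions are never allowed to become equal.
 */
static void *
exampleSRSWProducer( void *arg )
 { SRSWQueueStruc *Q = (SRSWQueueStruc *)arg;
   long i;
   for( i = 1; i <= 100; i++ )
    { writeSRSWQ( (void *)i, Q );   //spins, then yields, while the queue is full
    }
   return NULL;
 }

static void
exampleSRSWUsage()
 { SRSWQueueStruc *Q = makeSRSWQ();
   pthread_t producer;
   long i;
   pthread_create( &producer, NULL, &exampleSRSWProducer, Q );
   for( i = 0; i < 100; i++ )       //this thread is the single consumer
    { void *item = readSRSWQ( Q );  //spins, then yields, while the queue is empty
      printf( "got %ld\n", (long)item );
    }
   pthread_join( producer, NULL );
   freeSRSWQ( Q );
 }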
//===========================================================================
//Single reader Multiple writer super fast Q.. no atomic instrs..

/*This is a blocking queue, but it uses no atomic instructions; it just does
 * yield() when empty or full.
 *
 *It doesn't need any atomic instructions because, within each internal queue,
 * only a single thread extracts and only a single thread inserts, and there
 * are no locations written by both. It writes before moving and moves before
 * reading, and never lets the write position and read position be the same,
 * so any skew between the two threads can only ever cause an unnecessary
 * call to yield(), never a wrong value (by monotonicity of movement of the
 * pointers, plus a single writer to each pointer, plus the sequence of write
 * before changing the pointer, plus the assumption that if thread A
 * semantically writes X before Y, then thread B will see the writes in that
 * order.)
 *
 *The multi-writer version is implemented as a hierarchy. Each writer has
 * its own single-reader single-writer queue. The reader simply does a
 * round-robin harvest from them.
 *
 *A writer must first register itself with the queue, and receives an ID back.
 * It then uses that ID on each write operation.
 *
 *The implementation is:
 *Physically:
 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc pointers
 * -] it also has a read-pointer to the last queue a value was read from.
 *
 *Action-Patterns:
 * -] To add a writer
 * --]] writer-thread calls addWriterToSRMWQ() and remembers the ID it returns
 * --]] internally addWriterToSRMWQ does:
 * ---]]] if it needs more room, makes a larger writer-array
 * ---]]] copies the old writer-array into the new
 * ---]]] makes a new SRSW queue and puts it into the array
 * ---]]] returns the index of the new SRSW queue as the ID
 * -] To write
 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
 * --]] this call may block, via repeated yield() calls
 * --]] internally, writeSRMWQ does:
 * ---]]] uses the writerID as index to get the SRSW queue for that writer
 * ---]]] performs writeSRSWQ on that queue (may block via repeated yield calls)
 * -] To Read
 * --]] reader calls readSRMWQ, passing the Q struc
 * --]] this call may block, via repeated yield() calls
 * --]] internally, readSRMWQ does:
 * ---]]] gets the saved index of the last SRSW queue read from
 * ---]]] increments the index and gets the indexed queue
 * ---]]] does a non-blocking read of that queue
 * ---]]] if it gets something, saves the index and returns that value
 * ---]]] if it gets null, then goes to the next queue
 * ---]]] if it got null from all the queues then does yield() then tries again
 *
 *Note: "0" is used as the null/empty value, so these queues must only contain
 * pointers, and cannot use 0 as a valid pointer value.
 */
SRMWQueueStruc* makeSRMWQ()
 { SRMWQueueStruc* retQ;

   retQ = (SRMWQueueStruc *) VMS_int__malloc( sizeof( SRMWQueueStruc ) );

   retQ->numInternalQs = 0;
   retQ->internalQsSz  = 10;
   retQ->internalQs    = VMS_int__malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *) );

   retQ->lastQReadFrom = 0;

   return retQ;
 }
/* ---]]] if it needs more room, makes a larger writer-array
 * ---]]] copies the old writer-array into the new
 * ---]]] makes a new SRSW queue and puts it into the array
 * ---]]] returns the index of the new SRSW queue as the ID
 *
 *NOTE: assuming all adds are completed before any writes or reads are
 * performed.. otherwise, this needs to be re-done carefully, probably with
 * a lock.
 */
int addWriterToSRMWQ( SRMWQueueStruc* Q )
 { int oldSz, i;
   SRSWQueueStruc **oldArray;

   (Q->numInternalQs)++;
   if( Q->numInternalQs >= Q->internalQsSz )
    { //full, so make bigger
      oldSz    = Q->internalQsSz;
      oldArray = Q->internalQs;
      Q->internalQsSz *= 2;
      Q->internalQs = VMS_int__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *) );
      for( i = 0; i < oldSz; i++ )
       { Q->internalQs[i] = oldArray[i];
       }
      VMS_int__free( oldArray );
    }
   Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
   return Q->numInternalQs - 1;
 }
/* ---]]] gets the saved index of the last SRSW queue read from
 * ---]]] increments the index and gets the indexed queue
 * ---]]] does a non-blocking read of that queue
 * ---]]] if it gets something, saves the index and returns that value
 * ---]]] if it gets null, then goes to the next queue
 * ---]]] if it got null from all the queues then does yield() then tries again
 */
void* readSRMWQ( SRMWQueueStruc* Q )
 { SRSWQueueStruc *readQ;
   void *readValue = 0;
   int   tries = 0;
   int   QToReadFrom = 0;

   QToReadFrom = Q->lastQReadFrom;

   while( TRUE )
    { QToReadFrom++;
      if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
      readQ     = Q->internalQs[ QToReadFrom ];
      readValue = readSRSWQ_NonBlocking( readQ );

      if( readValue != 0 ) //got a value, return it
       { Q->lastQReadFrom = QToReadFrom;
         return readValue;
       }
      else //SRSW Q just read is empty
       { //check if all queues have been tried
         if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
          { tries++; //give a writer a chance to finish before yield
            if( tries > SPINLOCK_TRIES ) pthread_yield();
          }
       }
    }
 }
/*
 * ---]]] uses the writerID as index to get the SRSW queue for that writer
 * ---]]] performs writeSRSWQ on that queue (may block via repeated yield calls)
 */
void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
 {
   if( in == 0 ) printf( "error, wrote 0 to SRMW Q\n" ); //TODO: throw an error

   writeSRSWQ( in, Q->internalQs[ writerID ] );
 }
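
/*Usage sketch (illustrative only): two writer threads each register for
 * their own internal SRSW queue before any traffic starts (per the NOTE
 * above addWriterToSRMWQ), then write using the returned ID; a single reader
 * round-robins over them via readSRMWQ. The arg struct, values, and example
 * function names are invented for the sketch.
 */
typedef struct { SRMWQueueStruc *Q; int writerID; } ExampleSRMWWriterArg;

static void *
exampleSRMWWriter( void *_arg )
 { ExampleSRMWWriterArg *arg = (ExampleSRMWWriterArg *)_arg;
   long i;
   for( i = 1; i <= 50; i++ )
    { writeSRMWQ( (void *)i, arg->Q, arg->writerID ); //never write 0
    }
   return NULL;
 }

static void
exampleSRMWUsage()
 { SRMWQueueStruc *Q = makeSRMWQ();
   ExampleSRMWWriterArg a1, a2;
   pthread_t w1, w2;
   long i;

   a1.Q = Q;  a1.writerID = addWriterToSRMWQ( Q ); //register before any writes
   a2.Q = Q;  a2.writerID = addWriterToSRMWQ( Q );

   pthread_create( &w1, NULL, &exampleSRMWWriter, &a1 );
   pthread_create( &w2, NULL, &exampleSRMWWriter, &a2 );

   for( i = 0; i < 100; i++ )                       //single reader thread
    { void *item = readSRMWQ( Q );
      printf( "got %ld\n", (long)item );
    }
   pthread_join( w1, NULL );
   pthread_join( w2, NULL );
 }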
