Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
view BlockingQueue.c @ 18:7c9e00ff1bf4
fixed queue enlarging function
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Wed, 22 Jun 2011 18:49:17 +0200 |
| parents | 3562716ebdbd |
| children | 677afc259a58 |
line source
1 /*
2 * Copyright 2009 OpenSourceStewardshipFoundation.org
3 * Licensed under GNU General Public License version 2
4 *
5 * Author: seanhalle@yahoo.com
6 */
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>   /* strerror */

#include "BlockingQueue.h"
#include "../vmalloc.h"
18 #define INC(x) (++x == 1024) ? (x) = 0 : (x)
20 #define SPINLOCK_TRIES 100000
22 //===========================================================================
23 //Normal pthread Q
25 PThdQueueStruc* makePThdQ()
26 {
27 PThdQueueStruc* retQ;
28 int retCode;
29 retQ = (PThdQueueStruc *) VMS__malloc( sizeof( PThdQueueStruc ) );
32 retCode =
33 pthread_mutex_init( &retQ->mutex_t, NULL);
34 if(retCode){perror("Error in creating mutex:"); exit(1);}
36 retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
37 if(retCode){perror("Error in creating cond_var:"); exit(1);}
39 retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
40 if(retCode){perror("Error in creating cond_var:"); exit(1);}
42 retQ->count = 0;
43 retQ->readPos = 0;
44 retQ->writePos = 0;
45 retQ->w_empty = 0;
46 retQ->w_full = 0;
48 return retQ;
49 }
51 void * readPThdQ( PThdQueueStruc *Q )
52 { void *ret;
53 int retCode, wt;
54 pthread_mutex_lock( &Q->mutex_t );
55 {
56 while( Q -> count == 0 )
57 { Q -> w_empty = 1;
58 retCode =
59 pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
60 if( retCode ){ perror("Thread wait error: "); exit(1); }
61 }
62 Q -> w_empty = 0;
63 Q -> count -= 1;
64 ret = Q->data[ Q->readPos ];
65 INC( Q->readPos );
66 wt = Q -> w_full;
67 Q -> w_full = 0;
68 }
69 pthread_mutex_unlock( &Q->mutex_t );
70 if (wt)
71 pthread_cond_signal( &Q->cond_w_t );
73 //printf("Q out: %d\n", ret);
74 return( ret );
75 }
77 void writePThdQ( void * in, PThdQueueStruc* Q )
78 {
79 int status, wt;
80 //printf("Q in: %d\n", in);
82 pthread_mutex_lock( &Q->mutex_t );
83 {
84 while( Q->count >= 1024 )
85 {
86 Q -> w_full = 1;
87 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
88 if (status != 0)
89 { perror("Thread wait error: ");
90 exit(1);
91 }
92 }
94 Q -> w_full = 0;
95 Q->count += 1;
96 Q->data[ Q->writePos ] = in;
97 INC( Q->writePos );
98 wt = Q -> w_empty;
99 Q -> w_empty = 0;
100 }
102 pthread_mutex_unlock( &Q->mutex_t );
103 if( wt ) pthread_cond_signal( &Q->cond_r_t );
104 }
107 //===========================================================================
108 // multi reader multi writer fast Q via CAS
109 #ifndef _GNU_SOURCE
110 #define _GNU_SOURCE
112 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
113 * or full
114 *It uses CAS because it's meant to have more than one reader and more than
115 * one writer.
116 */
118 CASQueueStruc* makeCASQ()
119 {
120 CASQueueStruc* retQ;
121 retQ = (CASQueueStruc *) VMS__malloc( sizeof( CASQueueStruc ) );
123 retQ->insertLock = UNLOCKED;
124 retQ->extractLock= UNLOCKED;
125 //TODO: check got pointer syntax right
126 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
127 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
128 retQ->endOfData = &(retQ->startOfData[1023]);
130 return retQ;
131 }
134 void* readCASQ( CASQueueStruc* Q )
135 { void *out = 0;
136 int tries = 0;
137 void **startOfData = Q->startOfData;
138 void **endOfData = Q->endOfData;
140 int gotLock = FALSE;
142 while( TRUE )
143 { //this intrinsic returns true if the lock held "UNLOCKED", in which
144 // case it now holds "LOCKED" -- if it already held "LOCKED", then
145 // gotLock is FALSE
146 gotLock =
147 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
148 //NOTE: checked assy, and it does lock correctly..
149 if( gotLock )
150 {
151 void **insertPos = Q->insertPos;
152 void **extractPos = Q->extractPos;
154 //if not empty -- extract just below insert when empty
155 if( insertPos - extractPos != 1 &&
156 !(extractPos == endOfData && insertPos == startOfData))
157 { //move before read
158 if( extractPos == endOfData ) //write new pos exactly once, correctly
159 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
160 } // other thread might read bad pos
161 else
162 { Q->extractPos++;
163 }
164 out = *(Q->extractPos);
165 Q->extractLock = UNLOCKED;
166 return out;
167 }
168 else //Q is empty
169 { Q->extractLock = UNLOCKED;//empty, so release lock for others
170 }
171 }
172 //Q is busy or empty
173 tries++;
174 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
175 }
176 }
178 void writeCASQ( void * in, CASQueueStruc* Q )
179 {
180 int tries = 0;
181 //TODO: need to make Q volatile? Want to do this Q in assembly!
182 //Have no idea what GCC's going to do to this code
183 void **startOfData = Q->startOfData;
184 void **endOfData = Q->endOfData;
186 int gotLock = FALSE;
188 while( TRUE )
189 { //this intrinsic returns true if the lock held "UNLOCKED", in which
190 // case it now holds "LOCKED" -- if it already held "LOCKED", then
191 // gotLock is FALSE
192 gotLock =
193 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
194 if( gotLock )
195 {
196 void **insertPos = Q->insertPos;
197 void **extractPos = Q->extractPos;
199 //check if room to insert.. can't use a count variable
200 // 'cause both insertor Thd and extractor Thd would write it
201 if( extractPos - insertPos != 1 &&
202 !(insertPos == endOfData && extractPos == startOfData))
203 { *(Q->insertPos) = in; //insert before move
204 if( insertPos == endOfData )
205 { Q->insertPos = startOfData;
206 }
207 else
208 { Q->insertPos++;
209 }
210 Q->insertLock = UNLOCKED;
211 return;
212 }
213 else //Q is full
214 { Q->insertLock = UNLOCKED;//full, so release lock for others
215 }
216 }
217 tries++;
218 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
219 }
220 }
222 #endif //_GNU_SOURCE
225 //===========================================================================
226 //Single reader single writer super fast Q.. no atomic instrs..
229 /*This is a blocking queue, but it uses no atomic instructions, just does
230 * yield() when empty or full
231 *
232 *It doesn't need any atomic instructions because only a single thread
233 * extracts and only a single thread inserts, and it has no locations that
234 * are written by both. It writes before moving and moves before reading,
235 * and never lets write position and read position be the same, so dis-
236 * synchrony can only ever cause an unnecessary call to yield(), never a
237 * wrong value (by monotonicity of movement of pointers, plus single writer
238 * to pointers, plus sequence of write before change pointer, plus
239 * assumptions that if thread A semantically writes X before Y, then thread
240 * B will see the writes in that order.)
241 */
243 SRSWQueueStruc* makeSRSWQ()
244 {
245 SRSWQueueStruc* retQ;
246 retQ = (SRSWQueueStruc *) VMS__malloc( sizeof( SRSWQueueStruc ) );
248 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
249 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
250 retQ->endOfData = &(retQ->startOfData[1023]);
252 return retQ;
253 }
255 void
256 freeSRSWQ( SRSWQueueStruc* Q )
257 {
258 VMS__free( Q );
259 }
261 void* readSRSWQ( SRSWQueueStruc* Q )
262 { void *out = 0;
263 int tries = 0;
265 while( TRUE )
266 {
267 if( Q->insertPos - Q->extractPos != 1 &&
268 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
269 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
270 else Q->extractPos++; //move before read
271 out = *(Q->extractPos);
272 return out;
273 }
274 //Q is empty
275 tries++;
276 if( tries > SPINLOCK_TRIES ) pthread_yield();
277 }
278 }
281 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
282 { void *out = 0;
283 int tries = 0;
285 while( TRUE )
286 {
287 if( Q->insertPos - Q->extractPos != 1 &&
288 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
289 { Q->extractPos++; //move before read
290 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
291 out = *(Q->extractPos);
292 return out;
293 }
294 //Q is empty
295 tries++;
296 if( tries > 10 ) return NULL; //long enough for writer to finish
297 }
298 }
300 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
301 {
302 int tries = 0;
304 while( TRUE )
305 {
306 if( Q->extractPos - Q->insertPos != 1 &&
307 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
308 { *(Q->insertPos) = in; //insert before move
309 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
310 else Q->insertPos++;
311 return;
312 }
313 //Q is full
314 tries++;
315 if( tries > SPINLOCK_TRIES ) pthread_yield();
316 }
317 }
321 //===========================================================================
322 //Single reader Multiple writer super fast Q.. no atomic instrs..
325 /*This is a blocking queue, but it uses no atomic instructions, just does
326 * yield() when empty or full
327 *
328 *It doesn't need any atomic instructions because only a single thread
329 * extracts and only a single thread inserts, and it has no locations that
330 * are written by both. It writes before moving and moves before reading,
331 * and never lets write position and read position be the same, so dis-
332 * synchrony can only ever cause an unnecessary call to yield(), never a
333 * wrong value (by monotonicity of movement of pointers, plus single writer
334 * to pointers, plus sequence of write before change pointer, plus
335 * assumptions that if thread A semantically writes X before Y, then thread
336 * B will see the writes in that order.)
337 *
338 *The multi-writer version is implemented as a hierarchy. Each writer has
339 * its own single-reader single-writer queue. The reader simply does a
340 * round-robin harvesting from them.
341 *
342 *A writer must first register itself with the queue, and receives an ID back
343 * It then uses that ID on each write operation.
344 *
345 *The implementation is:
346 *Physically:
347 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
348 * -] it also has read-pointer to the last queue a write was taken from.
349 *
350 *Action-Patterns:
351 * -] To add a writer
352 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
353 * --]] internally addWriterToQ does:
354 * ---]]] if needs more room, makes a larger writer-array
355 * ---]]] copies the old writer-array into the new
356 * ---]]] makes a new SRSW queue and puts it into the array
357 * ---]]] returns the index to the new SRSW queue as the ID
358 * -] To write
359 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
360 * --]] this call may block, via repeated yield() calls
361 * --]] internally, writeSRMWQ does:
362 * ---]]] uses the writerID as index to get the SRSW queue for that writer
363 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
364 * -] To Read
365 * --]] reader calls readSRMWQ, passing the Q struc
366 * --]] this call may block, via repeated yield() calls
367 * --]] internally, readSRMWQ does:
368 * ---]]] gets saved index of last SRSW queue read from
369 * ---]]] increments index and gets indexed queue
370 * ---]]] does a non-blocking read of that queue
371 * ---]]] if gets something, saves index and returns that value
372 * ---]]] if gets null, then goes to next queue
373 * ---]]] if got null from all the queues then does yield() then tries again
374 *
375 *Note: "0" is used as the value null, so SRSW queues must only contain
376 * pointers, and cannot use 0 as a valid pointer value.
377 *
378 */
380 SRMWQueueStruc* makeSRMWQ()
381 { SRMWQueueStruc* retQ;
383 retQ = (SRMWQueueStruc *) VMS__malloc( sizeof( SRMWQueueStruc ) );
385 retQ->numInternalQs = 0;
386 retQ->internalQsSz = 10;
387 retQ->internalQs = VMS__malloc( retQ->internalQsSz *
388 sizeof(SRSWQueueStruc *) );
390 retQ->lastQReadFrom = 0;
392 return retQ;
393 }
395 /* ---]]] if needs more room, makes a larger writer-array
396 * ---]]] copies the old writer-array into the new
397 * ---]]] makes a new SRSW queue and puts it into the array
398 * ---]]] returns the index to the new SRSW queue as the ID
399 *
400 *NOTE: assuming all adds are completed before any writes or reads are
401 * performed.. otherwise, this needs to be re-done carefully, probably with
402 * a lock.
403 */
404 int addWriterToSRMWQ( SRMWQueueStruc* Q )
405 { int oldSz, i;
406 SRSWQueueStruc * *oldArray;
408 (Q->numInternalQs)++;
409 if( Q->numInternalQs >= Q->internalQsSz )
410 { //full, so make bigger
411 oldSz = Q->internalQsSz;
412 oldArray = Q->internalQs;
413 Q->internalQsSz *= 2;
414 Q->internalQs = VMS__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
415 for( i = 0; i < oldSz; i++ )
416 { Q->internalQs[i] = oldArray[i];
417 }
418 VMS__free( oldArray );
419 }
420 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
421 return Q->numInternalQs - 1;
422 }
425 /* ---]]] gets saved index of last SRSW queue read-from
426 * ---]]] increments index and gets indexed queue
427 * ---]]] does a non-blocking read of that queue
428 * ---]]] if gets something, saves index and returns that value
429 * ---]]] if gets null, then goes to next queue
430 * ---]]] if got null from all the queues then does yield() then tries again
431 */
432 void* readSRMWQ( SRMWQueueStruc* Q )
433 { SRSWQueueStruc *readQ;
434 void *readValue = 0;
435 int tries = 0;
436 int QToReadFrom = 0;
438 QToReadFrom = Q->lastQReadFrom;
440 while( TRUE )
441 { QToReadFrom++;
442 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
443 readQ = Q->internalQs[ QToReadFrom ];
444 readValue = readSRSWQ_NonBlocking( readQ );
446 if( readValue != 0 ) //got a value, return it
447 { Q->lastQReadFrom = QToReadFrom;
448 return readValue;
449 }
450 else //SRSW Q just read is empty
451 { //check if all queues have been tried
452 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
453 { tries++; //give a writer a chance to finish before yield
454 if( tries > SPINLOCK_TRIES ) pthread_yield();
455 }
456 }
457 }
458 }
461 /*
462 * ---]]] uses the writerID as index to get the SRSW queue for that writer
463 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
464 */
465 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
466 {
467 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
469 writeSRSWQ( in, Q->internalQs[ writerID ] );
470 }
