comparison BlockingQueue.c @ 1:81f6687d52d1

Correct SRSW queue and correct CAS queue
author Me
date Fri, 18 Jun 2010 17:49:38 -0700
parents 85af604dee9b
children 8abcca1590b8
comparison
equal deleted inserted replaced
0:329d0de8e9a4 1:949ad36d2c6c
1 /* 1 /*
2 * Copyright 2009 OpenSourceCodeStewardshipFoundation.org 2 * Copyright 2009 OpenSourceStewardshipFoundation.org
3 * Licensed under GNU General Public License version 2 3 * Licensed under GNU General Public License version 2
4 *
5 * NOTE: this version of SRSW correct as of April 25, 2010
6 * 4 *
7 * Author: seanhalle@yahoo.com 5 * Author: seanhalle@yahoo.com
8 */ 6 */
9 7
10 8
11 #include <stdio.h> 9 #include <stdio.h>
12 #include <errno.h> 10 #include <errno.h>
13 #include <pthread.h> 11 #include <pthread.h>
14 #include <stdlib.h> 12 #include <stdlib.h>
13 #include <sched.h>
14 #include <windows.h>
15 15
16 #include "BlockingQueue.h" 16 #include "BlockingQueue.h"
17 17
18 #define INC(x) (++x == 1024) ? (x) = 0 : (x) 18 #define INC(x) (++x == 1024) ? (x) = 0 : (x)
19 19
20 #define SPINLOCK_TRIES 100000
20 21
21 //=========================================================================== 22 //===========================================================================
22 //Normal pthread Q 23 //Normal pthread Q
23 24
24 QueueStruc* makeQ() 25 QueueStruc* makeQ()
65 int status, wt; 66 int status, wt;
66 pthread_mutex_lock( &Q->mutex_t ); 67 pthread_mutex_lock( &Q->mutex_t );
67 { 68 {
68 while( Q -> count == 0 ) 69 while( Q -> count == 0 )
69 { Q -> w_empty = 1; 70 { Q -> w_empty = 1;
71 // pthread_cond_broadcast( &Q->cond_w_t );
70 status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); 72 status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
71 if (status != 0) 73 if (status != 0)
72 { perror("Thread wait error: "); 74 { perror("Thread wait error: ");
73 exit(1); 75 exit(1);
74 } 76 }
77 Q -> count -= 1; 79 Q -> count -= 1;
78 ret = Q->data[ Q->readPos ]; 80 ret = Q->data[ Q->readPos ];
79 INC( Q->readPos ); 81 INC( Q->readPos );
80 wt = Q -> w_full; 82 wt = Q -> w_full;
81 Q -> w_full = 0; 83 Q -> w_full = 0;
84 //pthread_cond_broadcast( &Q->cond_w_t );
82 } 85 }
83 pthread_mutex_unlock( &Q->mutex_t ); 86 pthread_mutex_unlock( &Q->mutex_t );
84 if (wt) pthread_cond_signal( &Q->cond_w_t ); 87 if (wt) pthread_cond_signal( &Q->cond_w_t );
85 88
86 return( ret ); 89 return( ret );
140 return retQ; 143 return retQ;
141 } 144 }
142 145
143 146
144 void* readCASQ( CASQueueStruc* Q ) 147 void* readCASQ( CASQueueStruc* Q )
145 { void *out = 0; 148 { void *out = 0;
146 int tries = 0; 149 int tries = 0;
147 int startOfData = Q->startOfData; 150 void **startOfData = Q->startOfData;
148 int endOfData = Q->endOfData; 151 void **endOfData = Q->endOfData;
149 152
150 int success = FALSE; 153 int success = FALSE;
151 154
152 while( !success ) 155 while( !success )
153 { success = 156 { success =
154 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); 157 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
155 if( success ) 158 if( success )
156 { 159 {
157 volatile int insertPos = Q->insertPos; 160 void **insertPos = Q->insertPos;
158 volatile int extractPos = Q->extractPos; 161 void **extractPos = Q->extractPos;
159 162
160 //if not empty -- extract just below insert when empty 163 //if not empty -- extract just below insert when empty
161 if( insertPos - extractPos != 1 && 164 if( insertPos - extractPos != 1 &&
162 !(extractPos == endOfData && insertPos == startOfData)) 165 !(extractPos == endOfData && insertPos == startOfData))
163 { //move before read 166 { //move before read
176 Q->extractLock = UNLOCKED;//have to try again, release for others 179 Q->extractLock = UNLOCKED;//have to try again, release for others
177 } 180 }
178 } 181 }
179 //Q is busy or empty 182 //Q is busy or empty
180 tries++; 183 tries++;
181 if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock 184 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI yield()
182 } 185 }
183 } 186 }
184 187
185 void writeCASQ( void * in, CASQueueStruc* Q ) 188 void writeCASQ( void * in, CASQueueStruc* Q )
186 { 189 {
187 int tries = 0; 190 int tries = 0;
188 int startOfData = Q->startOfData; 191 //TODO: need to make Q volatile? Want to do this Q in assembly!
189 int endOfData = Q->endOfData; 192 //Have no idea what GCC's going to do to this code
190 193 void **startOfData = Q->startOfData;
194 void **endOfData = Q->endOfData;
195
191 int success = FALSE; 196 int success = FALSE;
192 197
193 while( !success ) 198 while( !success )
194 { success = 199 { success =
195 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); 200 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
196 if( success ) 201 if( success )
197 { 202 {
198 volatile int insertPos = Q->insertPos; 203 void **insertPos = Q->insertPos;
199 volatile int extractPos = Q->extractPos; 204 void **extractPos = Q->extractPos;
200 205
201 //check if room to insert.. can't use a count variable 206 //check if room to insert.. can't use a count variable
202 // 'cause both insertor Thd and extractor Thd would write it 207 // 'cause both insertor Thd and extractor Thd would write it
203 if( extractPos - insertPos != 1 && 208 if( extractPos - insertPos != 1 &&
204 !(insertPos == endOfData && extractPos == startOfData)) 209 !(insertPos == endOfData && extractPos == startOfData))
205 { *(insertPos) = in; //insert before move 210 { *(Q->insertPos) = in; //insert before move
206 if( insertPos == endOfData ) //write new pos exactly once, correctly 211 if( insertPos == endOfData ) //write new pos exactly once, correctly
207 { Q->insertPos = startOfData; 212 { Q->insertPos = startOfData;
208 } 213 }
209 else 214 else
210 { Q->insertPos++; 215 { Q->insertPos++;
216 { success = FALSE; 221 { success = FALSE;
217 Q->insertLock = UNLOCKED;//have to try again, release for others 222 Q->insertLock = UNLOCKED;//have to try again, release for others
218 } 223 }
219 } 224 }
220 tries++; 225 tries++;
221 if( tries > 10000 ) pthread_yield();//yield not guaranteed 226 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
222 } 227 }
223 } 228 }
224 229
225 #endif //_GNU_SOURCE 230 #endif //_GNU_SOURCE
231
226 232
227 //=========================================================================== 233 //===========================================================================
228 //Single reader single writer super fast Q.. no atomic instrs.. 234 //Single reader single writer super fast Q.. no atomic instrs..
229 235
230 236
231 /*This is a blocking queue, but it uses no atomic instructions, just does 237 /*This is a blocking queue, but it uses no atomic instructions, just does
232 * busy-waiting when empty or full (but yield() if waits too long) 238 * yield() when empty or full
233 * 239 *
234 *It doesn't need any atomic instructions because only a single thread 240 *It doesn't need any atomic instructions because only a single thread
235 * extracts and only a single thread inserts, and it has no locations that 241 * extracts and only a single thread inserts, and it has no locations that
236 * are written by both. It writes before moving and moves before reading, 242 * are written by both. It writes before moving and moves before reading,
237 * and never lets write position and read position be the same, so dis- 243 * and never lets write position and read position be the same, so dis-
256 262
257 263
258 void* readSRSWQ( SRSWQueueStruc* Q ) 264 void* readSRSWQ( SRSWQueueStruc* Q )
259 { void *out = 0; 265 { void *out = 0;
260 int tries = 0; 266 int tries = 0;
261 int startOfData = Q->startOfData;
262 int endOfData = Q->endOfData;
263 267
264 while( TRUE ) 268 while( TRUE )
265 { //not certain the volatile reads need to be done, but safe.. 269 {
266 volatile int insertPos = Q->insertPos; 270 if( Q->insertPos - Q->extractPos != 1 &&
267 volatile int extractPos = Q->extractPos; 271 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
268 272 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
269 //if not empty -- extract just below insert when empty 273 else Q->extractPos++; //move before read
270 if( insertPos - extractPos != 1 &&
271 !(extractPos == endOfData && insertPos == startOfData))
272 { //move before read
273 if( extractPos == endOfData ) //write new pos exactly once, correctly
274 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
275 } // other thread might read bad pos
276 else
277 { Q->extractPos++;
278 }
279 out = *(Q->extractPos); 274 out = *(Q->extractPos);
280 return out; 275 return out;
281 } 276 }
282 //Q is empty 277 //Q is empty
283 tries++; 278 tries++;
284 if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock 279 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
285 } 280 }
286 } 281 }
282
283
284 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
285 { void *out = 0;
286 int tries = 0;
287
288 while( TRUE )
289 {
290 if( Q->insertPos - Q->extractPos != 1 &&
291 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
292 { Q->extractPos++; //move before read
293 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
294 out = *(Q->extractPos);
295 return out;
296 }
297 //Q is empty
298 tries++;
299 if( tries > 2 ) return 0; //long enough for writer to finish
300 }
301 }
302
287 303
288 void writeSRSWQ( void * in, SRSWQueueStruc* Q ) 304 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
289 { 305 {
290 int tries = 0; 306 int tries = 0;
291 int startOfData = Q->startOfData;
292 int endOfData = Q->endOfData;
293 307
294 while( TRUE ) 308 while( TRUE )
295 { //not certain the volatile reads need to be done, but safe.. 309 {
296 volatile int insertPos = Q->insertPos; 310 if( Q->extractPos - Q->insertPos != 1 &&
297 volatile int extractPos = Q->extractPos; 311 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
298 312 { *(Q->insertPos) = in; //insert before move
299 if( extractPos - insertPos != 1 && 313 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
300 !(insertPos == endOfData && extractPos == startOfData)) 314 else Q->insertPos++;
301 { *(insertPos) = in; //insert before move
302 if( insertPos == endOfData ) //write new pos exactly once, correctly
303 { Q->insertPos = startOfData;
304 }
305 else
306 { Q->insertPos++;
307 }
308 return; 315 return;
309 } 316 }
310 //Q is full 317 //Q is full
311 tries++; 318 tries++;
312 if( tries > 10000 ) pthread_yield();//yield not guaranteed 319 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
313 } 320 }
314 } 321 }
322
323
324
325 //===========================================================================
326 //Single reader Multiple writer super fast Q.. no atomic instrs..
327
328
329 /*This is a blocking queue, but it uses no atomic instructions, just does
330 * yield() when empty or full
331 *
332 *It doesn't need any atomic instructions because, for each internal queue,
333 * only one thread extracts and only one thread inserts, and it has no locations that
334 * are written by both. It writes before moving and moves before reading,
335 * and never lets write position and read position be the same, so dis-
336 * synchrony can only ever cause an unnecessary call to yield(), never a
337 * wrong value (by monotonicity of movement of pointers, plus single writer
338 * to pointers, plus sequence of write before change pointer, plus
339 * assumptions that if thread A semantically writes X before Y, then thread
340 * B will see the writes in that order.)
341 *
342 *The multi-writer version is implemented as a hierarchy. Each writer has
343 * its own single-reader single-writer queue. The reader simply does a
344 * round-robin harvesting from them.
345 *
346 *A writer must first register itself with the queue, and receives an ID back
347 * It then uses that ID on each write operation.
348 *
349 *The implementation is:
350 *Physically:
351 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
352 * -] it also holds the index of the last queue a value was read from.
353 *
354 *Action-Patterns:
355 * -] To add a writer
356 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
357 * --]] internally addWriterToQ does:
358 * ---]]] if needs more room, makes a larger writer-array
359 * ---]]] copies the old writer-array into the new
360 * ---]]] makes a new SRSW queue and puts it into the array
361 * ---]]] returns the index to the new SRSW queue as the ID
362 * -] To write
363 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
364 * --]] this call may block, via repeated yield() calls
365 * --]] internally, writeSRMWQ does:
366 * ---]]] uses the writerID as index to get the SRSW queue for that writer
367 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
368 * -] To Read
369 * --]] reader calls readSRMWQ, passing the Q struc
370 * --]] this call may block, via repeated yield() calls
371 * --]] internally, readSRMWQ does:
372 * ---]]] gets saved index of last SRSW queue read from
373 * ---]]] increments index and gets indexed queue
374 * ---]]] does a non-blocking read of that queue
375 * ---]]] if gets something, saves index and returns that value
376 * ---]]] if gets null, then goes to next queue
377 * ---]]] if got null from all the queues then does yield() then tries again
378 *
379 *Note: "0" is used as the value null, so SRSW queues must only contain
380 * pointers, and cannot use 0 as a valid pointer value.
381 *
382 */
383
384 SRMWQueueStruc* makeSRMWQ()
385 { SRMWQueueStruc* retQ;
386
387 retQ = (SRMWQueueStruc *) malloc( sizeof( SRMWQueueStruc ) );
388
389 retQ->numInternalQs = 0;
390 retQ->internalQsSz = 10;
391 retQ->internalQs = malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
392
393 retQ->lastQReadFrom = 0;
394
395 return retQ;
396 }
397
398 /* ---]]] if needs more room, makes a larger writer-array
399 * ---]]] copies the old writer-array into the new
400 * ---]]] makes a new SRSW queue an puts it into the array
401 * ---]]] returns the index to the new SRSW queue as the ID
402 *
403 *NOTE: assuming all adds are completed before any writes or reads are
404 * performed.. otherwise, this needs to be re-done carefully, probably with
405 * a lock.
406 */
407 int addWriterToSRMWQ( SRMWQueueStruc* Q )
408 { int oldSz, i;
409 SRSWQueueStruc * *oldArray;
410
411 (Q->numInternalQs)++;
412 if( Q->numInternalQs >= Q->internalQsSz )
413 { //full, so make bigger
414 oldSz = Q->internalQsSz;
415 oldArray = Q->internalQs;
416 Q->internalQsSz *= 2;
417 Q->internalQs = malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
418 for( i = 0; i < oldSz; i++ )
419 { Q->internalQs[i] = oldArray[i];
420 }
421 free( oldArray );
422 }
423 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
424 return Q->numInternalQs - 1;
425 }
426
427
428 /* ---]]] gets saved index of last SRSW queue read-from
429 * ---]]] increments index and gets indexed queue
430 * ---]]] does a non-blocking read of that queue
431 * ---]]] if gets something, saves index and returns that value
432 * ---]]] if gets null, then goes to next queue
433 * ---]]] if got null from all the queues then does yield() then tries again
434 */
435 void* readSRMWQ( SRMWQueueStruc* Q )
436 { SRSWQueueStruc *readQ;
437 void *readValue = 0;
438 int tries = 0;
439 int QToReadFrom = 0;
440
441 QToReadFrom = Q->lastQReadFrom;
442
443 while( TRUE )
444 { QToReadFrom++;
445 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
446 readQ = Q->internalQs[ QToReadFrom ];
447 readValue = readSRSWQ_NonBlocking( readQ );
448
449 if( readValue != 0 ) //got a value, return it
450 { Q->lastQReadFrom = QToReadFrom;
451 return readValue;
452 }
453 else //SRSW Q just read is empty
454 { //check if all queues have been tried
455 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
456 { tries++; //give a writer a chance to finish before yield
457 if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
458 }
459 }
460 }
461 }
462
463
464 /*
465 * ---]]] uses the writerID as index to get the SRSW queue for that writer
466 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
467 */
468 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
469 {
470 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
471
472 writeSRSWQ( in, Q->internalQs[ writerID ] );
473 }