| rev |
line source |
|
Me@0
|
1 /*
|
|
Me@1
|
2 * Copyright 2009 OpenSourceStewardshipFoundation.org
|
|
Me@0
|
3 * Licensed under GNU General Public License version 2
|
|
Me@0
|
4 *
|
|
Me@0
|
5 * Author: seanhalle@yahoo.com
|
|
Me@0
|
6 */
|
|
Me@0
|
7
|
|
Me@0
|
8
|
|
Me@0
|
9 #include <stdio.h>
|
|
Me@0
|
10 #include <errno.h>
|
|
Me@0
|
11 #include <pthread.h>
|
|
Me@0
|
12 #include <stdlib.h>
|
|
Me@1
|
13 #include <sched.h>
|
|
Me@0
|
14
|
|
Me@0
|
15 #include "BlockingQueue.h"
|
|
Me@0
|
16
|
|
Me@0
|
17 #define INC(x) (++x == 1024) ? (x) = 0 : (x)
|
|
Me@0
|
18
|
|
Me@1
|
19 #define SPINLOCK_TRIES 100000
|
|
Me@0
|
20
|
|
Me@0
|
21 //===========================================================================
|
|
Me@0
|
22 //Normal pthread Q
|
|
Me@0
|
23
|
|
Me@4
|
24 PThdQueueStruc* makePThdQ()
|
|
Me@0
|
25 {
|
|
Me@4
|
26 PThdQueueStruc* retQ;
|
|
Me@6
|
27 int retCode;
|
|
Me@9
|
28 retQ = (PThdQueueStruc *) VMS__malloc( sizeof( PThdQueueStruc ) );
|
|
Me@0
|
29
|
|
Me@0
|
30
|
|
Me@6
|
31 retCode =
|
|
Me@6
|
32 pthread_mutex_init( &retQ->mutex_t, NULL);
|
|
Me@6
|
33 if(retCode){perror("Error in creating mutex:"); exit(1);}
|
|
Me@0
|
34
|
|
Me@6
|
35 retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
|
|
Me@6
|
36 if(retCode){perror("Error in creating cond_var:"); exit(1);}
|
|
Me@0
|
37
|
|
Me@6
|
38 retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
|
|
Me@6
|
39 if(retCode){perror("Error in creating cond_var:"); exit(1);}
|
|
Me@0
|
40
|
|
Me@0
|
41 retQ->count = 0;
|
|
Me@0
|
42 retQ->readPos = 0;
|
|
Me@0
|
43 retQ->writePos = 0;
|
|
Me@6
|
44 retQ->w_empty = 0;
|
|
Me@6
|
45 retQ->w_full = 0;
|
|
Me@0
|
46
|
|
Me@0
|
47 return retQ;
|
|
Me@0
|
48 }
|
|
Me@0
|
49
|
|
Me@4
|
50 void * readPThdQ( PThdQueueStruc *Q )
|
|
Me@0
|
51 { void *ret;
|
|
Me@6
|
52 int retCode, wt;
|
|
Me@0
|
53 pthread_mutex_lock( &Q->mutex_t );
|
|
Me@0
|
54 {
|
|
Me@0
|
55 while( Q -> count == 0 )
|
|
Me@0
|
56 { Q -> w_empty = 1;
|
|
Me@6
|
57 retCode =
|
|
Me@6
|
58 pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
|
|
Me@6
|
59 if( retCode ){ perror("Thread wait error: "); exit(1); }
|
|
Me@0
|
60 }
|
|
Me@0
|
61 Q -> w_empty = 0;
|
|
Me@0
|
62 Q -> count -= 1;
|
|
Me@0
|
63 ret = Q->data[ Q->readPos ];
|
|
Me@0
|
64 INC( Q->readPos );
|
|
Me@0
|
65 wt = Q -> w_full;
|
|
Me@0
|
66 Q -> w_full = 0;
|
|
Me@0
|
67 }
|
|
Me@0
|
68 pthread_mutex_unlock( &Q->mutex_t );
|
|
Me@6
|
69 if (wt)
|
|
Me@6
|
70 pthread_cond_signal( &Q->cond_w_t );
|
|
Me@0
|
71
|
|
Me@6
|
72 //printf("Q out: %d\n", ret);
|
|
Me@0
|
73 return( ret );
|
|
Me@0
|
74 }
|
|
Me@0
|
75
|
|
Me@4
|
76 void writePThdQ( void * in, PThdQueueStruc* Q )
|
|
Me@0
|
77 {
|
|
Me@0
|
78 int status, wt;
|
|
Me@6
|
79 //printf("Q in: %d\n", in);
|
|
Me@6
|
80
|
|
Me@0
|
81 pthread_mutex_lock( &Q->mutex_t );
|
|
Me@0
|
82 {
|
|
Me@0
|
83 while( Q->count >= 1024 )
|
|
Me@0
|
84 {
|
|
Me@0
|
85 Q -> w_full = 1;
|
|
Me@0
|
86 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
|
|
Me@0
|
87 if (status != 0)
|
|
Me@0
|
88 { perror("Thread wait error: ");
|
|
Me@0
|
89 exit(1);
|
|
Me@0
|
90 }
|
|
Me@0
|
91 }
|
|
Me@6
|
92
|
|
Me@0
|
93 Q -> w_full = 0;
|
|
Me@0
|
94 Q->count += 1;
|
|
Me@0
|
95 Q->data[ Q->writePos ] = in;
|
|
Me@0
|
96 INC( Q->writePos );
|
|
Me@0
|
97 wt = Q -> w_empty;
|
|
Me@0
|
98 Q -> w_empty = 0;
|
|
Me@0
|
99 }
|
|
Me@6
|
100
|
|
Me@0
|
101 pthread_mutex_unlock( &Q->mutex_t );
|
|
Me@0
|
102 if( wt ) pthread_cond_signal( &Q->cond_r_t );
|
|
Me@0
|
103 }
|
|
Me@0
|
104
|
|
Me@0
|
105
|
|
Me@0
|
106 //===========================================================================
|
|
Me@0
|
107 // multi reader multi writer fast Q via CAS
|
|
Me@0
|
108 #ifndef _GNU_SOURCE
|
|
Me@0
|
109 #define _GNU_SOURCE
|
|
Me@0
|
110
|
|
Me@0
|
111 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
|
|
Me@0
|
112 * or full
|
|
Me@0
|
113 *It uses CAS because it's meant to have more than one reader and more than
|
|
Me@0
|
114 * one writer.
|
|
Me@0
|
115 */
|
|
Me@0
|
116
|
|
Me@0
|
117 CASQueueStruc* makeCASQ()
|
|
Me@0
|
118 {
|
|
Me@0
|
119 CASQueueStruc* retQ;
|
|
Me@9
|
120 retQ = (CASQueueStruc *) VMS__malloc( sizeof( CASQueueStruc ) );
|
|
Me@0
|
121
|
|
Me@0
|
122 retQ->insertLock = UNLOCKED;
|
|
Me@0
|
123 retQ->extractLock= UNLOCKED;
|
|
Me@0
|
124 //TODO: check got pointer syntax right
|
|
Me@0
|
125 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
|
|
Me@0
|
126 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
|
|
Me@0
|
127 retQ->endOfData = &(retQ->startOfData[1023]);
|
|
Me@0
|
128
|
|
Me@0
|
129 return retQ;
|
|
Me@0
|
130 }
|
|
Me@0
|
131
|
|
Me@0
|
132
|
|
Me@0
|
133 void* readCASQ( CASQueueStruc* Q )
|
|
Me@1
|
134 { void *out = 0;
|
|
Me@1
|
135 int tries = 0;
|
|
Me@1
|
136 void **startOfData = Q->startOfData;
|
|
Me@1
|
137 void **endOfData = Q->endOfData;
|
|
Me@1
|
138
|
|
Me@7
|
139 int gotLock = FALSE;
|
|
Me@0
|
140
|
|
Me@4
|
141 while( TRUE )
|
|
Me@7
|
142 { //this intrinsic returns true if the lock held "UNLOCKED", in which
|
|
Me@7
|
143 // case it now holds "LOCKED" -- if it already held "LOCKED", then
|
|
Me@7
|
144 // gotLock is FALSE
|
|
Me@7
|
145 gotLock =
|
|
Me@0
|
146 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
|
|
Me@4
|
147 //NOTE: checked assy, and it does lock correctly..
|
|
Me@7
|
148 if( gotLock )
|
|
Me@0
|
149 {
|
|
Me@1
|
150 void **insertPos = Q->insertPos;
|
|
Me@1
|
151 void **extractPos = Q->extractPos;
|
|
Me@0
|
152
|
|
Me@0
|
153 //if not empty -- extract just below insert when empty
|
|
Me@0
|
154 if( insertPos - extractPos != 1 &&
|
|
Me@0
|
155 !(extractPos == endOfData && insertPos == startOfData))
|
|
Me@0
|
156 { //move before read
|
|
Me@0
|
157 if( extractPos == endOfData ) //write new pos exactly once, correctly
|
|
Me@0
|
158 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
|
|
Me@0
|
159 } // other thread might read bad pos
|
|
Me@0
|
160 else
|
|
Me@0
|
161 { Q->extractPos++;
|
|
Me@0
|
162 }
|
|
Me@0
|
163 out = *(Q->extractPos);
|
|
Me@0
|
164 Q->extractLock = UNLOCKED;
|
|
Me@0
|
165 return out;
|
|
Me@0
|
166 }
|
|
Me@0
|
167 else //Q is empty
|
|
Me@7
|
168 { Q->extractLock = UNLOCKED;//empty, so release lock for others
|
|
Me@0
|
169 }
|
|
Me@0
|
170 }
|
|
Me@0
|
171 //Q is busy or empty
|
|
Me@0
|
172 tries++;
|
|
Me@6
|
173 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
|
|
Me@0
|
174 }
|
|
Me@0
|
175 }
|
|
Me@0
|
176
|
|
Me@0
|
177 void writeCASQ( void * in, CASQueueStruc* Q )
|
|
Me@0
|
178 {
|
|
Me@0
|
179 int tries = 0;
|
|
Me@1
|
180 //TODO: need to make Q volatile? Want to do this Q in assembly!
|
|
Me@1
|
181 //Have no idea what GCC's going to do to this code
|
|
Me@1
|
182 void **startOfData = Q->startOfData;
|
|
Me@1
|
183 void **endOfData = Q->endOfData;
|
|
Me@1
|
184
|
|
Me@7
|
185 int gotLock = FALSE;
|
|
Me@0
|
186
|
|
Me@4
|
187 while( TRUE )
|
|
Me@7
|
188 { //this intrinsic returns true if the lock held "UNLOCKED", in which
|
|
Me@7
|
189 // case it now holds "LOCKED" -- if it already held "LOCKED", then
|
|
Me@7
|
190 // gotLock is FALSE
|
|
Me@7
|
191 gotLock =
|
|
Me@0
|
192 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
|
|
Me@7
|
193 if( gotLock )
|
|
Me@0
|
194 {
|
|
Me@1
|
195 void **insertPos = Q->insertPos;
|
|
Me@1
|
196 void **extractPos = Q->extractPos;
|
|
Me@0
|
197
|
|
Me@0
|
198 //check if room to insert.. can't use a count variable
|
|
Me@0
|
199 // 'cause both insertor Thd and extractor Thd would write it
|
|
Me@0
|
200 if( extractPos - insertPos != 1 &&
|
|
Me@0
|
201 !(insertPos == endOfData && extractPos == startOfData))
|
|
Me@1
|
202 { *(Q->insertPos) = in; //insert before move
|
|
Me@4
|
203 if( insertPos == endOfData )
|
|
Me@0
|
204 { Q->insertPos = startOfData;
|
|
Me@0
|
205 }
|
|
Me@0
|
206 else
|
|
Me@0
|
207 { Q->insertPos++;
|
|
Me@0
|
208 }
|
|
Me@0
|
209 Q->insertLock = UNLOCKED;
|
|
Me@0
|
210 return;
|
|
Me@0
|
211 }
|
|
Me@0
|
212 else //Q is full
|
|
Me@7
|
213 { Q->insertLock = UNLOCKED;//full, so release lock for others
|
|
Me@0
|
214 }
|
|
Me@0
|
215 }
|
|
Me@0
|
216 tries++;
|
|
Me@6
|
217 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
|
|
Me@0
|
218 }
|
|
Me@0
|
219 }
|
|
Me@0
|
220
|
|
Me@0
|
221 #endif //_GNU_SOURCE
|
|
Me@0
|
222
|
|
Me@1
|
223
|
|
Me@0
|
224 //===========================================================================
|
|
Me@0
|
225 //Single reader single writer super fast Q.. no atomic instrs..
|
|
Me@0
|
226
|
|
Me@0
|
227
|
|
Me@0
|
228 /*This is a blocking queue, but it uses no atomic instructions, just does
|
|
Me@1
|
229 * yield() when empty or full
|
|
Me@0
|
230 *
|
|
Me@0
|
231 *It doesn't need any atomic instructions because only a single thread
|
|
Me@0
|
232 * extracts and only a single thread inserts, and it has no locations that
|
|
Me@0
|
233 * are written by both. It writes before moving and moves before reading,
|
|
Me@0
|
234 * and never lets write position and read position be the same, so dis-
|
|
Me@0
|
235 * synchrony can only ever cause an unnecessary call to yield(), never a
|
|
Me@0
|
236 * wrong value (by monotonicity of movement of pointers, plus single writer
|
|
Me@0
|
237 * to pointers, plus sequence of write before change pointer, plus
|
|
Me@0
|
238 * assumptions that if thread A semantically writes X before Y, then thread
|
|
Me@0
|
239 * B will see the writes in that order.)
|
|
Me@0
|
240 */
|
|
Me@0
|
241
|
|
Me@0
|
242 SRSWQueueStruc* makeSRSWQ()
|
|
Me@0
|
243 {
|
|
Me@0
|
244 SRSWQueueStruc* retQ;
|
|
Me@9
|
245 retQ = (SRSWQueueStruc *) VMS__malloc( sizeof( SRSWQueueStruc ) );
|
|
Me@0
|
246
|
|
Me@0
|
247 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
|
|
Me@0
|
248 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
|
|
Me@0
|
249 retQ->endOfData = &(retQ->startOfData[1023]);
|
|
Me@0
|
250
|
|
Me@0
|
251 return retQ;
|
|
Me@0
|
252 }
|
|
Me@0
|
253
|
|
Me@7
|
254 void
|
|
Me@7
|
255 freeSRSWQ( SRSWQueueStruc* Q )
|
|
Me@7
|
256 {
|
|
Me@10
|
257 VMS__free( Q );
|
|
Me@7
|
258 }
|
|
Me@0
|
259
|
|
Me@0
|
260 void* readSRSWQ( SRSWQueueStruc* Q )
|
|
Me@0
|
261 { void *out = 0;
|
|
Me@0
|
262 int tries = 0;
|
|
Me@0
|
263
|
|
Me@0
|
264 while( TRUE )
|
|
Me@1
|
265 {
|
|
Me@1
|
266 if( Q->insertPos - Q->extractPos != 1 &&
|
|
Me@1
|
267 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
|
|
Me@1
|
268 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
|
|
Me@1
|
269 else Q->extractPos++; //move before read
|
|
Me@0
|
270 out = *(Q->extractPos);
|
|
Me@0
|
271 return out;
|
|
Me@0
|
272 }
|
|
Me@0
|
273 //Q is empty
|
|
Me@0
|
274 tries++;
|
|
Me@6
|
275 if( tries > SPINLOCK_TRIES ) pthread_yield();
|
|
Me@0
|
276 }
|
|
Me@0
|
277 }
|
|
Me@0
|
278
|
|
Me@1
|
279
|
|
Me@1
|
280 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
|
|
Me@1
|
281 { void *out = 0;
|
|
Me@1
|
282 int tries = 0;
|
|
Me@1
|
283
|
|
Me@1
|
284 while( TRUE )
|
|
Me@1
|
285 {
|
|
Me@1
|
286 if( Q->insertPos - Q->extractPos != 1 &&
|
|
Me@1
|
287 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
|
|
Me@1
|
288 { Q->extractPos++; //move before read
|
|
Me@1
|
289 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
|
|
Me@1
|
290 out = *(Q->extractPos);
|
|
Me@1
|
291 return out;
|
|
Me@1
|
292 }
|
|
Me@1
|
293 //Q is empty
|
|
Me@1
|
294 tries++;
|
|
Me@7
|
295 if( tries > 10 ) return NULL; //long enough for writer to finish
|
|
Me@1
|
296 }
|
|
Me@1
|
297 }
|
|
Me@1
|
298
|
|
Me@1
|
299
|
|
Me@0
|
300 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
|
|
Me@0
|
301 {
|
|
Me@0
|
302 int tries = 0;
|
|
Me@0
|
303
|
|
Me@0
|
304 while( TRUE )
|
|
Me@1
|
305 {
|
|
Me@1
|
306 if( Q->extractPos - Q->insertPos != 1 &&
|
|
Me@1
|
307 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
|
|
Me@1
|
308 { *(Q->insertPos) = in; //insert before move
|
|
Me@1
|
309 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
|
|
Me@1
|
310 else Q->insertPos++;
|
|
Me@0
|
311 return;
|
|
Me@0
|
312 }
|
|
Me@0
|
313 //Q is full
|
|
Me@0
|
314 tries++;
|
|
Me@6
|
315 if( tries > SPINLOCK_TRIES ) pthread_yield();
|
|
Me@0
|
316 }
|
|
Me@0
|
317 }
|
|
Me@1
|
318
|
|
Me@1
|
319
|
|
Me@1
|
320
|
|
Me@1
|
321 //===========================================================================
|
|
Me@1
|
322 //Single reader Multiple writer super fast Q.. no atomic instrs..
|
|
Me@1
|
323
|
|
Me@1
|
324
|
|
Me@1
|
325 /*This is a blocking queue, but it uses no atomic instructions, just does
|
|
Me@1
|
326 * yield() when empty or full
|
|
Me@1
|
327 *
|
|
Me@1
|
328 *It doesn't need any atomic instructions because only a single thread
|
|
Me@1
|
329 * extracts and only a single thread inserts, and it has no locations that
|
|
Me@1
|
330 * are written by both. It writes before moving and moves before reading,
|
|
Me@1
|
331 * and never lets write position and read position be the same, so dis-
|
|
Me@1
|
332 * synchrony can only ever cause an unnecessary call to yield(), never a
|
|
Me@1
|
333 * wrong value (by monotonicity of movement of pointers, plus single writer
|
|
Me@1
|
334 * to pointers, plus sequence of write before change pointer, plus
|
|
Me@1
|
335 * assumptions that if thread A semantically writes X before Y, then thread
|
|
Me@1
|
336 * B will see the writes in that order.)
|
|
Me@1
|
337 *
|
|
Me@1
|
338 *The multi-writer version is implemented as a hierarchy. Each writer has
|
|
Me@1
|
339 * its own single-reader single-writer queue. The reader simply does a
|
|
Me@1
|
340 * round-robin harvesting from them.
|
|
Me@1
|
341 *
|
|
Me@1
|
342 *A writer must first register itself with the queue, and receives an ID back
|
|
Me@1
|
343 * It then uses that ID on each write operation.
|
|
Me@1
|
344 *
|
|
Me@1
|
345 *The implementation is:
|
|
Me@1
|
346 *Physically:
|
|
Me@1
|
347 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
|
|
Me@1
|
348 * -] it also has read-pointer to the last queue a write was taken from.
|
|
Me@1
|
349 *
|
|
Me@1
|
350 *Action-Patterns:
|
|
Me@1
|
351 * -] To add a writer
|
|
Me@1
|
352 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
|
|
Me@1
|
353 * --]] internally addWriterToQ does:
|
|
Me@1
|
354 * ---]]] if needs more room, makes a larger writer-array
|
|
Me@1
|
355 * ---]]] copies the old writer-array into the new
|
|
Me@1
|
356 * ---]]] makes a new SRSW queue an puts it into the array
|
|
Me@1
|
357 * ---]]] returns the index to the new SRSW queue as the ID
|
|
Me@1
|
358 * -] To write
|
|
Me@1
|
359 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
|
|
Me@1
|
360 * --]] this call may block, via repeated yield() calls
|
|
Me@1
|
361 * --]] internally, writeSRMWQ does:
|
|
Me@1
|
362 * ---]]] uses the writerID as index to get the SRSW queue for that writer
|
|
Me@1
|
363 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
|
|
Me@1
|
364 * -] To Read
|
|
Me@1
|
365 * --]] reader calls readSRMWQ, passing the Q struc
|
|
Me@1
|
366 * --]] this call may block, via repeated yield() calls
|
|
Me@1
|
367 * --]] internally, readSRMWQ does:
|
|
Me@1
|
368 * ---]]] gets saved index of last SRSW queue read from
|
|
Me@1
|
369 * ---]]] increments index and gets indexed queue
|
|
Me@1
|
370 * ---]]] does a non-blocking read of that queue
|
|
Me@1
|
371 * ---]]] if gets something, saves index and returns that value
|
|
Me@1
|
372 * ---]]] if gets null, then goes to next queue
|
|
Me@1
|
373 * ---]]] if got null from all the queues then does yield() then tries again
|
|
Me@1
|
374 *
|
|
Me@1
|
375 *Note: "0" is used as the value null, so SRSW queues must only contain
|
|
Me@1
|
376 * pointers, and cannot use 0 as a valid pointer value.
|
|
Me@1
|
377 *
|
|
Me@1
|
378 */
|
|
Me@1
|
379
|
|
Me@1
|
380 SRMWQueueStruc* makeSRMWQ()
|
|
Me@1
|
381 { SRMWQueueStruc* retQ;
|
|
Me@1
|
382
|
|
Me@9
|
383 retQ = (SRMWQueueStruc *) VMS__malloc( sizeof( SRMWQueueStruc ) );
|
|
Me@1
|
384
|
|
Me@1
|
385 retQ->numInternalQs = 0;
|
|
Me@1
|
386 retQ->internalQsSz = 10;
|
|
Me@10
|
387 retQ->internalQs = VMS__malloc( retQ->internalQsSz *
|
|
Me@10
|
388 sizeof(SRSWQueueStruc *) );
|
|
Me@1
|
389
|
|
Me@1
|
390 retQ->lastQReadFrom = 0;
|
|
Me@1
|
391
|
|
Me@1
|
392 return retQ;
|
|
Me@1
|
393 }
|
|
Me@1
|
394
|
|
Me@1
|
395 /* ---]]] if needs more room, makes a larger writer-array
|
|
Me@1
|
396 * ---]]] copies the old writer-array into the new
|
|
Me@1
|
397 * ---]]] makes a new SRSW queue an puts it into the array
|
|
Me@1
|
398 * ---]]] returns the index to the new SRSW queue as the ID
|
|
Me@1
|
399 *
|
|
Me@1
|
400 *NOTE: assuming all adds are completed before any writes or reads are
|
|
Me@1
|
401 * performed.. otherwise, this needs to be re-done carefully, probably with
|
|
Me@1
|
402 * a lock.
|
|
Me@1
|
403 */
|
|
Me@1
|
404 int addWriterToSRMWQ( SRMWQueueStruc* Q )
|
|
Me@1
|
405 { int oldSz, i;
|
|
Me@1
|
406 SRSWQueueStruc * *oldArray;
|
|
Me@1
|
407
|
|
Me@1
|
408 (Q->numInternalQs)++;
|
|
Me@1
|
409 if( Q->numInternalQs >= Q->internalQsSz )
|
|
Me@1
|
410 { //full, so make bigger
|
|
Me@1
|
411 oldSz = Q->internalQsSz;
|
|
Me@1
|
412 oldArray = Q->internalQs;
|
|
Me@1
|
413 Q->internalQsSz *= 2;
|
|
Me@9
|
414 Q->internalQs = VMS__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
|
|
Me@1
|
415 for( i = 0; i < oldSz; i++ )
|
|
Me@1
|
416 { Q->internalQs[i] = oldArray[i];
|
|
Me@1
|
417 }
|
|
Me@10
|
418 VMS__free( oldArray );
|
|
Me@1
|
419 }
|
|
Me@1
|
420 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
|
|
Me@1
|
421 return Q->numInternalQs - 1;
|
|
Me@1
|
422 }
|
|
Me@1
|
423
|
|
Me@1
|
424
|
|
Me@1
|
425 /* ---]]] gets saved index of last SRSW queue read-from
|
|
Me@1
|
426 * ---]]] increments index and gets indexed queue
|
|
Me@1
|
427 * ---]]] does a non-blocking read of that queue
|
|
Me@1
|
428 * ---]]] if gets something, saves index and returns that value
|
|
Me@1
|
429 * ---]]] if gets null, then goes to next queue
|
|
Me@1
|
430 * ---]]] if got null from all the queues then does yield() then tries again
|
|
Me@1
|
431 */
|
|
Me@1
|
432 void* readSRMWQ( SRMWQueueStruc* Q )
|
|
Me@1
|
433 { SRSWQueueStruc *readQ;
|
|
Me@1
|
434 void *readValue = 0;
|
|
Me@1
|
435 int tries = 0;
|
|
Me@1
|
436 int QToReadFrom = 0;
|
|
Me@1
|
437
|
|
Me@1
|
438 QToReadFrom = Q->lastQReadFrom;
|
|
Me@1
|
439
|
|
Me@1
|
440 while( TRUE )
|
|
Me@1
|
441 { QToReadFrom++;
|
|
Me@1
|
442 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
|
|
Me@1
|
443 readQ = Q->internalQs[ QToReadFrom ];
|
|
Me@1
|
444 readValue = readSRSWQ_NonBlocking( readQ );
|
|
Me@1
|
445
|
|
Me@1
|
446 if( readValue != 0 ) //got a value, return it
|
|
Me@1
|
447 { Q->lastQReadFrom = QToReadFrom;
|
|
Me@1
|
448 return readValue;
|
|
Me@1
|
449 }
|
|
Me@1
|
450 else //SRSW Q just read is empty
|
|
Me@1
|
451 { //check if all queues have been tried
|
|
Me@1
|
452 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
|
|
Me@1
|
453 { tries++; //give a writer a chance to finish before yield
|
|
Me@6
|
454 if( tries > SPINLOCK_TRIES ) pthread_yield();
|
|
Me@1
|
455 }
|
|
Me@1
|
456 }
|
|
Me@1
|
457 }
|
|
Me@1
|
458 }
|
|
Me@1
|
459
|
|
Me@1
|
460
|
|
Me@1
|
461 /*
|
|
Me@1
|
462 * ---]]] uses the writerID as index to get the SRSW queue for that writer
|
|
Me@1
|
463 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
|
|
Me@1
|
464 */
|
|
Me@1
|
465 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
|
|
Me@1
|
466 {
|
|
Me@1
|
467 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
|
|
Me@1
|
468
|
|
Me@1
|
469 writeSRSWQ( in, Q->internalQs[ writerID ] );
|
|
Me@1
|
470 }
|