| rev |
line source |
|
Me@19
|
1 /*
|
|
seanhalle@44
|
2 * Copyright 2009 OpenSourceResearchInstitute.org
|
|
Me@19
|
3 * Licensed under GNU General Public License version 2
|
|
Me@19
|
4 *
|
|
Me@19
|
5 * Author: seanhalle@yahoo.com
|
|
Me@19
|
6 */
|
|
Me@19
|
7
|
|
Me@19
|
8
|
|
Me@19
|
9 #include <stdio.h>
|
|
Me@19
|
10 #include <errno.h>
|
|
Me@19
|
11 #include <pthread.h>
|
|
Me@19
|
12 #include <stdlib.h>
|
|
Me@19
|
13 #include <sched.h>
|
|
seanhalle@29
|
14 #include <string.h>
|
|
Me@19
|
15
|
|
Me@19
|
16 #include "BlockingQueue.h"
|
|
seanhalle@49
|
17 #include <PR__include/prqueue.h>
|
|
seanhalle@49
|
18 #include <PR__include/prmalloc.h>
|
|
Me@19
|
19
|
|
Me@19
|
/*Advances a ring index in place, wrapping it to 0 when it reaches 1024, and
 * evaluates to the new index.
 *The argument and the whole expansion are parenthesized so the macro keeps
 * its meaning inside a larger expression.
 *NOTE: x is evaluated more than once, so it has to be a plain lvalue with
 * no side effects of its own.
 */
#define INC(x) ((++(x) == 1024) ? ((x) = 0) : (x))

/*Number of failed passes a queue operation spins through before it starts
 * yielding the processor on every further failed pass.
 */
#define SPINLOCK_TRIES 100000
|
|
Me@19
|
23
|
|
Me@19
|
24
|
|
seanhalle@49
|
25 //========================== pthread based queue =========================
|
|
seanhalle@49
|
26 /*Not currently implemented.. however, copied this code from place where
|
|
seanhalle@49
|
27 * did equivalent.. Idea is to just make a private queue, then protect
|
|
seanhalle@49
|
28 * access with a lock.. copied code snippet below is how access was
|
|
seanhalle@49
|
29 * protected.. just roll this inside a "readBlockingQ()" function.. do
|
|
seanhalle@49
|
30 * equivalent inside writeBlockingQ() function.. to make one, just add
|
|
seanhalle@49
|
31 * the lock to the queue structure..
|
|
seanhalle@49
|
32 */
|
|
seanhalle@49
|
33 /*
|
|
seanhalle@49
|
34 production = NULL;
|
|
seanhalle@49
|
35 while( production == NULL )
|
|
seanhalle@49
|
36 { pthread_mutex_lock( &queueAccessLock );
|
|
seanhalle@49
|
37 production = readPrivQ( commQ );
|
|
seanhalle@49
|
38 pthread_mutex_unlock( &queueAccessLock );
|
|
seanhalle@49
|
39 // If empty, yields and tries again.
|
|
seanhalle@49
|
40 if( production == NULL) sched_yield();
|
|
seanhalle@49
|
41 }
|
|
seanhalle@49
|
42 */
|
|
Me@19
|
43
|
|
Me@19
|
44 //===========================================================================
|
|
Me@19
|
45 // multi reader multi writer fast Q via CAS
|
|
Me@19
|
46 #ifndef _GNU_SOURCE
|
|
Me@19
|
47 #define _GNU_SOURCE
|
|
Me@19
|
48
|
|
Me@19
|
49 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
|
|
Me@19
|
50 * or full
|
|
Me@19
|
51 *It uses CAS because it's meant to have more than one reader and more than
|
|
Me@19
|
52 * one writer.
|
|
Me@19
|
53 */
|
|
Me@19
|
54
|
|
Me@19
|
55 CASQueueStruc* makeCASQ()
|
|
Me@19
|
56 {
|
|
Me@19
|
57 CASQueueStruc* retQ;
|
|
seanhalle@49
|
58 retQ = (CASQueueStruc *) PR__malloc( sizeof( CASQueueStruc ) );
|
|
Me@19
|
59
|
|
Me@19
|
60 retQ->insertLock = UNLOCKED;
|
|
Me@19
|
61 retQ->extractLock= UNLOCKED;
|
|
seanhalle@29
|
62
|
|
seanhalle@29
|
63 retQ->extractPos = (volatile void**)&(retQ->startOfData[0]); //side by side == empty
|
|
seanhalle@29
|
64 retQ->insertPos = (volatile void**)&(retQ->startOfData[1]); // so start pos's have to be
|
|
Me@19
|
65 retQ->endOfData = &(retQ->startOfData[1023]);
|
|
Me@19
|
66
|
|
Me@19
|
67 return retQ;
|
|
Me@19
|
68 }
|
|
Me@19
|
69
|
|
Me@19
|
70
|
|
Me@19
|
71 void* readCASQ( CASQueueStruc* Q )
|
|
Me@19
|
72 { void *out = 0;
|
|
seanhalle@44
|
73 int32 tries = 0;
|
|
Me@19
|
74 void **startOfData = Q->startOfData;
|
|
Me@19
|
75 void **endOfData = Q->endOfData;
|
|
Me@19
|
76
|
|
seanhalle@44
|
77 int32 gotLock = FALSE;
|
|
Me@19
|
78
|
|
Me@19
|
79 while( TRUE )
|
|
Me@19
|
80 { //this intrinsic returns true if the lock held "UNLOCKED", in which
|
|
Me@19
|
81 // case it now holds "LOCKED" -- if it already held "LOCKED", then
|
|
Me@19
|
82 // gotLock is FALSE
|
|
Me@19
|
83 gotLock =
|
|
Me@19
|
84 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
|
|
Me@19
|
85 //NOTE: checked assy, and it does lock correctly..
|
|
Me@19
|
86 if( gotLock )
|
|
Me@19
|
87 {
|
|
seanhalle@29
|
88 void **insertPos = (void **)Q->insertPos;
|
|
seanhalle@29
|
89 void **extractPos = (void **)Q->extractPos;
|
|
Me@19
|
90
|
|
Me@19
|
91 //if not empty -- extract just below insert when empty
|
|
Me@19
|
92 if( insertPos - extractPos != 1 &&
|
|
Me@19
|
93 !(extractPos == endOfData && insertPos == startOfData))
|
|
Me@19
|
94 { //move before read
|
|
Me@19
|
95 if( extractPos == endOfData ) //write new pos exactly once, correctly
|
|
Me@19
|
96 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
|
|
Me@19
|
97 } // other thread might read bad pos
|
|
Me@19
|
98 else
|
|
Me@19
|
99 { Q->extractPos++;
|
|
Me@19
|
100 }
|
|
seanhalle@29
|
101 out = (void *) *(Q->extractPos);
|
|
Me@19
|
102 Q->extractLock = UNLOCKED;
|
|
Me@19
|
103 return out;
|
|
Me@19
|
104 }
|
|
Me@19
|
105 else //Q is empty
|
|
Me@19
|
106 { Q->extractLock = UNLOCKED;//empty, so release lock for others
|
|
Me@19
|
107 }
|
|
Me@19
|
108 }
|
|
Me@19
|
109 //Q is busy or empty
|
|
Me@19
|
110 tries++;
|
|
Me@19
|
111 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
|
|
Me@19
|
112 }
|
|
Me@19
|
113 }
|
|
Me@19
|
114
|
|
Me@19
|
115 void writeCASQ( void * in, CASQueueStruc* Q )
|
|
Me@19
|
116 {
|
|
seanhalle@44
|
117 int32 tries = 0;
|
|
Me@19
|
118 //TODO: need to make Q volatile? Want to do this Q in assembly!
|
|
Me@19
|
119 //Have no idea what GCC's going to do to this code
|
|
Me@19
|
120 void **startOfData = Q->startOfData;
|
|
Me@19
|
121 void **endOfData = Q->endOfData;
|
|
Me@19
|
122
|
|
seanhalle@44
|
123 int32 gotLock = FALSE;
|
|
Me@19
|
124
|
|
Me@19
|
125 while( TRUE )
|
|
Me@19
|
126 { //this intrinsic returns true if the lock held "UNLOCKED", in which
|
|
Me@19
|
127 // case it now holds "LOCKED" -- if it already held "LOCKED", then
|
|
Me@19
|
128 // gotLock is FALSE
|
|
Me@19
|
129 gotLock =
|
|
Me@19
|
130 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
|
|
Me@19
|
131 if( gotLock )
|
|
Me@19
|
132 {
|
|
seanhalle@29
|
133 void **insertPos = (void **)Q->insertPos;
|
|
seanhalle@29
|
134 void **extractPos = (void **)Q->extractPos;
|
|
Me@19
|
135
|
|
Me@19
|
136 //check if room to insert.. can't use a count variable
|
|
Me@19
|
137 // 'cause both insertor Thd and extractor Thd would write it
|
|
Me@19
|
138 if( extractPos - insertPos != 1 &&
|
|
Me@19
|
139 !(insertPos == endOfData && extractPos == startOfData))
|
|
Me@19
|
140 { *(Q->insertPos) = in; //insert before move
|
|
Me@19
|
141 if( insertPos == endOfData )
|
|
Me@19
|
142 { Q->insertPos = startOfData;
|
|
Me@19
|
143 }
|
|
Me@19
|
144 else
|
|
Me@19
|
145 { Q->insertPos++;
|
|
Me@19
|
146 }
|
|
Me@19
|
147 Q->insertLock = UNLOCKED;
|
|
Me@19
|
148 return;
|
|
Me@19
|
149 }
|
|
Me@19
|
150 else //Q is full
|
|
Me@19
|
151 { Q->insertLock = UNLOCKED;//full, so release lock for others
|
|
Me@19
|
152 }
|
|
Me@19
|
153 }
|
|
Me@19
|
154 tries++;
|
|
Me@19
|
155 if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
|
|
Me@19
|
156 }
|
|
Me@19
|
157 }
|
|
Me@19
|
158
|
|
Me@19
|
159 #endif //_GNU_SOURCE
|
|
Me@19
|
160
|
|
Me@19
|
161
|
|
Me@19
|
162 //===========================================================================
|
|
Me@19
|
163 //Single reader single writer super fast Q.. no atomic instrs..
|
|
Me@19
|
164
|
|
Me@19
|
165
|
|
Me@19
|
166 /*This is a blocking queue, but it uses no atomic instructions, just does
|
|
Me@19
|
167 * yield() when empty or full
|
|
Me@19
|
168 *
|
|
Me@19
|
169 *It doesn't need any atomic instructions because only a single thread
|
|
Me@19
|
170 * extracts and only a single thread inserts, and it has no locations that
|
|
Me@19
|
171 * are written by both. It writes before moving and moves before reading,
|
|
Me@19
|
172 * and never lets write position and read position be the same, so dis-
|
|
Me@19
|
173 * synchrony can only ever cause an unnecessary call to yield(), never a
|
|
Me@19
|
174 * wrong value (by monotonicity of movement of pointers, plus single writer
|
|
Me@19
|
175 * to pointers, plus sequence of write before change pointer, plus
|
|
Me@19
|
176 * assumptions that if thread A semantically writes X before Y, then thread
|
|
Me@19
|
177 * B will see the writes in that order.)
|
|
Me@19
|
178 */
|
|
Me@19
|
179
|
|
Me@19
|
180 SRSWQueueStruc* makeSRSWQ()
|
|
Me@19
|
181 {
|
|
Me@19
|
182 SRSWQueueStruc* retQ;
|
|
seanhalle@49
|
183 retQ = (SRSWQueueStruc *) PR__malloc( sizeof( SRSWQueueStruc ) );
|
|
Me@19
|
184 memset( retQ->startOfData, 0, 1024 * sizeof(void *) );
|
|
Me@19
|
185
|
|
Me@19
|
186 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
|
|
Me@19
|
187 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
|
|
Me@19
|
188 retQ->endOfData = &(retQ->startOfData[1023]);
|
|
Me@19
|
189
|
|
Me@19
|
190 return retQ;
|
|
Me@19
|
191 }
|
|
Me@19
|
192
|
|
Me@19
|
193 void
|
|
Me@19
|
194 freeSRSWQ( SRSWQueueStruc* Q )
|
|
Me@19
|
195 {
|
|
seanhalle@48
|
196 PR__free( Q );
|
|
Me@19
|
197 }
|
|
Me@19
|
198
|
|
Me@19
|
199 void* readSRSWQ( SRSWQueueStruc* Q )
|
|
Me@19
|
200 { void *out = 0;
|
|
seanhalle@44
|
201 int32 tries = 0;
|
|
Me@19
|
202
|
|
Me@19
|
203 while( TRUE )
|
|
Me@19
|
204 {
|
|
Me@19
|
205 if( Q->insertPos - Q->extractPos != 1 &&
|
|
Me@19
|
206 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
|
|
Me@19
|
207 { if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
|
|
Me@19
|
208 else Q->extractPos++; //move before read
|
|
Me@19
|
209 out = *(Q->extractPos);
|
|
Me@19
|
210 return out;
|
|
Me@19
|
211 }
|
|
Me@19
|
212 //Q is empty
|
|
Me@19
|
213 tries++;
|
|
Me@19
|
214 if( tries > SPINLOCK_TRIES ) pthread_yield();
|
|
Me@19
|
215 }
|
|
Me@19
|
216 }
|
|
Me@19
|
217
|
|
Me@19
|
218
|
|
Me@19
|
219 void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
|
|
Me@19
|
220 { void *out = 0;
|
|
seanhalle@44
|
221 int32 tries = 0;
|
|
Me@19
|
222
|
|
Me@19
|
223 while( TRUE )
|
|
Me@19
|
224 {
|
|
Me@19
|
225 if( Q->insertPos - Q->extractPos != 1 &&
|
|
Me@19
|
226 !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData))
|
|
Me@19
|
227 { Q->extractPos++; //move before read
|
|
Me@19
|
228 if( Q->extractPos > Q->endOfData ) Q->extractPos = Q->startOfData;
|
|
Me@19
|
229 out = *(Q->extractPos);
|
|
Me@19
|
230 return out;
|
|
Me@19
|
231 }
|
|
Me@19
|
232 //Q is empty
|
|
Me@19
|
233 tries++;
|
|
Me@19
|
234 if( tries > 10 ) return NULL; //long enough for writer to finish
|
|
Me@19
|
235 }
|
|
Me@19
|
236 }
|
|
Me@19
|
237
|
|
Me@19
|
238
|
|
Me@19
|
239 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
|
|
Me@19
|
240 {
|
|
seanhalle@44
|
241 int32 tries = 0;
|
|
Me@19
|
242
|
|
Me@19
|
243 while( TRUE )
|
|
Me@19
|
244 {
|
|
Me@19
|
245 if( Q->extractPos - Q->insertPos != 1 &&
|
|
Me@19
|
246 !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData))
|
|
Me@19
|
247 { *(Q->insertPos) = in; //insert before move
|
|
Me@19
|
248 if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
|
|
Me@19
|
249 else Q->insertPos++;
|
|
Me@19
|
250 return;
|
|
Me@19
|
251 }
|
|
Me@19
|
252 //Q is full
|
|
Me@19
|
253 tries++;
|
|
Me@19
|
254 if( tries > SPINLOCK_TRIES ) pthread_yield();
|
|
Me@19
|
255 }
|
|
Me@19
|
256 }
|
|
Me@19
|
257
|
|
Me@19
|
258
|
|
Me@19
|
259
|
|
Me@19
|
260 //===========================================================================
|
|
Me@19
|
261 //Single reader Multiple writer super fast Q.. no atomic instrs..
|
|
Me@19
|
262
|
|
Me@19
|
263
|
|
Me@19
|
264 /*This is a blocking queue, but it uses no atomic instructions, just does
|
|
Me@19
|
265 * yield() when empty or full
|
|
Me@19
|
266 *
|
|
Me@19
|
267 *It doesn't need any atomic instructions because only a single thread
|
|
Me@19
|
268 * extracts and only a single thread inserts, and it has no locations that
|
|
Me@19
|
269 * are written by both. It writes before moving and moves before reading,
|
|
Me@19
|
270 * and never lets write position and read position be the same, so dis-
|
|
Me@19
|
271 * synchrony can only ever cause an unnecessary call to yield(), never a
|
|
Me@19
|
272 * wrong value (by monotonicity of movement of pointers, plus single writer
|
|
Me@19
|
273 * to pointers, plus sequence of write before change pointer, plus
|
|
Me@19
|
274 * assumptions that if thread A semantically writes X before Y, then thread
|
|
Me@19
|
275 * B will see the writes in that order.)
|
|
Me@19
|
276 *
|
|
Me@19
|
277 *The multi-writer version is implemented as a hierarchy. Each writer has
|
|
Me@19
|
278 * its own single-reader single-writer queue. The reader simply does a
|
|
Me@19
|
279 * round-robin harvesting from them.
|
|
Me@19
|
280 *
|
|
Me@19
|
281 *A writer must first register itself with the queue, and receives an ID back
|
|
Me@19
|
282 * It then uses that ID on each write operation.
|
|
Me@19
|
283 *
|
|
Me@19
|
284 *The implementation is:
|
|
Me@19
|
285 *Physically:
|
|
Me@19
|
286 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
|
|
Me@19
|
287 * -] it also has read-pointer to the last queue a write was taken from.
|
|
Me@19
|
288 *
|
|
Me@19
|
289 *Action-Patterns:
|
|
Me@19
|
290 * -] To add a writer
|
|
Me@19
|
291 * --]] writer-thread calls addWriterToQ(), remember the ID it returns
|
|
Me@19
|
292 * --]] internally addWriterToQ does:
|
|
Me@19
|
293 * ---]]] if needs more room, makes a larger writer-array
|
|
Me@19
|
294 * ---]]] copies the old writer-array into the new
|
|
Me@19
|
295 * ---]]] makes a new SRSW queue and puts it into the array
|
|
Me@19
|
296 * ---]]] returns the index to the new SRSW queue as the ID
|
|
Me@19
|
297 * -] To write
|
|
Me@19
|
298 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
|
|
Me@19
|
299 * --]] this call may block, via repeated yield() calls
|
|
Me@19
|
300 * --]] internally, writeSRMWQ does:
|
|
Me@19
|
301 * ---]]] uses the writerID as index to get the SRSW queue for that writer
|
|
Me@19
|
302 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
|
|
Me@19
|
303 * -] To Read
|
|
Me@19
|
304 * --]] reader calls readSRMWQ, passing the Q struc
|
|
Me@19
|
305 * --]] this call may block, via repeated yield() calls
|
|
Me@19
|
306 * --]] internally, readSRMWQ does:
|
|
Me@19
|
307 * ---]]] gets saved index of last SRSW queue read from
|
|
Me@19
|
308 * ---]]] increments index and gets indexed queue
|
|
Me@19
|
309 * ---]]] does a non-blocking read of that queue
|
|
Me@19
|
310 * ---]]] if gets something, saves index and returns that value
|
|
Me@19
|
311 * ---]]] if gets null, then goes to next queue
|
|
Me@19
|
312 * ---]]] if got null from all the queues then does yield() then tries again
|
|
Me@19
|
313 *
|
|
Me@19
|
314 *Note: "0" is used as the value null, so SRSW queues must only contain
|
|
Me@19
|
315 * pointers, and cannot use 0 as a valid pointer value.
|
|
Me@19
|
316 *
|
|
Me@19
|
317 */
|
|
Me@19
|
318
|
|
Me@19
|
319 SRMWQueueStruc* makeSRMWQ()
|
|
Me@19
|
320 { SRMWQueueStruc* retQ;
|
|
Me@19
|
321
|
|
seanhalle@49
|
322 retQ = (SRMWQueueStruc *) PR__malloc( sizeof( SRMWQueueStruc ) );
|
|
Me@19
|
323
|
|
Me@19
|
324 retQ->numInternalQs = 0;
|
|
Me@19
|
325 retQ->internalQsSz = 10;
|
|
seanhalle@49
|
326 retQ->internalQs = PR__malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *));
|
|
Me@19
|
327
|
|
Me@19
|
328 retQ->lastQReadFrom = 0;
|
|
Me@19
|
329
|
|
Me@19
|
330 return retQ;
|
|
Me@19
|
331 }
|
|
Me@19
|
332
|
|
Me@19
|
333 /* ---]]] if needs more room, makes a larger writer-array
|
|
Me@19
|
334 * ---]]] copies the old writer-array into the new
|
|
Me@19
|
335 * ---]]] makes a new SRSW queue and puts it into the array
|
|
Me@19
|
336 * ---]]] returns the index to the new SRSW queue as the ID
|
|
Me@19
|
337 *
|
|
Me@19
|
338 *NOTE: assuming all adds are completed before any writes or reads are
|
|
Me@19
|
339 * performed.. otherwise, this needs to be re-done carefully, probably with
|
|
Me@19
|
340 * a lock.
|
|
Me@19
|
341 */
|
|
Me@19
|
342 int addWriterToSRMWQ( SRMWQueueStruc* Q )
|
|
Me@19
|
343 { int oldSz, i;
|
|
Me@19
|
344 SRSWQueueStruc * *oldArray;
|
|
Me@19
|
345
|
|
Me@19
|
346 (Q->numInternalQs)++;
|
|
Me@19
|
347 if( Q->numInternalQs >= Q->internalQsSz )
|
|
Me@19
|
348 { //full, so make bigger
|
|
Me@19
|
349 oldSz = Q->internalQsSz;
|
|
Me@19
|
350 oldArray = Q->internalQs;
|
|
Me@19
|
351 Q->internalQsSz *= 2;
|
|
seanhalle@49
|
352 Q->internalQs = PR__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
|
|
Me@19
|
353 for( i = 0; i < oldSz; i++ )
|
|
Me@19
|
354 { Q->internalQs[i] = oldArray[i];
|
|
Me@19
|
355 }
|
|
seanhalle@48
|
356 PR__free( oldArray );
|
|
Me@19
|
357 }
|
|
Me@19
|
358 Q->internalQs[ Q->numInternalQs - 1 ] = makeSRSWQ();
|
|
Me@19
|
359 return Q->numInternalQs - 1;
|
|
Me@19
|
360 }
|
|
Me@19
|
361
|
|
Me@19
|
362
|
|
Me@19
|
363 /* ---]]] gets saved index of last SRSW queue read-from
|
|
Me@19
|
364 * ---]]] increments index and gets indexed queue
|
|
Me@19
|
365 * ---]]] does a non-blocking read of that queue
|
|
Me@19
|
366 * ---]]] if gets something, saves index and returns that value
|
|
Me@19
|
367 * ---]]] if gets null, then goes to next queue
|
|
Me@19
|
368 * ---]]] if got null from all the queues then does yield() then tries again
|
|
Me@19
|
369 */
|
|
Me@19
|
370 void* readSRMWQ( SRMWQueueStruc* Q )
|
|
Me@19
|
371 { SRSWQueueStruc *readQ;
|
|
Me@19
|
372 void *readValue = 0;
|
|
seanhalle@44
|
373 int32 tries = 0;
|
|
seanhalle@44
|
374 int32 QToReadFrom = 0;
|
|
Me@19
|
375
|
|
Me@19
|
376 QToReadFrom = Q->lastQReadFrom;
|
|
Me@19
|
377
|
|
Me@19
|
378 while( TRUE )
|
|
Me@19
|
379 { QToReadFrom++;
|
|
Me@19
|
380 if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
|
|
Me@19
|
381 readQ = Q->internalQs[ QToReadFrom ];
|
|
Me@19
|
382 readValue = readSRSWQ_NonBlocking( readQ );
|
|
Me@19
|
383
|
|
Me@19
|
384 if( readValue != 0 ) //got a value, return it
|
|
Me@19
|
385 { Q->lastQReadFrom = QToReadFrom;
|
|
Me@19
|
386 return readValue;
|
|
Me@19
|
387 }
|
|
Me@19
|
388 else //SRSW Q just read is empty
|
|
Me@19
|
389 { //check if all queues have been tried
|
|
Me@19
|
390 if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
|
|
Me@19
|
391 { tries++; //give a writer a chance to finish before yield
|
|
Me@19
|
392 if( tries > SPINLOCK_TRIES ) pthread_yield();
|
|
Me@19
|
393 }
|
|
Me@19
|
394 }
|
|
Me@19
|
395 }
|
|
Me@19
|
396 }
|
|
Me@19
|
397
|
|
Me@19
|
398
|
|
Me@19
|
399 /*
|
|
Me@19
|
400 * ---]]] uses the writerID as index to get the SRSW queue for that writer
|
|
Me@19
|
401 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
|
|
Me@19
|
402 */
|
|
Me@19
|
403 void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
|
|
Me@19
|
404 {
|
|
Me@19
|
405 if( in == 0 ) printf( "error, wrote 0 to SRMW Q" );//TODO: throw an error
|
|
Me@19
|
406
|
|
Me@19
|
407 writeSRSWQ( in, Q->internalQs[ writerID ] );
|
|
Me@19
|
408 }
|