| rev |
line source |
|
Me@0
|
1 /*
|
|
Me@0
|
2 * Copyright 2009 OpenSourceCodeStewardshipFoundation.org
|
|
Me@0
|
3 * Licensed under GNU General Public License version 2
|
|
Me@0
|
4 *
|
|
Me@0
|
5 * NOTE: this version of SRSW correct as of April 25, 2010
|
|
Me@0
|
6 *
|
|
Me@0
|
7 * Author: seanhalle@yahoo.com
|
|
Me@0
|
8 */
|
|
Me@0
|
9
|
|
Me@0
|
10
|
|
Me@0
|
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>

#include "BlockingQueue.h"
|
|
Me@0
|
17
|
|
Me@0
|
/* Advances the ring index x by one, wrapping to 0 when it reaches 1024 (the
 * wrap point hard-coded here).  The whole expression evaluates to the new
 * value of x.  The argument and the expansion are fully parenthesized so the
 * macro can be used inside larger expressions.  x is evaluated more than
 * once, so it must be a side-effect-free lvalue.
 */
#define INC(x) ((++(x) == 1024) ? ((x) = 0) : (x))
|
|
Me@0
|
19
|
|
Me@0
|
20
|
|
Me@0
|
21 //===========================================================================
|
|
Me@0
|
22 //Normal pthread Q
|
|
Me@0
|
23
|
|
Me@0
|
24 QueueStruc* makeQ()
|
|
Me@0
|
25 {
|
|
Me@0
|
26 QueueStruc* retQ;
|
|
Me@0
|
27 int status;
|
|
Me@0
|
28 retQ = (QueueStruc *) malloc( sizeof( QueueStruc ) );
|
|
Me@0
|
29
|
|
Me@0
|
30
|
|
Me@0
|
31 status = pthread_mutex_init( &retQ->mutex_t, NULL);
|
|
Me@0
|
32 if (status < 0)
|
|
Me@0
|
33 {
|
|
Me@0
|
34 perror("Error in creating mutex:");
|
|
Me@0
|
35 exit(1);
|
|
Me@0
|
36 return NULL;
|
|
Me@0
|
37 }
|
|
Me@0
|
38
|
|
Me@0
|
39 status = pthread_cond_init ( &retQ->cond_w_t, NULL);
|
|
Me@0
|
40 if (status < 0)
|
|
Me@0
|
41 {
|
|
Me@0
|
42 perror("Error in creating cond_var:");
|
|
Me@0
|
43 exit(1);
|
|
Me@0
|
44 return NULL;
|
|
Me@0
|
45 }
|
|
Me@0
|
46
|
|
Me@0
|
47 status = pthread_cond_init ( &retQ->cond_r_t, NULL);
|
|
Me@0
|
48 if (status < 0)
|
|
Me@0
|
49 {
|
|
Me@0
|
50 perror("Error in creating cond_var:");
|
|
Me@0
|
51 exit(1);
|
|
Me@0
|
52 return NULL;
|
|
Me@0
|
53 }
|
|
Me@0
|
54
|
|
Me@0
|
55 retQ->count = 0;
|
|
Me@0
|
56 retQ->readPos = 0;
|
|
Me@0
|
57 retQ->writePos = 0;
|
|
Me@0
|
58 retQ -> w_empty = retQ -> w_full = 0;
|
|
Me@0
|
59
|
|
Me@0
|
60 return retQ;
|
|
Me@0
|
61 }
|
|
Me@0
|
62
|
|
Me@0
|
63 void * readQ( QueueStruc *Q )
|
|
Me@0
|
64 { void *ret;
|
|
Me@0
|
65 int status, wt;
|
|
Me@0
|
66 pthread_mutex_lock( &Q->mutex_t );
|
|
Me@0
|
67 {
|
|
Me@0
|
68 while( Q -> count == 0 )
|
|
Me@0
|
69 { Q -> w_empty = 1;
|
|
Me@0
|
70 status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
|
|
Me@0
|
71 if (status != 0)
|
|
Me@0
|
72 { perror("Thread wait error: ");
|
|
Me@0
|
73 exit(1);
|
|
Me@0
|
74 }
|
|
Me@0
|
75 }
|
|
Me@0
|
76 Q -> w_empty = 0;
|
|
Me@0
|
77 Q -> count -= 1;
|
|
Me@0
|
78 ret = Q->data[ Q->readPos ];
|
|
Me@0
|
79 INC( Q->readPos );
|
|
Me@0
|
80 wt = Q -> w_full;
|
|
Me@0
|
81 Q -> w_full = 0;
|
|
Me@0
|
82 }
|
|
Me@0
|
83 pthread_mutex_unlock( &Q->mutex_t );
|
|
Me@0
|
84 if (wt) pthread_cond_signal( &Q->cond_w_t );
|
|
Me@0
|
85
|
|
Me@0
|
86 return( ret );
|
|
Me@0
|
87 }
|
|
Me@0
|
88
|
|
Me@0
|
89 void writeQ( void * in, QueueStruc* Q )
|
|
Me@0
|
90 {
|
|
Me@0
|
91 int status, wt;
|
|
Me@0
|
92 pthread_mutex_lock( &Q->mutex_t );
|
|
Me@0
|
93 {
|
|
Me@0
|
94 while( Q->count >= 1024 )
|
|
Me@0
|
95 {
|
|
Me@0
|
96 Q -> w_full = 1;
|
|
Me@0
|
97 // pthread_cond_broadcast( &Q->cond_r_t );
|
|
Me@0
|
98 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
|
|
Me@0
|
99 if (status != 0)
|
|
Me@0
|
100 { perror("Thread wait error: ");
|
|
Me@0
|
101 exit(1);
|
|
Me@0
|
102 }
|
|
Me@0
|
103 }
|
|
Me@0
|
104 Q -> w_full = 0;
|
|
Me@0
|
105 Q->count += 1;
|
|
Me@0
|
106 Q->data[ Q->writePos ] = in;
|
|
Me@0
|
107 INC( Q->writePos );
|
|
Me@0
|
108 wt = Q -> w_empty;
|
|
Me@0
|
109 Q -> w_empty = 0;
|
|
Me@0
|
110 // pthread_cond_broadcast( &Q->cond_r_t );
|
|
Me@0
|
111 }
|
|
Me@0
|
112 pthread_mutex_unlock( &Q->mutex_t );
|
|
Me@0
|
113 if( wt ) pthread_cond_signal( &Q->cond_r_t );
|
|
Me@0
|
114 }
|
|
Me@0
|
115
|
|
Me@0
|
116
|
|
Me@0
|
117 //===========================================================================
|
|
Me@0
|
118 // multi reader multi writer fast Q via CAS
|
|
Me@0
|
119 #ifndef _GNU_SOURCE
|
|
Me@0
|
120 #define _GNU_SOURCE
|
|
Me@0
|
121
|
|
Me@0
|
122 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
|
|
Me@0
|
123 * or full
|
|
Me@0
|
124 *It uses CAS because it's meant to have more than one reader and more than
|
|
Me@0
|
125 * one writer.
|
|
Me@0
|
126 */
|
|
Me@0
|
127
|
|
Me@0
|
128 CASQueueStruc* makeCASQ()
|
|
Me@0
|
129 {
|
|
Me@0
|
130 CASQueueStruc* retQ;
|
|
Me@0
|
131 retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) );
|
|
Me@0
|
132
|
|
Me@0
|
133 retQ->insertLock = UNLOCKED;
|
|
Me@0
|
134 retQ->extractLock= UNLOCKED;
|
|
Me@0
|
135 //TODO: check got pointer syntax right
|
|
Me@0
|
136 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
|
|
Me@0
|
137 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
|
|
Me@0
|
138 retQ->endOfData = &(retQ->startOfData[1023]);
|
|
Me@0
|
139
|
|
Me@0
|
140 return retQ;
|
|
Me@0
|
141 }
|
|
Me@0
|
142
|
|
Me@0
|
143
|
|
Me@0
|
144 void* readCASQ( CASQueueStruc* Q )
|
|
Me@0
|
145 { void *out = 0;
|
|
Me@0
|
146 int tries = 0;
|
|
Me@0
|
147 int startOfData = Q->startOfData;
|
|
Me@0
|
148 int endOfData = Q->endOfData;
|
|
Me@0
|
149
|
|
Me@0
|
150 int success = FALSE;
|
|
Me@0
|
151
|
|
Me@0
|
152 while( !success )
|
|
Me@0
|
153 { success =
|
|
Me@0
|
154 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
|
|
Me@0
|
155 if( success )
|
|
Me@0
|
156 {
|
|
Me@0
|
157 volatile int insertPos = Q->insertPos;
|
|
Me@0
|
158 volatile int extractPos = Q->extractPos;
|
|
Me@0
|
159
|
|
Me@0
|
160 //if not empty -- extract just below insert when empty
|
|
Me@0
|
161 if( insertPos - extractPos != 1 &&
|
|
Me@0
|
162 !(extractPos == endOfData && insertPos == startOfData))
|
|
Me@0
|
163 { //move before read
|
|
Me@0
|
164 if( extractPos == endOfData ) //write new pos exactly once, correctly
|
|
Me@0
|
165 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
|
|
Me@0
|
166 } // other thread might read bad pos
|
|
Me@0
|
167 else
|
|
Me@0
|
168 { Q->extractPos++;
|
|
Me@0
|
169 }
|
|
Me@0
|
170 out = *(Q->extractPos);
|
|
Me@0
|
171 Q->extractLock = UNLOCKED;
|
|
Me@0
|
172 return out;
|
|
Me@0
|
173 }
|
|
Me@0
|
174 else //Q is empty
|
|
Me@0
|
175 { success = FALSE;
|
|
Me@0
|
176 Q->extractLock = UNLOCKED;//have to try again, release for others
|
|
Me@0
|
177 }
|
|
Me@0
|
178 }
|
|
Me@0
|
179 //Q is busy or empty
|
|
Me@0
|
180 tries++;
|
|
Me@0
|
181 if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock
|
|
Me@0
|
182 }
|
|
Me@0
|
183 }
|
|
Me@0
|
184
|
|
Me@0
|
185 void writeCASQ( void * in, CASQueueStruc* Q )
|
|
Me@0
|
186 {
|
|
Me@0
|
187 int tries = 0;
|
|
Me@0
|
188 int startOfData = Q->startOfData;
|
|
Me@0
|
189 int endOfData = Q->endOfData;
|
|
Me@0
|
190
|
|
Me@0
|
191 int success = FALSE;
|
|
Me@0
|
192
|
|
Me@0
|
193 while( !success )
|
|
Me@0
|
194 { success =
|
|
Me@0
|
195 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
|
|
Me@0
|
196 if( success )
|
|
Me@0
|
197 {
|
|
Me@0
|
198 volatile int insertPos = Q->insertPos;
|
|
Me@0
|
199 volatile int extractPos = Q->extractPos;
|
|
Me@0
|
200
|
|
Me@0
|
201 //check if room to insert.. can't use a count variable
|
|
Me@0
|
202 // 'cause both insertor Thd and extractor Thd would write it
|
|
Me@0
|
203 if( extractPos - insertPos != 1 &&
|
|
Me@0
|
204 !(insertPos == endOfData && extractPos == startOfData))
|
|
Me@0
|
205 { *(insertPos) = in; //insert before move
|
|
Me@0
|
206 if( insertPos == endOfData ) //write new pos exactly once, correctly
|
|
Me@0
|
207 { Q->insertPos = startOfData;
|
|
Me@0
|
208 }
|
|
Me@0
|
209 else
|
|
Me@0
|
210 { Q->insertPos++;
|
|
Me@0
|
211 }
|
|
Me@0
|
212 Q->insertLock = UNLOCKED;
|
|
Me@0
|
213 return;
|
|
Me@0
|
214 }
|
|
Me@0
|
215 else //Q is full
|
|
Me@0
|
216 { success = FALSE;
|
|
Me@0
|
217 Q->insertLock = UNLOCKED;//have to try again, release for others
|
|
Me@0
|
218 }
|
|
Me@0
|
219 }
|
|
Me@0
|
220 tries++;
|
|
Me@0
|
221 if( tries > 10000 ) pthread_yield();//yield not guaranteed
|
|
Me@0
|
222 }
|
|
Me@0
|
223 }
|
|
Me@0
|
224
|
|
Me@0
|
225 #endif //_GNU_SOURCE
|
|
Me@0
|
226
|
|
Me@0
|
227 //===========================================================================
|
|
Me@0
|
228 //Single reader single writer super fast Q.. no atomic instrs..
|
|
Me@0
|
229
|
|
Me@0
|
230
|
|
Me@0
|
231 /*This is a blocking queue, but it uses no atomic instructions, just does
|
|
Me@0
|
232 * busy-waiting when empty or full (but yield() if waits too long)
|
|
Me@0
|
233 *
|
|
Me@0
|
234 *It doesn't need any atomic instructions because only a single thread
|
|
Me@0
|
235 * extracts and only a single thread inserts, and it has no locations that
|
|
Me@0
|
236 * are written by both. It writes before moving and moves before reading,
|
|
Me@0
|
237 * and never lets write position and read position be the same, so dis-
|
|
Me@0
|
238 * synchrony can only ever cause an unnecessary call to yield(), never a
|
|
Me@0
|
239 * wrong value (by monotonicity of movement of pointers, plus single writer
|
|
Me@0
|
240 * to pointers, plus sequence of write before change pointer, plus
|
|
Me@0
|
241 * assumptions that if thread A semantically writes X before Y, then thread
|
|
Me@0
|
242 * B will see the writes in that order.)
|
|
Me@0
|
243 */
|
|
Me@0
|
244
|
|
Me@0
|
245 SRSWQueueStruc* makeSRSWQ()
|
|
Me@0
|
246 {
|
|
Me@0
|
247 SRSWQueueStruc* retQ;
|
|
Me@0
|
248 retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) );
|
|
Me@0
|
249
|
|
Me@0
|
250 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
|
|
Me@0
|
251 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
|
|
Me@0
|
252 retQ->endOfData = &(retQ->startOfData[1023]);
|
|
Me@0
|
253
|
|
Me@0
|
254 return retQ;
|
|
Me@0
|
255 }
|
|
Me@0
|
256
|
|
Me@0
|
257
|
|
Me@0
|
258 void* readSRSWQ( SRSWQueueStruc* Q )
|
|
Me@0
|
259 { void *out = 0;
|
|
Me@0
|
260 int tries = 0;
|
|
Me@0
|
261 int startOfData = Q->startOfData;
|
|
Me@0
|
262 int endOfData = Q->endOfData;
|
|
Me@0
|
263
|
|
Me@0
|
264 while( TRUE )
|
|
Me@0
|
265 { //not certain the volatile reads need to be done, but safe..
|
|
Me@0
|
266 volatile int insertPos = Q->insertPos;
|
|
Me@0
|
267 volatile int extractPos = Q->extractPos;
|
|
Me@0
|
268
|
|
Me@0
|
269 //if not empty -- extract just below insert when empty
|
|
Me@0
|
270 if( insertPos - extractPos != 1 &&
|
|
Me@0
|
271 !(extractPos == endOfData && insertPos == startOfData))
|
|
Me@0
|
272 { //move before read
|
|
Me@0
|
273 if( extractPos == endOfData ) //write new pos exactly once, correctly
|
|
Me@0
|
274 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
|
|
Me@0
|
275 } // other thread might read bad pos
|
|
Me@0
|
276 else
|
|
Me@0
|
277 { Q->extractPos++;
|
|
Me@0
|
278 }
|
|
Me@0
|
279 out = *(Q->extractPos);
|
|
Me@0
|
280 return out;
|
|
Me@0
|
281 }
|
|
Me@0
|
282 //Q is empty
|
|
Me@0
|
283 tries++;
|
|
Me@0
|
284 if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock
|
|
Me@0
|
285 }
|
|
Me@0
|
286 }
|
|
Me@0
|
287
|
|
Me@0
|
288 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
|
|
Me@0
|
289 {
|
|
Me@0
|
290 int tries = 0;
|
|
Me@0
|
291 int startOfData = Q->startOfData;
|
|
Me@0
|
292 int endOfData = Q->endOfData;
|
|
Me@0
|
293
|
|
Me@0
|
294 while( TRUE )
|
|
Me@0
|
295 { //not certain the volatile reads need to be done, but safe..
|
|
Me@0
|
296 volatile int insertPos = Q->insertPos;
|
|
Me@0
|
297 volatile int extractPos = Q->extractPos;
|
|
Me@0
|
298
|
|
Me@0
|
299 if( extractPos - insertPos != 1 &&
|
|
Me@0
|
300 !(insertPos == endOfData && extractPos == startOfData))
|
|
Me@0
|
301 { *(insertPos) = in; //insert before move
|
|
Me@0
|
302 if( insertPos == endOfData ) //write new pos exactly once, correctly
|
|
Me@0
|
303 { Q->insertPos = startOfData;
|
|
Me@0
|
304 }
|
|
Me@0
|
305 else
|
|
Me@0
|
306 { Q->insertPos++;
|
|
Me@0
|
307 }
|
|
Me@0
|
308 return;
|
|
Me@0
|
309 }
|
|
Me@0
|
310 //Q is full
|
|
Me@0
|
311 tries++;
|
|
Me@0
|
312 if( tries > 10000 ) pthread_yield();//yield not guaranteed
|
|
Me@0
|
313 }
|
|
Me@0
|
314 }
|