Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
view BlockingQueue.c @ 0:85af604dee9b
initial add
| author | Me |
|---|---|
| date | Sat, 22 May 2010 19:51:09 -0700 |
| parents | |
| children | 81f6687d52d1 |
line source
1 /*
2 * Copyright 2009 OpenSourceCodeStewardshipFoundation.org
3 * Licensed under GNU General Public License version 2
4 *
5 * NOTE: this version of SRSW correct as of April 25, 2010
6 *
7 * Author: seanhalle@yahoo.com
8 */
11 #include <stdio.h>
12 #include <errno.h>
13 #include <pthread.h>
14 #include <stdlib.h>
16 #include "BlockingQueue.h"
//Advance a ring-buffer index by one, wrapping back to 0 when it reaches 1024.
// The expansion is fully parenthesized so it stays one expression when used
// inside a larger one.  The argument is evaluated more than once, so it must
// not have side effects of its own.
#define INC(x) ((++(x) == 1024) ? ((x) = 0) : (x))
21 //===========================================================================
22 //Normal pthread Q
24 QueueStruc* makeQ()
25 {
26 QueueStruc* retQ;
27 int status;
28 retQ = (QueueStruc *) malloc( sizeof( QueueStruc ) );
31 status = pthread_mutex_init( &retQ->mutex_t, NULL);
32 if (status < 0)
33 {
34 perror("Error in creating mutex:");
35 exit(1);
36 return NULL;
37 }
39 status = pthread_cond_init ( &retQ->cond_w_t, NULL);
40 if (status < 0)
41 {
42 perror("Error in creating cond_var:");
43 exit(1);
44 return NULL;
45 }
47 status = pthread_cond_init ( &retQ->cond_r_t, NULL);
48 if (status < 0)
49 {
50 perror("Error in creating cond_var:");
51 exit(1);
52 return NULL;
53 }
55 retQ->count = 0;
56 retQ->readPos = 0;
57 retQ->writePos = 0;
58 retQ -> w_empty = retQ -> w_full = 0;
60 return retQ;
61 }
63 void * readQ( QueueStruc *Q )
64 { void *ret;
65 int status, wt;
66 pthread_mutex_lock( &Q->mutex_t );
67 {
68 while( Q -> count == 0 )
69 { Q -> w_empty = 1;
70 status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
71 if (status != 0)
72 { perror("Thread wait error: ");
73 exit(1);
74 }
75 }
76 Q -> w_empty = 0;
77 Q -> count -= 1;
78 ret = Q->data[ Q->readPos ];
79 INC( Q->readPos );
80 wt = Q -> w_full;
81 Q -> w_full = 0;
82 }
83 pthread_mutex_unlock( &Q->mutex_t );
84 if (wt) pthread_cond_signal( &Q->cond_w_t );
86 return( ret );
87 }
89 void writeQ( void * in, QueueStruc* Q )
90 {
91 int status, wt;
92 pthread_mutex_lock( &Q->mutex_t );
93 {
94 while( Q->count >= 1024 )
95 {
96 Q -> w_full = 1;
97 // pthread_cond_broadcast( &Q->cond_r_t );
98 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
99 if (status != 0)
100 { perror("Thread wait error: ");
101 exit(1);
102 }
103 }
104 Q -> w_full = 0;
105 Q->count += 1;
106 Q->data[ Q->writePos ] = in;
107 INC( Q->writePos );
108 wt = Q -> w_empty;
109 Q -> w_empty = 0;
110 // pthread_cond_broadcast( &Q->cond_r_t );
111 }
112 pthread_mutex_unlock( &Q->mutex_t );
113 if( wt ) pthread_cond_signal( &Q->cond_r_t );
114 }
117 //===========================================================================
118 // multi reader multi writer fast Q via CAS
119 #ifndef _GNU_SOURCE
120 #define _GNU_SOURCE
122 /*This is a blocking queue, but it uses CAS instr plus yield() when empty
123 * or full
124 *It uses CAS because it's meant to have more than one reader and more than
125 * one writer.
126 */
128 CASQueueStruc* makeCASQ()
129 {
130 CASQueueStruc* retQ;
131 retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) );
133 retQ->insertLock = UNLOCKED;
134 retQ->extractLock= UNLOCKED;
135 //TODO: check got pointer syntax right
136 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
137 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
138 retQ->endOfData = &(retQ->startOfData[1023]);
140 return retQ;
141 }
144 void* readCASQ( CASQueueStruc* Q )
145 { void *out = 0;
146 int tries = 0;
147 int startOfData = Q->startOfData;
148 int endOfData = Q->endOfData;
150 int success = FALSE;
152 while( !success )
153 { success =
154 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED );
155 if( success )
156 {
157 volatile int insertPos = Q->insertPos;
158 volatile int extractPos = Q->extractPos;
160 //if not empty -- extract just below insert when empty
161 if( insertPos - extractPos != 1 &&
162 !(extractPos == endOfData && insertPos == startOfData))
163 { //move before read
164 if( extractPos == endOfData ) //write new pos exactly once, correctly
165 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
166 } // other thread might read bad pos
167 else
168 { Q->extractPos++;
169 }
170 out = *(Q->extractPos);
171 Q->extractLock = UNLOCKED;
172 return out;
173 }
174 else //Q is empty
175 { success = FALSE;
176 Q->extractLock = UNLOCKED;//have to try again, release for others
177 }
178 }
179 //Q is busy or empty
180 tries++;
181 if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock
182 }
183 }
185 void writeCASQ( void * in, CASQueueStruc* Q )
186 {
187 int tries = 0;
188 int startOfData = Q->startOfData;
189 int endOfData = Q->endOfData;
191 int success = FALSE;
193 while( !success )
194 { success =
195 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED );
196 if( success )
197 {
198 volatile int insertPos = Q->insertPos;
199 volatile int extractPos = Q->extractPos;
201 //check if room to insert.. can't use a count variable
202 // 'cause both insertor Thd and extractor Thd would write it
203 if( extractPos - insertPos != 1 &&
204 !(insertPos == endOfData && extractPos == startOfData))
205 { *(insertPos) = in; //insert before move
206 if( insertPos == endOfData ) //write new pos exactly once, correctly
207 { Q->insertPos = startOfData;
208 }
209 else
210 { Q->insertPos++;
211 }
212 Q->insertLock = UNLOCKED;
213 return;
214 }
215 else //Q is full
216 { success = FALSE;
217 Q->insertLock = UNLOCKED;//have to try again, release for others
218 }
219 }
220 tries++;
221 if( tries > 10000 ) pthread_yield();//yield not guaranteed
222 }
223 }
225 #endif //_GNU_SOURCE
227 //===========================================================================
228 //Single reader single writer super fast Q.. no atomic instrs..
231 /*This is a blocking queue, but it uses no atomic instructions, just does
232 * busy-waiting when empty or full (but yield() if waits too long)
233 *
234 *It doesn't need any atomic instructions because only a single thread
235 * extracts and only a single thread inserts, and it has no locations that
236 * are written by both. It writes before moving and moves before reading,
237 * and never lets write position and read position be the same, so dis-
238 * synchrony can only ever cause an unnecessary call to yield(), never a
239 * wrong value (by monotonicity of movement of pointers, plus single writer
240 * to pointers, plus sequence of write before change pointer, plus
241 * assumptions that if thread A semantically writes X before Y, then thread
242 * B will see the writes in that order.)
243 */
245 SRSWQueueStruc* makeSRSWQ()
246 {
247 SRSWQueueStruc* retQ;
248 retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) );
250 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty
251 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be
252 retQ->endOfData = &(retQ->startOfData[1023]);
254 return retQ;
255 }
258 void* readSRSWQ( SRSWQueueStruc* Q )
259 { void *out = 0;
260 int tries = 0;
261 int startOfData = Q->startOfData;
262 int endOfData = Q->endOfData;
264 while( TRUE )
265 { //not certain the volatile reads need to be done, but safe..
266 volatile int insertPos = Q->insertPos;
267 volatile int extractPos = Q->extractPos;
269 //if not empty -- extract just below insert when empty
270 if( insertPos - extractPos != 1 &&
271 !(extractPos == endOfData && insertPos == startOfData))
272 { //move before read
273 if( extractPos == endOfData ) //write new pos exactly once, correctly
274 { Q->extractPos = startOfData; //can't overrun then fix it 'cause
275 } // other thread might read bad pos
276 else
277 { Q->extractPos++;
278 }
279 out = *(Q->extractPos);
280 return out;
281 }
282 //Q is empty
283 tries++;
284 if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock
285 }
286 }
288 void writeSRSWQ( void * in, SRSWQueueStruc* Q )
289 {
290 int tries = 0;
291 int startOfData = Q->startOfData;
292 int endOfData = Q->endOfData;
294 while( TRUE )
295 { //not certain the volatile reads need to be done, but safe..
296 volatile int insertPos = Q->insertPos;
297 volatile int extractPos = Q->extractPos;
299 if( extractPos - insertPos != 1 &&
300 !(insertPos == endOfData && extractPos == startOfData))
301 { *(insertPos) = in; //insert before move
302 if( insertPos == endOfData ) //write new pos exactly once, correctly
303 { Q->insertPos = startOfData;
304 }
305 else
306 { Q->insertPos++;
307 }
308 return;
309 }
310 //Q is full
311 tries++;
312 if( tries > 10000 ) pthread_yield();//yield not guaranteed
313 }
314 }
