Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
comparison BlockingQueue.c @ 0:85af604dee9b
initial add
| author | Me |
|---|---|
| date | Sat, 22 May 2010 19:51:09 -0700 |
| parents | |
| children | 81f6687d52d1 |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:329d0de8e9a4 |
|---|---|
| 1 /* | |
| 2 * Copyright 2009 OpenSourceCodeStewardshipFoundation.org | |
| 3 * Licensed under GNU General Public License version 2 | |
| 4 * | |
| 5 * NOTE: this version of SRSW correct as of April 25, 2010 | |
| 6 * | |
| 7 * Author: seanhalle@yahoo.com | |
| 8 */ | |
| 9 | |
| 10 | |
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <stdlib.h>

#include "BlockingQueue.h"
| 17 | |
/*Advance a ring-buffer index by one slot, wrapping from 1023 back to 0.
 * The argument and the whole expansion are parenthesised so the macro can be
 * used inside a larger expression.  The argument is evaluated more than
 * once, so it must be a side-effect-free lvalue.
 */
#define INC(x) ((++(x) == 1024) ? ((x) = 0) : (x))
| 19 | |
| 20 | |
| 21 //=========================================================================== | |
| 22 //Normal pthread Q | |
| 23 | |
| 24 QueueStruc* makeQ() | |
| 25 { | |
| 26 QueueStruc* retQ; | |
| 27 int status; | |
| 28 retQ = (QueueStruc *) malloc( sizeof( QueueStruc ) ); | |
| 29 | |
| 30 | |
| 31 status = pthread_mutex_init( &retQ->mutex_t, NULL); | |
| 32 if (status < 0) | |
| 33 { | |
| 34 perror("Error in creating mutex:"); | |
| 35 exit(1); | |
| 36 return NULL; | |
| 37 } | |
| 38 | |
| 39 status = pthread_cond_init ( &retQ->cond_w_t, NULL); | |
| 40 if (status < 0) | |
| 41 { | |
| 42 perror("Error in creating cond_var:"); | |
| 43 exit(1); | |
| 44 return NULL; | |
| 45 } | |
| 46 | |
| 47 status = pthread_cond_init ( &retQ->cond_r_t, NULL); | |
| 48 if (status < 0) | |
| 49 { | |
| 50 perror("Error in creating cond_var:"); | |
| 51 exit(1); | |
| 52 return NULL; | |
| 53 } | |
| 54 | |
| 55 retQ->count = 0; | |
| 56 retQ->readPos = 0; | |
| 57 retQ->writePos = 0; | |
| 58 retQ -> w_empty = retQ -> w_full = 0; | |
| 59 | |
| 60 return retQ; | |
| 61 } | |
| 62 | |
| 63 void * readQ( QueueStruc *Q ) | |
| 64 { void *ret; | |
| 65 int status, wt; | |
| 66 pthread_mutex_lock( &Q->mutex_t ); | |
| 67 { | |
| 68 while( Q -> count == 0 ) | |
| 69 { Q -> w_empty = 1; | |
| 70 status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); | |
| 71 if (status != 0) | |
| 72 { perror("Thread wait error: "); | |
| 73 exit(1); | |
| 74 } | |
| 75 } | |
| 76 Q -> w_empty = 0; | |
| 77 Q -> count -= 1; | |
| 78 ret = Q->data[ Q->readPos ]; | |
| 79 INC( Q->readPos ); | |
| 80 wt = Q -> w_full; | |
| 81 Q -> w_full = 0; | |
| 82 } | |
| 83 pthread_mutex_unlock( &Q->mutex_t ); | |
| 84 if (wt) pthread_cond_signal( &Q->cond_w_t ); | |
| 85 | |
| 86 return( ret ); | |
| 87 } | |
| 88 | |
| 89 void writeQ( void * in, QueueStruc* Q ) | |
| 90 { | |
| 91 int status, wt; | |
| 92 pthread_mutex_lock( &Q->mutex_t ); | |
| 93 { | |
| 94 while( Q->count >= 1024 ) | |
| 95 { | |
| 96 Q -> w_full = 1; | |
| 97 // pthread_cond_broadcast( &Q->cond_r_t ); | |
| 98 status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); | |
| 99 if (status != 0) | |
| 100 { perror("Thread wait error: "); | |
| 101 exit(1); | |
| 102 } | |
| 103 } | |
| 104 Q -> w_full = 0; | |
| 105 Q->count += 1; | |
| 106 Q->data[ Q->writePos ] = in; | |
| 107 INC( Q->writePos ); | |
| 108 wt = Q -> w_empty; | |
| 109 Q -> w_empty = 0; | |
| 110 // pthread_cond_broadcast( &Q->cond_r_t ); | |
| 111 } | |
| 112 pthread_mutex_unlock( &Q->mutex_t ); | |
| 113 if( wt ) pthread_cond_signal( &Q->cond_r_t ); | |
| 114 } | |
| 115 | |
| 116 | |
| 117 //=========================================================================== | |
| 118 // multi reader multi writer fast Q via CAS | |
| 119 #ifndef _GNU_SOURCE | |
| 120 #define _GNU_SOURCE | |
| 121 | |
| 122 /*This is a blocking queue, but it uses CAS instr plus yield() when empty | |
| 123 * or full | |
| 124 *It uses CAS because it's meant to have more than one reader and more than | |
| 125 * one writer. | |
| 126 */ | |
| 127 | |
| 128 CASQueueStruc* makeCASQ() | |
| 129 { | |
| 130 CASQueueStruc* retQ; | |
| 131 retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) ); | |
| 132 | |
| 133 retQ->insertLock = UNLOCKED; | |
| 134 retQ->extractLock= UNLOCKED; | |
| 135 //TODO: check got pointer syntax right | |
| 136 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty | |
| 137 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be | |
| 138 retQ->endOfData = &(retQ->startOfData[1023]); | |
| 139 | |
| 140 return retQ; | |
| 141 } | |
| 142 | |
| 143 | |
| 144 void* readCASQ( CASQueueStruc* Q ) | |
| 145 { void *out = 0; | |
| 146 int tries = 0; | |
| 147 int startOfData = Q->startOfData; | |
| 148 int endOfData = Q->endOfData; | |
| 149 | |
| 150 int success = FALSE; | |
| 151 | |
| 152 while( !success ) | |
| 153 { success = | |
| 154 __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); | |
| 155 if( success ) | |
| 156 { | |
| 157 volatile int insertPos = Q->insertPos; | |
| 158 volatile int extractPos = Q->extractPos; | |
| 159 | |
| 160 //if not empty -- extract just below insert when empty | |
| 161 if( insertPos - extractPos != 1 && | |
| 162 !(extractPos == endOfData && insertPos == startOfData)) | |
| 163 { //move before read | |
| 164 if( extractPos == endOfData ) //write new pos exactly once, correctly | |
| 165 { Q->extractPos = startOfData; //can't overrun then fix it 'cause | |
| 166 } // other thread might read bad pos | |
| 167 else | |
| 168 { Q->extractPos++; | |
| 169 } | |
| 170 out = *(Q->extractPos); | |
| 171 Q->extractLock = UNLOCKED; | |
| 172 return out; | |
| 173 } | |
| 174 else //Q is empty | |
| 175 { success = FALSE; | |
| 176 Q->extractLock = UNLOCKED;//have to try again, release for others | |
| 177 } | |
| 178 } | |
| 179 //Q is busy or empty | |
| 180 tries++; | |
| 181 if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock | |
| 182 } | |
| 183 } | |
| 184 | |
| 185 void writeCASQ( void * in, CASQueueStruc* Q ) | |
| 186 { | |
| 187 int tries = 0; | |
| 188 int startOfData = Q->startOfData; | |
| 189 int endOfData = Q->endOfData; | |
| 190 | |
| 191 int success = FALSE; | |
| 192 | |
| 193 while( !success ) | |
| 194 { success = | |
| 195 __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); | |
| 196 if( success ) | |
| 197 { | |
| 198 volatile int insertPos = Q->insertPos; | |
| 199 volatile int extractPos = Q->extractPos; | |
| 200 | |
| 201 //check if room to insert.. can't use a count variable | |
| 202 // 'cause both insertor Thd and extractor Thd would write it | |
| 203 if( extractPos - insertPos != 1 && | |
| 204 !(insertPos == endOfData && extractPos == startOfData)) | |
| 205 { *(insertPos) = in; //insert before move | |
| 206 if( insertPos == endOfData ) //write new pos exactly once, correctly | |
| 207 { Q->insertPos = startOfData; | |
| 208 } | |
| 209 else | |
| 210 { Q->insertPos++; | |
| 211 } | |
| 212 Q->insertLock = UNLOCKED; | |
| 213 return; | |
| 214 } | |
| 215 else //Q is full | |
| 216 { success = FALSE; | |
| 217 Q->insertLock = UNLOCKED;//have to try again, release for others | |
| 218 } | |
| 219 } | |
| 220 tries++; | |
| 221 if( tries > 10000 ) pthread_yield();//yield not guaranteed | |
| 222 } | |
| 223 } | |
| 224 | |
| 225 #endif //_GNU_SOURCE | |
| 226 | |
| 227 //=========================================================================== | |
| 228 //Single reader single writer super fast Q.. no atomic instrs.. | |
| 229 | |
| 230 | |
| 231 /*This is a blocking queue, but it uses no atomic instructions, just does | |
| 232 * busy-waiting when empty or full (but yield() if waits too long) | |
| 233 * | |
| 234 *It doesn't need any atomic instructions because only a single thread | |
| 235 * extracts and only a single thread inserts, and it has no locations that | |
| 236 * are written by both. It writes before moving and moves before reading, | |
| 237 * and never lets write position and read position be the same, so dis- | |
| 238 * synchrony can only ever cause an unnecessary call to yield(), never a | |
| 239 * wrong value (by monotonicity of movement of pointers, plus single writer | |
| 240 * to pointers, plus sequence of write before change pointer, plus | |
| 241 * assumptions that if thread A semantically writes X before Y, then thread | |
| 242 * B will see the writes in that order.) | |
| 243 */ | |
| 244 | |
| 245 SRSWQueueStruc* makeSRSWQ() | |
| 246 { | |
| 247 SRSWQueueStruc* retQ; | |
| 248 retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) ); | |
| 249 | |
| 250 retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty | |
| 251 retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be | |
| 252 retQ->endOfData = &(retQ->startOfData[1023]); | |
| 253 | |
| 254 return retQ; | |
| 255 } | |
| 256 | |
| 257 | |
| 258 void* readSRSWQ( SRSWQueueStruc* Q ) | |
| 259 { void *out = 0; | |
| 260 int tries = 0; | |
| 261 int startOfData = Q->startOfData; | |
| 262 int endOfData = Q->endOfData; | |
| 263 | |
| 264 while( TRUE ) | |
| 265 { //not certain the volatile reads need to be done, but safe.. | |
| 266 volatile int insertPos = Q->insertPos; | |
| 267 volatile int extractPos = Q->extractPos; | |
| 268 | |
| 269 //if not empty -- extract just below insert when empty | |
| 270 if( insertPos - extractPos != 1 && | |
| 271 !(extractPos == endOfData && insertPos == startOfData)) | |
| 272 { //move before read | |
| 273 if( extractPos == endOfData ) //write new pos exactly once, correctly | |
| 274 { Q->extractPos = startOfData; //can't overrun then fix it 'cause | |
| 275 } // other thread might read bad pos | |
| 276 else | |
| 277 { Q->extractPos++; | |
| 278 } | |
| 279 out = *(Q->extractPos); | |
| 280 return out; | |
| 281 } | |
| 282 //Q is empty | |
| 283 tries++; | |
| 284 if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock | |
| 285 } | |
| 286 } | |
| 287 | |
| 288 void writeSRSWQ( void * in, SRSWQueueStruc* Q ) | |
| 289 { | |
| 290 int tries = 0; | |
| 291 int startOfData = Q->startOfData; | |
| 292 int endOfData = Q->endOfData; | |
| 293 | |
| 294 while( TRUE ) | |
| 295 { //not certain the volatile reads need to be done, but safe.. | |
| 296 volatile int insertPos = Q->insertPos; | |
| 297 volatile int extractPos = Q->extractPos; | |
| 298 | |
| 299 if( extractPos - insertPos != 1 && | |
| 300 !(insertPos == endOfData && extractPos == startOfData)) | |
| 301 { *(insertPos) = in; //insert before move | |
| 302 if( insertPos == endOfData ) //write new pos exactly once, correctly | |
| 303 { Q->insertPos = startOfData; | |
| 304 } | |
| 305 else | |
| 306 { Q->insertPos++; | |
| 307 } | |
| 308 return; | |
| 309 } | |
| 310 //Q is full | |
| 311 tries++; | |
| 312 if( tries > 10000 ) pthread_yield();//yield not guaranteed | |
| 313 } | |
| 314 } |
