Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > Queue_impl
diff BlockingQueue.c @ 0:85af604dee9b
initial add
| author | Me |
|---|---|
| date | Sat, 22 May 2010 19:51:09 -0700 |
| parents | |
| children | 81f6687d52d1 |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/BlockingQueue.c Sat May 22 19:51:09 2010 -0700 1.3 @@ -0,0 +1,314 @@ 1.4 +/* 1.5 + * Copyright 2009 OpenSourceCodeStewardshipFoundation.org 1.6 + * Licensed under GNU General Public License version 2 1.7 + * 1.8 + * NOTE: this version of SRSW correct as of April 25, 2010 1.9 + * 1.10 + * Author: seanhalle@yahoo.com 1.11 + */ 1.12 + 1.13 + 1.14 +#include <stdio.h> 1.15 +#include <errno.h> 1.16 +#include <pthread.h> 1.17 +#include <stdlib.h> 1.18 + 1.19 +#include "BlockingQueue.h" 1.20 + 1.21 +#define INC(x) (++x == 1024) ? (x) = 0 : (x) 1.22 + 1.23 + 1.24 +//=========================================================================== 1.25 +//Normal pthread Q 1.26 + 1.27 +QueueStruc* makeQ() 1.28 + { 1.29 + QueueStruc* retQ; 1.30 + int status; 1.31 + retQ = (QueueStruc *) malloc( sizeof( QueueStruc ) ); 1.32 + 1.33 + 1.34 + status = pthread_mutex_init( &retQ->mutex_t, NULL); 1.35 + if (status < 0) 1.36 + { 1.37 + perror("Error in creating mutex:"); 1.38 + exit(1); 1.39 + return NULL; 1.40 + } 1.41 + 1.42 + status = pthread_cond_init ( &retQ->cond_w_t, NULL); 1.43 + if (status < 0) 1.44 + { 1.45 + perror("Error in creating cond_var:"); 1.46 + exit(1); 1.47 + return NULL; 1.48 + } 1.49 + 1.50 + status = pthread_cond_init ( &retQ->cond_r_t, NULL); 1.51 + if (status < 0) 1.52 + { 1.53 + perror("Error in creating cond_var:"); 1.54 + exit(1); 1.55 + return NULL; 1.56 + } 1.57 + 1.58 + retQ->count = 0; 1.59 + retQ->readPos = 0; 1.60 + retQ->writePos = 0; 1.61 + retQ -> w_empty = retQ -> w_full = 0; 1.62 + 1.63 + return retQ; 1.64 + } 1.65 + 1.66 +void * readQ( QueueStruc *Q ) 1.67 + { void *ret; 1.68 + int status, wt; 1.69 + pthread_mutex_lock( &Q->mutex_t ); 1.70 + { 1.71 + while( Q -> count == 0 ) 1.72 + { Q -> w_empty = 1; 1.73 + status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t ); 1.74 + if (status != 0) 1.75 + { perror("Thread wait error: "); 1.76 + exit(1); 1.77 + } 1.78 + } 1.79 + Q -> w_empty = 0; 1.80 + Q -> count -= 1; 1.81 + ret = Q->data[ Q->readPos ]; 1.82 + INC( Q->readPos ); 1.83 + wt = Q -> w_full; 1.84 + Q -> w_full = 0; 1.85 + } 1.86 + pthread_mutex_unlock( &Q->mutex_t ); 1.87 + if (wt) pthread_cond_signal( &Q->cond_w_t ); 1.88 + 1.89 + return( ret ); 1.90 + } 1.91 + 1.92 +void writeQ( void * in, QueueStruc* Q ) 1.93 + { 1.94 + int status, wt; 1.95 + pthread_mutex_lock( &Q->mutex_t ); 1.96 + { 1.97 + while( Q->count >= 1024 ) 1.98 + { 1.99 + Q -> w_full = 1; 1.100 + // pthread_cond_broadcast( &Q->cond_r_t ); 1.101 + status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t ); 1.102 + if (status != 0) 1.103 + { perror("Thread wait error: "); 1.104 + exit(1); 1.105 + } 1.106 + } 1.107 + Q -> w_full = 0; 1.108 + Q->count += 1; 1.109 + Q->data[ Q->writePos ] = in; 1.110 + INC( Q->writePos ); 1.111 + wt = Q -> w_empty; 1.112 + Q -> w_empty = 0; 1.113 + // pthread_cond_broadcast( &Q->cond_r_t ); 1.114 + } 1.115 + pthread_mutex_unlock( &Q->mutex_t ); 1.116 + if( wt ) pthread_cond_signal( &Q->cond_r_t ); 1.117 + } 1.118 + 1.119 + 1.120 +//=========================================================================== 1.121 +// multi reader multi writer fast Q via CAS 1.122 +#ifndef _GNU_SOURCE 1.123 +#define _GNU_SOURCE 1.124 + 1.125 +/*This is a blocking queue, but it uses CAS instr plus yield() when empty 1.126 + * or full 1.127 + *It uses CAS because it's meant to have more than one reader and more than 1.128 + * one writer. 1.129 + */ 1.130 + 1.131 +CASQueueStruc* makeCASQ() 1.132 + { 1.133 + CASQueueStruc* retQ; 1.134 + retQ = (CASQueueStruc *) malloc( sizeof( CASQueueStruc ) ); 1.135 + 1.136 + retQ->insertLock = UNLOCKED; 1.137 + retQ->extractLock= UNLOCKED; 1.138 + //TODO: check got pointer syntax right 1.139 + retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 1.140 + retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 1.141 + retQ->endOfData = &(retQ->startOfData[1023]); 1.142 + 1.143 + return retQ; 1.144 + } 1.145 + 1.146 + 1.147 +void* readCASQ( CASQueueStruc* Q ) 1.148 + { void *out = 0; 1.149 + int tries = 0; 1.150 + int startOfData = Q->startOfData; 1.151 + int endOfData = Q->endOfData; 1.152 + 1.153 + int success = FALSE; 1.154 + 1.155 + while( !success ) 1.156 + { success = 1.157 + __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ); 1.158 + if( success ) 1.159 + { 1.160 + volatile int insertPos = Q->insertPos; 1.161 + volatile int extractPos = Q->extractPos; 1.162 + 1.163 + //if not empty -- extract just below insert when empty 1.164 + if( insertPos - extractPos != 1 && 1.165 + !(extractPos == endOfData && insertPos == startOfData)) 1.166 + { //move before read 1.167 + if( extractPos == endOfData ) //write new pos exactly once, correctly 1.168 + { Q->extractPos = startOfData; //can't overrun then fix it 'cause 1.169 + } // other thread might read bad pos 1.170 + else 1.171 + { Q->extractPos++; 1.172 + } 1.173 + out = *(Q->extractPos); 1.174 + Q->extractLock = UNLOCKED; 1.175 + return out; 1.176 + } 1.177 + else //Q is empty 1.178 + { success = FALSE; 1.179 + Q->extractLock = UNLOCKED;//have to try again, release for others 1.180 + } 1.181 + } 1.182 + //Q is busy or empty 1.183 + tries++; 1.184 + if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock 1.185 + } 1.186 + } 1.187 + 1.188 +void writeCASQ( void * in, CASQueueStruc* Q ) 1.189 + { 1.190 + int tries = 0; 1.191 + int startOfData = Q->startOfData; 1.192 + int endOfData = Q->endOfData; 1.193 + 1.194 + int success = FALSE; 1.195 + 1.196 + while( !success ) 1.197 + { success = 1.198 + __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ); 1.199 + if( success ) 1.200 + { 1.201 + volatile int insertPos = Q->insertPos; 1.202 + volatile int extractPos = Q->extractPos; 1.203 + 1.204 + //check if room to insert.. can't use a count variable 1.205 + // 'cause both insertor Thd and extractor Thd would write it 1.206 + if( extractPos - insertPos != 1 && 1.207 + !(insertPos == endOfData && extractPos == startOfData)) 1.208 + { *(insertPos) = in; //insert before move 1.209 + if( insertPos == endOfData ) //write new pos exactly once, correctly 1.210 + { Q->insertPos = startOfData; 1.211 + } 1.212 + else 1.213 + { Q->insertPos++; 1.214 + } 1.215 + Q->insertLock = UNLOCKED; 1.216 + return; 1.217 + } 1.218 + else //Q is full 1.219 + { success = FALSE; 1.220 + Q->insertLock = UNLOCKED;//have to try again, release for others 1.221 + } 1.222 + } 1.223 + tries++; 1.224 + if( tries > 10000 ) pthread_yield();//yield not guaranteed 1.225 + } 1.226 + } 1.227 + 1.228 +#endif //_GNU_SOURCE 1.229 + 1.230 +//=========================================================================== 1.231 +//Single reader single writer super fast Q.. no atomic instrs.. 1.232 + 1.233 + 1.234 +/*This is a blocking queue, but it uses no atomic instructions, just does 1.235 + * busy-waiting when empty or full (but yield() if waits too long) 1.236 + * 1.237 + *It doesn't need any atomic instructions because only a single thread 1.238 + * extracts and only a single thread inserts, and it has no locations that 1.239 + * are written by both. It writes before moving and moves before reading, 1.240 + * and never lets write position and read position be the same, so dis- 1.241 + * synchrony can only ever cause an unnecessary call to yield(), never a 1.242 + * wrong value (by monotonicity of movement of pointers, plus single writer 1.243 + * to pointers, plus sequence of write before change pointer, plus 1.244 + * assumptions that if thread A semantically writes X before Y, then thread 1.245 + * B will see the writes in that order.) 1.246 + */ 1.247 + 1.248 +SRSWQueueStruc* makeSRSWQ() 1.249 + { 1.250 + SRSWQueueStruc* retQ; 1.251 + retQ = (SRSWQueueStruc *) malloc( sizeof( SRSWQueueStruc ) ); 1.252 + 1.253 + retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty 1.254 + retQ->insertPos = &(retQ->startOfData[1]); // so start pos's have to be 1.255 + retQ->endOfData = &(retQ->startOfData[1023]); 1.256 + 1.257 + return retQ; 1.258 + } 1.259 + 1.260 + 1.261 +void* readSRSWQ( SRSWQueueStruc* Q ) 1.262 + { void *out = 0; 1.263 + int tries = 0; 1.264 + int startOfData = Q->startOfData; 1.265 + int endOfData = Q->endOfData; 1.266 + 1.267 + while( TRUE ) 1.268 + { //not certain the volatile reads need to be done, but safe.. 1.269 + volatile int insertPos = Q->insertPos; 1.270 + volatile int extractPos = Q->extractPos; 1.271 + 1.272 + //if not empty -- extract just below insert when empty 1.273 + if( insertPos - extractPos != 1 && 1.274 + !(extractPos == endOfData && insertPos == startOfData)) 1.275 + { //move before read 1.276 + if( extractPos == endOfData ) //write new pos exactly once, correctly 1.277 + { Q->extractPos = startOfData; //can't overrun then fix it 'cause 1.278 + } // other thread might read bad pos 1.279 + else 1.280 + { Q->extractPos++; 1.281 + } 1.282 + out = *(Q->extractPos); 1.283 + return out; 1.284 + } 1.285 + //Q is empty 1.286 + tries++; 1.287 + if( tries > 10000 ) pthread_yield();//not guaranteed, so quasi spin-lock 1.288 + } 1.289 + } 1.290 + 1.291 +void writeSRSWQ( void * in, SRSWQueueStruc* Q ) 1.292 + { 1.293 + int tries = 0; 1.294 + int startOfData = Q->startOfData; 1.295 + int endOfData = Q->endOfData; 1.296 + 1.297 + while( TRUE ) 1.298 + { //not certain the volatile reads need to be done, but safe.. 1.299 + volatile int insertPos = Q->insertPos; 1.300 + volatile int extractPos = Q->extractPos; 1.301 + 1.302 + if( extractPos - insertPos != 1 && 1.303 + !(insertPos == endOfData && extractPos == startOfData)) 1.304 + { *(insertPos) = in; //insert before move 1.305 + if( insertPos == endOfData ) //write new pos exactly once, correctly 1.306 + { Q->insertPos = startOfData; 1.307 + } 1.308 + else 1.309 + { Q->insertPos++; 1.310 + } 1.311 + return; 1.312 + } 1.313 + //Q is full 1.314 + tries++; 1.315 + if( tries > 10000 ) pthread_yield();//yield not guaranteed 1.316 + } 1.317 + }
