Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
changeset 0:9cf9b2091eeb cond_var
working condition variable version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:13:46 -0700 |
| parents | |
| children | 88db7b62b961 |
| files | .hgignore src/Application/consumer.c src/Application/main.c src/Application/main.h src/Application/producer.c |
| diffstat | 5 files changed, 650 insertions(+), 0 deletions(-) [+] |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/.hgignore Wed Jul 10 14:13:46 2013 -0700 1.3 @@ -0,0 +1,3 @@ 1.4 +nbproject 1.5 +task_size_vs_exe_time 1.6 +glob: *.o
2.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 2.2 +++ b/src/Application/consumer.c Wed Jul 10 14:13:46 2013 -0700 2.3 @@ -0,0 +1,94 @@ 2.4 +/* 2.5 + * 2.6 + */ 2.7 + 2.8 +#include "main.h" 2.9 +#include <pthread.h> 2.10 + 2.11 +/* 2.12 + * Consumer. 2.13 + * 2.14 + * Birth function for thread that performs the consumer behavior 2.15 + * 2.16 + *Here's the protocol: 2.17 + *Consumer is born waiting for some producer to send it a production. 2.18 + *When it wakes, it reads the variables used for communication, and packages 2.19 + * the information into a tuple it is constructing. 2.20 + *Then it wakes the producer, who is waiting to be sure that send was received. 2.21 + *If the tuple is not yet complete, it loops back for another production. 2.22 + *When tuple complete, it adds that tuple to the output 2.23 + *If that's the last tuple, it ends itself 2.24 + *If not, then wakes all the producers, who go to the next iteration. 2.25 + *Then loops back to wait for some producer to send it a production 2.26 + */ 2.27 +void* 2.28 +consumer_birthFn( void* _params ) 2.29 + { 2.30 + int lastSeenProductionNum, numProducts; 2.31 + 2.32 + ConsumerParams* params = (ConsumerParams *)_params; 2.33 + 2.34 + 2.35 + /*The consumer does two loops. 2.36 + * The outside loop counts the number of tuples created. 2.37 + * The inside loop collects the products for one tuple. 2.38 + * 2.39 + *Protocol: 2.40 + * increment tupleIter 2.41 + * wake producers for next tuple 2.42 + * wait on production ready 2.43 + * read comm vars and add msg to current tuple 2.44 + * increment ACK count 2.45 + * wake producer (who is waiting for ack) 2.46 + * if more productions for current tuple, repeat 2.47 + * have all productions for current tuple, so add tuple to output 2.48 + * if have all tuples are going to produce, end 2.49 + * else more, so repeat 2.50 + */ 2.51 + while( tupleIter < params->numTuplesToCreate ) 2.52 + { 2.53 + // increment tupleIter (global shared) 2.54 + // wake producers for next iter (don't need cond lock? -- teeter totter) 2.55 + DEBUG__printf("consumer broadcast for next iter\n"); 2.56 + pthread_mutex_lock(&tupleIterLock); 2.57 + tupleIter += 1; 2.58 + pthread_cond_broadcast( &tupleIterCond ); 2.59 + pthread_mutex_unlock(&tupleIterLock); 2.60 + 2.61 + if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); 2.62 + 2.63 + for( numProducts = 0; numProducts < params->numProducers; numProducts++ ) 2.64 + { 2.65 + //wait on productionReadyCond (suspend until there is a production) 2.66 + pthread_mutex_lock( &productionReadyLock ); 2.67 + while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID 2.68 + { 2.69 + pthread_cond_wait( &productionReadyCond, &productionReadyLock ); 2.70 + } 2.71 + DEBUG__printf1( "consumer got production %d\n", currProductionNum ); 2.72 + lastSeenProductionNum = currProductionNum; 2.73 + pthread_mutex_unlock( &productionReadyLock ); 2.74 + 2.75 + //Read comm vars and add msg to current tuple 2.76 + //add production to tuple -- overhead meas, do nothing 2.77 + 2.78 + // increment ACK count 2.79 + pthread_mutex_lock( &consumerReceivedAckLock ); 2.80 + currConsumerReceivedACKNum += 1;//make different than last time prod saw 2.81 + 2.82 + // wake producer (who is waiting to be sure that send was received) 2.83 + DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum); 2.84 + pthread_cond_broadcast( &consumerReceivedAckCond ); 2.85 + pthread_mutex_unlock( &consumerReceivedAckLock ); 2.86 + } //if more productions for current tuple, repeat 2.87 + 2.88 + // have all productions for current tuple, so add tuple to output 2.89 + //add tuple to output and malloc new tuple array -- overhead meas, do nothing 2.90 + 2.91 + } // if have all tuples are going to produce, end 2.92 + 2.93 + //Shutdown consumer thread 2.94 + pthread_exit(NULL); 2.95 + 2.96 + } 2.97 +
3.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 3.2 +++ b/src/Application/main.c Wed Jul 10 14:13:46 2013 -0700 3.3 @@ -0,0 +1,277 @@ 3.4 +/* 3.5 + * 3.6 + */ 3.7 + 3.8 +#include "main.h" 3.9 + 3.10 +//========== Global Vars =========== 3.11 + 3.12 +const char *usage = 3.13 + { 3.14 + "Usage: k_tuple_async [options]\n" 3.15 + " Creates a number of workers, and one consumer that packages productions " 3.16 + " into a tuple.\n\n" 3.17 + "Options:\n" 3.18 + " -p <num> The number of producer threads to create.\n" 3.19 + " -t <num> the number of tuples to create\n" 3.20 + " -h this help screen\n\n" 3.21 + }; 3.22 + 3.23 +char __ProgrammName[] = "K-tuple_async"; 3.24 +char __DataSet[255]; 3.25 + 3.26 +#ifdef MEASURE_PERF 3.27 +int cycles_counter_fd[NUM_CORES]; 3.28 +int instrs_counter_fd[NUM_CORES]; 3.29 +int cycles_counter_main_fd; 3.30 +#endif 3.31 + 3.32 +pthread_mutex_t waitForAllDoneLock; 3.33 +pthread_cond_t waitForAllDoneCond; 3.34 + 3.35 + 3.36 +//=================================== 3.37 +/* provide a millisecond-resolution timer for each system */ 3.38 +#if defined(unix) || defined(__unix__) 3.39 +#include <time.h> 3.40 +#include <sys/time.h> 3.41 +unsigned long get_msec(void) { 3.42 + static struct timeval timeval, first_timeval; 3.43 + 3.44 + gettimeofday(&timeval, 0); 3.45 + if(first_timeval.tv_sec == 0) { 3.46 + first_timeval = timeval; 3.47 + return 0; 3.48 + } 3.49 + return (timeval.tv_sec - first_timeval.tv_sec) * 1000 + (timeval.tv_usec - first_timeval.tv_usec) / 1000; 3.50 +} 3.51 +#elif defined(__WIN32__) || defined(WIN32) 3.52 +#include <windows.h> 3.53 +unsigned long get_msec(void) { 3.54 + return GetTickCount(); 3.55 +} 3.56 +#else 3.57 +#error "I don't know how to measure time on your platform" 3.58 +#endif 3.59 + 3.60 +/*Initializes the performance counters, and opens the file descriptors used 3.61 + * to read from the performance counters 3.62 + */ 3.63 +void 3.64 +set_up_performance_counters() 3.65 + { int i; 3.66 + 3.67 + #ifdef MEASURE_PERF 3.68 + //setup performance counters 3.69 + struct perf_event_attr hw_event; 3.70 + memset(&hw_event,0,sizeof(hw_event)); 3.71 + hw_event.type = PERF_TYPE_HARDWARE; 3.72 + hw_event.size = sizeof(hw_event); 3.73 + hw_event.disabled = 0; 3.74 + hw_event.freq = 0; 3.75 + hw_event.inherit = 1; /* children inherit it */ 3.76 + hw_event.pinned = 1; /* must always be on PMU */ 3.77 + hw_event.exclusive = 0; /* only group on PMU */ 3.78 + hw_event.exclude_user = 0; /* don't count user */ 3.79 + hw_event.exclude_kernel = 1; /* ditto kernel */ 3.80 + hw_event.exclude_hv = 1; /* ditto hypervisor */ 3.81 + hw_event.exclude_idle = 1; /* don't count when idle */ 3.82 + hw_event.mmap = 0; /* include mmap data */ 3.83 + hw_event.comm = 0; /* include comm data */ 3.84 + 3.85 + 3.86 + for( i = 0; i < NUM_CORES; i++ ) 3.87 + { 3.88 + hw_event.config = PERF_COUNT_HW_CPU_CYCLES; //cycles 3.89 + cycles_counter_fd[i] = syscall(__NR_perf_event_open, &hw_event, 3.90 + 0,//pid_t pid, 3.91 + i,//int cpu, 3.92 + -1,//int group_fd, 3.93 + 0//unsigned long flags 3.94 + ); 3.95 + if (cycles_counter_fd[i]<0){ 3.96 + fprintf(stderr,"On core %d: ",i); 3.97 + perror("Failed to open cycles counter"); 3.98 + } 3.99 + } 3.100 + 3.101 + int cycles_counter_main_fd; 3.102 + hw_event.config = PERF_COUNT_HW_CPU_CYCLES; //cycles 3.103 + hw_event.exclude_kernel=0; 3.104 + cycles_counter_main_fd = syscall(__NR_perf_event_open, &hw_event, 3.105 + 0,//pid_t pid, 3.106 + -1,//int cpu, 3.107 + -1,//int group_fd, 3.108 + 0//unsigned long flags 3.109 + ); 3.110 + if (cycles_counter_main_fd<0){ 3.111 + perror("Failed to open main cycles counter"); 3.112 + } 3.113 + 3.114 + #endif 3.115 + } 3.116 + 3.117 + 3.118 +void 3.119 +init_stuff() 3.120 + { 3.121 + pthread_mutex_init(&tupleIterLock, NULL); 3.122 + pthread_cond_init( &tupleIterCond, NULL ); 3.123 + tupleIter = 0; 3.124 + 3.125 + pthread_mutex_init(&producerAccessMutex, NULL); 3.126 + pthread_mutex_init(&productionReadyLock, NULL); 3.127 + pthread_cond_init( &productionReadyCond, NULL ); 3.128 + currProductionNum = 0; 3.129 + 3.130 + pthread_mutex_init(&consumerReceivedAckLock, NULL); 3.131 + pthread_cond_init( &consumerReceivedAckCond, NULL ); 3.132 + currConsumerReceivedACKNum = 0; 3.133 + } 3.134 + 3.135 + 3.136 +typedef struct 3.137 + { 3.138 + int numProducers; 3.139 + int numTuplesToCreate; 3.140 + } 3.141 +ParsedArgs; 3.142 + 3.143 +/*The benchmark Fn creates the producers and the consumer, then gives the 3.144 + * "go" signal. It measures time from go until the consumer produces the 3.145 + * last tuple as output. 3.146 + */ 3.147 +void 3.148 +benchmark( ParsedArgs *args ) 3.149 + { 3.150 + int i; 3.151 + ProducerParams producerParams[args->numProducers]; 3.152 + pthread_t producerThds[args->numProducers]; 3.153 + pthread_t consumerThd; 3.154 + 3.155 + ConsumerParams consumerParams; 3.156 + 3.157 + //Set up the param structs for producers.. gives them the mutex and cond var 3.158 + // to communicate with consumer 3.159 + //Also the core the producer should pin its thread to 3.160 + for(i=0; i < args->numProducers; i++) 3.161 + { 3.162 + producerParams[i].producerID = i + 1; //no ID of 0, a fact used in handshake 3.163 + producerParams[i].numTuplesToCreate = args->numTuplesToCreate; 3.164 + producerParams[i].coreID = i % NUM_CORES; 3.165 + } 3.166 + 3.167 + consumerParams.numProducers = args->numProducers; 3.168 + consumerParams.numTuplesToCreate = args->numTuplesToCreate; 3.169 + 3.170 + //take measurement before creation of threads, to get total exetime 3.171 + MeasStruct benchStartMeas, benchEndMeas; 3.172 + 3.173 + takeAMeas(0, benchStartMeas); 3.174 + 3.175 + for(i=0; i < args->numProducers; i++) 3.176 + { pthread_create( &producerThds[i], NULL, &producer_birthFn, (void*)&producerParams[i]); 3.177 + } 3.178 + 3.179 + pthread_create( &consumerThd, NULL, &consumer_birthFn, (void*)&consumerParams ); 3.180 + 3.181 + for(i=0; i<args->numProducers; i++) 3.182 + { pthread_join( producerThds[i], NULL ); 3.183 + } 3.184 + pthread_join( consumerThd, NULL ); 3.185 + 3.186 + //work is all done, so take a measurement snapshot at end 3.187 + takeAMeas(0, benchEndMeas); 3.188 + 3.189 + 3.190 +#ifdef MEASURE_PERF 3.191 + uint64_t totalExeCycles = ( benchEndMeas.cycles - benchStartMeas.cycles); 3.192 + printf("Total Execution: %lu\n", totalExeCycles); 3.193 +#else 3.194 + uint64_t totalExeCycles = ( benchEndMeas.total - benchStartMeas.total); 3.195 + printf("Total Cycles of Execution: %lu\n", totalExeCycles); 3.196 +#endif 3.197 + 3.198 + //====================================================== 3.199 + } 3.200 + 3.201 + 3.202 +/*This parsed the command line arguments and returns the values in a struct 3.203 + * Command line args should be a '-' followed by a single letter, then a value 3.204 + */ 3.205 +ParsedArgs * 3.206 +parse_arguments( int argc, char **argv ) 3.207 + { ParsedArgs *parsedArgs; 3.208 + int i; 3.209 + 3.210 + parsedArgs = malloc(sizeof(ParsedArgs)); 3.211 + if(argc < 2) 3.212 + { fprintf(stdout, "must give arguments"); 3.213 + fputs(usage, stdout); 3.214 + return EXIT_FAILURE; 3.215 + } 3.216 + for( i=1; i < argc; i++ ) 3.217 + { if(argv[i][0] == '-' && argv[i][2] == 0) 3.218 + { switch(argv[i][1]) 3.219 + { case 'p': 3.220 + { if(!isdigit(argv[++i][0])) 3.221 + { fprintf(stderr, "-p must be followed by the number of producer threads to spawn\n"); 3.222 + return EXIT_FAILURE; 3.223 + } 3.224 + parsedArgs->numProducers = atoi(argv[i]); 3.225 + if( parsedArgs->numProducers == 0 ) 3.226 + { fprintf(stderr, "invalid number of producers specified: %d\n", parsedArgs->numProducers); 3.227 + return EXIT_FAILURE; 3.228 + } 3.229 + else 3.230 + { DEBUG__printf1("num producers: %d\n", parsedArgs->numProducers ); 3.231 + } 3.232 + } 3.233 + break; 3.234 + case 't': 3.235 + { if( !isdigit( argv[++i][0] ) ) 3.236 + { fputs("-t must be followed by a number\n", stderr); 3.237 + return EXIT_FAILURE; 3.238 + } 3.239 + parsedArgs->numTuplesToCreate = atoi(argv[i]); 3.240 + DEBUG__printf1("num tuples to produce: %d\n", parsedArgs->numTuplesToCreate ); 3.241 + } 3.242 + break; 3.243 + case 'h': 3.244 + { fputs(usage, stdout); 3.245 + return 0; 3.246 + } 3.247 + default: 3.248 + { fprintf(stderr, "unrecognized argument: %s\n", argv[i]); 3.249 + fputs(usage, stderr); 3.250 + return EXIT_FAILURE; 3.251 + } 3.252 + } 3.253 + } 3.254 + else 3.255 + { fprintf(stdout, "unrecognized argument: %s\n", argv[i]); 3.256 + fputs(usage, stdout); 3.257 + return EXIT_FAILURE; 3.258 + } 3.259 + }//for 3.260 + return parsedArgs; 3.261 + } 3.262 + 3.263 +int main(int argc, char **argv) 3.264 + { ParsedArgs *args; 3.265 + int i; 3.266 + 3.267 + 3.268 + set_up_performance_counters(); 3.269 + 3.270 + init_stuff(); 3.271 + 3.272 + args = parse_arguments( argc, argv); 3.273 + 3.274 + if( args < 10 ) return args +1; //non-zero exit when parsing went wrong 3.275 + 3.276 + benchmark( args ); 3.277 + 3.278 + return 0; 3.279 + } 3.280 +
4.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 4.2 +++ b/src/Application/main.h Wed Jul 10 14:13:46 2013 -0700 4.3 @@ -0,0 +1,166 @@ 4.4 +/* 4.5 + * 4.6 + */ 4.7 +#include <stdio.h> 4.8 +#include <stdlib.h> 4.9 +#include <string.h> 4.10 +#include <math.h> 4.11 +#include <ctype.h> 4.12 +#include <errno.h> 4.13 +#include <pthread.h> 4.14 +#include <sched.h> 4.15 +#include <unistd.h> 4.16 + 4.17 +#include <linux/perf_event.h> 4.18 +#include <sys/syscall.h> 4.19 + 4.20 +//========================== 4.21 +//#define TURN_ON_DEBUG 4.22 + 4.23 +//========================== 4.24 +#define NUM_CORES 4 4.25 + 4.26 +//========================== 4.27 + 4.28 +//SELECT how the measurement is done 4.29 +//only one must be enabled 4.30 +#define MEASURE_TSC 4.31 +//#define MEASURE_PERF 4.32 + 4.33 + 4.34 +#if !defined(unix) && !defined(__unix__) 4.35 +#ifdef __MACH__ 4.36 +#define unix 1 4.37 +#define __unix__ 1 4.38 +#endif /* __MACH__ */ 4.39 +#endif /* unix */ 4.40 + 4.41 +/* find the appropriate way to define explicitly sized types */ 4.42 +/* for C99 or GNU libc (also mach's libc) we can use stdint.h */ 4.43 +#if (__STDC_VERSION__ >= 199900) || defined(__GLIBC__) || defined(__MACH__) 4.44 +#include <stdint.h> 4.45 +#elif defined(unix) || defined(__unix__) /* some UNIX systems have them in sys/types.h */ 4.46 +#include <sys/types.h> 4.47 +#elif defined(__WIN32__) || defined(WIN32) /* the nameless one */ 4.48 +typedef unsigned __int8 uint8_t; 4.49 +typedef unsigned __int32 uint32_t; 4.50 +#endif /* sized type detection */ 4.51 + 4.52 + 4.53 +//================== 4.54 +#ifdef TURN_ON_DEBUG 4.55 + #define DEBUG__printf(msg) printf(msg) 4.56 + #define DEBUG__printf1(msg, arg1) printf(msg, arg1) 4.57 + #define DEBUG__printf2(msg, arg1, arg2) printf(msg, arg1, arg2) 4.58 +#else 4.59 + #define DEBUG__printf(msg) 4.60 + #define DEBUG__printf1(msg, arg1) 4.61 + #define DEBUG__printf2(msg, arg1, arg2) 4.62 +#endif 4.63 +//===== RDTSC wrapper ===== //Does work for x86_64 compile 4.64 + 4.65 +#define saveTimeStampCountInto(low, high) \ 4.66 + asm volatile("RDTSC; \ 4.67 + movl %%eax, %0; \ 4.68 + movl %%edx, %1;" \ 4.69 + /* outputs */ : "=m" (low), "=m" (high)\ 4.70 + /* inputs */ : \ 4.71 + /* clobber */ : "%eax", "%edx" \ 4.72 + ); 4.73 + 4.74 +#define saveLowTimeStampCountInto(low) \ 4.75 + asm volatile("RDTSC; \ 4.76 + movl %%eax, %0;" \ 4.77 + /* outputs */ : "=m" (low) \ 4.78 + /* inputs */ : \ 4.79 + /* clobber */ : "%eax", "%edx" \ 4.80 + ); 4.81 + 4.82 +//==================== 4.83 + 4.84 +union timeStamp 4.85 + { 4.86 + uint32_t lowHigh[2]; //lowHigh[0] is low, lowHigh[1] is high 4.87 + uint64_t total; 4.88 + }; 4.89 + 4.90 +struct perfData 4.91 + { 4.92 + uint64_t cycles; 4.93 + uint64_t instructions; 4.94 + }; 4.95 + 4.96 +//MEASURE_TSC should be mutually exclusive with MEASURE_PERF 4.97 +#ifdef MEASURE_TSC 4.98 + typedef union timeStamp MeasStruct; 4.99 +#else 4.100 + #ifdef MEASURE_PERF 4.101 + typedef struct perfData MeasStruct; 4.102 + #endif 4.103 +#endif 4.104 + 4.105 + //fast way to collect time intervals, by putting into hist right away 4.106 +#define makeAMeasHist( idx, name, numBins, startVal, binWidth ) \ 4.107 + makeHighestDynArrayIndexBeAtLeast( _VMSMasterEnv->measHistsInfo, idx ); \ 4.108 + _VMSMasterEnv->measHists[idx] = \ 4.109 + makeFixedBinHist( numBins, startVal, binWidth, name ); 4.110 + 4.111 +//read and save current perf-counter readings for cycles and instrs 4.112 +#ifdef MEASURE_PERF 4.113 + #define takeAMeas(core, perfDataStruct) do{ \ 4.114 + int cycles_fd = cycles_counter_fd[core]; \ 4.115 + int nread; \ 4.116 + \ 4.117 + nread = read(cycles_fd,&(perfDataStruct.cycles),sizeof(perfDataStruct.cycles)); \ 4.118 + if(nread<0){ \ 4.119 + perror("Error reading cycles counter"); \ 4.120 + cycles = 0; \ 4.121 + } \ 4.122 + } while (0) //macro magic for scoping 4.123 +#else 4.124 + #define takeAMeas(core, timeStampStruct) do{ \ 4.125 + saveTimeStampCountInto(timeStampStruct.lowHigh[0], timeStampStruct.lowHigh[1]);\ 4.126 + } while (0) //macro magic for scoping 4.127 +#endif 4.128 + 4.129 + 4.130 +typedef struct 4.131 + { 4.132 + int coreID; 4.133 + int numTuplesToCreate; 4.134 + int producerID; 4.135 + 4.136 + } 4.137 +ProducerParams; 4.138 + 4.139 +typedef struct 4.140 + { 4.141 + int coreID; 4.142 + int numTuplesToCreate; 4.143 + int numProducers; 4.144 + 4.145 + } 4.146 +ConsumerParams; 4.147 + 4.148 +//=========== Global Vars ============= 4.149 +pthread_mutex_t tupleIterLock; 4.150 +pthread_cond_t tupleIterCond; 4.151 +int tupleIter; 4.152 + 4.153 +pthread_mutex_t producerAccessMutex; 4.154 +pthread_mutex_t productionReadyLock; 4.155 +pthread_cond_t productionReadyCond; 4.156 +int currProductionNum; 4.157 +int producerMessage; 4.158 + 4.159 +pthread_mutex_t consumerReceivedAckLock; 4.160 +pthread_cond_t consumerReceivedAckCond; 4.161 +int currConsumerReceivedACKNum; 4.162 + 4.163 +//======= 4.164 +void* 4.165 +producer_birthFn( void* _params ); 4.166 +void* 4.167 +consumer_birthFn( void* _params ); 4.168 + 4.169 +
5.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 5.2 +++ b/src/Application/producer.c Wed Jul 10 14:13:46 2013 -0700 5.3 @@ -0,0 +1,110 @@ 5.4 +/* 5.5 + * 5.6 + */ 5.7 + 5.8 +#include "main.h" 5.9 +#include <pthread.h> 5.10 +#include <sched.h> 5.11 + 5.12 +/* 5.13 + * Producer. 5.14 + * 5.15 + * Birth function for thread that performs the producer behavior 5.16 + * 5.17 + * Note: is pinned to a core, to facilitate collecting measurements 5.18 + */ 5.19 +void* 5.20 +producer_birthFn( void* _params ) 5.21 + { 5.22 + cpu_set_t cpuinfo; 5.23 + int lastTupleIter, oldConsumerReceivedACKNum; 5.24 + 5.25 + ProducerParams *params = (ProducerParams *)_params; 5.26 + 5.27 + lastTupleIter = 0; //compared to global tupleIter while waiting 5.28 + oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive 5.29 + 5.30 + /* -------------------------------------------------- 5.31 + * Pin thread to core, the producers are divided 5.32 + * equally over all cores. Pinning prohibits the 5.33 + * switching of cores so that perf counter and TSC values remain 5.34 + * from the same core between readings. Pinning shouldn't 5.35 + * affect results.. may be odd case when num thds doesn't divide into 5.36 + * num Cores 5.37 + * -------------------------------------------------- 5.38 + */ 5.39 + /* 5.40 + CPU_ZERO( &cpuinfo ); 5.41 + CPU_SET( params->coreID, &cpuinfo ); 5.42 + pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo ); 5.43 + pthread_yield(); //get off the core, so next can be created on it 5.44 + uint32_t cpuid = sched_getcpu(); 5.45 + */ 5.46 + 5.47 + 5.48 + /*Protocol: 5.49 + * wait for change in tupleIter (save updated tuple num for next time) 5.50 + * Get producer lock (only one producer at a time) 5.51 + * write into comm vars 5.52 + * get current ACK number 5.53 + * notify consumer 5.54 + * wait for ACK (get ACK lock, check on change in ACK number) 5.55 + * release producer lock 5.56 + * if not done, repeat 5.57 + */ 5.58 + while( lastTupleIter < params->numTuplesToCreate ) 5.59 + { 5.60 + //wait for change in tupleNum (save updated tuple num for next time) 5.61 + pthread_mutex_lock( &tupleIterLock ); 5.62 + while( lastTupleIter == tupleIter ) 5.63 + { 5.64 + pthread_cond_wait( &tupleIterCond, 5.65 + &tupleIterLock ); 5.66 + } 5.67 + pthread_mutex_unlock( &tupleIterLock ); 5.68 + 5.69 + lastTupleIter = tupleIter; //save for next time through loop 5.70 + 5.71 + DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter); 5.72 + 5.73 + //Two vars used to comm with consumer. One holds message to send, 5.74 + // other holds ID of producer sending. 5.75 + //Protect the two variables with a lock, that only one 5.76 + // producer can get. Update the variable with the message to be 5.77 + // communicated, and write ID of sender in second var. 5.78 + 5.79 + //Get producer lock 5.80 + pthread_mutex_lock( &producerAccessMutex ); 5.81 + 5.82 + // write into comm vars 5.83 + producerMessage = tupleIter; //just a dummy -- overhead meas, do nothing 5.84 + currProductionNum += 1; 5.85 + 5.86 + // get current ACK number 5.87 + oldConsumerReceivedACKNum = currConsumerReceivedACKNum; 5.88 + 5.89 + // notify consumer (don't think need the cond lock here -- teeter-totter) 5.90 + pthread_mutex_lock( &productionReadyLock ); 5.91 + DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID ); 5.92 + pthread_cond_broadcast( &productionReadyCond ); 5.93 + pthread_mutex_unlock( &productionReadyLock ); 5.94 + 5.95 + // wait for ACK (get ACK lock, check on change in ACK number) 5.96 + pthread_mutex_lock( &consumerReceivedAckLock ); 5.97 + while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum ) 5.98 + { 5.99 + pthread_cond_wait( &consumerReceivedAckCond, 5.100 + &consumerReceivedAckLock ); 5.101 + } 5.102 + pthread_mutex_unlock( &consumerReceivedAckLock ); 5.103 + DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum ); 5.104 + 5.105 + // release producer lock (so different producer can get and send) 5.106 + pthread_mutex_unlock( &producerAccessMutex ); 5.107 + } //if not done, do again 5.108 + 5.109 + //Shutdown producer 5.110 + pthread_exit(NULL); 5.111 + 5.112 + } 5.113 +
