changeset 0:9cf9b2091eeb cond_var

working condition variable version
author Sean Halle <seanhalle@yahoo.com>
date Wed, 10 Jul 2013 14:13:46 -0700
parents
children 88db7b62b961
files .hgignore src/Application/consumer.c src/Application/main.c src/Application/main.h src/Application/producer.c
diffstat 5 files changed, 650 insertions(+), 0 deletions(-) [+]
line diff
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/.hgignore	Wed Jul 10 14:13:46 2013 -0700
     1.3 @@ -0,0 +1,3 @@
     1.4 +nbproject
     1.5 +task_size_vs_exe_time
     1.6 +glob: *.o
     2.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     2.2 +++ b/src/Application/consumer.c	Wed Jul 10 14:13:46 2013 -0700
     2.3 @@ -0,0 +1,94 @@
     2.4 +/* 
     2.5 + * 
     2.6 + */
     2.7 +
     2.8 +#include "main.h"
     2.9 +#include <pthread.h>
    2.10 +
    2.11 +/*
    2.12 + * Consumer.
    2.13 + * 
    2.14 + * Birth function for the thread that performs the consumer behavior.
    2.15 + * 
    2.16 + * _params points to a ConsumerParams; numTuplesToCreate and numProducers are read.
    2.17 + * The function does not return normally: it ends the thread with pthread_exit(NULL).
    2.18 + * 
    2.19 + *Protocol, per tuple:
    2.20 + * The consumer bumps the shared tupleIter and broadcasts, releasing the producers
    2.21 + *  into the next iteration.
    2.22 + * It then waits for a production, i.e. for currProductionNum to change.
    2.23 + * For each production it bumps currConsumerReceivedACKNum and broadcasts, waking
    2.24 + *  the producer that is waiting to be sure its send was received.
    2.25 + * After numProducers productions the tuple is complete; loop for the next tuple.
    2.26 + */
    2.27 +void* 
    2.28 +consumer_birthFn( void* _params )
    2.29 + {
    2.30 +   int numProducts;
    2.31 +   int lastSeenProductionNum = 0; //equals currProductionNum's initial value, so the first wait blocks
    2.32 +   ConsumerParams* params = (ConsumerParams *)_params;
    2.33 +       
    2.34 +   
    2.35 +   /*The consumer does two loops.  
    2.36 +    * The outside loop counts the number of tuples created.
    2.37 +    * The inside loop collects the products for one tuple.
    2.38 +    * 
    2.39 +    *Protocol:
    2.40 +    * increment tupleIter
    2.41 +    * wake producers for next tuple
    2.42 +    * wait on production ready
    2.43 +    * read comm vars and add msg to current tuple
    2.44 +    * increment ACK count
    2.45 +    * wake producer (who is waiting for ack)
    2.46 +    * if more productions for current tuple, repeat
    2.47 +    * have all productions for current tuple, so add tuple to output
    2.48 +    * if all the tuples to be produced are done, end
    2.49 +    * else there are more, so repeat
    2.50 +    */
    2.51 +   while( tupleIter < params->numTuplesToCreate )
    2.52 +    {
    2.53 +      // increment tupleIter (global shared) and wake producers for next iter;
    2.54 +      // done under the lock so a producer can't miss the broadcast between check and wait
    2.55 +      DEBUG__printf("consumer broadcast for next iter\n");
    2.56 +      pthread_mutex_lock(&tupleIterLock);
    2.57 +      tupleIter += 1;
    2.58 +      pthread_cond_broadcast( &tupleIterCond );
    2.59 +      pthread_mutex_unlock(&tupleIterLock);
    2.60 +	  
    2.61 +      if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);       
    2.62 +         
    2.63 +      for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
    2.64 +       { 
    2.65 +         //wait on productionReadyCond  (suspend until there is a production)
    2.66 +         pthread_mutex_lock( &productionReadyLock );
    2.67 +         while( lastSeenProductionNum == currProductionNum ) //re-check after every wake-up
    2.68 +          {
    2.69 +            pthread_cond_wait( &productionReadyCond, &productionReadyLock );
    2.70 +          }
    2.71 +         DEBUG__printf1( "consumer got production %d\n", currProductionNum );
    2.72 +         lastSeenProductionNum = currProductionNum;
    2.73 +         pthread_mutex_unlock( &productionReadyLock );
    2.74 +                  
    2.75 +         //Read comm vars and add msg to current tuple
    2.76 +         //add production to tuple -- overhead meas, do nothing
    2.77 +      
    2.78 +         // increment ACK count
    2.79 +         pthread_mutex_lock(     &consumerReceivedAckLock  );
    2.80 +         currConsumerReceivedACKNum += 1;//make different than last time prod saw
    2.81 +         
    2.82 +         // wake producer (who is waiting to be sure that send was received)
    2.83 +         DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum);
    2.84 +         pthread_cond_broadcast( &consumerReceivedAckCond );
    2.85 +         pthread_mutex_unlock(     &consumerReceivedAckLock  );
    2.86 +       } //if more productions for current tuple, repeat
    2.87 +    
    2.88 +      // have all productions for current tuple, so add tuple to output
    2.89 +      //add tuple to output and malloc new tuple array -- overhead meas, do nothing
    2.90 +      
    2.91 +    } // all tuples that were to be produced are done, so end
    2.92 +
    2.93 +   //Shutdown consumer thread
    2.94 +   pthread_exit(NULL);
    2.95 +    
    2.96 + }
    2.97 +
     3.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     3.2 +++ b/src/Application/main.c	Wed Jul 10 14:13:46 2013 -0700
     3.3 @@ -0,0 +1,277 @@
     3.4 +/* 
     3.5 + * 
     3.6 + */
     3.7 +
     3.8 +#include "main.h"
     3.9 +
    3.10 +//==========  Global Vars  ===========
    3.11 +//Help text; parse_arguments prints it for -h and for missing or unrecognized arguments
    3.12 +const char *usage = 
    3.13 + {
    3.14 +   "Usage: k_tuple_async [options]\n"
    3.15 +   "  Creates a number of workers, and one consumer that packages productions "
    3.16 +   "   into a tuple.\n\n"
    3.17 +   "Options:\n"
    3.18 +   "  -p <num>   The number of producer threads to create.\n"
    3.19 +   "  -t <num>   the number of tuples to create\n"
    3.20 +   "  -h         this help screen\n\n"
    3.21 + };
    3.22 +//NOTE(review): the next two are not referenced anywhere in main.c
    3.23 +char __ProgrammName[] = "K-tuple_async";
    3.24 +char __DataSet[255];
    3.25 +//perf-counter file descriptors; only built when MEASURE_PERF is defined
    3.26 +#ifdef MEASURE_PERF
    3.27 +int cycles_counter_fd[NUM_CORES]; //one per core, opened by set_up_performance_counters, read by takeAMeas
    3.28 +int instrs_counter_fd[NUM_CORES]; //never opened in main.c
    3.29 +int cycles_counter_main_fd;
    3.30 +#endif
    3.31 +//NOTE(review): this lock/cond pair is not used by any code in main.c
    3.32 +pthread_mutex_t  waitForAllDoneLock;
    3.33 +pthread_cond_t   waitForAllDoneCond;   
    3.34 +
    3.35 +
    3.36 +//===================================
    3.37 +/* millisecond timer, one implementation per supported platform */
    3.38 +#if defined(unix) || defined(__unix__)
    3.39 +#include <time.h>
    3.40 +#include <sys/time.h>
    3.41 +/* 0 on the first call; afterwards the milliseconds elapsed since that first call */
    3.42 +unsigned long get_msec(void) {
    3.43 +	static struct timeval start;
    3.44 +	static struct timeval now;
    3.45 +	gettimeofday(&now, 0);
    3.46 +	if(start.tv_sec != 0) {
    3.47 +		return (now.tv_sec - start.tv_sec) * 1000 + (now.tv_usec - start.tv_usec) / 1000;
    3.48 +	}
    3.49 +	start = now;
    3.50 +	return 0;
    3.51 +}
    3.52 +#elif defined(__WIN32__) || defined(WIN32)
    3.53 +#include <windows.h>
    3.54 +/* milliseconds as reported by GetTickCount() */
    3.55 +unsigned long get_msec(void) { return GetTickCount(); }
    3.56 +#else
    3.57 +#error "I don't know how to measure time on your platform"
    3.58 +#endif
    3.59 +
    3.60 +/*Initializes the performance counters, and opens the file descriptors used
    3.61 + * to read from the performance counters.  Does nothing unless MEASURE_PERF is defined.
    3.62 + */
    3.63 +void
    3.64 +set_up_performance_counters()
    3.65 + {
    3.66 + #ifdef MEASURE_PERF
    3.67 +    int i;
    3.68 +    //setup performance counters
    3.69 +    struct perf_event_attr hw_event;
    3.70 +    memset(&hw_event,0,sizeof(hw_event));
    3.71 +        hw_event.type = PERF_TYPE_HARDWARE;
    3.72 +        hw_event.size = sizeof(hw_event);
    3.73 +        hw_event.disabled = 0; /* counter starts enabled */
    3.74 +        hw_event.freq = 0;
    3.75 +        hw_event.inherit = 1; /* children inherit it   */
    3.76 +        hw_event.pinned = 1; /* must always be on PMU */
    3.77 +        hw_event.exclusive = 0; /* need not be the only group on PMU */
    3.78 +        hw_event.exclude_user = 0; /* do count user         */
    3.79 +        hw_event.exclude_kernel = 1; /* don't count kernel    */
    3.80 +        hw_event.exclude_hv = 1; /* don't count hypervisor */
    3.81 +        hw_event.exclude_idle = 1; /* don't count when idle */
    3.82 +        hw_event.mmap = 0; /* no mmap data          */
    3.83 +        hw_event.comm = 0; /* no comm data          */
    3.84 +
    3.85 +
    3.86 +    for( i = 0; i < NUM_CORES; i++ )
    3.87 +    {
    3.88 +        hw_event.config = PERF_COUNT_HW_CPU_CYCLES; //cycles
    3.89 +        cycles_counter_fd[i] = syscall(__NR_perf_event_open, &hw_event,
    3.90 +                0,//pid_t pid, 
    3.91 +                i,//int cpu, 
    3.92 +                -1,//int group_fd,
    3.93 +                0//unsigned long flags
    3.94 +        );
    3.95 +        if (cycles_counter_fd[i]<0){
    3.96 +            fprintf(stderr,"On core %d: ",i);
    3.97 +            perror("Failed to open cycles counter");
    3.98 +        }
    3.99 +    }        
   3.100 +       
   3.101 +    //the fd goes into the file-scope cycles_counter_main_fd, so it outlives this call
   3.102 +    hw_event.config = PERF_COUNT_HW_CPU_CYCLES; //cycles
   3.103 +    hw_event.exclude_kernel=0; //this counter also counts kernel cycles
   3.104 +    cycles_counter_main_fd = syscall(__NR_perf_event_open, &hw_event,
   3.105 +            0,//pid_t pid, 
   3.106 +            -1,//int cpu, 
   3.107 +            -1,//int group_fd,
   3.108 +            0//unsigned long flags
   3.109 +    );
   3.110 +    if (cycles_counter_main_fd<0){
   3.111 +        perror("Failed to open main cycles counter");
   3.112 +    }
   3.113 +    
   3.114 + #endif
   3.115 + }
   3.116 +
   3.117 +
   3.117 +void
   3.118 +init_stuff()
   3.119 + { //the counters the threads compare against all start at zero
   3.120 +   tupleIter = 0;
   3.121 +   currProductionNum = 0;
   3.122 +   currConsumerReceivedACKNum = 0;
   3.123 +   //locks, created with default attributes
   3.124 +   pthread_mutex_init(&tupleIterLock, NULL);
   3.125 +   pthread_mutex_init(&producerAccessMutex, NULL);
   3.126 +   pthread_mutex_init(&productionReadyLock, NULL);
   3.127 +   pthread_mutex_init(&consumerReceivedAckLock, NULL);
   3.128 +   //condition variables, created with default attributes
   3.129 +   pthread_cond_init( &tupleIterCond, NULL );
   3.130 +   pthread_cond_init( &productionReadyCond, NULL );
   3.131 +   pthread_cond_init( &consumerReceivedAckCond, NULL );
   3.132 + }
   3.134 +
   3.135 +
   3.136 +typedef struct
   3.137 + {
   3.138 +   int numProducers;        //value given with -p: number of producer threads
   3.139 +   int numTuplesToCreate;   //value given with -t: number of tuples to create
   3.140 + }
   3.141 +ParsedArgs;
   3.142 +
   3.143 +/*The benchmark Fn creates the producers and the consumer, then joins them
   3.144 + * all.  It measures from just before the threads are created until the last
   3.145 + * one has finished, and prints the difference.  Exits if a thread can't be created.
   3.146 + */
   3.147 +void 
   3.148 +benchmark( ParsedArgs *args )
   3.149 + {
   3.150 +   int i, err;
   3.151 +   ProducerParams producerParams[args->numProducers];
   3.152 +   pthread_t producerThds[args->numProducers];
   3.153 +   pthread_t consumerThd;
   3.154 +   
   3.155 +   ConsumerParams consumerParams;
   3.156 +   
   3.157 +   //Set up the param structs for producers: ID, number of tuples to take part in,
   3.158 +   // and the core the producer should pin its thread to (cores are handed out
   3.159 +   // round-robin)
   3.160 +   for(i=0; i < args->numProducers; i++)
   3.161 +    {
   3.162 +      producerParams[i].producerID = i + 1; //IDs start at 1
   3.163 +      producerParams[i].numTuplesToCreate = args->numTuplesToCreate;
   3.164 +      producerParams[i].coreID = i % NUM_CORES;
   3.165 +    }
   3.166 +
   3.167 +   consumerParams.numProducers = args->numProducers;
   3.168 +   consumerParams.numTuplesToCreate = args->numTuplesToCreate;
   3.169 +           
   3.170 +   //take measurement before creation of threads, to get total exetime
   3.171 +   MeasStruct benchStartMeas, benchEndMeas;
   3.172 +       
   3.173 +   takeAMeas(0, benchStartMeas);
   3.174 +   //a thread that failed to start can't be joined, and the others would wait on it forever
   3.175 +   for(i=0; i < args->numProducers; i++) 
   3.176 +    { err = pthread_create( &producerThds[i], NULL, &producer_birthFn, (void*)&producerParams[i]);
   3.177 +      if( err != 0 ) { fprintf(stderr, "pthread_create (producer %d): %s\n", i + 1, strerror(err)); exit(EXIT_FAILURE); }
   3.178 +    }
   3.179 +   err = pthread_create( &consumerThd, NULL, &consumer_birthFn, (void*)&consumerParams );
   3.180 +   if( err != 0 ) { fprintf(stderr, "pthread_create (consumer): %s\n", strerror(err)); exit(EXIT_FAILURE); }
   3.181 +   for(i=0; i<args->numProducers; i++)
   3.182 +    { pthread_join( producerThds[i], NULL );
   3.183 +    }
   3.184 +   pthread_join( consumerThd, NULL );
   3.185 +   
   3.186 +   //work is all done, so take a measurement snapshot at end
   3.187 +   takeAMeas(0, benchEndMeas);
   3.188 +   
   3.189 +   //uint64_t is not unsigned long everywhere, so print through unsigned long long
   3.190 +#ifdef MEASURE_PERF
   3.191 +   uint64_t totalExeCycles = ( benchEndMeas.cycles - benchStartMeas.cycles);
   3.192 +   printf("Total Execution: %llu\n", (unsigned long long)totalExeCycles);
   3.193 +#else
   3.194 +   uint64_t totalExeCycles = ( benchEndMeas.total - benchStartMeas.total);
   3.195 +   printf("Total Cycles of Execution: %llu\n", (unsigned long long)totalExeCycles);   
   3.196 +#endif
   3.197 +
   3.198 +    //======================================================
   3.199 + }
   3.200 +
   3.201 +
   3.202 +/*Parses the command line arguments and returns them in a malloc'd struct
   3.203 + * that the caller frees.  An option is a '-' followed by a single letter;
   3.204 + * -p and -t each take a number as the next argument, and both are required.
   3.205 + * -h prints the usage text and exits the process with success.
   3.206 + * On any error a message is printed and NULL is returned.
   3.207 + */
   3.208 +ParsedArgs *
   3.209 +parse_arguments( int argc, char **argv )
   3.210 + { ParsedArgs *parsedArgs;
   3.211 +   int i;
   3.212 +   
   3.213 +   if(argc < 2)
   3.214 +    { fprintf(stdout, "must give arguments\n");
   3.215 +      fputs(usage, stdout);
   3.216 +      return NULL;
   3.217 +    }
   3.218 +   parsedArgs = malloc(sizeof *parsedArgs);
   3.219 +   if( parsedArgs == NULL ) { perror("malloc"); return NULL; }
   3.220 +   parsedArgs->numProducers = 0;       //0 and -1 mark "option not given yet"
   3.221 +   parsedArgs->numTuplesToCreate = -1;
   3.222 +   for( i=1; i < argc; i++ ) 
   3.223 +    { //an option is exactly two chars; testing [1] first keeps [2] inside the string
   3.224 +      if( argv[i][0] != '-' || argv[i][1] == 0 || argv[i][2] != 0 )
   3.225 +       { fprintf(stdout, "unrecognized argument: %s\n", argv[i]);
   3.226 +         fputs(usage, stdout);
   3.227 +         goto fail;
   3.228 +       }
   3.229 +      switch(argv[i][1]) 
   3.230 +       { case 'p':
   3.231 +            //the value must exist (argv[argc] is NULL) and start with a digit
   3.232 +            if( i + 1 >= argc || !isdigit((unsigned char)argv[i+1][0]) ) 
   3.233 +             { fprintf(stderr, "-p must be followed by the number of producer threads to spawn\n");
   3.234 +               goto fail;
   3.235 +             }
   3.236 +            parsedArgs->numProducers = atoi(argv[++i]);
   3.237 +            if( parsedArgs->numProducers <= 0 ) 
   3.238 +             { fprintf(stderr, "invalid number of producers specified: %d\n", parsedArgs->numProducers);
   3.239 +               goto fail;
   3.240 +             }
   3.241 +            DEBUG__printf1("num producers: %d\n", parsedArgs->numProducers );
   3.242 +            break;
   3.243 +         case 't':
   3.244 +            if( i + 1 >= argc || !isdigit((unsigned char)argv[i+1][0]) ) 
   3.245 +             { fputs("-t must be followed by a number\n", stderr);
   3.246 +               goto fail;
   3.247 +             }
   3.248 +            parsedArgs->numTuplesToCreate = atoi(argv[++i]);
   3.249 +            DEBUG__printf1("num tuples to produce: %d\n", parsedArgs->numTuplesToCreate );
   3.250 +            break;
   3.251 +         case 'h':
   3.252 +            fputs(usage, stdout);
   3.253 +            free(parsedArgs);
   3.254 +            exit(EXIT_SUCCESS);
   3.255 +         default:
   3.256 +            fprintf(stderr, "unrecognized argument: %s\n", argv[i]);
   3.257 +            fputs(usage, stderr);
   3.258 +            goto fail;
   3.259 +       }
   3.260 +    }//for
   3.261 +   if( parsedArgs->numProducers > 0 && parsedArgs->numTuplesToCreate >= 0 )
   3.262 +      return parsedArgs;
   3.263 +   fputs("both -p and -t must be given\n", stderr);
   3.264 + fail:
   3.265 +   free(parsedArgs);
   3.266 +   return NULL;
   3.267 + }
   3.268 +
   3.269 +/*Sets up the counters and shared state, parses the args, runs the benchmark.*/
   3.270 +int main(int argc, char **argv)
   3.271 + { ParsedArgs *args;
   3.272 +   set_up_performance_counters();
   3.273 +   init_stuff();
   3.274 +   args = parse_arguments( argc, argv);
   3.275 +   if( args == NULL ) return EXIT_FAILURE; //parsing went wrong, message already printed
   3.276 +   benchmark( args );
   3.277 +   free( args );
   3.278 +   return 0;
   3.279 + }
   3.280 +
     4.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     4.2 +++ b/src/Application/main.h	Wed Jul 10 14:13:46 2013 -0700
     4.3 @@ -0,0 +1,166 @@
     4.4 +/* 
     4.5 + * 
     4.6 + */
     4.7 +#include <stdio.h>
     4.8 +#include <stdlib.h>
     4.9 +#include <string.h>
    4.10 +#include <math.h>
    4.11 +#include <ctype.h>
    4.12 +#include <errno.h>
    4.13 +#include <pthread.h>
    4.14 +#include <sched.h>
    4.15 +#include <unistd.h>
    4.16 +
    4.17 +#include <linux/perf_event.h>
    4.18 +#include <sys/syscall.h>
    4.19 +
    4.20 +//==== uncomment to make the DEBUG__printf macros actually print ====
    4.21 +//#define TURN_ON_DEBUG
    4.22 +
    4.23 +//==== number of cores: sizes the perf-counter fd arrays and spreads producer coreIDs ====
    4.24 +#define NUM_CORES 4
    4.25 +
    4.26 +//==========================
    4.27 +//MEASURE_TSC reads the time-stamp counter; MEASURE_PERF reads a perf-event cycle counter
    4.28 +//SELECT how the measurement is done
    4.29 +//exactly one of the two must be enabled
    4.30 +#define MEASURE_TSC
    4.31 +//#define MEASURE_PERF
    4.32 +
    4.33 +
    4.34 +#if !defined(unix) && !defined(__unix__)	/* on Mach, define the unix macros ourselves so the unix code paths get used */
    4.35 +#ifdef __MACH__
    4.36 +#define unix		1
    4.37 +#define __unix__	1
    4.38 +#endif	/* __MACH__ */
    4.39 +#endif	/* unix */
    4.40 +
    4.41 +/* find the appropriate way to define explicitly sized types */
    4.42 +/* for C99 or GNU libc (also mach's libc) we can use stdint.h */
    4.43 +#if (__STDC_VERSION__ >= 199900) || defined(__GLIBC__) || defined(__MACH__)
    4.44 +#include <stdint.h>
    4.45 +#elif defined(unix) || defined(__unix__)	/* some UNIX systems have them in sys/types.h */
    4.46 +#include <sys/types.h>
    4.47 +#elif defined(__WIN32__) || defined(WIN32)	/* Windows; NOTE(review): uint64_t, which this header also uses, gets no typedef here */
    4.48 +typedef unsigned __int8 uint8_t;
    4.49 +typedef unsigned __int32 uint32_t;
    4.50 +#endif	/* sized type detection */
    4.51 +
    4.52 +
    4.53 +//==== debug printing: plain printf calls with TURN_ON_DEBUG defined, otherwise the macros expand to nothing ====
    4.54 +#ifdef TURN_ON_DEBUG
    4.55 + #define DEBUG__printf(msg) printf(msg)
    4.56 + #define DEBUG__printf1(msg, arg1) printf(msg, arg1)
    4.57 + #define DEBUG__printf2(msg, arg1, arg2) printf(msg, arg1, arg2)
    4.58 +#else
    4.59 + #define DEBUG__printf(msg)
    4.60 + #define DEBUG__printf1(msg, arg1)
    4.61 + #define DEBUG__printf2(msg, arg1, arg2)
    4.62 +#endif
    4.63 +//===== RDTSC wrappers ===== //Does work for x86_64 compile
    4.64 +//x86 only: runs RDTSC, then stores EAX into 'low' and EDX into 'high'; the macro already ends in ';'
    4.65 +#define saveTimeStampCountInto(low, high) \
    4.66 +   asm volatile("RDTSC;                   \
    4.67 +                 movl %%eax, %0;          \
    4.68 +                 movl %%edx, %1;"         \
    4.69 +   /* outputs */ : "=m" (low), "=m" (high)\
    4.70 +   /* inputs  */ :                        \
    4.71 +   /* clobber */ : "%eax", "%edx"         \
    4.72 +                );
    4.73 +//Same, but only EAX (the low 32 bits) is stored
    4.74 +#define saveLowTimeStampCountInto(low)    \
    4.75 +   asm volatile("RDTSC;                   \
    4.76 +                 movl %%eax, %0;"         \
    4.77 +   /* outputs */ : "=m" (low)             \
    4.78 +   /* inputs  */ :                        \
    4.79 +   /* clobber */ : "%eax", "%edx"         \
    4.80 +                );
    4.81 +
    4.82 +//==== measurement record types ====
    4.83 +//TSC reading: the TSC takeAMeas fills lowHigh[]; benchmark() reads the 64-bit value via total
    4.84 +union timeStamp
    4.85 + {
    4.86 +    uint32_t lowHigh[2]; //lowHigh[0] is low, lowHigh[1] is high
    4.87 +    uint64_t total; //both halves read as one value; relies on the low half coming first in memory (little-endian)
    4.88 + };
    4.89 +//perf-counter reading: the perf takeAMeas fills cycles only
    4.90 +struct perfData
    4.91 + {
    4.92 +    uint64_t cycles;
    4.93 +    uint64_t instructions;
    4.94 + };
    4.95 +
    4.96 +//MEASURE_TSC should be mutually exclusive with MEASURE_PERF; if both are defined, TSC wins
    4.97 +#ifdef MEASURE_TSC
    4.98 + typedef union timeStamp MeasStruct;
    4.99 +#else
   4.100 + #ifdef MEASURE_PERF
   4.101 + typedef struct perfData MeasStruct;
   4.102 + #endif
   4.103 +#endif
   4.104 +
   4.105 + //fast way to collect time intervals, by putting into hist right away. NOTE(review): expands to two statements, is not used in the visible code, and _VMSMasterEnv / makeFixedBinHist are not declared in this header
   4.106 +#define makeAMeasHist( idx, name, numBins, startVal, binWidth ) \
   4.107 +   makeHighestDynArrayIndexBeAtLeast( _VMSMasterEnv->measHistsInfo, idx ); \
   4.108 +   _VMSMasterEnv->measHists[idx] =  \
   4.109 +                       makeFixedBinHist( numBins, startVal, binWidth, name );
   4.110 +
   4.111 +//Take one measurement into a MeasStruct lvalue ('core' is only used by the perf variant)
   4.112 +#ifdef MEASURE_PERF
   4.113 + #define takeAMeas(core, perfDataStruct) do{     \
   4.114 +   int cycles_fd = cycles_counter_fd[(core)];           \
   4.115 +   ssize_t nread;                                       \
   4.116 +                                                        \
   4.117 +   nread = read(cycles_fd,&((perfDataStruct).cycles),sizeof((perfDataStruct).cycles));    \
   4.118 +   if(nread<0){                                         \
   4.119 +       perror("Error reading cycles counter");          \
   4.120 +       (perfDataStruct).cycles = 0; /*report 0 rather than an unset value*/ \
   4.121 +   }                                                    \
   4.122 + } while (0) //macro magic for scoping
   4.123 +#else
   4.124 + #define takeAMeas(core, timeStampStruct) do{     \
   4.125 +   saveTimeStampCountInto((timeStampStruct).lowHigh[0], (timeStampStruct).lowHigh[1]);\
   4.126 + } while (0) //macro magic for scoping
   4.127 +#endif
   4.128 +
   4.129 +//Per-thread parameters; benchmark() fills one ProducerParams per producer thread
   4.130 +typedef struct
   4.131 + {
   4.132 +   int coreID;            //i % NUM_CORES; meant for pinning, which is commented out in the producer
   4.133 +   int numTuplesToCreate; //the producer loops until it has taken part in this many tuples
   4.134 +   int producerID;        //starts at 1; only used in debug output
   4.135 +   
   4.136 + }
   4.137 +ProducerParams;
   4.138 +
   4.139 +typedef struct
   4.140 + {
   4.141 +   int coreID;            //neither set nor read in the visible code
   4.142 +   int numTuplesToCreate; //bound of the consumer's outer loop
   4.143 +   int numProducers;      //productions the consumer collects per tuple
   4.144 +   
   4.145 + }
   4.146 +ConsumerParams;
   4.147 + 
   4.148 +//=========== Global Vars, shared by all the threads =============
   4.149 +pthread_mutex_t tupleIterLock; //taken when tupleIter is advanced and when producers wait on it
   4.150 +pthread_cond_t  tupleIterCond; //consumer broadcasts after advancing tupleIter
   4.151 +int tupleIter;                 //tuple iteration number; only the consumer increments it
   4.152 +//NOTE(review): these are definitions in a header, not extern declarations -- linking relies on common symbols
   4.153 +pthread_mutex_t producerAccessMutex; //held by one producer for its whole send/ACK handshake
   4.154 +pthread_mutex_t productionReadyLock; //taken around the productionReadyCond broadcast and the consumer's wait
   4.155 +pthread_cond_t  productionReadyCond; //a producer broadcasts after a new production
   4.156 +int currProductionNum;               //incremented once per production
   4.157 +int producerMessage;                 //dummy payload; written by producers, not read in the visible code
   4.158 +
   4.159 +pthread_mutex_t consumerReceivedAckLock; //taken for ACK increments and for the producer's ACK wait
   4.160 +pthread_cond_t  consumerReceivedAckCond; //consumer broadcasts after each ACK
   4.161 +int currConsumerReceivedACKNum;          //incremented by the consumer once per production received
   4.162 +
   4.163 +//======= thread birth functions, passed to pthread_create by benchmark(); _params points to the matching *Params struct
   4.164 +void* 
   4.165 +producer_birthFn( void* _params );
   4.166 +void* 
   4.167 +consumer_birthFn( void* _params );
   4.168 +
   4.169 +
     5.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     5.2 +++ b/src/Application/producer.c	Wed Jul 10 14:13:46 2013 -0700
     5.3 @@ -0,0 +1,110 @@
     5.4 +/* 
     5.5 + * 
     5.6 + */
     5.7 +
     5.8 +#include "main.h"
     5.9 +#include <pthread.h>
    5.10 +#include <sched.h>
    5.11 +
    5.12 +/*
    5.13 + * Producer.
    5.14 + * 
    5.15 + * Birth function for thread that performs the producer behavior.
    5.16 + * _params points to a ProducerParams.  Ends the thread with pthread_exit(NULL).
    5.17 + * Note: pinning the thread to a core is currently commented out below.
    5.18 + */
    5.19 +void* 
    5.20 +producer_birthFn( void* _params )
    5.21 + {
    5.22 +   cpu_set_t cpuinfo;
    5.23 +   int lastTupleIter, oldConsumerReceivedACKNum;
    5.24 +    
    5.25 +   ProducerParams *params = (ProducerParams *)_params;
    5.26 +    
    5.27 +   lastTupleIter = 0; //compared to global tupleIter while waiting
    5.28 +   oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive
    5.29 +   
    5.30 +   /* --------------------------------------------------
    5.31 +    * Pin thread to core, the producers are divided
    5.32 +    * equally over all cores. Pinning prohibits the
    5.33 +    * switching of cores so that perf counter and TSC values remain
    5.34 +    * from the same core between readings.  Pinning shouldn't 
    5.35 +    * affect results.. may be odd case when num thds doesn't divide into
    5.36 +    * num Cores
    5.37 +    * --------------------------------------------------
    5.38 +    */
    5.39 + /*
    5.40 +   CPU_ZERO( &cpuinfo );
    5.41 +   CPU_SET( params->coreID, &cpuinfo );
    5.42 +   pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo );
    5.43 +   pthread_yield(); //get off the core, so next can be created on it
    5.44 +   uint32_t cpuid = sched_getcpu();
    5.45 +  */
    5.46 +   
    5.47 +       
    5.48 +   /*Protocol:
    5.49 +    * wait for change in tupleIter (save updated tuple num for next time)
    5.50 +    * Get producer lock (only one producer at a time)
    5.51 +    * get current ACK number (before the consumer can see the new production)
    5.52 +    * write into comm vars
    5.53 +    * notify consumer
    5.54 +    * wait for ACK (get ACK lock, check on change in ACK number)
    5.55 +    * release producer lock
    5.56 +    * if not done, repeat
    5.57 +    */
    5.58 +   while( lastTupleIter < params->numTuplesToCreate )
    5.59 +    {
    5.60 +      //wait for change in tupleNum (save updated tuple num for next time)
    5.61 +      pthread_mutex_lock(     &tupleIterLock  );
    5.62 +      while( lastTupleIter == tupleIter )
    5.63 +       {
    5.64 +         pthread_cond_wait( &tupleIterCond,
    5.65 +                            &tupleIterLock );
    5.66 +       }
    5.67 +      lastTupleIter = tupleIter; //save for next time through loop; read while the lock is still held
    5.68 +      pthread_mutex_unlock(   &tupleIterLock  );
    5.69 +      
    5.70 +      
    5.71 +      DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, lastTupleIter);
    5.72 +      
    5.73 +      //Two vars used to comm with consumer.  One holds message to send,
    5.74 +      // other is a counter that is bumped for each production.
    5.75 +      //Protect the two variables with a lock, that only one
    5.76 +      // producer can get.  Update the variable with the message to be 
    5.77 +      // communicated, then bump the counter to publish the production.
    5.78 +
    5.79 +      //Get producer lock
    5.80 +      pthread_mutex_lock( &producerAccessMutex );
    5.81 +      
    5.82 +      // get current ACK number first: once the production is published the consumer
    5.83 +      //  may ACK at any moment, and a snapshot taken after that would never differ
    5.84 +      oldConsumerReceivedACKNum = currConsumerReceivedACKNum;
    5.85 +      // write into comm vars
    5.86 +      producerMessage = lastTupleIter;  //just a dummy -- overhead meas, do nothing
    5.87 +
    5.88 +      // publish the production and notify consumer, under the lock the consumer checks with
    5.89 +      pthread_mutex_lock(     &productionReadyLock  );
    5.90 +      currProductionNum += 1;
    5.91 +      DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID );
    5.92 +      pthread_cond_broadcast( &productionReadyCond );
    5.93 +      pthread_mutex_unlock(   &productionReadyLock  );
    5.94 +
    5.95 +      // wait for ACK (get ACK lock, check on change in ACK number)
    5.96 +      pthread_mutex_lock(     &consumerReceivedAckLock  );
    5.97 +      while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum )
    5.98 +       {
    5.99 +         pthread_cond_wait( &consumerReceivedAckCond,
   5.100 +                            &consumerReceivedAckLock );
   5.101 +       }
   5.102 +      pthread_mutex_unlock(   &consumerReceivedAckLock  );
   5.103 +      DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum );
   5.104 +
   5.105 +      // release producer lock (so different producer can get and send)
   5.106 +      pthread_mutex_unlock(   &producerAccessMutex );  
   5.107 +    } //if not done, do again  
   5.108 +
   5.109 +   //Shutdown producer
   5.110 +   pthread_exit(NULL);
   5.111 +    
   5.112 + }
   5.113 +