diff src/Application/producer.c @ 0:9cf9b2091eeb

working condition variable version
author Sean Halle <seanhalle@yahoo.com>
date Wed, 10 Jul 2013 14:13:46 -0700
parents
children 88db7b62b961
line diff
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/src/Application/producer.c	Wed Jul 10 14:13:46 2013 -0700
     1.3 @@ -0,0 +1,110 @@
     1.4 +/* 
     1.5 + * 
     1.6 + */
     1.7 +
     1.8 +#include "main.h"
     1.9 +#include <pthread.h>
    1.10 +#include <sched.h>
    1.11 +
    1.12 +/*
    1.13 + * Producer.
    1.14 + * 
    1.15 + * Birth function for thread that performs the producer behavior
    1.16 + * 
    1.17 + * Note: is pinned to a core, to facilitate collecting measurements
    1.18 + */
    1.19 +void* 
    1.20 +producer_birthFn( void* _params )
    1.21 + {
    1.22 +   cpu_set_t cpuinfo;
    1.23 +   int lastTupleIter, oldConsumerReceivedACKNum;
    1.24 +    
    1.25 +   ProducerParams *params = (ProducerParams *)_params;
    1.26 +    
    1.27 +   lastTupleIter = 0; //compared to global tupleIter while waiting
    1.28 +   oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive
    1.29 +   
    1.30 +   /* --------------------------------------------------
    1.31 +    * Pin thread to core, the producers are divided
    1.32 +    * equally over all cores. Pinning prohibits the
    1.33 +    * switching of cores so that perf counter and TSC values remain
    1.34 +    * from the same core between readings.  Pinning shouldn't 
    1.35 +    * affect results.. may be odd case when num thds doesn't divide into
    1.36 +    * num Cores
    1.37 +    * --------------------------------------------------
    1.38 +    */
    1.39 + /*
    1.40 +   CPU_ZERO( &cpuinfo );
    1.41 +   CPU_SET( params->coreID, &cpuinfo );
    1.42 +   pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo );
    1.43 +   pthread_yield(); //get off the core, so next can be created on it
    1.44 +   uint32_t cpuid = sched_getcpu();
    1.45 +  */
    1.46 +   
    1.47 +       
    1.48 +   /*Protocol:
    1.49 +    * wait for change in tupleIter (save updated tuple num for next time)
    1.50 +    * Get producer lock (only one producer at a time)
    1.51 +    * write into comm vars
    1.52 +    * get current ACK number
    1.53 +    * notify consumer
    1.54 +    * wait for ACK (get ACK lock, check on change in ACK number)
    1.55 +    * release producer lock
    1.56 +    * if not done, repeat
    1.57 +    */
    1.58 +   while( lastTupleIter < params->numTuplesToCreate )
    1.59 +    {
    1.60 +      //wait for change in tupleNum (save updated tuple num for next time)
    1.61 +      pthread_mutex_lock(     &tupleIterLock  );
    1.62 +      while( lastTupleIter == tupleIter )
    1.63 +       {
    1.64 +         pthread_cond_wait( &tupleIterCond,
    1.65 +                            &tupleIterLock );
    1.66 +       }
    1.67 +      pthread_mutex_unlock(   &tupleIterLock  );
    1.68 +      
    1.69 +      lastTupleIter = tupleIter; //save for next time through loop
    1.70 +      
    1.71 +      DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter);
    1.72 +      
    1.73 +      //Two vars used to comm with consumer.  One holds message to send,
    1.74 +      // other holds ID of producer sending.
    1.75 +      //Protect the two variables with a lock, that only one
    1.76 +      // producer can get.  Update the variable with the message to be 
    1.77 +      // communicated, and write ID of sender in second var.
    1.78 +
    1.79 +      //Get producer lock
    1.80 +      pthread_mutex_lock( &producerAccessMutex );
    1.81 +      
    1.82 +      // write into comm vars
    1.83 +      producerMessage = tupleIter;  //just a dummy -- overhead meas, do nothing
    1.84 +      currProductionNum += 1;
    1.85 +      
    1.86 +      // get current ACK number
    1.87 +      oldConsumerReceivedACKNum = currConsumerReceivedACKNum;
    1.88 +
    1.89 +      // notify consumer (don't think need the cond lock here -- teeter-totter)
    1.90 +      pthread_mutex_lock(     &productionReadyLock  );
    1.91 +      DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID );
    1.92 +      pthread_cond_broadcast( &productionReadyCond );
    1.93 +      pthread_mutex_unlock(   &productionReadyLock  );
    1.94 +
    1.95 +      // wait for ACK (get ACK lock, check on change in ACK number)
    1.96 +      pthread_mutex_lock(     &consumerReceivedAckLock  );
    1.97 +      while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum )
    1.98 +       {
    1.99 +         pthread_cond_wait( &consumerReceivedAckCond,
   1.100 +                            &consumerReceivedAckLock );
   1.101 +       }
   1.102 +      pthread_mutex_unlock(   &consumerReceivedAckLock  );
   1.103 +      DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum );
   1.104 +
   1.105 +      // release producer lock (so different producer can get and send)
   1.106 +      pthread_mutex_unlock(   &producerAccessMutex );  
   1.107 +    } //if not done, do again  
   1.108 +
   1.109 +   //Shutdown producer
   1.110 +   pthread_exit(NULL);
   1.111 +    
   1.112 + }
   1.113 +