Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
diff src/Application/producer.c @ 0:9cf9b2091eeb
working condition variable version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:13:46 -0700 |
| parents | |
| children | 88db7b62b961 |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/src/Application/producer.c Wed Jul 10 14:13:46 2013 -0700 1.3 @@ -0,0 +1,110 @@ 1.4 +/* 1.5 + * 1.6 + */ 1.7 + 1.8 +#include "main.h" 1.9 +#include <pthread.h> 1.10 +#include <sched.h> 1.11 + 1.12 +/* 1.13 + * Producer. 1.14 + * 1.15 + * Birth function for thread that performs the producer behavior 1.16 + * 1.17 + * Note: is pinned to a core, to facilitate collecting measurements 1.18 + */ 1.19 +void* 1.20 +producer_birthFn( void* _params ) 1.21 + { 1.22 + cpu_set_t cpuinfo; 1.23 + int lastTupleIter, oldConsumerReceivedACKNum; 1.24 + 1.25 + ProducerParams *params = (ProducerParams *)_params; 1.26 + 1.27 + lastTupleIter = 0; //compared to global tupleIter while waiting 1.28 + oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive 1.29 + 1.30 + /* -------------------------------------------------- 1.31 + * Pin thread to core, the producers are divided 1.32 + * equally over all cores. Pinning prohibits the 1.33 + * switching of cores so that perf counter and TSC values remain 1.34 + * from the same core between readings. Pinning shouldn't 1.35 + * affect results.. may be odd case when num thds doesn't divide into 1.36 + * num Cores 1.37 + * -------------------------------------------------- 1.38 + */ 1.39 + /* 1.40 + CPU_ZERO( &cpuinfo ); 1.41 + CPU_SET( params->coreID, &cpuinfo ); 1.42 + pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo ); 1.43 + pthread_yield(); //get off the core, so next can be created on it 1.44 + uint32_t cpuid = sched_getcpu(); 1.45 + */ 1.46 + 1.47 + 1.48 + /*Protocol: 1.49 + * wait for change in tupleIter (save updated tuple num for next time) 1.50 + * Get producer lock (only one producer at a time) 1.51 + * write into comm vars 1.52 + * get current ACK number 1.53 + * notify consumer 1.54 + * wait for ACK (get ACK lock, check on change in ACK number) 1.55 + * release producer lock 1.56 + * if not done, repeat 1.57 + */ 1.58 + while( lastTupleIter < params->numTuplesToCreate ) 1.59 + { 1.60 + //wait for change in tupleNum (save updated tuple num for next time) 1.61 + pthread_mutex_lock( &tupleIterLock ); 1.62 + while( lastTupleIter == tupleIter ) 1.63 + { 1.64 + pthread_cond_wait( &tupleIterCond, 1.65 + &tupleIterLock ); 1.66 + } 1.67 + pthread_mutex_unlock( &tupleIterLock ); 1.68 + 1.69 + lastTupleIter = tupleIter; //save for next time through loop 1.70 + 1.71 + DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter); 1.72 + 1.73 + //Two vars used to comm with consumer. One holds message to send, 1.74 + // other holds ID of producer sending. 1.75 + //Protect the two variables with a lock, that only one 1.76 + // producer can get. Update the variable with the message to be 1.77 + // communicated, and write ID of sender in second var. 1.78 + 1.79 + //Get producer lock 1.80 + pthread_mutex_lock( &producerAccessMutex ); 1.81 + 1.82 + // write into comm vars 1.83 + producerMessage = tupleIter; //just a dummy -- overhead meas, do nothing 1.84 + currProductionNum += 1; 1.85 + 1.86 + // get current ACK number 1.87 + oldConsumerReceivedACKNum = currConsumerReceivedACKNum; 1.88 + 1.89 + // notify consumer (don't think need the cond lock here -- teeter-totter) 1.90 + pthread_mutex_lock( &productionReadyLock ); 1.91 + DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID ); 1.92 + pthread_cond_broadcast( &productionReadyCond ); 1.93 + pthread_mutex_unlock( &productionReadyLock ); 1.94 + 1.95 + // wait for ACK (get ACK lock, check on change in ACK number) 1.96 + pthread_mutex_lock( &consumerReceivedAckLock ); 1.97 + while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum ) 1.98 + { 1.99 + pthread_cond_wait( &consumerReceivedAckCond, 1.100 + &consumerReceivedAckLock ); 1.101 + } 1.102 + pthread_mutex_unlock( &consumerReceivedAckLock ); 1.103 + DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum ); 1.104 + 1.105 + // release producer lock (so different producer can get and send) 1.106 + pthread_mutex_unlock( &producerAccessMutex ); 1.107 + } //if not done, do again 1.108 + 1.109 + //Shutdown producer 1.110 + pthread_exit(NULL); 1.111 + 1.112 + } 1.113 +
