Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
comparison src/Application/producer.c @ 0:9cf9b2091eeb
working condition variable version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:13:46 -0700 |
| parents | |
| children | 88db7b62b961 |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:ee1e1e746014 |
|---|---|
| 1 /* | |
| 2 * | |
| 3 */ | |
| 4 | |
| 5 #include "main.h" | |
| 6 #include <pthread.h> | |
| 7 #include <sched.h> | |
| 8 | |
| 9 /* | |
| 10 * Producer. | |
| 11 * | |
| 12 * Birth (start) function for a thread that performs the producer behavior. | |
| 13 * _params must point to a ProducerParams; ends via pthread_exit(NULL). | |
| 14 * Note: the core-pinning code below is commented out, so thread is NOT pinned | |
| 15 */ | |
| 16 void* | |
| 17 producer_birthFn( void* _params ) | |
| 18 { | |
| 19 cpu_set_t cpuinfo; //only referenced by the disabled pinning code below | |
| 20 int lastTupleIter, oldConsumerReceivedACKNum; | |
| 21 | |
| 22 ProducerParams *params = (ProducerParams *)_params; | |
| 23 | |
| 24 lastTupleIter = 0; //snapshot of global tupleIter, taken under tupleIterLock | |
| 25 oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive | |
| 26 | |
| 27 /* -------------------------------------------------- | |
| 28 * (Currently disabled) Pin thread to core; the producers are | |
| 29 * divided equally over all cores. Pinning prohibits the | |
| 30 * switching of cores so that perf counter and TSC values remain | |
| 31 * from the same core between readings. Pinning shouldn't | |
| 32 * affect results, though there may be an odd case when the number | |
| 33 * of threads doesn't divide evenly into the number of cores. | |
| 34 * -------------------------------------------------- | |
| 35 */ | |
| 36 /* | |
| 37 CPU_ZERO( &cpuinfo ); | |
| 38 CPU_SET( params->coreID, &cpuinfo ); | |
| 39 pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo ); | |
| 40 pthread_yield(); //get off the core, so next can be created on it | |
| 41 uint32_t cpuid = sched_getcpu(); | |
| 42 */ | |
| 43 | |
| 44 | |
| 45 /*Protocol: | |
| 46 * wait for change in tupleIter (snapshot new value while holding its lock) | |
| 47 * Get producer lock (only one producer at a time) | |
| 48 * write into comm vars | |
| 49 * get current ACK number | |
| 50 * notify consumer | |
| 51 * wait for ACK (get ACK lock, check on change in ACK number) | |
| 52 * release producer lock | |
| 53 * if not done, repeat | |
| 54 */ | |
| 55 while( lastTupleIter < params->numTuplesToCreate ) | |
| 56 { | |
| 57 //wait for change in tupleIter; loop guards against spurious wakeups | |
| 58 pthread_mutex_lock( &tupleIterLock ); | |
| 59 while( lastTupleIter == tupleIter ) | |
| 60 { | |
| 61 pthread_cond_wait( &tupleIterCond, | |
| 62 &tupleIterLock ); | |
| 63 } | |
| 64 lastTupleIter = tupleIter; //snapshot while lock still held (no racy re-read) | |
| 65 pthread_mutex_unlock( &tupleIterLock ); | |
| 66 //from here on use the snapshot; tupleIter may change once the lock is released | |
| 67 | |
| 68 DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, lastTupleIter); | |
| 69 | |
| 70 //Two vars used to comm with consumer. One holds message to send, | |
| 71 // other counts productions (currProductionNum). | |
| 72 //Protect the two variables with a lock, that only one | |
| 73 // producer can get. Update the variable with the message to be | |
| 74 // communicated, and advance the production counter. | |
| 75 | |
| 76 //Get producer lock | |
| 77 pthread_mutex_lock( &producerAccessMutex ); | |
| 78 | |
| 79 // write into comm vars | |
| 80 producerMessage = lastTupleIter; //just a dummy -- overhead meas, do nothing | |
| 81 currProductionNum += 1; | |
| 82 | |
| 83 // get current ACK number | |
| 84 oldConsumerReceivedACKNum = currConsumerReceivedACKNum; | |
| 85 | |
| 86 // notify consumer (lock may not be strictly needed here -- teeter-totter) | |
| 87 pthread_mutex_lock( &productionReadyLock ); | |
| 88 DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID ); | |
| 89 pthread_cond_broadcast( &productionReadyCond ); | |
| 90 pthread_mutex_unlock( &productionReadyLock ); | |
| 91 | |
| 92 // wait for ACK (get ACK lock, check on change in ACK number) | |
| 93 pthread_mutex_lock( &consumerReceivedAckLock ); | |
| 94 while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum ) | |
| 95 { | |
| 96 pthread_cond_wait( &consumerReceivedAckCond, | |
| 97 &consumerReceivedAckLock ); | |
| 98 } | |
| 99 pthread_mutex_unlock( &consumerReceivedAckLock ); | |
| 100 DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum ); | |
| 101 | |
| 102 // release producer lock (held across the ACK wait; now others can send) | |
| 103 pthread_mutex_unlock( &producerAccessMutex ); | |
| 104 } //if not done, do again | |
| 105 | |
| 106 //Shutdown producer | |
| 107 pthread_exit(NULL); | |
| 108 | |
| 109 } | |
| 110 |
