Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
view src/Application/producer.c @ 0:9cf9b2091eeb
working condition variable version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:13:46 -0700 |
| parents | |
| children | 88db7b62b961 |
line source
1 /*
2 *
3 */
5 #include "main.h"
6 #include <pthread.h>
7 #include <sched.h>
9 /*
10 * Producer.
11 *
12 * Birth function for thread that performs the producer behavior
13 *
14 * Note: is pinned to a core, to facilitate collecting measurements
15 */
16 void*
17 producer_birthFn( void* _params )
18 {
19 cpu_set_t cpuinfo;
20 int lastTupleIter, oldConsumerReceivedACKNum;
22 ProducerParams *params = (ProducerParams *)_params;
24 lastTupleIter = 0; //compared to global tupleIter while waiting
25 oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive
27 /* --------------------------------------------------
28 * Pin thread to core, the producers are divided
29 * equally over all cores. Pinning prohibits the
30 * switching of cores so that perf counter and TSC values remain
31 * from the same core between readings. Pinning shouldn't
32 * affect results.. may be odd case when num thds doesn't divide into
33 * num Cores
34 * --------------------------------------------------
35 */
36 /*
37 CPU_ZERO( &cpuinfo );
38 CPU_SET( params->coreID, &cpuinfo );
39 pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo );
40 pthread_yield(); //get off the core, so next can be created on it
41 uint32_t cpuid = sched_getcpu();
42 */
45 /*Protocol:
46 * wait for change in tupleIter (save updated tuple num for next time)
47 * Get producer lock (only one producer at a time)
48 * write into comm vars
49 * get current ACK number
50 * notify consumer
51 * wait for ACK (get ACK lock, check on change in ACK number)
52 * release producer lock
53 * if not done, repeat
54 */
55 while( lastTupleIter < params->numTuplesToCreate )
56 {
57 //wait for change in tupleNum (save updated tuple num for next time)
58 pthread_mutex_lock( &tupleIterLock );
59 while( lastTupleIter == tupleIter )
60 {
61 pthread_cond_wait( &tupleIterCond,
62 &tupleIterLock );
63 }
64 pthread_mutex_unlock( &tupleIterLock );
66 lastTupleIter = tupleIter; //save for next time through loop
68 DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter);
70 //Two vars used to comm with consumer. One holds message to send,
71 // other holds ID of producer sending.
72 //Protect the two variables with a lock, that only one
73 // producer can get. Update the variable with the message to be
74 // communicated, and write ID of sender in second var.
76 //Get producer lock
77 pthread_mutex_lock( &producerAccessMutex );
79 // write into comm vars
80 producerMessage = tupleIter; //just a dummy -- overhead meas, do nothing
81 currProductionNum += 1;
83 // get current ACK number
84 oldConsumerReceivedACKNum = currConsumerReceivedACKNum;
86 // notify consumer (don't think need the cond lock here -- teeter-totter)
87 pthread_mutex_lock( &productionReadyLock );
88 DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID );
89 pthread_cond_broadcast( &productionReadyCond );
90 pthread_mutex_unlock( &productionReadyLock );
92 // wait for ACK (get ACK lock, check on change in ACK number)
93 pthread_mutex_lock( &consumerReceivedAckLock );
94 while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum )
95 {
96 pthread_cond_wait( &consumerReceivedAckCond,
97 &consumerReceivedAckLock );
98 }
99 pthread_mutex_unlock( &consumerReceivedAckLock );
100 DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum );
102 // release producer lock (so different producer can get and send)
103 pthread_mutex_unlock( &producerAccessMutex );
104 } //if not done, do again
106 //Shutdown producer
107 pthread_exit(NULL);
109 }
