| rev |
line source |
|
seanhalle@0
|
1 /*
|
|
seanhalle@0
|
2 *
|
|
seanhalle@0
|
3 */
|
|
seanhalle@0
|
4
|
|
seanhalle@0
|
5 #include "main.h"
|
|
seanhalle@0
|
6 #include <pthread.h>
|
|
seanhalle@0
|
7
|
|
seanhalle@0
|
8 /*
|
|
seanhalle@0
|
9 * Consumer.
|
|
seanhalle@0
|
10 *
|
|
seanhalle@0
|
11 * Birth function for thread that performs the consumer behavior
|
|
seanhalle@0
|
12 *
|
|
seanhalle@0
|
13 *Here's the protocol:
|
|
seanhalle@0
|
14 *Consumer is born waiting for some producer to send it a production.
|
|
seanhalle@0
|
15 *When it wakes, it reads the variables used for communication, and packages
|
|
seanhalle@0
|
16 * the information into a tuple it is constructing.
|
|
seanhalle@0
|
17 *Then it wakes the producer, who is waiting to be sure that send was received.
|
|
seanhalle@0
|
18 *If the tuple is not yet complete, it loops back for another production.
|
|
seanhalle@0
|
19 *When tuple complete, it adds that tuple to the output
|
|
seanhalle@0
|
20 *If that's the last tuple, it ends itself
|
|
seanhalle@0
|
21 *If not, then wakes all the producers, who go to the next iteration.
|
|
seanhalle@0
|
22 *Then loops back to wait for some producer to send it a production
|
|
seanhalle@0
|
23 */
|
|
seanhalle@0
|
24 void*
|
|
seanhalle@0
|
25 consumer_birthFn( void* _params )
|
|
seanhalle@0
|
26 {
|
|
seanhalle@0
|
27 int lastSeenProductionNum, numProducts;
|
|
seanhalle@0
|
28
|
|
seanhalle@0
|
29 ConsumerParams* params = (ConsumerParams *)_params;
|
|
seanhalle@0
|
30
|
|
seanhalle@0
|
31
|
|
seanhalle@0
|
32 /*The consumer does two loops.
|
|
seanhalle@0
|
33 * The outside loop counts the number of tuples created.
|
|
seanhalle@0
|
34 * The inside loop collects the products for one tuple.
|
|
seanhalle@0
|
35 *
|
|
seanhalle@0
|
36 *Protocol:
|
|
seanhalle@0
|
37 * increment tupleIter
|
|
seanhalle@0
|
38 * wake producers for next tuple
|
|
seanhalle@0
|
39 * wait on production ready
|
|
seanhalle@0
|
40 * read comm vars and add msg to current tuple
|
|
seanhalle@0
|
41 * increment ACK count
|
|
seanhalle@0
|
42 * wake producer (who is waiting for ack)
|
|
seanhalle@0
|
43 * if more productions for current tuple, repeat
|
|
seanhalle@0
|
44 * have all productions for current tuple, so add tuple to output
|
|
seanhalle@0
|
45 * if have all tuples are going to produce, end
|
|
seanhalle@0
|
46 * else more, so repeat
|
|
seanhalle@0
|
47 */
|
|
seanhalle@0
|
48 while( tupleIter < params->numTuplesToCreate )
|
|
seanhalle@0
|
49 {
|
|
seanhalle@0
|
50 // increment tupleIter (global shared)
|
|
seanhalle@0
|
51 // wake producers for next iter (don't need cond lock? -- teeter totter)
|
|
seanhalle@0
|
52 DEBUG__printf("consumer broadcast for next iter\n");
|
|
seanhalle@0
|
53 pthread_mutex_lock(&tupleIterLock);
|
|
seanhalle@0
|
54 tupleIter += 1;
|
|
seanhalle@0
|
55 pthread_cond_broadcast( &tupleIterCond );
|
|
seanhalle@0
|
56 pthread_mutex_unlock(&tupleIterLock);
|
|
seanhalle@0
|
57
|
|
seanhalle@0
|
58 if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);
|
|
seanhalle@0
|
59
|
|
seanhalle@0
|
60 for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
|
|
seanhalle@0
|
61 {
|
|
seanhalle@0
|
62 //wait on productionReadyCond (suspend until there is a production)
|
|
seanhalle@0
|
63 pthread_mutex_lock( &productionReadyLock );
|
|
seanhalle@0
|
64 while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID
|
|
seanhalle@0
|
65 {
|
|
seanhalle@0
|
66 pthread_cond_wait( &productionReadyCond, &productionReadyLock );
|
|
seanhalle@0
|
67 }
|
|
seanhalle@0
|
68 DEBUG__printf1( "consumer got production %d\n", currProductionNum );
|
|
seanhalle@0
|
69 lastSeenProductionNum = currProductionNum;
|
|
seanhalle@0
|
70 pthread_mutex_unlock( &productionReadyLock );
|
|
seanhalle@0
|
71
|
|
seanhalle@0
|
72 //Read comm vars and add msg to current tuple
|
|
seanhalle@0
|
73 //add production to tuple -- overhead meas, do nothing
|
|
seanhalle@0
|
74
|
|
seanhalle@0
|
75 // increment ACK count
|
|
seanhalle@0
|
76 pthread_mutex_lock( &consumerReceivedAckLock );
|
|
seanhalle@0
|
77 currConsumerReceivedACKNum += 1;//make different than last time prod saw
|
|
seanhalle@0
|
78
|
|
seanhalle@0
|
79 // wake producer (who is waiting to be sure that send was received)
|
|
seanhalle@0
|
80 DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum);
|
|
seanhalle@0
|
81 pthread_cond_broadcast( &consumerReceivedAckCond );
|
|
seanhalle@0
|
82 pthread_mutex_unlock( &consumerReceivedAckLock );
|
|
seanhalle@0
|
83 } //if more productions for current tuple, repeat
|
|
seanhalle@0
|
84
|
|
seanhalle@0
|
85 // have all productions for current tuple, so add tuple to output
|
|
seanhalle@0
|
86 //add tuple to output and malloc new tuple array -- overhead meas, do nothing
|
|
seanhalle@0
|
87
|
|
seanhalle@0
|
88 } // if have all tuples are going to produce, end
|
|
seanhalle@0
|
89
|
|
seanhalle@0
|
90 //Shutdown consumer thread
|
|
seanhalle@0
|
91 pthread_exit(NULL);
|
|
seanhalle@0
|
92
|
|
seanhalle@0
|
93 }
|
|
seanhalle@0
|
94
|