annotate src/Application/consumer.c @ 0:9cf9b2091eeb

working condition variable version
author Sean Halle <seanhalle@yahoo.com>
date Wed, 10 Jul 2013 14:13:46 -0700
parents
children 88db7b62b961
rev   line source
seanhalle@0 1 /*
seanhalle@0 2 *
seanhalle@0 3 */
seanhalle@0 4
seanhalle@0 5 #include "main.h"
seanhalle@0 6 #include <pthread.h>
seanhalle@0 7
seanhalle@0 8 /*
seanhalle@0 9 * Consumer.
seanhalle@0 10 *
seanhalle@0 11 * Birth function for thread that performs the consumer behavior
seanhalle@0 12 *
seanhalle@0 13 *Here's the protocol:
seanhalle@0 14 *Consumer is born waiting for some producer to send it a production.
seanhalle@0 15 *When it wakes, it reads the variables used for communication, and packages
seanhalle@0 16 * the information into a tuple it is constructing.
seanhalle@0 17 *Then it wakes the producer, who is waiting to be sure that send was received.
seanhalle@0 18 *If the tuple is not yet complete, it loops back for another production.
seanhalle@0 19 *When tuple complete, it adds that tuple to the output
seanhalle@0 20 *If that's the last tuple, it ends itself
seanhalle@0 21 *If not, then wakes all the producers, who go to the next iteration.
seanhalle@0 22 *Then loops back to wait for some producer to send it a production
seanhalle@0 23 */
seanhalle@0 24 void*
seanhalle@0 25 consumer_birthFn( void* _params )
seanhalle@0 26 {
seanhalle@0 27 int lastSeenProductionNum, numProducts;
seanhalle@0 28
seanhalle@0 29 ConsumerParams* params = (ConsumerParams *)_params;
seanhalle@0 30
seanhalle@0 31
seanhalle@0 32 /*The consumer does two loops.
seanhalle@0 33 * The outside loop counts the number of tuples created.
seanhalle@0 34 * The inside loop collects the products for one tuple.
seanhalle@0 35 *
seanhalle@0 36 *Protocol:
seanhalle@0 37 * increment tupleIter
seanhalle@0 38 * wake producers for next tuple
seanhalle@0 39 * wait on production ready
seanhalle@0 40 * read comm vars and add msg to current tuple
seanhalle@0 41 * increment ACK count
seanhalle@0 42 * wake producer (who is waiting for ack)
seanhalle@0 43 * if more productions for current tuple, repeat
seanhalle@0 44 * have all productions for current tuple, so add tuple to output
seanhalle@0 45 * if have all tuples are going to produce, end
seanhalle@0 46 * else more, so repeat
seanhalle@0 47 */
seanhalle@0 48 while( tupleIter < params->numTuplesToCreate )
seanhalle@0 49 {
seanhalle@0 50 // increment tupleIter (global shared)
seanhalle@0 51 // wake producers for next iter (don't need cond lock? -- teeter totter)
seanhalle@0 52 DEBUG__printf("consumer broadcast for next iter\n");
seanhalle@0 53 pthread_mutex_lock(&tupleIterLock);
seanhalle@0 54 tupleIter += 1;
seanhalle@0 55 pthread_cond_broadcast( &tupleIterCond );
seanhalle@0 56 pthread_mutex_unlock(&tupleIterLock);
seanhalle@0 57
seanhalle@0 58 if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);
seanhalle@0 59
seanhalle@0 60 for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
seanhalle@0 61 {
seanhalle@0 62 //wait on productionReadyCond (suspend until there is a production)
seanhalle@0 63 pthread_mutex_lock( &productionReadyLock );
seanhalle@0 64 while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID
seanhalle@0 65 {
seanhalle@0 66 pthread_cond_wait( &productionReadyCond, &productionReadyLock );
seanhalle@0 67 }
seanhalle@0 68 DEBUG__printf1( "consumer got production %d\n", currProductionNum );
seanhalle@0 69 lastSeenProductionNum = currProductionNum;
seanhalle@0 70 pthread_mutex_unlock( &productionReadyLock );
seanhalle@0 71
seanhalle@0 72 //Read comm vars and add msg to current tuple
seanhalle@0 73 //add production to tuple -- overhead meas, do nothing
seanhalle@0 74
seanhalle@0 75 // increment ACK count
seanhalle@0 76 pthread_mutex_lock( &consumerReceivedAckLock );
seanhalle@0 77 currConsumerReceivedACKNum += 1;//make different than last time prod saw
seanhalle@0 78
seanhalle@0 79 // wake producer (who is waiting to be sure that send was received)
seanhalle@0 80 DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum);
seanhalle@0 81 pthread_cond_broadcast( &consumerReceivedAckCond );
seanhalle@0 82 pthread_mutex_unlock( &consumerReceivedAckLock );
seanhalle@0 83 } //if more productions for current tuple, repeat
seanhalle@0 84
seanhalle@0 85 // have all productions for current tuple, so add tuple to output
seanhalle@0 86 //add tuple to output and malloc new tuple array -- overhead meas, do nothing
seanhalle@0 87
seanhalle@0 88 } // if have all tuples are going to produce, end
seanhalle@0 89
seanhalle@0 90 //Shutdown consumer thread
seanhalle@0 91 pthread_exit(NULL);
seanhalle@0 92
seanhalle@0 93 }
seanhalle@0 94