seanhalle@0: /* seanhalle@0: * seanhalle@0: */ seanhalle@0: seanhalle@0: #include "main.h" seanhalle@0: #include seanhalle@0: seanhalle@0: /* seanhalle@0: * Consumer. seanhalle@0: * seanhalle@0: * Birth function for thread that performs the consumer behavior seanhalle@0: * seanhalle@0: *Here's the protocol: seanhalle@0: *Consumer is born waiting for some producer to send it a production. seanhalle@0: *When it wakes, it reads the variables used for communication, and packages seanhalle@0: * the information into a tuple it is constructing. seanhalle@0: *Then it wakes the producer, who is waiting to be sure that send was received. seanhalle@0: *If the tuple is not yet complete, it loops back for another production. seanhalle@0: *When tuple complete, it adds that tuple to the output seanhalle@0: *If that's the last tuple, it ends itself seanhalle@0: *If not, then wakes all the producers, who go to the next iteration. seanhalle@0: *Then loops back to wait for some producer to send it a production seanhalle@0: */ seanhalle@0: void* seanhalle@0: consumer_birthFn( void* _params ) seanhalle@0: { seanhalle@0: int lastSeenProductionNum, numProducts; seanhalle@0: seanhalle@0: ConsumerParams* params = (ConsumerParams *)_params; seanhalle@0: seanhalle@0: seanhalle@0: /*The consumer does two loops. seanhalle@0: * The outside loop counts the number of tuples created. seanhalle@0: * The inside loop collects the products for one tuple. seanhalle@0: * seanhalle@0: *Protocol: seanhalle@0: * increment tupleIter seanhalle@0: * wake producers for next tuple seanhalle@0: * wait on production ready seanhalle@0: * read comm vars and add msg to current tuple seanhalle@0: * increment ACK count seanhalle@0: * wake producer (who is waiting for ack) seanhalle@0: * if more productions for current tuple, repeat seanhalle@0: * have all productions for current tuple, so add tuple to output seanhalle@0: * if have all tuples are going to produce, end seanhalle@0: * else more, so repeat seanhalle@0: */ seanhalle@0: while( tupleIter < params->numTuplesToCreate ) seanhalle@0: { seanhalle@0: // increment tupleIter (global shared) seanhalle@0: // wake producers for next iter (don't need cond lock? -- teeter totter) seanhalle@0: DEBUG__printf("consumer broadcast for next iter\n"); seanhalle@0: pthread_mutex_lock(&tupleIterLock); seanhalle@0: tupleIter += 1; seanhalle@0: pthread_cond_broadcast( &tupleIterCond ); seanhalle@0: pthread_mutex_unlock(&tupleIterLock); seanhalle@0: seanhalle@0: if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); seanhalle@0: seanhalle@0: for( numProducts = 0; numProducts < params->numProducers; numProducts++ ) seanhalle@0: { seanhalle@0: //wait on productionReadyCond (suspend until there is a production) seanhalle@0: pthread_mutex_lock( &productionReadyLock ); seanhalle@0: while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID seanhalle@0: { seanhalle@0: pthread_cond_wait( &productionReadyCond, &productionReadyLock ); seanhalle@0: } seanhalle@0: DEBUG__printf1( "consumer got production %d\n", currProductionNum ); seanhalle@0: lastSeenProductionNum = currProductionNum; seanhalle@0: pthread_mutex_unlock( &productionReadyLock ); seanhalle@0: seanhalle@0: //Read comm vars and add msg to current tuple seanhalle@0: //add production to tuple -- overhead meas, do nothing seanhalle@0: seanhalle@0: // increment ACK count seanhalle@0: pthread_mutex_lock( &consumerReceivedAckLock ); seanhalle@0: currConsumerReceivedACKNum += 1;//make different than last time prod saw seanhalle@0: seanhalle@0: // wake producer (who is waiting to be sure that send was received) seanhalle@0: DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum); seanhalle@0: pthread_cond_broadcast( &consumerReceivedAckCond ); seanhalle@0: pthread_mutex_unlock( &consumerReceivedAckLock ); seanhalle@0: } //if more productions for current tuple, repeat seanhalle@0: seanhalle@0: // have all productions for current tuple, so add tuple to output seanhalle@0: //add tuple to output and malloc new tuple array -- overhead meas, do nothing seanhalle@0: seanhalle@0: } // if have all tuples are going to produce, end seanhalle@0: seanhalle@0: //Shutdown consumer thread seanhalle@0: pthread_exit(NULL); seanhalle@0: seanhalle@0: } seanhalle@0: