# HG changeset patch # User Sean Halle # Date 1373491024 25200 # Node ID 88db7b62b9610c9e9d94083c208dc7a7e374963e # Parent 9cf9b2091eeb768d45cc73d819062f0c9ceb823f Working queue based version diff -r 9cf9b2091eeb -r 88db7b62b961 src/Application/consumer.c --- a/src/Application/consumer.c Wed Jul 10 14:13:46 2013 -0700 +++ b/src/Application/consumer.c Wed Jul 10 14:17:04 2013 -0700 @@ -11,20 +11,16 @@ * Birth function for thread that performs the consumer behavior * *Here's the protocol: - *Consumer is born waiting for some producer to send it a production. - *When it wakes, it reads the variables used for communication, and packages - * the information into a tuple it is constructing. - *Then it wakes the producer, who is waiting to be sure that send was received. - *If the tuple is not yet complete, it loops back for another production. - *When tuple complete, it adds that tuple to the output - *If that's the last tuple, it ends itself - *If not, then wakes all the producers, who go to the next iteration. - *Then loops back to wait for some producer to send it a production + * Reads a production from the shared Q. + * If empty, yields and tries again. + * When has a production from every producer, broadcasts next iter to producers + * When has all the tuples, end */ void* consumer_birthFn( void* _params ) { - int lastSeenProductionNum, numProducts; + int numProducts; + void *production; //dummy ptr ConsumerParams* params = (ConsumerParams *)_params; @@ -36,51 +32,36 @@ *Protocol: * increment tupleIter * wake producers for next tuple - * wait on production ready - * read comm vars and add msg to current tuple - * increment ACK count - * wake producer (who is waiting for ack) - * if more productions for current tuple, repeat - * have all productions for current tuple, so add tuple to output - * if have all tuples are going to produce, end - * else more, so repeat + * Reads a production from the shared Q. + * If empty, yields and tries again. + * if more productions in tuple, repeat + * if have more tuples, repeat + * end thread */ while( tupleIter < params->numTuplesToCreate ) { - // increment tupleIter (global shared) - // wake producers for next iter (don't need cond lock? -- teeter totter) + if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); + + // wake producers for next iter DEBUG__printf("consumer broadcast for next iter\n"); pthread_mutex_lock(&tupleIterLock); - tupleIter += 1; + tupleIter += 1; // increment tupleIter (global shared) pthread_cond_broadcast( &tupleIterCond ); pthread_mutex_unlock(&tupleIterLock); - - if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); for( numProducts = 0; numProducts < params->numProducers; numProducts++ ) { - //wait on productionReadyCond (suspend until there is a production) - pthread_mutex_lock( &productionReadyLock ); - while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID - { - pthread_cond_wait( &productionReadyCond, &productionReadyLock ); + // read a production from the shared Q. + production = NULL; + while( production == NULL ) + { pthread_mutex_lock( &queueAccessLock ); + production = readPrivQ( commQ ); + pthread_mutex_unlock( &queueAccessLock ); + // If empty, yields and tries again. + if( production == NULL) sched_yield(); } DEBUG__printf1( "consumer got production %d\n", currProductionNum ); - lastSeenProductionNum = currProductionNum; - pthread_mutex_unlock( &productionReadyLock ); - - //Read comm vars and add msg to current tuple - //add production to tuple -- overhead meas, do nothing - - // increment ACK count - pthread_mutex_lock( &consumerReceivedAckLock ); - currConsumerReceivedACKNum += 1;//make different than last time prod saw - - // wake producer (who is waiting to be sure that send was received) - DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum); - pthread_cond_broadcast( &consumerReceivedAckCond ); - pthread_mutex_unlock( &consumerReceivedAckLock ); - } //if more productions for current tuple, repeat + } //if more productions for current tuple, repeat // have all productions for current tuple, so add tuple to output //add tuple to output and malloc new tuple array -- overhead meas, do nothing diff -r 9cf9b2091eeb -r 88db7b62b961 src/Application/main.c --- a/src/Application/main.c Wed Jul 10 14:13:46 2013 -0700 +++ b/src/Application/main.c Wed Jul 10 14:17:04 2013 -0700 @@ -119,14 +119,8 @@ pthread_cond_init( &tupleIterCond, NULL ); tupleIter = 0; - pthread_mutex_init(&producerAccessMutex, NULL); - pthread_mutex_init(&productionReadyLock, NULL); - pthread_cond_init( &productionReadyCond, NULL ); - currProductionNum = 0; - - pthread_mutex_init(&consumerReceivedAckLock, NULL); - pthread_cond_init( &consumerReceivedAckCond, NULL ); - currConsumerReceivedACKNum = 0; + pthread_mutex_init( &queueAccessLock, NULL ); + commQ = makePrivQ(); } diff -r 9cf9b2091eeb -r 88db7b62b961 src/Application/main.h --- a/src/Application/main.h Wed Jul 10 14:13:46 2013 -0700 +++ b/src/Application/main.h Wed Jul 10 14:17:04 2013 -0700 @@ -14,6 +14,8 @@ #include #include +#include "Queue_impl/PrivateQueue.h" + //========================== //#define TURN_ON_DEBUG @@ -147,16 +149,12 @@ pthread_cond_t tupleIterCond; int tupleIter; -pthread_mutex_t producerAccessMutex; -pthread_mutex_t productionReadyLock; -pthread_cond_t productionReadyCond; +pthread_mutex_t queueAccessLock; +PrivQueueStruc *commQ; + int currProductionNum; int producerMessage; -pthread_mutex_t consumerReceivedAckLock; -pthread_cond_t consumerReceivedAckCond; -int currConsumerReceivedACKNum; - //======= void* producer_birthFn( void* _params ); diff -r 9cf9b2091eeb -r 88db7b62b961 src/Application/producer.c --- a/src/Application/producer.c Wed Jul 10 14:13:46 2013 -0700 +++ b/src/Application/producer.c Wed Jul 10 14:17:04 2013 -0700 @@ -10,8 +10,6 @@ * Producer. * * Birth function for thread that performs the producer behavior - * - * Note: is pinned to a core, to facilitate collecting measurements */ void* producer_birthFn( void* _params ) @@ -22,7 +20,6 @@ ProducerParams *params = (ProducerParams *)_params; lastTupleIter = 0; //compared to global tupleIter while waiting - oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive /* -------------------------------------------------- * Pin thread to core, the producers are divided @@ -44,12 +41,9 @@ /*Protocol: * wait for change in tupleIter (save updated tuple num for next time) - * Get producer lock (only one producer at a time) - * write into comm vars - * get current ACK number - * notify consumer - * wait for ACK (get ACK lock, check on change in ACK number) - * release producer lock + * Get queue lock + * write into queue + * release queue lock * if not done, repeat */ while( lastTupleIter < params->numTuplesToCreate ) @@ -61,46 +55,18 @@ pthread_cond_wait( &tupleIterCond, &tupleIterLock ); } - pthread_mutex_unlock( &tupleIterLock ); + pthread_mutex_unlock( &tupleIterLock ); lastTupleIter = tupleIter; //save for next time through loop DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter); - //Two vars used to comm with consumer. One holds message to send, - // other holds ID of producer sending. - //Protect the two variables with a lock, that only one - // producer can get. Update the variable with the message to be - // communicated, and write ID of sender in second var. + //Q used to comm with consumer, protected with a lock //Get producer lock - pthread_mutex_lock( &producerAccessMutex ); - - // write into comm vars - producerMessage = tupleIter; //just a dummy -- overhead meas, do nothing - currProductionNum += 1; - - // get current ACK number - oldConsumerReceivedACKNum = currConsumerReceivedACKNum; - - // notify consumer (don't think need the cond lock here -- teeter-totter) - pthread_mutex_lock( &productionReadyLock ); - DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID ); - pthread_cond_broadcast( &productionReadyCond ); - pthread_mutex_unlock( &productionReadyLock ); - - // wait for ACK (get ACK lock, check on change in ACK number) - pthread_mutex_lock( &consumerReceivedAckLock ); - while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum ) - { - pthread_cond_wait( &consumerReceivedAckCond, - &consumerReceivedAckLock ); - } - pthread_mutex_unlock( &consumerReceivedAckLock ); - DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum ); - - // release producer lock (so different producer can get and send) - pthread_mutex_unlock( &producerAccessMutex ); + pthread_mutex_lock( &queueAccessLock ); + writePrivQ( params, commQ ); //params is just a dummy pointer + pthread_mutex_unlock( &queueAccessLock ); } //if not done, do again //Shutdown producer