Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
changeset 1:88db7b62b961 queue tip
Working queue based version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:17:04 -0700 |
| parents | 9cf9b2091eeb |
| children | |
| files | src/Application/consumer.c src/Application/main.c src/Application/main.h src/Application/producer.c |
| diffstat | 4 files changed, 39 insertions(+), 100 deletions(-) [+] |
line diff
1.1 --- a/src/Application/consumer.c Wed Jul 10 14:13:46 2013 -0700 1.2 +++ b/src/Application/consumer.c Wed Jul 10 14:17:04 2013 -0700 1.3 @@ -11,20 +11,16 @@ 1.4 * Birth function for thread that performs the consumer behavior 1.5 * 1.6 *Here's the protocol: 1.7 - *Consumer is born waiting for some producer to send it a production. 1.8 - *When it wakes, it reads the variables used for communication, and packages 1.9 - * the information into a tuple it is constructing. 1.10 - *Then it wakes the producer, who is waiting to be sure that send was received. 1.11 - *If the tuple is not yet complete, it loops back for another production. 1.12 - *When tuple complete, it adds that tuple to the output 1.13 - *If that's the last tuple, it ends itself 1.14 - *If not, then wakes all the producers, who go to the next iteration. 1.15 - *Then loops back to wait for some producer to send it a production 1.16 + * Reads a production from the shared Q. 1.17 + * If empty, yields and tries again. 1.18 + * When has a production from every producer, broadcasts next iter to producers 1.19 + * When has all the tuples, end 1.20 */ 1.21 void* 1.22 consumer_birthFn( void* _params ) 1.23 { 1.24 - int lastSeenProductionNum, numProducts; 1.25 + int numProducts; 1.26 + void *production; //dummy ptr 1.27 1.28 ConsumerParams* params = (ConsumerParams *)_params; 1.29 1.30 @@ -36,51 +32,36 @@ 1.31 *Protocol: 1.32 * increment tupleIter 1.33 * wake producers for next tuple 1.34 - * wait on production ready 1.35 - * read comm vars and add msg to current tuple 1.36 - * increment ACK count 1.37 - * wake producer (who is waiting for ack) 1.38 - * if more productions for current tuple, repeat 1.39 - * have all productions for current tuple, so add tuple to output 1.40 - * if have all tuples are going to produce, end 1.41 - * else more, so repeat 1.42 + * Reads a production from the shared Q. 1.43 + * If empty, yields and tries again. 1.44 + * if more productions in tuple, repeat 1.45 + * if have more tuples, repeat 1.46 + * end thread 1.47 */ 1.48 while( tupleIter < params->numTuplesToCreate ) 1.49 { 1.50 - // increment tupleIter (global shared) 1.51 - // wake producers for next iter (don't need cond lock? -- teeter totter) 1.52 + if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); 1.53 + 1.54 + // wake producers for next iter 1.55 DEBUG__printf("consumer broadcast for next iter\n"); 1.56 pthread_mutex_lock(&tupleIterLock); 1.57 - tupleIter += 1; 1.58 + tupleIter += 1; // increment tupleIter (global shared) 1.59 pthread_cond_broadcast( &tupleIterCond ); 1.60 pthread_mutex_unlock(&tupleIterLock); 1.61 - 1.62 - if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); 1.63 1.64 for( numProducts = 0; numProducts < params->numProducers; numProducts++ ) 1.65 { 1.66 - //wait on productionReadyCond (suspend until there is a production) 1.67 - pthread_mutex_lock( &productionReadyLock ); 1.68 - while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID 1.69 - { 1.70 - pthread_cond_wait( &productionReadyCond, &productionReadyLock ); 1.71 + // read a production from the shared Q. 1.72 + production = NULL; 1.73 + while( production == NULL ) 1.74 + { pthread_mutex_lock( &queueAccessLock ); 1.75 + production = readPrivQ( commQ ); 1.76 + pthread_mutex_unlock( &queueAccessLock ); 1.77 + // If empty, yields and tries again. 1.78 + if( production == NULL) sched_yield(); 1.79 } 1.80 DEBUG__printf1( "consumer got production %d\n", currProductionNum ); 1.81 - lastSeenProductionNum = currProductionNum; 1.82 - pthread_mutex_unlock( &productionReadyLock ); 1.83 - 1.84 - //Read comm vars and add msg to current tuple 1.85 - //add production to tuple -- overhead meas, do nothing 1.86 - 1.87 - // increment ACK count 1.88 - pthread_mutex_lock( &consumerReceivedAckLock ); 1.89 - currConsumerReceivedACKNum += 1;//make different than last time prod saw 1.90 - 1.91 - // wake producer (who is waiting to be sure that send was received) 1.92 - DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum); 1.93 - pthread_cond_broadcast( &consumerReceivedAckCond ); 1.94 - pthread_mutex_unlock( &consumerReceivedAckLock ); 1.95 - } //if more productions for current tuple, repeat 1.96 + } //if more productions for current tuple, repeat 1.97 1.98 // have all productions for current tuple, so add tuple to output 1.99 //add tuple to output and malloc new tuple array -- overhead meas, do nothing
2.1 --- a/src/Application/main.c Wed Jul 10 14:13:46 2013 -0700 2.2 +++ b/src/Application/main.c Wed Jul 10 14:17:04 2013 -0700 2.3 @@ -119,14 +119,8 @@ 2.4 pthread_cond_init( &tupleIterCond, NULL ); 2.5 tupleIter = 0; 2.6 2.7 - pthread_mutex_init(&producerAccessMutex, NULL); 2.8 - pthread_mutex_init(&productionReadyLock, NULL); 2.9 - pthread_cond_init( &productionReadyCond, NULL ); 2.10 - currProductionNum = 0; 2.11 - 2.12 - pthread_mutex_init(&consumerReceivedAckLock, NULL); 2.13 - pthread_cond_init( &consumerReceivedAckCond, NULL ); 2.14 - currConsumerReceivedACKNum = 0; 2.15 + pthread_mutex_init( &queueAccessLock, NULL ); 2.16 + commQ = makePrivQ(); 2.17 } 2.18 2.19
3.1 --- a/src/Application/main.h Wed Jul 10 14:13:46 2013 -0700 3.2 +++ b/src/Application/main.h Wed Jul 10 14:17:04 2013 -0700 3.3 @@ -14,6 +14,8 @@ 3.4 #include <linux/perf_event.h> 3.5 #include <sys/syscall.h> 3.6 3.7 +#include "Queue_impl/PrivateQueue.h" 3.8 + 3.9 //========================== 3.10 //#define TURN_ON_DEBUG 3.11 3.12 @@ -147,16 +149,12 @@ 3.13 pthread_cond_t tupleIterCond; 3.14 int tupleIter; 3.15 3.16 -pthread_mutex_t producerAccessMutex; 3.17 -pthread_mutex_t productionReadyLock; 3.18 -pthread_cond_t productionReadyCond; 3.19 +pthread_mutex_t queueAccessLock; 3.20 +PrivQueueStruc *commQ; 3.21 + 3.22 int currProductionNum; 3.23 int producerMessage; 3.24 3.25 -pthread_mutex_t consumerReceivedAckLock; 3.26 -pthread_cond_t consumerReceivedAckCond; 3.27 -int currConsumerReceivedACKNum; 3.28 - 3.29 //======= 3.30 void* 3.31 producer_birthFn( void* _params );
4.1 --- a/src/Application/producer.c Wed Jul 10 14:13:46 2013 -0700 4.2 +++ b/src/Application/producer.c Wed Jul 10 14:17:04 2013 -0700 4.3 @@ -10,8 +10,6 @@ 4.4 * Producer. 4.5 * 4.6 * Birth function for thread that performs the producer behavior 4.7 - * 4.8 - * Note: is pinned to a core, to facilitate collecting measurements 4.9 */ 4.10 void* 4.11 producer_birthFn( void* _params ) 4.12 @@ -22,7 +20,6 @@ 4.13 ProducerParams *params = (ProducerParams *)_params; 4.14 4.15 lastTupleIter = 0; //compared to global tupleIter while waiting 4.16 - oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive 4.17 4.18 /* -------------------------------------------------- 4.19 * Pin thread to core, the producers are divided 4.20 @@ -44,12 +41,9 @@ 4.21 4.22 /*Protocol: 4.23 * wait for change in tupleIter (save updated tuple num for next time) 4.24 - * Get producer lock (only one producer at a time) 4.25 - * write into comm vars 4.26 - * get current ACK number 4.27 - * notify consumer 4.28 - * wait for ACK (get ACK lock, check on change in ACK number) 4.29 - * release producer lock 4.30 + * Get queue lock 4.31 + * write into queue 4.32 + * release queue lock 4.33 * if not done, repeat 4.34 */ 4.35 while( lastTupleIter < params->numTuplesToCreate ) 4.36 @@ -61,46 +55,18 @@ 4.37 pthread_cond_wait( &tupleIterCond, 4.38 &tupleIterLock ); 4.39 } 4.40 - pthread_mutex_unlock( &tupleIterLock ); 4.41 + pthread_mutex_unlock( &tupleIterLock ); 4.42 4.43 lastTupleIter = tupleIter; //save for next time through loop 4.44 4.45 DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter); 4.46 4.47 - //Two vars used to comm with consumer. One holds message to send, 4.48 - // other holds ID of producer sending. 4.49 - //Protect the two variables with a lock, that only one 4.50 - // producer can get. Update the variable with the message to be 4.51 - // communicated, and write ID of sender in second var. 4.52 + //Q used to comm with consumer, protected with a lock 4.53 4.54 //Get producer lock 4.55 - pthread_mutex_lock( &producerAccessMutex ); 4.56 - 4.57 - // write into comm vars 4.58 - producerMessage = tupleIter; //just a dummy -- overhead meas, do nothing 4.59 - currProductionNum += 1; 4.60 - 4.61 - // get current ACK number 4.62 - oldConsumerReceivedACKNum = currConsumerReceivedACKNum; 4.63 - 4.64 - // notify consumer (don't think need the cond lock here -- teeter-totter) 4.65 - pthread_mutex_lock( &productionReadyLock ); 4.66 - DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID ); 4.67 - pthread_cond_broadcast( &productionReadyCond ); 4.68 - pthread_mutex_unlock( &productionReadyLock ); 4.69 - 4.70 - // wait for ACK (get ACK lock, check on change in ACK number) 4.71 - pthread_mutex_lock( &consumerReceivedAckLock ); 4.72 - while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum ) 4.73 - { 4.74 - pthread_cond_wait( &consumerReceivedAckCond, 4.75 - &consumerReceivedAckLock ); 4.76 - } 4.77 - pthread_mutex_unlock( &consumerReceivedAckLock ); 4.78 - DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum ); 4.79 - 4.80 - // release producer lock (so different producer can get and send) 4.81 - pthread_mutex_unlock( &producerAccessMutex ); 4.82 + pthread_mutex_lock( &queueAccessLock ); 4.83 + writePrivQ( params, commQ ); //params is just a dummy pointer 4.84 + pthread_mutex_unlock( &queueAccessLock ); 4.85 } //if not done, do again 4.86 4.87 //Shutdown producer
