changeset 1:88db7b62b961 queue tip

Working queue based version
author Sean Halle <seanhalle@yahoo.com>
date Wed, 10 Jul 2013 14:17:04 -0700
parents 9cf9b2091eeb
children
files src/Application/consumer.c src/Application/main.c src/Application/main.h src/Application/producer.c
diffstat 4 files changed, 39 insertions(+), 100 deletions(-) [+]
line diff
     1.1 --- a/src/Application/consumer.c	Wed Jul 10 14:13:46 2013 -0700
     1.2 +++ b/src/Application/consumer.c	Wed Jul 10 14:17:04 2013 -0700
     1.3 @@ -11,20 +11,16 @@
     1.4   * Birth function for thread that performs the consumer behavior
     1.5   * 
     1.6   *Here's the protocol: 
     1.7 - *Consumer is born waiting for some producer to send it a production.
     1.8 - *When it wakes, it reads the variables used for communication, and packages
     1.9 - * the information into a tuple it is constructing.
    1.10 - *Then it wakes the producer, who is waiting to be sure that send was received.
    1.11 - *If the tuple is not yet complete, it loops back for another production.
    1.12 - *When tuple complete, it adds that tuple to the output
    1.13 - *If that's the last tuple, it ends itself
    1.14 - *If not, then wakes all the producers, who go to the next iteration.
    1.15 - *Then loops back to wait for some producer to send it a production
    1.16 + * Reads a production from the shared Q. 
    1.17 + *   If empty, yields and tries again.
    1.18 + * When has a production from every producer, broadcasts next iter to producers
    1.19 + * When has all the tuples, end
    1.20   */
    1.21  void* 
    1.22  consumer_birthFn( void* _params )
    1.23   {
    1.24 -   int lastSeenProductionNum, numProducts;
    1.25 +   int numProducts;
    1.26 +   void *production; //dummy ptr
    1.27     
    1.28     ConsumerParams* params = (ConsumerParams *)_params;
    1.29         
    1.30 @@ -36,51 +32,36 @@
    1.31      *Protocol:
    1.32      * increment tupleIter
    1.33      * wake producers for next tuple
    1.34 -    * wait on production ready
    1.35 -    * read comm vars and add msg to current tuple
    1.36 -    * increment ACK count
    1.37 -    * wake producer (who is waiting for ack)
    1.38 -    * if more productions for current tuple, repeat
    1.39 -    * have all productions for current tuple, so add tuple to output
    1.40 -    * if have all tuples are going to produce, end
    1.41 -    * else more, so  repeat
    1.42 +    * Reads a production from the shared Q. 
    1.43 +    *   If empty, yields and tries again.
    1.44 +    * if more productions in tuple, repeat
    1.45 +    * if have more tuples, repeat
    1.46 +    * end thread
    1.47      */
    1.48     while( tupleIter < params->numTuplesToCreate )
    1.49      {
    1.50 -      // increment tupleIter (global shared)
    1.51 -      // wake producers for next iter (don't need cond lock? -- teeter totter)
    1.52 +      if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);
    1.53 +       
    1.54 +      // wake producers for next iter
    1.55        DEBUG__printf("consumer broadcast for next iter\n");
    1.56        pthread_mutex_lock(&tupleIterLock);
    1.57 -      tupleIter += 1;
    1.58 +      tupleIter += 1; // increment tupleIter (global shared)
    1.59        pthread_cond_broadcast( &tupleIterCond );
    1.60        pthread_mutex_unlock(&tupleIterLock);
    1.61 -	  
    1.62 -      if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);       
    1.63           
    1.64        for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
    1.65         { 
    1.66 -         //wait on productionReadyCond  (suspend until there is a production)
    1.67 -         pthread_mutex_lock( &productionReadyLock );
    1.68 -         while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID
    1.69 -          {
    1.70 -            pthread_cond_wait( &productionReadyCond, &productionReadyLock );
    1.71 +         // read a production from the shared Q.
    1.72 +         production = NULL;
    1.73 +         while( production == NULL )
    1.74 +          { pthread_mutex_lock( &queueAccessLock );
    1.75 +            production = readPrivQ( commQ );
    1.76 +            pthread_mutex_unlock( &queueAccessLock );
    1.77 +            // If empty, yields and tries again.
    1.78 +            if( production == NULL) sched_yield();
    1.79            }
    1.80           DEBUG__printf1( "consumer got production %d\n", currProductionNum );
    1.81 -         lastSeenProductionNum = currProductionNum;
    1.82 -         pthread_mutex_unlock( &productionReadyLock );
    1.83 -                  
    1.84 -         //Read comm vars and add msg to current tuple
    1.85 -         //add production to tuple -- overhead meas, do nothing
    1.86 -      
    1.87 -         // increment ACK count
    1.88 -         pthread_mutex_lock(     &consumerReceivedAckLock  );
    1.89 -         currConsumerReceivedACKNum += 1;//make different than last time prod saw
    1.90 -         
    1.91 -         // wake producer (who is waiting to be sure that send was received)
    1.92 -         DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum);
    1.93 -         pthread_cond_broadcast( &consumerReceivedAckCond );
    1.94 -         pthread_mutex_unlock(     &consumerReceivedAckLock  );
    1.95 -    } //if more productions for current tuple, repeat
    1.96 +       } //if more productions for current tuple, repeat
    1.97      
    1.98        // have all productions for current tuple, so add tuple to output
    1.99        //add tuple to output and malloc new tuple array -- overhead meas, do nothing
     2.1 --- a/src/Application/main.c	Wed Jul 10 14:13:46 2013 -0700
     2.2 +++ b/src/Application/main.c	Wed Jul 10 14:17:04 2013 -0700
     2.3 @@ -119,14 +119,8 @@
     2.4     pthread_cond_init( &tupleIterCond, NULL );
     2.5     tupleIter = 0;
     2.6  
     2.7 -   pthread_mutex_init(&producerAccessMutex, NULL);
     2.8 -   pthread_mutex_init(&productionReadyLock, NULL);
     2.9 -   pthread_cond_init( &productionReadyCond, NULL );
    2.10 -   currProductionNum = 0;
    2.11 -
    2.12 -   pthread_mutex_init(&consumerReceivedAckLock, NULL);
    2.13 -   pthread_cond_init( &consumerReceivedAckCond, NULL );
    2.14 -   currConsumerReceivedACKNum = 0;
    2.15 +   pthread_mutex_init( &queueAccessLock, NULL );
    2.16 +   commQ = makePrivQ();
    2.17   }
    2.18  
    2.19  
     3.1 --- a/src/Application/main.h	Wed Jul 10 14:13:46 2013 -0700
     3.2 +++ b/src/Application/main.h	Wed Jul 10 14:17:04 2013 -0700
     3.3 @@ -14,6 +14,8 @@
     3.4  #include <linux/perf_event.h>
     3.5  #include <sys/syscall.h>
     3.6  
     3.7 +#include "Queue_impl/PrivateQueue.h"
     3.8 +
     3.9  //==========================
    3.10  //#define TURN_ON_DEBUG
    3.11  
    3.12 @@ -147,16 +149,12 @@
    3.13  pthread_cond_t  tupleIterCond;
    3.14  int tupleIter;
    3.15  
    3.16 -pthread_mutex_t producerAccessMutex;
    3.17 -pthread_mutex_t productionReadyLock;
    3.18 -pthread_cond_t  productionReadyCond;
    3.19 +pthread_mutex_t queueAccessLock;
    3.20 +PrivQueueStruc *commQ;
    3.21 +
    3.22  int currProductionNum;
    3.23  int producerMessage;
    3.24  
    3.25 -pthread_mutex_t consumerReceivedAckLock;
    3.26 -pthread_cond_t  consumerReceivedAckCond;
    3.27 -int currConsumerReceivedACKNum;
    3.28 -
    3.29  //=======
    3.30  void* 
    3.31  producer_birthFn( void* _params );
     4.1 --- a/src/Application/producer.c	Wed Jul 10 14:13:46 2013 -0700
     4.2 +++ b/src/Application/producer.c	Wed Jul 10 14:17:04 2013 -0700
     4.3 @@ -10,8 +10,6 @@
     4.4   * Producer.
     4.5   * 
     4.6   * Birth function for thread that performs the producer behavior
     4.7 - * 
     4.8 - * Note: is pinned to a core, to facilitate collecting measurements
     4.9   */
    4.10  void* 
    4.11  producer_birthFn( void* _params )
    4.12 @@ -22,7 +20,6 @@
    4.13     ProducerParams *params = (ProducerParams *)_params;
    4.14      
    4.15     lastTupleIter = 0; //compared to global tupleIter while waiting
    4.16 -   oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive
    4.17     
    4.18     /* --------------------------------------------------
    4.19      * Pin thread to core, the producers are divided
    4.20 @@ -44,12 +41,9 @@
    4.21         
    4.22     /*Protocol:
    4.23      * wait for change in tupleIter (save updated tuple num for next time)
    4.24 -    * Get producer lock (only one producer at a time)
    4.25 -    * write into comm vars
    4.26 -    * get current ACK number
    4.27 -    * notify consumer
    4.28 -    * wait for ACK (get ACK lock, check on change in ACK number)
    4.29 -    * release producer lock
    4.30 +    * Get queue lock 
    4.31 +    * write into queue
    4.32 +    * release queue lock
    4.33      * if not done, repeat
    4.34      */
    4.35     while( lastTupleIter < params->numTuplesToCreate )
    4.36 @@ -61,46 +55,18 @@
    4.37           pthread_cond_wait( &tupleIterCond,
    4.38                              &tupleIterLock );
    4.39         }
    4.40 -      pthread_mutex_unlock(   &tupleIterLock  );
    4.41 +      pthread_mutex_unlock( &tupleIterLock  );
    4.42        
    4.43        lastTupleIter = tupleIter; //save for next time through loop
    4.44        
    4.45        DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter);
    4.46        
    4.47 -      //Two vars used to comm with consumer.  One holds message to send,
    4.48 -      // other holds ID of producer sending.
    4.49 -      //Protect the two variables with a lock, that only one
    4.50 -      // producer can get.  Update the variable with the message to be 
    4.51 -      // communicated, and write ID of sender in second var.
    4.52 +      //Q used to comm with consumer, protected with a lock  
    4.53  
    4.54        //Get producer lock
    4.55 -      pthread_mutex_lock( &producerAccessMutex );
    4.56 -      
    4.57 -      // write into comm vars
    4.58 -      producerMessage = tupleIter;  //just a dummy -- overhead meas, do nothing
    4.59 -      currProductionNum += 1;
    4.60 -      
    4.61 -      // get current ACK number
    4.62 -      oldConsumerReceivedACKNum = currConsumerReceivedACKNum;
    4.63 -
    4.64 -      // notify consumer (don't think need the cond lock here -- teeter-totter)
    4.65 -      pthread_mutex_lock(     &productionReadyLock  );
    4.66 -      DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID );
    4.67 -      pthread_cond_broadcast( &productionReadyCond );
    4.68 -      pthread_mutex_unlock(   &productionReadyLock  );
    4.69 -
    4.70 -      // wait for ACK (get ACK lock, check on change in ACK number)
    4.71 -      pthread_mutex_lock(     &consumerReceivedAckLock  );
    4.72 -      while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum )
    4.73 -       {
    4.74 -         pthread_cond_wait( &consumerReceivedAckCond,
    4.75 -                            &consumerReceivedAckLock );
    4.76 -       }
    4.77 -      pthread_mutex_unlock(   &consumerReceivedAckLock  );
    4.78 -      DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum );
    4.79 -
    4.80 -      // release producer lock (so different producer can get and send)
    4.81 -      pthread_mutex_unlock(   &producerAccessMutex );  
    4.82 +      pthread_mutex_lock( &queueAccessLock );
    4.83 +      writePrivQ( params, commQ ); //params is just a dummy pointer 
    4.84 +      pthread_mutex_unlock( &queueAccessLock );
    4.85      } //if not done, do again  
    4.86  
    4.87     //Shutdown producer