changeset 6:174a7c2ca340

Works with sequential version -- not sure changes, but works
author Me
date Wed, 28 Jul 2010 13:13:01 -0700
parents 228ca5487d81
children 08f0b4da7610
files BlockingQueue.c PrivateQueue.c PrivateQueue.h
diffstat 3 files changed, 33 insertions(+), 45 deletions(-) [+]
line diff
     1.1 --- a/BlockingQueue.c	Wed Jun 30 14:35:04 2010 -0700
     1.2 +++ b/BlockingQueue.c	Wed Jul 28 13:13:01 2010 -0700
     1.3 @@ -11,7 +11,6 @@
     1.4  #include <pthread.h>
     1.5  #include <stdlib.h>
     1.6  #include <sched.h>
     1.7 -#include <windows.h>
     1.8  
     1.9  #include "BlockingQueue.h"
    1.10  
    1.11 @@ -25,55 +24,39 @@
    1.12  PThdQueueStruc* makePThdQ()
    1.13   {
    1.14     PThdQueueStruc* retQ;
    1.15 -   int status;
    1.16 +   int retCode;
    1.17     retQ = (PThdQueueStruc *) malloc( sizeof( PThdQueueStruc ) );
    1.18  
    1.19  
    1.20 -   status = pthread_mutex_init( &retQ->mutex_t,  NULL);
    1.21 -   if (status < 0)
    1.22 -    {
    1.23 -      perror("Error in creating mutex:");
    1.24 -      exit(1);
    1.25 -      return NULL;
    1.26 -    }
    1.27 +   retCode =
    1.28 +   pthread_mutex_init( &retQ->mutex_t,  NULL);
    1.29 +   if(retCode){perror("Error in creating mutex:"); exit(1);}
    1.30  
    1.31 -   status = pthread_cond_init ( &retQ->cond_w_t, NULL);
    1.32 -   if (status < 0)
    1.33 -    {
    1.34 -      perror("Error in creating cond_var:");
    1.35 -      exit(1);
    1.36 -      return NULL;
    1.37 -    }
    1.38 +   retCode = pthread_cond_init ( &retQ->cond_w_t, NULL);
    1.39 +   if(retCode){perror("Error in creating cond_var:"); exit(1);}
    1.40  
    1.41 -   status = pthread_cond_init ( &retQ->cond_r_t, NULL);
    1.42 -   if (status < 0)
    1.43 -    {
    1.44 -      perror("Error in creating cond_var:");
    1.45 -      exit(1);
    1.46 -      return NULL;
    1.47 -    }
    1.48 +   retCode = pthread_cond_init ( &retQ->cond_r_t, NULL);
    1.49 +   if(retCode){perror("Error in creating cond_var:"); exit(1);}
    1.50  
    1.51     retQ->count    = 0;
    1.52     retQ->readPos  = 0;
    1.53     retQ->writePos = 0;
    1.54 -   retQ -> w_empty = retQ -> w_full = 0;
    1.55 +   retQ->w_empty  = 0;
    1.56 +   retQ->w_full   = 0;
    1.57  
    1.58     return retQ;
    1.59   }
    1.60  
    1.61  void * readPThdQ( PThdQueueStruc *Q )
    1.62   { void *ret;
    1.63 -   int status, wt;
    1.64 +   int retCode, wt;
    1.65     pthread_mutex_lock( &Q->mutex_t );
    1.66      {
    1.67        while( Q -> count == 0 )
    1.68         { Q -> w_empty = 1;
    1.69 -         // pthread_cond_broadcast( &Q->cond_w_t );
    1.70 -         status = pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
    1.71 -         if (status != 0)
    1.72 -          { perror("Thread wait error: ");
    1.73 -            exit(1);
    1.74 -          }
    1.75 +         retCode =
    1.76 +         pthread_cond_wait( &Q->cond_r_t, &Q->mutex_t );
    1.77 +         if( retCode ){ perror("Thread wait error: "); exit(1); }
    1.78         }
    1.79        Q -> w_empty = 0;
    1.80        Q -> count -= 1;
    1.81 @@ -81,37 +64,40 @@
    1.82        INC( Q->readPos );
    1.83        wt = Q -> w_full;
    1.84        Q -> w_full = 0;
    1.85 -      //pthread_cond_broadcast( &Q->cond_w_t );
    1.86      }
    1.87     pthread_mutex_unlock( &Q->mutex_t );
    1.88 -   if (wt)  pthread_cond_signal( &Q->cond_w_t );
    1.89 +   if (wt)
    1.90 +      pthread_cond_signal( &Q->cond_w_t );
    1.91  
    1.92 +         //printf("Q out: %d\n", ret);
    1.93     return( ret );
    1.94   }
    1.95  
    1.96  void writePThdQ( void * in, PThdQueueStruc* Q )
    1.97   {
    1.98     int status, wt;
    1.99 +         //printf("Q in: %d\n", in);
   1.100 +
   1.101     pthread_mutex_lock( &Q->mutex_t );
   1.102      {
   1.103        while( Q->count >= 1024 )
   1.104         {
   1.105           Q -> w_full = 1;
   1.106 -         //	 pthread_cond_broadcast( &Q->cond_r_t );
   1.107           status = pthread_cond_wait( &Q->cond_w_t, &Q->mutex_t );
   1.108           if (status != 0)
   1.109            { perror("Thread wait error: ");
   1.110              exit(1);
   1.111            }
   1.112         }
   1.113 +
   1.114        Q -> w_full = 0;
   1.115        Q->count += 1;
   1.116        Q->data[ Q->writePos ] = in;
   1.117        INC( Q->writePos );
   1.118        wt = Q -> w_empty;
   1.119        Q -> w_empty = 0;
   1.120 -      //    pthread_cond_broadcast( &Q->cond_r_t );
   1.121      }
   1.122 +
   1.123     pthread_mutex_unlock( &Q->mutex_t );
   1.124     if( wt )  pthread_cond_signal( &Q->cond_r_t );
   1.125   }
   1.126 @@ -182,7 +168,7 @@
   1.127         }
   1.128           //Q is busy or empty
   1.129        tries++;
   1.130 -      if( tries > SPINLOCK_TRIES ) SwitchToThread(); //WinAPI  yield()
   1.131 +      if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
   1.132      }
   1.133   }
   1.134  
   1.135 @@ -224,7 +210,7 @@
   1.136            }
   1.137         }
   1.138        tries++;
   1.139 -      if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
   1.140 +      if( tries > SPINLOCK_TRIES ) pthread_yield(); //not reliable
   1.141      }
   1.142   }
   1.143  
   1.144 @@ -277,7 +263,7 @@
   1.145         }
   1.146           //Q is empty
   1.147        tries++;
   1.148 -      if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
   1.149 +      if( tries > SPINLOCK_TRIES ) pthread_yield();
   1.150      }
   1.151   }
   1.152  
   1.153 @@ -317,7 +303,7 @@
   1.154         }
   1.155           //Q is full
   1.156        tries++;
   1.157 -      if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
   1.158 +      if( tries > SPINLOCK_TRIES ) pthread_yield();
   1.159      }
   1.160   }
   1.161  
   1.162 @@ -455,7 +441,7 @@
   1.163         {    //check if all queues have been tried
   1.164           if( QToReadFrom == Q->lastQReadFrom ) //all the queues tried & empty
   1.165            { tries++; //give a writer a chance to finish before yield
   1.166 -            if( tries > SPINLOCK_TRIES ) SwitchToThread(); //Win yield()
   1.167 +            if( tries > SPINLOCK_TRIES ) pthread_yield();
   1.168            }
   1.169         }
   1.170      }
     2.1 --- a/PrivateQueue.c	Wed Jun 30 14:35:04 2010 -0700
     2.2 +++ b/PrivateQueue.c	Wed Jul 28 13:13:01 2010 -0700
     2.3 @@ -45,6 +45,7 @@
     2.4  
     2.5     oldSize           = Q->endOfData - Q->startOfData;
     2.6     newSize           = 2 * oldSize;
     2.7 +   oldStartOfData = Q->startOfData;
     2.8     Q->startOfData = malloc( newSize * sizeof(void *) );
     2.9     memcpy(Q->startOfData, oldStartOfData, oldSize * sizeof(void *));
    2.10     free(oldStartOfData);
    2.11 @@ -65,7 +66,7 @@
    2.12     void **insertPos  = Q->insertPos;
    2.13     void **extractPos = Q->extractPos;
    2.14  
    2.15 -      //if not empty -- extract just below insert when empty
    2.16 +      //if not empty -- (extract is just below insert when empty)
    2.17     if( insertPos - extractPos != 1 &&
    2.18         !(extractPos == endOfData && insertPos == startOfData))
    2.19      {    //move before read
    2.20 @@ -86,7 +87,7 @@
    2.21  /*Expands the queue size automatically when it's full
    2.22   */
    2.23  void
    2.24 -writeAndEnlargePrivQ( void * in, PrivQueueStruc* Q )
    2.25 +writePrivQ( void * in, PrivQueueStruc* Q )
    2.26   {
    2.27     void **startOfData = Q->startOfData;
    2.28     void **endOfData   = Q->endOfData;
    2.29 @@ -95,6 +96,7 @@
    2.30     void **extractPos = Q->extractPos;
    2.31  
    2.32  tryAgain:
    2.33 +      //Full? (insert is just below extract when full)
    2.34     if( extractPos - insertPos != 1 &&
    2.35         !(insertPos == endOfData && extractPos == startOfData))
    2.36      { *(Q->insertPos) = in;   //insert before move
    2.37 @@ -115,7 +117,7 @@
    2.38  /*Returns false when the queue was full.
    2.39   * have option of calling make_larger_PrivQ to make more room, then try again
    2.40   */
    2.41 -int writeAndFailPrivQ( void * in, PrivQueueStruc* Q )
    2.42 +int writeIfSpacePrivQ( void * in, PrivQueueStruc* Q )
    2.43   {
    2.44     void **startOfData = Q->startOfData;
    2.45     void **endOfData   = Q->endOfData;
     3.1 --- a/PrivateQueue.h	Wed Jun 30 14:35:04 2010 -0700
     3.2 +++ b/PrivateQueue.h	Wed Jul 28 13:13:01 2010 -0700
     3.3 @@ -29,8 +29,8 @@
     3.4  
     3.5  PrivQueueStruc*  makePrivQ ( );
     3.6  void*            readPrivQ ( PrivQueueStruc *Q );
     3.7 -void             writeAndEnlargePrivQ( void *in, PrivQueueStruc *Q );
     3.8 -int              writeAndFailPrivQ( void * in, PrivQueueStruc* Q ); //return
     3.9 +void             writePrivQ( void *in, PrivQueueStruc *Q );
    3.10 +int              writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ); //return
    3.11                      // false when full
    3.12  
    3.13  #endif	/* _PRIVATE_QUEUE_H */