# HG changeset patch # User Sean Halle # Date 1374955810 25200 # Node ID 083298a6f7b6e3fd7b1623af593cc16cb4d28194 # Parent 1ea30ca7093c64cb3d84ce8dc13b689a92dc8798 new branch PR_univ_lib -- creates a static library, copied to /usr/lib/PR__lib diff -r 1ea30ca7093c -r 083298a6f7b6 BlockingQueue.c --- a/BlockingQueue.c Tue Jul 23 07:28:22 2013 -0700 +++ b/BlockingQueue.c Sat Jul 27 13:10:10 2013 -0700 @@ -14,13 +14,32 @@ #include #include "BlockingQueue.h" -#include "PR__common_includes/Services_offered_by_PR/Memory_Handling/vmalloc__wrapper_library.h" +#include +#include #define INC(x) (++x == 1024) ? (x) = 0 : (x) #define SPINLOCK_TRIES 100000 +//========================== pthread based queue ========================= +/*Not currently implemented.. however, copied this code from place where + * did equivalent.. Idea is to just make a private queue, then protect + * access with a lock.. copied code snippet below is how access was + * protected.. just roll this inside a "readBlockingQ()" function.. do + * equivalent inside writeBlockingQ() function.. to make one, just add + * the lock to the queue structure.. + */ +/* + production = NULL; + while( production == NULL ) + { pthread_mutex_lock( &queueAccessLock ); + production = readPrivQ( commQ ); + pthread_mutex_unlock( &queueAccessLock ); + // If empty, yields and tries again. + if( production == NULL) sched_yield(); + } +*/ //=========================================================================== // multi reader multi writer fast Q via CAS @@ -36,7 +55,7 @@ CASQueueStruc* makeCASQ() { CASQueueStruc* retQ; - retQ = (CASQueueStruc *) PR_WL__malloc( sizeof( CASQueueStruc ) ); + retQ = (CASQueueStruc *) PR__malloc( sizeof( CASQueueStruc ) ); retQ->insertLock = UNLOCKED; retQ->extractLock= UNLOCKED; @@ -161,7 +180,7 @@ SRSWQueueStruc* makeSRSWQ() { SRSWQueueStruc* retQ; - retQ = (SRSWQueueStruc *) PR_WL__malloc( sizeof( SRSWQueueStruc ) ); + retQ = (SRSWQueueStruc *) PR__malloc( sizeof( SRSWQueueStruc ) ); memset( retQ->startOfData, 0, 1024 * sizeof(void *) ); retQ->extractPos = &(retQ->startOfData[0]); //side by side == empty @@ -300,11 +319,11 @@ SRMWQueueStruc* makeSRMWQ() { SRMWQueueStruc* retQ; - retQ = (SRMWQueueStruc *) PR_WL__malloc( sizeof( SRMWQueueStruc ) ); + retQ = (SRMWQueueStruc *) PR__malloc( sizeof( SRMWQueueStruc ) ); retQ->numInternalQs = 0; retQ->internalQsSz = 10; - retQ->internalQs = PR_WL__malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); + retQ->internalQs = PR__malloc( retQ->internalQsSz * sizeof(SRSWQueueStruc *)); retQ->lastQReadFrom = 0; @@ -330,7 +349,7 @@ oldSz = Q->internalQsSz; oldArray = Q->internalQs; Q->internalQsSz *= 2; - Q->internalQs = PR_WL__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); + Q->internalQs = PR__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *)); for( i = 0; i < oldSz; i++ ) { Q->internalQs[i] = oldArray[i]; } diff -r 1ea30ca7093c -r 083298a6f7b6 BlockingQueue.h --- a/BlockingQueue.h Tue Jul 23 07:28:22 2013 -0700 +++ b/BlockingQueue.h Sat Jul 27 13:10:10 2013 -0700 @@ -8,7 +8,8 @@ #ifndef _BLOCKINGQUEUE_H #define _BLOCKINGQUEUE_H -#include "PR__common_includes/PR__primitive_data_types.h" +#include +#include #define TRUE 1 @@ -17,76 +18,5 @@ #define LOCKED 1 #define UNLOCKED 0 - -//========== pThreads based queue ========== -/* It is the data that is shared so only need one mutex. */ -typedef -struct - { pthread_mutex_t mutex_t; - pthread_cond_t cond_w_t; - pthread_cond_t cond_r_t; - int32 count; - int32 readPos; - int32 writePos; - void* data[1024]; //an array of pointers - int w_empty; - int w_full; - } -PThdQueueStruc; - -PThdQueueStruc* makePThdQ(); -void* readPThdQ( PThdQueueStruc *Q ); -void writePThdQ( void *in, PThdQueueStruc *Q ); - - -//========== CAS based queue ========== -typedef -struct - { volatile int32 insertLock; - volatile int32 extractLock; - volatile void* *insertPos; - volatile void* *extractPos; - void* startOfData[1024]; //data is pointers - void* *endOfData; //set when make queue - } -CASQueueStruc; - -CASQueueStruc* makeCASQ(); -void* readCASQ( CASQueueStruc *Q ); -void writeCASQ( void *in, CASQueueStruc *Q ); - - -//========= non-atomic instr based queue =========== -typedef -struct - { void* *insertPos; - void* *extractPos; - void* startOfData[1024]; //data is pointers - void* *endOfData; //set when make queue - } -SRSWQueueStruc; - -SRSWQueueStruc* makeSRSWQ(); -void freeSRSWQ( SRSWQueueStruc* Q ); -void* readSRSWQ( SRSWQueueStruc *Q ); -void writeSRSWQ( void *in, SRSWQueueStruc *Q ); - - -//========= non-atomic instr S R M W queue =========== -typedef -struct - { int32 lastQReadFrom; - int32 numInternalQs; - int32 internalQsSz; - SRSWQueueStruc* *internalQs; - } -SRMWQueueStruc; - -SRMWQueueStruc* makeSRMWQ(); -int addWriterToSRMWQ( SRMWQueueStruc *Q ); -void* readSRMWQ( SRMWQueueStruc *Q ); -void writeSRMWQ( void *in, SRMWQueueStruc *Q, int writerID ); - - #endif /* _BLOCKINGQUEUE_H */ diff -r 1ea30ca7093c -r 083298a6f7b6 PrivateQueue.c --- a/PrivateQueue.c Tue Jul 23 07:28:22 2013 -0700 +++ b/PrivateQueue.c Sat Jul 27 13:10:10 2013 -0700 @@ -14,7 +14,8 @@ #include #include "PrivateQueue.h" -#include "PR__common_includes/Services_offered_by_PR/Memory_Handling/vmalloc__wrapper_library.h" +#include +#include diff -r 1ea30ca7093c -r 083298a6f7b6 PrivateQueue.h --- a/PrivateQueue.h Tue Jul 23 07:28:22 2013 -0700 +++ b/PrivateQueue.h Sat Jul 27 13:10:10 2013 -0700 @@ -8,7 +8,7 @@ #ifndef _PRIVATE_QUEUE_H #define _PRIVATE_QUEUE_H -#include "PR__common_includes/PR__primitive_data_types.h" +#include #define TRUE 1 #define FALSE 0 @@ -17,27 +17,5 @@ #define UNLOCKED 0 -/* It is the data that is shared so only need one mutex. */ -typedef struct - { void **insertPos; - void **extractPos; - void **startOfData; //data is pointers - void **endOfData; //set when alloc data - } -PrivQueueStruc; - -typedef void (*DynArrayFnPtr) ( void * ); //fn has to cast void * - -PrivQueueStruc* makePrivQ ( ); -bool32 isEmptyPrivQ ( PrivQueueStruc *Q ); //ret TRUE if empty -void* peekPrivQ ( PrivQueueStruc *Q ); //ret NULL if empty -void* readPrivQ ( PrivQueueStruc *Q ); //ret NULL if empty -void writePrivQ( void *in, PrivQueueStruc *Q ); - //return false when full -bool32 writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ); -int32 numInPrivQ( PrivQueueStruc *Q ); -void pushPrivQ( void * in, PrivQueueStruc* Q ); -void freePrivQ( PrivQueueStruc *Q ); - #endif /* _PRIVATE_QUEUE_H */ diff -r 1ea30ca7093c -r 083298a6f7b6 __brch__PR_univ_lib --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/__brch__PR_univ_lib Sat Jul 27 13:10:10 2013 -0700 @@ -0,0 +1,4 @@ +This branch is for developing the queues used by proto-runtime code.. and packaging as a static library. + + + diff -r 1ea30ca7093c -r 083298a6f7b6 __brch__Univ_dev --- a/__brch__Univ_dev Tue Jul 23 07:28:22 2013 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,4 +0,0 @@ -This branch is for the project structure defined Jan 2012.. the #includes reflect this directory structure. - -More importantly, the MC_shared version of PR requires a separat malloc implemeted by PR code.. so this branch has modified the library to use the PR-specific malloc. - diff -r 1ea30ca7093c -r 083298a6f7b6 prqueue.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/prqueue.h Sat Jul 27 13:10:10 2013 -0700 @@ -0,0 +1,114 @@ +/* + * Copyright 2009 OpenSourceResearchInstitute.org + * Licensed under GNU General Public License version 2 + * + * Author: seanhalle@yahoo.com + */ + +#ifndef _PRQUEUE_H +#define _PRQUEUE_H + +#include +#include + +#define TRUE 1 +#define FALSE 0 + +//================== Private Queue stuff =================== +/* It is the data that is shared so only need one mutex. */ +typedef struct + { void **insertPos; + void **extractPos; + void **startOfData; //data is pointers + void **endOfData; //set when alloc data + } +PrivQueueStruc; + +typedef void (*DynArrayFnPtr) ( void * ); //fn has to cast void * + +PrivQueueStruc* makePrivQ ( ); +bool32 isEmptyPrivQ ( PrivQueueStruc *Q ); //ret TRUE if empty +void* peekPrivQ ( PrivQueueStruc *Q ); //ret NULL if empty +void* readPrivQ ( PrivQueueStruc *Q ); //ret NULL if empty +void writePrivQ( void *in, PrivQueueStruc *Q ); + //return false when full +bool32 writeIfSpacePrivQ( void * in, PrivQueueStruc* Q ); +int32 numInPrivQ( PrivQueueStruc *Q ); +void pushPrivQ( void * in, PrivQueueStruc* Q ); +void freePrivQ( PrivQueueStruc *Q ); + + +//====================== Parallel Queue Stuff ==================== + +//========== pThreads based queue ========== +/* It is the data that is shared so only need one mutex. */ +typedef +struct + { pthread_mutex_t mutex_t; + pthread_cond_t cond_w_t; + pthread_cond_t cond_r_t; + int32 count; + int32 readPos; + int32 writePos; + void* data[1024]; //an array of pointers + int w_empty; + int w_full; + } +PThdQueueStruc; + +PThdQueueStruc* makePThdQ(); +void* readPThdQ( PThdQueueStruc *Q ); +void writePThdQ( void *in, PThdQueueStruc *Q ); + + +//========== CAS based queue ========== +typedef +struct + { volatile int32 insertLock; + volatile int32 extractLock; + volatile void* *insertPos; + volatile void* *extractPos; + void* startOfData[1024]; //data is pointers + void* *endOfData; //set when make queue + } +CASQueueStruc; + +CASQueueStruc* makeCASQ(); +void* readCASQ( CASQueueStruc *Q ); +void writeCASQ( void *in, CASQueueStruc *Q ); + + +//========= non-atomic instr based queue =========== +typedef +struct + { void* *insertPos; + void* *extractPos; + void* startOfData[1024]; //data is pointers + void* *endOfData; //set when make queue + } +SRSWQueueStruc; + +SRSWQueueStruc* makeSRSWQ(); +void freeSRSWQ( SRSWQueueStruc* Q ); +void* readSRSWQ( SRSWQueueStruc *Q ); +void writeSRSWQ( void *in, SRSWQueueStruc *Q ); + + +//========= non-atomic instr S R M W queue =========== +typedef +struct + { int32 lastQReadFrom; + int32 numInternalQs; + int32 internalQsSz; + SRSWQueueStruc* *internalQs; + } +SRMWQueueStruc; + +SRMWQueueStruc* makeSRMWQ(); +int addWriterToSRMWQ( SRMWQueueStruc *Q ); +void* readSRMWQ( SRMWQueueStruc *Q ); +void writeSRMWQ( void *in, SRMWQueueStruc *Q, int writerID ); + + +#endif /* _PRIVATE_QUEUE_H */ +