/*
 *  Copyright 2009 OpenSourceResearchInstitute.org
 *  Licensed under GNU General Public License version 2
 *
 * Author: seanhalle@yahoo.com
 */


#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <sched.h>
#include <string.h>

#include "BlockingQueue.h"
#include <PR__include/prqueue.h>
#include <PR__include/prmalloc.h>

/*Increments x, wrapping it back to 0 when it reaches 1024 (the slot count of
 * the queues in this file); the expression evaluates to the new value of x.
 *x must be a modifiable lvalue.  It is evaluated more than once, so it must
 * not have side effects.  Argument and expansion are fully parenthesized so
 * the macro composes safely inside larger expressions (e.g. "2 * INC(x)").
 */
#define INC(x) ((++(x) == 1024) ? ((x) = 0) : (x))

   //number of failed attempts a blocking queue operation spins before it
   // starts yielding the processor on every further attempt
#define SPINLOCK_TRIES 100000


//========================== pthread based queue =========================
/*Not currently implemented.  The idea is to make a private queue, add a
 * lock to the queue structure, and protect every access with that lock.
 * The snippet below (copied from code that did the equivalent) shows how a
 * read was protected: wrap it inside a "readBlockingQ()" function, and do
 * the equivalent inside a "writeBlockingQ()" function.
 */
/*
         production = NULL;
         while( production == NULL )
          { pthread_mutex_lock( &queueAccessLock );
            production = readPrivQ( commQ );
            pthread_mutex_unlock( &queueAccessLock );
            // If empty, yields and tries again.
            if( production == NULL) sched_yield();
          }
*/

//===========================================================================
// multi reader  multi writer fast Q   via CAS
//
//Yielding uses sched_yield() (POSIX, from <sched.h>) rather than the
// non-standard, deprecated pthread_yield(), so this section needs no
// feature-test macro and is always compiled.

/*This is a blocking queue.  Each side takes a CAS-based spin lock.  When
 * the queue is empty (reader) or full (writer), it releases the lock and
 * retries, yielding the processor once it has failed SPINLOCK_TRIES times.
 *It uses CAS because it supports more than one reader and more than one
 * writer.  Readers serialize on extractLock and writers on insertLock, so a
 * reader and a writer can be inside the queue at the same time.
 */

/*Allocates an empty CAS queue whose slots run from startOfData[0] to
 * startOfData[1023].
 *extractPos and insertPos start side by side, which is the "empty" state;
 * the two positions are never allowed to become equal.
 */
CASQueueStruc* makeCASQ()
 {
   CASQueueStruc* retQ;
   retQ = (CASQueueStruc *) PR__malloc( sizeof( CASQueueStruc ) );

   retQ->insertLock = UNLOCKED;
   retQ->extractLock= UNLOCKED;

   retQ->extractPos = (volatile void**)&(retQ->startOfData[0]); //side by side == empty
   retQ->insertPos  = (volatile void**)&(retQ->startOfData[1]); // so start pos's have to be
   retQ->endOfData  = &(retQ->startOfData[1023]);

   return retQ;
 }


/*Removes and returns the oldest value in Q.  Blocks (spins, then yields)
 * while Q is empty or another reader holds extractLock.
 */
void* readCASQ( CASQueueStruc* Q )
 { int32  tries  = 0;
   void **startOfData = Q->startOfData;
   void **endOfData   = Q->endOfData;

   while( TRUE )
    {    //TRUE only if the lock held UNLOCKED and now holds LOCKED; the
         // builtin is a full barrier, so it also serves as lock-acquire
      if( __sync_bool_compare_and_swap( &(Q->extractLock), UNLOCKED, LOCKED ) )
       { void **insertPos  = (void **)Q->insertPos;
         void **extractPos = (void **)Q->extractPos;
            //acquire: the slot read below must not be satisfied ahead of the
            // writer's insertPos store that we just observed
         __atomic_thread_fence( __ATOMIC_ACQUIRE );

            //empty when extract sits just below insert (allowing for wrap)
         if( insertPos - extractPos != 1 &&
             !(extractPos == endOfData && insertPos == startOfData) )
          { void *out;
               //move before read.  Write the new position exactly once and
               // never out of range, because the writer reads extractPos
               // without holding extractLock
            if( extractPos == endOfData ) Q->extractPos = startOfData;
            else                          Q->extractPos++;
            out = (void *) *(Q->extractPos);
               //release: all of the above completes before another reader
               // can take the lock
            __atomic_store_n( &(Q->extractLock), UNLOCKED, __ATOMIC_RELEASE );
            return out;
          }
            //Q is empty, so release the lock for others
         __atomic_store_n( &(Q->extractLock), UNLOCKED, __ATOMIC_RELEASE );
       }
         //Q is busy or empty: spin, then yield on every further attempt.
         // tries saturates instead of counting forever (signed overflow is UB)
      if( tries < SPINLOCK_TRIES ) tries++;
      else sched_yield();
    }
 }

/*Appends in to Q.  Blocks (spins, then yields) while Q is full or another
 * writer holds insertLock.
 */
void writeCASQ( void * in, CASQueueStruc* Q )
 { int32  tries  = 0;
   void **startOfData = Q->startOfData;
   void **endOfData   = Q->endOfData;

   while( TRUE )
    {    //TRUE only if the lock held UNLOCKED and now holds LOCKED (full barrier)
      if( __sync_bool_compare_and_swap( &(Q->insertLock), UNLOCKED, LOCKED ) )
       { void **insertPos  = (void **)Q->insertPos;
         void **extractPos = (void **)Q->extractPos;
            //acquire: order the slot store below after the load of the
            // reader's extractPos that proved the slot free
         __atomic_thread_fence( __ATOMIC_ACQUIRE );

            //full when insert sits just below extract (allowing for wrap).
            // A count variable can't be used, because both the inserter and
            // the extractor would write it
         if( extractPos - insertPos != 1 &&
             !(insertPos == endOfData && extractPos == startOfData) )
          { *(Q->insertPos) = in;   //insert before move
               //release: slot contents become visible before the new
               // insertPos that tells the reader the slot is filled
            __atomic_thread_fence( __ATOMIC_RELEASE );
            if( insertPos == endOfData ) Q->insertPos = startOfData;
            else                         Q->insertPos++;
            __atomic_store_n( &(Q->insertLock), UNLOCKED, __ATOMIC_RELEASE );
            return;
          }
            //Q is full, so release the lock for others
         __atomic_store_n( &(Q->insertLock), UNLOCKED, __ATOMIC_RELEASE );
       }
         //Q is busy or full: spin, then yield (tries saturates, no overflow)
      if( tries < SPINLOCK_TRIES ) tries++;
      else sched_yield();
    }
 }


//===========================================================================
//Single reader single writer super fast Q.. no atomic instrs..


/*This is a blocking queue, but it uses no atomic instructions; it just
 * yields when empty or full.
 *
 *It doesn't need any atomic instructions because only a single thread
 * extracts and only a single thread inserts, and no location is written by
 * both.  It writes before moving the insert position, moves the extract
 * position before reading, and never lets the write position and read
 * position be equal.  As a result, a stale view of the other thread's
 * position can only ever cause an unnecessary spin or yield(), never a
 * wrong value.  This relies on:
 * - monotonic movement of the pointers;
 * - a single writer for each pointer;
 * - the write-then-move-pointer sequence;
 * - the assumption that if thread A writes X before Y, thread B sees the
 *   writes in that order.  This requires the compiler and hardware to
 *   preserve that ordering (e.g. x86 TSO), or explicit fences.
 */

/*Allocates an empty single-reader single-writer queue with every slot
 * zeroed.  Slots run from startOfData[0] to startOfData[1023].  "Empty" is
 * represented by insertPos sitting exactly one slot past extractPos.
 */
SRSWQueueStruc* makeSRSWQ()
 { SRSWQueueStruc *newQ;

   newQ = (SRSWQueueStruc *) PR__malloc( sizeof( SRSWQueueStruc ) );
   memset( newQ->startOfData, 0, 1024 * sizeof(void *) );

   newQ->extractPos = newQ->startOfData + 0;
   newQ->insertPos  = newQ->startOfData + 1;
   newQ->endOfData  = newQ->startOfData + 1023;   //last usable slot

   return newQ;
 }

/*Releases a queue created by makeSRSWQ().  Only the queue's own allocation
 * is freed; the values stored in the queue are left to the caller.
 */
void freeSRSWQ( SRSWQueueStruc* Q )
 { PR__free( Q );
 }

/*Removes and returns the oldest value in Q.  Blocks while Q is empty: it
 * spins SPINLOCK_TRIES times, then yields the processor on every further
 * attempt.  Must only be called by the queue's single reader thread.
 */
void* readSRSWQ( SRSWQueueStruc* Q )
 { int32 tries = 0;

   while( TRUE )
    { int32 notEmpty =
         Q->insertPos - Q->extractPos != 1 &&
         !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData);
         //acquire: the slot read below must not be satisfied ahead of the
         // writer's insertPos store just observed.  The fence is also a
         // compiler barrier, so insertPos is re-read on every spin instead
         // of being cached in a register
      __atomic_thread_fence( __ATOMIC_ACQUIRE );

      if( notEmpty )
       {    //move before read; write the new position once, never out of range
         if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
         else Q->extractPos++;
         return *(Q->extractPos);
       }
         //Q is empty.  tries saturates instead of counting forever, since
         // signed overflow would be undefined behavior
      if( tries < SPINLOCK_TRIES ) tries++;
      else sched_yield();
    }
 }


/*Tries to remove the oldest value in Q.  The queue is checked up to 11
 * times, which gives a writer that is mid-insert a chance to finish.
 *Returns the value, or NULL if Q stayed empty throughout.  Must only be
 * called by the queue's single reader thread.
 */
void* readSRSWQ_NonBlocking( SRSWQueueStruc* Q )
 { int32 tries;

   for( tries = 0; tries <= 10; tries++ )
    { int32 notEmpty =
         Q->insertPos - Q->extractPos != 1 &&
         !(Q->extractPos == Q->endOfData && Q->insertPos == Q->startOfData);
         //acquire: order the slot read after the observed insertPos.  Also a
         // compiler barrier, so positions are re-read on each attempt
      __atomic_thread_fence( __ATOMIC_ACQUIRE );

      if( notEmpty )
       {    //move before read.  Wrap with a single store rather than
            // "increment past the end, then fix", so the writer (which reads
            // extractPos concurrently) never sees an out-of-range position
         if( Q->extractPos >= Q->endOfData ) Q->extractPos = Q->startOfData;
         else Q->extractPos++;
         return *(Q->extractPos);
       }
    }
   return NULL;   //still empty after all attempts
 }


/*Appends in to Q.  Blocks while Q is full: it spins SPINLOCK_TRIES times,
 * then yields the processor on every further attempt.  Must only be called
 * by the queue's single writer thread.
 */
void writeSRSWQ( void * in, SRSWQueueStruc* Q )
 { int32 tries = 0;

   while( TRUE )
    { int32 hasRoom =
         Q->extractPos - Q->insertPos != 1 &&
         !(Q->insertPos == Q->endOfData && Q->extractPos == Q->startOfData);
         //acquire: the slot store below must not move ahead of the load of
         // the reader's extractPos that proved the slot free.  Also a
         // compiler barrier, so extractPos is re-read on every spin
      __atomic_thread_fence( __ATOMIC_ACQUIRE );

      if( hasRoom )
       { *(Q->insertPos) = in;   //insert before move
            //release: slot contents become visible before the new insertPos
            // that tells the reader the slot is filled
         __atomic_thread_fence( __ATOMIC_RELEASE );
         if( Q->insertPos >= Q->endOfData ) Q->insertPos = Q->startOfData;
         else Q->insertPos++;
         return;
       }
         //Q is full.  tries saturates instead of overflowing
      if( tries < SPINLOCK_TRIES ) tries++;
      else sched_yield();
    }
 }



//===========================================================================
//Single reader Multiple writer super fast Q.. no atomic instrs..


/*This is a blocking queue, but it uses no atomic instructions, just does
 * yield() when empty or full
 *
 *It doesn't need any atomic instructions because each internal SRSW queue
 * has exactly one inserting thread (the writer that registered it), and the
 * single reader is the only extracting thread, so no location is written by
 * both sides.  Each internal queue writes before moving and moves before
 * reading, and never lets the write position and read position be the
 * same, so dis-synchrony can only ever cause an unnecessary call to
 * yield(), never a wrong value.  This relies on:
 * - monotonic movement of the pointers;
 * - a single writer for each pointer;
 * - the write-before-change-pointer sequence;
 * - the assumption that if thread A semantically writes X before Y, then
 *   thread B sees the writes in that order.
 *
 *The multi-writer version is implemented as a hierarchy.  Each writer has
 * its own single-reader single-writer queue.  The reader simply does a
 * round-robin harvesting from them.
 *
 *A writer must first register itself with the queue, and receives an ID back
 * It then uses that ID on each write operation.
 *
 *The implementation is:
 *Physically:
 * -] the SRMWQueueStruc holds an array of SRSWQueueStruc s
 * -] it also holds the index of the internal queue a value was last read from.
 *
 *Action-Patterns:
 * -] To add a writer
 * --]] writer-thread calls addWriterToSRMWQ() and remembers the ID it returns
 * --]] internally addWriterToSRMWQ does:
 * ---]]] if needs more room, makes a larger writer-array
 * ---]]] copies the old writer-array into the new
 * ---]]] makes a new SRSW queue and puts it into the array
 * ---]]] returns the index to the new SRSW queue as the ID
 * -] To write
 * --]] writer thread calls writeSRMWQ, passing the Q struc and its writer-ID
 * --]] this call may block, via repeated yield() calls
 * --]] internally, writeSRMWQ does:
 * ---]]] uses the writerID as index to get the SRSW queue for that writer
 * ---]]] performs writeQ on that queue (may block via repeated yield calls)
 * -] To Read
 * --]] reader calls readSRMWQ, passing the Q struc
 * --]] this call may block, via repeated yield() calls
 * --]] internally, readSRMWQ does:
 * ---]]] gets saved index of last SRSW queue read from
 * ---]]] increments index and gets indexed queue
 * ---]]] does a non-blocking read of that queue
 * ---]]] if gets something, saves index and returns that value
 * ---]]] if gets null, then goes to next queue
 * ---]]] if got null from all the queues then does yield() then tries again
 *
 *Note: 0 (NULL) is used as the "nothing there" value, so the queues must
 * only carry pointers, and 0 can never be written as a valid value.
 *
 */

/*Allocates an empty single-reader multi-writer queue.  It starts with no
 * writers and room in the writer array for 10 of them; writers are then
 * added with addWriterToSRMWQ().
 */
SRMWQueueStruc* makeSRMWQ()
 { SRMWQueueStruc *newQ =
      (SRMWQueueStruc *) PR__malloc( sizeof( SRMWQueueStruc ) );

   newQ->numInternalQs = 0;     //no writers registered yet
   newQ->internalQsSz  = 10;    //initial capacity of the writer array
   newQ->internalQs    =
      PR__malloc( newQ->internalQsSz * sizeof(SRSWQueueStruc *) );
   newQ->lastQReadFrom = 0;     //round-robin reading starts after index 0

   return newQ;
 }

/*Registers a new writer with Q and returns its writer ID.  The ID is the
 * index of the private SRSW queue created for that writer, and the writer
 * passes it to every writeSRMWQ() call.
 *When the count reaches the array's capacity, the writer array is doubled:
 * the old entries are copied into a new array and the old array is freed.
 *
 *NOTE: assumes all adds are completed before any writes or reads are
 * performed.  Otherwise this needs to be re-done carefully, probably with
 * a lock.
 */
int addWriterToSRMWQ( SRMWQueueStruc* Q )
 { int newID = Q->numInternalQs;   //index the new writer will own

   Q->numInternalQs = newID + 1;
   if( Q->numInternalQs >= Q->internalQsSz )
    { SRSWQueueStruc **prevArray = Q->internalQs;
      int prevSz = Q->internalQsSz;
      int idx;

      Q->internalQsSz = prevSz * 2;
      Q->internalQs   = PR__malloc( Q->internalQsSz * sizeof(SRSWQueueStruc *));
      for( idx = prevSz - 1; idx >= 0; idx-- )
       { Q->internalQs[idx] = prevArray[idx];
       }
      PR__free( prevArray );
    }

   Q->internalQs[ newID ] = makeSRSWQ();
   return newID;
 }


/*Removes and returns a value from one of Q's writer queues, blocking while
 * all of them are empty.
 *It harvests round-robin: starting just after the queue last read from, it
 * does a non-blocking read of each writer queue in turn.  It returns the
 * first non-zero value, remembering which queue it came from.  After every
 * full sweep that finds nothing, it spins, and once it has done so
 * SPINLOCK_TRIES times it yields the processor per further sweep.
 *If no writer has been registered yet, it waits the same way instead of
 * touching the (still unfilled) writer array.
 */
void* readSRMWQ( SRMWQueueStruc* Q )
 { int32 tries       = 0;
   int32 QToReadFrom = Q->lastQReadFrom;

   while( TRUE )
    { void *readValue;

      if( Q->numInternalQs <= 0 )   //no writer queues exist yet
       { if( tries < SPINLOCK_TRIES ) tries++;
         else sched_yield();
         continue;
       }

      QToReadFrom++;
      if( QToReadFrom >= Q->numInternalQs ) QToReadFrom = 0;
      readValue = readSRSWQ_NonBlocking( Q->internalQs[ QToReadFrom ] );

      if( readValue != 0 ) //got a value, return it
       { Q->lastQReadFrom = QToReadFrom;
         return readValue;
       }
         //this writer queue was empty.  Back the loop off once a full
         // sweep (back to the starting queue) has found nothing; tries
         // saturates instead of overflowing
      if( QToReadFrom == Q->lastQReadFrom )
       { if( tries < SPINLOCK_TRIES ) tries++;
         else sched_yield();
       }
    }
 }


/*Appends in to the private SRSW queue of the writer identified by
 * writerID (the ID returned by addWriterToSRMWQ()).  May block, via
 * repeated yields, while that queue is full.
 *A value of 0 is rejected with an error message and not enqueued: the
 * reader treats 0 as "nothing there", so it would only be silently dropped.
 *An out-of-range writerID is likewise reported and ignored.
 */
void writeSRMWQ( void * in, SRMWQueueStruc* Q, int writerID )
 {
   if( in == 0 )
    { fprintf( stderr, "error, wrote 0 to SRMW Q\n" ); //TODO: throw an error
      return;
    }
   if( writerID < 0 || writerID >= Q->numInternalQs )
    { fprintf( stderr, "error, bad writer ID %d for SRMW Q\n", writerID );
      return;
    }

   writeSRSWQ( in, Q->internalQs[ writerID ] );
 }
