changeset 20:29b273cf3b1f

Benchmark that tests msg loss
author Merten Sach <msach@mailbox.tu-berlin.de>
date Tue, 13 Mar 2012 12:25:48 +0100
parents fdc2f264f3d6
children 08b37152b48d
files main.c
diffstat 1 files changed, 197 insertions(+), 80 deletions(-) [+]
line diff
     1.1 --- a/main.c	Fri Feb 17 19:00:01 2012 +0100
     1.2 +++ b/main.c	Tue Mar 13 12:25:48 2012 +0100
     1.3 @@ -11,6 +11,8 @@
     1.4  #include <unistd.h>
     1.5  #include "VMS_Implementations/Vthread_impl/VPThread.h"
     1.6  #include "C_Libraries/Queue_impl/PrivateQueue.h"
     1.7 +#include "C_Libraries/DynArray/DynArray.h"
     1.8 +#include "C_Libraries/BestEffortMessaging/LossyCom.h"
     1.9  
    1.10  #include <linux/perf_event.h>
    1.11  #include <linux/prctl.h>
    1.12 @@ -19,7 +21,7 @@
    1.13  #undef DEBUG
    1.14  //#define DEBUG
    1.15  
    1.16 -#define MEASURE_PERF
    1.17 +//#define MEASURE_PERF
    1.18  
    1.19  #if !defined(unix) && !defined(__unix__)
    1.20  #ifdef __MACH__
    1.21 @@ -70,15 +72,18 @@
    1.22  };
    1.23  
    1.24  const char *usage = {
    1.25 -	"Usage: malloc_test [options]\n"
    1.26 -	"  Spwans a number of threads and allocates memory.\n\n"
    1.27 +	"Usage: msg_passing_test [options]\n"
    1.28 +	"  Starts threads equal to the number of cores and sends\n"
    1.29 +        "  messages to random receivers\n\n"
    1.30  	"Options:\n"
    1.31 -	"  -t <num>   how many threads to use (default: 1). This is internaly multiplied by the number of cores.\n"
    1.32 -	"  -o <num>   repeat workload and sync operation <m> times\n"
    1.33 -        "  -i <num>   size of workload, repeat <n> times\n"     
    1.34 +	"  -n <num>   This specifies the number of sends done by each thread.\n"
    1.35  	"  -h         this help screen\n\n"
    1.36  };
    1.37  
    1.38 +/***************************
    1.39 + * Barrier Implementation
    1.40 + ***************************/
    1.41 +
    1.42  struct barrier_t
    1.43  {
    1.44      int counter;
    1.45 @@ -122,7 +127,9 @@
    1.46   }
    1.47  
    1.48  
    1.49 -
    1.50 +/**************************
    1.51 + * Worker Parameters
    1.52 + **************************/
    1.53  typedef struct
    1.54   { struct barrier_t* barrier;
    1.55     uint64_t  totalWorkCycles;
    1.56 @@ -131,21 +138,32 @@
    1.57     uint64_t  totalBadSyncCycles;
    1.58     uint64     numGoodSyncs;
    1.59     uint64     numGoodTasks;
    1.60 +   uint64_t   coreID;
    1.61 +   lossyCom__endpoint_t* localEndpoint;
    1.62 +   lossyCom__exchange_t* centralMsgExchange;
    1.63 +   unsigned int receivedACKs;
    1.64 +   unsigned int broadcasterStatus;
    1.65 +   unsigned int terminate;
    1.66   }
    1.67  WorkerParams;
    1.68  
    1.69 -
    1.70  typedef struct
    1.71   { measurement_t *startExeCycles;
    1.72     measurement_t *endExeCycles;
    1.73   }
    1.74  BenchParams;
    1.75  
    1.76 +typedef struct
    1.77 +{
    1.78 +    lossyCom__endpointID_t receiverID;
    1.79 +    lossyCom__msgBody_t msg;
    1.80 +} savedMsg_t;
    1.81 +
    1.82  //======================== Globals =========================
    1.83  char __ProgrammName[] = "overhead_test";
    1.84  char __DataSet[255];
    1.85  
    1.86 -int outer_iters, inner_iters, num_threads;
    1.87 +int num_msg_to_send;
    1.88  size_t chunk_size = 0;
    1.89  
    1.90  int cycles_counter_fd[NUM_CORES];
    1.91 @@ -153,6 +171,10 @@
    1.92  
    1.93  WorkerParams *workerParamsArray;
    1.94  
    1.95 +// init random number
    1.96 +   uint32_t seed1;
    1.97 +   uint32_t seed2;
    1.98 +
    1.99  //======================== App Code =========================
   1.100  /*
   1.101   * Workload
   1.102 @@ -169,34 +191,126 @@
   1.103     }                                                    \
   1.104  } while (0) //macro magic for scoping
   1.105  
   1.106 +extern inline uint32_t
   1.107 +randomNumber(uint32_t* seed1, uint32_t* seed2);
   1.108 +
   1.109 +#define BROADCAST BROADCAST_ID                  
   1.110 +#define BROADCAST_ACK BROADCAST_ID-1
   1.111 +#define TERMINATE BROADCAST_ID-2
   1.112 +
   1.113 +#define RECEIVING_BROADCAST 0
   1.114 +#define BROADCASTING 1
   1.115 +#define RECEIVING_ACK 2
   1.116 +
   1.117 +/*
   1.118 + * Message Handler Function
   1.119 + */
   1.120 +void msgHandler(lossyCom__endpointID_t senderID, lossyCom__msgBody_t msg, void* data)
   1.121 +{
   1.122 +    WorkerParams* threadData = (WorkerParams*)data;
   1.123 +    lossyCom__endpoint_t* comEndpoint = threadData->localEndpoint;
   1.124 +    lossyCom__endpointID_t receiverID;
   1.125 +    
   1.126 +    if(msg == BROADCAST_ID) //answer broadcast message
   1.127 +    {
   1.128 +        lossyCom__sendMsg(comEndpoint, senderID, BROADCAST_ACK);
   1.129 +        return;
   1.130 +    }
   1.131 +    if(msg == (BROADCAST_ACK) && threadData->broadcasterStatus == RECEIVING_ACK)
   1.132 +    {
   1.133 +        threadData->receivedACKs++;
    1.134 +        if(threadData->receivedACKs == NUM_CORES/2)//choose next broadcaster
   1.135 +        {
   1.136 +            do{
   1.137 +                receiverID = randomNumber(&seed1, &seed2) % NUM_CORES;
   1.138 +            }while(receiverID == comEndpoint->endpointID);
   1.139 +            
   1.140 +            //send the receiverID to the receiver to notify him that he is next
   1.141 +            lossyCom__sendMsg(comEndpoint, receiverID, receiverID);
   1.142 +            threadData->broadcasterStatus = RECEIVING_BROADCAST;
   1.143 +        }
   1.144 +        return;
   1.145 +    }
   1.146 +    if(msg == TERMINATE) //termination message
   1.147 +    {
   1.148 +        printf("endpoint %d received termination request\n", comEndpoint->endpointID);
   1.149 +        threadData->terminate = TRUE;
   1.150 +        return;
   1.151 +    }
    1.152 +    //any other msg is the "you are next" notification (sendMsg(receiverID, receiverID) above)
   1.153 +    threadData->broadcasterStatus = BROADCASTING;
   1.154 +}
   1.155 +
   1.156 +unsigned int global_broadcast_counter;
   1.157  
   1.158  double
   1.159  worker_TLF(void* _params, VirtProcr* animatingPr)
   1.160   {
   1.161 -   int i,o;
   1.162 +    unsigned int msgCounter;
   1.163 +    unsigned int broadcaster;
   1.164 +    uint32_t wait_iterations;
   1.165     WorkerParams* params = (WorkerParams*)_params;
   1.166     unsigned int totalWorkCycles = 0, totalBadCycles = 0;
   1.167     unsigned int totalSyncCycles = 0, totalBadSyncCycles = 0;
   1.168     unsigned int workspace1=0, numGoodSyncs = 0, numGoodTasks = 0;
   1.169     double workspace2=0.0;
   1.170 +   
   1.171 +   //core 0 always starts
   1.172 +   params->broadcasterStatus = params->coreID==0?BROADCASTING:RECEIVING_BROADCAST;
   1.173 +   
   1.174 +   /*
   1.175     int32 privateMutex = VPThread__make_mutex(animatingPr);
   1.176     
   1.177     int cpuid = sched_getcpu();
   1.178     
   1.179     measurement_t startWorkload, endWorkload, startWorkload2, endWorkload2;
   1.180     uint64 numCycles;
   1.181 -   for(o=0; o < outer_iters; o++)
   1.182 -    {
   1.183 +    */
   1.184  #ifdef MEASURE_PERF
   1.185            saveCyclesAndInstrs(cpuid,startWorkload.cycles);
   1.186  #endif
   1.187 +     
   1.188 +   //initialize endpoint for communication
   1.189 +   lossyCom__endpoint_t comEndpoint;
   1.190 +   params->localEndpoint = &comEndpoint;
   1.191 +   lossyCom__initialize_endpoint(&comEndpoint, 
   1.192 +                                 params->centralMsgExchange,
   1.193 +                                 params->coreID,
   1.194 +                                 msgHandler,
   1.195 +                                 params);
   1.196 +   
   1.197 +   lossyCom__endpointID_t receiverID;
   1.198 +   msgCounter = 0;
   1.199 +   while(msgCounter <= num_msg_to_send)
   1.200 +   {
   1.201 +       int i;
   1.202         
   1.203 -      //workltask
   1.204 -      for(i=0; i < inner_iters; i++)
   1.205 +       if(params->broadcasterStatus == BROADCASTING)
   1.206         {
   1.207 -         workspace1 += (workspace1 + 32)/2;
   1.208 -         workspace2 += (workspace2 + 23.2)/1.4;
   1.209 +           if(msgCounter == num_msg_to_send)//send termination msg
   1.210 +           {
   1.211 +                lossyCom__sendMsg(&comEndpoint,BROADCAST_ID, TERMINATE);
   1.212 +                break;
   1.213 +           }else{ //generate and send random message
   1.214 +                params->receivedACKs = 0;
   1.215 +                lossyCom__sendMsg(&comEndpoint, BROADCAST_ID, BROADCAST);
   1.216 +                global_broadcast_counter++;
   1.217 +                if(global_broadcast_counter % 1000 == 0){
   1.218 +                    printf("broadcast count: %d\n", global_broadcast_counter);
   1.219 +                } 
    1.220 +                params->broadcasterStatus = RECEIVING_ACK; //mark msg as sent
   1.221 +                msgCounter++;
   1.222 +           }
   1.223         }
   1.224 +
   1.225 +       //check if the benchmark should terminate
   1.226 +       if(params->terminate)
   1.227 +           break;
   1.228 +       
   1.229 +       //receive msg
   1.230 +       lossyCom__receiveMsg(&comEndpoint);
   1.231 +   }
   1.232 +     
   1.233    
   1.234  #ifdef MEASURE_PERF
   1.235            saveCyclesAndInstrs(cpuid,endWorkload.cycles);
   1.236 @@ -206,27 +320,7 @@
   1.237            else                     {totalBadCycles  += numCycles; }
   1.238  #endif
   1.239  
   1.240 -      //mutex access often causes switch to different Slave VP
   1.241 -      VPThread__mutex_lock(privateMutex, animatingPr);
   1.242 -      
   1.243 -/*
   1.244 -          saveCyclesAndInstrs(cpuid,startWorkload2.cycles);
   1.245 -      //Task
   1.246 -      for(i=0; i < inner_iters; i++)
   1.247 -       {
   1.248 -         workspace1 += (workspace1 + 32)/2;
   1.249 -         workspace2 += (workspace2 + 23.2)/1.4;
   1.250 -       }
   1.251 -      
   1.252 -          saveCyclesAndInstrs(cpuid,endWorkload2.cycles);
   1.253 -          numCycles = endWorkload2.cycles - startWorkload2.cycles;
   1.254 -          //sanity check (400K is about 20K iters)
   1.255 -          if( numCycles < 400000 ) {totalWorkCycles += numCycles; numGoodTasks++;}
   1.256 -          else                     {totalBadCycles  += numCycles; }
   1.257 -      
   1.258 -*/
   1.259 -      VPThread__mutex_unlock(privateMutex, animatingPr);
   1.260 -    }
   1.261 +   barrier_wait(params->barrier, animatingPr);
   1.262  
   1.263     params->totalWorkCycles = totalWorkCycles;
   1.264     params->totalBadCycles = totalBadCycles;
   1.265 @@ -239,11 +333,6 @@
   1.266     params->totalBadSyncCycles = 0;
   1.267     params->numGoodSyncs = VMS__give_num_plugin_animations();
   1.268  */
   1.269 -   
   1.270 -   
   1.271 -   //Wait for all threads to end
   1.272 -   barrier_wait(params->barrier, animatingPr);
   1.273 -   
   1.274     //Shutdown worker
   1.275     VPThread__dissipate_thread(animatingPr);
   1.276     
   1.277 @@ -255,19 +344,30 @@
   1.278  /* this is run after the VMS is set up*/
   1.279  void benchmark(void *_params, VirtProcr *animatingPr)
   1.280   {
   1.281 -   int i, cpuID;
   1.282 -   struct barrier_t  barr;
   1.283 +   int i, cpuID, idx;
   1.284 +   struct barrier_t barr;
   1.285     BenchParams      *params;
   1.286     
   1.287     params = (BenchParams *)_params;
   1.288 -
   1.289 -   barrier_init(&barr, num_threads+1, animatingPr);
   1.290 -      
   1.291 +   
   1.292 +   barrier_init(&barr, NUM_CORES+1, animatingPr);
   1.293 +   
   1.294 +   //Init central communication exchange
   1.295 +   lossyCom__exchange_t* centralMsgExchange = lossyCom__initialize(NUM_CORES);
   1.296 +   
   1.297     //prepare input
   1.298 -   for(i=0; i<num_threads; i++)
   1.299 +   for(i=0; i<NUM_CORES; i++)
   1.300      { 
   1.301         workerParamsArray[i].barrier = &barr;
   1.302 +       workerParamsArray[i].coreID = i;
   1.303 +       workerParamsArray[i].centralMsgExchange = centralMsgExchange;
   1.304 +       workerParamsArray[i].terminate = FALSE;
   1.305      }
   1.306 +   global_broadcast_counter = 0;
   1.307 +   
   1.308 +   // init random number generator for wait and msg content
   1.309 +   seed1 = rand()%1000;
   1.310 +   seed2 = rand()%1000;
   1.311       
   1.312     //save cycles before execution of threads, to get total exe cycles
   1.313     measurement_t *startExeCycles, *endExeCycles;
   1.314 @@ -280,17 +380,51 @@
   1.315  #endif
   1.316     
   1.317     //create (which starts running) all threads
   1.318 -   for(i=0; i<num_threads; i++)
   1.319 -    { VPThread__create_thread((VirtProcrFnPtr)worker_TLF, &(workerParamsArray[i]), animatingPr);
   1.320 +   for(i=NUM_CORES-1; i>=0; i--)
   1.321 +    { 
   1.322 +       VPThread__create_thread_with_affinity((VirtProcrFnPtr)worker_TLF,
   1.323 +                                             &(workerParamsArray[i]),
   1.324 +                                             animatingPr,
   1.325 +                                             i);//schedule to core i
   1.326      }
   1.327 -   //wait for all threads to finish
   1.328 -   barrier_wait(&barr, animatingPr);
   1.329    
   1.330  #ifdef MEASURE_PERF
   1.331     //endBarrierCycles read in barrier_wait()!  Merten, email me if want to chg
   1.332     params->endExeCycles->cycles = barr.endBarrierCycles.cycles;
   1.333  #endif
   1.334     
   1.335 +   barrier_wait(&barr, animatingPr);
   1.336 +   printf("Total broadcast count: %d\n", global_broadcast_counter);
   1.337 +   
   1.338 +   //print send msgs
   1.339 +   /*
   1.340 +   printf("sendMsgs = []\n");
   1.341 +   for(i = 0; i<NUM_CORES; i++)
   1.342 +   {
   1.343 +       printf("sendMsgs.append([");
   1.344 +       for(idx = 0; idx< workerParamsArray[i].sendMsgs->numInArray; idx++)
   1.345 +       {
   1.346 +           printf("(%lu, %lu),", 
   1.347 +                   (uint64_t)(workerParamsArray[i].ptrToArrayOfSendMsgs[idx]) & 0xFFFFFFFF,
   1.348 +                   ((uint64_t)(workerParamsArray[i].ptrToArrayOfSendMsgs[idx]) >> 32 ) & 0xFFFFFFFF);
   1.349 +       }
   1.350 +        printf("])\n");
   1.351 +   }
   1.352 +   
   1.353 +   
   1.354 +   //print received msgs
   1.355 +   printf("receivedMsgs = []\n");
   1.356 +   for(i = 0; i<NUM_CORES; i++)
   1.357 +   {
   1.358 +       printf("receivedMsgs.append([");
   1.359 +       for(idx = 0; idx< workerParamsArray[i].receivedMsgs->numInArray; idx++)
   1.360 +       {
   1.361 +           printf("(%lu, %lu),", 
   1.362 +                   (uint64_t)(workerParamsArray[i].ptrToArrayOfReceivedMsgs[idx]) & 0xFFFFFFFF,
   1.363 +                   ((uint64_t)(workerParamsArray[i].ptrToArrayOfReceivedMsgs[idx]) >> 32 ) & 0xFFFFFFFF);
   1.364 +       }
   1.365 +       printf("])\n");
   1.366 +   }*/
   1.367  
   1.368  /*
   1.369     uint64_t overallWorkCycles = 0;
   1.370 @@ -321,39 +455,22 @@
   1.371         {
   1.372           switch(argv[i][1])
   1.373            {
   1.374 -            case 't':
   1.375 +            case 'n':
   1.376                 if(!isdigit(argv[++i][0]))
   1.377                  {
   1.378 -                  fprintf(stderr, "-t must be followed by the number of worker threads to spawn\n");
    1.379 +                  fprintf(stderr, "-n must be followed by the number of messages to send per core\n");
   1.380                    return EXIT_FAILURE;
   1.381                  }
   1.382 -               num_threads = atoi(argv[i]);
   1.383 -               if(!num_threads)
   1.384 +               num_msg_to_send = atoi(argv[i]);
   1.385 +               if(!num_msg_to_send)
   1.386                  {
   1.387 -                  fprintf(stderr, "invalid number of threads specified: %d\n", num_threads);
   1.388 +                  fprintf(stderr, "invalid number of messages to send: %d\n", num_msg_to_send);
   1.389                    return EXIT_FAILURE;
   1.390                  }
   1.391              break;
   1.392 -            case 'o':
   1.393 -               if(!isdigit(argv[++i][0]))
   1.394 -                {
   1.395 -                  fputs("-i must be followed by a number\n", stderr);
   1.396 -                  return EXIT_FAILURE;
   1.397 -                }
   1.398 -               outer_iters = atoi(argv[i]);
   1.399 -				break;
   1.400 -            case 'i':
   1.401 -               if(!isdigit(argv[++i][0]))
   1.402 -                {
   1.403 -                  fputs("-o must be followed by a number (workload size)\n", stderr);
   1.404 -                  return EXIT_FAILURE;
   1.405 -                }
   1.406 -               inner_iters = atoi(argv[i]);
   1.407 -				break;
   1.408              case 'h':
   1.409                 fputs(usage, stdout);
   1.410 -               return 0;
   1.411 -				
   1.412 +               return 0;		
   1.413              default:
   1.414                 fprintf(stderr, "unrecognized argument: %s\n", argv[i]);
   1.415                 fputs(usage, stderr);
   1.416 @@ -362,9 +479,9 @@
   1.417         }//if arg
   1.418        else
   1.419         {
   1.420 -			fprintf(stderr, "unrecognized argument: %s\n", argv[i]);
   1.421 -			fputs(usage, stderr);
   1.422 -			return EXIT_FAILURE;
   1.423 +		fprintf(stderr, "unrecognized argument: %s\n", argv[i]);
   1.424 +		fputs(usage, stderr);
   1.425 +		return EXIT_FAILURE;
   1.426         }
   1.427      }//for
   1.428     
   1.429 @@ -441,7 +558,7 @@
   1.430     benchParams->startExeCycles = &startExeCycles;
   1.431     benchParams->endExeCycles   = &endExeCycles;
   1.432     
   1.433 -   workerParamsArray =  (WorkerParams *)malloc( (num_threads + 1) * sizeof(WorkerParams) );
   1.434 +   workerParamsArray =  (WorkerParams *)malloc( (NUM_CORES) * sizeof(WorkerParams) );
   1.435     if(workerParamsArray == NULL ) printf("error mallocing worker params array\n");
   1.436     
   1.437   
   1.438 @@ -474,7 +591,7 @@
   1.439     printf("ExeCycles/WorkCycles Ratio %f\n", 
   1.440            (double)totalExeCycles / (double)totalWorkCyclesAcrossCores);
   1.441  #else
   1.442 -   printf("No measurement done!\n");
   1.443 +   printf("#No measurement done!\n");
   1.444  #endif
   1.445     return 0;
   1.446   }