# HG changeset patch # User Merten Sach # Date 1331637948 -3600 # Node ID 29b273cf3b1ff248d4c4b8d7b8de04221eafba56 # Parent fdc2f264f3d66ddf9a14b3a2939cc12dda1040ff Benchmark that tests msg loss diff -r fdc2f264f3d6 -r 29b273cf3b1f main.c --- a/main.c Fri Feb 17 19:00:01 2012 +0100 +++ b/main.c Tue Mar 13 12:25:48 2012 +0100 @@ -11,6 +11,8 @@ #include #include "VMS_Implementations/Vthread_impl/VPThread.h" #include "C_Libraries/Queue_impl/PrivateQueue.h" +#include "C_Libraries/DynArray/DynArray.h" +#include "C_Libraries/BestEffortMessaging/LossyCom.h" #include #include @@ -19,7 +21,7 @@ #undef DEBUG //#define DEBUG -#define MEASURE_PERF +//#define MEASURE_PERF #if !defined(unix) && !defined(__unix__) #ifdef __MACH__ @@ -70,15 +72,18 @@ }; const char *usage = { - "Usage: malloc_test [options]\n" - " Spwans a number of threads and allocates memory.\n\n" + "Usage: msg_passing_test [options]\n" + " Starts threads equal to the number of cores and sends\n" + " messages to random receivers\n\n" "Options:\n" - " -t how many threads to use (default: 1). This is internaly multiplied by the number of cores.\n" - " -o repeat workload and sync operation times\n" - " -i size of workload, repeat times\n" + " -n This specifies the number of sends done by each thread.\n" " -h this help screen\n\n" }; +/*************************** + * Barrier Implementation + ***************************/ + struct barrier_t { int counter; @@ -122,7 +127,9 @@ } - +/************************** + * Worker Parameters + **************************/ typedef struct { struct barrier_t* barrier; uint64_t totalWorkCycles; @@ -131,21 +138,32 @@ uint64_t totalBadSyncCycles; uint64 numGoodSyncs; uint64 numGoodTasks; + uint64_t coreID; + lossyCom__endpoint_t* localEndpoint; + lossyCom__exchange_t* centralMsgExchange; + unsigned int receivedACKs; + unsigned int broadcasterStatus; + unsigned int terminate; } WorkerParams; - typedef struct { measurement_t *startExeCycles; measurement_t *endExeCycles; } BenchParams; +typedef struct +{ + lossyCom__endpointID_t receiverID; + lossyCom__msgBody_t msg; +} savedMsg_t; + //======================== Globals ========================= char __ProgrammName[] = "overhead_test"; char __DataSet[255]; -int outer_iters, inner_iters, num_threads; +int num_msg_to_send; size_t chunk_size = 0; int cycles_counter_fd[NUM_CORES]; @@ -153,6 +171,10 @@ WorkerParams *workerParamsArray; +// init random number + uint32_t seed1; + uint32_t seed2; + //======================== App Code ========================= /* * Workload @@ -169,34 +191,126 @@ } \ } while (0) //macro magic for scoping +extern inline uint32_t +randomNumber(uint32_t* seed1, uint32_t* seed2); + +#define BROADCAST BROADCAST_ID +#define BROADCAST_ACK BROADCAST_ID-1 +#define TERMINATE BROADCAST_ID-2 + +#define RECEIVING_BROADCAST 0 +#define BROADCASTING 1 +#define RECEIVING_ACK 2 + +/* + * Message Handler Function + */ +void msgHandler(lossyCom__endpointID_t senderID, lossyCom__msgBody_t msg, void* data) +{ + WorkerParams* threadData = (WorkerParams*)data; + lossyCom__endpoint_t* comEndpoint = threadData->localEndpoint; + lossyCom__endpointID_t receiverID; + + if(msg == BROADCAST_ID) //answer broadcast message + { + lossyCom__sendMsg(comEndpoint, senderID, BROADCAST_ACK); + return; + } + if(msg == (BROADCAST_ACK) && threadData->broadcasterStatus == RECEIVING_ACK) + { + threadData->receivedACKs++; + if(threadData->receivedACKs == NUM_CORES/2)//chose next broadcaster + { + do{ + receiverID = randomNumber(&seed1, &seed2) % NUM_CORES; + }while(receiverID == comEndpoint->endpointID); + + //send the receiverID to the receiver to notify him that he is next + lossyCom__sendMsg(comEndpoint, receiverID, receiverID); + threadData->broadcasterStatus = RECEIVING_BROADCAST; + } + return; + } + if(msg == TERMINATE) //termination message + { + printf("endpoint %d received termination request\n", comEndpoint->endpointID); + threadData->terminate = TRUE; + return; + } + // + threadData->broadcasterStatus = BROADCASTING; +} + +unsigned int global_broadcast_counter; double worker_TLF(void* _params, VirtProcr* animatingPr) { - int i,o; + unsigned int msgCounter; + unsigned int broadcaster; + uint32_t wait_iterations; WorkerParams* params = (WorkerParams*)_params; unsigned int totalWorkCycles = 0, totalBadCycles = 0; unsigned int totalSyncCycles = 0, totalBadSyncCycles = 0; unsigned int workspace1=0, numGoodSyncs = 0, numGoodTasks = 0; double workspace2=0.0; + + //core 0 always starts + params->broadcasterStatus = params->coreID==0?BROADCASTING:RECEIVING_BROADCAST; + + /* int32 privateMutex = VPThread__make_mutex(animatingPr); int cpuid = sched_getcpu(); measurement_t startWorkload, endWorkload, startWorkload2, endWorkload2; uint64 numCycles; - for(o=0; o < outer_iters; o++) - { + */ #ifdef MEASURE_PERF saveCyclesAndInstrs(cpuid,startWorkload.cycles); #endif + + //initialize endpoint for communication + lossyCom__endpoint_t comEndpoint; + params->localEndpoint = &comEndpoint; + lossyCom__initialize_endpoint(&comEndpoint, + params->centralMsgExchange, + params->coreID, + msgHandler, + params); + + lossyCom__endpointID_t receiverID; + msgCounter = 0; + while(msgCounter <= num_msg_to_send) + { + int i; - //workltask - for(i=0; i < inner_iters; i++) + if(params->broadcasterStatus == BROADCASTING) { - workspace1 += (workspace1 + 32)/2; - workspace2 += (workspace2 + 23.2)/1.4; + if(msgCounter == num_msg_to_send)//send termination msg + { + lossyCom__sendMsg(&comEndpoint,BROADCAST_ID, TERMINATE); + break; + }else{ //generate and send random message + params->receivedACKs = 0; + lossyCom__sendMsg(&comEndpoint, BROADCAST_ID, BROADCAST); + global_broadcast_counter++; + if(global_broadcast_counter % 1000 == 0){ + printf("broadcast count: %d\n", global_broadcast_counter); + } + params->broadcasterStatus = RECEIVING_ACK; //mark msg as send + msgCounter++; + } } + + //check if the benchmark should terminate + if(params->terminate) + break; + + //receive msg + lossyCom__receiveMsg(&comEndpoint); + } + #ifdef MEASURE_PERF saveCyclesAndInstrs(cpuid,endWorkload.cycles); @@ -206,27 +320,7 @@ else {totalBadCycles += numCycles; } #endif - //mutex access often causes switch to different Slave VP - VPThread__mutex_lock(privateMutex, animatingPr); - -/* - saveCyclesAndInstrs(cpuid,startWorkload2.cycles); - //Task - for(i=0; i < inner_iters; i++) - { - workspace1 += (workspace1 + 32)/2; - workspace2 += (workspace2 + 23.2)/1.4; - } - - saveCyclesAndInstrs(cpuid,endWorkload2.cycles); - numCycles = endWorkload2.cycles - startWorkload2.cycles; - //sanity check (400K is about 20K iters) - if( numCycles < 400000 ) {totalWorkCycles += numCycles; numGoodTasks++;} - else {totalBadCycles += numCycles; } - -*/ - VPThread__mutex_unlock(privateMutex, animatingPr); - } + barrier_wait(params->barrier, animatingPr); params->totalWorkCycles = totalWorkCycles; params->totalBadCycles = totalBadCycles; @@ -239,11 +333,6 @@ params->totalBadSyncCycles = 0; params->numGoodSyncs = VMS__give_num_plugin_animations(); */ - - - //Wait for all threads to end - barrier_wait(params->barrier, animatingPr); - //Shutdown worker VPThread__dissipate_thread(animatingPr); @@ -255,19 +344,30 @@ /* this is run after the VMS is set up*/ void benchmark(void *_params, VirtProcr *animatingPr) { - int i, cpuID; - struct barrier_t barr; + int i, cpuID, idx; + struct barrier_t barr; BenchParams *params; params = (BenchParams *)_params; - - barrier_init(&barr, num_threads+1, animatingPr); - + + barrier_init(&barr, NUM_CORES+1, animatingPr); + + //Init central communication exchange + lossyCom__exchange_t* centralMsgExchange = lossyCom__initialize(NUM_CORES); + //prepare input - for(i=0; i=0; i--) + { + VPThread__create_thread_with_affinity((VirtProcrFnPtr)worker_TLF, + &(workerParamsArray[i]), + animatingPr, + i);//schedule to core i } - //wait for all threads to finish - barrier_wait(&barr, animatingPr); #ifdef MEASURE_PERF //endBarrierCycles read in barrier_wait()! Merten, email me if want to chg params->endExeCycles->cycles = barr.endBarrierCycles.cycles; #endif + barrier_wait(&barr, animatingPr); + printf("Total broadcast count: %d\n", global_broadcast_counter); + + //print send msgs + /* + printf("sendMsgs = []\n"); + for(i = 0; inumInArray; idx++) + { + printf("(%lu, %lu),", + (uint64_t)(workerParamsArray[i].ptrToArrayOfSendMsgs[idx]) & 0xFFFFFFFF, + ((uint64_t)(workerParamsArray[i].ptrToArrayOfSendMsgs[idx]) >> 32 ) & 0xFFFFFFFF); + } + printf("])\n"); + } + + + //print received msgs + printf("receivedMsgs = []\n"); + for(i = 0; inumInArray; idx++) + { + printf("(%lu, %lu),", + (uint64_t)(workerParamsArray[i].ptrToArrayOfReceivedMsgs[idx]) & 0xFFFFFFFF, + ((uint64_t)(workerParamsArray[i].ptrToArrayOfReceivedMsgs[idx]) >> 32 ) & 0xFFFFFFFF); + } + printf("])\n"); + }*/ /* uint64_t overallWorkCycles = 0; @@ -321,39 +455,22 @@ { switch(argv[i][1]) { - case 't': + case 'n': if(!isdigit(argv[++i][0])) { - fprintf(stderr, "-t must be followed by the number of worker threads to spawn\n"); + fprintf(stderr, "-t must be followed by the number messages to send per core\n"); return EXIT_FAILURE; } - num_threads = atoi(argv[i]); - if(!num_threads) + num_msg_to_send = atoi(argv[i]); + if(!num_msg_to_send) { - fprintf(stderr, "invalid number of threads specified: %d\n", num_threads); + fprintf(stderr, "invalid number of messages to send: %d\n", num_msg_to_send); return EXIT_FAILURE; } break; - case 'o': - if(!isdigit(argv[++i][0])) - { - fputs("-i must be followed by a number\n", stderr); - return EXIT_FAILURE; - } - outer_iters = atoi(argv[i]); - break; - case 'i': - if(!isdigit(argv[++i][0])) - { - fputs("-o must be followed by a number (workload size)\n", stderr); - return EXIT_FAILURE; - } - inner_iters = atoi(argv[i]); - break; case 'h': fputs(usage, stdout); - return 0; - + return 0; default: fprintf(stderr, "unrecognized argument: %s\n", argv[i]); fputs(usage, stderr); @@ -362,9 +479,9 @@ }//if arg else { - fprintf(stderr, "unrecognized argument: %s\n", argv[i]); - fputs(usage, stderr); - return EXIT_FAILURE; + fprintf(stderr, "unrecognized argument: %s\n", argv[i]); + fputs(usage, stderr); + return EXIT_FAILURE; } }//for @@ -441,7 +558,7 @@ benchParams->startExeCycles = &startExeCycles; benchParams->endExeCycles = &endExeCycles; - workerParamsArray = (WorkerParams *)malloc( (num_threads + 1) * sizeof(WorkerParams) ); + workerParamsArray = (WorkerParams *)malloc( (NUM_CORES) * sizeof(WorkerParams) ); if(workerParamsArray == NULL ) printf("error mallocing worker params array\n"); @@ -474,7 +591,7 @@ printf("ExeCycles/WorkCycles Ratio %f\n", (double)totalExeCycles / (double)totalWorkCyclesAcrossCores); #else - printf("No measurement done!\n"); + printf("#No measurement done!\n"); #endif return 0; }