Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > Vthread > Vthread__Best_Effort_Msg__Bench
changeset 20:29b273cf3b1f
Benchmark that tests message loss
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Tue, 13 Mar 2012 12:25:48 +0100 |
| parents | fdc2f264f3d6 |
| children | 08b37152b48d |
| files | main.c |
| diffstat | 1 files changed, 197 insertions(+), 80 deletions(-) [+] |
line diff
1.1 --- a/main.c Fri Feb 17 19:00:01 2012 +0100 1.2 +++ b/main.c Tue Mar 13 12:25:48 2012 +0100 1.3 @@ -11,6 +11,8 @@ 1.4 #include <unistd.h> 1.5 #include "VMS_Implementations/Vthread_impl/VPThread.h" 1.6 #include "C_Libraries/Queue_impl/PrivateQueue.h" 1.7 +#include "C_Libraries/DynArray/DynArray.h" 1.8 +#include "C_Libraries/BestEffortMessaging/LossyCom.h" 1.9 1.10 #include <linux/perf_event.h> 1.11 #include <linux/prctl.h> 1.12 @@ -19,7 +21,7 @@ 1.13 #undef DEBUG 1.14 //#define DEBUG 1.15 1.16 -#define MEASURE_PERF 1.17 +//#define MEASURE_PERF 1.18 1.19 #if !defined(unix) && !defined(__unix__) 1.20 #ifdef __MACH__ 1.21 @@ -70,15 +72,18 @@ 1.22 }; 1.23 1.24 const char *usage = { 1.25 - "Usage: malloc_test [options]\n" 1.26 - " Spwans a number of threads and allocates memory.\n\n" 1.27 + "Usage: msg_passing_test [options]\n" 1.28 + " Starts threads equal to the number of cores and sends\n" 1.29 + " messages to random receivers\n\n" 1.30 "Options:\n" 1.31 - " -t <num> how many threads to use (default: 1). 
This is internaly multiplied by the number of cores.\n" 1.32 - " -o <num> repeat workload and sync operation <m> times\n" 1.33 - " -i <num> size of workload, repeat <n> times\n" 1.34 + " -n <num> This specifies the number of sends done by each thread.\n" 1.35 " -h this help screen\n\n" 1.36 }; 1.37 1.38 +/*************************** 1.39 + * Barrier Implementation 1.40 + ***************************/ 1.41 + 1.42 struct barrier_t 1.43 { 1.44 int counter; 1.45 @@ -122,7 +127,9 @@ 1.46 } 1.47 1.48 1.49 - 1.50 +/************************** 1.51 + * Worker Parameters 1.52 + **************************/ 1.53 typedef struct 1.54 { struct barrier_t* barrier; 1.55 uint64_t totalWorkCycles; 1.56 @@ -131,21 +138,32 @@ 1.57 uint64_t totalBadSyncCycles; 1.58 uint64 numGoodSyncs; 1.59 uint64 numGoodTasks; 1.60 + uint64_t coreID; 1.61 + lossyCom__endpoint_t* localEndpoint; 1.62 + lossyCom__exchange_t* centralMsgExchange; 1.63 + unsigned int receivedACKs; 1.64 + unsigned int broadcasterStatus; 1.65 + unsigned int terminate; 1.66 } 1.67 WorkerParams; 1.68 1.69 - 1.70 typedef struct 1.71 { measurement_t *startExeCycles; 1.72 measurement_t *endExeCycles; 1.73 } 1.74 BenchParams; 1.75 1.76 +typedef struct 1.77 +{ 1.78 + lossyCom__endpointID_t receiverID; 1.79 + lossyCom__msgBody_t msg; 1.80 +} savedMsg_t; 1.81 + 1.82 //======================== Globals ========================= 1.83 char __ProgrammName[] = "overhead_test"; 1.84 char __DataSet[255]; 1.85 1.86 -int outer_iters, inner_iters, num_threads; 1.87 +int num_msg_to_send; 1.88 size_t chunk_size = 0; 1.89 1.90 int cycles_counter_fd[NUM_CORES]; 1.91 @@ -153,6 +171,10 @@ 1.92 1.93 WorkerParams *workerParamsArray; 1.94 1.95 +// init random number 1.96 + uint32_t seed1; 1.97 + uint32_t seed2; 1.98 + 1.99 //======================== App Code ========================= 1.100 /* 1.101 * Workload 1.102 @@ -169,34 +191,126 @@ 1.103 } \ 1.104 } while (0) //macro magic for scoping 1.105 1.106 +extern inline uint32_t 1.107 +randomNumber(uint32_t* 
seed1, uint32_t* seed2); 1.108 + 1.109 +#define BROADCAST BROADCAST_ID 1.110 +#define BROADCAST_ACK BROADCAST_ID-1 1.111 +#define TERMINATE BROADCAST_ID-2 1.112 + 1.113 +#define RECEIVING_BROADCAST 0 1.114 +#define BROADCASTING 1 1.115 +#define RECEIVING_ACK 2 1.116 + 1.117 +/* 1.118 + * Message Handler Function 1.119 + */ 1.120 +void msgHandler(lossyCom__endpointID_t senderID, lossyCom__msgBody_t msg, void* data) 1.121 +{ 1.122 + WorkerParams* threadData = (WorkerParams*)data; 1.123 + lossyCom__endpoint_t* comEndpoint = threadData->localEndpoint; 1.124 + lossyCom__endpointID_t receiverID; 1.125 + 1.126 + if(msg == BROADCAST_ID) //answer broadcast message 1.127 + { 1.128 + lossyCom__sendMsg(comEndpoint, senderID, BROADCAST_ACK); 1.129 + return; 1.130 + } 1.131 + if(msg == (BROADCAST_ACK) && threadData->broadcasterStatus == RECEIVING_ACK) 1.132 + { 1.133 + threadData->receivedACKs++; 1.134 + if(threadData->receivedACKs == NUM_CORES/2)//chose next broadcaster 1.135 + { 1.136 + do{ 1.137 + receiverID = randomNumber(&seed1, &seed2) % NUM_CORES; 1.138 + }while(receiverID == comEndpoint->endpointID); 1.139 + 1.140 + //send the receiverID to the receiver to notify him that he is next 1.141 + lossyCom__sendMsg(comEndpoint, receiverID, receiverID); 1.142 + threadData->broadcasterStatus = RECEIVING_BROADCAST; 1.143 + } 1.144 + return; 1.145 + } 1.146 + if(msg == TERMINATE) //termination message 1.147 + { 1.148 + printf("endpoint %d received termination request\n", comEndpoint->endpointID); 1.149 + threadData->terminate = TRUE; 1.150 + return; 1.151 + } 1.152 + // 1.153 + threadData->broadcasterStatus = BROADCASTING; 1.154 +} 1.155 + 1.156 +unsigned int global_broadcast_counter; 1.157 1.158 double 1.159 worker_TLF(void* _params, VirtProcr* animatingPr) 1.160 { 1.161 - int i,o; 1.162 + unsigned int msgCounter; 1.163 + unsigned int broadcaster; 1.164 + uint32_t wait_iterations; 1.165 WorkerParams* params = (WorkerParams*)_params; 1.166 unsigned int totalWorkCycles = 0, 
totalBadCycles = 0; 1.167 unsigned int totalSyncCycles = 0, totalBadSyncCycles = 0; 1.168 unsigned int workspace1=0, numGoodSyncs = 0, numGoodTasks = 0; 1.169 double workspace2=0.0; 1.170 + 1.171 + //core 0 always starts 1.172 + params->broadcasterStatus = params->coreID==0?BROADCASTING:RECEIVING_BROADCAST; 1.173 + 1.174 + /* 1.175 int32 privateMutex = VPThread__make_mutex(animatingPr); 1.176 1.177 int cpuid = sched_getcpu(); 1.178 1.179 measurement_t startWorkload, endWorkload, startWorkload2, endWorkload2; 1.180 uint64 numCycles; 1.181 - for(o=0; o < outer_iters; o++) 1.182 - { 1.183 + */ 1.184 #ifdef MEASURE_PERF 1.185 saveCyclesAndInstrs(cpuid,startWorkload.cycles); 1.186 #endif 1.187 + 1.188 + //initialize endpoint for communication 1.189 + lossyCom__endpoint_t comEndpoint; 1.190 + params->localEndpoint = &comEndpoint; 1.191 + lossyCom__initialize_endpoint(&comEndpoint, 1.192 + params->centralMsgExchange, 1.193 + params->coreID, 1.194 + msgHandler, 1.195 + params); 1.196 + 1.197 + lossyCom__endpointID_t receiverID; 1.198 + msgCounter = 0; 1.199 + while(msgCounter <= num_msg_to_send) 1.200 + { 1.201 + int i; 1.202 1.203 - //workltask 1.204 - for(i=0; i < inner_iters; i++) 1.205 + if(params->broadcasterStatus == BROADCASTING) 1.206 { 1.207 - workspace1 += (workspace1 + 32)/2; 1.208 - workspace2 += (workspace2 + 23.2)/1.4; 1.209 + if(msgCounter == num_msg_to_send)//send termination msg 1.210 + { 1.211 + lossyCom__sendMsg(&comEndpoint,BROADCAST_ID, TERMINATE); 1.212 + break; 1.213 + }else{ //generate and send random message 1.214 + params->receivedACKs = 0; 1.215 + lossyCom__sendMsg(&comEndpoint, BROADCAST_ID, BROADCAST); 1.216 + global_broadcast_counter++; 1.217 + if(global_broadcast_counter % 1000 == 0){ 1.218 + printf("broadcast count: %d\n", global_broadcast_counter); 1.219 + } 1.220 + params->broadcasterStatus = RECEIVING_ACK; //mark msg as send 1.221 + msgCounter++; 1.222 + } 1.223 } 1.224 + 1.225 + //check if the benchmark should terminate 1.226 + 
if(params->terminate) 1.227 + break; 1.228 + 1.229 + //receive msg 1.230 + lossyCom__receiveMsg(&comEndpoint); 1.231 + } 1.232 + 1.233 1.234 #ifdef MEASURE_PERF 1.235 saveCyclesAndInstrs(cpuid,endWorkload.cycles); 1.236 @@ -206,27 +320,7 @@ 1.237 else {totalBadCycles += numCycles; } 1.238 #endif 1.239 1.240 - //mutex access often causes switch to different Slave VP 1.241 - VPThread__mutex_lock(privateMutex, animatingPr); 1.242 - 1.243 -/* 1.244 - saveCyclesAndInstrs(cpuid,startWorkload2.cycles); 1.245 - //Task 1.246 - for(i=0; i < inner_iters; i++) 1.247 - { 1.248 - workspace1 += (workspace1 + 32)/2; 1.249 - workspace2 += (workspace2 + 23.2)/1.4; 1.250 - } 1.251 - 1.252 - saveCyclesAndInstrs(cpuid,endWorkload2.cycles); 1.253 - numCycles = endWorkload2.cycles - startWorkload2.cycles; 1.254 - //sanity check (400K is about 20K iters) 1.255 - if( numCycles < 400000 ) {totalWorkCycles += numCycles; numGoodTasks++;} 1.256 - else {totalBadCycles += numCycles; } 1.257 - 1.258 -*/ 1.259 - VPThread__mutex_unlock(privateMutex, animatingPr); 1.260 - } 1.261 + barrier_wait(params->barrier, animatingPr); 1.262 1.263 params->totalWorkCycles = totalWorkCycles; 1.264 params->totalBadCycles = totalBadCycles; 1.265 @@ -239,11 +333,6 @@ 1.266 params->totalBadSyncCycles = 0; 1.267 params->numGoodSyncs = VMS__give_num_plugin_animations(); 1.268 */ 1.269 - 1.270 - 1.271 - //Wait for all threads to end 1.272 - barrier_wait(params->barrier, animatingPr); 1.273 - 1.274 //Shutdown worker 1.275 VPThread__dissipate_thread(animatingPr); 1.276 1.277 @@ -255,19 +344,30 @@ 1.278 /* this is run after the VMS is set up*/ 1.279 void benchmark(void *_params, VirtProcr *animatingPr) 1.280 { 1.281 - int i, cpuID; 1.282 - struct barrier_t barr; 1.283 + int i, cpuID, idx; 1.284 + struct barrier_t barr; 1.285 BenchParams *params; 1.286 1.287 params = (BenchParams *)_params; 1.288 - 1.289 - barrier_init(&barr, num_threads+1, animatingPr); 1.290 - 1.291 + 1.292 + barrier_init(&barr, NUM_CORES+1, 
animatingPr); 1.293 + 1.294 + //Init central communication exchange 1.295 + lossyCom__exchange_t* centralMsgExchange = lossyCom__initialize(NUM_CORES); 1.296 + 1.297 //prepare input 1.298 - for(i=0; i<num_threads; i++) 1.299 + for(i=0; i<NUM_CORES; i++) 1.300 { 1.301 workerParamsArray[i].barrier = &barr; 1.302 + workerParamsArray[i].coreID = i; 1.303 + workerParamsArray[i].centralMsgExchange = centralMsgExchange; 1.304 + workerParamsArray[i].terminate = FALSE; 1.305 } 1.306 + global_broadcast_counter = 0; 1.307 + 1.308 + // init random number generator for wait and msg content 1.309 + seed1 = rand()%1000; 1.310 + seed2 = rand()%1000; 1.311 1.312 //save cycles before execution of threads, to get total exe cycles 1.313 measurement_t *startExeCycles, *endExeCycles; 1.314 @@ -280,17 +380,51 @@ 1.315 #endif 1.316 1.317 //create (which starts running) all threads 1.318 - for(i=0; i<num_threads; i++) 1.319 - { VPThread__create_thread((VirtProcrFnPtr)worker_TLF, &(workerParamsArray[i]), animatingPr); 1.320 + for(i=NUM_CORES-1; i>=0; i--) 1.321 + { 1.322 + VPThread__create_thread_with_affinity((VirtProcrFnPtr)worker_TLF, 1.323 + &(workerParamsArray[i]), 1.324 + animatingPr, 1.325 + i);//schedule to core i 1.326 } 1.327 - //wait for all threads to finish 1.328 - barrier_wait(&barr, animatingPr); 1.329 1.330 #ifdef MEASURE_PERF 1.331 //endBarrierCycles read in barrier_wait()! 
Merten, email me if want to chg 1.332 params->endExeCycles->cycles = barr.endBarrierCycles.cycles; 1.333 #endif 1.334 1.335 + barrier_wait(&barr, animatingPr); 1.336 + printf("Total broadcast count: %d\n", global_broadcast_counter); 1.337 + 1.338 + //print send msgs 1.339 + /* 1.340 + printf("sendMsgs = []\n"); 1.341 + for(i = 0; i<NUM_CORES; i++) 1.342 + { 1.343 + printf("sendMsgs.append(["); 1.344 + for(idx = 0; idx< workerParamsArray[i].sendMsgs->numInArray; idx++) 1.345 + { 1.346 + printf("(%lu, %lu),", 1.347 + (uint64_t)(workerParamsArray[i].ptrToArrayOfSendMsgs[idx]) & 0xFFFFFFFF, 1.348 + ((uint64_t)(workerParamsArray[i].ptrToArrayOfSendMsgs[idx]) >> 32 ) & 0xFFFFFFFF); 1.349 + } 1.350 + printf("])\n"); 1.351 + } 1.352 + 1.353 + 1.354 + //print received msgs 1.355 + printf("receivedMsgs = []\n"); 1.356 + for(i = 0; i<NUM_CORES; i++) 1.357 + { 1.358 + printf("receivedMsgs.append(["); 1.359 + for(idx = 0; idx< workerParamsArray[i].receivedMsgs->numInArray; idx++) 1.360 + { 1.361 + printf("(%lu, %lu),", 1.362 + (uint64_t)(workerParamsArray[i].ptrToArrayOfReceivedMsgs[idx]) & 0xFFFFFFFF, 1.363 + ((uint64_t)(workerParamsArray[i].ptrToArrayOfReceivedMsgs[idx]) >> 32 ) & 0xFFFFFFFF); 1.364 + } 1.365 + printf("])\n"); 1.366 + }*/ 1.367 1.368 /* 1.369 uint64_t overallWorkCycles = 0; 1.370 @@ -321,39 +455,22 @@ 1.371 { 1.372 switch(argv[i][1]) 1.373 { 1.374 - case 't': 1.375 + case 'n': 1.376 if(!isdigit(argv[++i][0])) 1.377 { 1.378 - fprintf(stderr, "-t must be followed by the number of worker threads to spawn\n"); 1.379 + fprintf(stderr, "-t must be followed by the number messages to send per core\n"); 1.380 return EXIT_FAILURE; 1.381 } 1.382 - num_threads = atoi(argv[i]); 1.383 - if(!num_threads) 1.384 + num_msg_to_send = atoi(argv[i]); 1.385 + if(!num_msg_to_send) 1.386 { 1.387 - fprintf(stderr, "invalid number of threads specified: %d\n", num_threads); 1.388 + fprintf(stderr, "invalid number of messages to send: %d\n", num_msg_to_send); 1.389 return EXIT_FAILURE; 
1.390 } 1.391 break; 1.392 - case 'o': 1.393 - if(!isdigit(argv[++i][0])) 1.394 - { 1.395 - fputs("-i must be followed by a number\n", stderr); 1.396 - return EXIT_FAILURE; 1.397 - } 1.398 - outer_iters = atoi(argv[i]); 1.399 - break; 1.400 - case 'i': 1.401 - if(!isdigit(argv[++i][0])) 1.402 - { 1.403 - fputs("-o must be followed by a number (workload size)\n", stderr); 1.404 - return EXIT_FAILURE; 1.405 - } 1.406 - inner_iters = atoi(argv[i]); 1.407 - break; 1.408 case 'h': 1.409 fputs(usage, stdout); 1.410 - return 0; 1.411 - 1.412 + return 0; 1.413 default: 1.414 fprintf(stderr, "unrecognized argument: %s\n", argv[i]); 1.415 fputs(usage, stderr); 1.416 @@ -362,9 +479,9 @@ 1.417 }//if arg 1.418 else 1.419 { 1.420 - fprintf(stderr, "unrecognized argument: %s\n", argv[i]); 1.421 - fputs(usage, stderr); 1.422 - return EXIT_FAILURE; 1.423 + fprintf(stderr, "unrecognized argument: %s\n", argv[i]); 1.424 + fputs(usage, stderr); 1.425 + return EXIT_FAILURE; 1.426 } 1.427 }//for 1.428 1.429 @@ -441,7 +558,7 @@ 1.430 benchParams->startExeCycles = &startExeCycles; 1.431 benchParams->endExeCycles = &endExeCycles; 1.432 1.433 - workerParamsArray = (WorkerParams *)malloc( (num_threads + 1) * sizeof(WorkerParams) ); 1.434 + workerParamsArray = (WorkerParams *)malloc( (NUM_CORES) * sizeof(WorkerParams) ); 1.435 if(workerParamsArray == NULL ) printf("error mallocing worker params array\n"); 1.436 1.437 1.438 @@ -474,7 +591,7 @@ 1.439 printf("ExeCycles/WorkCycles Ratio %f\n", 1.440 (double)totalExeCycles / (double)totalWorkCyclesAcrossCores); 1.441 #else 1.442 - printf("No measurement done!\n"); 1.443 + printf("#No measurement done!\n"); 1.444 #endif 1.445 return 0; 1.446 }
