Vthread__KMeans__Bench
changeset 1:8e7bdab2840f
VPThread version working
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Tue, 16 Aug 2011 20:32:55 +0200 |
| parents | e69e4c2d612a |
| children | 467746c73fd0 |
| files | .hgignore Makefile kmeans.h pthreads_kmeans.c pthreads_main.c |
| diffstat | 5 files changed, 136 insertions(+), 82 deletions(-) |
```diff
--- a/.hgignore	Wed Aug 03 19:30:34 2011 +0200
+++ b/.hgignore	Tue Aug 16 20:32:55 2011 +0200
@@ -1,5 +1,8 @@
 syntax: glob
 
+histograms
+kmeans
+out
 nbproject
 c-ray-mt
 *.ppm
```
```diff
--- a/Makefile	Wed Aug 03 19:30:34 2011 +0200
+++ b/Makefile	Tue Aug 16 20:32:55 2011 +0200
@@ -1,5 +1,5 @@
 CC = gcc
-CFLAGS = -m64 -ffast-math -fwrapv -fno-omit-frame-pointer -O0 -D VPTHREAD -D APPLICATION=KMEANS -g -Wall
+CFLAGS = -m64 -ffast-math -fwrapv -fno-omit-frame-pointer -O3 -D VPTHREAD -D APPLICATION=KMEANS -g -Wall
 LDFLAGS =
 
 LIBS = -lm -lpthread
```
```diff
--- a/kmeans.h	Wed Aug 03 19:30:34 2011 +0200
+++ b/kmeans.h	Tue Aug 16 20:32:55 2011 +0200
@@ -13,8 +13,20 @@
 #define _H_KMEANS
 
 #include <assert.h>
+#include "VPThread_lib/VPThread.h"
 
-double** pthreads_kmeans(int, double**, int, int, int, double, int*);
+struct call_data{
+    int      is_perform_atomic; /* in: */
+    double **objects;           /* in: [numObjs][numCoords] */
+    int      numCoords;         /* no. coordinates */
+    int      numObjs;           /* no. objects */
+    int      numClusters;       /* no. clusters */
+    double   threshold;         /* % objects change membership */
+    int     *membership;
+    double **clusters;
+};
+
+void pthreads_kmeans(void *data, VirtProcr *VProc);
 
 double** file_read(int, char*, int*, int*);
 
```
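The header change turns `pthreads_kmeans()` from a function returning `double**` into a VPThread entry point that receives its former arguments packed into `struct call_data` and writes its result back through `cluster_data->clusters`. A minimal sketch of the new calling convention follows; it mirrors the `pthreads_main.c` change further down, but the thread count, problem sizes, threshold, and toy input data are placeholders, and it assumes the VPThread library from this repository is on the include and link path.

```c
#include <stdlib.h>
#include <assert.h>
#include "kmeans.h"                 /* struct call_data, pthreads_kmeans() */
#include "VPThread_lib/VPThread.h"  /* VPThread__create_seed_procr_and_do_work() */

int nthreads = 4;   /* read by pthreads_kmeans() via "extern int nthreads"; placeholder value */

int main(void)
{
    int i;
    int numObjs = 1000, numCoords = 2, numClusters = 8;   /* placeholder sizes */
    double threshold = 0.001;                             /* placeholder threshold */

    /* Toy input: normally objects[][] comes from file_read() in pthreads_main.c */
    double **objects = malloc(numObjs * sizeof(double*));
    assert(objects != NULL);
    objects[0] = malloc(numObjs * numCoords * sizeof(double));
    assert(objects[0] != NULL);
    for (i = 1; i < numObjs; i++)
        objects[i] = objects[i-1] + numCoords;
    for (i = 0; i < numObjs * numCoords; i++)
        objects[0][i] = (double)(i % 97);

    int *membership = malloc(numObjs * sizeof(int));      /* out: [numObjs] */
    assert(membership != NULL);

    /* The caller now owns the clusters array; pthreads_kmeans() only fixes up
       the row pointers and fills in the center coordinates. */
    double **clusters = malloc(numClusters * sizeof(double*));
    assert(clusters != NULL);
    clusters[0] = malloc(numClusters * numCoords * sizeof(double));
    assert(clusters[0] != NULL);

    struct call_data data = { 0, objects, numCoords, numObjs,
                              numClusters, threshold, membership, clusters };

    /* The seed virtual processor runs pthreads_kmeans(&data, VProc);
       worker VPs are created inside pthreads_kmeans() itself. */
    VPThread__create_seed_procr_and_do_work(pthreads_kmeans, (void*)&data);

    /* Results: data.clusters[c][d] holds coordinate d of cluster center c. */
    free(clusters[0]); free(clusters);
    free(membership);
    free(objects[0]);  free(objects);
    return 0;
}
```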
```diff
--- a/pthreads_kmeans.c	Wed Aug 03 19:30:34 2011 +0200
+++ b/pthreads_kmeans.c	Tue Aug 16 20:32:55 2011 +0200
@@ -24,19 +24,54 @@
 #include <math.h>
 #include "kmeans.h"
 
+#include "VPThread_lib/VPThread.h"
+
 #define PREC 300
 
-char __ProgrammName[] = "kmeans";
-char __DataSet[255];
+struct barrier_t
+{
+    int   counter;
+    int   nthreads;
+    int32 mutex;
+    int32 cond;
+};
+typedef struct barrier_t barrier;
 
 extern int nthreads;   /* Thread count */
 double delta;          /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
 volatile int finished;
 
-pthread_barrier_t barr;
-pthread_mutex_t lock1;
+barrier barr;
+int32 lock1;
 pthread_attr_t attr;
 
+void inline barrier_init(barrier *barr, int nthreads, VirtProcr *VProc)
+{
+    barr->counter = 0;
+    barr->nthreads = nthreads;
+    barr->mutex = VPThread__make_mutex(VProc);
+    barr->cond = VPThread__make_cond(barr->mutex, VProc);
+}
+
+void inline barrier_wait(barrier *barr, VirtProcr *VProc)
+{
+    int i;
+
+    VPThread__mutex_lock(barr->mutex, VProc);
+    barr->counter++;
+    if(barr->counter == barr->nthreads)
+    {
+        barr->counter = 0;
+        for(i=0; i < barr->nthreads; i++)
+            VPThread__cond_signal(barr->cond, VProc);
+    }
+    else
+    {
+        VPThread__cond_wait(barr->cond, VProc);
+    }
+    VPThread__mutex_unlock(barr->mutex, VProc);
+}
+
 /*
  * Struct: input
  * -------------
@@ -93,7 +128,7 @@
     return index;
 }
 
-void work(struct input *x){
+void work(struct input *x, VirtProcr *VProc){
     int tid = x->t;
     double local_delta=0;
     int i;
@@ -116,30 +151,32 @@
             x->local_newClusters[tid][index][j] += x->objects[i][j];
 
     }
-    pthread_mutex_lock(&lock1);
+    VPThread__mutex_lock(lock1, VProc);
     delta +=local_delta;
-    pthread_mutex_unlock(&lock1);
+    VPThread__mutex_unlock(lock1, VProc);
 }
+
 /*
  * Function: thread function work
  * --------------
  * Worker function for threading. Work distribution is done so that each thread computers
 */
-void* tfwork(void *ip)
+void tfwork(void *ip, VirtProcr *VProc)
 {
     struct input *x;
     x = (struct input *)ip;
 
     for(;;){
-        pthread_barrier_wait(&barr);
+        barrier_wait(&barr, VProc);
         if (finished){
             break;
         }
-        work(x);
-        pthread_barrier_wait(&barr);
+        work(x, VProc);
+        barrier_wait(&barr, VProc);
     }
 
-    pthread_exit(NULL);
+    //pthread_exit(NULL);
+    VPThread__dissipate_thread(VProc);
 }
 
 /*
@@ -147,12 +184,12 @@
  * --------------------------
  * Allocates memory for a 2-dim double array as needed for the algorithm.
 */
-double** create_array_2d_f(int height, int width) {
+double** create_array_2d_f(int height, int width, VirtProcr *VProc) {
     double** ptr;
     int i;
-    ptr = calloc(height, sizeof(double*));
+    ptr = VPThread__malloc(height * sizeof(double*), VProc);
     assert(ptr != NULL);
-    ptr[0] = calloc(width * height, sizeof(double));
+    ptr[0] = VPThread__malloc(width * height * sizeof(double), VProc);
     assert(ptr[0] != NULL);
     /* Assign pointers correctly */
     for(i = 1; i < height; i++)
@@ -165,12 +202,12 @@
  * --------------------------
  * Allocates memory for a 2-dim integer array as needed for the algorithm.
 */
-int** create_array_2d_i(int height, int width) {
+int** create_array_2d_i(int height, int width, VirtProcr *VProc) {
     int** ptr;
     int i;
-    ptr = calloc(height, sizeof(int*));
+    ptr = VPThread__malloc(height * sizeof(int*), VProc);
     assert(ptr != NULL);
-    ptr[0] = calloc(width * height, sizeof(int));
+    ptr[0] = VPThread__malloc(width * height * sizeof(int), VProc);
     assert(ptr[0] != NULL);
     /* Assign pointers correctly */
     for(i = 1; i < height; i++)
@@ -183,30 +220,34 @@
  * -------------------------
  * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
 */
-double** pthreads_kmeans(int is_perform_atomic,  /* in: */
-                         double **objects,       /* in: [numObjs][numCoords] */
-                         int numCoords,          /* no. coordinates */
-                         int numObjs,            /* no. objects */
-                         int numClusters,        /* no. clusters */
-                         double threshold,       /* % objects change membership */
-                         int *membership)        /* out: [numObjs] */
+void pthreads_kmeans(void *data, VirtProcr *VProc)
 {
+    struct call_data *cluster_data = (struct call_data*)data;
+    //int is_perform_atomic = cluster_data->is_perform_atomic;  /* in: */
+    double **objects  = cluster_data->objects;      /* in: [numObjs][numCoords] */
+    int numCoords     = cluster_data->numCoords;    /* no. coordinates */
+    int numObjs       = cluster_data->numObjs;      /* no. objects */
+    int numClusters   = cluster_data->numClusters;  /* no. clusters */
+    double threshold  = cluster_data->threshold;    /* % objects change membership */
+    int *membership   = cluster_data->membership;   /* out: [numObjs] */
 
-    int i, j, k, index, loop = 0, rc;
+    int i, j, k, loop = 0;
     int *newClusterSize;   /* [numClusters]: no. objects assigned in each
                               new cluster */
-    double **clusters;     /* out: [numClusters][numCoords] */
+    double **clusters = cluster_data->clusters;   /* out: [numClusters][numCoords] */
     double **newClusters;  /* [numClusters][numCoords] */
-    double timing;
+    //double timing;
     int **local_newClusterSize;    /* [nthreads][numClusters] */
     double ***local_newClusters;   /* [nthreads][numClusters][numCoords] */
 
-    pthread_t *thread;
+    VirtProcr **thread;
 
     /* === MEMORY SETUP === */
 
     /* [numClusters] clusters of [numCoords] double coordinates each */
-    clusters = create_array_2d_f(numClusters, numCoords);
+    //Set pointers
+    for(i = 1; i < numClusters; i++)
+        clusters[i] = clusters[i-1] + numCoords;
 
     /* Pick first numClusters elements of objects[] as initial cluster centers */
     for (i=0; i < numClusters; i++)
@@ -218,17 +259,17 @@
         membership[i] = -1;
 
     /* newClusterSize holds information on the count of members in each cluster */
-    newClusterSize = (int*)calloc(numClusters, sizeof(int));
+    newClusterSize = (int*)VPThread__malloc(numClusters * sizeof(int), VProc);
    assert(newClusterSize != NULL);
 
     /* newClusters holds the coordinates of the freshly created clusters */
-    newClusters = create_array_2d_f(numClusters, numCoords);
-    local_newClusterSize = create_array_2d_i(nthreads, numClusters);
+    newClusters = create_array_2d_f(numClusters, numCoords, VProc);
+    local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc);
 
     /* local_newClusters is a 3D array */
-    local_newClusters = (double***)malloc(nthreads * sizeof(double**));
+    local_newClusters = (double***)VPThread__malloc(nthreads * sizeof(double**), VProc);
     assert(local_newClusters != NULL);
-    local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*));
+    local_newClusters[0] = (double**) VPThread__malloc(nthreads * numClusters * sizeof(double*), VProc);
     assert(local_newClusters[0] != NULL);
 
     /* Set up the pointers */
@@ -237,21 +278,18 @@
 
     for (i = 0; i < nthreads; i++) {
         for (j = 0; j < numClusters; j++) {
-            local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double));
+            local_newClusters[i][j] = (double*)VPThread__malloc(numCoords * sizeof(double), VProc);
             assert(local_newClusters[i][j] != NULL);
         }
     }
     /* Perform thread setup */
-    thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
+    thread = (VirtProcr**)VPThread__malloc(nthreads * sizeof(VirtProcr*), VProc);
 
-    printf("nthreads %d\n", nthreads);
-    pthread_barrier_init(&barr, NULL, nthreads);
-    pthread_attr_init(&attr);
-    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-    pthread_mutex_init(&lock1, NULL);
+    barrier_init(&barr, nthreads, VProc);
+    lock1 = VPThread__make_mutex(VProc);
     finished=0;
 
-    struct input *ip = malloc(nthreads * sizeof(struct input));
+    struct input *ip = VPThread__malloc(nthreads * sizeof(struct input), VProc);
     /* Provide thread-safe memory locations for each worker */
     for(i = 0; i < nthreads; i++){
         ip[i].t = i;
@@ -265,11 +303,7 @@
         ip[i].numCoords=numCoords;
 
         if (i>0){
-            rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]);
-            if (rc) {
-                fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc);
-                exit(EXIT_FAILURE);
-            }
+            thread[i] = VPThread__create_thread(tfwork, (void*)&ip[i], VProc);
         }
     }
 
@@ -277,10 +311,10 @@
 
     do {
         delta = 0.0;
-        pthread_barrier_wait(&barr);
-        work(&ip[0]);
+        barrier_wait(&barr, VProc);
+        work(&ip[0], VProc);
 
-        pthread_barrier_wait(&barr);
+        barrier_wait(&barr, VProc);
         /* Let the main thread perform the array reduction */
         for (i = 0; i < numClusters; i++) {
             for (j = 0; j < nthreads; j++) {
@@ -306,39 +340,32 @@
         delta /= numObjs;
     } while (loop++ < PREC && delta > threshold);
 
-    // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
+    // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
     // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
     // iterations, therefore making benchmarking completely unreliable.
 
-    finished=1;
-    pthread_barrier_wait(&barr);
+    finished=1;
+    barrier_wait(&barr, VProc);
 
-    for(i = 1; i < nthreads; i++) {
-        rc = pthread_join(thread[i], NULL);
-        if (rc) {
-            fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc);
-            exit(EXIT_FAILURE);
-        }
-    }
-
-    free(ip);
-    free(thread);
-    pthread_barrier_destroy(&barr);
-    pthread_mutex_destroy(&lock1);
-    pthread_attr_destroy(&attr);
 
-    free(local_newClusterSize[0]);
-    free(local_newClusterSize);
+    VPThread__free(ip, VProc);
+    VPThread__free(thread, VProc);
+
+    VPThread__free(local_newClusterSize[0], VProc);
+    VPThread__free(local_newClusterSize, VProc);
 
     for (i = 0; i < nthreads; i++)
         for (j = 0; j < numClusters; j++)
-            free(local_newClusters[i][j]);
-    free(local_newClusters[0]);
-    free(local_newClusters);
+            VPThread__free(local_newClusters[i][j], VProc);
+    VPThread__free(local_newClusters[0], VProc);
+    VPThread__free(local_newClusters, VProc);
 
-    free(newClusters[0]);
-    free(newClusters);
-    free(newClusterSize);
-    return clusters;
+    VPThread__free(newClusters[0], VProc);
+    VPThread__free(newClusters, VProc);
+    VPThread__free(newClusterSize, VProc);
+
+    (cluster_data)->clusters = clusters;
+
+    VPThread__dissipate_thread(VProc);
 }
 
```
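The `barrier_init`/`barrier_wait` pair above rebuilds a cyclic barrier from the VPThread mutex and condition-variable primitives, since there is no drop-in replacement for `pthread_barrier_t`. For comparison only, here is a standalone sketch of the same counter-plus-condition-variable scheme written against plain POSIX threads; the `posix_barrier` names are mine, and the generation counter (`round`) is an extra guard against spurious wakeups and reuse across iterations, not something taken from this changeset.

```c
#include <pthread.h>

/* A plain-pthreads cyclic barrier, shown only for comparison with the
 * VPThread-based barrier_init()/barrier_wait() in this changeset. */
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int             nthreads;  /* number of participants */
    int             counter;   /* arrivals in the current round */
    unsigned        round;     /* generation counter: guards reuse and spurious wakeups */
} posix_barrier;

void posix_barrier_init(posix_barrier *b, int nthreads)
{
    pthread_mutex_init(&b->mutex, NULL);
    pthread_cond_init(&b->cond, NULL);
    b->nthreads = nthreads;
    b->counter  = 0;
    b->round    = 0;
}

void posix_barrier_wait(posix_barrier *b)
{
    pthread_mutex_lock(&b->mutex);
    unsigned my_round = b->round;
    if (++b->counter == b->nthreads) {
        /* Last arrival releases everyone: one broadcast plays the role of the
         * per-waiter signal loop used in the VPThread version above. */
        b->counter = 0;
        b->round++;
        pthread_cond_broadcast(&b->cond);
    } else {
        while (b->round == my_round)   /* re-check the predicate after every wakeup */
            pthread_cond_wait(&b->cond, &b->mutex);
    }
    pthread_mutex_unlock(&b->mutex);
}
```

Linking needs `-lpthread`, which the Makefile in this changeset already passes via `LIBS`.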
```diff
--- a/pthreads_main.c	Wed Aug 03 19:30:34 2011 +0200
+++ b/pthreads_main.c	Tue Aug 16 20:32:55 2011 +0200
@@ -27,6 +27,11 @@
 #include <time.h>
 #include "kmeans.h"
 
+#include "VPThread_lib/VPThread.h"
+
+char __ProgrammName[] = "kmeans";
+char __DataSet[255];
+
 #define seconds(tm) gettimeofday(&tp,(struct timezone *)0);\
   tm=tp.tv_sec+tp.tv_usec/1000000.0
 
@@ -56,7 +61,7 @@
     int opt;
     extern char *optarg;
     extern int optind;
-    int i, j;
+    int j;
     int isBinaryFile;
 
     int *membership;    /* [numObjs] */
@@ -108,10 +113,17 @@
 
     membership = (int*) malloc(numObjs * sizeof(int));
     assert(membership != NULL);
+
+    clusters = malloc(numClusters * sizeof(double*));
+    assert(clusters != NULL);
+    clusters[0] = malloc(numClusters * numCoords * sizeof(double));
+    assert(clusters[0] != NULL);
 
-    /* Launch the core computation algorithm */
-    clusters = pthreads_kmeans(0, objects, numCoords, numObjs,
-                               numClusters, threshold, membership);
+    struct call_data data = { 0, objects, numCoords, numObjs,
+                              numClusters, threshold, membership, clusters };
+
+    /* Launch the core computation algorithm */
+    VPThread__create_seed_procr_and_do_work(pthreads_kmeans, (void*)&data);
 
     free(objects[0]);
     free(objects);
```
