Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > SSR > SSR__KMeans__Bench
comparison kmeans.c @ 0:0ce47c784647
Initial commit
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Tue, 27 Sep 2011 15:08:02 +0200 |
| parents | |
| children | 53dd2334f136 |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:10c5db3eced7 |
|---|---|
| 1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ | |
| 2 /* File: pthreads_kmeans.c (OpenMP version) */ | |
| 3 /* Description: Implementation of simple k-means clustering algorithm */ | |
| 4 /* This program takes an array of N data objects, each with */ | |
| 5 /* M coordinates and performs a k-means clustering given a */ | |
| 6 /* user-provided value of the number of clusters (K). The */ | |
| 7 /* clustering results are saved in 2 arrays: */ | |
| 8 /* 1. a returned array of size [K][N] indicating the center */ | |
| 9 /* coordinates of K clusters */ | |
| 10 /* 2. membership[N] stores the cluster center ids, each */ | |
| 11 /* corresponding to the cluster a data object is assigned */ | |
| 12 /* */ | |
| 13 /* Author: Wei-keng Liao */ | |
| 14 /* ECE Department, Northwestern University */ | |
| 15 /* email: wkliao@ece.northwestern.edu */ | |
| 16 /* Copyright, 2005, Wei-keng Liao */ | |
| 17 /* */ | |
| 18 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ | |
| 19 | |
| 20 #include <stdio.h> | |
| 21 #include <stdlib.h> | |
| 22 #include <pthread.h> | |
| 23 #include <time.h> | |
| 24 #include <math.h> | |
| 25 #include "kmeans.h" | |
| 26 | |
| 27 #include "SSR_lib/SSR.h" | |
| 28 | |
| 29 #define PREC 300 | |
| 30 | |
| 31 #define START 1 | |
| 32 #define END 2 | |
| 33 | |
| 34 extern int nthreads; /* Thread count */ | |
| 35 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */ | |
| 36 | |
| 37 VirtProcr *parentVP; | |
| 38 | |
| 39 /* | |
| 40 * Struct: input | |
| 41 * ------------- | |
| 42 * Encapsulates all the input data for the benchmark, i.e. the object list, | |
| 43 * clustering output and the input statistics. | |
| 44 */ | |
| 45 struct input { | |
| 46 int t; | |
| 47 double local_delta; | |
| 48 double **objects; /* Object list */ | |
| 49 double **clusters; /* Cluster list, out: [numClusters][numCoords] */ | |
| 50 int *membership; /* For each object, contains the index of the cluster it currently belongs to */ | |
| 51 int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */ | |
| 52 double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */ | |
| 53 int numObjs,numClusters,numCoords; | |
| 54 }; | |
| 55 | |
| 56 /* | |
| 57 * Function: euclid_dist_2 | |
| 58 * ----------------------- | |
| 59 * Computes the square of the euclidean distance between two multi-dimensional points. | |
| 60 */ | |
| 61 __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) { | |
| 62 int i; | |
| 63 double ans=0.0; | |
| 64 | |
| 65 for (i=0; i<numdims; i++) | |
| 66 ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]); | |
| 67 | |
| 68 return(ans); | |
| 69 } | |
| 70 | |
| 71 /* | |
| 72 * Function: find_nearest_cluster | |
| 73 * ------------------------------ | |
| 74 * Function determining the cluster center which is closest to the given object. | |
| 75 * Returns the index of that cluster center. | |
| 76 */ | |
| 77 __inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) { | |
| 78 int index, i; | |
| 79 double dist, min_dist; | |
| 80 | |
| 81 /* find the cluster id that has min distance to object */ | |
| 82 index = 0; | |
| 83 min_dist = euclid_dist_2(numCoords, object, clusters[0]); | |
| 84 for (i=1; i<numClusters; i++) { | |
| 85 dist = euclid_dist_2(numCoords, object, clusters[i]); | |
| 86 | |
| 87 /* no need square root */ | |
| 88 if (dist < min_dist) { /* find the min and its array index */ | |
| 89 min_dist = dist; | |
| 90 index = i; | |
| 91 } | |
| 92 } | |
| 93 return index; | |
| 94 } | |
| 95 | |
| 96 void work(struct input *x, VirtProcr *VProc){ | |
| 97 int tid = x->t; | |
| 98 x->local_delta=0; | |
| 99 int i; | |
| 100 for (i = tid; i < x->numObjs; i += nthreads) { | |
| 101 /* find the array index of nearest cluster center */ | |
| 102 int index = find_nearest_cluster(x->numClusters, x->numCoords, | |
| 103 x->objects[i], x->clusters); | |
| 104 | |
| 105 /* if membership changes, increase delta by 1 */ | |
| 106 if (x->membership[i] != index) | |
| 107 x->local_delta += 1.0; | |
| 108 /* assign the membership to object i */ | |
| 109 x->membership[i] = index; | |
| 110 | |
| 111 /* update new cluster centers : sum of all objects located | |
| 112 within (average will be performed later) */ | |
| 113 x->local_newClusterSize[tid][index]++; | |
| 114 int j; | |
| 115 for (j=0; j < x->numCoords; j++) | |
| 116 x->local_newClusters[tid][index][j] += x->objects[i][j]; | |
| 117 | |
| 118 } | |
| 119 SSR__send_from_to((void*)&x->local_delta, VProc, parentVP); | |
| 120 } | |
| 121 | |
| 122 /* | |
| 123 * Function: thread function work | |
| 124 * -------------- | |
| 125 * Worker function for threading. Work distribution is done so that each thread computers | |
| 126 */ | |
| 127 void tfwork(void *ip, VirtProcr *VProc) | |
| 128 { | |
| 129 struct input *x; | |
| 130 x = (struct input *)ip; | |
| 131 | |
| 132 for(;;){ | |
| 133 if(*((int*)SSR__receive_from_to(parentVP, VProc)) == END) | |
| 134 break; | |
| 135 else | |
| 136 work(x, VProc); | |
| 137 } | |
| 138 | |
| 139 SSR__dissipate_procr(VProc); | |
| 140 } | |
| 141 | |
| 142 /* | |
| 143 * Function: create_array_2d_f | |
| 144 * -------------------------- | |
| 145 * Allocates memory for a 2-dim double array as needed for the algorithm. | |
| 146 */ | |
| 147 double** create_array_2d_f(int height, int width, VirtProcr *VProc) { | |
| 148 double** ptr; | |
| 149 int i; | |
| 150 ptr = SSR__malloc_to(height * sizeof(double*), VProc); | |
| 151 assert(ptr != NULL); | |
| 152 ptr[0] = SSR__malloc_to(width * height * sizeof(double), VProc); | |
| 153 assert(ptr[0] != NULL); | |
| 154 /* Assign pointers correctly */ | |
| 155 for(i = 1; i < height; i++) | |
| 156 ptr[i] = ptr[i-1] + width; | |
| 157 return ptr; | |
| 158 } | |
| 159 | |
| 160 /* | |
| 161 * Function: create_array_2Dd_i | |
| 162 * -------------------------- | |
| 163 * Allocates memory for a 2-dim integer array as needed for the algorithm. | |
| 164 */ | |
| 165 int** create_array_2d_i(int height, int width, VirtProcr *VProc) { | |
| 166 int** ptr; | |
| 167 int i; | |
| 168 ptr = SSR__malloc_to(height * sizeof(int*), VProc); | |
| 169 assert(ptr != NULL); | |
| 170 ptr[0] = SSR__malloc_to(width * height * sizeof(int), VProc); | |
| 171 assert(ptr[0] != NULL); | |
| 172 /* Assign pointers correctly */ | |
| 173 for(i = 1; i < height; i++) | |
| 174 ptr[i] = ptr[i-1] + width; | |
| 175 return ptr; | |
| 176 } | |
| 177 | |
| 178 /* | |
| 179 * Function: pthreads_kmeans | |
| 180 * ------------------------- | |
| 181 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords]. | |
| 182 */ | |
| 183 void kmeans(void *data, VirtProcr *VProc) | |
| 184 { | |
| 185 struct call_data *cluster_data = (struct call_data*)data; | |
| 186 //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */ | |
| 187 double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */ | |
| 188 int numCoords = cluster_data->numCoords; /* no. coordinates */ | |
| 189 int numObjs = cluster_data->numObjs; /* no. objects */ | |
| 190 int numClusters = cluster_data->numClusters; /* no. clusters */ | |
| 191 double threshold = cluster_data->threshold; /* % objects change membership */ | |
| 192 int *membership = cluster_data->membership; /* out: [numObjs] */ | |
| 193 | |
| 194 int i, j, k, loop = 0; | |
| 195 int *newClusterSize; /* [numClusters]: no. objects assigned in each | |
| 196 new cluster */ | |
| 197 double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */ | |
| 198 double **newClusters; /* [numClusters][numCoords] */ | |
| 199 //double timing; | |
| 200 int **local_newClusterSize; /* [nthreads][numClusters] */ | |
| 201 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */ | |
| 202 | |
| 203 VirtProcr **tasks; | |
| 204 volatile int syncMsg; | |
| 205 | |
| 206 parentVP = VProc; | |
| 207 | |
| 208 /* === MEMORY SETUP === */ | |
| 209 | |
| 210 /* [numClusters] clusters of [numCoords] double coordinates each */ | |
| 211 //Set pointers | |
| 212 for(i = 1; i < numClusters; i++) | |
| 213 clusters[i] = clusters[i-1] + numCoords; | |
| 214 | |
| 215 /* Pick first numClusters elements of objects[] as initial cluster centers */ | |
| 216 for (i=0; i < numClusters; i++) | |
| 217 for (j=0; j < numCoords; j++) | |
| 218 clusters[i][j] = objects[i][j]; | |
| 219 | |
| 220 /* Initialize membership, no object belongs to any cluster yet */ | |
| 221 for (i = 0; i < numObjs; i++) | |
| 222 membership[i] = -1; | |
| 223 | |
| 224 /* newClusterSize holds information on the count of members in each cluster */ | |
| 225 newClusterSize = (int*)SSR__malloc_to(numClusters * sizeof(int), VProc); | |
| 226 assert(newClusterSize != NULL); | |
| 227 | |
| 228 /* newClusters holds the coordinates of the freshly created clusters */ | |
| 229 newClusters = create_array_2d_f(numClusters, numCoords, VProc); | |
| 230 local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc); | |
| 231 | |
| 232 /* local_newClusters is a 3D array */ | |
| 233 local_newClusters = (double***)SSR__malloc_to(nthreads * sizeof(double**), VProc); | |
| 234 assert(local_newClusters != NULL); | |
| 235 local_newClusters[0] = (double**)SSR__malloc_to(nthreads * numClusters * sizeof(double*), VProc); | |
| 236 assert(local_newClusters[0] != NULL); | |
| 237 | |
| 238 /* Set up the pointers */ | |
| 239 for (i = 1; i < nthreads; i++) | |
| 240 local_newClusters[i] = local_newClusters[i-1] + numClusters; | |
| 241 | |
| 242 for (i = 0; i < nthreads; i++) { | |
| 243 for (j = 0; j < numClusters; j++) { | |
| 244 local_newClusters[i][j] = (double*)SSR__malloc_to(numCoords * sizeof(double), VProc); | |
| 245 assert(local_newClusters[i][j] != NULL); | |
| 246 } | |
| 247 } | |
| 248 /* Perform thread setup */ | |
| 249 tasks = (VirtProcr**)SSR__malloc_to(nthreads * sizeof(VirtProcr*), VProc); | |
| 250 | |
| 251 struct input *ip = SSR__malloc_to(nthreads * sizeof(struct input), VProc); | |
| 252 /* Provide thread-safe memory locations for each worker */ | |
| 253 for(i = 0; i < nthreads; i++){ | |
| 254 ip[i].t = i; | |
| 255 ip[i].objects=objects; | |
| 256 ip[i].clusters=clusters; | |
| 257 ip[i].membership=membership; | |
| 258 ip[i].local_newClusterSize=local_newClusterSize; | |
| 259 ip[i].local_newClusters=local_newClusters; | |
| 260 ip[i].numObjs=numObjs; | |
| 261 ip[i].numClusters=numClusters; | |
| 262 ip[i].numCoords=numCoords; | |
| 263 | |
| 264 if(i!=0) | |
| 265 tasks[i] = SSR__create_procr_with(tfwork, (void*)&ip[i], VProc); | |
| 266 } | |
| 267 | |
| 268 /* === COMPUTATIONAL PHASE === */ | |
| 269 syncMsg = START; | |
| 270 do { | |
| 271 delta = 0.0; | |
| 272 | |
| 273 printf("start children 1\n"); | |
| 274 for(i=0; i<nthreads; i++) | |
| 275 SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]); | |
| 276 | |
| 277 //let the children do the work | |
| 278 | |
| 279 printf("receive results\n"); | |
| 280 for(i=0; i<nthreads; i++) | |
| 281 delta += *(double*)SSR__receive_from_to(tasks[i],VProc); | |
| 282 printf("start children 2\n"); | |
| 283 for(i=1; i<nthreads; i++) | |
| 284 SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]); | |
| 285 | |
| 286 /* Let the main thread perform the array reduction */ | |
| 287 for (i = 0; i < numClusters; i++) { | |
| 288 for (j = 0; j < nthreads; j++) { | |
| 289 newClusterSize[i] += local_newClusterSize[j][i]; | |
| 290 local_newClusterSize[j][i] = 0.0; | |
| 291 for (k = 0; k < numCoords; k++) { | |
| 292 newClusters[i][k] += local_newClusters[j][i][k]; | |
| 293 local_newClusters[j][i][k] = 0.0; | |
| 294 } | |
| 295 } | |
| 296 } | |
| 297 | |
| 298 /* Average the sum and replace old cluster centers with newClusters */ | |
| 299 for (i = 0; i < numClusters; i++) { | |
| 300 for (j = 0; j < numCoords; j++) { | |
| 301 if (newClusterSize[i] > 1) | |
| 302 clusters[i][j] = newClusters[i][j] / newClusterSize[i]; | |
| 303 newClusters[i][j] = 0.0; /* set back to 0 */ | |
| 304 } | |
| 305 newClusterSize[i] = 0; /* set back to 0 */ | |
| 306 } | |
| 307 | |
| 308 delta /= numObjs; | |
| 309 } while (loop++ < PREC && delta > threshold); | |
| 310 | |
| 311 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, | |
| 312 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed | |
| 313 // iterations, therefore making benchmarking completely unreliable. | |
| 314 | |
| 315 syncMsg = END; | |
| 316 for(i=0; i<nthreads; i++) | |
| 317 SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]); | |
| 318 | |
| 319 | |
| 320 SSR__free(ip, VProc); | |
| 321 SSR__free(tasks, VProc); | |
| 322 | |
| 323 SSR__free(local_newClusterSize[0], VProc); | |
| 324 SSR__free(local_newClusterSize, VProc); | |
| 325 | |
| 326 for (i = 0; i < nthreads; i++) | |
| 327 for (j = 0; j < numClusters; j++) | |
| 328 SSR__free(local_newClusters[i][j], VProc); | |
| 329 SSR__free(local_newClusters[0], VProc); | |
| 330 SSR__free(local_newClusters, VProc); | |
| 331 | |
| 332 SSR__free(newClusters[0], VProc); | |
| 333 SSR__free(newClusters, VProc); | |
| 334 SSR__free(newClusterSize, VProc); | |
| 335 | |
| 336 (cluster_data)->clusters = clusters; | |
| 337 | |
| 338 SSR__dissipate_procr(VProc); | |
| 339 } | |
| 340 |
