annotate kmeans.c @ 0:0ce47c784647

Initial commit
author Merten Sach <msach@mailbox.tu-berlin.de>
date Tue, 27 Sep 2011 15:08:02 +0200
parents
children 53dd2334f136
rev   line source
msach@0 1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
msach@0 2 /* File: pthreads_kmeans.c (OpenMP version) */
msach@0 3 /* Description: Implementation of simple k-means clustering algorithm */
msach@0 4 /* This program takes an array of N data objects, each with */
msach@0 5 /* M coordinates and performs a k-means clustering given a */
msach@0 6 /* user-provided value of the number of clusters (K). The */
msach@0 7 /* clustering results are saved in 2 arrays: */
msach@0 8 /* 1. a returned array of size [K][N] indicating the center */
msach@0 9 /* coordinates of K clusters */
msach@0 10 /* 2. membership[N] stores the cluster center ids, each */
msach@0 11 /* corresponding to the cluster a data object is assigned */
msach@0 12 /* */
msach@0 13 /* Author: Wei-keng Liao */
msach@0 14 /* ECE Department, Northwestern University */
msach@0 15 /* email: wkliao@ece.northwestern.edu */
msach@0 16 /* Copyright, 2005, Wei-keng Liao */
msach@0 17 /* */
msach@0 18 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
msach@0 19
msach@0 20 #include <stdio.h>
msach@0 21 #include <stdlib.h>
msach@0 22 #include <pthread.h>
msach@0 23 #include <time.h>
msach@0 24 #include <math.h>
msach@0 25 #include "kmeans.h"
msach@0 26
msach@0 27 #include "SSR_lib/SSR.h"
msach@0 28
msach@0 29 #define PREC 300
msach@0 30
msach@0 31 #define START 1
msach@0 32 #define END 2
msach@0 33
msach@0 34 extern int nthreads; /* Thread count */
msach@0 35 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
msach@0 36
msach@0 37 VirtProcr *parentVP;
msach@0 38
msach@0 39 /*
msach@0 40 * Struct: input
msach@0 41 * -------------
msach@0 42 * Encapsulates all the input data for the benchmark, i.e. the object list,
msach@0 43 * clustering output and the input statistics.
msach@0 44 */
msach@0 45 struct input {
msach@0 46 int t;
msach@0 47 double local_delta;
msach@0 48 double **objects; /* Object list */
msach@0 49 double **clusters; /* Cluster list, out: [numClusters][numCoords] */
msach@0 50 int *membership; /* For each object, contains the index of the cluster it currently belongs to */
msach@0 51 int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */
msach@0 52 double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */
msach@0 53 int numObjs,numClusters,numCoords;
msach@0 54 };
msach@0 55
msach@0 56 /*
msach@0 57 * Function: euclid_dist_2
msach@0 58 * -----------------------
msach@0 59 * Computes the square of the euclidean distance between two multi-dimensional points.
msach@0 60 */
msach@0 61 __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) {
msach@0 62 int i;
msach@0 63 double ans=0.0;
msach@0 64
msach@0 65 for (i=0; i<numdims; i++)
msach@0 66 ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]);
msach@0 67
msach@0 68 return(ans);
msach@0 69 }
msach@0 70
msach@0 71 /*
msach@0 72 * Function: find_nearest_cluster
msach@0 73 * ------------------------------
msach@0 74 * Function determining the cluster center which is closest to the given object.
msach@0 75 * Returns the index of that cluster center.
msach@0 76 */
msach@0 77 __inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) {
msach@0 78 int index, i;
msach@0 79 double dist, min_dist;
msach@0 80
msach@0 81 /* find the cluster id that has min distance to object */
msach@0 82 index = 0;
msach@0 83 min_dist = euclid_dist_2(numCoords, object, clusters[0]);
msach@0 84 for (i=1; i<numClusters; i++) {
msach@0 85 dist = euclid_dist_2(numCoords, object, clusters[i]);
msach@0 86
msach@0 87 /* no need square root */
msach@0 88 if (dist < min_dist) { /* find the min and its array index */
msach@0 89 min_dist = dist;
msach@0 90 index = i;
msach@0 91 }
msach@0 92 }
msach@0 93 return index;
msach@0 94 }
msach@0 95
msach@0 96 void work(struct input *x, VirtProcr *VProc){
msach@0 97 int tid = x->t;
msach@0 98 x->local_delta=0;
msach@0 99 int i;
msach@0 100 for (i = tid; i < x->numObjs; i += nthreads) {
msach@0 101 /* find the array index of nearest cluster center */
msach@0 102 int index = find_nearest_cluster(x->numClusters, x->numCoords,
msach@0 103 x->objects[i], x->clusters);
msach@0 104
msach@0 105 /* if membership changes, increase delta by 1 */
msach@0 106 if (x->membership[i] != index)
msach@0 107 x->local_delta += 1.0;
msach@0 108 /* assign the membership to object i */
msach@0 109 x->membership[i] = index;
msach@0 110
msach@0 111 /* update new cluster centers : sum of all objects located
msach@0 112 within (average will be performed later) */
msach@0 113 x->local_newClusterSize[tid][index]++;
msach@0 114 int j;
msach@0 115 for (j=0; j < x->numCoords; j++)
msach@0 116 x->local_newClusters[tid][index][j] += x->objects[i][j];
msach@0 117
msach@0 118 }
msach@0 119 SSR__send_from_to((void*)&x->local_delta, VProc, parentVP);
msach@0 120 }
msach@0 121
msach@0 122 /*
msach@0 123 * Function: thread function work
msach@0 124 * --------------
msach@0 125 * Worker function for threading. Work distribution is done so that each thread computers
msach@0 126 */
msach@0 127 void tfwork(void *ip, VirtProcr *VProc)
msach@0 128 {
msach@0 129 struct input *x;
msach@0 130 x = (struct input *)ip;
msach@0 131
msach@0 132 for(;;){
msach@0 133 if(*((int*)SSR__receive_from_to(parentVP, VProc)) == END)
msach@0 134 break;
msach@0 135 else
msach@0 136 work(x, VProc);
msach@0 137 }
msach@0 138
msach@0 139 SSR__dissipate_procr(VProc);
msach@0 140 }
msach@0 141
msach@0 142 /*
msach@0 143 * Function: create_array_2d_f
msach@0 144 * --------------------------
msach@0 145 * Allocates memory for a 2-dim double array as needed for the algorithm.
msach@0 146 */
msach@0 147 double** create_array_2d_f(int height, int width, VirtProcr *VProc) {
msach@0 148 double** ptr;
msach@0 149 int i;
msach@0 150 ptr = SSR__malloc_to(height * sizeof(double*), VProc);
msach@0 151 assert(ptr != NULL);
msach@0 152 ptr[0] = SSR__malloc_to(width * height * sizeof(double), VProc);
msach@0 153 assert(ptr[0] != NULL);
msach@0 154 /* Assign pointers correctly */
msach@0 155 for(i = 1; i < height; i++)
msach@0 156 ptr[i] = ptr[i-1] + width;
msach@0 157 return ptr;
msach@0 158 }
msach@0 159
msach@0 160 /*
msach@0 161 * Function: create_array_2Dd_i
msach@0 162 * --------------------------
msach@0 163 * Allocates memory for a 2-dim integer array as needed for the algorithm.
msach@0 164 */
msach@0 165 int** create_array_2d_i(int height, int width, VirtProcr *VProc) {
msach@0 166 int** ptr;
msach@0 167 int i;
msach@0 168 ptr = SSR__malloc_to(height * sizeof(int*), VProc);
msach@0 169 assert(ptr != NULL);
msach@0 170 ptr[0] = SSR__malloc_to(width * height * sizeof(int), VProc);
msach@0 171 assert(ptr[0] != NULL);
msach@0 172 /* Assign pointers correctly */
msach@0 173 for(i = 1; i < height; i++)
msach@0 174 ptr[i] = ptr[i-1] + width;
msach@0 175 return ptr;
msach@0 176 }
msach@0 177
msach@0 178 /*
msach@0 179 * Function: pthreads_kmeans
msach@0 180 * -------------------------
msach@0 181 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
msach@0 182 */
msach@0 183 void kmeans(void *data, VirtProcr *VProc)
msach@0 184 {
msach@0 185 struct call_data *cluster_data = (struct call_data*)data;
msach@0 186 //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */
msach@0 187 double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */
msach@0 188 int numCoords = cluster_data->numCoords; /* no. coordinates */
msach@0 189 int numObjs = cluster_data->numObjs; /* no. objects */
msach@0 190 int numClusters = cluster_data->numClusters; /* no. clusters */
msach@0 191 double threshold = cluster_data->threshold; /* % objects change membership */
msach@0 192 int *membership = cluster_data->membership; /* out: [numObjs] */
msach@0 193
msach@0 194 int i, j, k, loop = 0;
msach@0 195 int *newClusterSize; /* [numClusters]: no. objects assigned in each
msach@0 196 new cluster */
msach@0 197 double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */
msach@0 198 double **newClusters; /* [numClusters][numCoords] */
msach@0 199 //double timing;
msach@0 200 int **local_newClusterSize; /* [nthreads][numClusters] */
msach@0 201 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */
msach@0 202
msach@0 203 VirtProcr **tasks;
msach@0 204 volatile int syncMsg;
msach@0 205
msach@0 206 parentVP = VProc;
msach@0 207
msach@0 208 /* === MEMORY SETUP === */
msach@0 209
msach@0 210 /* [numClusters] clusters of [numCoords] double coordinates each */
msach@0 211 //Set pointers
msach@0 212 for(i = 1; i < numClusters; i++)
msach@0 213 clusters[i] = clusters[i-1] + numCoords;
msach@0 214
msach@0 215 /* Pick first numClusters elements of objects[] as initial cluster centers */
msach@0 216 for (i=0; i < numClusters; i++)
msach@0 217 for (j=0; j < numCoords; j++)
msach@0 218 clusters[i][j] = objects[i][j];
msach@0 219
msach@0 220 /* Initialize membership, no object belongs to any cluster yet */
msach@0 221 for (i = 0; i < numObjs; i++)
msach@0 222 membership[i] = -1;
msach@0 223
msach@0 224 /* newClusterSize holds information on the count of members in each cluster */
msach@0 225 newClusterSize = (int*)SSR__malloc_to(numClusters * sizeof(int), VProc);
msach@0 226 assert(newClusterSize != NULL);
msach@0 227
msach@0 228 /* newClusters holds the coordinates of the freshly created clusters */
msach@0 229 newClusters = create_array_2d_f(numClusters, numCoords, VProc);
msach@0 230 local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc);
msach@0 231
msach@0 232 /* local_newClusters is a 3D array */
msach@0 233 local_newClusters = (double***)SSR__malloc_to(nthreads * sizeof(double**), VProc);
msach@0 234 assert(local_newClusters != NULL);
msach@0 235 local_newClusters[0] = (double**)SSR__malloc_to(nthreads * numClusters * sizeof(double*), VProc);
msach@0 236 assert(local_newClusters[0] != NULL);
msach@0 237
msach@0 238 /* Set up the pointers */
msach@0 239 for (i = 1; i < nthreads; i++)
msach@0 240 local_newClusters[i] = local_newClusters[i-1] + numClusters;
msach@0 241
msach@0 242 for (i = 0; i < nthreads; i++) {
msach@0 243 for (j = 0; j < numClusters; j++) {
msach@0 244 local_newClusters[i][j] = (double*)SSR__malloc_to(numCoords * sizeof(double), VProc);
msach@0 245 assert(local_newClusters[i][j] != NULL);
msach@0 246 }
msach@0 247 }
msach@0 248 /* Perform thread setup */
msach@0 249 tasks = (VirtProcr**)SSR__malloc_to(nthreads * sizeof(VirtProcr*), VProc);
msach@0 250
msach@0 251 struct input *ip = SSR__malloc_to(nthreads * sizeof(struct input), VProc);
msach@0 252 /* Provide thread-safe memory locations for each worker */
msach@0 253 for(i = 0; i < nthreads; i++){
msach@0 254 ip[i].t = i;
msach@0 255 ip[i].objects=objects;
msach@0 256 ip[i].clusters=clusters;
msach@0 257 ip[i].membership=membership;
msach@0 258 ip[i].local_newClusterSize=local_newClusterSize;
msach@0 259 ip[i].local_newClusters=local_newClusters;
msach@0 260 ip[i].numObjs=numObjs;
msach@0 261 ip[i].numClusters=numClusters;
msach@0 262 ip[i].numCoords=numCoords;
msach@0 263
msach@0 264 if(i!=0)
msach@0 265 tasks[i] = SSR__create_procr_with(tfwork, (void*)&ip[i], VProc);
msach@0 266 }
msach@0 267
msach@0 268 /* === COMPUTATIONAL PHASE === */
msach@0 269 syncMsg = START;
msach@0 270 do {
msach@0 271 delta = 0.0;
msach@0 272
msach@0 273 printf("start children 1\n");
msach@0 274 for(i=0; i<nthreads; i++)
msach@0 275 SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]);
msach@0 276
msach@0 277 //let the children do the work
msach@0 278
msach@0 279 printf("receive results\n");
msach@0 280 for(i=0; i<nthreads; i++)
msach@0 281 delta += *(double*)SSR__receive_from_to(tasks[i],VProc);
msach@0 282 printf("start children 2\n");
msach@0 283 for(i=1; i<nthreads; i++)
msach@0 284 SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]);
msach@0 285
msach@0 286 /* Let the main thread perform the array reduction */
msach@0 287 for (i = 0; i < numClusters; i++) {
msach@0 288 for (j = 0; j < nthreads; j++) {
msach@0 289 newClusterSize[i] += local_newClusterSize[j][i];
msach@0 290 local_newClusterSize[j][i] = 0.0;
msach@0 291 for (k = 0; k < numCoords; k++) {
msach@0 292 newClusters[i][k] += local_newClusters[j][i][k];
msach@0 293 local_newClusters[j][i][k] = 0.0;
msach@0 294 }
msach@0 295 }
msach@0 296 }
msach@0 297
msach@0 298 /* Average the sum and replace old cluster centers with newClusters */
msach@0 299 for (i = 0; i < numClusters; i++) {
msach@0 300 for (j = 0; j < numCoords; j++) {
msach@0 301 if (newClusterSize[i] > 1)
msach@0 302 clusters[i][j] = newClusters[i][j] / newClusterSize[i];
msach@0 303 newClusters[i][j] = 0.0; /* set back to 0 */
msach@0 304 }
msach@0 305 newClusterSize[i] = 0; /* set back to 0 */
msach@0 306 }
msach@0 307
msach@0 308 delta /= numObjs;
msach@0 309 } while (loop++ < PREC && delta > threshold);
msach@0 310
msach@0 311 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
msach@0 312 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
msach@0 313 // iterations, therefore making benchmarking completely unreliable.
msach@0 314
msach@0 315 syncMsg = END;
msach@0 316 for(i=0; i<nthreads; i++)
msach@0 317 SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]);
msach@0 318
msach@0 319
msach@0 320 SSR__free(ip, VProc);
msach@0 321 SSR__free(tasks, VProc);
msach@0 322
msach@0 323 SSR__free(local_newClusterSize[0], VProc);
msach@0 324 SSR__free(local_newClusterSize, VProc);
msach@0 325
msach@0 326 for (i = 0; i < nthreads; i++)
msach@0 327 for (j = 0; j < numClusters; j++)
msach@0 328 SSR__free(local_newClusters[i][j], VProc);
msach@0 329 SSR__free(local_newClusters[0], VProc);
msach@0 330 SSR__free(local_newClusters, VProc);
msach@0 331
msach@0 332 SSR__free(newClusters[0], VProc);
msach@0 333 SSR__free(newClusters, VProc);
msach@0 334 SSR__free(newClusterSize, VProc);
msach@0 335
msach@0 336 (cluster_data)->clusters = clusters;
msach@0 337
msach@0 338 SSR__dissipate_procr(VProc);
msach@0 339 }
msach@0 340