Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > SSR > SSR__KMeans__Bench
diff kmeans.c @ 0:0ce47c784647
Initial commit
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Tue, 27 Sep 2011 15:08:02 +0200 |
| parents | |
| children | 53dd2334f136 |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/kmeans.c Tue Sep 27 15:08:02 2011 +0200 1.3 @@ -0,0 +1,340 @@ 1.4 +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 1.5 +/* File: pthreads_kmeans.c (OpenMP version) */ 1.6 +/* Description: Implementation of simple k-means clustering algorithm */ 1.7 +/* This program takes an array of N data objects, each with */ 1.8 +/* M coordinates and performs a k-means clustering given a */ 1.9 +/* user-provided value of the number of clusters (K). The */ 1.10 +/* clustering results are saved in 2 arrays: */ 1.11 +/* 1. a returned array of size [K][N] indicating the center */ 1.12 +/* coordinates of K clusters */ 1.13 +/* 2. membership[N] stores the cluster center ids, each */ 1.14 +/* corresponding to the cluster a data object is assigned */ 1.15 +/* */ 1.16 +/* Author: Wei-keng Liao */ 1.17 +/* ECE Department, Northwestern University */ 1.18 +/* email: wkliao@ece.northwestern.edu */ 1.19 +/* Copyright, 2005, Wei-keng Liao */ 1.20 +/* */ 1.21 +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 1.22 + 1.23 +#include <stdio.h> 1.24 +#include <stdlib.h> 1.25 +#include <pthread.h> 1.26 +#include <time.h> 1.27 +#include <math.h> 1.28 +#include "kmeans.h" 1.29 + 1.30 +#include "SSR_lib/SSR.h" 1.31 + 1.32 +#define PREC 300 1.33 + 1.34 +#define START 1 1.35 +#define END 2 1.36 + 1.37 +extern int nthreads; /* Thread count */ 1.38 +double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */ 1.39 + 1.40 +VirtProcr *parentVP; 1.41 + 1.42 +/* 1.43 +* Struct: input 1.44 +* ------------- 1.45 +* Encapsulates all the input data for the benchmark, i.e. the object list, 1.46 +* clustering output and the input statistics. 1.47 +*/ 1.48 +struct input { 1.49 + int t; 1.50 + double local_delta; 1.51 + double **objects; /* Object list */ 1.52 + double **clusters; /* Cluster list, out: [numClusters][numCoords] */ 1.53 + int *membership; /* For each object, contains the index of the cluster it currently belongs to */ 1.54 + int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */ 1.55 + double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */ 1.56 + int numObjs,numClusters,numCoords; 1.57 +}; 1.58 + 1.59 +/* 1.60 +* Function: euclid_dist_2 1.61 +* ----------------------- 1.62 +* Computes the square of the euclidean distance between two multi-dimensional points. 1.63 +*/ 1.64 +__inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) { 1.65 + int i; 1.66 + double ans=0.0; 1.67 + 1.68 + for (i=0; i<numdims; i++) 1.69 + ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]); 1.70 + 1.71 + return(ans); 1.72 +} 1.73 + 1.74 +/* 1.75 +* Function: find_nearest_cluster 1.76 +* ------------------------------ 1.77 +* Function determining the cluster center which is closest to the given object. 1.78 +* Returns the index of that cluster center. 1.79 +*/ 1.80 +__inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) { 1.81 + int index, i; 1.82 + double dist, min_dist; 1.83 + 1.84 + /* find the cluster id that has min distance to object */ 1.85 + index = 0; 1.86 + min_dist = euclid_dist_2(numCoords, object, clusters[0]); 1.87 + for (i=1; i<numClusters; i++) { 1.88 + dist = euclid_dist_2(numCoords, object, clusters[i]); 1.89 + 1.90 + /* no need square root */ 1.91 + if (dist < min_dist) { /* find the min and its array index */ 1.92 + min_dist = dist; 1.93 + index = i; 1.94 + } 1.95 + } 1.96 + return index; 1.97 +} 1.98 + 1.99 +void work(struct input *x, VirtProcr *VProc){ 1.100 + int tid = x->t; 1.101 + x->local_delta=0; 1.102 + int i; 1.103 + for (i = tid; i < x->numObjs; i += nthreads) { 1.104 + /* find the array index of nearest cluster center */ 1.105 + int index = find_nearest_cluster(x->numClusters, x->numCoords, 1.106 + x->objects[i], x->clusters); 1.107 + 1.108 + /* if membership changes, increase delta by 1 */ 1.109 + if (x->membership[i] != index) 1.110 + x->local_delta += 1.0; 1.111 + /* assign the membership to object i */ 1.112 + x->membership[i] = index; 1.113 + 1.114 + /* update new cluster centers : sum of all objects located 1.115 + within (average will be performed later) */ 1.116 + x->local_newClusterSize[tid][index]++; 1.117 + int j; 1.118 + for (j=0; j < x->numCoords; j++) 1.119 + x->local_newClusters[tid][index][j] += x->objects[i][j]; 1.120 + 1.121 + } 1.122 + SSR__send_from_to((void*)&x->local_delta, VProc, parentVP); 1.123 +} 1.124 + 1.125 +/* 1.126 +* Function: thread function work 1.127 +* -------------- 1.128 +* Worker function for threading. Work distribution is done so that each thread computers 1.129 +*/ 1.130 +void tfwork(void *ip, VirtProcr *VProc) 1.131 +{ 1.132 + struct input *x; 1.133 + x = (struct input *)ip; 1.134 + 1.135 + for(;;){ 1.136 + if(*((int*)SSR__receive_from_to(parentVP, VProc)) == END) 1.137 + break; 1.138 + else 1.139 + work(x, VProc); 1.140 + } 1.141 + 1.142 + SSR__dissipate_procr(VProc); 1.143 +} 1.144 + 1.145 +/* 1.146 +* Function: create_array_2d_f 1.147 +* -------------------------- 1.148 +* Allocates memory for a 2-dim double array as needed for the algorithm. 1.149 +*/ 1.150 +double** create_array_2d_f(int height, int width, VirtProcr *VProc) { 1.151 + double** ptr; 1.152 + int i; 1.153 + ptr = SSR__malloc_to(height * sizeof(double*), VProc); 1.154 + assert(ptr != NULL); 1.155 + ptr[0] = SSR__malloc_to(width * height * sizeof(double), VProc); 1.156 + assert(ptr[0] != NULL); 1.157 + /* Assign pointers correctly */ 1.158 + for(i = 1; i < height; i++) 1.159 + ptr[i] = ptr[i-1] + width; 1.160 + return ptr; 1.161 +} 1.162 + 1.163 +/* 1.164 +* Function: create_array_2Dd_i 1.165 +* -------------------------- 1.166 +* Allocates memory for a 2-dim integer array as needed for the algorithm. 1.167 +*/ 1.168 +int** create_array_2d_i(int height, int width, VirtProcr *VProc) { 1.169 + int** ptr; 1.170 + int i; 1.171 + ptr = SSR__malloc_to(height * sizeof(int*), VProc); 1.172 + assert(ptr != NULL); 1.173 + ptr[0] = SSR__malloc_to(width * height * sizeof(int), VProc); 1.174 + assert(ptr[0] != NULL); 1.175 + /* Assign pointers correctly */ 1.176 + for(i = 1; i < height; i++) 1.177 + ptr[i] = ptr[i-1] + width; 1.178 + return ptr; 1.179 +} 1.180 + 1.181 +/* 1.182 +* Function: pthreads_kmeans 1.183 +* ------------------------- 1.184 +* Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords]. 1.185 +*/ 1.186 +void kmeans(void *data, VirtProcr *VProc) 1.187 +{ 1.188 + struct call_data *cluster_data = (struct call_data*)data; 1.189 + //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */ 1.190 + double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */ 1.191 + int numCoords = cluster_data->numCoords; /* no. coordinates */ 1.192 + int numObjs = cluster_data->numObjs; /* no. objects */ 1.193 + int numClusters = cluster_data->numClusters; /* no. clusters */ 1.194 + double threshold = cluster_data->threshold; /* % objects change membership */ 1.195 + int *membership = cluster_data->membership; /* out: [numObjs] */ 1.196 + 1.197 + int i, j, k, loop = 0; 1.198 + int *newClusterSize; /* [numClusters]: no. objects assigned in each 1.199 + new cluster */ 1.200 + double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */ 1.201 + double **newClusters; /* [numClusters][numCoords] */ 1.202 + //double timing; 1.203 + int **local_newClusterSize; /* [nthreads][numClusters] */ 1.204 + double ***local_newClusters; /* [nthreads][numClusters][numCoords] */ 1.205 + 1.206 + VirtProcr **tasks; 1.207 + volatile int syncMsg; 1.208 + 1.209 + parentVP = VProc; 1.210 + 1.211 + /* === MEMORY SETUP === */ 1.212 + 1.213 + /* [numClusters] clusters of [numCoords] double coordinates each */ 1.214 + //Set pointers 1.215 + for(i = 1; i < numClusters; i++) 1.216 + clusters[i] = clusters[i-1] + numCoords; 1.217 + 1.218 + /* Pick first numClusters elements of objects[] as initial cluster centers */ 1.219 + for (i=0; i < numClusters; i++) 1.220 + for (j=0; j < numCoords; j++) 1.221 + clusters[i][j] = objects[i][j]; 1.222 + 1.223 + /* Initialize membership, no object belongs to any cluster yet */ 1.224 + for (i = 0; i < numObjs; i++) 1.225 + membership[i] = -1; 1.226 + 1.227 + /* newClusterSize holds information on the count of members in each cluster */ 1.228 + newClusterSize = (int*)SSR__malloc_to(numClusters * sizeof(int), VProc); 1.229 + assert(newClusterSize != NULL); 1.230 + 1.231 + /* newClusters holds the coordinates of the freshly created clusters */ 1.232 + newClusters = create_array_2d_f(numClusters, numCoords, VProc); 1.233 + local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc); 1.234 + 1.235 + /* local_newClusters is a 3D array */ 1.236 + local_newClusters = (double***)SSR__malloc_to(nthreads * sizeof(double**), VProc); 1.237 + assert(local_newClusters != NULL); 1.238 + local_newClusters[0] = (double**)SSR__malloc_to(nthreads * numClusters * sizeof(double*), VProc); 1.239 + assert(local_newClusters[0] != NULL); 1.240 + 1.241 + /* Set up the pointers */ 1.242 + for (i = 1; i < nthreads; i++) 1.243 + local_newClusters[i] = local_newClusters[i-1] + numClusters; 1.244 + 1.245 + for (i = 0; i < nthreads; i++) { 1.246 + for (j = 0; j < numClusters; j++) { 1.247 + local_newClusters[i][j] = (double*)SSR__malloc_to(numCoords * sizeof(double), VProc); 1.248 + assert(local_newClusters[i][j] != NULL); 1.249 + } 1.250 + } 1.251 + /* Perform thread setup */ 1.252 + tasks = (VirtProcr**)SSR__malloc_to(nthreads * sizeof(VirtProcr*), VProc); 1.253 + 1.254 + struct input *ip = SSR__malloc_to(nthreads * sizeof(struct input), VProc); 1.255 + /* Provide thread-safe memory locations for each worker */ 1.256 + for(i = 0; i < nthreads; i++){ 1.257 + ip[i].t = i; 1.258 + ip[i].objects=objects; 1.259 + ip[i].clusters=clusters; 1.260 + ip[i].membership=membership; 1.261 + ip[i].local_newClusterSize=local_newClusterSize; 1.262 + ip[i].local_newClusters=local_newClusters; 1.263 + ip[i].numObjs=numObjs; 1.264 + ip[i].numClusters=numClusters; 1.265 + ip[i].numCoords=numCoords; 1.266 + 1.267 + if(i!=0) 1.268 + tasks[i] = SSR__create_procr_with(tfwork, (void*)&ip[i], VProc); 1.269 + } 1.270 + 1.271 + /* === COMPUTATIONAL PHASE === */ 1.272 + syncMsg = START; 1.273 + do { 1.274 + delta = 0.0; 1.275 + 1.276 + printf("start children 1\n"); 1.277 + for(i=0; i<nthreads; i++) 1.278 + SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]); 1.279 + 1.280 + //let the children do the work 1.281 + 1.282 + printf("receive results\n"); 1.283 + for(i=0; i<nthreads; i++) 1.284 + delta += *(double*)SSR__receive_from_to(tasks[i],VProc); 1.285 + printf("start children 2\n"); 1.286 + for(i=1; i<nthreads; i++) 1.287 + SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]); 1.288 + 1.289 + /* Let the main thread perform the array reduction */ 1.290 + for (i = 0; i < numClusters; i++) { 1.291 + for (j = 0; j < nthreads; j++) { 1.292 + newClusterSize[i] += local_newClusterSize[j][i]; 1.293 + local_newClusterSize[j][i] = 0.0; 1.294 + for (k = 0; k < numCoords; k++) { 1.295 + newClusters[i][k] += local_newClusters[j][i][k]; 1.296 + local_newClusters[j][i][k] = 0.0; 1.297 + } 1.298 + } 1.299 + } 1.300 + 1.301 + /* Average the sum and replace old cluster centers with newClusters */ 1.302 + for (i = 0; i < numClusters; i++) { 1.303 + for (j = 0; j < numCoords; j++) { 1.304 + if (newClusterSize[i] > 1) 1.305 + clusters[i][j] = newClusters[i][j] / newClusterSize[i]; 1.306 + newClusters[i][j] = 0.0; /* set back to 0 */ 1.307 + } 1.308 + newClusterSize[i] = 0; /* set back to 0 */ 1.309 + } 1.310 + 1.311 + delta /= numObjs; 1.312 + } while (loop++ < PREC && delta > threshold); 1.313 + 1.314 + // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, 1.315 + // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed 1.316 + // iterations, therefore making benchmarking completely unreliable. 1.317 + 1.318 + syncMsg = END; 1.319 + for(i=0; i<nthreads; i++) 1.320 + SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]); 1.321 + 1.322 + 1.323 + SSR__free(ip, VProc); 1.324 + SSR__free(tasks, VProc); 1.325 + 1.326 + SSR__free(local_newClusterSize[0], VProc); 1.327 + SSR__free(local_newClusterSize, VProc); 1.328 + 1.329 + for (i = 0; i < nthreads; i++) 1.330 + for (j = 0; j < numClusters; j++) 1.331 + SSR__free(local_newClusters[i][j], VProc); 1.332 + SSR__free(local_newClusters[0], VProc); 1.333 + SSR__free(local_newClusters, VProc); 1.334 + 1.335 + SSR__free(newClusters[0], VProc); 1.336 + SSR__free(newClusters, VProc); 1.337 + SSR__free(newClusterSize, VProc); 1.338 + 1.339 + (cluster_data)->clusters = clusters; 1.340 + 1.341 + SSR__dissipate_procr(VProc); 1.342 +} 1.343 +
