Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > SSR > SSR__KMeans__Bench
view kmeans.c @ 3:d906272ff3a3
DataSet print
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Wed, 28 Sep 2011 15:04:24 +0200 |
| parents | 0ce47c784647 |
| children |
line source
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* File: pthreads_kmeans.c (OpenMP version) */
3 /* Description: Implementation of simple k-means clustering algorithm */
4 /* This program takes an array of N data objects, each with */
5 /* M coordinates and performs a k-means clustering given a */
6 /* user-provided value of the number of clusters (K). The */
7 /* clustering results are saved in 2 arrays: */
8 /* 1. a returned array of size [K][N] indicating the center */
9 /* coordinates of K clusters */
10 /* 2. membership[N] stores the cluster center ids, each */
11 /* corresponding to the cluster a data object is assigned */
12 /* */
13 /* Author: Wei-keng Liao */
14 /* ECE Department, Northwestern University */
15 /* email: wkliao@ece.northwestern.edu */
16 /* Copyright, 2005, Wei-keng Liao */
17 /* */
18 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <pthread.h>
23 #include <time.h>
24 #include <math.h>
25 #include "kmeans.h"
27 #include "SSR_lib/SSR.h"
29 #define PREC 300
31 #define START 1
32 #define END 2
34 extern int nthreads; /* Thread count */
35 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
37 VirtProcr *parentVP;
39 /*
40 * Struct: input
41 * -------------
42 * Encapsulates all the input data for the benchmark, i.e. the object list,
43 * clustering output and the input statistics.
44 */
45 struct input {
46 int t;
47 double local_delta;
48 double **objects; /* Object list */
49 double **clusters; /* Cluster list, out: [numClusters][numCoords] */
50 int *membership; /* For each object, contains the index of the cluster it currently belongs to */
51 int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */
52 double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */
53 int numObjs,numClusters,numCoords;
54 };
56 /*
57 * Function: euclid_dist_2
58 * -----------------------
59 * Computes the square of the euclidean distance between two multi-dimensional points.
60 */
61 __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) {
62 int i;
63 double ans=0.0;
65 for (i=0; i<numdims; i++)
66 ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]);
68 return(ans);
69 }
71 /*
72 * Function: find_nearest_cluster
73 * ------------------------------
74 * Function determining the cluster center which is closest to the given object.
75 * Returns the index of that cluster center.
76 */
77 __inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) {
78 int index, i;
79 double dist, min_dist;
81 /* find the cluster id that has min distance to object */
82 index = 0;
83 min_dist = euclid_dist_2(numCoords, object, clusters[0]);
84 for (i=1; i<numClusters; i++) {
85 dist = euclid_dist_2(numCoords, object, clusters[i]);
87 /* no need square root */
88 if (dist < min_dist) { /* find the min and its array index */
89 min_dist = dist;
90 index = i;
91 }
92 }
93 return index;
94 }
96 void work(struct input *x, VirtProcr *VProc){
97 int tid = x->t;
98 x->local_delta=0;
99 int i;
100 for (i = tid; i < x->numObjs; i += nthreads) {
101 /* find the array index of nearest cluster center */
102 int index = find_nearest_cluster(x->numClusters, x->numCoords,
103 x->objects[i], x->clusters);
105 /* if membership changes, increase delta by 1 */
106 if (x->membership[i] != index)
107 x->local_delta += 1.0;
108 /* assign the membership to object i */
109 x->membership[i] = index;
111 /* update new cluster centers : sum of all objects located
112 within (average will be performed later) */
113 x->local_newClusterSize[tid][index]++;
114 int j;
115 for (j=0; j < x->numCoords; j++)
116 x->local_newClusters[tid][index][j] += x->objects[i][j];
118 }
119 SSR__send_from_to((void*)&x->local_delta, VProc, parentVP);
120 }
122 /*
123 * Function: thread function work
124 * --------------
125 * Worker function for threading. Work distribution is done so that each thread computers
126 */
127 void tfwork(void *ip, VirtProcr *VProc)
128 {
129 struct input *x;
130 x = (struct input *)ip;
132 for(;;){
133 if(*((int*)SSR__receive_from_to(parentVP, VProc)) == END)
134 break;
135 else
136 work(x, VProc);
137 }
139 SSR__dissipate_procr(VProc);
140 }
142 /*
143 * Function: create_array_2d_f
144 * --------------------------
145 * Allocates memory for a 2-dim double array as needed for the algorithm.
146 */
147 double** create_array_2d_f(int height, int width, VirtProcr *VProc) {
148 double** ptr;
149 int i;
150 ptr = SSR__malloc_to(height * sizeof(double*), VProc);
151 assert(ptr != NULL);
152 ptr[0] = SSR__malloc_to(width * height * sizeof(double), VProc);
153 assert(ptr[0] != NULL);
154 /* Assign pointers correctly */
155 for(i = 1; i < height; i++)
156 ptr[i] = ptr[i-1] + width;
157 return ptr;
158 }
160 /*
161 * Function: create_array_2Dd_i
162 * --------------------------
163 * Allocates memory for a 2-dim integer array as needed for the algorithm.
164 */
165 int** create_array_2d_i(int height, int width, VirtProcr *VProc) {
166 int** ptr;
167 int i;
168 ptr = SSR__malloc_to(height * sizeof(int*), VProc);
169 assert(ptr != NULL);
170 ptr[0] = SSR__malloc_to(width * height * sizeof(int), VProc);
171 assert(ptr[0] != NULL);
172 /* Assign pointers correctly */
173 for(i = 1; i < height; i++)
174 ptr[i] = ptr[i-1] + width;
175 return ptr;
176 }
178 /*
179 * Function: pthreads_kmeans
180 * -------------------------
181 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
182 */
183 void kmeans(void *data, VirtProcr *VProc)
184 {
185 struct call_data *cluster_data = (struct call_data*)data;
186 //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */
187 double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */
188 int numCoords = cluster_data->numCoords; /* no. coordinates */
189 int numObjs = cluster_data->numObjs; /* no. objects */
190 int numClusters = cluster_data->numClusters; /* no. clusters */
191 double threshold = cluster_data->threshold; /* % objects change membership */
192 int *membership = cluster_data->membership; /* out: [numObjs] */
194 int i, j, k, loop = 0;
195 int *newClusterSize; /* [numClusters]: no. objects assigned in each
196 new cluster */
197 double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */
198 double **newClusters; /* [numClusters][numCoords] */
199 //double timing;
200 int **local_newClusterSize; /* [nthreads][numClusters] */
201 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */
203 VirtProcr **tasks;
204 volatile int syncMsg;
206 parentVP = VProc;
208 /* === MEMORY SETUP === */
210 /* [numClusters] clusters of [numCoords] double coordinates each */
211 //Set pointers
212 for(i = 1; i < numClusters; i++)
213 clusters[i] = clusters[i-1] + numCoords;
215 /* Pick first numClusters elements of objects[] as initial cluster centers */
216 for (i=0; i < numClusters; i++)
217 for (j=0; j < numCoords; j++)
218 clusters[i][j] = objects[i][j];
220 /* Initialize membership, no object belongs to any cluster yet */
221 for (i = 0; i < numObjs; i++)
222 membership[i] = -1;
224 /* newClusterSize holds information on the count of members in each cluster */
225 newClusterSize = (int*)SSR__malloc_to(numClusters * sizeof(int), VProc);
226 assert(newClusterSize != NULL);
228 /* newClusters holds the coordinates of the freshly created clusters */
229 newClusters = create_array_2d_f(numClusters, numCoords, VProc);
230 local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc);
232 /* local_newClusters is a 3D array */
233 local_newClusters = (double***)SSR__malloc_to(nthreads * sizeof(double**), VProc);
234 assert(local_newClusters != NULL);
235 local_newClusters[0] = (double**)SSR__malloc_to(nthreads * numClusters * sizeof(double*), VProc);
236 assert(local_newClusters[0] != NULL);
238 /* Set up the pointers */
239 for (i = 1; i < nthreads; i++)
240 local_newClusters[i] = local_newClusters[i-1] + numClusters;
242 for (i = 0; i < nthreads; i++) {
243 for (j = 0; j < numClusters; j++) {
244 local_newClusters[i][j] = (double*)SSR__malloc_to(numCoords * sizeof(double), VProc);
245 assert(local_newClusters[i][j] != NULL);
246 }
247 }
248 /* Perform thread setup */
249 tasks = (VirtProcr**)SSR__malloc_to(nthreads * sizeof(VirtProcr*), VProc);
251 struct input *ip = SSR__malloc_to(nthreads * sizeof(struct input), VProc);
252 /* Provide thread-safe memory locations for each worker */
253 for(i = 0; i < nthreads; i++){
254 ip[i].t = i;
255 ip[i].objects=objects;
256 ip[i].clusters=clusters;
257 ip[i].membership=membership;
258 ip[i].local_newClusterSize=local_newClusterSize;
259 ip[i].local_newClusters=local_newClusters;
260 ip[i].numObjs=numObjs;
261 ip[i].numClusters=numClusters;
262 ip[i].numCoords=numCoords;
264 tasks[i] = SSR__create_procr_with(tfwork, (void*)&ip[i], VProc);
265 }
267 /* === COMPUTATIONAL PHASE === */
268 syncMsg = START;
269 do {
270 delta = 0.0;
272 for(i=0; i<nthreads; i++)
273 SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]);
275 //let the children do the work
277 for(i=0; i<nthreads; i++)
278 delta += *(double*)SSR__receive_from_to(tasks[i],VProc);
280 /* Let the main thread perform the array reduction */
281 for (i = 0; i < numClusters; i++) {
282 for (j = 0; j < nthreads; j++) {
283 newClusterSize[i] += local_newClusterSize[j][i];
284 local_newClusterSize[j][i] = 0.0;
285 for (k = 0; k < numCoords; k++) {
286 newClusters[i][k] += local_newClusters[j][i][k];
287 local_newClusters[j][i][k] = 0.0;
288 }
289 }
290 }
292 /* Average the sum and replace old cluster centers with newClusters */
293 for (i = 0; i < numClusters; i++) {
294 for (j = 0; j < numCoords; j++) {
295 if (newClusterSize[i] > 1)
296 clusters[i][j] = newClusters[i][j] / newClusterSize[i];
297 newClusters[i][j] = 0.0; /* set back to 0 */
298 }
299 newClusterSize[i] = 0; /* set back to 0 */
300 }
302 delta /= numObjs;
303 } while (loop++ < PREC && delta > threshold);
305 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
306 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
307 // iterations, therefore making benchmarking completely unreliable.
309 syncMsg = END;
310 for(i=0; i<nthreads; i++)
311 SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]);
314 SSR__free(ip, VProc);
315 SSR__free(tasks, VProc);
317 SSR__free(local_newClusterSize[0], VProc);
318 SSR__free(local_newClusterSize, VProc);
320 for (i = 0; i < nthreads; i++)
321 for (j = 0; j < numClusters; j++)
322 SSR__free(local_newClusters[i][j], VProc);
323 SSR__free(local_newClusters[0], VProc);
324 SSR__free(local_newClusters, VProc);
326 SSR__free(newClusters[0], VProc);
327 SSR__free(newClusters, VProc);
328 SSR__free(newClusterSize, VProc);
330 (cluster_data)->clusters = clusters;
332 SSR__dissipate_procr(VProc);
333 }
