| rev |
line source |
|
msach@0
|
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
|
|
msach@0
|
2 /* File: pthreads_kmeans.c (OpenMP version) */
|
|
msach@0
|
3 /* Description: Implementation of simple k-means clustering algorithm */
|
|
msach@0
|
4 /* This program takes an array of N data objects, each with */
|
|
msach@0
|
5 /* M coordinates and performs a k-means clustering given a */
|
|
msach@0
|
6 /* user-provided value of the number of clusters (K). The */
|
|
msach@0
|
7 /* clustering results are saved in 2 arrays: */
|
|
msach@0
|
8 /* 1. a returned array of size [K][N] indicating the center */
|
|
msach@0
|
9 /* coordinates of K clusters */
|
|
msach@0
|
10 /* 2. membership[N] stores the cluster center ids, each */
|
|
msach@0
|
11 /* corresponding to the cluster a data object is assigned */
|
|
msach@0
|
12 /* */
|
|
msach@0
|
13 /* Author: Wei-keng Liao */
|
|
msach@0
|
14 /* ECE Department, Northwestern University */
|
|
msach@0
|
15 /* email: wkliao@ece.northwestern.edu */
|
|
msach@0
|
16 /* Copyright, 2005, Wei-keng Liao */
|
|
msach@0
|
17 /* */
|
|
msach@0
|
18 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
|
|
msach@0
|
19
|
|
msach@0
|
20 #include <stdio.h>
|
|
msach@0
|
21 #include <stdlib.h>
|
|
msach@0
|
22 #include <pthread.h>
|
|
msach@0
|
23 #include <time.h>
|
|
msach@0
|
24 #include <math.h>
|
|
msach@0
|
25 #include "kmeans.h"
|
|
msach@0
|
26
|
|
msach@0
|
27 #include "SSR_lib/SSR.h"
|
|
msach@0
|
28
|
|
msach@0
|
29 #define PREC 300
|
|
msach@0
|
30
|
|
msach@0
|
31 #define START 1
|
|
msach@0
|
32 #define END 2
|
|
msach@0
|
33
|
|
msach@0
|
34 extern int nthreads; /* Thread count */
|
|
msach@0
|
35 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
|
|
msach@0
|
36
|
|
msach@0
|
37 VirtProcr *parentVP;
|
|
msach@0
|
38
|
|
msach@0
|
39 /*
|
|
msach@0
|
40 * Struct: input
|
|
msach@0
|
41 * -------------
|
|
msach@0
|
42 * Encapsulates all the input data for the benchmark, i.e. the object list,
|
|
msach@0
|
43 * clustering output and the input statistics.
|
|
msach@0
|
44 */
|
|
msach@0
|
45 struct input {
|
|
msach@0
|
46 int t;
|
|
msach@0
|
47 double local_delta;
|
|
msach@0
|
48 double **objects; /* Object list */
|
|
msach@0
|
49 double **clusters; /* Cluster list, out: [numClusters][numCoords] */
|
|
msach@0
|
50 int *membership; /* For each object, contains the index of the cluster it currently belongs to */
|
|
msach@0
|
51 int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */
|
|
msach@0
|
52 double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */
|
|
msach@0
|
53 int numObjs,numClusters,numCoords;
|
|
msach@0
|
54 };
|
|
msach@0
|
55
|
|
msach@0
|
56 /*
|
|
msach@0
|
57 * Function: euclid_dist_2
|
|
msach@0
|
58 * -----------------------
|
|
msach@0
|
59 * Computes the square of the euclidean distance between two multi-dimensional points.
|
|
msach@0
|
60 */
|
|
msach@0
|
61 __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) {
|
|
msach@0
|
62 int i;
|
|
msach@0
|
63 double ans=0.0;
|
|
msach@0
|
64
|
|
msach@0
|
65 for (i=0; i<numdims; i++)
|
|
msach@0
|
66 ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]);
|
|
msach@0
|
67
|
|
msach@0
|
68 return(ans);
|
|
msach@0
|
69 }
|
|
msach@0
|
70
|
|
msach@0
|
71 /*
|
|
msach@0
|
72 * Function: find_nearest_cluster
|
|
msach@0
|
73 * ------------------------------
|
|
msach@0
|
74 * Function determining the cluster center which is closest to the given object.
|
|
msach@0
|
75 * Returns the index of that cluster center.
|
|
msach@0
|
76 */
|
|
msach@0
|
77 __inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) {
|
|
msach@0
|
78 int index, i;
|
|
msach@0
|
79 double dist, min_dist;
|
|
msach@0
|
80
|
|
msach@0
|
81 /* find the cluster id that has min distance to object */
|
|
msach@0
|
82 index = 0;
|
|
msach@0
|
83 min_dist = euclid_dist_2(numCoords, object, clusters[0]);
|
|
msach@0
|
84 for (i=1; i<numClusters; i++) {
|
|
msach@0
|
85 dist = euclid_dist_2(numCoords, object, clusters[i]);
|
|
msach@0
|
86
|
|
msach@0
|
87 /* no need square root */
|
|
msach@0
|
88 if (dist < min_dist) { /* find the min and its array index */
|
|
msach@0
|
89 min_dist = dist;
|
|
msach@0
|
90 index = i;
|
|
msach@0
|
91 }
|
|
msach@0
|
92 }
|
|
msach@0
|
93 return index;
|
|
msach@0
|
94 }
|
|
msach@0
|
95
|
|
msach@0
|
96 void work(struct input *x, VirtProcr *VProc){
|
|
msach@0
|
97 int tid = x->t;
|
|
msach@0
|
98 x->local_delta=0;
|
|
msach@0
|
99 int i;
|
|
msach@0
|
100 for (i = tid; i < x->numObjs; i += nthreads) {
|
|
msach@0
|
101 /* find the array index of nearest cluster center */
|
|
msach@0
|
102 int index = find_nearest_cluster(x->numClusters, x->numCoords,
|
|
msach@0
|
103 x->objects[i], x->clusters);
|
|
msach@0
|
104
|
|
msach@0
|
105 /* if membership changes, increase delta by 1 */
|
|
msach@0
|
106 if (x->membership[i] != index)
|
|
msach@0
|
107 x->local_delta += 1.0;
|
|
msach@0
|
108 /* assign the membership to object i */
|
|
msach@0
|
109 x->membership[i] = index;
|
|
msach@0
|
110
|
|
msach@0
|
111 /* update new cluster centers : sum of all objects located
|
|
msach@0
|
112 within (average will be performed later) */
|
|
msach@0
|
113 x->local_newClusterSize[tid][index]++;
|
|
msach@0
|
114 int j;
|
|
msach@0
|
115 for (j=0; j < x->numCoords; j++)
|
|
msach@0
|
116 x->local_newClusters[tid][index][j] += x->objects[i][j];
|
|
msach@0
|
117
|
|
msach@0
|
118 }
|
|
msach@0
|
119 SSR__send_from_to((void*)&x->local_delta, VProc, parentVP);
|
|
msach@0
|
120 }
|
|
msach@0
|
121
|
|
msach@0
|
122 /*
|
|
msach@0
|
123 * Function: thread function work
|
|
msach@0
|
124 * --------------
|
|
msach@0
|
125 * Worker function for threading. Work distribution is done so that each thread computers
|
|
msach@0
|
126 */
|
|
msach@0
|
127 void tfwork(void *ip, VirtProcr *VProc)
|
|
msach@0
|
128 {
|
|
msach@0
|
129 struct input *x;
|
|
msach@0
|
130 x = (struct input *)ip;
|
|
msach@0
|
131
|
|
msach@0
|
132 for(;;){
|
|
msach@0
|
133 if(*((int*)SSR__receive_from_to(parentVP, VProc)) == END)
|
|
msach@0
|
134 break;
|
|
msach@0
|
135 else
|
|
msach@0
|
136 work(x, VProc);
|
|
msach@0
|
137 }
|
|
msach@0
|
138
|
|
msach@0
|
139 SSR__dissipate_procr(VProc);
|
|
msach@0
|
140 }
|
|
msach@0
|
141
|
|
msach@0
|
142 /*
|
|
msach@0
|
143 * Function: create_array_2d_f
|
|
msach@0
|
144 * --------------------------
|
|
msach@0
|
145 * Allocates memory for a 2-dim double array as needed for the algorithm.
|
|
msach@0
|
146 */
|
|
msach@0
|
147 double** create_array_2d_f(int height, int width, VirtProcr *VProc) {
|
|
msach@0
|
148 double** ptr;
|
|
msach@0
|
149 int i;
|
|
msach@0
|
150 ptr = SSR__malloc_to(height * sizeof(double*), VProc);
|
|
msach@0
|
151 assert(ptr != NULL);
|
|
msach@0
|
152 ptr[0] = SSR__malloc_to(width * height * sizeof(double), VProc);
|
|
msach@0
|
153 assert(ptr[0] != NULL);
|
|
msach@0
|
154 /* Assign pointers correctly */
|
|
msach@0
|
155 for(i = 1; i < height; i++)
|
|
msach@0
|
156 ptr[i] = ptr[i-1] + width;
|
|
msach@0
|
157 return ptr;
|
|
msach@0
|
158 }
|
|
msach@0
|
159
|
|
msach@0
|
160 /*
|
|
msach@0
|
161 * Function: create_array_2Dd_i
|
|
msach@0
|
162 * --------------------------
|
|
msach@0
|
163 * Allocates memory for a 2-dim integer array as needed for the algorithm.
|
|
msach@0
|
164 */
|
|
msach@0
|
165 int** create_array_2d_i(int height, int width, VirtProcr *VProc) {
|
|
msach@0
|
166 int** ptr;
|
|
msach@0
|
167 int i;
|
|
msach@0
|
168 ptr = SSR__malloc_to(height * sizeof(int*), VProc);
|
|
msach@0
|
169 assert(ptr != NULL);
|
|
msach@0
|
170 ptr[0] = SSR__malloc_to(width * height * sizeof(int), VProc);
|
|
msach@0
|
171 assert(ptr[0] != NULL);
|
|
msach@0
|
172 /* Assign pointers correctly */
|
|
msach@0
|
173 for(i = 1; i < height; i++)
|
|
msach@0
|
174 ptr[i] = ptr[i-1] + width;
|
|
msach@0
|
175 return ptr;
|
|
msach@0
|
176 }
|
|
msach@0
|
177
|
|
msach@0
|
178 /*
|
|
msach@0
|
179 * Function: pthreads_kmeans
|
|
msach@0
|
180 * -------------------------
|
|
msach@0
|
181 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
|
|
msach@0
|
182 */
|
|
msach@0
|
183 void kmeans(void *data, VirtProcr *VProc)
|
|
msach@0
|
184 {
|
|
msach@0
|
185 struct call_data *cluster_data = (struct call_data*)data;
|
|
msach@0
|
186 //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */
|
|
msach@0
|
187 double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */
|
|
msach@0
|
188 int numCoords = cluster_data->numCoords; /* no. coordinates */
|
|
msach@0
|
189 int numObjs = cluster_data->numObjs; /* no. objects */
|
|
msach@0
|
190 int numClusters = cluster_data->numClusters; /* no. clusters */
|
|
msach@0
|
191 double threshold = cluster_data->threshold; /* % objects change membership */
|
|
msach@0
|
192 int *membership = cluster_data->membership; /* out: [numObjs] */
|
|
msach@0
|
193
|
|
msach@0
|
194 int i, j, k, loop = 0;
|
|
msach@0
|
195 int *newClusterSize; /* [numClusters]: no. objects assigned in each
|
|
msach@0
|
196 new cluster */
|
|
msach@0
|
197 double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */
|
|
msach@0
|
198 double **newClusters; /* [numClusters][numCoords] */
|
|
msach@0
|
199 //double timing;
|
|
msach@0
|
200 int **local_newClusterSize; /* [nthreads][numClusters] */
|
|
msach@0
|
201 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */
|
|
msach@0
|
202
|
|
msach@0
|
203 VirtProcr **tasks;
|
|
msach@0
|
204 volatile int syncMsg;
|
|
msach@0
|
205
|
|
msach@0
|
206 parentVP = VProc;
|
|
msach@0
|
207
|
|
msach@0
|
208 /* === MEMORY SETUP === */
|
|
msach@0
|
209
|
|
msach@0
|
210 /* [numClusters] clusters of [numCoords] double coordinates each */
|
|
msach@0
|
211 //Set pointers
|
|
msach@0
|
212 for(i = 1; i < numClusters; i++)
|
|
msach@0
|
213 clusters[i] = clusters[i-1] + numCoords;
|
|
msach@0
|
214
|
|
msach@0
|
215 /* Pick first numClusters elements of objects[] as initial cluster centers */
|
|
msach@0
|
216 for (i=0; i < numClusters; i++)
|
|
msach@0
|
217 for (j=0; j < numCoords; j++)
|
|
msach@0
|
218 clusters[i][j] = objects[i][j];
|
|
msach@0
|
219
|
|
msach@0
|
220 /* Initialize membership, no object belongs to any cluster yet */
|
|
msach@0
|
221 for (i = 0; i < numObjs; i++)
|
|
msach@0
|
222 membership[i] = -1;
|
|
msach@0
|
223
|
|
msach@0
|
224 /* newClusterSize holds information on the count of members in each cluster */
|
|
msach@0
|
225 newClusterSize = (int*)SSR__malloc_to(numClusters * sizeof(int), VProc);
|
|
msach@0
|
226 assert(newClusterSize != NULL);
|
|
msach@0
|
227
|
|
msach@0
|
228 /* newClusters holds the coordinates of the freshly created clusters */
|
|
msach@0
|
229 newClusters = create_array_2d_f(numClusters, numCoords, VProc);
|
|
msach@0
|
230 local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc);
|
|
msach@0
|
231
|
|
msach@0
|
232 /* local_newClusters is a 3D array */
|
|
msach@0
|
233 local_newClusters = (double***)SSR__malloc_to(nthreads * sizeof(double**), VProc);
|
|
msach@0
|
234 assert(local_newClusters != NULL);
|
|
msach@0
|
235 local_newClusters[0] = (double**)SSR__malloc_to(nthreads * numClusters * sizeof(double*), VProc);
|
|
msach@0
|
236 assert(local_newClusters[0] != NULL);
|
|
msach@0
|
237
|
|
msach@0
|
238 /* Set up the pointers */
|
|
msach@0
|
239 for (i = 1; i < nthreads; i++)
|
|
msach@0
|
240 local_newClusters[i] = local_newClusters[i-1] + numClusters;
|
|
msach@0
|
241
|
|
msach@0
|
242 for (i = 0; i < nthreads; i++) {
|
|
msach@0
|
243 for (j = 0; j < numClusters; j++) {
|
|
msach@0
|
244 local_newClusters[i][j] = (double*)SSR__malloc_to(numCoords * sizeof(double), VProc);
|
|
msach@0
|
245 assert(local_newClusters[i][j] != NULL);
|
|
msach@0
|
246 }
|
|
msach@0
|
247 }
|
|
msach@0
|
248 /* Perform thread setup */
|
|
msach@0
|
249 tasks = (VirtProcr**)SSR__malloc_to(nthreads * sizeof(VirtProcr*), VProc);
|
|
msach@0
|
250
|
|
msach@0
|
251 struct input *ip = SSR__malloc_to(nthreads * sizeof(struct input), VProc);
|
|
msach@0
|
252 /* Provide thread-safe memory locations for each worker */
|
|
msach@0
|
253 for(i = 0; i < nthreads; i++){
|
|
msach@0
|
254 ip[i].t = i;
|
|
msach@0
|
255 ip[i].objects=objects;
|
|
msach@0
|
256 ip[i].clusters=clusters;
|
|
msach@0
|
257 ip[i].membership=membership;
|
|
msach@0
|
258 ip[i].local_newClusterSize=local_newClusterSize;
|
|
msach@0
|
259 ip[i].local_newClusters=local_newClusters;
|
|
msach@0
|
260 ip[i].numObjs=numObjs;
|
|
msach@0
|
261 ip[i].numClusters=numClusters;
|
|
msach@0
|
262 ip[i].numCoords=numCoords;
|
|
msach@0
|
263
|
|
msach@0
|
264 if(i!=0)
|
|
msach@0
|
265 tasks[i] = SSR__create_procr_with(tfwork, (void*)&ip[i], VProc);
|
|
msach@0
|
266 }
|
|
msach@0
|
267
|
|
msach@0
|
268 /* === COMPUTATIONAL PHASE === */
|
|
msach@0
|
269 syncMsg = START;
|
|
msach@0
|
270 do {
|
|
msach@0
|
271 delta = 0.0;
|
|
msach@0
|
272
|
|
msach@0
|
273 printf("start children 1\n");
|
|
msach@0
|
274 for(i=0; i<nthreads; i++)
|
|
msach@0
|
275 SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]);
|
|
msach@0
|
276
|
|
msach@0
|
277 //let the children do the work
|
|
msach@0
|
278
|
|
msach@0
|
279 printf("receive results\n");
|
|
msach@0
|
280 for(i=0; i<nthreads; i++)
|
|
msach@0
|
281 delta += *(double*)SSR__receive_from_to(tasks[i],VProc);
|
|
msach@0
|
282 printf("start children 2\n");
|
|
msach@0
|
283 for(i=1; i<nthreads; i++)
|
|
msach@0
|
284 SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]);
|
|
msach@0
|
285
|
|
msach@0
|
286 /* Let the main thread perform the array reduction */
|
|
msach@0
|
287 for (i = 0; i < numClusters; i++) {
|
|
msach@0
|
288 for (j = 0; j < nthreads; j++) {
|
|
msach@0
|
289 newClusterSize[i] += local_newClusterSize[j][i];
|
|
msach@0
|
290 local_newClusterSize[j][i] = 0.0;
|
|
msach@0
|
291 for (k = 0; k < numCoords; k++) {
|
|
msach@0
|
292 newClusters[i][k] += local_newClusters[j][i][k];
|
|
msach@0
|
293 local_newClusters[j][i][k] = 0.0;
|
|
msach@0
|
294 }
|
|
msach@0
|
295 }
|
|
msach@0
|
296 }
|
|
msach@0
|
297
|
|
msach@0
|
298 /* Average the sum and replace old cluster centers with newClusters */
|
|
msach@0
|
299 for (i = 0; i < numClusters; i++) {
|
|
msach@0
|
300 for (j = 0; j < numCoords; j++) {
|
|
msach@0
|
301 if (newClusterSize[i] > 1)
|
|
msach@0
|
302 clusters[i][j] = newClusters[i][j] / newClusterSize[i];
|
|
msach@0
|
303 newClusters[i][j] = 0.0; /* set back to 0 */
|
|
msach@0
|
304 }
|
|
msach@0
|
305 newClusterSize[i] = 0; /* set back to 0 */
|
|
msach@0
|
306 }
|
|
msach@0
|
307
|
|
msach@0
|
308 delta /= numObjs;
|
|
msach@0
|
309 } while (loop++ < PREC && delta > threshold);
|
|
msach@0
|
310
|
|
msach@0
|
311 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
|
|
msach@0
|
312 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
|
|
msach@0
|
313 // iterations, therefore making benchmarking completely unreliable.
|
|
msach@0
|
314
|
|
msach@0
|
315 syncMsg = END;
|
|
msach@0
|
316 for(i=0; i<nthreads; i++)
|
|
msach@0
|
317 SSR__send_from_to((void*)&syncMsg, VProc, tasks[i]);
|
|
msach@0
|
318
|
|
msach@0
|
319
|
|
msach@0
|
320 SSR__free(ip, VProc);
|
|
msach@0
|
321 SSR__free(tasks, VProc);
|
|
msach@0
|
322
|
|
msach@0
|
323 SSR__free(local_newClusterSize[0], VProc);
|
|
msach@0
|
324 SSR__free(local_newClusterSize, VProc);
|
|
msach@0
|
325
|
|
msach@0
|
326 for (i = 0; i < nthreads; i++)
|
|
msach@0
|
327 for (j = 0; j < numClusters; j++)
|
|
msach@0
|
328 SSR__free(local_newClusters[i][j], VProc);
|
|
msach@0
|
329 SSR__free(local_newClusters[0], VProc);
|
|
msach@0
|
330 SSR__free(local_newClusters, VProc);
|
|
msach@0
|
331
|
|
msach@0
|
332 SSR__free(newClusters[0], VProc);
|
|
msach@0
|
333 SSR__free(newClusters, VProc);
|
|
msach@0
|
334 SSR__free(newClusterSize, VProc);
|
|
msach@0
|
335
|
|
msach@0
|
336 (cluster_data)->clusters = clusters;
|
|
msach@0
|
337
|
|
msach@0
|
338 SSR__dissipate_procr(VProc);
|
|
msach@0
|
339 }
|
|
msach@0
|
340
|