msach@0: /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ msach@0: /* File: pthreads_kmeans.c (OpenMP version) */ msach@0: /* Description: Implementation of simple k-means clustering algorithm */ msach@0: /* This program takes an array of N data objects, each with */ msach@0: /* M coordinates and performs a k-means clustering given a */ msach@0: /* user-provided value of the number of clusters (K). The */ msach@0: /* clustering results are saved in 2 arrays: */ msach@0: /* 1. a returned array of size [K][N] indicating the center */ msach@0: /* coordinates of K clusters */ msach@0: /* 2. membership[N] stores the cluster center ids, each */ msach@0: /* corresponding to the cluster a data object is assigned */ msach@0: /* */ msach@0: /* Author: Wei-keng Liao */ msach@0: /* ECE Department, Northwestern University */ msach@0: /* email: wkliao@ece.northwestern.edu */ msach@0: /* Copyright, 2005, Wei-keng Liao */ msach@0: /* */ msach@0: /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ msach@0: msach@0: #include msach@0: #include msach@0: #include msach@0: #include msach@0: #include msach@0: #include "kmeans.h" msach@0: msach@0: #include "SSR_lib/SSR.h" msach@0: msach@0: #define PREC 300 msach@0: msach@0: #define START 1 msach@0: #define END 2 msach@0: msach@0: extern int nthreads; /* Thread count */ msach@0: double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */ msach@0: msach@0: VirtProcr *parentVP; msach@0: msach@0: /* msach@0: * Struct: input msach@0: * ------------- msach@0: * Encapsulates all the input data for the benchmark, i.e. the object list, msach@0: * clustering output and the input statistics. msach@0: */ msach@0: struct input { msach@0: int t; msach@0: double local_delta; msach@0: double **objects; /* Object list */ msach@0: double **clusters; /* Cluster list, out: [numClusters][numCoords] */ msach@0: int *membership; /* For each object, contains the index of the cluster it currently belongs to */ msach@0: int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */ msach@0: double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */ msach@0: int numObjs,numClusters,numCoords; msach@0: }; msach@0: msach@0: /* msach@0: * Function: euclid_dist_2 msach@0: * ----------------------- msach@0: * Computes the square of the euclidean distance between two multi-dimensional points. msach@0: */ msach@0: __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) { msach@0: int i; msach@0: double ans=0.0; msach@0: msach@0: for (i=0; it; msach@0: x->local_delta=0; msach@0: int i; msach@0: for (i = tid; i < x->numObjs; i += nthreads) { msach@0: /* find the array index of nearest cluster center */ msach@0: int index = find_nearest_cluster(x->numClusters, x->numCoords, msach@0: x->objects[i], x->clusters); msach@0: msach@0: /* if membership changes, increase delta by 1 */ msach@0: if (x->membership[i] != index) msach@0: x->local_delta += 1.0; msach@0: /* assign the membership to object i */ msach@0: x->membership[i] = index; msach@0: msach@0: /* update new cluster centers : sum of all objects located msach@0: within (average will be performed later) */ msach@0: x->local_newClusterSize[tid][index]++; msach@0: int j; msach@0: for (j=0; j < x->numCoords; j++) msach@0: x->local_newClusters[tid][index][j] += x->objects[i][j]; msach@0: msach@0: } msach@0: SSR__send_from_to((void*)&x->local_delta, VProc, parentVP); msach@0: } msach@0: msach@0: /* msach@0: * Function: thread function work msach@0: * -------------- msach@0: * Worker function for threading. Work distribution is done so that each thread computers msach@0: */ msach@0: void tfwork(void *ip, VirtProcr *VProc) msach@0: { msach@0: struct input *x; msach@0: x = (struct input *)ip; msach@0: msach@0: for(;;){ msach@0: if(*((int*)SSR__receive_from_to(parentVP, VProc)) == END) msach@0: break; msach@0: else msach@0: work(x, VProc); msach@0: } msach@0: msach@0: SSR__dissipate_procr(VProc); msach@0: } msach@0: msach@0: /* msach@0: * Function: create_array_2d_f msach@0: * -------------------------- msach@0: * Allocates memory for a 2-dim double array as needed for the algorithm. msach@0: */ msach@0: double** create_array_2d_f(int height, int width, VirtProcr *VProc) { msach@0: double** ptr; msach@0: int i; msach@0: ptr = SSR__malloc_to(height * sizeof(double*), VProc); msach@0: assert(ptr != NULL); msach@0: ptr[0] = SSR__malloc_to(width * height * sizeof(double), VProc); msach@0: assert(ptr[0] != NULL); msach@0: /* Assign pointers correctly */ msach@0: for(i = 1; i < height; i++) msach@0: ptr[i] = ptr[i-1] + width; msach@0: return ptr; msach@0: } msach@0: msach@0: /* msach@0: * Function: create_array_2Dd_i msach@0: * -------------------------- msach@0: * Allocates memory for a 2-dim integer array as needed for the algorithm. msach@0: */ msach@0: int** create_array_2d_i(int height, int width, VirtProcr *VProc) { msach@0: int** ptr; msach@0: int i; msach@0: ptr = SSR__malloc_to(height * sizeof(int*), VProc); msach@0: assert(ptr != NULL); msach@0: ptr[0] = SSR__malloc_to(width * height * sizeof(int), VProc); msach@0: assert(ptr[0] != NULL); msach@0: /* Assign pointers correctly */ msach@0: for(i = 1; i < height; i++) msach@0: ptr[i] = ptr[i-1] + width; msach@0: return ptr; msach@0: } msach@0: msach@0: /* msach@0: * Function: pthreads_kmeans msach@0: * ------------------------- msach@0: * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords]. msach@0: */ msach@0: void kmeans(void *data, VirtProcr *VProc) msach@0: { msach@0: struct call_data *cluster_data = (struct call_data*)data; msach@0: //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */ msach@0: double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */ msach@0: int numCoords = cluster_data->numCoords; /* no. coordinates */ msach@0: int numObjs = cluster_data->numObjs; /* no. objects */ msach@0: int numClusters = cluster_data->numClusters; /* no. clusters */ msach@0: double threshold = cluster_data->threshold; /* % objects change membership */ msach@0: int *membership = cluster_data->membership; /* out: [numObjs] */ msach@0: msach@0: int i, j, k, loop = 0; msach@0: int *newClusterSize; /* [numClusters]: no. objects assigned in each msach@0: new cluster */ msach@0: double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */ msach@0: double **newClusters; /* [numClusters][numCoords] */ msach@0: //double timing; msach@0: int **local_newClusterSize; /* [nthreads][numClusters] */ msach@0: double ***local_newClusters; /* [nthreads][numClusters][numCoords] */ msach@0: msach@0: VirtProcr **tasks; msach@0: volatile int syncMsg; msach@0: msach@0: parentVP = VProc; msach@0: msach@0: /* === MEMORY SETUP === */ msach@0: msach@0: /* [numClusters] clusters of [numCoords] double coordinates each */ msach@0: //Set pointers msach@0: for(i = 1; i < numClusters; i++) msach@0: clusters[i] = clusters[i-1] + numCoords; msach@0: msach@0: /* Pick first numClusters elements of objects[] as initial cluster centers */ msach@0: for (i=0; i < numClusters; i++) msach@0: for (j=0; j < numCoords; j++) msach@0: clusters[i][j] = objects[i][j]; msach@0: msach@0: /* Initialize membership, no object belongs to any cluster yet */ msach@0: for (i = 0; i < numObjs; i++) msach@0: membership[i] = -1; msach@0: msach@0: /* newClusterSize holds information on the count of members in each cluster */ msach@0: newClusterSize = (int*)SSR__malloc_to(numClusters * sizeof(int), VProc); msach@0: assert(newClusterSize != NULL); msach@0: msach@0: /* newClusters holds the coordinates of the freshly created clusters */ msach@0: newClusters = create_array_2d_f(numClusters, numCoords, VProc); msach@0: local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc); msach@0: msach@0: /* local_newClusters is a 3D array */ msach@0: local_newClusters = (double***)SSR__malloc_to(nthreads * sizeof(double**), VProc); msach@0: assert(local_newClusters != NULL); msach@0: local_newClusters[0] = (double**)SSR__malloc_to(nthreads * numClusters * sizeof(double*), VProc); msach@0: assert(local_newClusters[0] != NULL); msach@0: msach@0: /* Set up the pointers */ msach@0: for (i = 1; i < nthreads; i++) msach@0: local_newClusters[i] = local_newClusters[i-1] + numClusters; msach@0: msach@0: for (i = 0; i < nthreads; i++) { msach@0: for (j = 0; j < numClusters; j++) { msach@0: local_newClusters[i][j] = (double*)SSR__malloc_to(numCoords * sizeof(double), VProc); msach@0: assert(local_newClusters[i][j] != NULL); msach@0: } msach@0: } msach@0: /* Perform thread setup */ msach@0: tasks = (VirtProcr**)SSR__malloc_to(nthreads * sizeof(VirtProcr*), VProc); msach@0: msach@0: struct input *ip = SSR__malloc_to(nthreads * sizeof(struct input), VProc); msach@0: /* Provide thread-safe memory locations for each worker */ msach@0: for(i = 0; i < nthreads; i++){ msach@0: ip[i].t = i; msach@0: ip[i].objects=objects; msach@0: ip[i].clusters=clusters; msach@0: ip[i].membership=membership; msach@0: ip[i].local_newClusterSize=local_newClusterSize; msach@0: ip[i].local_newClusters=local_newClusters; msach@0: ip[i].numObjs=numObjs; msach@0: ip[i].numClusters=numClusters; msach@0: ip[i].numCoords=numCoords; msach@0: msach@0: if(i!=0) msach@0: tasks[i] = SSR__create_procr_with(tfwork, (void*)&ip[i], VProc); msach@0: } msach@0: msach@0: /* === COMPUTATIONAL PHASE === */ msach@0: syncMsg = START; msach@0: do { msach@0: delta = 0.0; msach@0: msach@0: printf("start children 1\n"); msach@0: for(i=0; i 1) msach@0: clusters[i][j] = newClusters[i][j] / newClusterSize[i]; msach@0: newClusters[i][j] = 0.0; /* set back to 0 */ msach@0: } msach@0: newClusterSize[i] = 0; /* set back to 0 */ msach@0: } msach@0: msach@0: delta /= numObjs; msach@0: } while (loop++ < PREC && delta > threshold); msach@0: msach@0: // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, msach@0: // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed msach@0: // iterations, therefore making benchmarking completely unreliable. msach@0: msach@0: syncMsg = END; msach@0: for(i=0; iclusters = clusters; msach@0: msach@0: SSR__dissipate_procr(VProc); msach@0: } msach@0: