# HG changeset patch # User Merten Sach # Date 1313519575 -7200 # Node ID 8e7bdab2840f8ddcc93b4dc0b4a3d384445a7110 # Parent e69e4c2d612aff86b1d39db3be5d1020f8a20b67 VPThread version workinh diff -r e69e4c2d612a -r 8e7bdab2840f .hgignore --- a/.hgignore Wed Aug 03 19:30:34 2011 +0200 +++ b/.hgignore Tue Aug 16 20:32:55 2011 +0200 @@ -1,5 +1,8 @@ syntax: glob +histograms +kmeans +out nbproject c-ray-mt *.ppm diff -r e69e4c2d612a -r 8e7bdab2840f Makefile --- a/Makefile Wed Aug 03 19:30:34 2011 +0200 +++ b/Makefile Tue Aug 16 20:32:55 2011 +0200 @@ -1,5 +1,5 @@ CC = gcc -CFLAGS = -m64 -ffast-math -fwrapv -fno-omit-frame-pointer -O0 -D VPTHREAD -D APPLICATION=KMEANS -g -Wall +CFLAGS = -m64 -ffast-math -fwrapv -fno-omit-frame-pointer -O3 -D VPTHREAD -D APPLICATION=KMEANS -g -Wall LDFLAGS = LIBS = -lm -lpthread diff -r e69e4c2d612a -r 8e7bdab2840f kmeans.h --- a/kmeans.h Wed Aug 03 19:30:34 2011 +0200 +++ b/kmeans.h Tue Aug 16 20:32:55 2011 +0200 @@ -13,8 +13,20 @@ #define _H_KMEANS #include +#include "VPThread_lib/VPThread.h" -double** pthreads_kmeans(int, double**, int, int, int, double, int*); +struct call_data{ + int is_perform_atomic; /* in: */ + double **objects; /* in: [numObjs][numCoords] */ + int numCoords; /* no. coordinates */ + int numObjs; /* no. objects */ + int numClusters; /* no. clusters */ + double threshold; /* % objects change membership */ + int *membership; + double **clusters; +}; + +void pthreads_kmeans(void *data, VirtProcr *VProc); double** file_read(int, char*, int*, int*); diff -r e69e4c2d612a -r 8e7bdab2840f pthreads_kmeans.c --- a/pthreads_kmeans.c Wed Aug 03 19:30:34 2011 +0200 +++ b/pthreads_kmeans.c Tue Aug 16 20:32:55 2011 +0200 @@ -24,19 +24,54 @@ #include #include "kmeans.h" +#include "VPThread_lib/VPThread.h" + #define PREC 300 -char __ProgrammName[] = "kmeans"; -char __DataSet[255]; +struct barrier_t +{ + int counter; + int nthreads; + int32 mutex; + int32 cond; +}; +typedef struct barrier_t barrier; extern int nthreads; /* Thread count */ double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */ volatile int finished; -pthread_barrier_t barr; -pthread_mutex_t lock1; +barrier barr; +int32 lock1; pthread_attr_t attr; +void inline barrier_init(barrier *barr, int nthreads, VirtProcr *VProc) +{ + barr->counter = 0; + barr->nthreads = nthreads; + barr->mutex = VPThread__make_mutex(VProc); + barr->cond = VPThread__make_cond(barr->mutex, VProc); +} + +void inline barrier_wait(barrier *barr, VirtProcr *VProc) +{ + int i; + + VPThread__mutex_lock(barr->mutex, VProc); + barr->counter++; + if(barr->counter == barr->nthreads) + { + barr->counter = 0; + for(i=0; i < barr->nthreads; i++) + VPThread__cond_signal(barr->cond, VProc); + } + else + { + VPThread__cond_wait(barr->cond, VProc); + } + VPThread__mutex_unlock(barr->mutex, VProc); +} + /* * Struct: input * ------------- @@ -93,7 +128,7 @@ return index; } -void work(struct input *x){ +void work(struct input *x, VirtProcr *VProc){ int tid = x->t; double local_delta=0; int i; @@ -116,30 +151,32 @@ x->local_newClusters[tid][index][j] += x->objects[i][j]; } - pthread_mutex_lock(&lock1); + VPThread__mutex_lock(lock1, VProc); delta +=local_delta; - pthread_mutex_unlock(&lock1); + VPThread__mutex_unlock(lock1, VProc); } + /* * Function: thread function work * -------------- * Worker function for threading. Work distribution is done so that each thread computers */ -void* tfwork(void *ip) +void tfwork(void *ip, VirtProcr *VProc) { struct input *x; x = (struct input *)ip; for(;;){ - pthread_barrier_wait(&barr); + barrier_wait(&barr, VProc); if (finished){ break; } - work(x); - pthread_barrier_wait(&barr); + work(x, VProc); + barrier_wait(&barr, VProc); } - pthread_exit(NULL); + //pthread_exit(NULL); + VPThread__dissipate_thread(VProc); } /* @@ -147,12 +184,12 @@ * -------------------------- * Allocates memory for a 2-dim double array as needed for the algorithm. */ -double** create_array_2d_f(int height, int width) { +double** create_array_2d_f(int height, int width, VirtProcr *VProc) { double** ptr; int i; - ptr = calloc(height, sizeof(double*)); + ptr = VPThread__malloc(height * sizeof(double*), VProc); assert(ptr != NULL); - ptr[0] = calloc(width * height, sizeof(double)); + ptr[0] = VPThread__malloc(width * height * sizeof(double), VProc); assert(ptr[0] != NULL); /* Assign pointers correctly */ for(i = 1; i < height; i++) @@ -165,12 +202,12 @@ * -------------------------- * Allocates memory for a 2-dim integer array as needed for the algorithm. */ -int** create_array_2d_i(int height, int width) { +int** create_array_2d_i(int height, int width, VirtProcr *VProc) { int** ptr; int i; - ptr = calloc(height, sizeof(int*)); + ptr = VPThread__malloc(height * sizeof(int*), VProc); assert(ptr != NULL); - ptr[0] = calloc(width * height, sizeof(int)); + ptr[0] = VPThread__malloc(width * height * sizeof(int), VProc); assert(ptr[0] != NULL); /* Assign pointers correctly */ for(i = 1; i < height; i++) @@ -183,30 +220,34 @@ * ------------------------- * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords]. */ -double** pthreads_kmeans(int is_perform_atomic, /* in: */ - double **objects, /* in: [numObjs][numCoords] */ - int numCoords, /* no. coordinates */ - int numObjs, /* no. objects */ - int numClusters, /* no. clusters */ - double threshold, /* % objects change membership */ - int *membership) /* out: [numObjs] */ +void pthreads_kmeans(void *data, VirtProcr *VProc) { + struct call_data *cluster_data = (struct call_data*)data; + //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */ + double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */ + int numCoords = cluster_data->numCoords; /* no. coordinates */ + int numObjs = cluster_data->numObjs; /* no. objects */ + int numClusters = cluster_data->numClusters; /* no. clusters */ + double threshold = cluster_data->threshold; /* % objects change membership */ + int *membership = cluster_data->membership; /* out: [numObjs] */ - int i, j, k, index, loop = 0, rc; + int i, j, k, loop = 0; int *newClusterSize; /* [numClusters]: no. objects assigned in each new cluster */ - double **clusters; /* out: [numClusters][numCoords] */ + double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */ double **newClusters; /* [numClusters][numCoords] */ - double timing; + //double timing; int **local_newClusterSize; /* [nthreads][numClusters] */ double ***local_newClusters; /* [nthreads][numClusters][numCoords] */ - pthread_t *thread; + VirtProcr **thread; /* === MEMORY SETUP === */ /* [numClusters] clusters of [numCoords] double coordinates each */ - clusters = create_array_2d_f(numClusters, numCoords); + //Set pointers + for(i = 1; i < numClusters; i++) + clusters[i] = clusters[i-1] + numCoords; /* Pick first numClusters elements of objects[] as initial cluster centers */ for (i=0; i < numClusters; i++) @@ -218,17 +259,17 @@ membership[i] = -1; /* newClusterSize holds information on the count of members in each cluster */ - newClusterSize = (int*)calloc(numClusters, sizeof(int)); + newClusterSize = (int*)VPThread__malloc(numClusters * sizeof(int), VProc); assert(newClusterSize != NULL); /* newClusters holds the coordinates of the freshly created clusters */ - newClusters = create_array_2d_f(numClusters, numCoords); - local_newClusterSize = create_array_2d_i(nthreads, numClusters); + newClusters = create_array_2d_f(numClusters, numCoords, VProc); + local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc); /* local_newClusters is a 3D array */ - local_newClusters = (double***)malloc(nthreads * sizeof(double**)); + local_newClusters = (double***)VPThread__malloc(nthreads * sizeof(double**), VProc); assert(local_newClusters != NULL); - local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*)); + local_newClusters[0] = (double**) VPThread__malloc(nthreads * numClusters * sizeof(double*), VProc); assert(local_newClusters[0] != NULL); /* Set up the pointers */ @@ -237,21 +278,18 @@ for (i = 0; i < nthreads; i++) { for (j = 0; j < numClusters; j++) { - local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double)); + local_newClusters[i][j] = (double*)VPThread__malloc(numCoords * sizeof(double), VProc); assert(local_newClusters[i][j] != NULL); } } /* Perform thread setup */ - thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); + thread = (VirtProcr**)VPThread__malloc(nthreads * sizeof(VirtProcr*), VProc); - printf("nthreads %d\n", nthreads); - pthread_barrier_init(&barr, NULL, nthreads); - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_mutex_init(&lock1, NULL); + barrier_init(&barr, nthreads, VProc); + lock1 = VPThread__make_mutex(VProc); finished=0; - struct input *ip = malloc(nthreads * sizeof(struct input)); + struct input *ip = VPThread__malloc(nthreads * sizeof(struct input), VProc); /* Provide thread-safe memory locations for each worker */ for(i = 0; i < nthreads; i++){ ip[i].t = i; @@ -265,11 +303,7 @@ ip[i].numCoords=numCoords; if (i>0){ - rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]); - if (rc) { - fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc); - exit(EXIT_FAILURE); - } + thread[i] = VPThread__create_thread(tfwork, (void*)&ip[i], VProc); } } @@ -277,10 +311,10 @@ do { delta = 0.0; - pthread_barrier_wait(&barr); - work(&ip[0]); + barrier_wait(&barr, VProc); + work(&ip[0], VProc); - pthread_barrier_wait(&barr); + barrier_wait(&barr, VProc); /* Let the main thread perform the array reduction */ for (i = 0; i < numClusters; i++) { for (j = 0; j < nthreads; j++) { @@ -306,39 +340,32 @@ delta /= numObjs; } while (loop++ < PREC && delta > threshold); - // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, + // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed // iterations, therefore making benchmarking completely unreliable. - finished=1; - pthread_barrier_wait(&barr); + finished=1; + barrier_wait(&barr, VProc); - for(i = 1; i < nthreads; i++) { - rc = pthread_join(thread[i], NULL); - if (rc) { - fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc); - exit(EXIT_FAILURE); - } - } - - free(ip); - free(thread); - pthread_barrier_destroy(&barr); - pthread_mutex_destroy(&lock1); - pthread_attr_destroy(&attr); - free(local_newClusterSize[0]); - free(local_newClusterSize); + VPThread__free(ip, VProc); + VPThread__free(thread, VProc); + + VPThread__free(local_newClusterSize[0], VProc); + VPThread__free(local_newClusterSize, VProc); for (i = 0; i < nthreads; i++) for (j = 0; j < numClusters; j++) - free(local_newClusters[i][j]); - free(local_newClusters[0]); - free(local_newClusters); + VPThread__free(local_newClusters[i][j], VProc); + VPThread__free(local_newClusters[0], VProc); + VPThread__free(local_newClusters, VProc); - free(newClusters[0]); - free(newClusters); - free(newClusterSize); - return clusters; + VPThread__free(newClusters[0], VProc); + VPThread__free(newClusters, VProc); + VPThread__free(newClusterSize, VProc); + + (cluster_data)->clusters = clusters; + + VPThread__dissipate_thread(VProc); } diff -r e69e4c2d612a -r 8e7bdab2840f pthreads_main.c --- a/pthreads_main.c Wed Aug 03 19:30:34 2011 +0200 +++ b/pthreads_main.c Tue Aug 16 20:32:55 2011 +0200 @@ -27,6 +27,11 @@ #include #include "kmeans.h" +#include "VPThread_lib/VPThread.h" + +char __ProgrammName[] = "kmeans"; +char __DataSet[255]; + #define seconds(tm) gettimeofday(&tp,(struct timezone *)0);\ tm=tp.tv_sec+tp.tv_usec/1000000.0 @@ -56,7 +61,7 @@ int opt; extern char *optarg; extern int optind; - int i, j; + int j; int isBinaryFile; int *membership; /* [numObjs] */ @@ -108,10 +113,17 @@ membership = (int*) malloc(numObjs * sizeof(int)); assert(membership != NULL); + + clusters = malloc(numClusters * sizeof(double*)); + assert(clusters != NULL); + clusters[0] = malloc(numClusters * numCoords * sizeof(double)); + assert(clusters[0] != NULL); - /* Launch the core computation algorithm */ - clusters = pthreads_kmeans(0, objects, numCoords, numObjs, - numClusters, threshold, membership); + struct call_data data = { 0, objects, numCoords, numObjs, + numClusters, threshold, membership, clusters }; + + /* Launch the core computation algorithm */ + VPThread__create_seed_procr_and_do_work(pthreads_kmeans, (void*)&data); free(objects[0]); free(objects);