changeset 1:8e7bdab2840f

VPThread version workinh
author Merten Sach <msach@mailbox.tu-berlin.de>
date Tue, 16 Aug 2011 20:32:55 +0200
parents e69e4c2d612a
children 467746c73fd0
files .hgignore Makefile kmeans.h pthreads_kmeans.c pthreads_main.c
diffstat 5 files changed, 136 insertions(+), 82 deletions(-) [+]
line diff
     1.1 --- a/.hgignore	Wed Aug 03 19:30:34 2011 +0200
     1.2 +++ b/.hgignore	Tue Aug 16 20:32:55 2011 +0200
     1.3 @@ -1,5 +1,8 @@
     1.4  syntax: glob
     1.5  
     1.6 +histograms
     1.7 +kmeans
     1.8 +out
     1.9  nbproject
    1.10  c-ray-mt
    1.11  *.ppm
     2.1 --- a/Makefile	Wed Aug 03 19:30:34 2011 +0200
     2.2 +++ b/Makefile	Tue Aug 16 20:32:55 2011 +0200
     2.3 @@ -1,5 +1,5 @@
     2.4  CC = gcc
     2.5 -CFLAGS = -m64 -ffast-math -fwrapv -fno-omit-frame-pointer -O0 -D VPTHREAD -D APPLICATION=KMEANS -g -Wall
     2.6 +CFLAGS = -m64 -ffast-math -fwrapv -fno-omit-frame-pointer -O3 -D VPTHREAD -D APPLICATION=KMEANS -g -Wall
     2.7  LDFLAGS = 
     2.8  
     2.9  LIBS = -lm -lpthread
     3.1 --- a/kmeans.h	Wed Aug 03 19:30:34 2011 +0200
     3.2 +++ b/kmeans.h	Tue Aug 16 20:32:55 2011 +0200
     3.3 @@ -13,8 +13,20 @@
     3.4  #define _H_KMEANS
     3.5  
     3.6  #include <assert.h>
     3.7 +#include "VPThread_lib/VPThread.h"
     3.8  
     3.9 -double** pthreads_kmeans(int, double**, int, int, int, double, int*);
    3.10 +struct call_data{
    3.11 +    int is_perform_atomic; 	/* in: */
    3.12 +    double **objects;          	/* in: [numObjs][numCoords] */
    3.13 +    int     numCoords;         	/* no. coordinates */
    3.14 +    int     numObjs;           	/* no. objects */
    3.15 +    int     numClusters;       	/* no. clusters */
    3.16 +    double   threshold;         	/* % objects change membership */
    3.17 +    int    *membership;
    3.18 +    double **clusters;
    3.19 +};
    3.20 +
    3.21 +void pthreads_kmeans(void *data, VirtProcr *VProc);
    3.22  
    3.23  double** file_read(int, char*, int*, int*);
    3.24  
     4.1 --- a/pthreads_kmeans.c	Wed Aug 03 19:30:34 2011 +0200
     4.2 +++ b/pthreads_kmeans.c	Tue Aug 16 20:32:55 2011 +0200
     4.3 @@ -24,19 +24,54 @@
     4.4  #include <math.h>
     4.5  #include "kmeans.h"
     4.6  
     4.7 +#include "VPThread_lib/VPThread.h"
     4.8 +
     4.9  #define PREC 300
    4.10  
    4.11 -char __ProgrammName[] = "kmeans";
    4.12 -char __DataSet[255];
    4.13 +struct barrier_t
    4.14 +{
    4.15 +    int counter;
    4.16 +    int nthreads;
    4.17 +    int32 mutex;
    4.18 +    int32 cond;
    4.19 +};
    4.20 +typedef struct barrier_t barrier;
    4.21  
    4.22  extern int nthreads; /* Thread count */
    4.23  double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
    4.24  volatile int finished;
    4.25  
    4.26 -pthread_barrier_t barr;
    4.27 -pthread_mutex_t lock1;
    4.28 +barrier barr;
    4.29 +int32 lock1;
    4.30  pthread_attr_t attr;
    4.31  
    4.32 +void inline barrier_init(barrier *barr, int nthreads, VirtProcr *VProc)
    4.33 +{
    4.34 +    barr->counter = 0;
    4.35 +    barr->nthreads = nthreads;
    4.36 +    barr->mutex   = VPThread__make_mutex(VProc);
    4.37 +    barr->cond    = VPThread__make_cond(barr->mutex, VProc);
    4.38 +}
    4.39 +
    4.40 +void inline barrier_wait(barrier *barr, VirtProcr *VProc)
    4.41 +{
    4.42 +    int i;
    4.43 +    
    4.44 +    VPThread__mutex_lock(barr->mutex, VProc);
    4.45 +    barr->counter++;
    4.46 +    if(barr->counter == barr->nthreads)
    4.47 +    {
    4.48 +        barr->counter = 0;
    4.49 +        for(i=0; i < barr->nthreads; i++)
    4.50 +            VPThread__cond_signal(barr->cond, VProc);
    4.51 +    }
    4.52 +    else
    4.53 +    {
    4.54 +        VPThread__cond_wait(barr->cond, VProc);
    4.55 +    }
    4.56 +    VPThread__mutex_unlock(barr->mutex, VProc);
    4.57 +}
    4.58 +
    4.59  /*
    4.60  *	Struct: input
    4.61  *	-------------
    4.62 @@ -93,7 +128,7 @@
    4.63      return index;
    4.64  }
    4.65  
    4.66 -void work(struct input *x){
    4.67 +void work(struct input *x, VirtProcr *VProc){
    4.68      int tid = x->t;
    4.69      double local_delta=0;
    4.70      int i;
    4.71 @@ -116,30 +151,32 @@
    4.72              x->local_newClusters[tid][index][j] += x->objects[i][j];
    4.73  
    4.74      }
    4.75 -    pthread_mutex_lock(&lock1);
    4.76 +    VPThread__mutex_lock(lock1, VProc);
    4.77      delta +=local_delta;
    4.78 -    pthread_mutex_unlock(&lock1);
    4.79 +    VPThread__mutex_unlock(lock1, VProc);
    4.80  }
    4.81 +
    4.82  /*
    4.83  *	Function: thread function work
    4.84  *	--------------
    4.85  *	Worker function for threading. Work distribution is done so that each thread computers
    4.86  */
    4.87 -void* tfwork(void *ip)
    4.88 +void tfwork(void *ip, VirtProcr *VProc)
    4.89  {
    4.90      struct input *x;
    4.91      x = (struct input *)ip;    
    4.92      
    4.93      for(;;){
    4.94 -        pthread_barrier_wait(&barr);
    4.95 +        barrier_wait(&barr, VProc);
    4.96          if (finished){            
    4.97              break;
    4.98          }
    4.99 -        work(x);
   4.100 -        pthread_barrier_wait(&barr);
   4.101 +        work(x, VProc);
   4.102 +        barrier_wait(&barr, VProc);
   4.103      }
   4.104      
   4.105 -	pthread_exit(NULL);
   4.106 +	//pthread_exit(NULL);
   4.107 +    VPThread__dissipate_thread(VProc);
   4.108  }
   4.109  
   4.110  /*
   4.111 @@ -147,12 +184,12 @@
   4.112  *	--------------------------
   4.113  *	Allocates memory for a 2-dim double array as needed for the algorithm.
   4.114  */
   4.115 -double** create_array_2d_f(int height, int width) {
   4.116 +double** create_array_2d_f(int height, int width, VirtProcr *VProc) {
   4.117  	double** ptr;
   4.118  	int i;
   4.119 -	ptr = calloc(height, sizeof(double*));
   4.120 +	ptr = VPThread__malloc(height * sizeof(double*), VProc);
   4.121  	assert(ptr != NULL);
   4.122 -	ptr[0] = calloc(width * height, sizeof(double));
   4.123 +	ptr[0] = VPThread__malloc(width * height * sizeof(double), VProc);
   4.124  	assert(ptr[0] != NULL);
   4.125  	/* Assign pointers correctly */
   4.126  	for(i = 1; i < height; i++)
   4.127 @@ -165,12 +202,12 @@
   4.128  *	--------------------------
   4.129  *	Allocates memory for a 2-dim integer array as needed for the algorithm.
   4.130  */
   4.131 -int** create_array_2d_i(int height, int width) {
   4.132 +int** create_array_2d_i(int height, int width, VirtProcr *VProc) {
   4.133  	int** ptr;
   4.134  	int i;
   4.135 -	ptr = calloc(height, sizeof(int*));
   4.136 +	ptr = VPThread__malloc(height * sizeof(int*), VProc);
   4.137  	assert(ptr != NULL);
   4.138 -	ptr[0] = calloc(width * height, sizeof(int));
   4.139 +	ptr[0] = VPThread__malloc(width * height * sizeof(int), VProc);
   4.140  	assert(ptr[0] != NULL);
   4.141  	/* Assign pointers correctly */
   4.142  	for(i = 1; i < height; i++)
   4.143 @@ -183,30 +220,34 @@
   4.144  *	-------------------------
   4.145  *	Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
   4.146  */
   4.147 -double** pthreads_kmeans(int is_perform_atomic, 	/* in: */
   4.148 -                   double **objects,           	/* in: [numObjs][numCoords] */
   4.149 -                   int     numCoords,         	/* no. coordinates */
   4.150 -                   int     numObjs,           	/* no. objects */
   4.151 -                   int     numClusters,       	/* no. clusters */
   4.152 -                   double   threshold,         	/* % objects change membership */
   4.153 -                   int    *membership)        	/* out: [numObjs] */
   4.154 +void pthreads_kmeans(void *data, VirtProcr *VProc)
   4.155  {
   4.156 +    struct call_data *cluster_data = (struct call_data*)data;
   4.157 +    //int is_perform_atomic = cluster_data->is_perform_atomic;	/* in: */
   4.158 +    double **objects      = cluster_data->objects;           	/* in: [numObjs][numCoords] */
   4.159 +    int     numCoords     = cluster_data->numCoords;         	/* no. coordinates */
   4.160 +    int     numObjs       = cluster_data->numObjs;           	/* no. objects */
   4.161 +    int     numClusters   = cluster_data->numClusters;     	/* no. clusters */
   4.162 +    double  threshold     = cluster_data->threshold;         	/* % objects change membership */
   4.163 +    int    *membership    = cluster_data->membership;        	/* out: [numObjs] */ 
   4.164  
   4.165 -    int      i, j, k, index, loop = 0, rc;
   4.166 +    int      i, j, k, loop = 0;
   4.167      int     *newClusterSize; /* [numClusters]: no. objects assigned in each
   4.168                                  new cluster */
   4.169 -    double  **clusters;       /* out: [numClusters][numCoords] */
   4.170 +    double  **clusters    = cluster_data->clusters;       /* out: [numClusters][numCoords] */
   4.171      double  **newClusters;    /* [numClusters][numCoords] */
   4.172 -    double   timing;
   4.173 +    //double   timing;
   4.174      int    **local_newClusterSize; /* [nthreads][numClusters] */
   4.175      double ***local_newClusters;    /* [nthreads][numClusters][numCoords] */
   4.176  
   4.177 -	pthread_t *thread;
   4.178 +	VirtProcr **thread;
   4.179  
   4.180  	/* === MEMORY SETUP === */
   4.181  
   4.182  	/* [numClusters] clusters of [numCoords] double coordinates each */
   4.183 -	clusters = create_array_2d_f(numClusters, numCoords);
   4.184 +        //Set pointers
   4.185 +        for(i = 1; i < numClusters; i++)
   4.186 +		clusters[i] = clusters[i-1] + numCoords; 
   4.187  
   4.188      /* Pick first numClusters elements of objects[] as initial cluster centers */
   4.189      for (i=0; i < numClusters; i++)
   4.190 @@ -218,17 +259,17 @@
   4.191  		membership[i] = -1;
   4.192  
   4.193  	/* newClusterSize holds information on the count of members in each cluster */
   4.194 -    newClusterSize = (int*)calloc(numClusters, sizeof(int));
   4.195 +    newClusterSize = (int*)VPThread__malloc(numClusters * sizeof(int), VProc);
   4.196      assert(newClusterSize != NULL);
   4.197  
   4.198  	/* newClusters holds the coordinates of the freshly created clusters */
   4.199 -	newClusters = create_array_2d_f(numClusters, numCoords);
   4.200 -	local_newClusterSize = create_array_2d_i(nthreads, numClusters);
   4.201 +	newClusters = create_array_2d_f(numClusters, numCoords, VProc);
   4.202 +	local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc);
   4.203  
   4.204      /* local_newClusters is a 3D array */
   4.205 -    local_newClusters    = (double***)malloc(nthreads * sizeof(double**));
   4.206 +    local_newClusters    = (double***)VPThread__malloc(nthreads * sizeof(double**), VProc);
   4.207      assert(local_newClusters != NULL);
   4.208 -    local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*));
   4.209 +    local_newClusters[0] = (double**) VPThread__malloc(nthreads * numClusters * sizeof(double*), VProc);
   4.210      assert(local_newClusters[0] != NULL);
   4.211  
   4.212  	/* Set up the pointers */
   4.213 @@ -237,21 +278,18 @@
   4.214  
   4.215      for (i = 0; i < nthreads; i++) {
   4.216          for (j = 0; j < numClusters; j++) {
   4.217 -            local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double));
   4.218 +            local_newClusters[i][j] = (double*)VPThread__malloc(numCoords * sizeof(double), VProc);
   4.219              assert(local_newClusters[i][j] != NULL);
   4.220          }
   4.221      }
   4.222      /* Perform thread setup */
   4.223 -    thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
   4.224 +    thread = (VirtProcr**)VPThread__malloc(nthreads * sizeof(VirtProcr*), VProc);
   4.225  
   4.226 -    printf("nthreads %d\n", nthreads);
   4.227 -    pthread_barrier_init(&barr, NULL, nthreads);
   4.228 -    pthread_attr_init(&attr);
   4.229 -    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
   4.230 -    pthread_mutex_init(&lock1, NULL);
   4.231 +    barrier_init(&barr, nthreads, VProc);
   4.232 +    lock1 = VPThread__make_mutex(VProc);
   4.233      finished=0;
   4.234      
   4.235 -    struct input *ip = malloc(nthreads * sizeof(struct input));
   4.236 +    struct input *ip = VPThread__malloc(nthreads * sizeof(struct input), VProc);
   4.237      /* Provide thread-safe memory locations for each worker */
   4.238      for(i = 0; i < nthreads; i++){
   4.239          ip[i].t = i;
   4.240 @@ -265,11 +303,7 @@
   4.241          ip[i].numCoords=numCoords;
   4.242  
   4.243          if (i>0){
   4.244 -            rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]);
   4.245 -            if (rc) {
   4.246 -                fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc);
   4.247 -                exit(EXIT_FAILURE);
   4.248 -            }
   4.249 +            thread[i] = VPThread__create_thread(tfwork, (void*)&ip[i], VProc);
   4.250          }
   4.251      }
   4.252      
   4.253 @@ -277,10 +311,10 @@
   4.254      
   4.255      do {
   4.256          delta = 0.0;
   4.257 -        pthread_barrier_wait(&barr);
   4.258 -        work(&ip[0]);
   4.259 +        barrier_wait(&barr, VProc);
   4.260 +        work(&ip[0], VProc);
   4.261          
   4.262 -        pthread_barrier_wait(&barr);
   4.263 +        barrier_wait(&barr, VProc);
   4.264  		/* Let the main thread perform the array reduction */
   4.265  		for (i = 0; i < numClusters; i++) {
   4.266  			for (j = 0; j < nthreads; j++) {
   4.267 @@ -306,39 +340,32 @@
   4.268  		delta /= numObjs;
   4.269      } while (loop++ < PREC && delta > threshold);
   4.270      
   4.271 -	// Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
   4.272 +    // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
   4.273      // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
   4.274      // iterations, therefore making benchmarking completely unreliable.
   4.275  
   4.276 -    finished=1;    
   4.277 -    pthread_barrier_wait(&barr);
   4.278 +    finished=1;
   4.279 +    barrier_wait(&barr, VProc);
   4.280      
   4.281 -    for(i = 1; i < nthreads; i++) {
   4.282 -        rc = pthread_join(thread[i], NULL);
   4.283 -        if (rc) {
   4.284 -            fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc);
   4.285 -            exit(EXIT_FAILURE);
   4.286 -        }
   4.287 -    }
   4.288 -    
   4.289 -    free(ip);
   4.290 -    free(thread);
   4.291 -    pthread_barrier_destroy(&barr);
   4.292 -    pthread_mutex_destroy(&lock1);
   4.293 -	pthread_attr_destroy(&attr);
   4.294  
   4.295 -    free(local_newClusterSize[0]);
   4.296 -    free(local_newClusterSize);
   4.297 +    VPThread__free(ip, VProc);
   4.298 +    VPThread__free(thread, VProc);
   4.299 +
   4.300 +    VPThread__free(local_newClusterSize[0], VProc);
   4.301 +    VPThread__free(local_newClusterSize, VProc);
   4.302          
   4.303      for (i = 0; i < nthreads; i++)
   4.304          for (j = 0; j < numClusters; j++)
   4.305 -            free(local_newClusters[i][j]);
   4.306 -    free(local_newClusters[0]);
   4.307 -    free(local_newClusters);
   4.308 +            VPThread__free(local_newClusters[i][j], VProc);
   4.309 +    VPThread__free(local_newClusters[0], VProc);
   4.310 +    VPThread__free(local_newClusters, VProc);
   4.311  
   4.312 -    free(newClusters[0]);
   4.313 -    free(newClusters);
   4.314 -    free(newClusterSize);
   4.315 -    return clusters;
   4.316 +    VPThread__free(newClusters[0], VProc);
   4.317 +    VPThread__free(newClusters, VProc);
   4.318 +    VPThread__free(newClusterSize, VProc);
   4.319 +    
   4.320 +    (cluster_data)->clusters = clusters;
   4.321 +    
   4.322 +    VPThread__dissipate_thread(VProc);
   4.323  }
   4.324  
     5.1 --- a/pthreads_main.c	Wed Aug 03 19:30:34 2011 +0200
     5.2 +++ b/pthreads_main.c	Tue Aug 16 20:32:55 2011 +0200
     5.3 @@ -27,6 +27,11 @@
     5.4  #include <time.h>
     5.5  #include "kmeans.h"
     5.6  
     5.7 +#include "VPThread_lib/VPThread.h"
     5.8 +
     5.9 +char __ProgrammName[] = "kmeans";
    5.10 +char __DataSet[255];
    5.11 +
    5.12  #define seconds(tm) gettimeofday(&tp,(struct timezone *)0);\
    5.13     tm=tp.tv_sec+tp.tv_usec/1000000.0
    5.14  
    5.15 @@ -56,7 +61,7 @@
    5.16      int     opt;
    5.17      extern char   *optarg;
    5.18      extern int     optind;
    5.19 -    int     i, j;
    5.20 +    int     j;
    5.21      int     isBinaryFile;
    5.22      
    5.23      int    *membership;    /* [numObjs] */
    5.24 @@ -108,10 +113,17 @@
    5.25  
    5.26      membership = (int*) malloc(numObjs * sizeof(int));
    5.27  	assert(membership != NULL);
    5.28 +        
    5.29 +    clusters = malloc(numClusters * sizeof(double*));
    5.30 +    assert(clusters != NULL);
    5.31 +    clusters[0] = malloc(numClusters * numCoords * sizeof(double));
    5.32 +    assert(clusters[0] != NULL);
    5.33  
    5.34 -	/* Launch the core computation algorithm */
    5.35 -    clusters = pthreads_kmeans(0, objects, numCoords, numObjs,
    5.36 -                          numClusters, threshold, membership);
    5.37 +    struct call_data data = { 0, objects, numCoords, numObjs,
    5.38 +                          numClusters, threshold, membership, clusters };
    5.39 +    
    5.40 +    /* Launch the core computation algorithm */
    5.41 +    VPThread__create_seed_procr_and_do_work(pthreads_kmeans, (void*)&data);
    5.42  
    5.43      free(objects[0]);
    5.44      free(objects);