VMS/VMS_Implementations/VSs_impls/VSs__MC_shared_impl

view VSs_Request_Handlers.c @ 5:8188c5b4bfd7

implemented taskwait
author Nina Engelhardt <nengel@mailbox.tu-berlin.de>
date Fri, 13 Jul 2012 17:35:49 +0200
parents 13af59ed7ea5
children 1780f6b00e3d
line source
1 /*
2 * Copyright 2010 OpenSourceCodeStewardshipFoundation
3 *
4 * Licensed under BSD
5 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h> //for memcpy (create_task_stub, comm handlers)

#include "VMS_impl/VMS.h"
#include "Queue_impl/PrivateQueue.h"
#include "Hash_impl/PrivateHash.h"
#include "VSs.h"
17 //=========================== Local Fn Prototypes ===========================
18 void
19 resume_slaveVP( SlaveVP *slave, VSsSemEnv *semEnv );
23 //==========================================================================
24 // Helpers
25 //
27 /*Only clone the elements of req used in these reqst handlers
28 */
29 VSsSemReq *
30 cloneReq( VSsSemReq *semReq )
31 { VSsSemReq *clonedReq;
33 clonedReq = VMS_PI__malloc( sizeof(VSsSemReq) );
34 clonedReq->reqType = semReq->reqType;
35 clonedReq->senderSlv = semReq->senderSlv;
36 clonedReq->receiverSlv= semReq->receiverSlv;
37 clonedReq->msg = semReq->msg;
38 clonedReq->nextReqInHashEntry = NULL;
40 return clonedReq;
41 }
45 HashEntry *
46 giveEntryElseInsertReqst32( int32 *key, VSsSemReq *semReq,
47 HashTable *commHashTbl )
48 { HashEntry *entry;
49 VSsSemReq *waitingReq;
51 entry = getEntryFromTable32( key, commHashTbl );
52 if( entry == NULL )
53 { //no waiting sends or receives, so add this request and exit
54 // note: have to clone the request because it's on stack of sender
55 addValueIntoTable32( key, cloneReq( semReq ), commHashTbl );
56 return NULL;
57 }
58 waitingReq = (VSsSemReq *)entry->content;
59 if( waitingReq == NULL ) //might happen when last waiting gets paired
60 { //no waiting sends or receives, so add this request and exit
61 entry->content = semReq;
62 return NULL;
63 }
64 return entry;
65 }
68 inline VSsPointerEntry *
69 create_pointer_entry( )
70 { VSsPointerEntry *newEntry;
72 newEntry = VMS_PI__malloc( sizeof(VSsPointerEntry) );
73 newEntry->hasEnabledNonFinishedWriter = FALSE;
74 newEntry->numEnabledNonDoneReaders = 0;
75 newEntry->waitersQ = makePrivQ();
77 return newEntry;
78 }
80 /*malloc's space and initializes fields -- and COPIES the arg values
81 * to new space
82 */
83 inline VSsTaskStub *
84 create_task_stub( VSsTaskType *taskType, void **args )
85 { void **newArgs;
86 int32 i, numArgs;
87 VSsTaskStub *
88 newStub = VMS_int__malloc( sizeof(VSsTaskStub) + taskType->sizeOfArgs );
89 newStub->numBlockingProp = taskType->numCtldArgs;
90 newStub->slaveAssignedTo = NULL;
91 newStub->taskType = taskType;
92 newStub->ptrEntries =
93 VMS_int__malloc( taskType->numCtldArgs * sizeof(VSsPointerEntry *) );
94 newArgs = (void **)( (uint8 *)newStub + sizeof(VSsTaskStub) );
95 newStub->args = newArgs;
96 newStub->numChildTasks = 0;
97 newStub->parent = NULL;
98 //Copy the arg-pointers.. can be more arguments than just the ones
99 // that StarSs uses to control ordering of task execution.
100 memcpy( newArgs, args, taskType->sizeOfArgs );
102 return newStub;
103 }
105 inline VSsTaskStubCarrier *
106 create_task_carrier( VSsTaskStub *taskStub, int32 argNum, int32 rdOrWrite )
107 { VSsTaskStubCarrier *newCarrier;
109 newCarrier = VMS_PI__malloc( sizeof(VSsTaskStubCarrier) );
110 newCarrier->taskStub = taskStub;
111 newCarrier->argNum = argNum;
112 newCarrier->isReader = rdOrWrite == READER;
113 }
115 //==========================================================================
116 //
117 //
118 /*Submit Task
119 *
120 *Uses a hash table to match the arg-pointers to each other. So, an
121 * argument-pointer is one-to-one with a hash-table entry.
122 *
123 *If overlapping region detection is enabled, then a hash entry is one
124 * link in a ring of all entries that overlap each other. For example,
125 * say region A shared common addresses with region B, but the pointers
126 * to them are different, then the hash entries for the two would be
127 * linked in a ring. When a pointer is processed, all the pointers in
128 * the ring are processed (Doesn't differentiate independent siblings
129 * from parent-child or conjoined twins overlap..)
130 * NOT ENABLED AS OF MAY 25 2012
131 *
132 *A hash entry has a queue of tasks that are waiting to access the
133 * pointed-to region. The queue goes in the order of creation of
134 * the tasks. Each entry in the queue has a pointer to the task-stub
135 * and whether the task reads-only vs writes to the hash-entry's region.
136 *
137 *A hash entry also has a count of the enabled but not yet finished readers
138 * of the region. It also has a flag that says whether a writer has been
139 * enabled and is not yet finished.
140 *
141 *There are two kinds of events that access a hash entry: creation of a
142 * task and end of a task.
143 *
144 *
145 * ========================== creation ==========================
146 *
147 *At creation, make a task-stub. Set the count of blocking propendents
148 * to the number of controlled arguments (a task can have
149 * arguments that are not controlled by the language, like simple integer
150 * inputs from the sequential portion. Note that all controlled arguments
151 * are pointers, and marked as controlled in the application code).
152 *
153 *The controlled arguments are then processed one by one.
154 *Processing an argument means getting the hash of the pointer. Then,
155 * looking up the hash entry. (If none, create one).
156 *With the hash entry:
157 *
158 *If the arg is a reader, and the entry does not have an enabled
159 * non-finished writer, and the queue is empty (could be prev readers,
160 * then a writer that got queued and now new readers that have to also be
161 * queued).
162 *The reader is free. So, decrement the blocking-propendent count in
163 * the task-stub. If the count is zero, then put the task-stub into the
164 * readyQ.
165 *At the same time, increment the hash-entry's count of enabled and
166 * non-finished readers.
167 *
168 *Otherwise, the reader is put into the hash-entry's Q of waiters
169 *
170 *If the arg is a writer, plus the entry does not have a current writer,
171 * plus the number of enabled non-finished readers is zero, plus the Q is
172 * empty, then the writer is free. Mark the entry has having an
173 * enabled and non-finished writer. Decrement the blocking-propendent
174 * count in the writer's task-stub. If the count is zero, then put the
175 * task-stub into the readyQ.
176 *
177 *Otherwise, put the writer into the entry's Q of waiters.
178 *
179 *No matter what, if the hash entry was chained, put it at the start of
180 * the chain. (Means no-longer-used pointers accumulate at end of chain,
181 * decide garbage collection of no-longer-used pointers later)
182 *
183 *
184 * ========================== end of task ===========================
185 *
186 *At the end of a task,
187 *The task's controlled arguments are processed one by one.
188 *Processing an argument means getting the hash of the pointer. Then,
189 * looking up the hash entry (and putting the entry at the start of the
190 * chain, if there was a chain).
191 *With the hash entry:
192 *
193 *If the arg is a reader, then decrement the enabled and non-finished
194 * reader-count in the hash-entry. If the count becomes zero, then take
195 * the next entry from the Q. It should be a writer, or else there's a
196 * bug in this algorithm.
197 *Set the hash-entry to have an enabled non-finished writer. Decrement
198 * the blocking-propendent-count of the writer's task-stub. If the count
199 * has reached zero, then put the task-stub into the readyQ.
200 *
201 *If the arg is a writer, then clear the enabled non-finished writer flag
202 * of the hash-entry. Take the next entry from the Q.
203 *If it is a writer, then turn the flag back on. Decrement the writer's
204 * blocking-propendent-count in its task-stub. If it becomes zero, then
205 * put the task-stub into the readyQ.
206 *
207 *If it is a reader, then increment the hash-entry's count of enabled
208 * non-finished readers. Decrement the blocking propendents count of the
209 * reader's task-stub. If it reaches zero, then put the task-stub into the
210 * readyQ.
211 *Then repeat until encounter a writer -- put that writer back into the Q.
212 *
213 *That should be it -- that should work.
214 */
/*Handles a task-submit request: builds the task stub, links the new task
 * to its parent (a task or a thread) for taskwait accounting, then runs
 * each controlled argument through the arg-pointer hash table to decide
 * whether the task is immediately runnable or must queue as a waiter.
 * (Full algorithm description in the long comment above.)
 *
 *Params: semReq -- carries taskType, args, taskID, and the calling slave
 *        semEnv -- semantic environment: argPtrHashTbl and taskReadyQ
 */
void
handleSubmitTask( VSsSemReq *semReq, VSsSemEnv *semEnv )
 { uint32 key[3];
   HashEntry *rawHashEntry; //has char *, but use with uint32 *
   VSsPointerEntry *ptrEntry; //contents of hash table entry for an arg pointer
   void **args;
   VSsTaskStub *taskStub;
   VSsTaskType *taskType;
   VSsTaskStubCarrier *taskCarrier;

   HashTable *argPtrHashTbl = semEnv->argPtrHashTbl;

   DEBUG__printf1(dbgRqstHdlr,"Submit request from processor %d",semReq->callingSlv->slaveID)

   /* ========================== creation ==========================
    *
    *At creation, make a task-stub. Set the count of blocking propendents
    * to the number of controlled arguments (a task can have
    * arguments that are not controlled by the language, like simple integer
    * inputs from the sequential portion. Note that all controlled arguments
    * are pointers, and marked as controlled in the application code).
    */
   args = semReq->args;
   taskType = semReq->taskType;
   taskStub = create_task_stub( taskType, args );//copies arg ptrs
   //NOTE(review): create_task_stub already sets numBlockingProp and
   // numChildTasks -- the next two assignments look redundant but harmless
   taskStub->numBlockingProp = taskType->numCtldArgs;
   taskStub->taskID = semReq->taskID; //may be NULL
   taskStub->numChildTasks = 0;

   //Record parentage and bump the parent's child count, so taskwait in the
   // parent can detect when all children have ended (see handleEndTask)
   VSsSemData* parentSemData = (VSsSemData*) semReq->callingSlv->semanticData;
   if(parentSemData->taskStub != NULL){ //calling is task
      taskStub->parentIsTask = TRUE;
      taskStub->parent = (void*) parentSemData->taskStub;
      parentSemData->taskStub->numChildTasks++;
   } else { //caller is a thread, not a task
      taskStub->parentIsTask = FALSE;
      taskStub->parent = (void*) parentSemData->threadInfo;
      parentSemData->threadInfo->numChildTasks++;
   }

   /*The controlled arguments are then processed one by one.
    *Processing an argument means getting the hash of the pointer. Then,
    * looking up the hash entry. (If none, create one).
    */
   int32 argNum;
   for( argNum = 0; argNum < taskType->numCtldArgs; argNum++ )
    {
      //key layout: key[0] = number of 32b words, key[1..2] = the 64b arg ptr
      key[0] = 2; //two 32b values in key
      //NOTE(review): type-punned 64b store into a uint32 array -- relies on
      // alignment/aliasing being benign on the target platform; confirm
      *( (uint64*)&key[1]) = (uint64)args[argNum]; //write 64b into two 32b

      /*If the hash entry was chained, put it at the
       * start of the chain. (Means no-longer-used pointers accumulate
       * at end of chain, decide garbage collection later) */
      rawHashEntry = getEntryFromTable32( key, argPtrHashTbl );
      if( rawHashEntry == NULL )
       { //adding a value auto-creates the hash-entry
         ptrEntry = create_pointer_entry();
         rawHashEntry = addValueIntoTable32( key, ptrEntry, argPtrHashTbl );
       }
      else
       { ptrEntry = (VSsPointerEntry *)rawHashEntry->content;
         if( ptrEntry == NULL ) //entry exists but holds nothing -- refill it
          { ptrEntry = create_pointer_entry();
            rawHashEntry = addValueIntoTable32(key, ptrEntry, argPtrHashTbl);
          }
       }
      //remember the entry so handleEndTask can skip the hash lookup
      taskStub->ptrEntries[argNum] = ptrEntry;

      /*Have the hash entry.
       *If the arg is a reader and the entry does not have an enabled
       * non-finished writer, and the queue is empty. */
      if( taskType->argTypes[argNum] == READER )
       { if( !ptrEntry->hasEnabledNonFinishedWriter &&
             isEmptyPrivQ( ptrEntry->waitersQ ) )
          { /*The reader is free. So, decrement the blocking-propendent
             * count in the task-stub. If the count is zero, then put the
             * task-stub into the readyQ. At the same time, increment
             * the hash-entry's count of enabled and non-finished readers.*/
            taskStub->numBlockingProp -= 1;
            if( taskStub->numBlockingProp == 0 )
             { writePrivQ( taskStub, semEnv->taskReadyQ );
             }
            ptrEntry->numEnabledNonDoneReaders += 1;
          }
         else
          { /*Otherwise, the reader is put into the hash-entry's Q of
             * waiters*/
            taskCarrier = create_task_carrier( taskStub, argNum, READER );
            writePrivQ( taskCarrier, ptrEntry->waitersQ );
          }
       }
      else //arg is a writer
       { /*the arg is a writer, plus the entry does not have a current
          * writer, plus the number of enabled non-finished readers is
          * zero, (the Q must be empty, else bug!) then the writer is free*/
         if( !ptrEntry->hasEnabledNonFinishedWriter &&
             ptrEntry->numEnabledNonDoneReaders == 0 )
          { /*Mark the entry has having a enabled and non-finished writer.
             * Decrement the blocking-propenden count in the writer's
             * task-stub. If the count is zero, then put the task-stub
             * into the readyQ.*/
            taskStub->numBlockingProp -= 1;
            if( taskStub->numBlockingProp == 0 )
             { writePrivQ( taskStub, semEnv->taskReadyQ );
             }
            ptrEntry->hasEnabledNonFinishedWriter = TRUE;
          }
         else
          {/*Otherwise, put the writer into the entry's Q of waiters.*/
            taskCarrier = create_task_carrier( taskStub, argNum, WRITER );
            writePrivQ( taskCarrier, ptrEntry->waitersQ );
          }
       }
    } //for argNum

   //submitting never blocks the caller -- resume it immediately
   resume_slaveVP( semReq->callingSlv, semEnv );

   return;
 }
/*Submit-with-ID variant -- not yet implemented; body intentionally empty.
 * TODO: implement (compare handleSubmitTask; taskID already travels in semReq)
 */
inline void
handleSubmitTaskWID( VSsSemReq *semReq, VSsSemEnv *semEnv)
 {
 }
344 /* ========================== end of task ===========================
345 *
346 *At the end of a task,
347 *The task's controlled arguments are processed one by one.
348 *Processing an argument means getting the hash of the pointer. Then,
349 * looking up the hash entry (and putting the entry at the start of the
350 * chain, if there was a chain).
351 *With the hash entry:
352 *
353 *If the arg is a reader, then decrement the enabled and non-finished
354 * reader-count in the hash-entry. If the count becomes zero, then take
355 * the next entry from the Q. It should be a writer, or else there's a
356 * bug in this algorithm.
357 *Set the hash-entry to have an enabled non-finished writer. Decrement
358 * the blocking-propendent-count of the writer's task-stub. If the count
359 * has reached zero, then put the task-stub into the readyQ.
360 *
361 *If the arg is a writer, then clear the enabled non-finished writer flag
362 * of the hash-entry. Take the next entry from the waiters Q.
363 *If it is a writer, then turn the flag back on. Decrement the writer's
364 * blocking-propendent-count in its task-stub. If it becomes zero, then
365 * put the task-stub into the readyQ.
366 *
367 *If waiter is a reader, then do a loop, getting all waiting readers.
368 * For each, increment the hash-entry's count of enabled
369 * non-finished readers. Decrement the blocking propendents count of the
370 * reader's task-stub. If it reaches zero, then put the task-stub into the
371 * readyQ.
372 *Repeat until encounter a writer -- put that writer back into the Q.
373 *
374 *May 2012 -- not keeping track of how many references to a given ptrEntry
375 * exist, so no way to garbage collect..
376 *TODO: Might be safe to delete an entry when task ends and waiterQ empty
377 * and no readers and no writers..
378 */
/*Handles end-of-task: first detaches the ending task from its parent
 * (task or thread), resuming the parent if it sat in taskwait and this
 * was its last child. Then walks the ended task's controlled arguments,
 * updating each saved arg-pointer entry and waking waiting tasks whose
 * blocking count drops to zero (algorithm in the long comment above).
 * Finally frees the stub and resumes the animating slave, flagged as
 * needing a new task assigned.
 */
void
handleEndTask( VSsSemReq *semReq, VSsSemEnv *semEnv )
 { uint32 key[3];          //NOTE(review): key, rawHashEntry, args are only
   HashEntry *rawHashEntry; // used by the commented-out lookup below
   VSsPointerEntry *ptrEntry; //contents of hash table entry for an arg pointer
   void **args;
   //NOTE(review): submit enqueues VSsTaskStubCarrier (field "isReader") but
   // this handler dequeues VSsWaiterCarrier and reads "->type" -- assumes the
   // two struct layouts/fields agree; TODO confirm in VSs.h
   VSsTaskStub *endingTaskStub, *waitingTaskStub;
   VSsTaskType *endingTaskType;
   VSsWaiterCarrier *waitingTaskCarrier;
   VSsPointerEntry **ptrEntries;

   HashTable *ptrHashTbl = semEnv->argPtrHashTbl;

   DEBUG__printf1(dbgRqstHdlr,"EndTask request from processor %d",semReq->callingSlv->slaveID)

   /* ========================== end of task ===========================
    *At the end of a task, the task-stub is sent in the request.
    */
   endingTaskStub =
        ((VSsSemData *)semReq->callingSlv->semanticData)->taskStub;
   args = endingTaskStub->args;
   endingTaskType = endingTaskStub->taskType;
   ptrEntries = endingTaskStub->ptrEntries; //saved in stub when create

   /* Check if parent was waiting on this task */
   if(endingTaskStub->parentIsTask){
      VSsTaskStub* parent = (VSsTaskStub*) endingTaskStub->parent;
      parent->numChildTasks--;
      if(parent->isWaiting && parent->numChildTasks == 0){
         parent->isWaiting = FALSE;
         resume_slaveVP( parent->slaveAssignedTo, semEnv );
      }
   } else { //parent is a thread -- same wake-up protocol, different struct
      VSsThreadInfo* parent = (VSsThreadInfo*) endingTaskStub->parent;
      parent->numChildTasks--;
      if(parent->isWaiting && parent->numChildTasks == 0){
         parent->isWaiting = FALSE;
         resume_slaveVP( parent->slaveAssignedTo, semEnv );
      }
   }

   /*The task's controlled arguments are processed one by one.
    *Processing an argument means getting arg-pointer's entry.
    */
   int32 argNum;
   for( argNum = 0; argNum < endingTaskType->numCtldArgs; argNum++ )
    {
      /* commented out 'cause saving entry ptr when create stub
      key[0] = 2; //says are 2 32b values in key
      *( (uint64*)&key[1] ) = args[argNum]; //write 64b ptr into two 32b
      */
      /*NOTE: don't do hash lookups here, instead, have a pointer to the
       * hash entry inside task-stub, put there during task creation.
      rawHashEntry = getEntryFromTable32( key, ptrHashTbl );
      ptrEntry = (VSsPointerEntry *)rawHashEntry->content;
      if( ptrEntry == NULL )
         VMS_App__throw_exception("hash entry NULL", NULL, NULL);
      */

      ptrEntry = ptrEntries[argNum];
      /*check if the ending task was reader of this arg*/
      if( endingTaskType->argTypes[argNum] == READER )
       { /*then decrement the enabled and non-finished reader-count in
          * the hash-entry. */
         ptrEntry->numEnabledNonDoneReaders -= 1;

         /*If the count becomes zero, then take the next entry from the Q.
          *It should be a writer, or else there's a bug in this algorithm.*/
         if( ptrEntry->numEnabledNonDoneReaders == 0 )
          { waitingTaskCarrier = readPrivQ( ptrEntry->waitersQ );
            if( waitingTaskCarrier == NULL )
             { //TODO: looks safe to delete the ptr entry at this point
               continue; //next iter of loop
             }
            if( waitingTaskCarrier->type == READER )
               VMS_App__throw_exception("READER waiting", NULL, NULL);

            waitingTaskStub = waitingTaskCarrier->taskStub;
            //NOTE(review): carriers dequeued here are never freed --
            // possible leak, confirm ownership convention

            /*Set the hash-entry to have an enabled non-finished writer.*/
            ptrEntry->hasEnabledNonFinishedWriter = TRUE;

            /* Decrement the blocking-propendent-count of the writer's
             * task-stub. If the count has reached zero, then put the
             * task-stub into the readyQ.*/
            waitingTaskStub->numBlockingProp -= 1;
            if( waitingTaskStub->numBlockingProp == 0 )
             { writePrivQ( waitingTaskStub, semEnv->taskReadyQ );
             }
          }
       }
      else /*the ending task is a writer of this arg*/
       { /*clear the enabled non-finished writer flag of the hash-entry.*/
         ptrEntry->hasEnabledNonFinishedWriter = FALSE;

         /*Take the next waiter from the hash-entry's Q.*/
         waitingTaskCarrier = readPrivQ( ptrEntry->waitersQ );
         if( waitingTaskCarrier == NULL )
          { //TODO: looks safe to delete ptr entry at this point
            continue; //go to next iter of loop, done here.
          }
         waitingTaskStub = waitingTaskCarrier->taskStub;

         /*If task is a writer of this hash-entry's pointer*/
         if( waitingTaskCarrier->type == WRITER )
          { /* then turn the flag back on.*/
            ptrEntry->hasEnabledNonFinishedWriter = TRUE;
            /*Decrement the writer's blocking-propendent-count in task-stub
             * If it becomes zero, then put the task-stub into the readyQ.*/
            waitingTaskStub->numBlockingProp -= 1;
            if( waitingTaskStub->numBlockingProp == 0 )
             { writePrivQ( waitingTaskStub, semEnv->taskReadyQ );
             }
          }
         else
          { /*Waiting task is a reader, so do a loop, of all waiting readers
             * until encounter a writer or waitersQ is empty*/
            while( TRUE ) /*The checks guarantee have a waiting reader*/
             { /*Increment the hash-entry's count of enabled non-finished
                * readers.*/
               ptrEntry->numEnabledNonDoneReaders += 1;

               /*Decrement the blocking propendents count of the reader's
                * task-stub. If it reaches zero, then put the task-stub
                * into the readyQ.*/
               waitingTaskStub->numBlockingProp -= 1;
               if( waitingTaskStub->numBlockingProp == 0 )
                { writePrivQ( waitingTaskStub, semEnv->taskReadyQ );
                }
               /*Get next waiting task -- peek first so a writer stays queued*/
               waitingTaskCarrier = peekPrivQ( ptrEntry->waitersQ );
               if( waitingTaskCarrier == NULL ) break;
               if( waitingTaskCarrier->type == WRITER ) break;
               waitingTaskCarrier = readPrivQ( ptrEntry->waitersQ );
               waitingTaskStub = waitingTaskCarrier->taskStub;
             }//while waiter is a reader
          }//if-else, first waiting task is a reader
       }//if-else, check of ending task, whether writer or reader
    }//for argnum in ending task

   //done ending the task, now free the stub + args copy
   VMS_PI__free( endingTaskStub->ptrEntries );
   VMS_PI__free( endingTaskStub );

   //Resume the slave that animated the task -- assigner will give new task
   ((VSsSemData *)semReq->callingSlv->semanticData)->needsTaskAssigned =
      TRUE;
   resume_slaveVP( semReq->callingSlv, semEnv );

   return;
 }
537 //========================== Task Comm handlers ===========================
541 //============================ Send Handlers ==============================
542 /*Send of Type -- The semantic request has the receiving task ID and Type
543 *
544 *Messages of a given Type have to be kept separate.. so need a separate
545 * entry in the hash table for each pair: receiverID, Type
546 *
547 *Also, if same sender sends multiple before any get received, then need to
548 * stack the sends up -- even if a send waits until it's paired, several
549 * separate tasks can send to the same receiver, and doing hash on the
550 * receive task, so they will stack up.
551 */
/*Handles a send-of-type request. The hash key is (receiverID, msgType),
 * so messages of different types are kept separate. If a matching
 * receive is already waiting, pair with it (hand over the msg, resume
 * both slaves); if other sends are waiting, stack this send onto their
 * list; otherwise insert this send (cloned) and suspend.
 */
void
handleSendTypeTo( VSsSemReq *semReq, VSsSemEnv *semEnv )
 { SlaveVP *senderSlv, *receiverSlv;
   int32 *senderID, *receiverID; //NOTE(review): senderID never assigned; only
                                 // referenced inside the HOLISTIC #ifdef below
   int32 *key, keySz, receiverIDNumInt;
   VSsSemReq *waitingReq;
   HashEntry *entry;
   HashTable *commHashTbl = semEnv->commHashTbl;

   DEBUG__printf1(dbgRqstHdlr,"SendType request from processor %d",semReq->senderSlv->slaveID)

   receiverID = semReq->receiverID; //For "send", know both send & recv procrs
   senderSlv = semReq->senderSlv;

   //build key = receiverID words followed by the message type
   receiverIDNumInt = receiverID[0] + 1; //pos 0 doesn't include itself
   keySz = receiverIDNumInt * sizeof(int32) + sizeof(int32);
   key = VMS_PI__malloc( keySz ); //NOTE(review): key is not freed on the
                                  // pairing paths below -- check for leak
   memcpy( key, receiverID, receiverIDNumInt * sizeof(int32) );
   key[ receiverIDNumInt ] = semReq->msgType; //no +1 'cause starts at 0

   entry = giveEntryElseInsertReqst32( key, semReq, commHashTbl );
   if( entry == NULL ) return; //was just inserted

   //if here, found a waiting request with same key
   waitingReq = (VSsSemReq *)entry->content;

   //At this point, know have waiting request(s) -- either sends or recv
   //Note, can only have max of one receive waiting, and cannot have both
   // sends and receives waiting (they would have paired off)
   // but can have multiple sends from diff sending VPs, all same msg-type
   if( waitingReq->reqType == send_type_to )
    { //waiting request is another send, so stack this up on list
      // but first clone the sending request so it persists.
      VSsSemReq *clonedReq = cloneReq( semReq );
      clonedReq-> nextReqInHashEntry = waitingReq->nextReqInHashEntry;
      waitingReq->nextReqInHashEntry = clonedReq;
      DEBUG__printf2( dbgRqstHdlr, "linked requests: %p, %p ", clonedReq,\
                      waitingReq )
      return;
    }
   else
    {
      #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC
      //NOTE(review): this block dereferences senderID/receiverID (int32*)
      // as if they were structs with slaveID/assignCount -- looks like it
      // cannot compile with the flag on; verify before enabling
      Dependency newd;
      newd.from_vp = senderID->slaveID;
      newd.from_task = senderID->assignCount;
      newd.to_vp = receiverID->slaveID;
      newd.to_task = receiverID->assignCount +1;
      //(newd,semEnv->commDependenciesList);
      addToListOfArrays(Dependency,newd,semEnv->dynDependenciesList);
      int32 groupId = semReq->msgType;
      if(semEnv->ntonGroupsInfo->numInArray <= groupId){
         makeHighestDynArrayIndexBeAtLeast(semEnv->ntonGroupsInfo, groupId);
      }
      if(semEnv->ntonGroups[groupId] == NULL){
         semEnv->ntonGroups[groupId] = new_NtoN(groupId);
      }
      Unit u;
      u.vp = senderID->slaveID;
      u.task = senderID->assignCount;
      addToListOfArrays(Unit,u,semEnv->ntonGroups[groupId]->senders);
      u.vp = receiverID->slaveID;
      u.task = receiverID->assignCount +1;
      addToListOfArrays(Unit,u,semEnv->ntonGroups[groupId]->receivers);
      #endif

      //set receiver slave, from the waiting request
      receiverSlv = waitingReq->receiverSlv;

      //waiting request is a receive_type_to, so it pairs to this send
      //First, remove the waiting receive request from the entry
      entry->content = waitingReq->nextReqInHashEntry;
      VMS_PI__free( waitingReq ); //Don't use contents -- so free it

      if( entry->content == NULL )
       { //TODO: mod hash table to double-link, so can delete entry from
         // table without hashing the key and looking it up again
         deleteEntryFromTable32( (uint32*)entry->key, commHashTbl ); //frees hashEntry
       }

      //attach msg that's in this send request to receiving task's Slv
      // when comes back from suspend will have msg in dataRetFromReq
      receiverSlv->dataRetFromReq = semReq->msg;

      //bring both processors back from suspend
      resume_slaveVP( senderSlv, semEnv );
      resume_slaveVP( receiverSlv, semEnv );

      return;
    }
 }
645 /*Looks like can make single handler for both sends..
646 */
647 //TODO: combine both send handlers into single handler
/*Handles a send-from-to request. The hash key is (receiverID, senderID),
 * so at most ONE send and one receive can ever wait under a given key.
 * If the matching receive waits, pair with it and resume both slaves;
 * otherwise insert this send (cloned) and suspend.
 *TODO (from original): fold this and handleSendTypeTo into one handler.
 */
void
handleSendFromTo( VSsSemReq *semReq, VSsSemEnv *semEnv)
 { SlaveVP *senderSlv, *receiverSlv;
   int32 *senderID, *receiverID;
   int32 *key, keySz, receiverIDNumInt, senderIDNumInt;
   VSsSemReq *waitingReq;
   HashEntry *entry;
   HashTable *commHashTbl = semEnv->commHashTbl;

   DEBUG__printf2(dbgRqstHdlr,"SendFromTo request from processor %d to %d",semReq->senderID,semReq->receiverID)

   receiverID = semReq->receiverID; //For "send", know both send & recv procrs
   senderID = semReq->senderID;
   //receiverSlv = semReq->receiverSlv;
   senderSlv = semReq->senderSlv;

   //key = receiverID words followed by senderID words
   receiverIDNumInt = receiverID[0] + 1; //pos 0 doesn't include itself
   senderIDNumInt = senderID[0] + 1;
   keySz = (receiverIDNumInt + senderIDNumInt) * sizeof(int32);
   key = VMS_PI__malloc( keySz ); //NOTE(review): key not freed on pairing path
   memcpy( key, receiverID, receiverIDNumInt * sizeof(int32) );
   memcpy( &key[receiverIDNumInt], senderID, senderIDNumInt * sizeof(int32) );

   entry = giveEntryElseInsertReqst32( key, semReq, commHashTbl );
   if( entry == NULL ) return; //was just inserted

   waitingReq = (VSsSemReq *)entry->content;

   //At this point, know have waiting request(s) -- either sends or recv
   if( waitingReq->reqType == send_from_to )
    { printf("\n ERROR: shouldn't be two send-from-tos waiting \n");
      //NOTE(review): falls through without pairing or resuming -- sender
      // stays suspended on this path
    }
   else
    { //waiting request is a receive, so it completes pair with this send
      #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC
      //NOTE(review): sendPr/receivePr are not declared in this function --
      // looks like this block cannot compile with the flag on; verify
      Dependency newd;
      newd.from_vp = sendPr->slaveID;
      newd.from_task = sendPr->assignCount;
      newd.to_vp = receivePr->slaveID;
      newd.to_task = receivePr->assignCount +1;
      //addToListOfArraysDependency(newd,semEnv->commDependenciesList);
      addToListOfArrays(Dependency,newd,semEnv->commDependenciesList);
      #endif

      //set receiver slave, from the waiting request
      receiverSlv = waitingReq->receiverSlv;

      //First, remove the waiting receive request from the entry
      entry->content = waitingReq->nextReqInHashEntry;
      VMS_PI__free( waitingReq ); //Don't use contents -- so free it

      //can only be one waiting req for "from-to" semantics
      if( entry->content != NULL )
       {
         printf("\nERROR in handleSendFromTo\n");
       }
      deleteEntryFromTable32( (uint32*)entry->key, commHashTbl ); //frees HashEntry

      //attach msg that's in this send request to receiving procr
      // when comes back from suspend, will have msg in dataRetFromReq
      receiverSlv->dataRetFromReq = semReq->msg;

      //bring both processors back from suspend
      resume_slaveVP( senderSlv, semEnv );
      resume_slaveVP( receiverSlv, semEnv );

      return;
    }
 }
720 //============================== Receives ===========================
721 //
724 void
725 handleReceiveTypeTo( VSsSemReq *semReq, VSsSemEnv *semEnv)
726 { SlaveVP *senderSlv, *receiverSlv;
727 int32 *receiverID;
728 int32 *key, keySz, receiverIDNumInt;
729 VSsSemReq *waitingReq;
730 HashEntry *entry;
731 HashTable *commHashTbl = semEnv->commHashTbl;
733 DEBUG__printf1(dbgRqstHdlr,"SendType request to %d",semReq->receiverID)
735 receiverID = semReq->receiverID; //For "send", know both send & recv procrs
736 receiverSlv = semReq->receiverSlv;
738 //key is the receiverID plus the type -- have to copy them into key
739 receiverIDNumInt = receiverID[0] + 1; //pos 0 doesn't include itself
740 keySz = receiverIDNumInt * sizeof(int32) + sizeof(int32);
741 key = VMS_PI__malloc( keySz );
742 memcpy( key, receiverID, receiverIDNumInt * sizeof(int32) );
743 key[ receiverIDNumInt ] = semReq->msgType; //no +1 'cause starts at 0
746 entry = giveEntryElseInsertReqst32( key, semReq, commHashTbl );//clones
747 if( entry == NULL ) return; //was just inserted
749 waitingReq = (VSsSemReq *)entry->content; //previously cloned by insert
751 //At this point, know have waiting request(s) -- should be send(s)
752 if( waitingReq->reqType == send_type_to )
753 {
754 //set sending slave from the request
755 senderSlv = waitingReq->senderSlv;
757 //waiting request is a send, so pair it with this receive
758 //first, remove the waiting send request from the list in entry
759 entry->content = waitingReq->nextReqInHashEntry;
760 if( entry->content == NULL )
761 { deleteEntryFromTable32( (uint32*)entry->key, commHashTbl ); //frees HashEntry
762 }
764 //attach msg that's in the send request to receiving procr
765 // when comes back from suspend, will have msg in dataRetFromReq
766 receiverSlv->dataRetFromReq = waitingReq->msg;
768 //bring both processors back from suspend
769 VMS_PI__free( waitingReq );
771 #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC
772 Dependency newd;
773 newd.from_vp = sendPr->slaveID;
774 newd.from_task = sendPr->assignCount;
775 newd.to_vp = receivePr->slaveID;
776 newd.to_task = receivePr->assignCount +1;
777 //addToListOfArraysDependency(newd,semEnv->commDependenciesList);
778 addToListOfArrays(Dependency,newd,semEnv->dynDependenciesList);
779 int32 groupId = semReq->msgType;
780 if(semEnv->ntonGroupsInfo->numInArray <= groupId){
781 makeHighestDynArrayIndexBeAtLeast(semEnv->ntonGroupsInfo, groupId);
782 }
783 if(semEnv->ntonGroups[groupId] == NULL){
784 semEnv->ntonGroups[groupId] = new_NtoN(groupId);
785 }
786 Unit u;
787 u.vp = sendPr->slaveID;
788 u.task = sendPr->assignCount;
789 addToListOfArrays(Unit,u,semEnv->ntonGroups[groupId]->senders);
790 u.vp = receivePr->slaveID;
791 u.task = receivePr->assignCount +1;
792 addToListOfArrays(Unit,u,semEnv->ntonGroups[groupId]->receivers);
793 #endif
795 resume_slaveVP( senderSlv, semEnv );
796 resume_slaveVP( receiverSlv, semEnv );
798 return;
799 }
800 printf("\nLang Impl Error: Should never be two waiting receives!\n");
801 }
804 /*
805 */
806 void
807 handleReceiveFromTo( VSsSemReq *semReq, VSsSemEnv *semEnv)
808 { SlaveVP *senderSlv, *receiverSlv;
809 int32 *senderID, *receiverID;
810 int32 *key, keySz, receiverIDNumInt, senderIDNumInt;
811 VSsSemReq *waitingReq;
812 HashEntry *entry;
813 HashTable *commHashTbl = semEnv->commHashTbl;
815 DEBUG__printf2(dbgRqstHdlr,"SendFromTo request from %d to %d",semReq->senderID,semReq->receiverID)
817 receiverID = semReq->receiverID; //For "send", know both send & recv procrs
818 senderID = semReq->senderID;
819 receiverSlv = semReq->receiverSlv;
821 receiverIDNumInt = receiverID[0] + 1; //pos 0 doesn't include itself
822 senderIDNumInt = senderID[0] + 1;
823 keySz = (receiverIDNumInt + senderIDNumInt) * sizeof(int32);
824 key = VMS_PI__malloc( keySz );
825 memcpy( key, receiverID, receiverIDNumInt * sizeof(int32) );
826 memcpy( &key[receiverIDNumInt], senderID, senderIDNumInt * sizeof(int32) );
828 entry = giveEntryElseInsertReqst32( key, semReq, commHashTbl );
829 if( entry == NULL ) return; //was just inserted
831 waitingReq = (VSsSemReq *)entry->content;
833 //At this point, know have waiting request(s) -- should be send(s)
834 if( waitingReq->reqType == send_from_to )
835 { //waiting request is a send, so pair it with this receive
836 #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC
837 Dependency newd;
838 newd.from_vp = sendPr->slaveID;
839 newd.from_task = sendPr->assignCount;
840 newd.to_vp = receivePr->slaveID;
841 newd.to_task = receivePr->assignCount +1;
842 //addToListOfArraysDependency(newd,semEnv->commDependenciesList);
843 addToListOfArrays(Dependency,newd,semEnv->commDependenciesList);
844 #endif
846 //have receiver slave, now set sender slave
847 senderSlv = waitingReq->senderSlv;
849 //For from-to, should only ever be a single reqst waiting tobe paird
850 entry->content = waitingReq->nextReqInHashEntry;
851 if( entry->content != NULL ) printf("\nERROR in handleRecvFromTo\n");
852 deleteEntryFromTable32( (uint32*)entry->key, commHashTbl ); //frees entry too
854 //attach msg that's in the send request to receiving procr
855 // when comes back from suspend, will have msg in dataRetFromReq
856 receiverSlv->dataRetFromReq = waitingReq->msg;
858 //bring both processors back from suspend
859 VMS_PI__free( waitingReq );
861 resume_slaveVP( senderSlv, semEnv );
862 resume_slaveVP( receiverSlv, semEnv );
864 return;
865 }
866 printf("\nLang Impl Error: Should never be two waiting receives!\n");
867 }
869 //==========================================================================
870 void
871 handleTaskwait( VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv )
872 {
873 VSsTaskStub* requestingTaskStub;
875 DEBUG__printf1(dbgRqstHdlr,"Taskwait request from processor %d",requestingSlv->slaveID)
877 VSsSemData* semData = ((VSsSemData *)semReq->callingSlv->semanticData);
879 requestingTaskStub = semData->taskStub;
881 if(requestingTaskStub == NULL){ //calling VP is hosting a thread
882 if(semData->threadInfo->numChildTasks == 0){ //nobody to wait for, proceed
883 resume_slaveVP( requestingSlv, semEnv );
884 } else { //have to wait
885 semData->threadInfo->isWaiting = TRUE;
886 return;
887 }
888 } else { //calling VP is executing a task
889 if(requestingTaskStub->numChildTasks == 0){
890 resume_slaveVP( requestingSlv, semEnv );
891 } else { //have to wait
892 requestingTaskStub->isWaiting = TRUE;
893 return;
894 }
895 }
897 }
900 //==========================================================================
901 /*
902 */
903 void
904 handleMalloc( VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv )
905 { void *ptr;
907 DEBUG__printf1(dbgRqstHdlr,"Malloc request from processor %d",requestingSlv->slaveID)
909 ptr = VMS_PI__malloc( semReq->sizeToMalloc );
910 requestingSlv->dataRetFromReq = ptr;
911 resume_slaveVP( requestingSlv, semEnv );
912 }
914 /*
915 */
916 void
917 handleFree( VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv )
918 {
919 DEBUG__printf1(dbgRqstHdlr,"Free request from processor %d",requestingSlv->slaveID)
920 VMS_PI__free( semReq->ptrToFree );
921 resume_slaveVP( requestingSlv, semEnv );
922 }
925 //===========================================================================
926 //
927 /*Uses ID as index into array of flags. If flag already set, resumes from
928 * end-label. Else, sets flag and resumes normally.
929 */
930 void inline
931 handleStartSingleton_helper( VSsSingleton *singleton, SlaveVP *reqstingSlv,
932 VSsSemEnv *semEnv )
933 {
934 if( singleton->hasFinished )
935 { //the code that sets the flag to true first sets the end instr addr
936 reqstingSlv->dataRetFromReq = singleton->endInstrAddr;
937 resume_slaveVP( reqstingSlv, semEnv );
938 return;
939 }
940 else if( singleton->hasBeenStarted )
941 { //singleton is in-progress in a diff slave, so wait for it to finish
942 writePrivQ(reqstingSlv, singleton->waitQ );
943 return;
944 }
945 else
946 { //hasn't been started, so this is the first attempt at the singleton
947 singleton->hasBeenStarted = TRUE;
948 reqstingSlv->dataRetFromReq = 0x0;
949 resume_slaveVP( reqstingSlv, semEnv );
950 return;
951 }
952 }
953 void inline
954 handleStartFnSingleton( VSsSemReq *semReq, SlaveVP *requestingSlv,
955 VSsSemEnv *semEnv )
956 { VSsSingleton *singleton;
957 DEBUG__printf1(dbgRqstHdlr,"StartFnSingleton request from processor %d",requestingSlv->slaveID)
959 singleton = &(semEnv->fnSingletons[ semReq->singletonID ]);
960 handleStartSingleton_helper( singleton, requestingSlv, semEnv );
961 }
962 void inline
963 handleStartDataSingleton( VSsSemReq *semReq, SlaveVP *requestingSlv,
964 VSsSemEnv *semEnv )
965 { VSsSingleton *singleton;
967 DEBUG__printf1(dbgRqstHdlr,"StartDataSingleton request from processor %d",requestingSlv->slaveID)
968 if( *(semReq->singletonPtrAddr) == NULL )
969 { singleton = VMS_PI__malloc( sizeof(VSsSingleton) );
970 singleton->waitQ = makeVMSQ();
971 singleton->endInstrAddr = 0x0;
972 singleton->hasBeenStarted = FALSE;
973 singleton->hasFinished = FALSE;
974 *(semReq->singletonPtrAddr) = singleton;
975 }
976 else
977 singleton = *(semReq->singletonPtrAddr);
978 handleStartSingleton_helper( singleton, requestingSlv, semEnv );
979 }
982 void inline
983 handleEndSingleton_helper( VSsSingleton *singleton, SlaveVP *requestingSlv,
984 VSsSemEnv *semEnv )
985 { PrivQueueStruc *waitQ;
986 int32 numWaiting, i;
987 SlaveVP *resumingSlv;
989 if( singleton->hasFinished )
990 { //by definition, only one slave should ever be able to run end singleton
991 // so if this is true, is an error
992 ERROR1( "singleton code ran twice", requestingSlv );
993 }
995 singleton->hasFinished = TRUE;
996 waitQ = singleton->waitQ;
997 numWaiting = numInPrivQ( waitQ );
998 for( i = 0; i < numWaiting; i++ )
999 { //they will resume inside start singleton, then jmp to end singleton
1000 resumingSlv = readPrivQ( waitQ );
1001 resumingSlv->dataRetFromReq = singleton->endInstrAddr;
1002 resume_slaveVP( resumingSlv, semEnv );
1005 resume_slaveVP( requestingSlv, semEnv );
1008 void inline
1009 handleEndFnSingleton( VSsSemReq *semReq, SlaveVP *requestingSlv,
1010 VSsSemEnv *semEnv )
1012 VSsSingleton *singleton;
1014 DEBUG__printf1(dbgRqstHdlr,"EndFnSingleton request from processor %d",requestingSlv->slaveID)
1016 singleton = &(semEnv->fnSingletons[ semReq->singletonID ]);
1017 handleEndSingleton_helper( singleton, requestingSlv, semEnv );
1019 void inline
1020 handleEndDataSingleton( VSsSemReq *semReq, SlaveVP *requestingSlv,
1021 VSsSemEnv *semEnv )
1023 VSsSingleton *singleton;
1025 DEBUG__printf1(dbgRqstHdlr,"EndDataSingleton request from processor %d",requestingSlv->slaveID)
1027 singleton = *(semReq->singletonPtrAddr);
1028 handleEndSingleton_helper( singleton, requestingSlv, semEnv );
1032 /*This executes the function in the masterVP, take the function
1033 * pointer out of the request and call it, then resume the VP.
1034 */
1035 void
1036 handleAtomic( VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv )
1038 DEBUG__printf1(dbgRqstHdlr,"Atomic request from processor %d",requestingSlv->slaveID)
1039 semReq->fnToExecInMaster( semReq->dataForFn );
1040 resume_slaveVP( requestingSlv, semEnv );
1043 /*First, it looks at the VP's semantic data, to see the highest transactionID
1044 * that VP
1045 * already has entered. If the current ID is not larger, it throws an
1046 * exception stating a bug in the code.
1047 *Otherwise it puts the current ID
1048 * there, and adds the ID to a linked list of IDs entered -- the list is
1049 * used to check that exits are properly ordered.
*Next it uses transactionID as an index into an array of transaction
 * structures.
1052 *If the "VP_currently_executing" field is non-null, then put requesting VP
1053 * into queue in the struct. (At some point a holder will request
1054 * end-transaction, which will take this VP from the queue and resume it.)
1055 *If NULL, then write requesting into the field and resume.
1056 */
1057 void
1058 handleTransStart( VSsSemReq *semReq, SlaveVP *requestingSlv,
1059 VSsSemEnv *semEnv )
1060 { VSsSemData *semData;
1061 TransListElem *nextTransElem;
1063 DEBUG__printf1(dbgRqstHdlr,"TransStart request from processor %d",requestingSlv->slaveID)
1065 //check ordering of entering transactions is correct
1066 semData = requestingSlv->semanticData;
1067 if( semData->highestTransEntered > semReq->transID )
1068 { //throw VMS exception, which shuts down VMS.
1069 VMS_PI__throw_exception( "transID smaller than prev", requestingSlv, NULL);
1071 //add this trans ID to the list of transactions entered -- check when
1072 // end a transaction
1073 semData->highestTransEntered = semReq->transID;
1074 nextTransElem = VMS_PI__malloc( sizeof(TransListElem) );
1075 nextTransElem->transID = semReq->transID;
1076 nextTransElem->nextTrans = semData->lastTransEntered;
1077 semData->lastTransEntered = nextTransElem;
1079 //get the structure for this transaction ID
1080 VSsTrans *
1081 transStruc = &(semEnv->transactionStrucs[ semReq->transID ]);
1083 if( transStruc->VPCurrentlyExecuting == NULL )
1085 transStruc->VPCurrentlyExecuting = requestingSlv;
1086 resume_slaveVP( requestingSlv, semEnv );
1088 else
1089 { //note, might make future things cleaner if save request with VP and
1090 // add this trans ID to the linked list when gets out of queue.
1091 // but don't need for now, and lazy..
1092 writePrivQ( requestingSlv, transStruc->waitingVPQ );
1097 /*Use the trans ID to get the transaction structure from the array.
1098 *Look at VP_currently_executing to be sure it's same as requesting VP.
1099 * If different, throw an exception, stating there's a bug in the code.
1100 *Next, take the first element off the list of entered transactions.
1101 * Check to be sure the ending transaction is the same ID as the next on
1102 * the list. If not, incorrectly nested so throw an exception.
1104 *Next, get from the queue in the structure.
1105 *If it's empty, set VP_currently_executing field to NULL and resume
1106 * requesting VP.
*If get something, set VP_currently_executing to the VP from the queue, then
 * resume both.
1109 */
1110 void
1111 handleTransEnd(VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv)
1112 { VSsSemData *semData;
1113 SlaveVP *waitingSlv;
1114 VSsTrans *transStruc;
1115 TransListElem *lastTrans;
1117 DEBUG__printf1(dbgRqstHdlr,"TransEnd request from processor %d",requestingSlv->slaveID)
1119 transStruc = &(semEnv->transactionStrucs[ semReq->transID ]);
1121 //make sure transaction ended in same VP as started it.
1122 if( transStruc->VPCurrentlyExecuting != requestingSlv )
1124 VMS_PI__throw_exception( "trans ended in diff VP", requestingSlv, NULL );
1127 //make sure nesting is correct -- last ID entered should == this ID
1128 semData = requestingSlv->semanticData;
1129 lastTrans = semData->lastTransEntered;
1130 if( lastTrans->transID != semReq->transID )
1132 VMS_PI__throw_exception( "trans incorrectly nested", requestingSlv, NULL );
1135 semData->lastTransEntered = semData->lastTransEntered->nextTrans;
1138 waitingSlv = readPrivQ( transStruc->waitingVPQ );
1139 transStruc->VPCurrentlyExecuting = waitingSlv;
1141 if( waitingSlv != NULL )
1142 resume_slaveVP( waitingSlv, semEnv );
1144 resume_slaveVP( requestingSlv, semEnv );