# HG changeset patch
# User Nina Engelhardt
# Date 1351526276 -3600
# Node ID feea343d202fbde09d3b43ff0616b372b5be729b
# Parent a7ca8f45c1c4b551f8966e81982d48fc5b2ac08e
add support for more OmpSs features

diff -r a7ca8f45c1c4 -r feea343d202f VSs.c
--- a/VSs.c	Tue Sep 25 16:12:40 2012 +0200
+++ b/VSs.c	Mon Oct 29 16:57:56 2012 +0100
@@ -253,6 +253,12 @@
       semanticEnv->fnSingletons[i].hasFinished = FALSE;
       semanticEnv->fnSingletons[i].waitQ = makeVMSQ();
       semanticEnv->transactionStrucs[i].waitingVPQ = makeVMSQ();
+      semanticEnv->criticalSection[i].isOccupied = FALSE;
+      semanticEnv->criticalSection[i].waitQ = makeVMSQ();
+#ifdef HOLISTIC__TURN_ON_OBSERVE_UCC
+      semanticEnv->criticalSection[i].previous.vp = 0;
+      semanticEnv->criticalSection[i].previous.task = 0;
+#endif
     }
   semanticEnv->numLiveExtraTaskSlvs = 0; //must be last
@@ -543,7 +549,41 @@
    VMS_WL__send_sem_request( &reqData, animSlv );
  }
 
+void
+VSs__taskwait_on(SlaveVP *animSlv,void* ptr){
+   VSsSemReq reqData;
+
+   reqData.reqType = taskwait_on;
+   reqData.callingSlv = animSlv;
+
+   reqData.args = ptr;
+
+   VMS_WL__send_sem_request( &reqData, animSlv );
+}
+
+void
+VSs__start_critical(SlaveVP *animSlv,int32 name){
+   VSsSemReq reqData;
+
+   reqData.reqType = critical_start;
+   reqData.callingSlv = animSlv;
+
+   reqData.criticalID = name;
+
+   VMS_WL__send_sem_request( &reqData, animSlv );
+}
+
+void
+VSs__end_critical(SlaveVP *animSlv,int32 name){
+   VSsSemReq reqData;
+
+   reqData.reqType = critical_end;
+   reqData.callingSlv = animSlv;
+
+   reqData.criticalID = name;
+
+   VMS_WL__send_sem_request( &reqData, animSlv );
+}
 
 //========================== send and receive ============================
 //
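For orientation, this is how task code might call the three new entry points. A minimal sketch, not part of the patch: the task body follows the VSs convention of receiving its argument record plus the animating slave, and WorkArgs, results, globalTotal, and partialSum are hypothetical names.

    void
    update_totals( void *_args, SlaveVP *animSlv )
     { WorkArgs *args = (WorkArgs *)_args;          //hypothetical argument record

       VSs__taskwait_on( animSlv, args->results );  //wait only for writers of results

       VSs__start_critical( animSlv, 0 );           //enter critical section named 0
       globalTotal += args->partialSum;             //at most one task in here at a time
       VSs__end_critical( animSlv, 0 );             //leave, waking one queued waiter
     }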
diff -r a7ca8f45c1c4 -r feea343d202f VSs.h
--- a/VSs.h	Tue Sep 25 16:12:40 2012 +0200
+++ b/VSs.h	Mon Oct 29 16:57:56 2012 +0100
@@ -92,6 +92,7 @@
     bool32       isEnded;
 #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC
     Unit parentUnit;
+    Unit firstOfTask;
 #endif
  }
 VSsTaskStub;
@@ -102,6 +103,7 @@
     VSsTaskStub *taskStub;
     int32        argNum;
     int32        isReader;
+    bool32       isSuspended;
  }
 VSsTaskStubCarrier;
@@ -131,6 +133,16 @@
  }
 VSsSingleton;
 
+typedef struct
+ {
+   int32           isOccupied;
+   PrivQueueStruc *waitQ;
+#ifdef HOLISTIC__TURN_ON_OBSERVE_UCC
+   Unit            previous;
+#endif
+ }
+VSsCritical;
+
 enum VSsReqType
  {
    submit_task = 1,
@@ -145,6 +157,9 @@
    receive_from_to,
    //===============================
    taskwait,
+   taskwait_on,
+   critical_start,
+   critical_end,
    malloc_req,
    free_req,
    singleton_fn_start,
@@ -186,6 +201,7 @@
    void  *dataForFn;
 
    int32  transID;
+   int32  criticalID;
  }
 /* VSsSemReq */;
@@ -206,6 +222,7 @@
        //fix limit on num with dynArray
    VSsSingleton fnSingletons[NUM_STRUCS_IN_SEM_ENV];
    VSsTrans     transactionStrucs[NUM_STRUCS_IN_SEM_ENV];
+   VSsCritical  criticalSection[NUM_STRUCS_IN_SEM_ENV];
 
    bool32      *coreIsDone;
    int32        numCoresDone;
@@ -319,6 +336,14 @@
 void
 VSs__taskwait(SlaveVP *animSlv);
 
+void
+VSs__taskwait_on(SlaveVP *animSlv,void* ptr);
+
+void
+VSs__start_critical(SlaveVP *animSlv,int32 name);
+
+void
+VSs__end_critical(SlaveVP *animSlv,int32 name);
 
 int32 *
 VSs__give_self_taskID( SlaveVP *animSlv );
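One thing VSs.h makes visible: criticalSection[] is a fixed-size array, and the handlers below index it directly with the caller-supplied criticalID. Nothing range-checks it, so a front end must keep 0 <= name < NUM_STRUCS_IN_SEM_ENV. A defensive lookup could be added along these lines (a hypothetical helper, not in the patch; the throw mirrors the one used in VSs_PluginFns.c):

    VSsCritical *
    getCriticalSection( VSsSemEnv *semEnv, int32 criticalID, SlaveVP *reqSlv )
     { if( criticalID < 0 || criticalID >= NUM_STRUCS_IN_SEM_ENV )
          VMS_PI__throw_exception( "criticalID out of range", reqSlv, NULL );
       return &semEnv->criticalSection[criticalID];   //safe to index after the check
     }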
diff -r a7ca8f45c1c4 -r feea343d202f VSs_PluginFns.c
--- a/VSs_PluginFns.c	Tue Sep 25 16:12:40 2012 +0200
+++ b/VSs_PluginFns.c	Mon Oct 29 16:57:56 2012 +0100
@@ -79,7 +79,9 @@
     }
 
 #ifdef EXTERNAL_SCHEDULER
-   VSs__get_ready_tasks_from_ext(semEnv->taskReadyQ);
+   if(isEmptyPrivQ(semEnv->taskReadyQ)){
+      VSs__get_ready_tasks_from_ext(semEnv->taskReadyQ);
+   }
 #endif
       //If none, speculate will have a task, so get the slot slave
       //TODO: false sharing ? (think not bad cause mostly read..)
@@ -97,6 +99,8 @@
    newTaskStub->slaveAssignedTo = returnSlv;
    semData->needsTaskAssigned = FALSE;
 #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC
+   newTaskStub->firstOfTask.task = returnSlv->assignCount+1;
+   newTaskStub->firstOfTask.vp = returnSlv->slaveID;
    Dependency newd;
    newd.from_vp = newTaskStub->parentUnit.vp;
    newd.from_task = newTaskStub->parentUnit.task;
@@ -251,7 +255,12 @@
          break;
       case taskwait: handleTaskwait(semReq, reqSlv, semEnv);
          break;
-
+      case taskwait_on: handleTaskwaitOn(semReq, reqSlv, semEnv);
+         break;
+      case critical_start: handleCriticalStart(semReq, reqSlv, semEnv);
+         break;
+      case critical_end: handleCriticalEnd(semReq, reqSlv, semEnv);
+         break;
       //====================================================================
       case malloc_req: handleMalloc(semReq, reqSlv, semEnv);
          break;
@@ -271,7 +280,7 @@
          break;
       case trans_end: handleTransEnd(semReq, reqSlv, semEnv);
          break;
-      default:
+      default: VMS_PI__throw_exception("Unknown request type", reqSlv, NULL);
          break;
     }
  }
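The firstOfTask lines deserve a word. UCC dependency edges used to name their target as (callingSlv->slaveID, callingSlv->assignCount) at end-task time. A task that suspends mid-run — now possible through taskwait_on and critical sections — is re-assigned to a slave afterwards, which advances assignCount, so the end-time pair no longer names the unit that began the task. Recording the unit when the task is handed to a slave keeps edges anchored to the task's first unit, which is why the request handlers below retarget the edges:

    /* before the patch -- target is whatever unit the slave is on at end-task:
     *     newd.to_vp   = semReq->callingSlv->slaveID;
     *     newd.to_task = semReq->callingSlv->assignCount;
     * after -- target is the unit recorded when the task was first assigned:
     *     newd.to_vp   = endingTaskStub->firstOfTask.vp;
     *     newd.to_task = endingTaskStub->firstOfTask.task;
     */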
diff -r a7ca8f45c1c4 -r feea343d202f VSs_Request_Handlers.c
--- a/VSs_Request_Handlers.c	Tue Sep 25 16:12:40 2012 +0200
+++ b/VSs_Request_Handlers.c	Mon Oct 29 16:57:56 2012 +0100
@@ -117,6 +117,7 @@
    newCarrier->taskStub = taskStub;
    newCarrier->argNum = argNum;
    newCarrier->isReader = rdOrWrite == READER;
+   newCarrier->isSuspended = FALSE;
    return newCarrier;
  }
@@ -224,7 +225,7 @@
  */
 void handleSubmitTask(VSsSemReq *semReq, VSsSemEnv *semEnv) {
-    uint32 key[3];
+    uint32 key[5];
     HashEntry *rawHashEntry; //has char *, but use with uint32 *
     VSsPointerEntry *ptrEntry; //contents of hash table entry for an arg pointer
     void **args;
@@ -276,9 +277,10 @@
     */
    int32 argNum;
    for (argNum = 0; argNum < taskType->numCtldArgs; argNum++) {
-        key[0] = 2; //two 32b values in key
+        key[0] = 4; //four 32b values in key
         *((uint64*) & key[1]) = (uint64) args[argNum]; //write 64b into two 32b
-
+        *((uint64*) & key[3]) = (uint64) taskStub->parentTaskStub ;
+
         /*If the hash entry was chained, put it at the
          * start of the chain.  (Means no-longer-used pointers accumulate
          * at end of chain, decide garbage collection later) */
@@ -305,8 +307,15 @@
              * task-stub into the readyQ.  At the same time, increment
              * the hash-entry's count of enabled and non-finished readers.*/
             taskStub->numBlockingProp -= 1;
+            if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"Reader %d now on ptrEntry %p; ",taskStub->taskID[1],ptrEntry) }
+            else {DEBUG__printf2(dbgRqstHdlr,"Reader %p now on ptrEntry %p; ",taskStub,ptrEntry)}
             if (taskStub->numBlockingProp == 0) {
                 writePrivQ(taskStub, semEnv->taskReadyQ);
+                if(taskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"reader %d started; ",taskStub->taskID[1]) }
+                else { DEBUG__printf1(dbgRqstHdlr,"reader %p started; ",taskStub) }
+            } else {
+                if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"reader %d still blocked on %d args; ",taskStub->taskID[1],taskStub->numBlockingProp) }
+                else {DEBUG__printf2(dbgRqstHdlr,"reader %p still blocked on %d args; ",taskStub,taskStub->numBlockingProp)}
             }
             ptrEntry->numEnabledNonDoneReaders += 1;
         } else { /*Otherwise, the reader is put into the hash-entry's Q of
@@ -324,8 +333,15 @@
              * task-stub.  If the count is zero, then put the task-stub
              * into the readyQ.*/
             taskStub->numBlockingProp -= 1;
+            if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d takes ptrEntry %p; ",taskStub->taskID[1],ptrEntry) }
+            else { DEBUG__printf2(dbgRqstHdlr,"writer %p takes ptrEntry %p; ",taskStub,ptrEntry)}
             if (taskStub->numBlockingProp == 0) {
                 writePrivQ(taskStub, semEnv->taskReadyQ);
+                if(taskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"writer %d started; ",taskStub->taskID[1]) }
+                else { DEBUG__printf1(dbgRqstHdlr,"writer %p started; ",taskStub) }
+            } else {
+                if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d still blocked on %d args; ",taskStub->taskID[1],taskStub->numBlockingProp) }
+                else {DEBUG__printf2(dbgRqstHdlr,"writer %p still blocked on %d args; ",taskStub,taskStub->numBlockingProp)}
             }
             ptrEntry->hasEnabledNonFinishedWriter = TRUE;
         } else {/*Otherwise, put the writer into the entry's Q of waiters.*/
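The widened key is the point of the key[3]/key[5] changes above: a dependence entry is now looked up by (argument pointer, parent task stub) rather than by pointer alone, so the same address used under different parent tasks resolves to independent entries. Spelled out, under the patch's convention that key[0] counts the 32-bit words that follow (argPtr and parentTaskStub stand in for args[argNum] and taskStub->parentTaskStub):

    uint32 key[5];
    key[0] = 4;                                     //four 32b payload words follow
    *((uint64*) &key[1]) = (uint64) argPtr;         //64b arg pointer in key[1..2]
    *((uint64*) &key[3]) = (uint64) parentTaskStub; //64b parent stub in key[3..4]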
@@ -452,6 +468,7 @@
             /*then decrement the enabled and non-finished reader-count in
              * the hash-entry. */
             ptrEntry->numEnabledNonDoneReaders -= 1;
+            DEBUG__printf1(dbgRqstHdlr,"Releasing read on ptrEntry %p; ",ptrEntry)
 #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC
             Unit u;
             u.vp = semReq->callingSlv->slaveID;
@@ -461,16 +478,18 @@
             Dependency newd;
             newd.from_vp = ptrEntry->lastWriter.vp;
             newd.from_task = ptrEntry->lastWriter.task;
-            newd.to_vp = semReq->callingSlv->slaveID;
-            newd.to_task = semReq->callingSlv->assignCount;
+            newd.to_vp = endingTaskStub->firstOfTask.vp;
+            newd.to_task = endingTaskStub->firstOfTask.task;
             addToListOfArrays(Dependency, newd, semEnv->dataDependenciesList);
         }
 #endif
             /*If the count becomes zero, then take the next entry from the Q.
              *It should be a writer, or else there's a bug in this algorithm.*/
             if (ptrEntry->numEnabledNonDoneReaders == 0) {
+                DEBUG__printf1(dbgRqstHdlr,"ptrEntry %p now free; ",ptrEntry)
                 waitingTaskCarrier = readPrivQ(ptrEntry->waitersQ);
-                if (waitingTaskCarrier == NULL) { //TODO: looks safe to delete the ptr entry at this point
+                if (waitingTaskCarrier == NULL) { //TODO: looks safe to delete the ptr entry at this point
+                    DEBUG__printf1(dbgRqstHdlr,"no waiting writer found for ptrEntry %p\n",ptrEntry)
                     continue; //next iter of loop
                 }
                 if (waitingTaskCarrier->isReader)
@@ -485,10 +504,19 @@
                 /*Decrement the blocking propendents count of the writer's
                  * task-stub.  If the count has reached zero, then put the
                  * task-stub into the readyQ.*/
                 waitingTaskStub->numBlockingProp -= 1;
+                if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d takes ptrEntry %p; ",waitingTaskStub->taskID[1],ptrEntry) }
+                else { DEBUG__printf2(dbgRqstHdlr,"writer %p takes ptrEntry %p; ",waitingTaskStub,ptrEntry)}
                 if (waitingTaskStub->numBlockingProp == 0) {
+                    if(waitingTaskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"writer %d started; ",waitingTaskStub->taskID[1]) }
+                    else { DEBUG__printf1(dbgRqstHdlr,"writer %p started; ",waitingTaskStub) }
                     writePrivQ(waitingTaskStub, semEnv->taskReadyQ);
+                } else {
+                    if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d still blocked on %d args; ",waitingTaskStub->taskID[1],waitingTaskStub->numBlockingProp) }
+                    else {DEBUG__printf2(dbgRqstHdlr,"writer %p still blocked on %d args; ",waitingTaskStub,waitingTaskStub->numBlockingProp)}
                 }
+                VMS_PI__free(waitingTaskCarrier);
             }
+
         } else /*the ending task is a writer of this arg*/ {
             /*clear the enabled non-finished writer flag of the hash-entry.*/
             ptrEntry->hasEnabledNonFinishedWriter = FALSE;
 #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC
@@ -501,8 +529,8 @@
                 Dependency newd;
                 newd.from_vp = fragment[i].vp;
                 newd.from_task = fragment[i].task;
-                newd.to_vp = semReq->callingSlv->slaveID;
-                newd.to_task = semReq->callingSlv->assignCount;
+                newd.to_vp = endingTaskStub->firstOfTask.vp;
+                newd.to_task = endingTaskStub->firstOfTask.task;
                 addToListOfArrays(Dependency, newd, semEnv->warDependenciesList);
             }
         }
@@ -513,8 +541,8 @@
                 Dependency newd;
                 newd.from_vp = fragment[i].vp;
                 newd.from_task = fragment[i].task;
-                newd.to_vp = semReq->callingSlv->slaveID;
-                newd.to_task = semReq->callingSlv->assignCount;
+                newd.to_vp = endingTaskStub->firstOfTask.vp;
+                newd.to_task = endingTaskStub->firstOfTask.task;
                 addToListOfArrays(Dependency, newd, semEnv->warDependenciesList);
             }
         }
@@ -522,43 +550,70 @@
             clearListOfArrays(ptrEntry->readersSinceLastWriter);
             ptrEntry->lastWriter.vp = semReq->callingSlv->slaveID;
             ptrEntry->lastWriter.task = semReq->callingSlv->assignCount;
-            #endif
+            #endif
+            DEBUG__printf1(dbgRqstHdlr,"Releasing write on ptrEntry %p; ",ptrEntry)
 
             /*Take the next waiter from the hash-entry's Q.*/
             waitingTaskCarrier = readPrivQ(ptrEntry->waitersQ);
             if (waitingTaskCarrier == NULL) { //TODO: looks safe to delete ptr entry at this point
+                DEBUG__printf1(dbgRqstHdlr,"no waiting task on ptrEntry %p; ",ptrEntry)
                 continue; //go to next iter of loop, done here.
             }
             waitingTaskStub = waitingTaskCarrier->taskStub;
 
             /*If task is a writer of this hash-entry's pointer*/
             if (!waitingTaskCarrier->isReader) { /* then turn the flag back on.*/
+                if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d takes ptrEntry %p; ",waitingTaskStub->taskID[1],ptrEntry) }
+                else { DEBUG__printf2(dbgRqstHdlr,"writer %p takes ptrEntry %p; ",waitingTaskStub,ptrEntry)}
                 ptrEntry->hasEnabledNonFinishedWriter = TRUE;
 
                 /*Decrement the writer's blocking-propendent-count in task-stub
                  * If it becomes zero, then put the task-stub into the readyQ.*/
                 waitingTaskStub->numBlockingProp -= 1;
                 if (waitingTaskStub->numBlockingProp == 0) {
+                    if(waitingTaskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"writer %d started; ",waitingTaskStub->taskID[1]) }
+                    else {DEBUG__printf1(dbgRqstHdlr,"writer %p started; ",waitingTaskStub)}
                     writePrivQ(waitingTaskStub, semEnv->taskReadyQ);
+                } else {
+                    if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d still blocked on %d args; ",waitingTaskStub->taskID[1],waitingTaskStub->numBlockingProp) }
+                    else {DEBUG__printf2(dbgRqstHdlr,"writer %p still blocked on %d args; ",waitingTaskStub,waitingTaskStub->numBlockingProp)}
                 }
+                VMS_PI__free(waitingTaskCarrier);
             } else {
                 /*Waiting task is a reader, so do a loop, of all waiting readers
                  * until encounter a writer or waitersQ is empty*/
                 while (TRUE) /*The checks guarantee have a waiting reader*/ {
                     /*Increment the hash-entry's count of enabled non-finished
                      * readers.*/
-                    ptrEntry->numEnabledNonDoneReaders += 1;
+                    //deal with tasks suspended by taskwait_on here - these don't count as a dependency but are otherwise treated like readers
+                    if(waitingTaskCarrier->isSuspended){
+                        if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"task %d taskwaiting on ptr %p resumed; ",waitingTaskStub->taskID[1],ptrEntry) }
+                        else {DEBUG__printf2(dbgRqstHdlr,"task %p taskwaiting on ptr %p resumed; ",waitingTaskStub,ptrEntry)}
+                        resume_slaveVP(waitingTaskStub->slaveAssignedTo, semEnv);
+                    } else {
-                    /*Decrement the blocking propendents count of the reader's
-                     * task-stub.  If it reaches zero, then put the task-stub
-                     * into the readyQ.*/
-                    waitingTaskStub->numBlockingProp -= 1;
-                    if (waitingTaskStub->numBlockingProp == 0) {
-                        writePrivQ(waitingTaskStub, semEnv->taskReadyQ);
-                    }
+                        ptrEntry->numEnabledNonDoneReaders += 1;
+                        if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"Reader %d now on ptrEntry %p; ",waitingTaskStub->taskID[1],ptrEntry) }
+                        else {DEBUG__printf2(dbgRqstHdlr,"Reader %p now on ptrEntry %p; ",waitingTaskStub,ptrEntry)}
+                        /*Decrement the blocking propendents count of the reader's
+                         * task-stub.  If it reaches zero, then put the task-stub
+                         * into the readyQ.*/
+                        waitingTaskStub->numBlockingProp -= 1;
+
+                        if (waitingTaskStub->numBlockingProp == 0) {
+                            if(waitingTaskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"reader %d started; ",waitingTaskStub->taskID[1]) }
+                            else {DEBUG__printf1(dbgRqstHdlr,"reader %p started; ",waitingTaskStub)}
+                            writePrivQ(waitingTaskStub, semEnv->taskReadyQ);
+                        } else {
+                            if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"reader %d still blocked on %d args; ",waitingTaskStub->taskID[1],waitingTaskStub->numBlockingProp) }
+                            else {DEBUG__printf2(dbgRqstHdlr,"reader %p still blocked on %d args; ",waitingTaskStub,waitingTaskStub->numBlockingProp)}
+                        }
+                    } //if-else, suspended or normal reader
+                    //discard carrier
+                    VMS_PI__free(waitingTaskCarrier);
 
                     /*Get next waiting task*/
                     waitingTaskCarrier = peekPrivQ(ptrEntry->waitersQ);
                     if (waitingTaskCarrier == NULL) break;
                     if (!waitingTaskCarrier->isReader) break;
                     waitingTaskCarrier = readPrivQ(ptrEntry->waitersQ);
-                    waitingTaskStub = waitingTaskCarrier->taskStub;
+                    waitingTaskStub = waitingTaskCarrier->taskStub;
                 }//while waiter is a reader
             }//if-else, first waiting task is a reader
         }//if-else, check of ending task, whether writer or reader
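A small worked trace of the release path above (illustration only). Suppose writer W holds pointer p's entry, and the entry's waitersQ holds [R1, R2, T, W2], where R1 and R2 are readers, T is a task suspended in taskwait_on(p), and W2 is a writer:

    /* W ends  -> hasEnabledNonFinishedWriter cleared; the reader loop runs:
     *            R1, R2 enabled, numEnabledNonDoneReaders = 2;
     *            T's slave is resumed (suspended carriers resume without being
     *            counted as readers); the loop breaks at W2, which stays queued.
     * R1 ends -> count drops to 1, nothing dequeued.
     * R2 ends -> count reaches 0; W2 dequeued, the writer flag set again, and
     *            W2 goes to the readyQ once its numBlockingProp reaches 0.
     */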
@@ -1018,7 +1073,91 @@
     }
  }
 
+void handleTaskwaitOn(VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv) {
+    VSsTaskStub* requestingTaskStub;
+    VSsSemData* semData;
+    VSsPointerEntry* ptrEntry;
+    VSsTaskStubCarrier *taskCarrier;
+    uint32 key[5];
+    HashEntry* rawHashEntry; //has char *, but use with uint32 *
+    HashTable* argPtrHashTbl = semEnv->argPtrHashTbl;
+
+    semData = (VSsSemData *) semReq->callingSlv->semanticData;
+    requestingTaskStub = semData->taskStub;
+
+    if(requestingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,
+        "TaskwaitOn request from processor %d, task: %d",requestingSlv->slaveID,
+        requestingTaskStub->taskID[1]) }
+    else {DEBUG__printf1(dbgRqstHdlr, "TaskwaitOn request from processor %d",
+        requestingSlv->slaveID)}
+
+    void* ptr = semReq->args;
+
+    key[0] = 4; //four 32b values in key
+    *((uint64*) & key[1]) = (uint64) ptr; //write 64b into two 32b
+    *((uint64*) & key[3]) = (uint64) requestingTaskStub->parentTaskStub;
+
+    /*If the hash entry was chained, put it at the
+     * start of the chain.  (Means no-longer-used pointers accumulate
+     * at end of chain, decide garbage collection later) */
+    rawHashEntry = getEntryFromTable32(key, argPtrHashTbl);
+    if (rawHashEntry == NULL) { //adding a value auto-creates the hash-entry
+        ptrEntry = create_pointer_entry();
+        rawHashEntry = addValueIntoTable32(key, ptrEntry, argPtrHashTbl);
+    } else {
+        ptrEntry = (VSsPointerEntry *) rawHashEntry->content;
+        if (ptrEntry == NULL) {
+            ptrEntry = create_pointer_entry();
+            rawHashEntry = addValueIntoTable32(key, ptrEntry, argPtrHashTbl);
+        }
+    }
+
+    if (!ptrEntry->hasEnabledNonFinishedWriter &&
+            isEmptyPrivQ(ptrEntry->waitersQ)) {
+        resume_slaveVP(requestingSlv, semEnv);
+    } else { /*Otherwise, the suspended task is put into the hash-entry's Q of
+              * waiters*/
+        taskCarrier = create_task_carrier(requestingTaskStub, -1, READER);
+        taskCarrier->isSuspended = TRUE;
+        writePrivQ(taskCarrier, ptrEntry->waitersQ);
+    }
+}
+
+void handleCriticalStart(VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv){
+    VSsSemData* semData;
+    int32 criticalID;
+    DEBUG__printf1(dbgRqstHdlr, "CriticalStart request from processor %d",
+        requestingSlv->slaveID)
+
+    semData = (VSsSemData *) semReq->callingSlv->semanticData;
+
+    criticalID = semReq->criticalID;
+    if(!semEnv->criticalSection[criticalID].isOccupied){
+        semEnv->criticalSection[criticalID].isOccupied = TRUE;
+        resume_slaveVP(requestingSlv, semEnv);
+    } else {
+        writePrivQ(requestingSlv, semEnv->criticalSection[criticalID].waitQ);
+    }
+}
+
+void handleCriticalEnd(VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv){
+    VSsSemData* semData;
+    SlaveVP *waitingSlv;
+    int32 criticalID;
+    DEBUG__printf1(dbgRqstHdlr, "CriticalEnd request from processor %d",
+        requestingSlv->slaveID)
+
+    semData = (VSsSemData *) semReq->callingSlv->semanticData;
+
+    criticalID = semReq->criticalID;
+    semEnv->criticalSection[criticalID].isOccupied = FALSE;
+    waitingSlv = readPrivQ(semEnv->criticalSection[criticalID].waitQ);
+    if(waitingSlv != NULL){
+        semEnv->criticalSection[criticalID].isOccupied = TRUE;
+        resume_slaveVP(waitingSlv, semEnv);
+    }
+    resume_slaveVP(requestingSlv, semEnv);
+}
 
 //==========================================================================
 /*
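The two critical-section handlers implement a direct hand-off: leaving the section passes ownership to exactly one queued waiter instead of reopening a race. The same logic as a self-contained sketch, with hypothetical Queue, Task, enqueue/dequeue, and resume standing in for PrivQueueStruc, SlaveVP, writePrivQ/readPrivQ, and resume_slaveVP:

    typedef struct { int isOccupied; Queue *waitQ; } CritSec;

    void crit_start( CritSec *cs, Task *t )
     { if( !cs->isOccupied ) { cs->isOccupied = 1; resume( t ); } //free: take it, continue
       else                    enqueue( t, cs->waitQ );           //held: park the requester
     }

    void crit_end( CritSec *cs, Task *t )
     { Task *next = dequeue( cs->waitQ );   //NULL when nobody waits
       cs->isOccupied = (next != NULL);     //held again iff handed straight to a waiter
       if( next != NULL ) resume( next );   //exactly one waiter proceeds
       resume( t );                         //the leaving task always continues
     }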
diff -r a7ca8f45c1c4 -r feea343d202f VSs_Request_Handlers.h
--- a/VSs_Request_Handlers.h	Tue Sep 25 16:12:40 2012 +0200
+++ b/VSs_Request_Handlers.h	Mon Oct 29 16:57:56 2012 +0100
@@ -28,6 +28,12 @@
 handleReceiveFromTo( VSsSemReq *semReq, VSsSemEnv *semEnv);
 void
 handleTaskwait(VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv);
+void
+handleTaskwaitOn(VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv);
+void
+handleCriticalStart(VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv);
+void
+handleCriticalEnd(VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv);
 void
 handleMalloc( VSsSemReq *semReq, SlaveVP *requestingSlv, VSsSemEnv *semEnv);
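For context on how these handlers map back to the OmpSs features named in the commit message, a plausible front-end lowering (assumed, not part of this patch; nameID is a small dense integer the front end would assign per critical name, bounded by NUM_STRUCS_IN_SEM_ENV):

    /*  #pragma omp taskwait on(p)   ->  VSs__taskwait_on( animSlv, p );
     *
     *  #pragma omp critical(name)   ->  VSs__start_critical( animSlv, nameID );
     *      ...region...                 ...region...
     *                               ->  VSs__end_critical( animSlv, nameID );
     */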