# HG changeset patch # User Nina Engelhardt # Date 1356607665 -3600 # Node ID b787a5234406fd0dcce1d2e146703f0be21637af # Parent feea343d202fbde09d3b43ff0616b372b5be729b add task throttle diff -r feea343d202f -r b787a5234406 VSs.c --- a/VSs.c Mon Oct 29 16:57:56 2012 +0100 +++ b/VSs.c Thu Dec 27 12:27:45 2012 +0100 @@ -205,6 +205,7 @@ semanticEnv->shutdownInitiated = FALSE; semanticEnv->coreIsDone = VMS_int__malloc( NUM_CORES * sizeof( bool32 ) ); + semanticEnv->numCoresDone = 0; //For each animation slot, there is an idle slave, and an initial // slave assigned as the current-task-slave. Create them here. SlaveVP *idleSlv, *slotTaskSlv; @@ -212,11 +213,14 @@ { semanticEnv->coreIsDone[coreNum] = FALSE; //use during shutdown for( slotNum = 0; slotNum < NUM_ANIM_SLOTS; ++slotNum ) - { idleSlv = VSs__create_slave_helper( &idle_fn, NULL, semanticEnv, 0); + { +#ifdef IDLE_SLAVES + idleSlv = VSs__create_slave_helper( &idle_fn, NULL, semanticEnv, 0); idleSlv->coreAnimatedBy = coreNum; idleSlv->animSlotAssignedTo = _VMSMasterEnv->allAnimSlots[coreNum][slotNum]; semanticEnv->idleSlv[coreNum][slotNum] = idleSlv; +#endif slotTaskSlv = VSs__create_slave_helper( &idle_fn, NULL, semanticEnv, 0); slotTaskSlv->coreAnimatedBy = coreNum; @@ -240,6 +244,8 @@ semanticEnv->nextCoreToGetNewSlv = 0; + semanticEnv->numInFlightTasks = 0; + semanticEnv->deferredSubmitsQ = makeVMSQ(); #ifdef EXTERNAL_SCHEDULER VSs__init_ext_scheduler(); #endif @@ -410,17 +416,30 @@ } #endif -/* It's all allocated inside VMS's big chunk -- that's about to be freed, so - * nothing to do here - + /* It's all allocated inside VMS's big chunk -- that's about to be freed, so + * nothing to do here */ +/* + int coreIdx, slotIdx; + SlaveVP* slotSlv; + for (coreIdx = 0; coreIdx < NUM_CORES; coreIdx++) { + for (slotIdx = 0; slotIdx < NUM_ANIM_SLOTS; slotIdx++) { + slotSlv = semanticEnv->slotTaskSlvs[coreIdx][slotIdx]; + VMS_int__free(slotSlv->semanticData); + VMS_int__free( slotSlv->startOfStack ); + VMS_int__free( slotSlv ); +#ifdef IDLE_SLAVES + slotSlv = semanticEnv->idleSlv[coreIdx][slotIdx]; + VMS_int__free(slotSlv->semanticData); + VMS_int__free( slotSlv->startOfStack ); + VMS_int__free( slotSlv ); +#endif + } + } - for( coreIdx = 0; coreIdx < NUM_CORES; coreIdx++ ) - { - VMS_int__free( semanticEnv->readyVPQs[coreIdx]->startOfData ); - VMS_int__free( semanticEnv->readyVPQs[coreIdx] ); - } - VMS_int__free( semanticEnv->readyVPQs ); - + freePrivQ(semanticEnv->freeExtraTaskSlvQ); + freePrivQ(semanticEnv->slavesReadyToResumeQ); + freePrivQ(semanticEnv->taskReadyQ); + freeHashTable( semanticEnv->argPtrHashTbl ); freeHashTable( semanticEnv->commHashTbl ); VMS_int__free( _VMSMasterEnv->semanticEnv ); */ diff -r feea343d202f -r b787a5234406 VSs.h --- a/VSs.h Mon Oct 29 16:57:56 2012 +0100 +++ b/VSs.h Thu Dec 27 12:27:45 2012 +0100 @@ -25,6 +25,8 @@ //=========================================================================== #define NUM_STRUCS_IN_SEM_ENV 1000 +#define MAX_TASKS_NUM 256 + //This is hardware dependent -- it's the number of cycles of scheduling // overhead -- if a work unit is fewer than this, it is better being // combined sequentially with other work @@ -227,6 +229,10 @@ bool32 *coreIsDone; int32 numCoresDone; + int numInFlightTasks; + PrivQueueStruc *deferredSubmitsQ; + int numDeferred; + #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC ListOfArrays* unitList; ListOfArrays* ctlDependenciesList; @@ -244,7 +250,9 @@ #ifdef HOLISTIC__TURN_ON_PERF_COUNTERS ListOfArrays* counterList[NUM_CORES]; #endif + #ifdef IDLE_SLAVES SlaveVP* idleSlv[NUM_CORES][NUM_ANIM_SLOTS]; + #endif int shutdownInitiated; } VSsSemEnv; diff -r feea343d202f -r b787a5234406 VSs_PluginFns.c --- a/VSs_PluginFns.c Mon Oct 29 16:57:56 2012 +0100 +++ b/VSs_PluginFns.c Thu Dec 27 12:27:45 2012 +0100 @@ -415,7 +415,8 @@ newStub->isWaitingForChildTasksToEnd = FALSE; newStub->isWaitingForChildThreadsToEnd = FALSE; newStub->taskID = NULL; - + newStub->isEnded = FALSE; + return newStub; } diff -r feea343d202f -r b787a5234406 VSs_Request_Handlers.c --- a/VSs_Request_Handlers.c Mon Oct 29 16:57:56 2012 +0100 +++ b/VSs_Request_Handlers.c Thu Dec 27 12:27:45 2012 +0100 @@ -81,6 +81,12 @@ return newEntry; } +void free_pointer_entry(VSsPointerEntry* ptrEntry){ + freePrivQ(ptrEntry->waitersQ); + VMS_int__free(ptrEntry); +} + + /*malloc's space and initializes fields -- and COPIES the arg values * to new space */ @@ -122,6 +128,130 @@ return newCarrier; } +void do_submit(VSsSemReq *semReq, VSsSemEnv *semEnv){ + uint32 key[5]; + HashEntry *rawHashEntry; //has char *, but use with uint32 * + VSsPointerEntry *ptrEntry; //contents of hash table entry for an arg pointer + void **args; + VSsTaskStub *taskStub; + VSsTaskType *taskType; + VSsTaskStubCarrier *taskCarrier; + + HashTable * + argPtrHashTbl = semEnv->argPtrHashTbl; + + if(!semReq) { + DEBUG__printf(dbgRqstHdlr,"***submitted Req is null***\n") + return;} + + semEnv->numInFlightTasks++; + /* ========================== creation ========================== + * + *At creation, make a task-stub. Set the count of blocking propendents + * to the number of controlled arguments (a task can have + * arguments that are not controlled by the language, like simple integer + * inputs from the sequential portion. Note that all controlled arguments + * are pointers, and marked as controlled in the application code). + */ + args = semReq->args; + taskType = semReq->taskType; + taskStub = create_task_stub(taskType, args); //copies arg ptrs + taskStub->numBlockingProp = taskType->numCtldArgs; + taskStub->taskID = semReq->taskID; //may be NULL + + VSsSemData* + parentSemData = (VSsSemData*) semReq->callingSlv->semanticData; + taskStub->parentTaskStub = (void*) parentSemData->taskStub; + parentSemData->taskStub->numLiveChildTasks += 1; +#ifdef HOLISTIC__TURN_ON_OBSERVE_UCC + taskStub->parentUnit.vp = semReq->callingSlv->slaveID; + taskStub->parentUnit.task = semReq->callingSlv->assignCount; +#endif + + //DEBUG__printf3(dbgRqstHdlr,"Submit req from slaveID: %d, from task: %d, for task: %d", semReq->callingSlv->slaveID, parentSemData->taskStub->taskID[1], taskStub->taskID[1]) + +#ifdef EXTERNAL_SCHEDULER + //send task descriptor + VSs__submit_task_to_ext(taskStub, semEnv); +#else + /*The controlled arguments are then processed one by one. + *Processing an argument means getting the hash of the pointer. Then, + * looking up the hash entry. (If none, create one). + */ + int32 argNum; + for (argNum = 0; argNum < taskType->numCtldArgs; argNum++) { + key[0] = 4; //two 32b values in key + *((uint64*) & key[1]) = (uint64) args[argNum]; //write 64b into two 32b + *((uint64*) & key[3]) = (uint64) taskStub->parentTaskStub ; + + /*If the hash entry was chained, put it at the + * start of the chain. (Means no-longer-used pointers accumulate + * at end of chain, decide garbage collection later) */ + rawHashEntry = getEntryFromTable32(key, argPtrHashTbl); + if (rawHashEntry == NULL) { //adding a value auto-creates the hash-entry + ptrEntry = create_pointer_entry(); + rawHashEntry = addValueIntoTable32(key, ptrEntry, argPtrHashTbl); + } else { + ptrEntry = (VSsPointerEntry *) rawHashEntry->content; + if (ptrEntry == NULL) { + ptrEntry = create_pointer_entry(); + rawHashEntry = addValueIntoTable32(key, ptrEntry, argPtrHashTbl); + } + } + taskStub->ptrEntries[argNum] = ptrEntry; + + /*Have the hash entry. + *If the arg is a reader and the entry does not have an enabled + * non-finished writer, and the queue is empty. */ + if (taskType->argTypes[argNum] == READER) { + if (!ptrEntry->hasEnabledNonFinishedWriter && + isEmptyPrivQ(ptrEntry->waitersQ)) { /*The reader is free. So, decrement the blocking-propendent + * count in the task-stub. If the count is zero, then put the + * task-stub into the readyQ. At the same time, increment + * the hash-entry's count of enabled and non-finished readers.*/ + taskStub->numBlockingProp -= 1; + DEBUG__printf_w_task(dbgRqstHdlr, taskStub, "taking ptrEntry %p (read)", ptrEntry); + if (taskStub->numBlockingProp == 0) { + writePrivQ(taskStub, semEnv->taskReadyQ); + DEBUG__printf_w_task(dbgRqstHdlr, taskStub, "ready (dependencies fulfilled)"); + } else { + DEBUG__printf_w_task(dbgRqstHdlr,taskStub,"still blocked on %d args",taskStub->numBlockingProp); + } + ptrEntry->numEnabledNonDoneReaders += 1; + } else { /*Otherwise, the reader is put into the hash-entry's Q of + * waiters*/ + taskCarrier = create_task_carrier(taskStub, argNum, READER); + writePrivQ(taskCarrier, ptrEntry->waitersQ); + } + } else //arg is a writer + { /*the arg is a writer, plus the entry does not have a current + * writer, plus the number of enabled non-finished readers is + * zero, (the Q must be empty, else bug!) then the writer is free*/ + if (!ptrEntry->hasEnabledNonFinishedWriter && + ptrEntry->numEnabledNonDoneReaders == 0) { /*Mark the entry has having a enabled and non-finished writer. + * Decrement the blocking-propenden count in the writer's + * task-stub. If the count is zero, then put the task-stub + * into the readyQ.*/ + taskStub->numBlockingProp -= 1; + DEBUG__printf_w_task(dbgRqstHdlr,taskStub,"taking ptrEntry %p (write)",ptrEntry); + if (taskStub->numBlockingProp == 0) { + DEBUG__printf_w_task(dbgRqstHdlr, taskStub, "ready (dependencies fulfilled)"); + writePrivQ(taskStub, semEnv->taskReadyQ); + } else { + DEBUG__printf_w_task(dbgRqstHdlr,taskStub,"still blocked on %d args",taskStub->numBlockingProp); + } + ptrEntry->hasEnabledNonFinishedWriter = TRUE; + } else {/*Otherwise, put the writer into the entry's Q of waiters.*/ + taskCarrier = create_task_carrier(taskStub, argNum, WRITER); + writePrivQ(taskCarrier, ptrEntry->waitersQ); + } + } + } //for argNum +#endif + + resume_slaveVP(semReq->callingSlv, semEnv); +} + //========================================================================== // // @@ -225,134 +355,46 @@ */ void handleSubmitTask(VSsSemReq *semReq, VSsSemEnv *semEnv) { - uint32 key[5]; - HashEntry *rawHashEntry; //has char *, but use with uint32 * - VSsPointerEntry *ptrEntry; //contents of hash table entry for an arg pointer - void **args; - VSsTaskStub *taskStub; - VSsTaskType *taskType; - VSsTaskStubCarrier *taskCarrier; - HashTable * - argPtrHashTbl = semEnv->argPtrHashTbl; - //suspending a task always makes the slave into an extra slot slave, - // because it ends up in the resumeQ, even when resumes immediately. - //Eventually task_end will put the slave into the freeExtraTaskSlvQ - replaceWithNewSlotSlvIfNeeded( semReq->callingSlv, semEnv ); + //suspending a task always makes the slave into an extra slot slave, + // because it ends up in the resumeQ, even when resumes immediately. + //Eventually task_end will put the slave into the freeExtraTaskSlvQ + replaceWithNewSlotSlvIfNeeded(semReq->callingSlv, semEnv); - /* ========================== creation ========================== - * - *At creation, make a task-stub. Set the count of blocking propendents - * to the number of controlled arguments (a task can have - * arguments that are not controlled by the language, like simple integer - * inputs from the sequential portion. Note that all controlled arguments - * are pointers, and marked as controlled in the application code). - */ - args = semReq->args; - taskType = semReq->taskType; - taskStub = create_task_stub(taskType, args); //copies arg ptrs - taskStub->numBlockingProp = taskType->numCtldArgs; - taskStub->taskID = semReq->taskID; //may be NULL +#ifdef DEBUG__TURN_ON_DEBUG_PRINT + if (dbgRqstHdlr) { + if (semReq->taskID) { + printf("Task "); + int i; + for (i = 1; i < semReq->taskID[0]; i++) { + printf("%d,", semReq->taskID[i]); + } + printf("%d: ", semReq->taskID[i]); + } else { + printf("Anonymous "); + } + } +#endif + DEBUG__printf(dbgRqstHdlr, "submit req from slaveID %d", semReq->callingSlv->slaveID); + + // Throttle if too many tasks + + if (!isEmptyPrivQ(semEnv->deferredSubmitsQ) || semEnv->numInFlightTasks > MAX_TASKS_NUM) { + semEnv->numDeferred++; + writePrivQ(semReq, semEnv->deferredSubmitsQ); + while (!isEmptyPrivQ(semEnv->deferredSubmitsQ) && ( semEnv->numInFlightTasks < MAX_TASKS_NUM || isEmptyPrivQ(semEnv->taskReadyQ) )) { + do_submit(readPrivQ(semEnv->deferredSubmitsQ),semEnv); + semEnv->numDeferred--; + } + } else { - VSsSemData* - parentSemData = (VSsSemData*) semReq->callingSlv->semanticData; - taskStub->parentTaskStub = (void*) parentSemData->taskStub; - parentSemData->taskStub->numLiveChildTasks += 1; -#ifdef HOLISTIC__TURN_ON_OBSERVE_UCC - taskStub->parentUnit.vp = semReq->callingSlv->slaveID; - taskStub->parentUnit.task = semReq->callingSlv->assignCount; -#endif + do_submit(semReq,semEnv); + } - //DEBUG__printf3(dbgRqstHdlr,"Submit req from slaveID: %d, from task: %d, for task: %d", semReq->callingSlv->slaveID, parentSemData->taskStub->taskID[1], taskStub->taskID[1]) - if(semReq->taskID) { DEBUG__printf2(dbgRqstHdlr,"Submit req from slaveID: %d, for task: %d", semReq->callingSlv->slaveID, taskStub->taskID[1]) } - else { DEBUG__printf1(dbgRqstHdlr,"Submit req from slaveID: %d, for anonymous task", semReq->callingSlv->slaveID) } -#ifdef EXTERNAL_SCHEDULER - //send task descriptor - VSs__submit_task_to_ext(taskStub, semEnv); -#else - /*The controlled arguments are then processed one by one. - *Processing an argument means getting the hash of the pointer. Then, - * looking up the hash entry. (If none, create one). - */ - int32 argNum; - for (argNum = 0; argNum < taskType->numCtldArgs; argNum++) { - key[0] = 4; //two 32b values in key - *((uint64*) & key[1]) = (uint64) args[argNum]; //write 64b into two 32b - *((uint64*) & key[3]) = (uint64) taskStub->parentTaskStub ; - - /*If the hash entry was chained, put it at the - * start of the chain. (Means no-longer-used pointers accumulate - * at end of chain, decide garbage collection later) */ - rawHashEntry = getEntryFromTable32(key, argPtrHashTbl); - if (rawHashEntry == NULL) { //adding a value auto-creates the hash-entry - ptrEntry = create_pointer_entry(); - rawHashEntry = addValueIntoTable32(key, ptrEntry, argPtrHashTbl); - } else { - ptrEntry = (VSsPointerEntry *) rawHashEntry->content; - if (ptrEntry == NULL) { - ptrEntry = create_pointer_entry(); - rawHashEntry = addValueIntoTable32(key, ptrEntry, argPtrHashTbl); - } - } - taskStub->ptrEntries[argNum] = ptrEntry; - /*Have the hash entry. - *If the arg is a reader and the entry does not have an enabled - * non-finished writer, and the queue is empty. */ - if (taskType->argTypes[argNum] == READER) { - if (!ptrEntry->hasEnabledNonFinishedWriter && - isEmptyPrivQ(ptrEntry->waitersQ)) { /*The reader is free. So, decrement the blocking-propendent - * count in the task-stub. If the count is zero, then put the - * task-stub into the readyQ. At the same time, increment - * the hash-entry's count of enabled and non-finished readers.*/ - taskStub->numBlockingProp -= 1; - if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"Reader %d now on ptrEntry %p; ",taskStub->taskID[1],ptrEntry) } - else {DEBUG__printf2(dbgRqstHdlr,"Reader %p now on ptrEntry %p; ",taskStub,ptrEntry)} - if (taskStub->numBlockingProp == 0) { - writePrivQ(taskStub, semEnv->taskReadyQ); - if(taskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"reader %d started; ",taskStub->taskID[1]) } - else { DEBUG__printf1(dbgRqstHdlr,"reader %p started; ",taskStub) } - } else { - if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"reader %d still blocked on %d args; ",taskStub->taskID[1],taskStub->numBlockingProp) } - else {DEBUG__printf2(dbgRqstHdlr,"reader %p still blocked on %d args; ",taskStub,taskStub->numBlockingProp)} - } - ptrEntry->numEnabledNonDoneReaders += 1; - } else { /*Otherwise, the reader is put into the hash-entry's Q of - * waiters*/ - taskCarrier = create_task_carrier(taskStub, argNum, READER); - writePrivQ(taskCarrier, ptrEntry->waitersQ); - } - } else //arg is a writer - { /*the arg is a writer, plus the entry does not have a current - * writer, plus the number of enabled non-finished readers is - * zero, (the Q must be empty, else bug!) then the writer is free*/ - if (!ptrEntry->hasEnabledNonFinishedWriter && - ptrEntry->numEnabledNonDoneReaders == 0) { /*Mark the entry has having a enabled and non-finished writer. - * Decrement the blocking-propenden count in the writer's - * task-stub. If the count is zero, then put the task-stub - * into the readyQ.*/ - taskStub->numBlockingProp -= 1; - if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d takes ptrEntry %p; ",taskStub->taskID[1],ptrEntry) } - else { DEBUG__printf2(dbgRqstHdlr,"writer %p takes ptrEntry %p; ",taskStub,ptrEntry)} - if (taskStub->numBlockingProp == 0) { - writePrivQ(taskStub, semEnv->taskReadyQ); - if(taskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"writer %d started; ",taskStub->taskID[1]) } - else { DEBUG__printf1(dbgRqstHdlr,"writer %p started; ",taskStub) } - } else { - if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d still blocked on %d args; ",taskStub->taskID[1],taskStub->numBlockingProp) } - else {DEBUG__printf2(dbgRqstHdlr,"writer %p still blocked on %d args; ",taskStub,taskStub->numBlockingProp)} - } - ptrEntry->hasEnabledNonFinishedWriter = TRUE; - } else {/*Otherwise, put the writer into the entry's Q of waiters.*/ - taskCarrier = create_task_carrier(taskStub, argNum, WRITER); - writePrivQ(taskCarrier, ptrEntry->waitersQ); - } - } - } //for argNum -#endif - resume_slaveVP(semReq->callingSlv, semEnv); + return; } @@ -402,15 +444,15 @@ VSsTaskStubCarrier *waitingTaskCarrier; VSsPointerEntry **ptrEntries; - + semEnv->numInFlightTasks--; + endingSlvSemData = (VSsSemData *) semReq->callingSlv->semanticData; endingTaskStub = endingSlvSemData->taskStub; args = endingTaskStub->args; endingTaskType = endingTaskStub->taskType; ptrEntries = endingTaskStub->ptrEntries; //saved in stub when create - if(endingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"EndTask req from slaveID: %d, task: %d",semReq->callingSlv->slaveID, endingTaskStub->taskID[1]) } - else {DEBUG__printf1(dbgRqstHdlr,"EndTask req from slaveID: %d",semReq->callingSlv->slaveID)} + DEBUG__printf_w_task(dbgRqstHdlr,endingTaskStub,"EndTask req from slaveID %d",semReq->callingSlv->slaveID); //Check if parent was waiting on this task parent = (VSsTaskStub *) endingTaskStub->parentTaskStub; @@ -468,7 +510,7 @@ if (endingTaskType->argTypes[argNum] == READER) { /*then decrement the enabled and non-finished reader-count in * the hash-entry. */ ptrEntry->numEnabledNonDoneReaders -= 1; - DEBUG__printf1(dbgRqstHdlr,"Releasing read on ptrEntry %p; ",ptrEntry) + DEBUG__printf(dbgRqstHdlr,"Releasing read on ptrEntry %p",ptrEntry) #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC Unit u; u.vp = semReq->callingSlv->slaveID; @@ -486,17 +528,18 @@ /*If the count becomes zero, then take the next entry from the Q. *It should be a writer, or else there's a bug in this algorithm.*/ if (ptrEntry->numEnabledNonDoneReaders == 0) { - DEBUG__printf1(dbgRqstHdlr,"ptrEntry %p now free; ",ptrEntry) + DEBUG__printf(dbgRqstHdlr,"ptrEntry %p now free",ptrEntry) waitingTaskCarrier = readPrivQ(ptrEntry->waitersQ); if (waitingTaskCarrier == NULL) { //TODO: looks safe to delete the ptr entry at this point - DEBUG__printf1(dbgRqstHdlr,"no waiting writer found for ptrEntry %p\n",ptrEntry) + DEBUG__printf(dbgRqstHdlr,"no waiting writer found for ptrEntry %p\n",ptrEntry) + //free_pointer_entry(ptrEntry); continue; //next iter of loop } if (waitingTaskCarrier->isReader) VMS_App__throw_exception("READER waiting", NULL, NULL); waitingTaskStub = waitingTaskCarrier->taskStub; - + VMS_PI__free(waitingTaskCarrier); /*Set the hash-entry to have an enabled non-finished writer.*/ ptrEntry->hasEnabledNonFinishedWriter = TRUE; @@ -504,17 +547,14 @@ * task-stub. If the count has reached zero, then put the * task-stub into the readyQ.*/ waitingTaskStub->numBlockingProp -= 1; - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d takes ptrEntry %p; ",waitingTaskStub->taskID[1],ptrEntry) } - else { DEBUG__printf2(dbgRqstHdlr,"writer %p takes ptrEntry %p; ",waitingTaskStub,ptrEntry)} + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"taking ptrEntry %p (write)",ptrEntry); if (waitingTaskStub->numBlockingProp == 0) { - if(waitingTaskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"writer %d started; ",waitingTaskStub->taskID[1]) } - else { DEBUG__printf1(dbgRqstHdlr,"writer %p started; ",waitingTaskStub) } + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"ready (dependencies fulfilled)"); writePrivQ(waitingTaskStub, semEnv->taskReadyQ); } else { - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d still blocked on %d args; ",waitingTaskStub->taskID[1],waitingTaskStub->numBlockingProp) } - else {DEBUG__printf2(dbgRqstHdlr,"writer %p still blocked on %d args; ",waitingTaskStub,waitingTaskStub->numBlockingProp)} + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"still blocked on %d args",waitingTaskStub->numBlockingProp); } - VMS_PI__free(waitingTaskCarrier); + } } else /*the ending task is a writer of this arg*/ { /*clear the enabled non-finished writer flag of the hash-entry.*/ @@ -552,30 +592,29 @@ ptrEntry->lastWriter.task = semReq->callingSlv->assignCount; #endif - DEBUG__printf1(dbgRqstHdlr,"Releasing write on ptrEntry %p; ",ptrEntry) + DEBUG__printf(dbgRqstHdlr,"Releasing write on ptrEntry %p; ",ptrEntry) /*Take the next waiter from the hash-entry's Q.*/ waitingTaskCarrier = readPrivQ(ptrEntry->waitersQ); if (waitingTaskCarrier == NULL) { //TODO: looks safe to delete ptr entry at this point - DEBUG__printf1(dbgRqstHdlr,"no waiting task on ptrEntry %p; ",ptrEntry) + DEBUG__printf(dbgRqstHdlr,"no waiting task on ptrEntry %p; deleting",ptrEntry); + //free_pointer_entry(ptrEntry); + //NOPE, still tasks around that kept the pointer... continue; //go to next iter of loop, done here. } waitingTaskStub = waitingTaskCarrier->taskStub; /*If task is a writer of this hash-entry's pointer*/ if (!waitingTaskCarrier->isReader) { /* then turn the flag back on.*/ - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d takes ptrEntry %p; ",waitingTaskStub->taskID[1],ptrEntry) } - else { DEBUG__printf2(dbgRqstHdlr,"Writer %p takes ptrEntry %p; ",waitingTaskStub,ptrEntry)} + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"taking ptrEntry %p (write)",ptrEntry); ptrEntry->hasEnabledNonFinishedWriter = TRUE; /*Decrement the writer's blocking-propendent-count in task-stub * If it becomes zero, then put the task-stub into the readyQ.*/ waitingTaskStub->numBlockingProp -= 1; if (waitingTaskStub->numBlockingProp == 0) { - if(waitingTaskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"writer %d started; ",waitingTaskStub->taskID[1]) } - else {DEBUG__printf1(dbgRqstHdlr,"writer %p started; ",waitingTaskStub)} + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"ready (dependencies fulfilled)"); writePrivQ(waitingTaskStub, semEnv->taskReadyQ); } else { - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d still blocked on %d args; ",waitingTaskStub->taskID[1],waitingTaskStub->numBlockingProp) } - else {DEBUG__printf2(dbgRqstHdlr,"writer %p still blocked on %d args; ",waitingTaskStub,waitingTaskStub->numBlockingProp)} + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"still blocked on %d args; ",waitingTaskStub->numBlockingProp); } VMS_PI__free(waitingTaskCarrier); } else { /*Waiting task is a reader, so do a loop, of all waiting readers @@ -584,26 +623,24 @@ * readers.*/ //deal with tasks suspended by taskwait_on here - these don't count as a dependency but are otherwise treated like readers if(waitingTaskCarrier->isSuspended){ - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d still blocked on %d args; ",waitingTaskStub->taskID[1],waitingTaskStub->numBlockingProp) } - else {DEBUG__printf2(dbgRqstHdlr,"task %p taskwaiting on ptr %p resumed; ",waitingTaskStub,ptrEntry)} + DEBUG__printf_w_task(dbgRqstHdlr, waitingTaskStub, "taskwaiting on ptr %p resumed; ", ptrEntry); resume_slaveVP(waitingTaskStub->slaveAssignedTo, semEnv); } else { ptrEntry->numEnabledNonDoneReaders += 1; - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"Reader %d now on ptrEntry %p; ",waitingTaskStub->taskID[1],ptrEntry) } - else {DEBUG__printf2(dbgRqstHdlr,"Reader %p now on ptrEntry %p; ",waitingTaskStub,ptrEntry)} + DEBUG__printf_w_task(dbgRqstHdlr, waitingTaskStub, "now on ptrEntry %p (read)",ptrEntry); + //if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"Reader %d now on ptrEntry %p; ",waitingTaskStub->taskID[1],ptrEntry) } + //else {DEBUG__printf2(dbgRqstHdlr,"Reader %p now on ptrEntry %p; ",waitingTaskStub,ptrEntry)} /*Decrement the blocking propendents count of the reader's * task-stub. If it reaches zero, then put the task-stub * into the readyQ.*/ waitingTaskStub->numBlockingProp -= 1; if (waitingTaskStub->numBlockingProp == 0) { - if(waitingTaskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"writer %d started; ",waitingTaskStub->taskID[1]) } - else {DEBUG__printf1(dbgRqstHdlr,"writer %p started; ",waitingTaskStub)} + DEBUG__printf_w_task(dbgRqstHdlr, waitingTaskStub, "ready (dependencies fulfilled)"); writePrivQ(waitingTaskStub, semEnv->taskReadyQ); } else { - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"reader %d still blocked on %d args; ",waitingTaskStub->taskID[1],waitingTaskStub->numBlockingProp) } - else {DEBUG__printf2(dbgRqstHdlr,"reader %p still blocked on %d args; ",waitingTaskStub,waitingTaskStub->numBlockingProp)} + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"still blocked on %d args",waitingTaskStub->numBlockingProp); } } //if-else, suspended or normal reader //discard carrier @@ -611,7 +648,7 @@ /*Get next waiting task*/ waitingTaskCarrier = peekPrivQ(ptrEntry->waitersQ); if (waitingTaskCarrier == NULL) break; - if (!waitingTaskCarrier->isReader) break; + if (!waitingTaskCarrier->isReader) { waitingTaskCarrier = NULL; break;} waitingTaskCarrier = readPrivQ(ptrEntry->waitersQ); waitingTaskStub = waitingTaskCarrier->taskStub; }//while waiter is a reader @@ -636,6 +673,13 @@ writePrivQ(semReq->callingSlv, semEnv->freeExtraTaskSlvQ); } + + while (!isEmptyPrivQ(semEnv->deferredSubmitsQ) && (semEnv->numInFlightTasks < MAX_TASKS_NUM || isEmptyPrivQ(semEnv->taskReadyQ) )) { + do_submit(readPrivQ(semEnv->deferredSubmitsQ),semEnv); + semEnv->numDeferred--; + } + + //otherwise, it's a slot slave, so it will get used from matrix // so, do nothing with it, just return return; @@ -1086,11 +1130,8 @@ semData = (VSsSemData *) semReq->callingSlv->semanticData; requestingTaskStub = semData->taskStub; - if(requestingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr, - "TaskwaitOn request from processor %d, task: %d",requestingSlv->slaveID, - requestingTaskStub->taskID[1]) } - else {DEBUG__printf1(dbgRqstHdlr, "TaskwaitOn request from processor %d", - requestingSlv->slaveID);} + DEBUG__printf_w_task(dbgRqstHdlr, requestingTaskStub, "TaskwaitOn request from slave %d", + requestingSlv->slaveID); void* ptr = semReq->args;