Mercurial > cgi-bin > hgwebdir.cgi > VMS > VMS_Implementations > VSs_impls > VSs__MC_shared_impl
changeset 22:b787a5234406 dev_expl_VP_and_DKU
add task throttle
author | Nina Engelhardt <nengel@mailbox.tu-berlin.de> |
---|---|
date | Thu, 27 Dec 2012 12:27:45 +0100 |
parents | feea343d202f |
children | 3787df8b95f9 |
files | VSs.c VSs.h VSs_PluginFns.c VSs_Request_Handlers.c |
diffstat | 4 files changed, 238 insertions(+), 169 deletions(-) [+] |
line diff
1.1 --- a/VSs.c Mon Oct 29 16:57:56 2012 +0100 1.2 +++ b/VSs.c Thu Dec 27 12:27:45 2012 +0100 1.3 @@ -205,6 +205,7 @@ 1.4 1.5 semanticEnv->shutdownInitiated = FALSE; 1.6 semanticEnv->coreIsDone = VMS_int__malloc( NUM_CORES * sizeof( bool32 ) ); 1.7 + semanticEnv->numCoresDone = 0; 1.8 //For each animation slot, there is an idle slave, and an initial 1.9 // slave assigned as the current-task-slave. Create them here. 1.10 SlaveVP *idleSlv, *slotTaskSlv; 1.11 @@ -212,11 +213,14 @@ 1.12 { semanticEnv->coreIsDone[coreNum] = FALSE; //use during shutdown 1.13 1.14 for( slotNum = 0; slotNum < NUM_ANIM_SLOTS; ++slotNum ) 1.15 - { idleSlv = VSs__create_slave_helper( &idle_fn, NULL, semanticEnv, 0); 1.16 + { 1.17 +#ifdef IDLE_SLAVES 1.18 + idleSlv = VSs__create_slave_helper( &idle_fn, NULL, semanticEnv, 0); 1.19 idleSlv->coreAnimatedBy = coreNum; 1.20 idleSlv->animSlotAssignedTo = 1.21 _VMSMasterEnv->allAnimSlots[coreNum][slotNum]; 1.22 semanticEnv->idleSlv[coreNum][slotNum] = idleSlv; 1.23 +#endif 1.24 1.25 slotTaskSlv = VSs__create_slave_helper( &idle_fn, NULL, semanticEnv, 0); 1.26 slotTaskSlv->coreAnimatedBy = coreNum; 1.27 @@ -240,6 +244,8 @@ 1.28 1.29 semanticEnv->nextCoreToGetNewSlv = 0; 1.30 1.31 + semanticEnv->numInFlightTasks = 0; 1.32 + semanticEnv->deferredSubmitsQ = makeVMSQ(); 1.33 #ifdef EXTERNAL_SCHEDULER 1.34 VSs__init_ext_scheduler(); 1.35 #endif 1.36 @@ -410,17 +416,30 @@ 1.37 } 1.38 1.39 #endif 1.40 -/* It's all allocated inside VMS's big chunk -- that's about to be freed, so 1.41 - * nothing to do here 1.42 - 1.43 + /* It's all allocated inside VMS's big chunk -- that's about to be freed, so 1.44 + * nothing to do here */ 1.45 +/* 1.46 + int coreIdx, slotIdx; 1.47 + SlaveVP* slotSlv; 1.48 + for (coreIdx = 0; coreIdx < NUM_CORES; coreIdx++) { 1.49 + for (slotIdx = 0; slotIdx < NUM_ANIM_SLOTS; slotIdx++) { 1.50 + slotSlv = semanticEnv->slotTaskSlvs[coreIdx][slotIdx]; 1.51 + VMS_int__free(slotSlv->semanticData); 1.52 + VMS_int__free( slotSlv->startOfStack ); 1.53 + VMS_int__free( slotSlv ); 1.54 +#ifdef IDLE_SLAVES 1.55 + slotSlv = semanticEnv->idleSlv[coreIdx][slotIdx]; 1.56 + VMS_int__free(slotSlv->semanticData); 1.57 + VMS_int__free( slotSlv->startOfStack ); 1.58 + VMS_int__free( slotSlv ); 1.59 +#endif 1.60 + } 1.61 + } 1.62 1.63 - for( coreIdx = 0; coreIdx < NUM_CORES; coreIdx++ ) 1.64 - { 1.65 - VMS_int__free( semanticEnv->readyVPQs[coreIdx]->startOfData ); 1.66 - VMS_int__free( semanticEnv->readyVPQs[coreIdx] ); 1.67 - } 1.68 - VMS_int__free( semanticEnv->readyVPQs ); 1.69 - 1.70 + freePrivQ(semanticEnv->freeExtraTaskSlvQ); 1.71 + freePrivQ(semanticEnv->slavesReadyToResumeQ); 1.72 + freePrivQ(semanticEnv->taskReadyQ); 1.73 + freeHashTable( semanticEnv->argPtrHashTbl ); 1.74 freeHashTable( semanticEnv->commHashTbl ); 1.75 VMS_int__free( _VMSMasterEnv->semanticEnv ); 1.76 */
2.1 --- a/VSs.h Mon Oct 29 16:57:56 2012 +0100 2.2 +++ b/VSs.h Thu Dec 27 12:27:45 2012 +0100 2.3 @@ -25,6 +25,8 @@ 2.4 //=========================================================================== 2.5 #define NUM_STRUCS_IN_SEM_ENV 1000 2.6 2.7 +#define MAX_TASKS_NUM 256 2.8 + 2.9 //This is hardware dependent -- it's the number of cycles of scheduling 2.10 // overhead -- if a work unit is fewer than this, it is better being 2.11 // combined sequentially with other work 2.12 @@ -227,6 +229,10 @@ 2.13 bool32 *coreIsDone; 2.14 int32 numCoresDone; 2.15 2.16 + int numInFlightTasks; 2.17 + PrivQueueStruc *deferredSubmitsQ; 2.18 + int numDeferred; 2.19 + 2.20 #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC 2.21 ListOfArrays* unitList; 2.22 ListOfArrays* ctlDependenciesList; 2.23 @@ -244,7 +250,9 @@ 2.24 #ifdef HOLISTIC__TURN_ON_PERF_COUNTERS 2.25 ListOfArrays* counterList[NUM_CORES]; 2.26 #endif 2.27 + #ifdef IDLE_SLAVES 2.28 SlaveVP* idleSlv[NUM_CORES][NUM_ANIM_SLOTS]; 2.29 + #endif 2.30 int shutdownInitiated; 2.31 } 2.32 VSsSemEnv;
3.1 --- a/VSs_PluginFns.c Mon Oct 29 16:57:56 2012 +0100 3.2 +++ b/VSs_PluginFns.c Thu Dec 27 12:27:45 2012 +0100 3.3 @@ -415,7 +415,8 @@ 3.4 newStub->isWaitingForChildTasksToEnd = FALSE; 3.5 newStub->isWaitingForChildThreadsToEnd = FALSE; 3.6 newStub->taskID = NULL; 3.7 - 3.8 + newStub->isEnded = FALSE; 3.9 + 3.10 return newStub; 3.11 } 3.12
4.1 --- a/VSs_Request_Handlers.c Mon Oct 29 16:57:56 2012 +0100 4.2 +++ b/VSs_Request_Handlers.c Thu Dec 27 12:27:45 2012 +0100 4.3 @@ -81,6 +81,12 @@ 4.4 return newEntry; 4.5 } 4.6 4.7 +void free_pointer_entry(VSsPointerEntry* ptrEntry){ 4.8 + freePrivQ(ptrEntry->waitersQ); 4.9 + VMS_int__free(ptrEntry); 4.10 +} 4.11 + 4.12 + 4.13 /*malloc's space and initializes fields -- and COPIES the arg values 4.14 * to new space 4.15 */ 4.16 @@ -122,6 +128,130 @@ 4.17 return newCarrier; 4.18 } 4.19 4.20 +void do_submit(VSsSemReq *semReq, VSsSemEnv *semEnv){ 4.21 + uint32 key[5]; 4.22 + HashEntry *rawHashEntry; //has char *, but use with uint32 * 4.23 + VSsPointerEntry *ptrEntry; //contents of hash table entry for an arg pointer 4.24 + void **args; 4.25 + VSsTaskStub *taskStub; 4.26 + VSsTaskType *taskType; 4.27 + VSsTaskStubCarrier *taskCarrier; 4.28 + 4.29 + HashTable * 4.30 + argPtrHashTbl = semEnv->argPtrHashTbl; 4.31 + 4.32 + if(!semReq) { 4.33 + DEBUG__printf(dbgRqstHdlr,"***submitted Req is null***\n") 4.34 + return;} 4.35 + 4.36 + semEnv->numInFlightTasks++; 4.37 + /* ========================== creation ========================== 4.38 + * 4.39 + *At creation, make a task-stub. Set the count of blocking propendents 4.40 + * to the number of controlled arguments (a task can have 4.41 + * arguments that are not controlled by the language, like simple integer 4.42 + * inputs from the sequential portion. Note that all controlled arguments 4.43 + * are pointers, and marked as controlled in the application code). 4.44 + */ 4.45 + args = semReq->args; 4.46 + taskType = semReq->taskType; 4.47 + taskStub = create_task_stub(taskType, args); //copies arg ptrs 4.48 + taskStub->numBlockingProp = taskType->numCtldArgs; 4.49 + taskStub->taskID = semReq->taskID; //may be NULL 4.50 + 4.51 + VSsSemData* 4.52 + parentSemData = (VSsSemData*) semReq->callingSlv->semanticData; 4.53 + taskStub->parentTaskStub = (void*) parentSemData->taskStub; 4.54 + parentSemData->taskStub->numLiveChildTasks += 1; 4.55 +#ifdef HOLISTIC__TURN_ON_OBSERVE_UCC 4.56 + taskStub->parentUnit.vp = semReq->callingSlv->slaveID; 4.57 + taskStub->parentUnit.task = semReq->callingSlv->assignCount; 4.58 +#endif 4.59 + 4.60 + //DEBUG__printf3(dbgRqstHdlr,"Submit req from slaveID: %d, from task: %d, for task: %d", semReq->callingSlv->slaveID, parentSemData->taskStub->taskID[1], taskStub->taskID[1]) 4.61 + 4.62 +#ifdef EXTERNAL_SCHEDULER 4.63 + //send task descriptor 4.64 + VSs__submit_task_to_ext(taskStub, semEnv); 4.65 +#else 4.66 + /*The controlled arguments are then processed one by one. 4.67 + *Processing an argument means getting the hash of the pointer. Then, 4.68 + * looking up the hash entry. (If none, create one). 4.69 + */ 4.70 + int32 argNum; 4.71 + for (argNum = 0; argNum < taskType->numCtldArgs; argNum++) { 4.72 + key[0] = 4; //two 32b values in key 4.73 + *((uint64*) & key[1]) = (uint64) args[argNum]; //write 64b into two 32b 4.74 + *((uint64*) & key[3]) = (uint64) taskStub->parentTaskStub ; 4.75 + 4.76 + /*If the hash entry was chained, put it at the 4.77 + * start of the chain. (Means no-longer-used pointers accumulate 4.78 + * at end of chain, decide garbage collection later) */ 4.79 + rawHashEntry = getEntryFromTable32(key, argPtrHashTbl); 4.80 + if (rawHashEntry == NULL) { //adding a value auto-creates the hash-entry 4.81 + ptrEntry = create_pointer_entry(); 4.82 + rawHashEntry = addValueIntoTable32(key, ptrEntry, argPtrHashTbl); 4.83 + } else { 4.84 + ptrEntry = (VSsPointerEntry *) rawHashEntry->content; 4.85 + if (ptrEntry == NULL) { 4.86 + ptrEntry = create_pointer_entry(); 4.87 + rawHashEntry = addValueIntoTable32(key, ptrEntry, argPtrHashTbl); 4.88 + } 4.89 + } 4.90 + taskStub->ptrEntries[argNum] = ptrEntry; 4.91 + 4.92 + /*Have the hash entry. 4.93 + *If the arg is a reader and the entry does not have an enabled 4.94 + * non-finished writer, and the queue is empty. */ 4.95 + if (taskType->argTypes[argNum] == READER) { 4.96 + if (!ptrEntry->hasEnabledNonFinishedWriter && 4.97 + isEmptyPrivQ(ptrEntry->waitersQ)) { /*The reader is free. So, decrement the blocking-propendent 4.98 + * count in the task-stub. If the count is zero, then put the 4.99 + * task-stub into the readyQ. At the same time, increment 4.100 + * the hash-entry's count of enabled and non-finished readers.*/ 4.101 + taskStub->numBlockingProp -= 1; 4.102 + DEBUG__printf_w_task(dbgRqstHdlr, taskStub, "taking ptrEntry %p (read)", ptrEntry); 4.103 + if (taskStub->numBlockingProp == 0) { 4.104 + writePrivQ(taskStub, semEnv->taskReadyQ); 4.105 + DEBUG__printf_w_task(dbgRqstHdlr, taskStub, "ready (dependencies fulfilled)"); 4.106 + } else { 4.107 + DEBUG__printf_w_task(dbgRqstHdlr,taskStub,"still blocked on %d args",taskStub->numBlockingProp); 4.108 + } 4.109 + ptrEntry->numEnabledNonDoneReaders += 1; 4.110 + } else { /*Otherwise, the reader is put into the hash-entry's Q of 4.111 + * waiters*/ 4.112 + taskCarrier = create_task_carrier(taskStub, argNum, READER); 4.113 + writePrivQ(taskCarrier, ptrEntry->waitersQ); 4.114 + } 4.115 + } else //arg is a writer 4.116 + { /*the arg is a writer, plus the entry does not have a current 4.117 + * writer, plus the number of enabled non-finished readers is 4.118 + * zero, (the Q must be empty, else bug!) then the writer is free*/ 4.119 + if (!ptrEntry->hasEnabledNonFinishedWriter && 4.120 + ptrEntry->numEnabledNonDoneReaders == 0) { /*Mark the entry has having a enabled and non-finished writer. 4.121 + * Decrement the blocking-propenden count in the writer's 4.122 + * task-stub. If the count is zero, then put the task-stub 4.123 + * into the readyQ.*/ 4.124 + taskStub->numBlockingProp -= 1; 4.125 + DEBUG__printf_w_task(dbgRqstHdlr,taskStub,"taking ptrEntry %p (write)",ptrEntry); 4.126 + if (taskStub->numBlockingProp == 0) { 4.127 + DEBUG__printf_w_task(dbgRqstHdlr, taskStub, "ready (dependencies fulfilled)"); 4.128 + writePrivQ(taskStub, semEnv->taskReadyQ); 4.129 + } else { 4.130 + DEBUG__printf_w_task(dbgRqstHdlr,taskStub,"still blocked on %d args",taskStub->numBlockingProp); 4.131 + } 4.132 + ptrEntry->hasEnabledNonFinishedWriter = TRUE; 4.133 + } else {/*Otherwise, put the writer into the entry's Q of waiters.*/ 4.134 + taskCarrier = create_task_carrier(taskStub, argNum, WRITER); 4.135 + writePrivQ(taskCarrier, ptrEntry->waitersQ); 4.136 + } 4.137 + } 4.138 + } //for argNum 4.139 +#endif 4.140 + 4.141 + resume_slaveVP(semReq->callingSlv, semEnv); 4.142 +} 4.143 + 4.144 //========================================================================== 4.145 // 4.146 // 4.147 @@ -225,134 +355,46 @@ 4.148 */ 4.149 void 4.150 handleSubmitTask(VSsSemReq *semReq, VSsSemEnv *semEnv) { 4.151 - uint32 key[5]; 4.152 - HashEntry *rawHashEntry; //has char *, but use with uint32 * 4.153 - VSsPointerEntry *ptrEntry; //contents of hash table entry for an arg pointer 4.154 - void **args; 4.155 - VSsTaskStub *taskStub; 4.156 - VSsTaskType *taskType; 4.157 - VSsTaskStubCarrier *taskCarrier; 4.158 4.159 - HashTable * 4.160 - argPtrHashTbl = semEnv->argPtrHashTbl; 4.161 4.162 - //suspending a task always makes the slave into an extra slot slave, 4.163 - // because it ends up in the resumeQ, even when resumes immediately. 4.164 - //Eventually task_end will put the slave into the freeExtraTaskSlvQ 4.165 - replaceWithNewSlotSlvIfNeeded( semReq->callingSlv, semEnv ); 4.166 + //suspending a task always makes the slave into an extra slot slave, 4.167 + // because it ends up in the resumeQ, even when resumes immediately. 4.168 + //Eventually task_end will put the slave into the freeExtraTaskSlvQ 4.169 + replaceWithNewSlotSlvIfNeeded(semReq->callingSlv, semEnv); 4.170 4.171 - /* ========================== creation ========================== 4.172 - * 4.173 - *At creation, make a task-stub. Set the count of blocking propendents 4.174 - * to the number of controlled arguments (a task can have 4.175 - * arguments that are not controlled by the language, like simple integer 4.176 - * inputs from the sequential portion. Note that all controlled arguments 4.177 - * are pointers, and marked as controlled in the application code). 4.178 - */ 4.179 - args = semReq->args; 4.180 - taskType = semReq->taskType; 4.181 - taskStub = create_task_stub(taskType, args); //copies arg ptrs 4.182 - taskStub->numBlockingProp = taskType->numCtldArgs; 4.183 - taskStub->taskID = semReq->taskID; //may be NULL 4.184 +#ifdef DEBUG__TURN_ON_DEBUG_PRINT 4.185 + if (dbgRqstHdlr) { 4.186 + if (semReq->taskID) { 4.187 + printf("Task "); 4.188 + int i; 4.189 + for (i = 1; i < semReq->taskID[0]; i++) { 4.190 + printf("%d,", semReq->taskID[i]); 4.191 + } 4.192 + printf("%d: ", semReq->taskID[i]); 4.193 + } else { 4.194 + printf("Anonymous "); 4.195 + } 4.196 + } 4.197 +#endif 4.198 + DEBUG__printf(dbgRqstHdlr, "submit req from slaveID %d", semReq->callingSlv->slaveID); 4.199 + 4.200 + // Throttle if too many tasks 4.201 + 4.202 + if (!isEmptyPrivQ(semEnv->deferredSubmitsQ) || semEnv->numInFlightTasks > MAX_TASKS_NUM) { 4.203 + semEnv->numDeferred++; 4.204 + writePrivQ(semReq, semEnv->deferredSubmitsQ); 4.205 + while (!isEmptyPrivQ(semEnv->deferredSubmitsQ) && ( semEnv->numInFlightTasks < MAX_TASKS_NUM || isEmptyPrivQ(semEnv->taskReadyQ) )) { 4.206 + do_submit(readPrivQ(semEnv->deferredSubmitsQ),semEnv); 4.207 + semEnv->numDeferred--; 4.208 + } 4.209 + } else { 4.210 4.211 - VSsSemData* 4.212 - parentSemData = (VSsSemData*) semReq->callingSlv->semanticData; 4.213 - taskStub->parentTaskStub = (void*) parentSemData->taskStub; 4.214 - parentSemData->taskStub->numLiveChildTasks += 1; 4.215 -#ifdef HOLISTIC__TURN_ON_OBSERVE_UCC 4.216 - taskStub->parentUnit.vp = semReq->callingSlv->slaveID; 4.217 - taskStub->parentUnit.task = semReq->callingSlv->assignCount; 4.218 -#endif 4.219 + do_submit(semReq,semEnv); 4.220 + } 4.221 4.222 - //DEBUG__printf3(dbgRqstHdlr,"Submit req from slaveID: %d, from task: %d, for task: %d", semReq->callingSlv->slaveID, parentSemData->taskStub->taskID[1], taskStub->taskID[1]) 4.223 - if(semReq->taskID) { DEBUG__printf2(dbgRqstHdlr,"Submit req from slaveID: %d, for task: %d", semReq->callingSlv->slaveID, taskStub->taskID[1]) } 4.224 - else { DEBUG__printf1(dbgRqstHdlr,"Submit req from slaveID: %d, for anonymous task", semReq->callingSlv->slaveID) } 4.225 -#ifdef EXTERNAL_SCHEDULER 4.226 - //send task descriptor 4.227 - VSs__submit_task_to_ext(taskStub, semEnv); 4.228 -#else 4.229 - /*The controlled arguments are then processed one by one. 4.230 - *Processing an argument means getting the hash of the pointer. Then, 4.231 - * looking up the hash entry. (If none, create one). 4.232 - */ 4.233 - int32 argNum; 4.234 - for (argNum = 0; argNum < taskType->numCtldArgs; argNum++) { 4.235 - key[0] = 4; //two 32b values in key 4.236 - *((uint64*) & key[1]) = (uint64) args[argNum]; //write 64b into two 32b 4.237 - *((uint64*) & key[3]) = (uint64) taskStub->parentTaskStub ; 4.238 - 4.239 - /*If the hash entry was chained, put it at the 4.240 - * start of the chain. (Means no-longer-used pointers accumulate 4.241 - * at end of chain, decide garbage collection later) */ 4.242 - rawHashEntry = getEntryFromTable32(key, argPtrHashTbl); 4.243 - if (rawHashEntry == NULL) { //adding a value auto-creates the hash-entry 4.244 - ptrEntry = create_pointer_entry(); 4.245 - rawHashEntry = addValueIntoTable32(key, ptrEntry, argPtrHashTbl); 4.246 - } else { 4.247 - ptrEntry = (VSsPointerEntry *) rawHashEntry->content; 4.248 - if (ptrEntry == NULL) { 4.249 - ptrEntry = create_pointer_entry(); 4.250 - rawHashEntry = addValueIntoTable32(key, ptrEntry, argPtrHashTbl); 4.251 - } 4.252 - } 4.253 - taskStub->ptrEntries[argNum] = ptrEntry; 4.254 4.255 - /*Have the hash entry. 4.256 - *If the arg is a reader and the entry does not have an enabled 4.257 - * non-finished writer, and the queue is empty. */ 4.258 - if (taskType->argTypes[argNum] == READER) { 4.259 - if (!ptrEntry->hasEnabledNonFinishedWriter && 4.260 - isEmptyPrivQ(ptrEntry->waitersQ)) { /*The reader is free. So, decrement the blocking-propendent 4.261 - * count in the task-stub. If the count is zero, then put the 4.262 - * task-stub into the readyQ. At the same time, increment 4.263 - * the hash-entry's count of enabled and non-finished readers.*/ 4.264 - taskStub->numBlockingProp -= 1; 4.265 - if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"Reader %d now on ptrEntry %p; ",taskStub->taskID[1],ptrEntry) } 4.266 - else {DEBUG__printf2(dbgRqstHdlr,"Reader %p now on ptrEntry %p; ",taskStub,ptrEntry)} 4.267 - if (taskStub->numBlockingProp == 0) { 4.268 - writePrivQ(taskStub, semEnv->taskReadyQ); 4.269 - if(taskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"reader %d started; ",taskStub->taskID[1]) } 4.270 - else { DEBUG__printf1(dbgRqstHdlr,"reader %p started; ",taskStub) } 4.271 - } else { 4.272 - if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"reader %d still blocked on %d args; ",taskStub->taskID[1],taskStub->numBlockingProp) } 4.273 - else {DEBUG__printf2(dbgRqstHdlr,"reader %p still blocked on %d args; ",taskStub,taskStub->numBlockingProp)} 4.274 - } 4.275 - ptrEntry->numEnabledNonDoneReaders += 1; 4.276 - } else { /*Otherwise, the reader is put into the hash-entry's Q of 4.277 - * waiters*/ 4.278 - taskCarrier = create_task_carrier(taskStub, argNum, READER); 4.279 - writePrivQ(taskCarrier, ptrEntry->waitersQ); 4.280 - } 4.281 - } else //arg is a writer 4.282 - { /*the arg is a writer, plus the entry does not have a current 4.283 - * writer, plus the number of enabled non-finished readers is 4.284 - * zero, (the Q must be empty, else bug!) then the writer is free*/ 4.285 - if (!ptrEntry->hasEnabledNonFinishedWriter && 4.286 - ptrEntry->numEnabledNonDoneReaders == 0) { /*Mark the entry has having a enabled and non-finished writer. 4.287 - * Decrement the blocking-propenden count in the writer's 4.288 - * task-stub. If the count is zero, then put the task-stub 4.289 - * into the readyQ.*/ 4.290 - taskStub->numBlockingProp -= 1; 4.291 - if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d takes ptrEntry %p; ",taskStub->taskID[1],ptrEntry) } 4.292 - else { DEBUG__printf2(dbgRqstHdlr,"writer %p takes ptrEntry %p; ",taskStub,ptrEntry)} 4.293 - if (taskStub->numBlockingProp == 0) { 4.294 - writePrivQ(taskStub, semEnv->taskReadyQ); 4.295 - if(taskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"writer %d started; ",taskStub->taskID[1]) } 4.296 - else { DEBUG__printf1(dbgRqstHdlr,"writer %p started; ",taskStub) } 4.297 - } else { 4.298 - if(taskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d still blocked on %d args; ",taskStub->taskID[1],taskStub->numBlockingProp) } 4.299 - else {DEBUG__printf2(dbgRqstHdlr,"writer %p still blocked on %d args; ",taskStub,taskStub->numBlockingProp)} 4.300 - } 4.301 - ptrEntry->hasEnabledNonFinishedWriter = TRUE; 4.302 - } else {/*Otherwise, put the writer into the entry's Q of waiters.*/ 4.303 - taskCarrier = create_task_carrier(taskStub, argNum, WRITER); 4.304 - writePrivQ(taskCarrier, ptrEntry->waitersQ); 4.305 - } 4.306 - } 4.307 - } //for argNum 4.308 -#endif 4.309 4.310 - resume_slaveVP(semReq->callingSlv, semEnv); 4.311 + 4.312 4.313 return; 4.314 } 4.315 @@ -402,15 +444,15 @@ 4.316 VSsTaskStubCarrier *waitingTaskCarrier; 4.317 VSsPointerEntry **ptrEntries; 4.318 4.319 - 4.320 + semEnv->numInFlightTasks--; 4.321 + 4.322 endingSlvSemData = (VSsSemData *) semReq->callingSlv->semanticData; 4.323 endingTaskStub = endingSlvSemData->taskStub; 4.324 args = endingTaskStub->args; 4.325 endingTaskType = endingTaskStub->taskType; 4.326 ptrEntries = endingTaskStub->ptrEntries; //saved in stub when create 4.327 4.328 - if(endingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"EndTask req from slaveID: %d, task: %d",semReq->callingSlv->slaveID, endingTaskStub->taskID[1]) } 4.329 - else {DEBUG__printf1(dbgRqstHdlr,"EndTask req from slaveID: %d",semReq->callingSlv->slaveID)} 4.330 + DEBUG__printf_w_task(dbgRqstHdlr,endingTaskStub,"EndTask req from slaveID %d",semReq->callingSlv->slaveID); 4.331 4.332 //Check if parent was waiting on this task 4.333 parent = (VSsTaskStub *) endingTaskStub->parentTaskStub; 4.334 @@ -468,7 +510,7 @@ 4.335 if (endingTaskType->argTypes[argNum] == READER) { /*then decrement the enabled and non-finished reader-count in 4.336 * the hash-entry. */ 4.337 ptrEntry->numEnabledNonDoneReaders -= 1; 4.338 - DEBUG__printf1(dbgRqstHdlr,"Releasing read on ptrEntry %p; ",ptrEntry) 4.339 + DEBUG__printf(dbgRqstHdlr,"Releasing read on ptrEntry %p",ptrEntry) 4.340 #ifdef HOLISTIC__TURN_ON_OBSERVE_UCC 4.341 Unit u; 4.342 u.vp = semReq->callingSlv->slaveID; 4.343 @@ -486,17 +528,18 @@ 4.344 /*If the count becomes zero, then take the next entry from the Q. 4.345 *It should be a writer, or else there's a bug in this algorithm.*/ 4.346 if (ptrEntry->numEnabledNonDoneReaders == 0) { 4.347 - DEBUG__printf1(dbgRqstHdlr,"ptrEntry %p now free; ",ptrEntry) 4.348 + DEBUG__printf(dbgRqstHdlr,"ptrEntry %p now free",ptrEntry) 4.349 waitingTaskCarrier = readPrivQ(ptrEntry->waitersQ); 4.350 if (waitingTaskCarrier == NULL) { //TODO: looks safe to delete the ptr entry at this point 4.351 - DEBUG__printf1(dbgRqstHdlr,"no waiting writer found for ptrEntry %p\n",ptrEntry) 4.352 + DEBUG__printf(dbgRqstHdlr,"no waiting writer found for ptrEntry %p\n",ptrEntry) 4.353 + //free_pointer_entry(ptrEntry); 4.354 continue; //next iter of loop 4.355 } 4.356 if (waitingTaskCarrier->isReader) 4.357 VMS_App__throw_exception("READER waiting", NULL, NULL); 4.358 4.359 waitingTaskStub = waitingTaskCarrier->taskStub; 4.360 - 4.361 + VMS_PI__free(waitingTaskCarrier); 4.362 /*Set the hash-entry to have an enabled non-finished writer.*/ 4.363 ptrEntry->hasEnabledNonFinishedWriter = TRUE; 4.364 4.365 @@ -504,17 +547,14 @@ 4.366 * task-stub. If the count has reached zero, then put the 4.367 * task-stub into the readyQ.*/ 4.368 waitingTaskStub->numBlockingProp -= 1; 4.369 - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d takes ptrEntry %p; ",waitingTaskStub->taskID[1],ptrEntry) } 4.370 - else { DEBUG__printf2(dbgRqstHdlr,"writer %p takes ptrEntry %p; ",waitingTaskStub,ptrEntry)} 4.371 + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"taking ptrEntry %p (write)",ptrEntry); 4.372 if (waitingTaskStub->numBlockingProp == 0) { 4.373 - if(waitingTaskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"writer %d started; ",waitingTaskStub->taskID[1]) } 4.374 - else { DEBUG__printf1(dbgRqstHdlr,"writer %p started; ",waitingTaskStub) } 4.375 + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"ready (dependencies fulfilled)"); 4.376 writePrivQ(waitingTaskStub, semEnv->taskReadyQ); 4.377 } else { 4.378 - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d still blocked on %d args; ",waitingTaskStub->taskID[1],waitingTaskStub->numBlockingProp) } 4.379 - else {DEBUG__printf2(dbgRqstHdlr,"writer %p still blocked on %d args; ",waitingTaskStub,waitingTaskStub->numBlockingProp)} 4.380 + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"still blocked on %d args",waitingTaskStub->numBlockingProp); 4.381 } 4.382 - VMS_PI__free(waitingTaskCarrier); 4.383 + 4.384 } 4.385 4.386 } else /*the ending task is a writer of this arg*/ { /*clear the enabled non-finished writer flag of the hash-entry.*/ 4.387 @@ -552,30 +592,29 @@ 4.388 ptrEntry->lastWriter.task = semReq->callingSlv->assignCount; 4.389 #endif 4.390 4.391 - DEBUG__printf1(dbgRqstHdlr,"Releasing write on ptrEntry %p; ",ptrEntry) 4.392 + DEBUG__printf(dbgRqstHdlr,"Releasing write on ptrEntry %p; ",ptrEntry) 4.393 /*Take the next waiter from the hash-entry's Q.*/ 4.394 waitingTaskCarrier = readPrivQ(ptrEntry->waitersQ); 4.395 if (waitingTaskCarrier == NULL) { //TODO: looks safe to delete ptr entry at this point 4.396 - DEBUG__printf1(dbgRqstHdlr,"no waiting task on ptrEntry %p; ",ptrEntry) 4.397 + DEBUG__printf(dbgRqstHdlr,"no waiting task on ptrEntry %p; deleting",ptrEntry); 4.398 + //free_pointer_entry(ptrEntry); 4.399 + //NOPE, still tasks around that kept the pointer... 4.400 continue; //go to next iter of loop, done here. 4.401 } 4.402 waitingTaskStub = waitingTaskCarrier->taskStub; 4.403 4.404 /*If task is a writer of this hash-entry's pointer*/ 4.405 if (!waitingTaskCarrier->isReader) { /* then turn the flag back on.*/ 4.406 - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d takes ptrEntry %p; ",waitingTaskStub->taskID[1],ptrEntry) } 4.407 - else { DEBUG__printf2(dbgRqstHdlr,"Writer %p takes ptrEntry %p; ",waitingTaskStub,ptrEntry)} 4.408 + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"taking ptrEntry %p (write)",ptrEntry); 4.409 ptrEntry->hasEnabledNonFinishedWriter = TRUE; 4.410 /*Decrement the writer's blocking-propendent-count in task-stub 4.411 * If it becomes zero, then put the task-stub into the readyQ.*/ 4.412 waitingTaskStub->numBlockingProp -= 1; 4.413 if (waitingTaskStub->numBlockingProp == 0) { 4.414 - if(waitingTaskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"writer %d started; ",waitingTaskStub->taskID[1]) } 4.415 - else {DEBUG__printf1(dbgRqstHdlr,"writer %p started; ",waitingTaskStub)} 4.416 + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"ready (dependencies fulfilled)"); 4.417 writePrivQ(waitingTaskStub, semEnv->taskReadyQ); 4.418 } else { 4.419 - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d still blocked on %d args; ",waitingTaskStub->taskID[1],waitingTaskStub->numBlockingProp) } 4.420 - else {DEBUG__printf2(dbgRqstHdlr,"writer %p still blocked on %d args; ",waitingTaskStub,waitingTaskStub->numBlockingProp)} 4.421 + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"still blocked on %d args; ",waitingTaskStub->numBlockingProp); 4.422 } 4.423 VMS_PI__free(waitingTaskCarrier); 4.424 } else { /*Waiting task is a reader, so do a loop, of all waiting readers 4.425 @@ -584,26 +623,24 @@ 4.426 * readers.*/ 4.427 //deal with tasks suspended by taskwait_on here - these don't count as a dependency but are otherwise treated like readers 4.428 if(waitingTaskCarrier->isSuspended){ 4.429 - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"writer %d still blocked on %d args; ",waitingTaskStub->taskID[1],waitingTaskStub->numBlockingProp) } 4.430 - else {DEBUG__printf2(dbgRqstHdlr,"task %p taskwaiting on ptr %p resumed; ",waitingTaskStub,ptrEntry)} 4.431 + DEBUG__printf_w_task(dbgRqstHdlr, waitingTaskStub, "taskwaiting on ptr %p resumed; ", ptrEntry); 4.432 resume_slaveVP(waitingTaskStub->slaveAssignedTo, semEnv); 4.433 } else { 4.434 4.435 ptrEntry->numEnabledNonDoneReaders += 1; 4.436 - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"Reader %d now on ptrEntry %p; ",waitingTaskStub->taskID[1],ptrEntry) } 4.437 - else {DEBUG__printf2(dbgRqstHdlr,"Reader %p now on ptrEntry %p; ",waitingTaskStub,ptrEntry)} 4.438 + DEBUG__printf_w_task(dbgRqstHdlr, waitingTaskStub, "now on ptrEntry %p (read)",ptrEntry); 4.439 + //if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"Reader %d now on ptrEntry %p; ",waitingTaskStub->taskID[1],ptrEntry) } 4.440 + //else {DEBUG__printf2(dbgRqstHdlr,"Reader %p now on ptrEntry %p; ",waitingTaskStub,ptrEntry)} 4.441 /*Decrement the blocking propendents count of the reader's 4.442 * task-stub. If it reaches zero, then put the task-stub 4.443 * into the readyQ.*/ 4.444 waitingTaskStub->numBlockingProp -= 1; 4.445 4.446 if (waitingTaskStub->numBlockingProp == 0) { 4.447 - if(waitingTaskStub->taskID) { DEBUG__printf1(dbgRqstHdlr,"writer %d started; ",waitingTaskStub->taskID[1]) } 4.448 - else {DEBUG__printf1(dbgRqstHdlr,"writer %p started; ",waitingTaskStub)} 4.449 + DEBUG__printf_w_task(dbgRqstHdlr, waitingTaskStub, "ready (dependencies fulfilled)"); 4.450 writePrivQ(waitingTaskStub, semEnv->taskReadyQ); 4.451 } else { 4.452 - if(waitingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr,"reader %d still blocked on %d args; ",waitingTaskStub->taskID[1],waitingTaskStub->numBlockingProp) } 4.453 - else {DEBUG__printf2(dbgRqstHdlr,"reader %p still blocked on %d args; ",waitingTaskStub,waitingTaskStub->numBlockingProp)} 4.454 + DEBUG__printf_w_task(dbgRqstHdlr,waitingTaskStub,"still blocked on %d args",waitingTaskStub->numBlockingProp); 4.455 } 4.456 } //if-else, suspended or normal reader 4.457 //discard carrier 4.458 @@ -611,7 +648,7 @@ 4.459 /*Get next waiting task*/ 4.460 waitingTaskCarrier = peekPrivQ(ptrEntry->waitersQ); 4.461 if (waitingTaskCarrier == NULL) break; 4.462 - if (!waitingTaskCarrier->isReader) break; 4.463 + if (!waitingTaskCarrier->isReader) { waitingTaskCarrier = NULL; break;} 4.464 waitingTaskCarrier = readPrivQ(ptrEntry->waitersQ); 4.465 waitingTaskStub = waitingTaskCarrier->taskStub; 4.466 }//while waiter is a reader 4.467 @@ -636,6 +673,13 @@ 4.468 writePrivQ(semReq->callingSlv, semEnv->freeExtraTaskSlvQ); 4.469 } 4.470 4.471 + 4.472 + while (!isEmptyPrivQ(semEnv->deferredSubmitsQ) && (semEnv->numInFlightTasks < MAX_TASKS_NUM || isEmptyPrivQ(semEnv->taskReadyQ) )) { 4.473 + do_submit(readPrivQ(semEnv->deferredSubmitsQ),semEnv); 4.474 + semEnv->numDeferred--; 4.475 + } 4.476 + 4.477 + 4.478 //otherwise, it's a slot slave, so it will get used from matrix 4.479 // so, do nothing with it, just return 4.480 return; 4.481 @@ -1086,11 +1130,8 @@ 4.482 semData = (VSsSemData *) semReq->callingSlv->semanticData; 4.483 requestingTaskStub = semData->taskStub; 4.484 4.485 - if(requestingTaskStub->taskID) { DEBUG__printf2(dbgRqstHdlr, 4.486 - "TaskwaitOn request from processor %d, task: %d",requestingSlv->slaveID, 4.487 - requestingTaskStub->taskID[1]) } 4.488 - else {DEBUG__printf1(dbgRqstHdlr, "TaskwaitOn request from processor %d", 4.489 - requestingSlv->slaveID);} 4.490 + DEBUG__printf_w_task(dbgRqstHdlr, requestingTaskStub, "TaskwaitOn request from slave %d", 4.491 + requestingSlv->slaveID); 4.492 4.493 void* ptr = semReq->args; 4.494