Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > BestEffortMessaging
changeset 4:7ba5a3a6102d multiple_p2p_trigger
System with multiple point 2 point trigger working
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Tue, 13 Mar 2012 18:22:12 +0100 |
| parents | 5c0fb7c519d7 |
| children | 95a03e431480 |
| files | LossyCom.c LossyCom.h |
| diffstat | 2 files changed, 128 insertions(+), 58 deletions(-) [+] |
line diff
1.1 --- a/LossyCom.c Tue Mar 13 15:21:36 2012 +0100 1.2 +++ b/LossyCom.c Tue Mar 13 18:22:12 2012 +0100 1.3 @@ -3,8 +3,9 @@ 1.4 * For a detailed description see header file. 1.5 */ 1.6 1.7 +#include <string.h> 1.8 + 1.9 #include "LossyCom.h" 1.10 - 1.11 #include "VMS_Implementations/VMS_impl/vmalloc.h" 1.12 1.13 /* 1.14 @@ -20,7 +21,7 @@ 1.15 if(exchange == NULL) 1.16 return NULL; 1.17 1.18 - exchange->triggerCounter = 0; 1.19 + exchange->BroadcastTriggerCounter = 0; 1.20 exchange->numEndpoints = numEndpoints; 1.21 exchange->outboxArray = VMS_WL__malloc(sizeof(lossyCom__msg_t)*numEndpoints); 1.22 if(exchange->outboxArray == NULL){ 1.23 @@ -28,6 +29,16 @@ 1.24 return NULL; 1.25 } 1.26 1.27 + exchange->p2pTriggerCounter = VMS_WL__malloc(sizeof(uint16_t)*numEndpoints); 1.28 + if(exchange->p2pTriggerCounter == NULL){ 1.29 + VMS_WL__free(exchange->outboxArray); 1.30 + VMS_WL__free(exchange); 1.31 + return NULL; 1.32 + } 1.33 + 1.34 + //reset all point 2 point trigger counter 1.35 + memset((void*)exchange->p2pTriggerCounter, 0, sizeof(uint16_t)*numEndpoints); 1.36 + 1.37 return exchange; 1.38 } 1.39 1.40 @@ -43,100 +54,157 @@ 1.41 lossyCom__msgHandler msgHandler, 1.42 void* msgHandlerData) 1.43 { 1.44 - localEndpoint->localTriggerCopy = 0; 1.45 + localEndpoint->lastReceivedBroadcastTrigger = 0; 1.46 localEndpoint->endpointID = endpointID; 1.47 localEndpoint->centralExchange = centralExchange; 1.48 localEndpoint->msgHandler = msgHandler; 1.49 localEndpoint->msgHandlerData = msgHandlerData; 1.50 } 1.51 1.52 +inline lossyCom__msg_t prepareMsg(uint16_t triggerValue, 1.53 + lossyCom__endpointID_t receiverEndpointID, 1.54 + lossyCom__msgBody_t msg) 1.55 +{ 1.56 + lossyCom__msg_t msgDraft; 1.57 + 1.58 + msgDraft = (0 | msg); 1.59 + msgDraft |= ((lossyCom__msg_t)receiverEndpointID << ENDPOINT_ID_SHIFT); 1.60 + msgDraft |= ((lossyCom__msg_t)triggerValue << TRIGGER_SHIFT); 1.61 + 1.62 + return msgDraft; 1.63 +} 1.64 + 1.65 /* 1.66 * This broadcasts a message to all connected receivers 1.67 */ 1.68 -void inline lossyCom__broadcastMsg(lossyCom__endpoint_t* localEndpoint, 1.69 - lossyCom__msgBody_t msg) 1.70 +void lossyCom__broadcastMsg(lossyCom__endpoint_t* localEndpoint, 1.71 + lossyCom__msgBody_t msgBody) 1.72 { 1.73 - lossyCom__sendMsg(localEndpoint, 1.74 - BROADCAST_ID, 1.75 - msg); 1.76 + lossyCom__exchange_t* centralExchange = localEndpoint->centralExchange; 1.77 + uint16_t increasedTrigger; 1.78 + lossyCom__msg_t msg; 1.79 + 1.80 + increasedTrigger = centralExchange->BroadcastTriggerCounter +1; 1.81 + 1.82 + //build message 1.83 + msg = prepareMsg(increasedTrigger, BROADCAST_ID, msgBody); 1.84 + 1.85 + //write msg to central exchange 1.86 + centralExchange->outboxArray[localEndpoint->endpointID] = msg; 1.87 + 1.88 + //update broadcast trigger counter 1.89 + centralExchange->BroadcastTriggerCounter = increasedTrigger; 1.90 } 1.91 1.92 /* 1.93 * This sends a message another endpoint. Again it is not guaranteed that the 1.94 * message is received. But in most cases it will. 1.95 */ 1.96 -void inline lossyCom__sendMsg(lossyCom__endpoint_t* localEndpoint, 1.97 +void lossyCom__sendMsg(lossyCom__endpoint_t* localEndpoint, 1.98 lossyCom__endpointID_t receiverEndpointID, 1.99 - lossyCom__msgBody_t msg) 1.100 + lossyCom__msgBody_t msgBody) 1.101 { 1.102 - lossyCom__msg_t msgDraft; 1.103 - uint16_t triggerCopy; 1.104 + lossyCom__exchange_t* centralExchange = localEndpoint->centralExchange; 1.105 + lossyCom__msg_t msg; 1.106 + uint16_t increasedTrigger; 1.107 1.108 - //build message 1.109 - msgDraft = (0 | msg); 1.110 - msgDraft |= ((lossyCom__msg_t)receiverEndpointID << ENDPOINT_ID_SHIFT); 1.111 + increasedTrigger = 1.112 + centralExchange->p2pTriggerCounter[receiverEndpointID] +1; 1.113 1.114 - triggerCopy = localEndpoint->centralExchange->triggerCounter +1; 1.115 - msgDraft |= ((lossyCom__msg_t)triggerCopy << TRIGGER_SHIFT); 1.116 + //build message 1.117 + msg = prepareMsg(increasedTrigger, receiverEndpointID, msgBody); 1.118 1.119 //write msg to central exchange 1.120 - localEndpoint->centralExchange->outboxArray[localEndpoint->endpointID] = 1.121 - msgDraft; 1.122 + centralExchange->outboxArray[localEndpoint->endpointID] = msg; 1.123 1.124 - localEndpoint->centralExchange->triggerCounter = triggerCopy; 1.125 - //printf("send updated trigger to %d\n", triggerCopy); 1.126 + //write back increased trigger counter 1.127 + centralExchange->p2pTriggerCounter[receiverEndpointID] = increasedTrigger; 1.128 } 1.129 1.130 -void inline lossyCom__receiveMsg(lossyCom__endpoint_t* localEndpoint) 1.131 +int inline isUnreceivedMsg(uint16_t msgTrigger, 1.132 + uint16_t lastReceivedTrigger, 1.133 + uint16_t triggerSnapshot) 1.134 { 1.135 - uint16_t currentTriggerCopy; 1.136 + if(msgTrigger > lastReceivedTrigger || 1.137 + msgTrigger <= triggerSnapshot) 1.138 + { 1.139 + // check if the message is new (msg trigger > archived trigger) 1.140 + // and already valid (msgTrigger <= currentTriggerCopy) 1.141 + if((msgTrigger > lastReceivedTrigger && 1.142 + msgTrigger <= triggerSnapshot) || 1.143 + ((int64_t) triggerSnapshot- // check for triggerCounterOverflow 1.144 + (int64_t)lastReceivedTrigger < -MAX_TRIGGER/2)) 1.145 + { 1.146 + return TRUE; 1.147 + } 1.148 + } 1.149 + return FALSE; 1.150 +} 1.151 + 1.152 +void lossyCom__receiveMsg(lossyCom__endpoint_t* localEndpoint) 1.153 +{ 1.154 + uint16_t broadcastTriggerSnapshot, p2pTriggerSnapshot; 1.155 + lossyCom__exchange_t* centralExchange; 1.156 lossyCom__endpointID_t senderEndpointID; 1.157 - lossyCom__endpointID_t receiverID; 1.158 + lossyCom__endpointID_t receiverEndpointID; 1.159 lossyCom__msg_t msgCopy; 1.160 uint16_t msgTrigger; 1.161 lossyCom__msgBody_t msgBody; 1.162 1.163 - senderEndpointID = 0; 1.164 - currentTriggerCopy = localEndpoint->centralExchange->triggerCounter; 1.165 + centralExchange = localEndpoint->centralExchange; 1.166 + //save trigger counter to know find valid messages 1.167 + broadcastTriggerSnapshot = centralExchange->BroadcastTriggerCounter; 1.168 + p2pTriggerSnapshot = 1.169 + centralExchange->p2pTriggerCounter[localEndpoint->endpointID]; 1.170 1.171 //new message arrived if trigger counter is higher than the last time read 1.172 - if(currentTriggerCopy > localEndpoint->localTriggerCopy) 1.173 + if( broadcastTriggerSnapshot > localEndpoint->lastReceivedBroadcastTrigger || 1.174 + p2pTriggerSnapshot > localEndpoint->lastReceivedp2pTrigger) 1.175 { 1.176 - while(senderEndpointID < localEndpoint->centralExchange->numEndpoints) 1.177 + senderEndpointID = 0; 1.178 + //search outboxes for new messages 1.179 + while(senderEndpointID < centralExchange->numEndpoints) 1.180 { 1.181 + //ignore own outbox 1.182 if(senderEndpointID != localEndpoint->endpointID) 1.183 { 1.184 - msgCopy = localEndpoint->centralExchange->outboxArray[senderEndpointID]; 1.185 + msgCopy = centralExchange->outboxArray[senderEndpointID]; 1.186 msgTrigger = 0xFFFF & (msgCopy >> TRIGGER_SHIFT); 1.187 + receiverEndpointID = 0xFFFF & (msgCopy >> ENDPOINT_ID_SHIFT); 1.188 1.189 - 1.190 - if(msgTrigger > localEndpoint->localTriggerCopy || 1.191 - msgTrigger <= currentTriggerCopy) 1.192 - { 1.193 - // check if the message is new (msg trigger > archived trigger) 1.194 - // and already valid (msgTrigger <= currentTriggerCopy) 1.195 - if((msgTrigger > localEndpoint->localTriggerCopy && 1.196 - msgTrigger <= currentTriggerCopy) || 1.197 - ((int64_t)currentTriggerCopy- // check for triggerCounterOverflow 1.198 - (int64_t)localEndpoint->localTriggerCopy < -MAX_TRIGGER/2)) 1.199 + if(receiverEndpointID == BROADCAST_ID){//receive broadcast message 1.200 + if(isUnreceivedMsg(msgTrigger, 1.201 + localEndpoint->lastReceivedBroadcastTrigger, 1.202 + broadcastTriggerSnapshot)) 1.203 { 1.204 //let the message handler parse the message 1.205 - msgBody = 0xFFFFFFFF & msgCopy; 1.206 - receiverID = 0xFFFF & (msgCopy >> ENDPOINT_ID_SHIFT); 1.207 + msgBody = 0xFFFFFFFF & msgCopy; 1.208 //only receive broadcast and p2p for own receiverID 1.209 - if(receiverID == BROADCAST_ID || 1.210 - receiverID == localEndpoint->endpointID) 1.211 - { 1.212 - (*(localEndpoint->msgHandler))(senderEndpointID, 1.213 - msgBody, 1.214 - localEndpoint->msgHandlerData); 1.215 - } 1.216 - } 1.217 + 1.218 + (*(localEndpoint->msgHandler))(senderEndpointID, 1.219 + msgBody, 1.220 + localEndpoint->msgHandlerData); 1.221 + } 1.222 + }else{//point 2 point message 1.223 + if(receiverEndpointID == localEndpoint->endpointID && 1.224 + isUnreceivedMsg(msgTrigger, 1.225 + localEndpoint->lastReceivedp2pTrigger, 1.226 + p2pTriggerSnapshot)) 1.227 + { 1.228 + //let the message handler parse the message 1.229 + msgBody = 0xFFFFFFFF & msgCopy; 1.230 + //only receive broadcast and p2p for own receiverID 1.231 + 1.232 + (*(localEndpoint->msgHandler))(senderEndpointID, 1.233 + msgBody, 1.234 + localEndpoint->msgHandlerData); 1.235 + } 1.236 } 1.237 } 1.238 senderEndpointID++; 1.239 - } 1.240 + }//search outbox loop 1.241 } 1.242 //save last TriggerCounter of last parsed Msg 1.243 - localEndpoint->localTriggerCopy = currentTriggerCopy; 1.244 + localEndpoint->lastReceivedBroadcastTrigger = broadcastTriggerSnapshot; 1.245 + localEndpoint->lastReceivedp2pTrigger = p2pTriggerSnapshot; 1.246 } 1.247 \ No newline at end of file
2.1 --- a/LossyCom.h Tue Mar 13 15:21:36 2012 +0100 2.2 +++ b/LossyCom.h Tue Mar 13 18:22:12 2012 +0100 2.3 @@ -58,20 +58,22 @@ 2.4 * Central communication structure. 2.5 */ 2.6 typedef struct{ 2.7 - volatile uint16_t triggerCounter; 2.8 - uint16_t numEndpoints; 2.9 - lossyCom__msg_t* outboxArray; 2.10 + volatile uint16_t BroadcastTriggerCounter; 2.11 + volatile uint16_t* p2pTriggerCounter; 2.12 + uint16_t numEndpoints; 2.13 + lossyCom__msg_t* outboxArray; 2.14 }lossyCom__exchange_t; 2.15 2.16 /* 2.17 * Endpoint data structure. 2.18 */ 2.19 typedef struct { 2.20 - uint16_t localTriggerCopy; 2.21 + uint16_t lastReceivedBroadcastTrigger; 2.22 + uint16_t lastReceivedp2pTrigger; 2.23 lossyCom__endpointID_t endpointID; 2.24 - lossyCom__exchange_t* centralExchange; 2.25 - lossyCom__msgHandler msgHandler; 2.26 - void* msgHandlerData; 2.27 + lossyCom__exchange_t* centralExchange; 2.28 + lossyCom__msgHandler msgHandler; 2.29 + void* msgHandlerData; 2.30 } lossyCom__endpoint_t; 2.31 2.32 /***********************************
