Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > BestEffortMessaging
view LossyCom.c @ 4:7ba5a3a6102d
System with multiple point 2 point trigger working
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Tue, 13 Mar 2012 18:22:12 +0100 |
| parents | 5c0fb7c519d7 |
| children | 95a03e431480 |
line source
2 /*
3 * For a detailed description see header file.
4 */
6 #include <string.h>
8 #include "LossyCom.h"
9 #include "VMS_Implementations/VMS_impl/vmalloc.h"
11 /*
12 * Initializes the central exchange structure.
13 * Allocates memory to fit the number of endpoints.
14 * Returns NULL if an error occurs.
15 */
16 lossyCom__exchange_t* lossyCom__initialize(uint16_t numEndpoints)
17 {
18 lossyCom__exchange_t* exchange;
20 exchange = VMS_WL__malloc(sizeof(lossyCom__exchange_t));
21 if(exchange == NULL)
22 return NULL;
24 exchange->BroadcastTriggerCounter = 0;
25 exchange->numEndpoints = numEndpoints;
26 exchange->outboxArray = VMS_WL__malloc(sizeof(lossyCom__msg_t)*numEndpoints);
27 if(exchange->outboxArray == NULL){
28 VMS_WL__free(exchange);
29 return NULL;
30 }
32 exchange->p2pTriggerCounter = VMS_WL__malloc(sizeof(uint16_t)*numEndpoints);
33 if(exchange->p2pTriggerCounter == NULL){
34 VMS_WL__free(exchange->outboxArray);
35 VMS_WL__free(exchange);
36 return NULL;
37 }
39 //reset all point 2 point trigger counter
40 memset((void*)exchange->p2pTriggerCounter, 0, sizeof(uint16_t)*numEndpoints);
42 return exchange;
43 }
45 /*
46 * Connects the local endpoint to the central exchange structure.
47 * This registers a message handler that handles all the incomming messages.
48 * Also an endpointID is set this ID has to be between 0 and total number of
49 * endpoints-1 and the number has to be unique.
50 */
51 void lossyCom__initialize_endpoint(lossyCom__endpoint_t* localEndpoint,
52 lossyCom__exchange_t* centralExchange,
53 lossyCom__endpointID_t endpointID,
54 lossyCom__msgHandler msgHandler,
55 void* msgHandlerData)
56 {
57 localEndpoint->lastReceivedBroadcastTrigger = 0;
58 localEndpoint->endpointID = endpointID;
59 localEndpoint->centralExchange = centralExchange;
60 localEndpoint->msgHandler = msgHandler;
61 localEndpoint->msgHandlerData = msgHandlerData;
62 }
64 inline lossyCom__msg_t prepareMsg(uint16_t triggerValue,
65 lossyCom__endpointID_t receiverEndpointID,
66 lossyCom__msgBody_t msg)
67 {
68 lossyCom__msg_t msgDraft;
70 msgDraft = (0 | msg);
71 msgDraft |= ((lossyCom__msg_t)receiverEndpointID << ENDPOINT_ID_SHIFT);
72 msgDraft |= ((lossyCom__msg_t)triggerValue << TRIGGER_SHIFT);
74 return msgDraft;
75 }
77 /*
78 * This broadcasts a message to all connected receivers
79 */
80 void lossyCom__broadcastMsg(lossyCom__endpoint_t* localEndpoint,
81 lossyCom__msgBody_t msgBody)
82 {
83 lossyCom__exchange_t* centralExchange = localEndpoint->centralExchange;
84 uint16_t increasedTrigger;
85 lossyCom__msg_t msg;
87 increasedTrigger = centralExchange->BroadcastTriggerCounter +1;
89 //build message
90 msg = prepareMsg(increasedTrigger, BROADCAST_ID, msgBody);
92 //write msg to central exchange
93 centralExchange->outboxArray[localEndpoint->endpointID] = msg;
95 //update broadcast trigger counter
96 centralExchange->BroadcastTriggerCounter = increasedTrigger;
97 }
99 /*
100 * This sends a message another endpoint. Again it is not guaranteed that the
101 * message is received. But in most cases it will.
102 */
103 void lossyCom__sendMsg(lossyCom__endpoint_t* localEndpoint,
104 lossyCom__endpointID_t receiverEndpointID,
105 lossyCom__msgBody_t msgBody)
106 {
107 lossyCom__exchange_t* centralExchange = localEndpoint->centralExchange;
108 lossyCom__msg_t msg;
109 uint16_t increasedTrigger;
111 increasedTrigger =
112 centralExchange->p2pTriggerCounter[receiverEndpointID] +1;
114 //build message
115 msg = prepareMsg(increasedTrigger, receiverEndpointID, msgBody);
117 //write msg to central exchange
118 centralExchange->outboxArray[localEndpoint->endpointID] = msg;
120 //write back increased trigger counter
121 centralExchange->p2pTriggerCounter[receiverEndpointID] = increasedTrigger;
122 }
124 int inline isUnreceivedMsg(uint16_t msgTrigger,
125 uint16_t lastReceivedTrigger,
126 uint16_t triggerSnapshot)
127 {
128 if(msgTrigger > lastReceivedTrigger ||
129 msgTrigger <= triggerSnapshot)
130 {
131 // check if the message is new (msg trigger > archived trigger)
132 // and already valid (msgTrigger <= currentTriggerCopy)
133 if((msgTrigger > lastReceivedTrigger &&
134 msgTrigger <= triggerSnapshot) ||
135 ((int64_t) triggerSnapshot- // check for triggerCounterOverflow
136 (int64_t)lastReceivedTrigger < -MAX_TRIGGER/2))
137 {
138 return TRUE;
139 }
140 }
141 return FALSE;
142 }
144 void lossyCom__receiveMsg(lossyCom__endpoint_t* localEndpoint)
145 {
146 uint16_t broadcastTriggerSnapshot, p2pTriggerSnapshot;
147 lossyCom__exchange_t* centralExchange;
148 lossyCom__endpointID_t senderEndpointID;
149 lossyCom__endpointID_t receiverEndpointID;
150 lossyCom__msg_t msgCopy;
151 uint16_t msgTrigger;
152 lossyCom__msgBody_t msgBody;
154 centralExchange = localEndpoint->centralExchange;
155 //save trigger counter to know find valid messages
156 broadcastTriggerSnapshot = centralExchange->BroadcastTriggerCounter;
157 p2pTriggerSnapshot =
158 centralExchange->p2pTriggerCounter[localEndpoint->endpointID];
160 //new message arrived if trigger counter is higher than the last time read
161 if( broadcastTriggerSnapshot > localEndpoint->lastReceivedBroadcastTrigger ||
162 p2pTriggerSnapshot > localEndpoint->lastReceivedp2pTrigger)
163 {
164 senderEndpointID = 0;
165 //search outboxes for new messages
166 while(senderEndpointID < centralExchange->numEndpoints)
167 {
168 //ignore own outbox
169 if(senderEndpointID != localEndpoint->endpointID)
170 {
171 msgCopy = centralExchange->outboxArray[senderEndpointID];
172 msgTrigger = 0xFFFF & (msgCopy >> TRIGGER_SHIFT);
173 receiverEndpointID = 0xFFFF & (msgCopy >> ENDPOINT_ID_SHIFT);
175 if(receiverEndpointID == BROADCAST_ID){//receive broadcast message
176 if(isUnreceivedMsg(msgTrigger,
177 localEndpoint->lastReceivedBroadcastTrigger,
178 broadcastTriggerSnapshot))
179 {
180 //let the message handler parse the message
181 msgBody = 0xFFFFFFFF & msgCopy;
182 //only receive broadcast and p2p for own receiverID
184 (*(localEndpoint->msgHandler))(senderEndpointID,
185 msgBody,
186 localEndpoint->msgHandlerData);
187 }
188 }else{//point 2 point message
189 if(receiverEndpointID == localEndpoint->endpointID &&
190 isUnreceivedMsg(msgTrigger,
191 localEndpoint->lastReceivedp2pTrigger,
192 p2pTriggerSnapshot))
193 {
194 //let the message handler parse the message
195 msgBody = 0xFFFFFFFF & msgCopy;
196 //only receive broadcast and p2p for own receiverID
198 (*(localEndpoint->msgHandler))(senderEndpointID,
199 msgBody,
200 localEndpoint->msgHandlerData);
201 }
202 }
203 }
204 senderEndpointID++;
205 }//search outbox loop
206 }
207 //save last TriggerCounter of last parsed Msg
208 localEndpoint->lastReceivedBroadcastTrigger = broadcastTriggerSnapshot;
209 localEndpoint->lastReceivedp2pTrigger = p2pTriggerSnapshot;
210 }
