1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
|
#include <simgrid/s4u.hpp>
#include <simgrid/s4u/Mailbox.hpp>
#include <simgrid/s4u/Host.hpp>
#include <simgrid/plugins/energy.h>
#include <xbt/log.h>
#include <string>
#include <sstream>
#include "Inputs.hpp"
#include "simgrid/s4u/Actor.hpp"
#define PLATFORM_FILE "platform.xml"
#define MODE_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0);
#define MODE_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1);
#define MODE_RX() simgrid::s4u::this_actor::get_host()->set_pstate(2);
#define MODE_TX() simgrid::s4u::this_actor::get_host()->set_pstate(3);
#define CLOCK (simgrid::s4u::Engine::get_clock())
#define CNAME (selfName.c_str())
#define FOR(t) (t<uptime?t:uptime)
#define ADD_EVENT(HINT) \
{ \
XBT_INFO("%s add a new hint at %f for a duration of %f",CNAME,HINT->hint,HINT->duration); \
i.AddEvent(HINT->hint, HINT->duration); \
}
#define TRACK_UPTIME(instruction) \
{ \
instruction; \
uptime=upuntil-CLOCK; \
uptime=uptime > 0 ? uptime : 0; \
}
/// @brief Note that we need to simulate latency our self since we need to send instantaneous messages
#define SEND(instruction) \
{ \
TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(i.latency > uptime ? uptime : i.latency)); \
instruction; \
}
#define FORWARD_HINT(TRY_FORWARD_DURING) \
{ \
if(hint_forward!=NULL && CLOCK<hint_forward->hint){ \
hint_forward->HisForward=true; \
hint_forward->DedicatedMailbox="hint_forward"; \
try { \
XBT_INFO("%s try to forward a hint",CNAME); \
TRACK_UPTIME(m->put(hint_forward,0,TRY_FORWARD_DURING)); \
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(hint_forward->DedicatedMailbox); \
MODE_TX(); \
SEND(m_ded->put(hint_forward,0,uptime)); \
XBT_INFO("%s forward a hint successfully",CNAME); \
} \
catch(...){ \
XBT_INFO("%s fail to forward a hint",CNAME); \
MODE_ON(); \
uptime=upuntil-CLOCK; \
TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(FOR(TRY_FORWARD_DURING))); \
} \
} \
}
/// @brief Required by SimGrid
XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO] Loosely Coupled DSS");
/// @brief For convenience sake
typedef unsigned int u32;
/**
* Data that will be exchange between the nodes
*/
class Payload{
public:
Payload():hint(0),duration(0),HasHint(false),HisForward(false),HasData(false),DataSize(0),Abort(false){}
Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward),HasData(p.HasData),DataSize(p.DataSize),Abort(p.Abort){}
double hint; // The timestamp that should be used by the receiver
double duration; // The duration that should be used by the receiver
bool HasHint;
bool HasData; // This way observer could check if they want to receive data (maybe they already received data)
bool HisForward;
bool Abort; // Allow the receiver to abort a communication (if they already received the data for example) and unlock the sender
u32 DataSize;
std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver
};
/// @brief Observation node code
static void obs_node(std::vector<std::string> args);
/**
* No arguments are require (cf inputs.json)
*/
int main(int argc, char **argv) {
// Build engine
sg_host_energy_plugin_init();
simgrid::s4u::Engine engine(&argc, argv);
Inputs::GeneratePlatform(PLATFORM_FILE);
engine.load_platform(PLATFORM_FILE);
// Headline
XBT_INFO("-------------------------------------------------");
XBT_INFO("Sarting loosely coupled data dissemination experiments");
XBT_INFO("-------------------------------------------------");
// Init all nodes actors
u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count();
for(u32 i=0;i<nON;i++){
std::vector<std::string> args; // No args
std::ostringstream ss;
ss<< "on" <<i;
simgrid::s4u::Actor::create("ON", simgrid::s4u::Host::by_name(ss.str()), obs_node, args);
}
// Launch the simulation
engine.run();
XBT_INFO("Simulation took %fs", simgrid::s4u::Engine::get_clock());
XBT_INFO("The simulated platform file is available in \"%s\"",PLATFORM_FILE);
return (0);
}
/**
* This is the brain behind each node
*/
static void obs_node(std::vector<std::string> args) {
// Init various variables
std::string selfName = simgrid::s4u::this_actor::get_host()->get_name();
simgrid::s4u::this_actor::get_host()->turn_on();
Inputs i=(selfName); // Load node input parameters from the json file
simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium");
XBT_INFO("Deploying observation node %s",CNAME);
// Starting node
u32 nWakeUp=0;
u32 nDataRcv=0;
u32 nSendFail=0;
u32 nRcvFail=0;
u32 nSend=0;
u32 hint_added=0;
double totalUptime=0;
Payload *hint_forward=NULL; // Contains the hint to forward
bool is_sender=i.is_sender; // This variable might change if all receiver have received the data
bool isObserver=false;
double timeDataRcv=-1;
while(i.ShouldContinue()){
// Start by sleeping
XBT_INFO("%s is sleeping",CNAME);
MODE_OFF();
simgrid::s4u::this_actor::sleep_until(i.GetTS());
MODE_ON();
XBT_INFO("%s wakes up",CNAME);
// Doing wake up stuff
double uptime=i.GetDuration(); // Store the remaining wake up duration (updated during the node uptime)
double upsince=CLOCK; // Store the time at which the node woke up
double upuntil=i.GetTS()+i.GetDuration(); // Store the time at which the node should sleep
bool forward_mode=false; // Turned on and off every x seconds by the receiver (to switch between forward hint mode and receiving data mode)
bool forward_only=false; // When observer receive a hint it switch to forward only up to the next wake up time
bool sendhint_mode=false; // Turned on and off every x seconds by the sender (to switch between send hint and send data)
while(CLOCK < upuntil)
{
// ---------- SENDER ----------
if(is_sender){
// Send hint if send hint mode is enable
if(i.use_hint && sendhint_mode && i.HasNext()){
Payload *p=new Payload();
p->DedicatedMailbox="hintmailbox"+selfName; // Use a dedicated mailbox
p->HasHint=true;
p->duration=i.GetNextDuration();
p->hint=i.GetNextTS();
p->DataSize=i.hint_size;
try {
TRACK_UPTIME(m->put(p,0,FOR(0.3))); // Init connection with a receiver
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
MODE_TX();
SEND(m_ded->put(p,p->DataSize,uptime)); // Send the actual hint
MODE_ON();
XBT_INFO("%s sent a hint successfully",CNAME);
}
catch(...){}
}
// Send data if send hint mode is disable
else{
Payload *p=new Payload();
p->DedicatedMailbox="datamailbox"+selfName;
p->HasData=true;
p->HasHint=false;
p->DataSize=i.data_size;
// Add hint to the data if possible
if(i.use_hint && i.HasNext()){
p->HasHint=true;
p->duration=i.GetNextDuration();
p->hint=i.GetNextTS();
p->DataSize+=i.hint_size; // Don't forget!!
}
// Send the data
try {
TRACK_UPTIME(m->put(p,0,FOR(1)));
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
Payload *ack=m_ded->get<Payload>();
if(!ack->Abort){
MODE_TX();
if(i.extended){
SEND(m_ded->put(p,p->DataSize));
}
else{
SEND(m_ded->put(p,p->DataSize,uptime));
}
XBT_INFO("%s sent data successfully",CNAME);
nSend++;
is_sender=(nSend<(i.n_nodes-1)); // Stop sending if all nodes received
isObserver=!is_sender; // Switch to observer mode if all nodes received the data
}
else {
simgrid::s4u::this_actor::sleep_for(FOR(1));
}
}
catch(...){}
}
sendhint_mode=!sendhint_mode; // Switch back and forth between sending hint and data
MODE_ON();
}
// ---------- RECEIVER ----------
else if(!isObserver){
// Forward hint mode
if(forward_mode){
if(i.use_hint && hint_forward!=NULL && CLOCK < hint_forward->hint){
try {
FORWARD_HINT(FOR(0.3)); // Try forward for 0.3 seconds then switch to received mode
}
catch(...){}
}
}
else { // Receiving mode
Payload *p; // Received data
try {
// Get the instantaneous message
do {
TRACK_UPTIME(p=m->get<Payload>(FOR(1)));
if(p->HisForward){
if(hint_forward==NULL || (hint_forward !=NULL && p->hint>hint_forward->hint)){
simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
MODE_RX();
TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
MODE_ON();
XBT_INFO("%s received a forwarded hint successfully",CNAME);
if(CLOCK < p->hint){
ADD_EVENT(p);
hint_forward=new Payload(*p);
hint_added++;
}
}
}
} while(p->HisForward);
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
// Start receiving data
MODE_RX();
if(p->HasHint && !p->HasData){
TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
XBT_INFO("%s received a hint successfully",CNAME);
hint_forward=new Payload(*p); // Enable hint forwarding
if(CLOCK < p->hint){
ADD_EVENT(p);
hint_forward=new Payload(*p);
hint_added++;
}
}
else {
// Inform the sender that we do not want to abort
Payload *ack=new Payload();
ack->Abort=false;
m_ded->put(ack,0); // Instantaneous msg
if(i.extended){
p=m_ded->get<Payload>(); // Fetch data until sended
}
else{
TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); // Fetch data until sended or uptime expire
}
// If we reach here, data has been received successfully
XBT_INFO("%s received data successfully",CNAME);
timeDataRcv=CLOCK;
if(p->HasHint){
XBT_INFO("%s received a hint along with data successfully",CNAME);
hint_forward=new Payload(*p); // Enable hint forwarding
}
nDataRcv++;
isObserver=true;
is_sender=false;
}
}catch(...){
XBT_INFO("%s could not receive any data",CNAME);
nRcvFail++;
}
}
forward_mode=!forward_mode; // Toggle mode (go back and forth between receiving and forwarding)
}
// ---------- OBSERVER ----------
else {
XBT_INFO("%s is observing his environment...",CNAME);
MODE_ON();
// If use hint we should listen for the sender
if(i.use_hint){
if((forward_mode|forward_only) && hint_forward!=NULL && CLOCK < hint_forward->hint){
FORWARD_HINT(FOR(1));
}
else {
Payload *p;
try {
do {
TRACK_UPTIME(p=m->get<Payload>(FOR(1)));
} while(p->HisForward); // Ignore forwarded hint
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
// Start receiving hint from sender
if(p->HasData){
Payload *ack=new Payload();
ack->Abort=true;
m_ded->put(ack,0);
simgrid::s4u::this_actor::sleep_for(FOR(1));
}
else if(p->HasHint){
MODE_RX();
TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
XBT_INFO("%s received a hint successfully",CNAME);
hint_forward=new Payload(*p); // Enable hint forwarding
forward_only=true;
}
else {
simgrid::s4u::this_actor::sleep_for(FOR(1));
}
}
catch(...){
}
}
forward_mode=!forward_mode;
}
else {
simgrid::s4u::this_actor::sleep_until(upuntil);
}
}
uptime=upuntil-CLOCK; // Note that uptime can be < 0 in extended mode
uptime=uptime > 0 ? uptime : 0; // Just in case
}
// Load next event
i.GotoNextEvent();
nWakeUp++; // Increase the number of wake up
totalUptime+=CLOCK-upsince; // Synchronize total uptime
}
// Done
MODE_OFF()
XBT_INFO("Observation node %s finished [LOG2PARSE](node:%s|isSender:%d|nSend:%d|nWakeUp:%d|nDataRcv:%d|nSendFail:%d|nRcvFail:%d|totalUptime:%f|seed:%d|hint_added:%d|timeDataRcv:%f)",CNAME,CNAME,i.is_sender,nSend,nWakeUp,nDataRcv,nSendFail,nRcvFail,totalUptime,i.seed,hint_added,timeDataRcv);
}
|