1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
#include "simgrid/s4u.hpp"
#include <simgrid/s4u/Mailbox.hpp>
#include <memory>
#include <string>
#include <sstream>
#include "inputs.hpp"
#include "simgrid/s4u/Host.hpp"
#include "simgrid/plugins/energy.h"
#include "xbt/log.h"
/// Path of the platform description file; main() passes it to
/// Inputs::GeneratePlatform() and then to Engine::load_platform().
#define PLATFORM_FILE "platform.xml"
/// Sets the pstate of the host running the current actor to 0.
/// The expansion already ends with `;`, so `TURN_OFF();` yields an extra empty statement.
/// NOTE(review): what pstate 0 means in terms of power depends on the platform file -- confirm.
#define TURN_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0);
/// Sets the pstate of the host running the current actor to 1.
/// The expansion already ends with `;`, so `TURN_ON();` yields an extra empty statement.
/// NOTE(review): what pstate 1 means in terms of power depends on the platform file -- confirm.
#define TURN_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1);
/// Declares the default XBT log category (`simulator`, described as "[DAO]")
/// that the XBT_INFO calls in this file log to.
XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO]");
/// Shorthand for `unsigned int`.
typedef unsigned int u32;
/// @brief Message object exchanged between observation nodes through a mailbox.
///
/// A default-constructed payload carries no hint: `hint` is 0 and
/// `containsHint` is false.
class Payload {
public:
  /// Builds a payload without hint (members take their default initializers).
  Payload() = default;

  /// Hint value; a receiving obs_node() uses it as its next wake interval
  /// when `containsHint` is true.
  double hint = 0;

  /// Tells the receiver whether `hint` should be taken into account.
  bool containsHint = false;
};
/// @brief Observation unit code: the function main() starts as an "ON" actor,
///        once per host.
/// @param args Actor arguments; main() passes an empty vector.
static void obs_node(std::vector<std::string> args);
int main(int argc, char **argv) {
// Build engine
sg_host_energy_plugin_init();
simgrid::s4u::Engine engine(&argc, argv);
Inputs::GeneratePlatform(PLATFORM_FILE);
engine.load_platform(PLATFORM_FILE);
XBT_INFO("-------------------------------------------------");
XBT_INFO("Sarting loosely coupled data dissemination experiments");
XBT_INFO("-------------------------------------------------");
u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count();
for(u32 i=0;i<nON;i++){
std::vector<std::string> args;
std::ostringstream ss;
ss<< "on" <<i;
simgrid::s4u::Actor::create("ON", simgrid::s4u::Host::by_name(ss.str()), obs_node, args);
}
// Setup/Run simulation
engine.run();
XBT_INFO("Simulation took %fs", simgrid::s4u::Engine::get_clock());
return (0);
}
static void obs_node(std::vector<std::string> args) {
std::string selfName = simgrid::s4u::this_actor::get_host()->get_name();
simgrid::s4u::this_actor::get_host()->turn_on();
Inputs i(selfName);
simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium");
XBT_INFO("Deploying observation node %s",selfName.c_str());
// Init convenien variables
double wake_interval=i.wake_interval;
double wake_duration=i.wake_duration;
double startup_delay=i.startup_delay;
int max_attempts=i.max_attempts;
bool isSender=i.is_sender;
bool useHint=i.use_hint;
bool isObserver=false;
u32 data_size=i.data_size;
// Starting node
u32 effectiveAttemps=0;
double effective_wake_duration=wake_duration;
double effective_wake_interval=wake_interval;
TURN_OFF();
simgrid::s4u::this_actor::sleep_for(startup_delay);
for(u32 i=0;i<max_attempts;i++){
// Sleeping
XBT_INFO("%s is spleeping",selfName.c_str());
TURN_OFF();
simgrid::s4u::this_actor::sleep_for(effective_wake_interval);
effective_wake_interval=wake_interval; // Restore wake interval
TURN_ON();
XBT_INFO("%s wakes up",selfName.c_str());
// Wake up: try to send/receive
try
{
if(isSender){
Payload *p=new Payload();
p->containsHint=true;
p->hint=5;
if(useHint){
p->containsHint=i<(max_attempts-1); // Ensure that we will wake up again
p->hint=wake_interval;
}
m->put(p,data_size,effective_wake_duration);
XBT_INFO("%s send data successfully",selfName.c_str());
isObserver=true; // Do one send for now...
isSender=false;
}
else if (!isObserver){
Payload* p=m->get<Payload>(effective_wake_duration);
if(p->containsHint){
XBT_INFO("%s received and hint of %f",selfName.c_str(),p->hint);
effective_wake_interval=p->hint;
i--; // Add new attempt
}
else{
XBT_INFO("%s received data successfully and switch to forwarding mode",selfName.c_str());
isSender=!isSender;
}
}
else {
XBT_INFO("%s is observing is environment...",selfName.c_str());
}
}
catch (...)
{
if(isSender)
XBT_INFO("%s failed to send data",selfName.c_str());
else
XBT_INFO("%s failed to receive data",selfName.c_str());
}
effectiveAttemps++;
}
// Done
XBT_INFO("Observation node %s finished (attemps:%d)",selfName.c_str(),effectiveAttemps);
}
|