summaryrefslogtreecommitdiff
path: root/simulations/src
diff options
context:
space:
mode:
Diffstat (limited to 'simulations/src')
-rw-r--r--simulations/src/Inputs.cc184
-rw-r--r--simulations/src/Inputs.hpp84
-rw-r--r--simulations/src/scenarios.cc100
-rw-r--r--simulations/src/simulator.cc354
4 files changed, 722 insertions, 0 deletions
diff --git a/simulations/src/Inputs.cc b/simulations/src/Inputs.cc
new file mode 100644
index 0000000..8c3ac9c
--- /dev/null
+++ b/simulations/src/Inputs.cc
@@ -0,0 +1,184 @@
+#include "Inputs.hpp"
+
+#include <algorithm>
+#include <iostream>
+#include <fstream>
+
+Inputs::Inputs(std::string node_name){
+ // Here we do all the boring stuff
+ FILE* input_file = fopen(INPUTS_FILE, "rb");
+ char input_file_buffer[JSON_BUFFER_SIZE];
+ rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer));
+ d.ParseStream(is);
+ fclose(input_file);
+
+ // Init all variables
+ is_sender=d["nodes"][node_name.c_str()]["is_sender"].GetBool();
+ use_hint=d["nodes"][node_name.c_str()]["use_hint"].GetBool();
+ data_size=d["nodes"][node_name.c_str()]["data_size"].GetInt();
+ extended=d["extended"].GetBool();
+ seed=d["seed"].GetInt();
+ hint_size=d["hint_size"].GetInt();
+ n_nodes=d["nodes"].MemberCount();
+ latency=d["latency"].GetDouble();
+
+ // Instantiate wake_ts
+ for(auto& v:d["nodes"][node_name.c_str()]["wake_ts"].GetArray()){
+ wake_ts.push_back(v.GetDouble());
+ }
+
+ // Instantiate wake_duration
+ for(auto& v:d["nodes"][node_name.c_str()]["wake_duration"].GetArray()){
+ wake_duration.push_back(v.GetDouble());
+ }
+
+ // Identity check
+ if(wake_ts.size()<1){
+ std::cerr << "Invalid node configuration: wake_ts.size() == 0" <<std::endl;
+ exit(1);
+ }
+ if(wake_ts.size()!=wake_duration.size()){
+ std::cerr << "Invalid node configuration: wake_ts.size() != wake_duration.size()" <<std::endl;
+ exit(1);
+ }
+ if(!std::is_sorted(wake_ts.begin(),wake_ts.end())){
+ std::cerr << "Invalid node configuration: wake_ts is not sorted" <<std::endl;
+ exit(1);
+ }
+
+ // Ensure events are merged
+ MergeEvents();
+}
+
+void Inputs::MergeEvents(){
+ for(int i=0;i<(wake_ts.size()-1);i++){
+ double cur_ts=wake_ts[i];
+ double next_ts=wake_ts[i+1];
+ double cur_duration=wake_duration[i];
+ double next_duration=wake_duration[i+1];
+ // If we should merge then
+ if((cur_ts+cur_duration)>=next_ts){
+ // Create variable for convenience
+ double start=cur_ts;
+ double end=std::max(cur_ts+cur_duration,next_ts+next_duration);
+ wake_duration[i]=end-start;
+ // Now remove next event
+ wake_ts.erase(wake_ts.begin() + i + 1);
+ wake_duration.erase(wake_duration.begin() + i +1);
+ // This is not optimal. Yet it is simple :D
+ MergeEvents();
+ }
+ }
+}
+
+double Inputs::GetNextTS(){
+ // Ensure that the caller is smart
+ if(wake_duration.size()<2){
+ std::cerr << "You are trying to access to the next timestamp but it does not exists" <<std::endl;
+ exit(1);
+ }
+ return wake_ts[1];
+}
+
+double Inputs::GetNextDuration(){
+ // Ensure that the caller is smart
+ if(wake_duration.size()<2){
+ std::cerr << "You are trying to access to the next duration but it does not exists" <<std::endl;
+ exit(1);
+ }
+ return wake_duration[1];
+}
+
+void Inputs::GotoNextEvent(){
+ wake_ts.erase(wake_ts.begin());
+ wake_duration.erase(wake_duration.begin());
+}
+
+void Inputs::DumpEvents(){
+ std::cout << "Timestamps: ";
+ for(auto a:wake_ts){
+ std::cout << std::setw(5) << a << " ";
+ }
+ std::cout << std::endl;
+ std::cout << "Wake Durations: ";
+ for(auto a:wake_duration){
+ std::cout << std::setw(5) << a << " ";
+ }
+ std::cout << std::endl;
+}
+
+void Inputs::AddEvent(double ts, double duration){
+ // First handle timestamp
+ int pos=0;
+ for(auto it = std::begin(wake_ts); it != std::end(wake_ts); ++it) {
+ if(*it>=ts){
+ wake_ts.insert(it,ts);
+ break;
+ }
+ pos++;
+ }
+
+ // Ensure that ts and duration should not go to the end
+ if(pos==wake_ts.size()){
+ wake_ts.push_back(ts);
+ wake_duration.push_back(duration);
+ }
+ else {
+ // Handle durations here
+ int pos2=0;
+ for(auto it = std::begin(wake_duration); it != std::end(wake_duration); ++it) {
+ if(pos==pos2){
+ wake_duration.insert(it,duration);
+ break;
+ }
+ else if (it+1==std::end(wake_duration)) {
+ wake_duration.push_back(duration);
+ }
+ pos2++;
+ }
+ }
+ // Don't forget
+ MergeEvents();
+}
+
+void Inputs::GeneratePlatform(std::string p){
+
+ // The boring stuff
+ FILE* input_file = fopen(INPUTS_FILE, "rb");
+ char input_file_buffer[JSON_BUFFER_SIZE];
+ rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer));
+ rapidjson::Document d;
+ d.ParseStream(is);
+ fclose(input_file);
+
+ // Write platform file
+ std::ofstream pf;
+ pf.open (p);
+ pf << "<?xml version='1.0'?>\n";
+ pf << "<!DOCTYPE platform SYSTEM \"http://simgrid.gforge.inria.fr/simgrid/simgrid.dtd\">\n";
+ pf << "<platform version=\"4.1\">\n <AS id=\"AS0\" routing=\"Full\">\n";
+ pf << " <link id=\"link\" bandwidth=\""<<d["bitrate"].GetString()<<"\" latency=\"0ms\" sharing_policy=\"SHARED\"></link>\n";
+ for (Value::ConstMemberIterator itr = d["nodes"].MemberBegin(); itr != d["nodes"].MemberEnd(); ++itr)
+ {
+ std::string name=itr->name.GetString();
+ double power_off=d["nodes"][itr->name.GetString()]["power_off"].GetDouble();
+ double power_on=d["nodes"][itr->name.GetString()]["power_on"].GetDouble();
+ double power_rx=power_on+d["nodes"][itr->name.GetString()]["power_rx"].GetDouble();
+ double power_tx=power_on+d["nodes"][itr->name.GetString()]["power_tx"].GetDouble();
+
+ // Create node
+ pf << " <host id=\""<<name<<"\" speed=\"100.0f,100.0f,100.0f,100.0f\" pstate=\"0\">\n";
+ pf << " <prop id=\"wattage_per_state\" value=\""<< power_off<<":"<<power_off<<", "<< power_on<<":"<<power_on<<", "<<power_rx<<":"<<power_rx<<", "<<power_tx<<":"<<power_tx<<"\" />\n";
+ pf << " <prop id=\"wattage_off\" value=\"0\" />\n </host>\n";
+ }
+ for (Value::ConstMemberIterator src = d["nodes"].MemberBegin(); src != d["nodes"].MemberEnd(); ++src)
+ {
+ for (Value::ConstMemberIterator dst = d["nodes"].MemberBegin(); dst != d["nodes"].MemberEnd(); ++dst)
+ {
+ if(src->name.GetString() != dst->name.GetString())
+ pf << " <route src=\""<<src->name.GetString()<<"\" dst=\""<<dst->name.GetString()<<"\" symmetrical=\"no\"><link_ctn id=\"link\"/></route>\n";
+ }
+ }
+ pf << " </AS>\n</platform>\n";
+ pf.close();
+} \ No newline at end of file
diff --git a/simulations/src/Inputs.hpp b/simulations/src/Inputs.hpp
new file mode 100644
index 0000000..487686f
--- /dev/null
+++ b/simulations/src/Inputs.hpp
@@ -0,0 +1,84 @@
+#include <rapidjson/document.h>
+#include <rapidjson/filereadstream.h>
+
+#include <cstdio>
+#include <string>
+#include <vector>
+#include <iomanip>
+
+#define INPUTS_FILE "inputs.json"
+/// @brief Pay attention to this strange number, you could tear your hairs out
+#define JSON_BUFFER_SIZE 65536
+
+using namespace rapidjson;
+
+class Inputs {
+ /// @brief RapidJSON
+ Document d;
+ /// @brief Current node associated with the Inputs
+ std::string node_name;
+ /// @brief Timestamps (at which time the nodes should wake up)
+ std::vector<double> wake_ts;
+ /// @brief Wake up time durations
+ std::vector<double> wake_duration;
+ /**
+ * Recursively merge overlapping events
+ */
+ void MergeEvents();
+public:
+ /**
+ * Load node_name configuration
+ */
+ Inputs(std::string node_name);
+ /**
+ * Generate a SimGrid platform file from the json configuration
+ */
+ static void GeneratePlatform(std::string p);
+ /**
+ * Is there any event that remains in the queue ?
+ */
+ bool ShouldContinue(){return wake_ts.size()!=0;}
+ /**
+ * Is there another event to process ?
+ */
+ bool HasNext(){return wake_ts.size()>1 ;}
+ /**
+ * Get current event timestamp
+ */
+ double GetTS(){return wake_ts.front();}
+ /**
+ * Get current event duration
+ */
+ double GetDuration(){return wake_duration.front();}
+ /**
+ * Get next event timestamp
+ */
+ double GetNextTS();
+ /**
+ * Get next event duration
+ */
+ double GetNextDuration();
+ /**
+ * Time travel machine (note that this function is following the second principle
+ * of thermodynamics)
+ */
+ void GotoNextEvent();
+ /**
+ * Allows to add a *FUTURE* event and merge overlapping events
+ */
+ void AddEvent(double ts, double duration);
+ /**
+ * This is the timeline
+ */
+ void DumpEvents();
+
+ /// @brief These are public attributes, please take care they are fragile
+ bool is_sender;
+ bool use_hint;
+ bool extended;
+ int data_size;
+ int hint_size;
+ int seed;
+ int n_nodes;
+ double latency;
+}; \ No newline at end of file
diff --git a/simulations/src/scenarios.cc b/simulations/src/scenarios.cc
new file mode 100644
index 0000000..90a7849
--- /dev/null
+++ b/simulations/src/scenarios.cc
@@ -0,0 +1,100 @@
+#include <cstdlib>
+#include <cstring>
+#include <rapidjson/document.h>
+#include <rapidjson/filewritestream.h>
+#include <rapidjson/prettywriter.h>
+
+#include <cstdio>
+#include <iostream>
+#include <time.h>
+#include <sstream>
+
+
+#define RAND(min,max) (rand()%((max)-(min)+1)+(min))
+
+using namespace std;
+using namespace rapidjson;
+
+int main(int argc, char **argv){
+ // Setup seed
+ if(argc!=16){
+ cerr << "Usage: " << argv[0] <<
+ " <seed> <simtime> <wakeupevery> <wakeupfor> <n_nodes>" <<
+ " <extended> <hint> <poff> <pon> <prx> <ptx> <datasize> <bitrate> <hintsize> <latency>" <<
+ endl;
+ exit(1);
+ }
+
+ // Init parameters
+ int seed=atoi(argv[1]);
+ double simtime=stod(argv[2]);
+ unsigned int wakeupevery=atoi(argv[3]);
+ unsigned int wakeupfor=stoi(argv[4]);
+ unsigned int n_nodes=atoi(argv[5]);
+ bool extended=!strcmp("true",argv[6]);
+ bool hint=!strcmp("true",argv[7]);
+ double poff=stod(argv[8]);
+ double pon=stod(argv[9]);
+ double prx=stod(argv[10]);
+ double ptx=stod(argv[11]);
+ unsigned int datasize=atoi(argv[12]);
+ string bitrate(argv[13]);
+ unsigned int hintsize=atoi(argv[14]);
+ double latency=stod(argv[15]);
+
+
+ // Setup seed
+ srand(seed);
+
+ // Create document
+ Document d;
+ d.SetObject();
+ d.AddMember("seed",Value().SetInt(seed),d.GetAllocator());
+ Value bitrateValue;
+ bitrateValue.SetString(bitrate.c_str(),bitrate.size(),d.GetAllocator());
+ d.AddMember("bitrate",bitrateValue,d.GetAllocator());
+ d.AddMember("latency",latency,d.GetAllocator());
+ d.AddMember("extended",extended,d.GetAllocator());
+ d.AddMember("hint_size",hintsize,d.GetAllocator());
+
+ // Create nodes
+ Value nodes(kObjectType);
+ for(int i=0;i<n_nodes;i++){
+ Value node(kObjectType);
+ node.SetObject();
+ node.AddMember("use_hint",hint,d.GetAllocator());
+ node.AddMember("power_off",poff,d.GetAllocator());
+ node.AddMember("power_on",pon,d.GetAllocator());
+ node.AddMember("power_rx",prx,d.GetAllocator());
+ node.AddMember("power_tx",ptx,d.GetAllocator());
+ node.AddMember("is_sender",i==0,d.GetAllocator());
+ node.AddMember("data_size",datasize,d.GetAllocator());
+
+ // Setup ts and durations
+ Value ts(kArrayType);
+ Value duration(kArrayType);
+ for(unsigned int i=0;i<simtime;i+=wakeupevery){
+ ts.PushBack(Value().SetDouble(RAND(i,i+wakeupevery-wakeupfor)),d.GetAllocator());
+ duration.PushBack(Value().SetDouble(wakeupfor),d.GetAllocator());
+ }
+ node.AddMember("wake_ts",ts,d.GetAllocator());
+ node.AddMember("wake_duration",duration,d.GetAllocator());
+
+
+ // Add node to nodes
+ std::ostringstream ss;
+ ss<< "on" <<i;
+ Value key(ss.str().c_str(), d.GetAllocator());
+ nodes.AddMember(key,node,d.GetAllocator());
+ }
+ d.AddMember("nodes",nodes,d.GetAllocator());
+
+
+ // Write to stdout
+ StringBuffer buffer;
+ PrettyWriter<StringBuffer> writer(buffer);
+ d.Accept(writer);
+ cout << buffer.GetString();
+
+ return 0;
+} \ No newline at end of file
diff --git a/simulations/src/simulator.cc b/simulations/src/simulator.cc
new file mode 100644
index 0000000..0215b32
--- /dev/null
+++ b/simulations/src/simulator.cc
@@ -0,0 +1,354 @@
+#include <simgrid/s4u.hpp>
+#include <simgrid/s4u/Mailbox.hpp>
+#include <simgrid/s4u/Host.hpp>
+#include <simgrid/plugins/energy.h>
+#include <xbt/log.h>
+
+#include <string>
+#include <sstream>
+
+#include "Inputs.hpp"
+#include "simgrid/s4u/Actor.hpp"
+
+
+#define PLATFORM_FILE "platform.xml"
+#define MODE_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0);
+#define MODE_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1);
+#define MODE_RX() simgrid::s4u::this_actor::get_host()->set_pstate(2);
+#define MODE_TX() simgrid::s4u::this_actor::get_host()->set_pstate(3);
+#define CLOCK (simgrid::s4u::Engine::get_clock())
+#define CNAME (selfName.c_str())
+#define FOR(t) (t<uptime?t:uptime)
+#define ADD_EVENT(HINT) \
+ { \
+ XBT_INFO("%s add a new hint at %f for a duration of %f",CNAME,HINT->hint,HINT->duration); \
+ i.AddEvent(HINT->hint, HINT->duration); \
+ }
+#define TRACK_UPTIME(instruction) \
+ { \
+ instruction; \
+ uptime=upuntil-CLOCK; \
+ uptime=uptime > 0 ? uptime : 0; \
+ }
+/// @brief Note that we need to simulate latency our self since we need to send instantaneous messages
+#define SEND(instruction) \
+ { \
+ TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(i.latency > uptime ? uptime : i.latency)); \
+ instruction; \
+ }
+
+#define FORWARD_HINT(TRY_FORWARD_DURING) \
+ { \
+ if(hint_forward!=NULL && CLOCK<hint_forward->hint){ \
+ hint_forward->HisForward=true; \
+ hint_forward->DedicatedMailbox="hint_forward"; \
+ try { \
+ XBT_INFO("%s try to forward a hint",CNAME); \
+ TRACK_UPTIME(m->put(hint_forward,0,TRY_FORWARD_DURING)); \
+ simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(hint_forward->DedicatedMailbox); \
+ MODE_TX(); \
+ SEND(m_ded->put(hint_forward,0,uptime)); \
+ XBT_INFO("%s forward a hint successfully",CNAME); \
+ } \
+ catch(...){ \
+ XBT_INFO("%s fail to forward a hint",CNAME); \
+ MODE_ON(); \
+ uptime=upuntil-CLOCK; \
+ TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(FOR(TRY_FORWARD_DURING))); \
+ } \
+ } \
+ }
+
+
+/// @brief Required by SimGrid
+XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO] Loosely Coupled DSS");
+
+/// @brief For convenience sake
+typedef unsigned int u32;
+
+/**
+ * Data that will be exchange between the nodes
+ */
+class Payload{
+public:
+ Payload():hint(0),duration(0),HasHint(false),HisForward(false),HasData(false),DataSize(0),Abort(false){}
+ Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward),HasData(p.HasData),DataSize(p.DataSize),Abort(p.Abort){}
+ double hint; // The timestamp that should be used by the receiver
+ double duration; // The duration that should be used by the receiver
+ bool HasHint;
+ bool HasData; // This way observer could check if they want to receive data (maybe they already received data)
+ bool HisForward;
+ bool Abort; // Allow the receiver to abort a communication (if they already received the data for example) and unlock the sender
+ u32 DataSize;
+ std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver
+};
+
+/// @brief Observation node code
+static void obs_node(std::vector<std::string> args);
+
+/**
+ * No arguments are require (cf inputs.json)
+ */
+int main(int argc, char **argv) {
+
+ // Build engine
+ sg_host_energy_plugin_init();
+ simgrid::s4u::Engine engine(&argc, argv);
+ Inputs::GeneratePlatform(PLATFORM_FILE);
+ engine.load_platform(PLATFORM_FILE);
+
+ // Headline
+ XBT_INFO("-------------------------------------------------");
+ XBT_INFO("Sarting loosely coupled data dissemination experiments");
+ XBT_INFO("-------------------------------------------------");
+
+ // Init all nodes actors
+ u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count();
+ for(u32 i=0;i<nON;i++){
+ std::vector<std::string> args; // No args
+ std::ostringstream ss;
+ ss<< "on" <<i;
+ simgrid::s4u::Actor::create("ON", simgrid::s4u::Host::by_name(ss.str()), obs_node, args);
+ }
+
+ // Launch the simulation
+ engine.run();
+ XBT_INFO("Simulation took %fs", simgrid::s4u::Engine::get_clock());
+ XBT_INFO("The simulated platform file is available in \"%s\"",PLATFORM_FILE);
+ return (0);
+}
+
+/**
+ * This is the brain behind each node
+ */
+static void obs_node(std::vector<std::string> args) {
+ // Init various variables
+ std::string selfName = simgrid::s4u::this_actor::get_host()->get_name();
+ simgrid::s4u::this_actor::get_host()->turn_on();
+ Inputs i=(selfName); // Load node input parameters from the json file
+ simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium");
+ XBT_INFO("Deploying observation node %s",CNAME);
+
+ // Starting node
+ u32 nWakeUp=0;
+ u32 nDataRcv=0;
+ u32 nSendFail=0;
+ u32 nRcvFail=0;
+ u32 nSend=0;
+ u32 hint_added=0;
+ double totalUptime=0;
+ Payload *hint_forward=NULL; // Contains the hint to forward
+ bool is_sender=i.is_sender; // This variable might change if all receiver have received the data
+ bool isObserver=false;
+ double timeDataRcv=-1;
+ while(i.ShouldContinue()){
+ // Start by sleeping
+ XBT_INFO("%s is sleeping",CNAME);
+ MODE_OFF();
+ simgrid::s4u::this_actor::sleep_until(i.GetTS());
+ MODE_ON();
+ XBT_INFO("%s wakes up",CNAME);
+
+ // Doing wake up stuff
+ double uptime=i.GetDuration(); // Store the remaining wake up duration (updated during the node uptime)
+ double upsince=CLOCK; // Store the time at which the node woke up
+ double upuntil=i.GetTS()+i.GetDuration(); // Store the time at which the node should sleep
+ bool forward_mode=false; // Turned on and off every x seconds by the receiver (to switch between forward hint mode and receiving data mode)
+ bool forward_only=false; // When observer receive a hint it switch to forward only up to the next wake up time
+ bool sendhint_mode=false; // Turned on and off every x seconds by the sender (to switch between send hint and send data)
+ while(CLOCK < upuntil)
+ {
+ // ---------- SENDER ----------
+ if(is_sender){
+ // Send hint if send hint mode is enable
+ if(i.use_hint && sendhint_mode && i.HasNext()){
+ Payload *p=new Payload();
+ p->DedicatedMailbox="hintmailbox"+selfName; // Use a dedicated mailbox
+ p->HasHint=true;
+ p->duration=i.GetNextDuration();
+ p->hint=i.GetNextTS();
+ p->DataSize=i.hint_size;
+ try {
+ TRACK_UPTIME(m->put(p,0,FOR(0.3))); // Init connection with a receiver
+ simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
+ MODE_TX();
+ SEND(m_ded->put(p,p->DataSize,uptime)); // Send the actual hint
+ MODE_ON();
+ XBT_INFO("%s sent a hint successfully",CNAME);
+ }
+ catch(...){}
+ }
+ // Send data if send hint mode is disable
+ else{
+ Payload *p=new Payload();
+ p->DedicatedMailbox="datamailbox"+selfName;
+ p->HasData=true;
+ p->HasHint=false;
+ p->DataSize=i.data_size;
+ // Add hint to the data if possible
+ if(i.use_hint && i.HasNext()){
+ p->HasHint=true;
+ p->duration=i.GetNextDuration();
+ p->hint=i.GetNextTS();
+ p->DataSize+=i.hint_size; // Don't forget!!
+ }
+ // Send the data
+ try {
+ TRACK_UPTIME(m->put(p,0,FOR(1)));
+ simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
+ Payload *ack=m_ded->get<Payload>();
+ if(!ack->Abort){
+ MODE_TX();
+ XBT_INFO("%s try a send",CNAME);
+ if(i.extended){
+ SEND(m_ded->put(p,p->DataSize));
+ }
+ else{
+ SEND(m_ded->put(p,p->DataSize,uptime));
+ }
+ XBT_INFO("%s sent data successfully",CNAME);
+ nSend++;
+ is_sender=(nSend<(i.n_nodes-1)); // Stop sending if all nodes received
+ isObserver=!is_sender; // Switch to observer mode if all nodes received the data
+ }
+ else {
+ simgrid::s4u::this_actor::sleep_for(FOR(1));
+ }
+ }
+ catch(...){}
+ }
+ sendhint_mode=!sendhint_mode; // Switch back and forth between sending hint and data
+ MODE_ON();
+ }
+ // ---------- RECEIVER ----------
+ else if(!isObserver){
+ // Forward hint mode
+ if(forward_mode){
+ if(i.use_hint && hint_forward!=NULL && CLOCK < hint_forward->hint){
+ try {
+ FORWARD_HINT(FOR(0.3)); // Try forward for 0.3 seconds then switch to received mode
+ }
+ catch(...){}
+ }
+ }
+ else { // Receiving mode
+ Payload *p; // Received data
+ try {
+ // Get the instantaneous message
+ do {
+ TRACK_UPTIME(p=m->get<Payload>(FOR(1)));
+ if(p->HisForward){
+ if(hint_forward==NULL || (hint_forward !=NULL && p->hint>hint_forward->hint)){
+ simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
+ MODE_RX();
+ TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
+ MODE_ON();
+ XBT_INFO("%s received a forwarded hint successfully",CNAME);
+ if(CLOCK < p->hint){
+ ADD_EVENT(p);
+ hint_forward=new Payload(*p);
+ hint_added++;
+ }
+ }
+ }
+ } while(p->HisForward);
+ simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
+ // Start receiving data
+ MODE_RX();
+ if(p->HasHint && !p->HasData){
+ TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
+ XBT_INFO("%s received a hint successfully",CNAME);
+ hint_forward=new Payload(*p); // Enable hint forwarding
+ if(CLOCK < p->hint){
+ ADD_EVENT(p);
+ hint_forward=new Payload(*p);
+ hint_added++;
+ }
+ }
+ else {
+ // Inform the sender that we do not want to abort
+ Payload *ack=new Payload();
+ ack->Abort=false;
+ m_ded->put(ack,0); // Instantaneous msg
+
+ if(i.extended){
+ p=m_ded->get<Payload>(); // Fetch data until sended
+ }
+ else{
+ TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); // Fetch data until sended or uptime expire
+ }
+ // If we reach here, data has been received successfully
+ XBT_INFO("%s received data successfully",CNAME);
+ timeDataRcv=CLOCK;
+ if(p->HasHint){
+ XBT_INFO("%s received a hint along with data successfully",CNAME);
+ hint_forward=new Payload(*p); // Enable hint forwarding
+ }
+ nDataRcv++;
+ isObserver=true;
+ is_sender=false;
+ }
+
+ }catch(...){
+ XBT_INFO("%s could not receive any data",CNAME);
+ nRcvFail++;
+ }
+ }
+ forward_mode=!forward_mode; // Toggle mode (go back and forth between receiving and forwarding)
+ }
+ // ---------- OBSERVER ----------
+ else {
+ XBT_INFO("%s is observing his environment...",CNAME);
+ MODE_ON();
+ // If use hint we should listen for the sender
+ if(i.use_hint){
+ if((forward_mode|forward_only) && hint_forward!=NULL && CLOCK < hint_forward->hint){
+ FORWARD_HINT(FOR(1));
+ }
+ else {
+ Payload *p;
+ try {
+ do {
+ TRACK_UPTIME(p=m->get<Payload>(FOR(1)));
+ } while(p->HisForward); // Ignore forwarded hint
+ simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
+ // Start receiving hint from sender
+ if(p->HasData){
+ Payload *ack=new Payload();
+ ack->Abort=true;
+ m_ded->put(ack,0);
+ simgrid::s4u::this_actor::sleep_for(FOR(1));
+ }
+ else if(p->HasHint){
+ MODE_RX();
+ TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
+ XBT_INFO("%s received a hint successfully",CNAME);
+ hint_forward=new Payload(*p); // Enable hint forwarding
+ forward_only=true;
+ }
+ else {
+ simgrid::s4u::this_actor::sleep_for(FOR(1));
+ }
+ }
+ catch(...){
+ }
+
+ }
+ forward_mode=!forward_mode;
+ }
+ else {
+ simgrid::s4u::this_actor::sleep_until(upuntil);
+ }
+ }
+ uptime=upuntil-CLOCK; // Note that uptime can be < 0 in extended mode
+ uptime=uptime > 0 ? uptime : 0; // Just in case
+ }
+ // Load next event
+ i.GotoNextEvent();
+ nWakeUp++; // Increase the number of wake up
+ totalUptime+=CLOCK-upsince; // Synchronize total uptime
+ }
+ // Done
+ MODE_OFF()
+ XBT_INFO("Observation node %s finished [LOG2PARSE](node:%s|isSender:%d|nSend:%d|nWakeUp:%d|nDataRcv:%d|nSendFail:%d|nRcvFail:%d|totalUptime:%f|seed:%d|hint_added:%d|timeDataRcv:%f)",CNAME,CNAME,i.is_sender,nSend,nWakeUp,nDataRcv,nSendFail,nRcvFail,totalUptime,i.seed,hint_added,timeDataRcv);
+} \ No newline at end of file