diff options
| author | Loic Guegan <manzerbredes@mailbox.org> | 2022-11-11 15:47:19 +0100 |
|---|---|---|
| committer | Loic Guegan <manzerbredes@mailbox.org> | 2022-11-11 15:47:19 +0100 |
| commit | c2affb00ff404613f45b51cd97b50773982fde5f (patch) | |
| tree | 9a1263afec087c958b32d2ad48e691fc69db0df6 /simulations/src | |
| parent | b2ad7e6897077899ce70ecc8a4d994b3adc010ae (diff) | |
Minor changes
Diffstat (limited to 'simulations/src')
| -rw-r--r-- | simulations/src/Inputs.cc | 184 | ||||
| -rw-r--r-- | simulations/src/Inputs.hpp | 84 | ||||
| -rw-r--r-- | simulations/src/scenarios.cc | 100 | ||||
| -rw-r--r-- | simulations/src/simulator.cc | 354 |
4 files changed, 722 insertions, 0 deletions
diff --git a/simulations/src/Inputs.cc b/simulations/src/Inputs.cc new file mode 100644 index 0000000..8c3ac9c --- /dev/null +++ b/simulations/src/Inputs.cc @@ -0,0 +1,184 @@ +#include "Inputs.hpp" + +#include <algorithm> +#include <iostream> +#include <fstream> + +Inputs::Inputs(std::string node_name){ + // Here we do all the boring stuff + FILE* input_file = fopen(INPUTS_FILE, "rb"); + char input_file_buffer[JSON_BUFFER_SIZE]; + rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer)); + d.ParseStream(is); + fclose(input_file); + + // Init all variables + is_sender=d["nodes"][node_name.c_str()]["is_sender"].GetBool(); + use_hint=d["nodes"][node_name.c_str()]["use_hint"].GetBool(); + data_size=d["nodes"][node_name.c_str()]["data_size"].GetInt(); + extended=d["extended"].GetBool(); + seed=d["seed"].GetInt(); + hint_size=d["hint_size"].GetInt(); + n_nodes=d["nodes"].MemberCount(); + latency=d["latency"].GetDouble(); + + // Instantiate wake_ts + for(auto& v:d["nodes"][node_name.c_str()]["wake_ts"].GetArray()){ + wake_ts.push_back(v.GetDouble()); + } + + // Instantiate wake_duration + for(auto& v:d["nodes"][node_name.c_str()]["wake_duration"].GetArray()){ + wake_duration.push_back(v.GetDouble()); + } + + // Identity check + if(wake_ts.size()<1){ + std::cerr << "Invalid node configuration: wake_ts.size() == 0" <<std::endl; + exit(1); + } + if(wake_ts.size()!=wake_duration.size()){ + std::cerr << "Invalid node configuration: wake_ts.size() != wake_duration.size()" <<std::endl; + exit(1); + } + if(!std::is_sorted(wake_ts.begin(),wake_ts.end())){ + std::cerr << "Invalid node configuration: wake_ts is not sorted" <<std::endl; + exit(1); + } + + // Ensure events are merged + MergeEvents(); +} + +void Inputs::MergeEvents(){ + for(int i=0;i<(wake_ts.size()-1);i++){ + double cur_ts=wake_ts[i]; + double next_ts=wake_ts[i+1]; + double cur_duration=wake_duration[i]; + double next_duration=wake_duration[i+1]; + // If we should merge then + if((cur_ts+cur_duration)>=next_ts){ + // Create variable for convenience + double start=cur_ts; + double end=std::max(cur_ts+cur_duration,next_ts+next_duration); + wake_duration[i]=end-start; + // Now remove next event + wake_ts.erase(wake_ts.begin() + i + 1); + wake_duration.erase(wake_duration.begin() + i +1); + // This is not optimal. Yet it is simple :D + MergeEvents(); + } + } +} + +double Inputs::GetNextTS(){ + // Ensure that the caller is smart + if(wake_duration.size()<2){ + std::cerr << "You are trying to access to the next timestamp but it does not exists" <<std::endl; + exit(1); + } + return wake_ts[1]; +} + +double Inputs::GetNextDuration(){ + // Ensure that the caller is smart + if(wake_duration.size()<2){ + std::cerr << "You are trying to access to the next duration but it does not exists" <<std::endl; + exit(1); + } + return wake_duration[1]; +} + +void Inputs::GotoNextEvent(){ + wake_ts.erase(wake_ts.begin()); + wake_duration.erase(wake_duration.begin()); +} + +void Inputs::DumpEvents(){ + std::cout << "Timestamps: "; + for(auto a:wake_ts){ + std::cout << std::setw(5) << a << " "; + } + std::cout << std::endl; + std::cout << "Wake Durations: "; + for(auto a:wake_duration){ + std::cout << std::setw(5) << a << " "; + } + std::cout << std::endl; +} + +void Inputs::AddEvent(double ts, double duration){ + // First handle timestamp + int pos=0; + for(auto it = std::begin(wake_ts); it != std::end(wake_ts); ++it) { + if(*it>=ts){ + wake_ts.insert(it,ts); + break; + } + pos++; + } + + // Ensure that ts and duration should not go to the end + if(pos==wake_ts.size()){ + wake_ts.push_back(ts); + wake_duration.push_back(duration); + } + else { + // Handle durations here + int pos2=0; + for(auto it = std::begin(wake_duration); it != std::end(wake_duration); ++it) { + if(pos==pos2){ + wake_duration.insert(it,duration); + break; + } + else if (it+1==std::end(wake_duration)) { + wake_duration.push_back(duration); + } + pos2++; + } + } + // Don't forget + MergeEvents(); +} + +void Inputs::GeneratePlatform(std::string p){ + + // The boring stuff + FILE* input_file = fopen(INPUTS_FILE, "rb"); + char input_file_buffer[JSON_BUFFER_SIZE]; + rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer)); + rapidjson::Document d; + d.ParseStream(is); + fclose(input_file); + + // Write platform file + std::ofstream pf; + pf.open (p); + pf << "<?xml version='1.0'?>\n"; + pf << "<!DOCTYPE platform SYSTEM \"http://simgrid.gforge.inria.fr/simgrid/simgrid.dtd\">\n"; + pf << "<platform version=\"4.1\">\n <AS id=\"AS0\" routing=\"Full\">\n"; + pf << " <link id=\"link\" bandwidth=\""<<d["bitrate"].GetString()<<"\" latency=\"0ms\" sharing_policy=\"SHARED\"></link>\n"; + for (Value::ConstMemberIterator itr = d["nodes"].MemberBegin(); itr != d["nodes"].MemberEnd(); ++itr) + { + std::string name=itr->name.GetString(); + double power_off=d["nodes"][itr->name.GetString()]["power_off"].GetDouble(); + double power_on=d["nodes"][itr->name.GetString()]["power_on"].GetDouble(); + double power_rx=power_on+d["nodes"][itr->name.GetString()]["power_rx"].GetDouble(); + double power_tx=power_on+d["nodes"][itr->name.GetString()]["power_tx"].GetDouble(); + + // Create node + pf << " <host id=\""<<name<<"\" speed=\"100.0f,100.0f,100.0f,100.0f\" pstate=\"0\">\n"; + pf << " <prop id=\"wattage_per_state\" value=\""<< power_off<<":"<<power_off<<", "<< power_on<<":"<<power_on<<", "<<power_rx<<":"<<power_rx<<", "<<power_tx<<":"<<power_tx<<"\" />\n"; + pf << " <prop id=\"wattage_off\" value=\"0\" />\n </host>\n"; + } + for (Value::ConstMemberIterator src = d["nodes"].MemberBegin(); src != d["nodes"].MemberEnd(); ++src) + { + for (Value::ConstMemberIterator dst = d["nodes"].MemberBegin(); dst != d["nodes"].MemberEnd(); ++dst) + { + if(src->name.GetString() != dst->name.GetString()) + pf << " <route src=\""<<src->name.GetString()<<"\" dst=\""<<dst->name.GetString()<<"\" symmetrical=\"no\"><link_ctn id=\"link\"/></route>\n"; + } + } + pf << " </AS>\n</platform>\n"; + pf.close(); +}
\ No newline at end of file diff --git a/simulations/src/Inputs.hpp b/simulations/src/Inputs.hpp new file mode 100644 index 0000000..487686f --- /dev/null +++ b/simulations/src/Inputs.hpp @@ -0,0 +1,84 @@ +#include <rapidjson/document.h> +#include <rapidjson/filereadstream.h> + +#include <cstdio> +#include <string> +#include <vector> +#include <iomanip> + +#define INPUTS_FILE "inputs.json" +/// @brief Pay attention to this strange number, you could tear your hairs out +#define JSON_BUFFER_SIZE 65536 + +using namespace rapidjson; + +class Inputs { + /// @brief RapidJSON + Document d; + /// @brief Current node associated with the Inputs + std::string node_name; + /// @brief Timestamps (at which time the nodes should wake up) + std::vector<double> wake_ts; + /// @brief Wake up time durations + std::vector<double> wake_duration; + /** + * Recursively merge overlapping events + */ + void MergeEvents(); +public: + /** + * Load node_name configuration + */ + Inputs(std::string node_name); + /** + * Generate a SimGrid platform file from the json configuration + */ + static void GeneratePlatform(std::string p); + /** + * Is there any event that remains in the queue ? + */ + bool ShouldContinue(){return wake_ts.size()!=0;} + /** + * Is there another event to process ? + */ + bool HasNext(){return wake_ts.size()>1 ;} + /** + * Get current event timestamp + */ + double GetTS(){return wake_ts.front();} + /** + * Get current event duration + */ + double GetDuration(){return wake_duration.front();} + /** + * Get next event timestamp + */ + double GetNextTS(); + /** + * Get next event duration + */ + double GetNextDuration(); + /** + * Time travel machine (note that this function is following the second principle + * of thermodynamics) + */ + void GotoNextEvent(); + /** + * Allows to add a *FUTURE* event and merge overlapping events + */ + void AddEvent(double ts, double duration); + /** + * This is the timeline + */ + void DumpEvents(); + + /// @brief These are public attributes, please take care they are fragile + bool is_sender; + bool use_hint; + bool extended; + int data_size; + int hint_size; + int seed; + int n_nodes; + double latency; +};
\ No newline at end of file diff --git a/simulations/src/scenarios.cc b/simulations/src/scenarios.cc new file mode 100644 index 0000000..90a7849 --- /dev/null +++ b/simulations/src/scenarios.cc @@ -0,0 +1,100 @@ +#include <cstdlib> +#include <cstring> +#include <rapidjson/document.h> +#include <rapidjson/filewritestream.h> +#include <rapidjson/prettywriter.h> + +#include <cstdio> +#include <iostream> +#include <time.h> +#include <sstream> + + +#define RAND(min,max) (rand()%((max)-(min)+1)+(min)) + +using namespace std; +using namespace rapidjson; + +int main(int argc, char **argv){ + // Setup seed + if(argc!=16){ + cerr << "Usage: " << argv[0] << + " <seed> <simtime> <wakeupevery> <wakeupfor> <n_nodes>" << + " <extended> <hint> <poff> <pon> <prx> <ptx> <datasize> <bitrate> <hintsize> <latency>" << + endl; + exit(1); + } + + // Init parameters + int seed=atoi(argv[1]); + double simtime=stod(argv[2]); + unsigned int wakeupevery=atoi(argv[3]); + unsigned int wakeupfor=stoi(argv[4]); + unsigned int n_nodes=atoi(argv[5]); + bool extended=!strcmp("true",argv[6]); + bool hint=!strcmp("true",argv[7]); + double poff=stod(argv[8]); + double pon=stod(argv[9]); + double prx=stod(argv[10]); + double ptx=stod(argv[11]); + unsigned int datasize=atoi(argv[12]); + string bitrate(argv[13]); + unsigned int hintsize=atoi(argv[14]); + double latency=stod(argv[15]); + + + // Setup seed + srand(seed); + + // Create document + Document d; + d.SetObject(); + d.AddMember("seed",Value().SetInt(seed),d.GetAllocator()); + Value bitrateValue; + bitrateValue.SetString(bitrate.c_str(),bitrate.size(),d.GetAllocator()); + d.AddMember("bitrate",bitrateValue,d.GetAllocator()); + d.AddMember("latency",latency,d.GetAllocator()); + d.AddMember("extended",extended,d.GetAllocator()); + d.AddMember("hint_size",hintsize,d.GetAllocator()); + + // Create nodes + Value nodes(kObjectType); + for(int i=0;i<n_nodes;i++){ + Value node(kObjectType); + node.SetObject(); + node.AddMember("use_hint",hint,d.GetAllocator()); + node.AddMember("power_off",poff,d.GetAllocator()); + node.AddMember("power_on",pon,d.GetAllocator()); + node.AddMember("power_rx",prx,d.GetAllocator()); + node.AddMember("power_tx",ptx,d.GetAllocator()); + node.AddMember("is_sender",i==0,d.GetAllocator()); + node.AddMember("data_size",datasize,d.GetAllocator()); + + // Setup ts and durations + Value ts(kArrayType); + Value duration(kArrayType); + for(unsigned int i=0;i<simtime;i+=wakeupevery){ + ts.PushBack(Value().SetDouble(RAND(i,i+wakeupevery-wakeupfor)),d.GetAllocator()); + duration.PushBack(Value().SetDouble(wakeupfor),d.GetAllocator()); + } + node.AddMember("wake_ts",ts,d.GetAllocator()); + node.AddMember("wake_duration",duration,d.GetAllocator()); + + + // Add node to nodes + std::ostringstream ss; + ss<< "on" <<i; + Value key(ss.str().c_str(), d.GetAllocator()); + nodes.AddMember(key,node,d.GetAllocator()); + } + d.AddMember("nodes",nodes,d.GetAllocator()); + + + // Write to stdout + StringBuffer buffer; + PrettyWriter<StringBuffer> writer(buffer); + d.Accept(writer); + cout << buffer.GetString(); + + return 0; +}
\ No newline at end of file diff --git a/simulations/src/simulator.cc b/simulations/src/simulator.cc new file mode 100644 index 0000000..0215b32 --- /dev/null +++ b/simulations/src/simulator.cc @@ -0,0 +1,354 @@ +#include <simgrid/s4u.hpp> +#include <simgrid/s4u/Mailbox.hpp> +#include <simgrid/s4u/Host.hpp> +#include <simgrid/plugins/energy.h> +#include <xbt/log.h> + +#include <string> +#include <sstream> + +#include "Inputs.hpp" +#include "simgrid/s4u/Actor.hpp" + + +#define PLATFORM_FILE "platform.xml" +#define MODE_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0); +#define MODE_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1); +#define MODE_RX() simgrid::s4u::this_actor::get_host()->set_pstate(2); +#define MODE_TX() simgrid::s4u::this_actor::get_host()->set_pstate(3); +#define CLOCK (simgrid::s4u::Engine::get_clock()) +#define CNAME (selfName.c_str()) +#define FOR(t) (t<uptime?t:uptime) +#define ADD_EVENT(HINT) \ + { \ + XBT_INFO("%s add a new hint at %f for a duration of %f",CNAME,HINT->hint,HINT->duration); \ + i.AddEvent(HINT->hint, HINT->duration); \ + } +#define TRACK_UPTIME(instruction) \ + { \ + instruction; \ + uptime=upuntil-CLOCK; \ + uptime=uptime > 0 ? uptime : 0; \ + } +/// @brief Note that we need to simulate latency our self since we need to send instantaneous messages +#define SEND(instruction) \ + { \ + TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(i.latency > uptime ? uptime : i.latency)); \ + instruction; \ + } + +#define FORWARD_HINT(TRY_FORWARD_DURING) \ + { \ + if(hint_forward!=NULL && CLOCK<hint_forward->hint){ \ + hint_forward->HisForward=true; \ + hint_forward->DedicatedMailbox="hint_forward"; \ + try { \ + XBT_INFO("%s try to forward a hint",CNAME); \ + TRACK_UPTIME(m->put(hint_forward,0,TRY_FORWARD_DURING)); \ + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(hint_forward->DedicatedMailbox); \ + MODE_TX(); \ + SEND(m_ded->put(hint_forward,0,uptime)); \ + XBT_INFO("%s forward a hint successfully",CNAME); \ + } \ + catch(...){ \ + XBT_INFO("%s fail to forward a hint",CNAME); \ + MODE_ON(); \ + uptime=upuntil-CLOCK; \ + TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(FOR(TRY_FORWARD_DURING))); \ + } \ + } \ + } + + +/// @brief Required by SimGrid +XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO] Loosely Coupled DSS"); + +/// @brief For convenience sake +typedef unsigned int u32; + +/** + * Data that will be exchange between the nodes + */ +class Payload{ +public: + Payload():hint(0),duration(0),HasHint(false),HisForward(false),HasData(false),DataSize(0),Abort(false){} + Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward),HasData(p.HasData),DataSize(p.DataSize),Abort(p.Abort){} + double hint; // The timestamp that should be used by the receiver + double duration; // The duration that should be used by the receiver + bool HasHint; + bool HasData; // This way observer could check if they want to receive data (maybe they already received data) + bool HisForward; + bool Abort; // Allow the receiver to abort a communication (if they already received the data for example) and unlock the sender + u32 DataSize; + std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver +}; + +/// @brief Observation node code +static void obs_node(std::vector<std::string> args); + +/** + * No arguments are require (cf inputs.json) + */ +int main(int argc, char **argv) { + + // Build engine + sg_host_energy_plugin_init(); + simgrid::s4u::Engine engine(&argc, argv); + Inputs::GeneratePlatform(PLATFORM_FILE); + engine.load_platform(PLATFORM_FILE); + + // Headline + XBT_INFO("-------------------------------------------------"); + XBT_INFO("Sarting loosely coupled data dissemination experiments"); + XBT_INFO("-------------------------------------------------"); + + // Init all nodes actors + u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count(); + for(u32 i=0;i<nON;i++){ + std::vector<std::string> args; // No args + std::ostringstream ss; + ss<< "on" <<i; + simgrid::s4u::Actor::create("ON", simgrid::s4u::Host::by_name(ss.str()), obs_node, args); + } + + // Launch the simulation + engine.run(); + XBT_INFO("Simulation took %fs", simgrid::s4u::Engine::get_clock()); + XBT_INFO("The simulated platform file is available in \"%s\"",PLATFORM_FILE); + return (0); +} + +/** + * This is the brain behind each node + */ +static void obs_node(std::vector<std::string> args) { + // Init various variables + std::string selfName = simgrid::s4u::this_actor::get_host()->get_name(); + simgrid::s4u::this_actor::get_host()->turn_on(); + Inputs i=(selfName); // Load node input parameters from the json file + simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium"); + XBT_INFO("Deploying observation node %s",CNAME); + + // Starting node + u32 nWakeUp=0; + u32 nDataRcv=0; + u32 nSendFail=0; + u32 nRcvFail=0; + u32 nSend=0; + u32 hint_added=0; + double totalUptime=0; + Payload *hint_forward=NULL; // Contains the hint to forward + bool is_sender=i.is_sender; // This variable might change if all receiver have received the data + bool isObserver=false; + double timeDataRcv=-1; + while(i.ShouldContinue()){ + // Start by sleeping + XBT_INFO("%s is sleeping",CNAME); + MODE_OFF(); + simgrid::s4u::this_actor::sleep_until(i.GetTS()); + MODE_ON(); + XBT_INFO("%s wakes up",CNAME); + + // Doing wake up stuff + double uptime=i.GetDuration(); // Store the remaining wake up duration (updated during the node uptime) + double upsince=CLOCK; // Store the time at which the node woke up + double upuntil=i.GetTS()+i.GetDuration(); // Store the time at which the node should sleep + bool forward_mode=false; // Turned on and off every x seconds by the receiver (to switch between forward hint mode and receiving data mode) + bool forward_only=false; // When observer receive a hint it switch to forward only up to the next wake up time + bool sendhint_mode=false; // Turned on and off every x seconds by the sender (to switch between send hint and send data) + while(CLOCK < upuntil) + { + // ---------- SENDER ---------- + if(is_sender){ + // Send hint if send hint mode is enable + if(i.use_hint && sendhint_mode && i.HasNext()){ + Payload *p=new Payload(); + p->DedicatedMailbox="hintmailbox"+selfName; // Use a dedicated mailbox + p->HasHint=true; + p->duration=i.GetNextDuration(); + p->hint=i.GetNextTS(); + p->DataSize=i.hint_size; + try { + TRACK_UPTIME(m->put(p,0,FOR(0.3))); // Init connection with a receiver + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + MODE_TX(); + SEND(m_ded->put(p,p->DataSize,uptime)); // Send the actual hint + MODE_ON(); + XBT_INFO("%s sent a hint successfully",CNAME); + } + catch(...){} + } + // Send data if send hint mode is disable + else{ + Payload *p=new Payload(); + p->DedicatedMailbox="datamailbox"+selfName; + p->HasData=true; + p->HasHint=false; + p->DataSize=i.data_size; + // Add hint to the data if possible + if(i.use_hint && i.HasNext()){ + p->HasHint=true; + p->duration=i.GetNextDuration(); + p->hint=i.GetNextTS(); + p->DataSize+=i.hint_size; // Don't forget!! + } + // Send the data + try { + TRACK_UPTIME(m->put(p,0,FOR(1))); + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + Payload *ack=m_ded->get<Payload>(); + if(!ack->Abort){ + MODE_TX(); + XBT_INFO("%s try a send",CNAME); + if(i.extended){ + SEND(m_ded->put(p,p->DataSize)); + } + else{ + SEND(m_ded->put(p,p->DataSize,uptime)); + } + XBT_INFO("%s sent data successfully",CNAME); + nSend++; + is_sender=(nSend<(i.n_nodes-1)); // Stop sending if all nodes received + isObserver=!is_sender; // Switch to observer mode if all nodes received the data + } + else { + simgrid::s4u::this_actor::sleep_for(FOR(1)); + } + } + catch(...){} + } + sendhint_mode=!sendhint_mode; // Switch back and forth between sending hint and data + MODE_ON(); + } + // ---------- RECEIVER ---------- + else if(!isObserver){ + // Forward hint mode + if(forward_mode){ + if(i.use_hint && hint_forward!=NULL && CLOCK < hint_forward->hint){ + try { + FORWARD_HINT(FOR(0.3)); // Try forward for 0.3 seconds then switch to received mode + } + catch(...){} + } + } + else { // Receiving mode + Payload *p; // Received data + try { + // Get the instantaneous message + do { + TRACK_UPTIME(p=m->get<Payload>(FOR(1))); + if(p->HisForward){ + if(hint_forward==NULL || (hint_forward !=NULL && p->hint>hint_forward->hint)){ + simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + MODE_RX(); + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); + MODE_ON(); + XBT_INFO("%s received a forwarded hint successfully",CNAME); + if(CLOCK < p->hint){ + ADD_EVENT(p); + hint_forward=new Payload(*p); + hint_added++; + } + } + } + } while(p->HisForward); + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + // Start receiving data + MODE_RX(); + if(p->HasHint && !p->HasData){ + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); + XBT_INFO("%s received a hint successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + if(CLOCK < p->hint){ + ADD_EVENT(p); + hint_forward=new Payload(*p); + hint_added++; + } + } + else { + // Inform the sender that we do not want to abort + Payload *ack=new Payload(); + ack->Abort=false; + m_ded->put(ack,0); // Instantaneous msg + + if(i.extended){ + p=m_ded->get<Payload>(); // Fetch data until sended + } + else{ + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); // Fetch data until sended or uptime expire + } + // If we reach here, data has been received successfully + XBT_INFO("%s received data successfully",CNAME); + timeDataRcv=CLOCK; + if(p->HasHint){ + XBT_INFO("%s received a hint along with data successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + } + nDataRcv++; + isObserver=true; + is_sender=false; + } + + }catch(...){ + XBT_INFO("%s could not receive any data",CNAME); + nRcvFail++; + } + } + forward_mode=!forward_mode; // Toggle mode (go back and forth between receiving and forwarding) + } + // ---------- OBSERVER ---------- + else { + XBT_INFO("%s is observing his environment...",CNAME); + MODE_ON(); + // If use hint we should listen for the sender + if(i.use_hint){ + if((forward_mode|forward_only) && hint_forward!=NULL && CLOCK < hint_forward->hint){ + FORWARD_HINT(FOR(1)); + } + else { + Payload *p; + try { + do { + TRACK_UPTIME(p=m->get<Payload>(FOR(1))); + } while(p->HisForward); // Ignore forwarded hint + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + // Start receiving hint from sender + if(p->HasData){ + Payload *ack=new Payload(); + ack->Abort=true; + m_ded->put(ack,0); + simgrid::s4u::this_actor::sleep_for(FOR(1)); + } + else if(p->HasHint){ + MODE_RX(); + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); + XBT_INFO("%s received a hint successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + forward_only=true; + } + else { + simgrid::s4u::this_actor::sleep_for(FOR(1)); + } + } + catch(...){ + } + + } + forward_mode=!forward_mode; + } + else { + simgrid::s4u::this_actor::sleep_until(upuntil); + } + } + uptime=upuntil-CLOCK; // Note that uptime can be < 0 in extended mode + uptime=uptime > 0 ? uptime : 0; // Just in case + } + // Load next event + i.GotoNextEvent(); + nWakeUp++; // Increase the number of wake up + totalUptime+=CLOCK-upsince; // Synchronize total uptime + } + // Done + MODE_OFF() + XBT_INFO("Observation node %s finished [LOG2PARSE](node:%s|isSender:%d|nSend:%d|nWakeUp:%d|nDataRcv:%d|nSendFail:%d|nRcvFail:%d|totalUptime:%f|seed:%d|hint_added:%d|timeDataRcv:%f)",CNAME,CNAME,i.is_sender,nSend,nWakeUp,nDataRcv,nSendFail,nRcvFail,totalUptime,i.seed,hint_added,timeDataRcv); +}
\ No newline at end of file |
