diff options
| author | Loic Guegan <manzerbredes@mailbox.org> | 2022-11-11 15:47:19 +0100 |
|---|---|---|
| committer | Loic Guegan <manzerbredes@mailbox.org> | 2022-11-11 15:47:19 +0100 |
| commit | c2affb00ff404613f45b51cd97b50773982fde5f (patch) | |
| tree | 9a1263afec087c958b32d2ad48e691fc69db0df6 /simulations/src/simulator.cc | |
| parent | b2ad7e6897077899ce70ecc8a4d994b3adc010ae (diff) | |
Minor changes
Diffstat (limited to 'simulations/src/simulator.cc')
| -rw-r--r-- | simulations/src/simulator.cc | 354 |
1 files changed, 354 insertions, 0 deletions
diff --git a/simulations/src/simulator.cc b/simulations/src/simulator.cc new file mode 100644 index 0000000..0215b32 --- /dev/null +++ b/simulations/src/simulator.cc @@ -0,0 +1,354 @@ +#include <simgrid/s4u.hpp> +#include <simgrid/s4u/Mailbox.hpp> +#include <simgrid/s4u/Host.hpp> +#include <simgrid/plugins/energy.h> +#include <xbt/log.h> + +#include <string> +#include <sstream> + +#include "Inputs.hpp" +#include "simgrid/s4u/Actor.hpp" + + +#define PLATFORM_FILE "platform.xml" +#define MODE_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0); +#define MODE_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1); +#define MODE_RX() simgrid::s4u::this_actor::get_host()->set_pstate(2); +#define MODE_TX() simgrid::s4u::this_actor::get_host()->set_pstate(3); +#define CLOCK (simgrid::s4u::Engine::get_clock()) +#define CNAME (selfName.c_str()) +#define FOR(t) (t<uptime?t:uptime) +#define ADD_EVENT(HINT) \ + { \ + XBT_INFO("%s add a new hint at %f for a duration of %f",CNAME,HINT->hint,HINT->duration); \ + i.AddEvent(HINT->hint, HINT->duration); \ + } +#define TRACK_UPTIME(instruction) \ + { \ + instruction; \ + uptime=upuntil-CLOCK; \ + uptime=uptime > 0 ? uptime : 0; \ + } +/// @brief Note that we need to simulate latency our self since we need to send instantaneous messages +#define SEND(instruction) \ + { \ + TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(i.latency > uptime ? uptime : i.latency)); \ + instruction; \ + } + +#define FORWARD_HINT(TRY_FORWARD_DURING) \ + { \ + if(hint_forward!=NULL && CLOCK<hint_forward->hint){ \ + hint_forward->HisForward=true; \ + hint_forward->DedicatedMailbox="hint_forward"; \ + try { \ + XBT_INFO("%s try to forward a hint",CNAME); \ + TRACK_UPTIME(m->put(hint_forward,0,TRY_FORWARD_DURING)); \ + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(hint_forward->DedicatedMailbox); \ + MODE_TX(); \ + SEND(m_ded->put(hint_forward,0,uptime)); \ + XBT_INFO("%s forward a hint successfully",CNAME); \ + } \ + catch(...){ \ + XBT_INFO("%s fail to forward a hint",CNAME); \ + MODE_ON(); \ + uptime=upuntil-CLOCK; \ + TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(FOR(TRY_FORWARD_DURING))); \ + } \ + } \ + } + + +/// @brief Required by SimGrid +XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO] Loosely Coupled DSS"); + +/// @brief For convenience sake +typedef unsigned int u32; + +/** + * Data that will be exchange between the nodes + */ +class Payload{ +public: + Payload():hint(0),duration(0),HasHint(false),HisForward(false),HasData(false),DataSize(0),Abort(false){} + Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward),HasData(p.HasData),DataSize(p.DataSize),Abort(p.Abort){} + double hint; // The timestamp that should be used by the receiver + double duration; // The duration that should be used by the receiver + bool HasHint; + bool HasData; // This way observer could check if they want to receive data (maybe they already received data) + bool HisForward; + bool Abort; // Allow the receiver to abort a communication (if they already received the data for example) and unlock the sender + u32 DataSize; + std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver +}; + +/// @brief Observation node code +static void obs_node(std::vector<std::string> args); + +/** + * No arguments are require (cf inputs.json) + */ +int main(int argc, char **argv) { + + // Build engine + sg_host_energy_plugin_init(); + simgrid::s4u::Engine engine(&argc, argv); + Inputs::GeneratePlatform(PLATFORM_FILE); + engine.load_platform(PLATFORM_FILE); + + // Headline + XBT_INFO("-------------------------------------------------"); + XBT_INFO("Sarting loosely coupled data dissemination experiments"); + XBT_INFO("-------------------------------------------------"); + + // Init all nodes actors + u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count(); + for(u32 i=0;i<nON;i++){ + std::vector<std::string> args; // No args + std::ostringstream ss; + ss<< "on" <<i; + simgrid::s4u::Actor::create("ON", simgrid::s4u::Host::by_name(ss.str()), obs_node, args); + } + + // Launch the simulation + engine.run(); + XBT_INFO("Simulation took %fs", simgrid::s4u::Engine::get_clock()); + XBT_INFO("The simulated platform file is available in \"%s\"",PLATFORM_FILE); + return (0); +} + +/** + * This is the brain behind each node + */ +static void obs_node(std::vector<std::string> args) { + // Init various variables + std::string selfName = simgrid::s4u::this_actor::get_host()->get_name(); + simgrid::s4u::this_actor::get_host()->turn_on(); + Inputs i=(selfName); // Load node input parameters from the json file + simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium"); + XBT_INFO("Deploying observation node %s",CNAME); + + // Starting node + u32 nWakeUp=0; + u32 nDataRcv=0; + u32 nSendFail=0; + u32 nRcvFail=0; + u32 nSend=0; + u32 hint_added=0; + double totalUptime=0; + Payload *hint_forward=NULL; // Contains the hint to forward + bool is_sender=i.is_sender; // This variable might change if all receiver have received the data + bool isObserver=false; + double timeDataRcv=-1; + while(i.ShouldContinue()){ + // Start by sleeping + XBT_INFO("%s is sleeping",CNAME); + MODE_OFF(); + simgrid::s4u::this_actor::sleep_until(i.GetTS()); + MODE_ON(); + XBT_INFO("%s wakes up",CNAME); + + // Doing wake up stuff + double uptime=i.GetDuration(); // Store the remaining wake up duration (updated during the node uptime) + double upsince=CLOCK; // Store the time at which the node woke up + double upuntil=i.GetTS()+i.GetDuration(); // Store the time at which the node should sleep + bool forward_mode=false; // Turned on and off every x seconds by the receiver (to switch between forward hint mode and receiving data mode) + bool forward_only=false; // When observer receive a hint it switch to forward only up to the next wake up time + bool sendhint_mode=false; // Turned on and off every x seconds by the sender (to switch between send hint and send data) + while(CLOCK < upuntil) + { + // ---------- SENDER ---------- + if(is_sender){ + // Send hint if send hint mode is enable + if(i.use_hint && sendhint_mode && i.HasNext()){ + Payload *p=new Payload(); + p->DedicatedMailbox="hintmailbox"+selfName; // Use a dedicated mailbox + p->HasHint=true; + p->duration=i.GetNextDuration(); + p->hint=i.GetNextTS(); + p->DataSize=i.hint_size; + try { + TRACK_UPTIME(m->put(p,0,FOR(0.3))); // Init connection with a receiver + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + MODE_TX(); + SEND(m_ded->put(p,p->DataSize,uptime)); // Send the actual hint + MODE_ON(); + XBT_INFO("%s sent a hint successfully",CNAME); + } + catch(...){} + } + // Send data if send hint mode is disable + else{ + Payload *p=new Payload(); + p->DedicatedMailbox="datamailbox"+selfName; + p->HasData=true; + p->HasHint=false; + p->DataSize=i.data_size; + // Add hint to the data if possible + if(i.use_hint && i.HasNext()){ + p->HasHint=true; + p->duration=i.GetNextDuration(); + p->hint=i.GetNextTS(); + p->DataSize+=i.hint_size; // Don't forget!! + } + // Send the data + try { + TRACK_UPTIME(m->put(p,0,FOR(1))); + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + Payload *ack=m_ded->get<Payload>(); + if(!ack->Abort){ + MODE_TX(); + XBT_INFO("%s try a send",CNAME); + if(i.extended){ + SEND(m_ded->put(p,p->DataSize)); + } + else{ + SEND(m_ded->put(p,p->DataSize,uptime)); + } + XBT_INFO("%s sent data successfully",CNAME); + nSend++; + is_sender=(nSend<(i.n_nodes-1)); // Stop sending if all nodes received + isObserver=!is_sender; // Switch to observer mode if all nodes received the data + } + else { + simgrid::s4u::this_actor::sleep_for(FOR(1)); + } + } + catch(...){} + } + sendhint_mode=!sendhint_mode; // Switch back and forth between sending hint and data + MODE_ON(); + } + // ---------- RECEIVER ---------- + else if(!isObserver){ + // Forward hint mode + if(forward_mode){ + if(i.use_hint && hint_forward!=NULL && CLOCK < hint_forward->hint){ + try { + FORWARD_HINT(FOR(0.3)); // Try forward for 0.3 seconds then switch to received mode + } + catch(...){} + } + } + else { // Receiving mode + Payload *p; // Received data + try { + // Get the instantaneous message + do { + TRACK_UPTIME(p=m->get<Payload>(FOR(1))); + if(p->HisForward){ + if(hint_forward==NULL || (hint_forward !=NULL && p->hint>hint_forward->hint)){ + simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + MODE_RX(); + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); + MODE_ON(); + XBT_INFO("%s received a forwarded hint successfully",CNAME); + if(CLOCK < p->hint){ + ADD_EVENT(p); + hint_forward=new Payload(*p); + hint_added++; + } + } + } + } while(p->HisForward); + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + // Start receiving data + MODE_RX(); + if(p->HasHint && !p->HasData){ + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); + XBT_INFO("%s received a hint successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + if(CLOCK < p->hint){ + ADD_EVENT(p); + hint_forward=new Payload(*p); + hint_added++; + } + } + else { + // Inform the sender that we do not want to abort + Payload *ack=new Payload(); + ack->Abort=false; + m_ded->put(ack,0); // Instantaneous msg + + if(i.extended){ + p=m_ded->get<Payload>(); // Fetch data until sended + } + else{ + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); // Fetch data until sended or uptime expire + } + // If we reach here, data has been received successfully + XBT_INFO("%s received data successfully",CNAME); + timeDataRcv=CLOCK; + if(p->HasHint){ + XBT_INFO("%s received a hint along with data successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + } + nDataRcv++; + isObserver=true; + is_sender=false; + } + + }catch(...){ + XBT_INFO("%s could not receive any data",CNAME); + nRcvFail++; + } + } + forward_mode=!forward_mode; // Toggle mode (go back and forth between receiving and forwarding) + } + // ---------- OBSERVER ---------- + else { + XBT_INFO("%s is observing his environment...",CNAME); + MODE_ON(); + // If use hint we should listen for the sender + if(i.use_hint){ + if((forward_mode|forward_only) && hint_forward!=NULL && CLOCK < hint_forward->hint){ + FORWARD_HINT(FOR(1)); + } + else { + Payload *p; + try { + do { + TRACK_UPTIME(p=m->get<Payload>(FOR(1))); + } while(p->HisForward); // Ignore forwarded hint + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + // Start receiving hint from sender + if(p->HasData){ + Payload *ack=new Payload(); + ack->Abort=true; + m_ded->put(ack,0); + simgrid::s4u::this_actor::sleep_for(FOR(1)); + } + else if(p->HasHint){ + MODE_RX(); + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); + XBT_INFO("%s received a hint successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + forward_only=true; + } + else { + simgrid::s4u::this_actor::sleep_for(FOR(1)); + } + } + catch(...){ + } + + } + forward_mode=!forward_mode; + } + else { + simgrid::s4u::this_actor::sleep_until(upuntil); + } + } + uptime=upuntil-CLOCK; // Note that uptime can be < 0 in extended mode + uptime=uptime > 0 ? uptime : 0; // Just in case + } + // Load next event + i.GotoNextEvent(); + nWakeUp++; // Increase the number of wake up + totalUptime+=CLOCK-upsince; // Synchronize total uptime + } + // Done + MODE_OFF() + XBT_INFO("Observation node %s finished [LOG2PARSE](node:%s|isSender:%d|nSend:%d|nWakeUp:%d|nDataRcv:%d|nSendFail:%d|nRcvFail:%d|totalUptime:%f|seed:%d|hint_added:%d|timeDataRcv:%f)",CNAME,CNAME,i.is_sender,nSend,nWakeUp,nDataRcv,nSendFail,nRcvFail,totalUptime,i.seed,hint_added,timeDataRcv); +}
\ No newline at end of file |
