From c2affb00ff404613f45b51cd97b50773982fde5f Mon Sep 17 00:00:00 2001 From: Loic Guegan Date: Fri, 11 Nov 2022 15:47:19 +0100 Subject: Minor changes --- simulations/src/Inputs.cc | 184 ++++++++++++++++++++++ simulations/src/Inputs.hpp | 84 ++++++++++ simulations/src/scenarios.cc | 100 ++++++++++++ simulations/src/simulator.cc | 354 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 722 insertions(+) create mode 100644 simulations/src/Inputs.cc create mode 100644 simulations/src/Inputs.hpp create mode 100644 simulations/src/scenarios.cc create mode 100644 simulations/src/simulator.cc (limited to 'simulations/src') diff --git a/simulations/src/Inputs.cc b/simulations/src/Inputs.cc new file mode 100644 index 0000000..8c3ac9c --- /dev/null +++ b/simulations/src/Inputs.cc @@ -0,0 +1,184 @@ +#include "Inputs.hpp" + +#include +#include +#include + +Inputs::Inputs(std::string node_name){ + // Here we do all the boring stuff + FILE* input_file = fopen(INPUTS_FILE, "rb"); + char input_file_buffer[JSON_BUFFER_SIZE]; + rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer)); + d.ParseStream(is); + fclose(input_file); + + // Init all variables + is_sender=d["nodes"][node_name.c_str()]["is_sender"].GetBool(); + use_hint=d["nodes"][node_name.c_str()]["use_hint"].GetBool(); + data_size=d["nodes"][node_name.c_str()]["data_size"].GetInt(); + extended=d["extended"].GetBool(); + seed=d["seed"].GetInt(); + hint_size=d["hint_size"].GetInt(); + n_nodes=d["nodes"].MemberCount(); + latency=d["latency"].GetDouble(); + + // Instantiate wake_ts + for(auto& v:d["nodes"][node_name.c_str()]["wake_ts"].GetArray()){ + wake_ts.push_back(v.GetDouble()); + } + + // Instantiate wake_duration + for(auto& v:d["nodes"][node_name.c_str()]["wake_duration"].GetArray()){ + wake_duration.push_back(v.GetDouble()); + } + + // Identity check + if(wake_ts.size()<1){ + std::cerr << "Invalid node configuration: wake_ts.size() == 0" <=next_ts){ + // Create variable for convenience + double start=cur_ts; + double end=std::max(cur_ts+cur_duration,next_ts+next_duration); + wake_duration[i]=end-start; + // Now remove next event + wake_ts.erase(wake_ts.begin() + i + 1); + wake_duration.erase(wake_duration.begin() + i +1); + // This is not optimal. Yet it is simple :D + MergeEvents(); + } + } +} + +double Inputs::GetNextTS(){ + // Ensure that the caller is smart + if(wake_duration.size()<2){ + std::cerr << "You are trying to access to the next timestamp but it does not exists" <=ts){ + wake_ts.insert(it,ts); + break; + } + pos++; + } + + // Ensure that ts and duration should not go to the end + if(pos==wake_ts.size()){ + wake_ts.push_back(ts); + wake_duration.push_back(duration); + } + else { + // Handle durations here + int pos2=0; + for(auto it = std::begin(wake_duration); it != std::end(wake_duration); ++it) { + if(pos==pos2){ + wake_duration.insert(it,duration); + break; + } + else if (it+1==std::end(wake_duration)) { + wake_duration.push_back(duration); + } + pos2++; + } + } + // Don't forget + MergeEvents(); +} + +void Inputs::GeneratePlatform(std::string p){ + + // The boring stuff + FILE* input_file = fopen(INPUTS_FILE, "rb"); + char input_file_buffer[JSON_BUFFER_SIZE]; + rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer)); + rapidjson::Document d; + d.ParseStream(is); + fclose(input_file); + + // Write platform file + std::ofstream pf; + pf.open (p); + pf << "\n"; + pf << "\n"; + pf << "\n \n"; + pf << " \n"; + for (Value::ConstMemberIterator itr = d["nodes"].MemberBegin(); itr != d["nodes"].MemberEnd(); ++itr) + { + std::string name=itr->name.GetString(); + double power_off=d["nodes"][itr->name.GetString()]["power_off"].GetDouble(); + double power_on=d["nodes"][itr->name.GetString()]["power_on"].GetDouble(); + double power_rx=power_on+d["nodes"][itr->name.GetString()]["power_rx"].GetDouble(); + double power_tx=power_on+d["nodes"][itr->name.GetString()]["power_tx"].GetDouble(); + + // Create node + pf << " \n"; + pf << " \n"; + pf << " \n \n"; + } + for (Value::ConstMemberIterator src = d["nodes"].MemberBegin(); src != d["nodes"].MemberEnd(); ++src) + { + for (Value::ConstMemberIterator dst = d["nodes"].MemberBegin(); dst != d["nodes"].MemberEnd(); ++dst) + { + if(src->name.GetString() != dst->name.GetString()) + pf << " name.GetString()<<"\" dst=\""<name.GetString()<<"\" symmetrical=\"no\">\n"; + } + } + pf << " \n\n"; + pf.close(); +} \ No newline at end of file diff --git a/simulations/src/Inputs.hpp b/simulations/src/Inputs.hpp new file mode 100644 index 0000000..487686f --- /dev/null +++ b/simulations/src/Inputs.hpp @@ -0,0 +1,84 @@ +#include +#include + +#include +#include +#include +#include + +#define INPUTS_FILE "inputs.json" +/// @brief Pay attention to this strange number, you could tear your hairs out +#define JSON_BUFFER_SIZE 65536 + +using namespace rapidjson; + +class Inputs { + /// @brief RapidJSON + Document d; + /// @brief Current node associated with the Inputs + std::string node_name; + /// @brief Timestamps (at which time the nodes should wake up) + std::vector wake_ts; + /// @brief Wake up time durations + std::vector wake_duration; + /** + * Recursively merge overlapping events + */ + void MergeEvents(); +public: + /** + * Load node_name configuration + */ + Inputs(std::string node_name); + /** + * Generate a SimGrid platform file from the json configuration + */ + static void GeneratePlatform(std::string p); + /** + * Is there any event that remains in the queue ? + */ + bool ShouldContinue(){return wake_ts.size()!=0;} + /** + * Is there another event to process ? + */ + bool HasNext(){return wake_ts.size()>1 ;} + /** + * Get current event timestamp + */ + double GetTS(){return wake_ts.front();} + /** + * Get current event duration + */ + double GetDuration(){return wake_duration.front();} + /** + * Get next event timestamp + */ + double GetNextTS(); + /** + * Get next event duration + */ + double GetNextDuration(); + /** + * Time travel machine (note that this function is following the second principle + * of thermodynamics) + */ + void GotoNextEvent(); + /** + * Allows to add a *FUTURE* event and merge overlapping events + */ + void AddEvent(double ts, double duration); + /** + * This is the timeline + */ + void DumpEvents(); + + /// @brief These are public attributes, please take care they are fragile + bool is_sender; + bool use_hint; + bool extended; + int data_size; + int hint_size; + int seed; + int n_nodes; + double latency; +}; \ No newline at end of file diff --git a/simulations/src/scenarios.cc b/simulations/src/scenarios.cc new file mode 100644 index 0000000..90a7849 --- /dev/null +++ b/simulations/src/scenarios.cc @@ -0,0 +1,100 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + + +#define RAND(min,max) (rand()%((max)-(min)+1)+(min)) + +using namespace std; +using namespace rapidjson; + +int main(int argc, char **argv){ + // Setup seed + if(argc!=16){ + cerr << "Usage: " << argv[0] << + " " << + " " << + endl; + exit(1); + } + + // Init parameters + int seed=atoi(argv[1]); + double simtime=stod(argv[2]); + unsigned int wakeupevery=atoi(argv[3]); + unsigned int wakeupfor=stoi(argv[4]); + unsigned int n_nodes=atoi(argv[5]); + bool extended=!strcmp("true",argv[6]); + bool hint=!strcmp("true",argv[7]); + double poff=stod(argv[8]); + double pon=stod(argv[9]); + double prx=stod(argv[10]); + double ptx=stod(argv[11]); + unsigned int datasize=atoi(argv[12]); + string bitrate(argv[13]); + unsigned int hintsize=atoi(argv[14]); + double latency=stod(argv[15]); + + + // Setup seed + srand(seed); + + // Create document + Document d; + d.SetObject(); + d.AddMember("seed",Value().SetInt(seed),d.GetAllocator()); + Value bitrateValue; + bitrateValue.SetString(bitrate.c_str(),bitrate.size(),d.GetAllocator()); + d.AddMember("bitrate",bitrateValue,d.GetAllocator()); + d.AddMember("latency",latency,d.GetAllocator()); + d.AddMember("extended",extended,d.GetAllocator()); + d.AddMember("hint_size",hintsize,d.GetAllocator()); + + // Create nodes + Value nodes(kObjectType); + for(int i=0;i writer(buffer); + d.Accept(writer); + cout << buffer.GetString(); + + return 0; +} \ No newline at end of file diff --git a/simulations/src/simulator.cc b/simulations/src/simulator.cc new file mode 100644 index 0000000..0215b32 --- /dev/null +++ b/simulations/src/simulator.cc @@ -0,0 +1,354 @@ +#include +#include +#include +#include +#include + +#include +#include + +#include "Inputs.hpp" +#include "simgrid/s4u/Actor.hpp" + + +#define PLATFORM_FILE "platform.xml" +#define MODE_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0); +#define MODE_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1); +#define MODE_RX() simgrid::s4u::this_actor::get_host()->set_pstate(2); +#define MODE_TX() simgrid::s4u::this_actor::get_host()->set_pstate(3); +#define CLOCK (simgrid::s4u::Engine::get_clock()) +#define CNAME (selfName.c_str()) +#define FOR(t) (thint,HINT->duration); \ + i.AddEvent(HINT->hint, HINT->duration); \ + } +#define TRACK_UPTIME(instruction) \ + { \ + instruction; \ + uptime=upuntil-CLOCK; \ + uptime=uptime > 0 ? uptime : 0; \ + } +/// @brief Note that we need to simulate latency our self since we need to send instantaneous messages +#define SEND(instruction) \ + { \ + TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(i.latency > uptime ? uptime : i.latency)); \ + instruction; \ + } + +#define FORWARD_HINT(TRY_FORWARD_DURING) \ + { \ + if(hint_forward!=NULL && CLOCKhint){ \ + hint_forward->HisForward=true; \ + hint_forward->DedicatedMailbox="hint_forward"; \ + try { \ + XBT_INFO("%s try to forward a hint",CNAME); \ + TRACK_UPTIME(m->put(hint_forward,0,TRY_FORWARD_DURING)); \ + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(hint_forward->DedicatedMailbox); \ + MODE_TX(); \ + SEND(m_ded->put(hint_forward,0,uptime)); \ + XBT_INFO("%s forward a hint successfully",CNAME); \ + } \ + catch(...){ \ + XBT_INFO("%s fail to forward a hint",CNAME); \ + MODE_ON(); \ + uptime=upuntil-CLOCK; \ + TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(FOR(TRY_FORWARD_DURING))); \ + } \ + } \ + } + + +/// @brief Required by SimGrid +XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO] Loosely Coupled DSS"); + +/// @brief For convenience sake +typedef unsigned int u32; + +/** + * Data that will be exchange between the nodes + */ +class Payload{ +public: + Payload():hint(0),duration(0),HasHint(false),HisForward(false),HasData(false),DataSize(0),Abort(false){} + Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward),HasData(p.HasData),DataSize(p.DataSize),Abort(p.Abort){} + double hint; // The timestamp that should be used by the receiver + double duration; // The duration that should be used by the receiver + bool HasHint; + bool HasData; // This way observer could check if they want to receive data (maybe they already received data) + bool HisForward; + bool Abort; // Allow the receiver to abort a communication (if they already received the data for example) and unlock the sender + u32 DataSize; + std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver +}; + +/// @brief Observation node code +static void obs_node(std::vector args); + +/** + * No arguments are require (cf inputs.json) + */ +int main(int argc, char **argv) { + + // Build engine + sg_host_energy_plugin_init(); + simgrid::s4u::Engine engine(&argc, argv); + Inputs::GeneratePlatform(PLATFORM_FILE); + engine.load_platform(PLATFORM_FILE); + + // Headline + XBT_INFO("-------------------------------------------------"); + XBT_INFO("Sarting loosely coupled data dissemination experiments"); + XBT_INFO("-------------------------------------------------"); + + // Init all nodes actors + u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count(); + for(u32 i=0;i args; // No args + std::ostringstream ss; + ss<< "on" < args) { + // Init various variables + std::string selfName = simgrid::s4u::this_actor::get_host()->get_name(); + simgrid::s4u::this_actor::get_host()->turn_on(); + Inputs i=(selfName); // Load node input parameters from the json file + simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium"); + XBT_INFO("Deploying observation node %s",CNAME); + + // Starting node + u32 nWakeUp=0; + u32 nDataRcv=0; + u32 nSendFail=0; + u32 nRcvFail=0; + u32 nSend=0; + u32 hint_added=0; + double totalUptime=0; + Payload *hint_forward=NULL; // Contains the hint to forward + bool is_sender=i.is_sender; // This variable might change if all receiver have received the data + bool isObserver=false; + double timeDataRcv=-1; + while(i.ShouldContinue()){ + // Start by sleeping + XBT_INFO("%s is sleeping",CNAME); + MODE_OFF(); + simgrid::s4u::this_actor::sleep_until(i.GetTS()); + MODE_ON(); + XBT_INFO("%s wakes up",CNAME); + + // Doing wake up stuff + double uptime=i.GetDuration(); // Store the remaining wake up duration (updated during the node uptime) + double upsince=CLOCK; // Store the time at which the node woke up + double upuntil=i.GetTS()+i.GetDuration(); // Store the time at which the node should sleep + bool forward_mode=false; // Turned on and off every x seconds by the receiver (to switch between forward hint mode and receiving data mode) + bool forward_only=false; // When observer receive a hint it switch to forward only up to the next wake up time + bool sendhint_mode=false; // Turned on and off every x seconds by the sender (to switch between send hint and send data) + while(CLOCK < upuntil) + { + // ---------- SENDER ---------- + if(is_sender){ + // Send hint if send hint mode is enable + if(i.use_hint && sendhint_mode && i.HasNext()){ + Payload *p=new Payload(); + p->DedicatedMailbox="hintmailbox"+selfName; // Use a dedicated mailbox + p->HasHint=true; + p->duration=i.GetNextDuration(); + p->hint=i.GetNextTS(); + p->DataSize=i.hint_size; + try { + TRACK_UPTIME(m->put(p,0,FOR(0.3))); // Init connection with a receiver + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + MODE_TX(); + SEND(m_ded->put(p,p->DataSize,uptime)); // Send the actual hint + MODE_ON(); + XBT_INFO("%s sent a hint successfully",CNAME); + } + catch(...){} + } + // Send data if send hint mode is disable + else{ + Payload *p=new Payload(); + p->DedicatedMailbox="datamailbox"+selfName; + p->HasData=true; + p->HasHint=false; + p->DataSize=i.data_size; + // Add hint to the data if possible + if(i.use_hint && i.HasNext()){ + p->HasHint=true; + p->duration=i.GetNextDuration(); + p->hint=i.GetNextTS(); + p->DataSize+=i.hint_size; // Don't forget!! + } + // Send the data + try { + TRACK_UPTIME(m->put(p,0,FOR(1))); + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + Payload *ack=m_ded->get(); + if(!ack->Abort){ + MODE_TX(); + XBT_INFO("%s try a send",CNAME); + if(i.extended){ + SEND(m_ded->put(p,p->DataSize)); + } + else{ + SEND(m_ded->put(p,p->DataSize,uptime)); + } + XBT_INFO("%s sent data successfully",CNAME); + nSend++; + is_sender=(nSend<(i.n_nodes-1)); // Stop sending if all nodes received + isObserver=!is_sender; // Switch to observer mode if all nodes received the data + } + else { + simgrid::s4u::this_actor::sleep_for(FOR(1)); + } + } + catch(...){} + } + sendhint_mode=!sendhint_mode; // Switch back and forth between sending hint and data + MODE_ON(); + } + // ---------- RECEIVER ---------- + else if(!isObserver){ + // Forward hint mode + if(forward_mode){ + if(i.use_hint && hint_forward!=NULL && CLOCK < hint_forward->hint){ + try { + FORWARD_HINT(FOR(0.3)); // Try forward for 0.3 seconds then switch to received mode + } + catch(...){} + } + } + else { // Receiving mode + Payload *p; // Received data + try { + // Get the instantaneous message + do { + TRACK_UPTIME(p=m->get(FOR(1))); + if(p->HisForward){ + if(hint_forward==NULL || (hint_forward !=NULL && p->hint>hint_forward->hint)){ + simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + MODE_RX(); + TRACK_UPTIME(p=m_ded->get(uptime)); + MODE_ON(); + XBT_INFO("%s received a forwarded hint successfully",CNAME); + if(CLOCK < p->hint){ + ADD_EVENT(p); + hint_forward=new Payload(*p); + hint_added++; + } + } + } + } while(p->HisForward); + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + // Start receiving data + MODE_RX(); + if(p->HasHint && !p->HasData){ + TRACK_UPTIME(p=m_ded->get(uptime)); + XBT_INFO("%s received a hint successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + if(CLOCK < p->hint){ + ADD_EVENT(p); + hint_forward=new Payload(*p); + hint_added++; + } + } + else { + // Inform the sender that we do not want to abort + Payload *ack=new Payload(); + ack->Abort=false; + m_ded->put(ack,0); // Instantaneous msg + + if(i.extended){ + p=m_ded->get(); // Fetch data until sended + } + else{ + TRACK_UPTIME(p=m_ded->get(uptime)); // Fetch data until sended or uptime expire + } + // If we reach here, data has been received successfully + XBT_INFO("%s received data successfully",CNAME); + timeDataRcv=CLOCK; + if(p->HasHint){ + XBT_INFO("%s received a hint along with data successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + } + nDataRcv++; + isObserver=true; + is_sender=false; + } + + }catch(...){ + XBT_INFO("%s could not receive any data",CNAME); + nRcvFail++; + } + } + forward_mode=!forward_mode; // Toggle mode (go back and forth between receiving and forwarding) + } + // ---------- OBSERVER ---------- + else { + XBT_INFO("%s is observing his environment...",CNAME); + MODE_ON(); + // If use hint we should listen for the sender + if(i.use_hint){ + if((forward_mode|forward_only) && hint_forward!=NULL && CLOCK < hint_forward->hint){ + FORWARD_HINT(FOR(1)); + } + else { + Payload *p; + try { + do { + TRACK_UPTIME(p=m->get(FOR(1))); + } while(p->HisForward); // Ignore forwarded hint + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + // Start receiving hint from sender + if(p->HasData){ + Payload *ack=new Payload(); + ack->Abort=true; + m_ded->put(ack,0); + simgrid::s4u::this_actor::sleep_for(FOR(1)); + } + else if(p->HasHint){ + MODE_RX(); + TRACK_UPTIME(p=m_ded->get(uptime)); + XBT_INFO("%s received a hint successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + forward_only=true; + } + else { + simgrid::s4u::this_actor::sleep_for(FOR(1)); + } + } + catch(...){ + } + + } + forward_mode=!forward_mode; + } + else { + simgrid::s4u::this_actor::sleep_until(upuntil); + } + } + uptime=upuntil-CLOCK; // Note that uptime can be < 0 in extended mode + uptime=uptime > 0 ? uptime : 0; // Just in case + } + // Load next event + i.GotoNextEvent(); + nWakeUp++; // Increase the number of wake up + totalUptime+=CLOCK-upsince; // Synchronize total uptime + } + // Done + MODE_OFF() + XBT_INFO("Observation node %s finished [LOG2PARSE](node:%s|isSender:%d|nSend:%d|nWakeUp:%d|nDataRcv:%d|nSendFail:%d|nRcvFail:%d|totalUptime:%f|seed:%d|hint_added:%d|timeDataRcv:%f)",CNAME,CNAME,i.is_sender,nSend,nWakeUp,nDataRcv,nSendFail,nRcvFail,totalUptime,i.seed,hint_added,timeDataRcv); +} \ No newline at end of file -- cgit v1.2.3