aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoic Guegan <manzerbredes@mailbox.org>2021-05-06 16:14:57 +0200
committerLoic Guegan <manzerbredes@mailbox.org>2021-05-06 16:14:57 +0200
commitd0e57511fda12afb1049e260bcccb434b186d735 (patch)
treee8e59ce4fad415f47f958398eb5a3ec0ff7de28b
parent5f7bdb8f3cdb6a1e1f9eae7e0fbab34e2d1e5070 (diff)
Switch to timestamps
-rw-r--r--inputs.json33
-rw-r--r--src/inputs.cc123
-rw-r--r--src/inputs.hpp66
-rw-r--r--src/simulator.cc51
4 files changed, 219 insertions, 54 deletions
diff --git a/inputs.json b/inputs.json
index e5b2b99..a19f821 100644
--- a/inputs.json
+++ b/inputs.json
@@ -1,24 +1,29 @@
{
"on0":{
- "wake_interval": 5,
- "is_sender": false,
- "wake_duration": 5,
- "startup_delay": 0,
- "max_attemps" : 1,
+ "is_sender": true,
"power_off": 0,
"power_on":10,
- "use_hint": true,
- "data_size": 10
+ "use_hint": false,
+ "wake_ts": [ 1, 7, 7 ],
+ "wake_duration": [ 5, 1, 2],
+ "data_size": 50
},
"on1":{
- "wake_interval": 5,
- "is_sender": true,
- "wake_duration": 5,
- "startup_delay": 0,
- "max_attemps" : 2,
+ "is_sender": false,
+ "power_off": 0,
+ "power_on":10,
+ "use_hint": false,
+ "wake_ts": [ 1, 7, 7 ],
+ "wake_duration": [ 5, 1, 2],
+ "data_size": 50
+ },
+ "on2":{
+ "is_sender": false,
"power_off": 0,
"power_on":10,
- "use_hint":true,
- "data_size": 10
+ "use_hint": false,
+ "wake_ts": [ 1, 7, 7 ],
+ "wake_duration": [ 5, 1, 2],
+ "data_size": 50
}
} \ No newline at end of file
diff --git a/src/inputs.cc b/src/inputs.cc
index 0cba7b6..84acdb7 100644
--- a/src/inputs.cc
+++ b/src/inputs.cc
@@ -1,27 +1,138 @@
#include "inputs.hpp"
+#include "xbt/log.h"
+#include <algorithm>
#include <iostream>
#include <fstream>
Inputs::Inputs(std::string node_name){
+ // Here we doing all the boring stuff
FILE* input_file = fopen(INPUTS_FILE, "rb");
- char input_file_buffer[65536];
+ char input_file_buffer[JSON_BUFFER_SIZE];
rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer));
d.ParseStream(is);
fclose(input_file);
- wake_duration=d[node_name.c_str()]["wake_duration"].GetDouble();
- wake_interval=d[node_name.c_str()]["wake_interval"].GetDouble();
- startup_delay=d[node_name.c_str()]["startup_delay"].GetDouble();
+ // Init all variables
is_sender=d[node_name.c_str()]["is_sender"].GetBool();
use_hint=d[node_name.c_str()]["use_hint"].GetBool();
- max_attempts=d[node_name.c_str()]["max_attemps"].GetInt();
data_size=d[node_name.c_str()]["data_size"].GetInt();
+ for(auto& v:d[node_name.c_str()]["wake_ts"].GetArray()){
+ wake_ts.push_back(v.GetDouble());
+ }
+ for(auto& v:d[node_name.c_str()]["wake_duration"].GetArray()){
+ wake_duration.push_back(v.GetDouble());
+ }
+
+ // Identity check
+ if(wake_ts.size()!=wake_duration.size()){
+ std::cerr << "Invalid node configuration: wake_ts.size() != wake_duration.size()" <<std::endl;
+ exit(1);
+ }
+ if(!std::is_sorted(wake_ts.begin(),wake_ts.end())){
+ std::cerr << "Invalid node configuration: wake_ts is not sorted" <<std::endl;
+ exit(1);
+ }
+
+ // Ensure events are merged
+ MergeEvents();
+}
+
+void Inputs::MergeEvents(){
+ for(int i=0;i<(wake_ts.size()-1);i++){
+ double cur_ts=wake_ts[i];
+ double next_ts=wake_ts[i+1];
+ double cur_duration=wake_duration[i];
+ double next_duration=wake_duration[i+1];
+ // If we should merge then
+ if((cur_ts+cur_duration)>=next_ts){
+ // Create variable for convenience
+ double start=cur_ts;
+ double end=std::max(cur_ts+cur_duration,next_ts+next_duration);
+ wake_duration[i]=end-start;
+ // Now remove next event
+ wake_ts.erase(wake_ts.begin() + i + 1);
+ wake_duration.erase(wake_duration.begin() + i +1);
+ // This is not optimal. Yet it is simple :D
+ MergeEvents();
+ }
+ }
+}
+
+double Inputs::GetNextTS(){
+ // Ensure the caller is smart
+ if(wake_duration.size()<2){
+ std::cerr << "You are trying to access to the next timestamp but it does not exists" <<std::endl;
+ exit(1);
+ }
+ return wake_ts[1];
+}
+
+double Inputs::GetNextDuration(){
+ // Ensure the caller is smart
+ if(wake_duration.size()<2){
+ std::cerr << "You are trying to access to the next duration but it does not exists" <<std::endl;
+ exit(1);
+ }
+ return wake_duration[1];
+}
+
+void Inputs::GotoNextEvent(){
+ wake_ts.erase(wake_ts.begin());
+ wake_duration.erase(wake_duration.begin());
+}
+
+void Inputs::DumpEvents(){
+ std::cout << "Timestamps: ";
+ for(auto a:wake_ts){
+ std::cout << std::setw(5) << a << " ";
+ }
+ std::cout << std::endl;
+ std::cout << "Wake Durations: ";
+ for(auto a:wake_duration){
+ std::cout << std::setw(5) << a << " ";
+ }
+ std::cout << std::endl;
+}
+
+void Inputs::AddEvent(double ts, double duration){
+ // First handle timestamp
+ int pos=0;
+ for(auto it = std::begin(wake_ts); it != std::end(wake_ts); ++it) {
+ if(*it>=ts){
+ wake_ts.insert(it,ts);
+ break;
+ }
+ pos++;
+ }
+
+ // Ensure that ts and duration should not go to the end
+ if(pos==wake_ts.size()){
+ wake_ts.push_back(ts);
+ wake_duration.push_back(duration);
+ }
+ else {
+ // Handle durations here
+ int pos2=0;
+ for(auto it = std::begin(wake_duration); it != std::end(wake_duration); ++it) {
+ if(pos==pos2){
+ wake_duration.insert(it,duration);
+ break;
+ }
+ else if (it+1==std::end(wake_duration)) {
+ wake_duration.push_back(duration);
+ }
+ pos2++;
+ }
+ }
+ // Don't forget
+ MergeEvents();
}
void Inputs::GeneratePlatform(std::string p){
+ // The boring stuff
FILE* input_file = fopen(INPUTS_FILE, "rb");
- char input_file_buffer[65536];
+ char input_file_buffer[JSON_BUFFER_SIZE];
rapidjson::FileReadStream is(input_file, input_file_buffer, sizeof(input_file_buffer));
rapidjson::Document d;
d.ParseStream(is);
diff --git a/src/inputs.hpp b/src/inputs.hpp
index fa8af17..23ced49 100644
--- a/src/inputs.hpp
+++ b/src/inputs.hpp
@@ -2,23 +2,81 @@
#include "rapidjson/filereadstream.h"
#include <cstdio>
#include <string>
+#include <vector>
+#include <iostream>
+#include <algorithm>
+#include "xbt/log.h"
+#include <iomanip>
#define INPUTS_FILE "inputs.json"
+/// @brief Pay attention to this strange number, you could tear your hairs out
+#define JSON_BUFFER_SIZE 65536
using namespace rapidjson;
class Inputs {
+ /// @brief RapidJSON
Document d;
+ /// @brief Current node associated with the Inputs
std::string node_name;
+ /// @brief Timestamps (at which time the nodes should wake up)
+ std::vector<double> wake_ts;
+ /// @brief Wake up time durations
+ std::vector<double> wake_duration;
+ /**
+ * Recursively merge overlapping events
+ */
+ void MergeEvents();
public:
+ /**
+ * Load node_name configuration
+ */
Inputs(std::string node_name);
+ /**
+ * Generate a SimGrid platform file from the json configuration
+ */
static void GeneratePlatform(std::string p);
+ /**
+ * Is there any event that remains in the queue ?
+ */
+ bool ShouldContinue(){return wake_ts.size()!=0;}
+ /**
+ * Is there another event to process ?
+ */
+ bool HasNext(){return(wake_ts.size()>1);}
+ /**
+ * Get current event timestamp
+ */
+ double GetTS(){return wake_ts.front();}
+ /**
+ * Get current event duration
+ */
+ double GetDuration(){return wake_duration.front();}
+ /**
+ * Get next event timestamp
+ */
+ double GetNextTS();
+ /**
+ * Get next event duration
+ */
+ double GetNextDuration();
+ /**
+ * Time travel machine (note that this function is following the second principle
+ * of thermodynamics)
+ */
+ void GotoNextEvent();
+ /**
+ * Allows to add a *FUTURE* event and merge overlapping events
+ */
+ void AddEvent(double ts, double duration);
+ /**
+ * This is the timeline
+ */
+ void DumpEvents();
- double wake_duration;
- double wake_interval;
- double startup_delay;
+ /// @brief These are public attributes, please take care they are fragile
bool is_sender;
bool use_hint;
- int max_attempts;
int data_size;
+
}; \ No newline at end of file
diff --git a/src/simulator.cc b/src/simulator.cc
index 922cd15..3254c5c 100644
--- a/src/simulator.cc
+++ b/src/simulator.cc
@@ -17,8 +17,9 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO]");
typedef unsigned int u32;
class Payload{
public:
- Payload():hint(0),containsHint(false){}
+ Payload():hint(0),duration(0),containsHint(false){}
double hint;
+ double duration;
bool containsHint;
};
@@ -63,53 +64,40 @@ static void obs_node(std::vector<std::string> args) {
XBT_INFO("Deploying observation node %s",selfName.c_str());
// Init convenien variables
- double wake_interval=i.wake_interval;
- double wake_duration=i.wake_duration;
- double startup_delay=i.startup_delay;
- int max_attempts=i.max_attempts;
bool isSender=i.is_sender;
bool useHint=i.use_hint;
bool isObserver=false;
u32 data_size=i.data_size;
// Starting node
- u32 effectiveAttemps=0;
- double effective_wake_duration=wake_duration;
- double effective_wake_interval=wake_interval;
- TURN_OFF();
- simgrid::s4u::this_actor::sleep_for(startup_delay);
- for(u32 i=0;i<max_attempts;i++){
-
- // Sleeping
+ u32 nWakeUp=0;
+ u32 nDataRcv=0;
+ while(i.ShouldContinue()){
XBT_INFO("%s is spleeping",selfName.c_str());
TURN_OFF();
- simgrid::s4u::this_actor::sleep_for(effective_wake_interval);
- effective_wake_interval=wake_interval; // Restore wake interval
+ simgrid::s4u::this_actor::sleep_until(i.GetTS());
TURN_ON();
XBT_INFO("%s wakes up",selfName.c_str());
- // Wake up: try to send/receive
- try
- {
- if(isSender){
+ // Doing wake up stuff
+ try {
+ if(isSender){ // If I am a sender
Payload *p=new Payload();
- p->containsHint=true;
- p->hint=5;
- if(useHint){
- p->containsHint=i<(max_attempts-1); // Ensure that we will wake up again
- p->hint=wake_interval;
+ if(useHint&&i.HasNext()){
+ p->hint=i.GetNextTS();
+ p->duration=i.GetNextDuration();
}
- m->put(p,data_size,effective_wake_duration);
+ m->put(p,data_size,i.GetDuration());
XBT_INFO("%s send data successfully",selfName.c_str());
isObserver=true; // Do one send for now...
isSender=false;
}
else if (!isObserver){
- Payload* p=m->get<Payload>(effective_wake_duration);
+ Payload* p=m->get<Payload>(i.GetDuration());
+ nDataRcv++;
if(p->containsHint){
XBT_INFO("%s received and hint of %f",selfName.c_str(),p->hint);
- effective_wake_interval=p->hint;
- i--; // Add new attempt
+ i.AddEvent(p->hint, p->duration);
}
else{
XBT_INFO("%s received data successfully and switch to forwarding mode",selfName.c_str());
@@ -118,6 +106,7 @@ static void obs_node(std::vector<std::string> args) {
}
else {
XBT_INFO("%s is observing is environment...",selfName.c_str());
+ simgrid::s4u::this_actor::sleep_until(i.GetDuration());
}
}
catch (...)
@@ -127,9 +116,11 @@ static void obs_node(std::vector<std::string> args) {
else
XBT_INFO("%s failed to receive data",selfName.c_str());
}
- effectiveAttemps++;
+ i.GotoNextEvent();
+ nWakeUp++;
}
+
// Done
- XBT_INFO("Observation node %s finished (attemps:%d)",selfName.c_str(),effectiveAttemps);
+ XBT_INFO("Observation node %s finished (nWakeUp:%d|nDataRcv:%d)",selfName.c_str(),nWakeUp,nDataRcv);
} \ No newline at end of file