aboutsummaryrefslogtreecommitdiff
path: root/src/simulator.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/simulator.cc')
-rw-r--r--src/simulator.cc77
1 files changed, 61 insertions, 16 deletions
diff --git a/src/simulator.cc b/src/simulator.cc
index 426c0e0..603260e 100644
--- a/src/simulator.cc
+++ b/src/simulator.cc
@@ -56,6 +56,8 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO] Loosely Coupled DSS");
/// @brief For convenience sake
typedef unsigned int u32;
+u32 nON;
+bool *data_ready;
/**
* Data that will be exchange between the nodes
@@ -74,6 +76,13 @@ public:
/// @brief Observation node code
static void obs_node(std::vector<std::string> args);
+bool is_data_rcv_ready(){
+ for(int i=0;i<nON;i++){
+ if(data_ready[i])
+ return true;
+ }
+ return false;
+}
/**
* No arguments are require (cf inputs.json)
@@ -92,11 +101,14 @@ int main(int argc, char **argv) {
XBT_INFO("-------------------------------------------------");
// Init all nodes actors
- u32 nON=simgrid::s4u::Engine::get_instance()->get_host_count();
+ nON=simgrid::s4u::Engine::get_instance()->get_host_count();
+ data_ready=(bool*)malloc(sizeof(bool)*nON);
for(u32 i=0;i<nON;i++){
std::vector<std::string> args;
std::ostringstream ss;
ss<< "on" <<i;
+ args.push_back(std::to_string(i));
+ data_ready[i]=false;
simgrid::s4u::Actor::create("obs_node", simgrid::s4u::Host::by_name(ss.str()), obs_node, args);
}
@@ -113,6 +125,7 @@ int main(int argc, char **argv) {
static void obs_node(std::vector<std::string> args) {
// Init various variables
std::string selfName = simgrid::s4u::this_actor::get_host()->get_name();
+ int id=stoi(args[0]);
simgrid::s4u::this_actor::get_host()->turn_on();
Inputs i(selfName);
simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium");
@@ -163,19 +176,21 @@ static void obs_node(std::vector<std::string> args) {
TRACK_UPTIME(SEND(m_ded->put(p,i.hint_size,uptime)));
XBT_INFO("%s sent a hint successfully",CNAME);
}
- MODE_TX();
- // Then try sending the data
- if(i.extended){
- SEND(m_ded->put(p,i.data_size));
- }
- else{
- SEND(m_ded->put(p,i.data_size,uptime));
+ if(is_data_rcv_ready()){
+ MODE_TX();
+ // Then try sending the data
+ if(i.extended){
+ SEND(m_ded->put(p,i.data_size));
+ }
+ else{
+ SEND(m_ded->put(p,i.data_size,uptime));
+ }
+ // If we reach here, data has been sent successfully
+ XBT_INFO("%s sent data successfully",CNAME);
+ nSend++;
+ i.is_sender=(nSend<(i.n_nodes-1));
+ isObserver=!i.is_sender;
}
- // If we reach here, data has been sent successfully
- XBT_INFO("%s sent data successfully",CNAME);
- nSend++;
- i.is_sender=(nSend<(i.n_nodes-1));
- isObserver=!i.is_sender;
}
catch(...){
XBT_INFO("%s could not send any data",CNAME);
@@ -226,6 +241,7 @@ static void obs_node(std::vector<std::string> args) {
hint_forward=new Payload(*p); // Enable hint forwarding
hintReceived=true;
}
+ data_ready[id]=true; // Say to the receiver that he can send
if(i.extended)
p=m_ded->get<Payload>(); // Fetch data until sended
else
@@ -242,16 +258,45 @@ static void obs_node(std::vector<std::string> args) {
if(hintReceived)
i.AddEvent(hint->hint, hint->duration); // Add the hint to the event list
}
+ data_ready[id]=false;
}
forward_mode=!forward_mode; // Toggle mode (go back and forth between receiving and forwarding)
}
else {
XBT_INFO("%s is observing his environment...",CNAME);
MODE_ON();
- if(i.use_hint && hint_forward!=NULL && CLOCK < hint_forward->hint){
- FORWARD_HINT(uptime);
+ if(i.use_hint){
+ double try_for=forward_mode ? 1 : 1;
+ try_for=try_for>uptime ? uptime : try_for;
+
+ if(forward_mode && hint_forward!=NULL && CLOCK < hint_forward->hint){
+ FORWARD_HINT(try_for);
+ }
+ else {
+ Payload *p;
+ try {
+ do {
+ TRACK_UPTIME(p=m->get<Payload>(try_for));
+ } while(p->HisForward);
+ simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
+ // Start receiving hint
+ MODE_RX();
+ if(p->HasHint){
+ TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
+ XBT_INFO("%s received a hint successfully",CNAME);
+ hint_forward=new Payload(*p); // Enable hint forwarding
+ }
+ else {
+ simgrid::s4u::this_actor::sleep_for(try_for);
+ }
+ }
+ catch(...){
+ }
+
+ }
+ forward_mode=!forward_mode;
}
- else{
+ else {
simgrid::s4u::this_actor::sleep_for(uptime);
}
}