aboutsummaryrefslogtreecommitdiff
path: root/src/simulator.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/simulator.cc')
-rw-r--r--src/simulator.cc167
1 files changed, 124 insertions, 43 deletions
diff --git a/src/simulator.cc b/src/simulator.cc
index a0b55bd..9dc3e58 100644
--- a/src/simulator.cc
+++ b/src/simulator.cc
@@ -2,6 +2,7 @@
#include <simgrid/s4u/Mailbox.hpp>
#include <simgrid/s4u/Host.hpp>
#include <simgrid/plugins/energy.h>
+#include <vector>
#include <xbt/log.h>
#include <string>
@@ -41,14 +42,30 @@
{ \
if(hint_forward!=NULL && CLOCK<hint_forward->hint){ \
hint_forward->HisForward=true; \
+ hint_forward->SenderId=selfName; \
hint_forward->DedicatedMailbox="hint_forward"; \
+ hint_forward->HasData=false; \
+ hint_forward->HasHint=true; \
try { \
XBT_INFO("%s try to forward a hint",CNAME); \
TRACK_UPTIME(m->put(hint_forward,0,TRY_FORWARD_DURING)); \
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(hint_forward->DedicatedMailbox); \
- MODE_TX(); \
- SEND(m_ded->put(hint_forward,0,uptime)); \
- XBT_INFO("%s forward a hint successfully",CNAME); \
+ Payload* ack_rcv=m_ded->get<Payload>(); \
+ if(!hint_forward->IsAlreadySent(ack_rcv->SenderId)){ \
+ Payload *ack=new Payload(selfName); \
+ ack->Abort=false; \
+ m_ded->put(ack,0); \
+ MODE_TX(); \
+ SEND(m_ded->put(hint_forward,0,uptime)); \
+ XBT_INFO("%s forward a hint successfully",CNAME); \
+ hint_forward->AddReceiver(ack_rcv->SenderId); \
+ } \
+ else { \
+ Payload *abort=new Payload(selfName); \
+ abort->Abort=true; \
+ m_ded->put(abort,0); \
+ TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(FOR(TRY_FORWARD_DURING))); \
+ } \
} \
catch(...){ \
XBT_INFO("%s fail to forward a hint",CNAME); \
@@ -71,8 +88,8 @@ typedef unsigned int u32;
*/
class Payload{
public:
- Payload():hint(0),duration(0),HasHint(false),HisForward(false),HasData(false),DataSize(0),Abort(false){}
- Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward),HasData(p.HasData),DataSize(p.DataSize),Abort(p.Abort){}
+ Payload(std::string senderid):SenderId(senderid),hint(0),duration(0),HasHint(false),HisForward(false),HasData(false),DataSize(0),Abort(false){}
+ Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward),HasData(false),DataSize(p.DataSize),Abort(p.Abort),SenderId(p.SenderId){}
double hint; // The timestamp that should be used by the receiver
double duration; // The duration that should be used by the receiver
bool HasHint;
@@ -81,6 +98,18 @@ public:
bool Abort; // Allow the receiver to abort a communication (if they already received the data for example) and unlock the sender
u32 DataSize;
std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver
+ std::string SenderId;
+ std::vector<std::string> receivers; // Keep track of the receivers
+ bool IsAlreadySent(std::string receiverId){
+ for(auto id:receivers){
+ if(id==receiverId)
+ return true;
+ }
+ return false;
+ };
+ void AddReceiver(std::string receiverId){
+ receivers.push_back(receiverId);
+ }
};
/// @brief Observation node code
@@ -156,31 +185,50 @@ static void obs_node(std::vector<std::string> args) {
bool forward_mode=false; // Turned on and off every x seconds by the receiver (to switch between forward hint mode and receiving data mode)
bool forward_only=false; // When observer receive a hint it switch to forward only up to the next wake up time
bool sendhint_mode=false; // Turned on and off every x seconds by the sender (to switch between send hint and send data)
+ Payload *sender_hint=new Payload(selfName); // One instance per wake up (since will send it one time per receiver it instance should be consistant across send)
while(CLOCK < upuntil)
{
// ---------- SENDER ----------
if(is_sender){
// Send hint if send hint mode is enable
if(i.use_hint && sendhint_mode && i.HasNext()){
- Payload *p=new Payload();
- p->DedicatedMailbox="hintmailbox"+selfName; // Use a dedicated mailbox
- p->HasHint=true;
- p->duration=i.GetNextDuration();
- p->hint=i.GetNextTS();
- p->DataSize=i.hint_size;
+ sender_hint->DedicatedMailbox="hintmailbox"+selfName; // Use a dedicated mailbox
+ sender_hint->HasHint=true;
+ sender_hint->HasData=false;
+ sender_hint->duration=i.GetNextDuration();
+ sender_hint->hint=i.GetNextTS();
+ sender_hint->DataSize=i.hint_size;
try {
- TRACK_UPTIME(m->put(p,0,FOR(0.3))); // Init connection with a receiver
- simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
- MODE_TX();
- SEND(m_ded->put(p,p->DataSize,uptime)); // Send the actual hint
- MODE_ON();
- XBT_INFO("%s sent a hint successfully",CNAME);
+ TRACK_UPTIME(m->put(sender_hint,0,FOR(0.3))); // Init connection with a receiver
+ simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(sender_hint->DedicatedMailbox);
+ Payload* ack_rcv=m_ded->get<Payload>();
+ if(!sender_hint->IsAlreadySent(ack_rcv->SenderId)){
+ // Send ack to receiver
+ Payload *ack=new Payload(selfName);
+ ack->Abort=false;
+ m_ded->put(ack,0);
+ // Start communication
+ MODE_TX();
+ SEND(m_ded->put(sender_hint,sender_hint->DataSize,uptime)); // Send the actual hint
+ MODE_ON();
+ XBT_INFO("%s sent a hint successfully",CNAME);
+ // Add the receiver to the list
+ sender_hint->AddReceiver(ack_rcv->SenderId);
+ }
+ else {
+ // Receiver already have the hint so abort
+ Payload *abort=new Payload(selfName);
+ abort->Abort=true;
+ m_ded->put(abort,0);
+ simgrid::s4u::this_actor::sleep_for(FOR(0.3)); // Make time progress
+ }
+
}
catch(...){}
}
// Send data if send hint mode is disable
else{
- Payload *p=new Payload();
+ Payload *p=new Payload(selfName);
p->DedicatedMailbox="datamailbox"+selfName;
p->HasData=true;
p->HasHint=false;
@@ -236,9 +284,17 @@ static void obs_node(std::vector<std::string> args) {
// Get the instantaneous message
do {
TRACK_UPTIME(p=m->get<Payload>(FOR(1)));
+ Payload *ack=new Payload(selfName);
+ ack->Abort=false;
if(p->HisForward){
- if(hint_forward==NULL || (hint_forward !=NULL && p->hint>hint_forward->hint)){
- simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
+ simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
+ // Send ack
+ Payload *ack=new Payload(selfName);
+ ack->Abort=false;
+ m_ded->put(ack,0);
+ // Check if we should receive
+ p=m_ded->get<Payload>();
+ if(!p->Abort){
MODE_RX();
TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
MODE_ON();
@@ -249,24 +305,34 @@ static void obs_node(std::vector<std::string> args) {
hint_added++;
}
}
+ p->HisForward=true; // To no get out of the loop
}
} while(p->HisForward);
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
- // Start receiving data
+ // Start receiving
MODE_RX();
+ // Only hint
if(p->HasHint && !p->HasData){
- TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
- XBT_INFO("%s received a hint successfully",CNAME);
- hint_forward=new Payload(*p); // Enable hint forwarding
- if(CLOCK < p->hint){
- ADD_EVENT(p);
- hint_forward=new Payload(*p);
- hint_added++;
+ // Send ack
+ Payload *ack=new Payload(selfName);
+ ack->Abort=false;
+ m_ded->put(ack,0);
+ // Check if we should receive
+ p=m_ded->get<Payload>();
+ if(!p->Abort){
+ TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
+ XBT_INFO("%s received a hint successfully",CNAME);
+ hint_forward=new Payload(*p); // Enable hint forwarding
+ if(CLOCK < p->hint){
+ ADD_EVENT(p);
+ hint_forward=new Payload(*p);
+ hint_added++;
+ }
}
}
else {
// Inform the sender that we do not want to abort
- Payload *ack=new Payload();
+ Payload *ack=new Payload(selfName);
ack->Abort=false;
m_ded->put(ack,0); // Instantaneous msg
@@ -312,40 +378,55 @@ static void obs_node(std::vector<std::string> args) {
else {
Payload *p;
try {
- do {
- TRACK_UPTIME(p=m->get<Payload>(FOR(1)));
- } while(p->HisForward); // Ignore forwarded hint
+ TRACK_UPTIME(p=m->get<Payload>(FOR(1)));
simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
// Start receiving hint from sender
if(p->HasData){
- Payload *ack=new Payload();
+ Payload *ack=new Payload(selfName);
ack->Abort=true;
m_ded->put(ack,0);
simgrid::s4u::this_actor::sleep_for(FOR(1));
}
else if(p->HasHint){
- MODE_RX();
- TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
- XBT_INFO("%s received a hint successfully",CNAME);
- hint_forward=new Payload(*p); // Enable hint forwarding
- forward_only=true;
+ // Send ack (allow the sender to fetch our node name (selfname))
+ Payload *ack=new Payload(selfName);
+ ack->Abort=false;
+ m_ded->put(ack,0);
+ // Check if we should receive (if sender did not send us the hint already abort should be true)
+ p=m_ded->get<Payload>();
+ if(!p->Abort){
+ MODE_RX();
+ TRACK_UPTIME(p=m_ded->get<Payload>(uptime));
+ XBT_INFO("%s received a hint successfully",CNAME);
+ hint_forward=new Payload(*p); // Enable hint forwarding
+ forward_only=true;
+ }
+ else{ // Do not forget to sleep if no communications happend (otherwise time frooze)
+ simgrid::s4u::this_actor::sleep_for(FOR(1));
+ }
}
else {
simgrid::s4u::this_actor::sleep_for(FOR(1));
}
}
- catch(...){
- }
+ catch(...){}
}
- forward_mode=!forward_mode;
+ forward_mode=!forward_mode; // Switch between forward and receive
}
else {
simgrid::s4u::this_actor::sleep_until(upuntil);
}
}
- uptime=upuntil-CLOCK; // Note that uptime can be < 0 in extended mode
- uptime=uptime > 0 ? uptime : 0; // Just in case
+ uptime=upuntil-CLOCK;
+ uptime=uptime > 0 ? uptime : 0; // Note that uptime can be < 0 in extended mode
+
+ // Sometime uptimes get really, really small leading to deadlocks
+ // it seems that in converge towards 0 close to infinitely
+ if(uptime<0.001){
+ uptime=0;
+ upuntil=CLOCK;
+ }
}
// Load next event
i.GotoNextEvent();