diff options
Diffstat (limited to 'src/simulator.cc')
| -rw-r--r-- | src/simulator.cc | 167 |
1 files changed, 124 insertions, 43 deletions
diff --git a/src/simulator.cc b/src/simulator.cc index a0b55bd..9dc3e58 100644 --- a/src/simulator.cc +++ b/src/simulator.cc @@ -2,6 +2,7 @@ #include <simgrid/s4u/Mailbox.hpp> #include <simgrid/s4u/Host.hpp> #include <simgrid/plugins/energy.h> +#include <vector> #include <xbt/log.h> #include <string> @@ -41,14 +42,30 @@ { \ if(hint_forward!=NULL && CLOCK<hint_forward->hint){ \ hint_forward->HisForward=true; \ + hint_forward->SenderId=selfName; \ hint_forward->DedicatedMailbox="hint_forward"; \ + hint_forward->HasData=false; \ + hint_forward->HasHint=true; \ try { \ XBT_INFO("%s try to forward a hint",CNAME); \ TRACK_UPTIME(m->put(hint_forward,0,TRY_FORWARD_DURING)); \ simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(hint_forward->DedicatedMailbox); \ - MODE_TX(); \ - SEND(m_ded->put(hint_forward,0,uptime)); \ - XBT_INFO("%s forward a hint successfully",CNAME); \ + Payload* ack_rcv=m_ded->get<Payload>(); \ + if(!hint_forward->IsAlreadySent(ack_rcv->SenderId)){ \ + Payload *ack=new Payload(selfName); \ + ack->Abort=false; \ + m_ded->put(ack,0); \ + MODE_TX(); \ + SEND(m_ded->put(hint_forward,0,uptime)); \ + XBT_INFO("%s forward a hint successfully",CNAME); \ + hint_forward->AddReceiver(ack_rcv->SenderId); \ + } \ + else { \ + Payload *abort=new Payload(selfName); \ + abort->Abort=true; \ + m_ded->put(abort,0); \ + TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(FOR(TRY_FORWARD_DURING))); \ + } \ } \ catch(...){ \ XBT_INFO("%s fail to forward a hint",CNAME); \ @@ -71,8 +88,8 @@ typedef unsigned int u32; */ class Payload{ public: - Payload():hint(0),duration(0),HasHint(false),HisForward(false),HasData(false),DataSize(0),Abort(false){} - Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward),HasData(p.HasData),DataSize(p.DataSize),Abort(p.Abort){} + Payload(std::string senderid):SenderId(senderid),hint(0),duration(0),HasHint(false),HisForward(false),HasData(false),DataSize(0),Abort(false){} + Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward),HasData(false),DataSize(p.DataSize),Abort(p.Abort),SenderId(p.SenderId){} double hint; // The timestamp that should be used by the receiver double duration; // The duration that should be used by the receiver bool HasHint; @@ -81,6 +98,18 @@ public: bool Abort; // Allow the receiver to abort a communication (if they already received the data for example) and unlock the sender u32 DataSize; std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver + std::string SenderId; + std::vector<std::string> receivers; // Keep track of the receivers + bool IsAlreadySent(std::string receiverId){ + for(auto id:receivers){ + if(id==receiverId) + return true; + } + return false; + }; + void AddReceiver(std::string receiverId){ + receivers.push_back(receiverId); + } }; /// @brief Observation node code @@ -156,31 +185,50 @@ static void obs_node(std::vector<std::string> args) { bool forward_mode=false; // Turned on and off every x seconds by the receiver (to switch between forward hint mode and receiving data mode) bool forward_only=false; // When observer receive a hint it switch to forward only up to the next wake up time bool sendhint_mode=false; // Turned on and off every x seconds by the sender (to switch between send hint and send data) + Payload *sender_hint=new Payload(selfName); // One instance per wake up (since will send it one time per receiver it instance should be consistant across send) while(CLOCK < upuntil) { // ---------- SENDER ---------- if(is_sender){ // Send hint if send hint mode is enable if(i.use_hint && sendhint_mode && i.HasNext()){ - Payload *p=new Payload(); - p->DedicatedMailbox="hintmailbox"+selfName; // Use a dedicated mailbox - p->HasHint=true; - p->duration=i.GetNextDuration(); - p->hint=i.GetNextTS(); - p->DataSize=i.hint_size; + sender_hint->DedicatedMailbox="hintmailbox"+selfName; // Use a dedicated mailbox + sender_hint->HasHint=true; + sender_hint->HasData=false; + sender_hint->duration=i.GetNextDuration(); + sender_hint->hint=i.GetNextTS(); + sender_hint->DataSize=i.hint_size; try { - TRACK_UPTIME(m->put(p,0,FOR(0.3))); // Init connection with a receiver - simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); - MODE_TX(); - SEND(m_ded->put(p,p->DataSize,uptime)); // Send the actual hint - MODE_ON(); - XBT_INFO("%s sent a hint successfully",CNAME); + TRACK_UPTIME(m->put(sender_hint,0,FOR(0.3))); // Init connection with a receiver + simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(sender_hint->DedicatedMailbox); + Payload* ack_rcv=m_ded->get<Payload>(); + if(!sender_hint->IsAlreadySent(ack_rcv->SenderId)){ + // Send ack to receiver + Payload *ack=new Payload(selfName); + ack->Abort=false; + m_ded->put(ack,0); + // Start communication + MODE_TX(); + SEND(m_ded->put(sender_hint,sender_hint->DataSize,uptime)); // Send the actual hint + MODE_ON(); + XBT_INFO("%s sent a hint successfully",CNAME); + // Add the receiver to the list + sender_hint->AddReceiver(ack_rcv->SenderId); + } + else { + // Receiver already have the hint so abort + Payload *abort=new Payload(selfName); + abort->Abort=true; + m_ded->put(abort,0); + simgrid::s4u::this_actor::sleep_for(FOR(0.3)); // Make time progress + } + } catch(...){} } // Send data if send hint mode is disable else{ - Payload *p=new Payload(); + Payload *p=new Payload(selfName); p->DedicatedMailbox="datamailbox"+selfName; p->HasData=true; p->HasHint=false; @@ -236,9 +284,17 @@ static void obs_node(std::vector<std::string> args) { // Get the instantaneous message do { TRACK_UPTIME(p=m->get<Payload>(FOR(1))); + Payload *ack=new Payload(selfName); + ack->Abort=false; if(p->HisForward){ - if(hint_forward==NULL || (hint_forward !=NULL && p->hint>hint_forward->hint)){ - simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + // Send ack + Payload *ack=new Payload(selfName); + ack->Abort=false; + m_ded->put(ack,0); + // Check if we should receive + p=m_ded->get<Payload>(); + if(!p->Abort){ MODE_RX(); TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); MODE_ON(); @@ -249,24 +305,34 @@ static void obs_node(std::vector<std::string> args) { hint_added++; } } + p->HisForward=true; // To no get out of the loop } } while(p->HisForward); simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); - // Start receiving data + // Start receiving MODE_RX(); + // Only hint if(p->HasHint && !p->HasData){ - TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); - XBT_INFO("%s received a hint successfully",CNAME); - hint_forward=new Payload(*p); // Enable hint forwarding - if(CLOCK < p->hint){ - ADD_EVENT(p); - hint_forward=new Payload(*p); - hint_added++; + // Send ack + Payload *ack=new Payload(selfName); + ack->Abort=false; + m_ded->put(ack,0); + // Check if we should receive + p=m_ded->get<Payload>(); + if(!p->Abort){ + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); + XBT_INFO("%s received a hint successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + if(CLOCK < p->hint){ + ADD_EVENT(p); + hint_forward=new Payload(*p); + hint_added++; + } } } else { // Inform the sender that we do not want to abort - Payload *ack=new Payload(); + Payload *ack=new Payload(selfName); ack->Abort=false; m_ded->put(ack,0); // Instantaneous msg @@ -312,40 +378,55 @@ static void obs_node(std::vector<std::string> args) { else { Payload *p; try { - do { - TRACK_UPTIME(p=m->get<Payload>(FOR(1))); - } while(p->HisForward); // Ignore forwarded hint + TRACK_UPTIME(p=m->get<Payload>(FOR(1))); simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); // Start receiving hint from sender if(p->HasData){ - Payload *ack=new Payload(); + Payload *ack=new Payload(selfName); ack->Abort=true; m_ded->put(ack,0); simgrid::s4u::this_actor::sleep_for(FOR(1)); } else if(p->HasHint){ - MODE_RX(); - TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); - XBT_INFO("%s received a hint successfully",CNAME); - hint_forward=new Payload(*p); // Enable hint forwarding - forward_only=true; + // Send ack (allow the sender to fetch our node name (selfname)) + Payload *ack=new Payload(selfName); + ack->Abort=false; + m_ded->put(ack,0); + // Check if we should receive (if sender did not send us the hint already abort should be true) + p=m_ded->get<Payload>(); + if(!p->Abort){ + MODE_RX(); + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); + XBT_INFO("%s received a hint successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + forward_only=true; + } + else{ // Do not forget to sleep if no communications happend (otherwise time frooze) + simgrid::s4u::this_actor::sleep_for(FOR(1)); + } } else { simgrid::s4u::this_actor::sleep_for(FOR(1)); } } - catch(...){ - } + catch(...){} } - forward_mode=!forward_mode; + forward_mode=!forward_mode; // Switch between forward and receive } else { simgrid::s4u::this_actor::sleep_until(upuntil); } } - uptime=upuntil-CLOCK; // Note that uptime can be < 0 in extended mode - uptime=uptime > 0 ? uptime : 0; // Just in case + uptime=upuntil-CLOCK; + uptime=uptime > 0 ? uptime : 0; // Note that uptime can be < 0 in extended mode + + // Sometime uptimes get really, really small leading to deadlocks + // it seems that in converge towards 0 close to infinitely + if(uptime<0.001){ + uptime=0; + upuntil=CLOCK; + } } // Load next event i.GotoNextEvent(); |
