diff options
Diffstat (limited to 'src/simulator.cc')
| -rw-r--r-- | src/simulator.cc | 122 |
1 files changed, 88 insertions, 34 deletions
diff --git a/src/simulator.cc b/src/simulator.cc index 9322a2b..76bb0fb 100644 --- a/src/simulator.cc +++ b/src/simulator.cc @@ -28,10 +28,28 @@ /// @brief Note that we need to simulate latency our self since we need to send instantaneous messages #define SEND(instruction) \ { \ - simgrid::s4u::this_actor::sleep_for(i.latency); \ + TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(i.latency > uptime ? uptime : i.latency)); \ instruction; \ } +#define FORWARD_HINT(TRY_FORWARD_DURING) \ + { \ + if(hint_forward!=NULL && CLOCK<hint_forward->hint){ \ + hint_forward->HisForward=true; \ + hint_forward->DedicatedMailbox="hint_forward"; \ + try { \ + XBT_INFO("%s try to forward a hint",CNAME); \ + TRACK_UPTIME(m->put(hint_forward,0,TRY_FORWARD_DURING)); \ + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(hint_forward->DedicatedMailbox); \ + MODE_TX(); \ + SEND(m_ded->put(hint_forward,0,uptime)); \ + } \ + catch(...){ \ + XBT_INFO("%s fail to forward a hint",CNAME); \ + } \ + } \ + } + /// @brief Required by SimGrid XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO] Loosely Coupled DSS"); @@ -44,11 +62,12 @@ typedef unsigned int u32; */ class Payload{ public: - Payload():hint(0),duration(0),HasHint(false){} - Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox){} + Payload():hint(0),duration(0),HasHint(false),HisForward(false){} + Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward){} double hint; // The timestamp that should be used by the receiver double duration; // The duration that should be used by the receiver bool HasHint; + bool HisForward; std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver }; @@ -107,6 +126,7 @@ static void obs_node(std::vector<std::string> args) { u32 nRcvFail=0; u32 nSend=0; double totalUptime=0; + Payload *hint_forward=NULL; while(i.ShouldContinue()){ // Start by sleeping XBT_INFO("%s is spleeping",CNAME); @@ -119,6 +139,7 @@ static void obs_node(std::vector<std::string> args) { double uptime=i.GetDuration(); double upsince=simgrid::s4u::Engine::get_clock(); double upuntil=i.GetTS()+i.GetDuration(); + bool forward_mode=false; while(uptime>0) { if(i.is_sender){ @@ -135,7 +156,6 @@ static void obs_node(std::vector<std::string> args) { // This allow first to detect if their is a receiver // (to not cause deadlock for the extended mode) and second // to inform the receiver if he should get a hint first - MODE_TX(); TRACK_UPTIME(m->put(p,0,uptime)); simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); // First send hint if it is required @@ -143,6 +163,7 @@ static void obs_node(std::vector<std::string> args) { TRACK_UPTIME(SEND(m_ded->put(p,i.hint_size,uptime))); XBT_INFO("%s sent a hint successfully",CNAME); } + MODE_TX(); // Then try sending the data if(i.extended){ SEND(m_ded->put(p,i.data_size)); @@ -162,42 +183,75 @@ static void obs_node(std::vector<std::string> args) { } } else if(!isObserver){ - Payload *p; // Received data - Payload *hint; // To Save the received hint - bool hintReceived=false; // In case of error during data rx this will be use to check if we could use the *hint Payload object - try { - // Get the instantaneous message - TRACK_UPTIME(p=m->get<Payload>(uptime)); - simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); - // Start receiving data - MODE_RX(); - if(p->HasHint){ - TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); - XBT_INFO("%s received a hint successfully",CNAME); - hint=new Payload(*p); // Save hint - hintReceived=true; + // Here we try to forward hint for 1 sec and try to receive data for 5 secs + double try_for=forward_mode ? 1 : 1; + try_for=try_for>uptime ? uptime : try_for; // Ensure we do not exceed uptime + // Forward hint mode + if(forward_mode){ + if(hint_forward!=NULL && CLOCK < hint_forward->hint){ + FORWARD_HINT(try_for); // Try forward for 5 seconds then switch to received mode + } + } + else { // Receiving mode + Payload *p; // Received data + Payload *hint; // To Save the received hint + bool hintReceived=false; // In case of error during data rx this will be use to check if we could use the *hint Payload object + try { + // Get the instantaneous message + do { + TRACK_UPTIME(p=m->get<Payload>(try_for)); + if(p->HisForward){ + if(hint_forward==NULL || (hint_forward !=NULL && p->hint!=hint_forward->hint)){ + simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + MODE_RX(); + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); + MODE_ON(); + XBT_INFO("%s received a forwarded hint successfully",CNAME); + if(CLOCK < p->hint){ + i.AddEvent(p->hint, p->duration); + hint_forward=new Payload(*p); + } + } + } + } while(p->HisForward); + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + // Start receiving data + MODE_RX(); + if(p->HasHint){ + TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); + XBT_INFO("%s received a hint successfully",CNAME); + hint=new Payload(*p); // Save hint + hint_forward=new Payload(*p); // Enable hint forwarding + hintReceived=true; + } + if(i.extended) + p=m_ded->get<Payload>(); // Fetch data until sended + else + p=m_ded->get<Payload>(uptime); // Fetch data until sended or uptime expire + // If we reach here, data has been received successfully + XBT_INFO("%s received data successfully",CNAME); + nDataRcv++; + isObserver=true; + i.is_sender=false; + + }catch(...){ + XBT_INFO("%s could not receive any data",CNAME); + nRcvFail++; + if(hintReceived) + i.AddEvent(hint->hint, hint->duration); // Add the hint to the event list } - if(i.extended) - p=m_ded->get<Payload>(); // Fetch data until sended - else - p=m_ded->get<Payload>(uptime); // Fetch data until sended or uptime expire - // If we reach here, data has been received successfully - XBT_INFO("%s received data successfully",CNAME); - nDataRcv++; - isObserver=true; - i.is_sender=false; - - }catch(...){ - XBT_INFO("%s could not receive any data",CNAME); - nRcvFail++; - if(hintReceived) - i.AddEvent(hint->hint, hint->duration); // Add the hint to the event list } + forward_mode=!forward_mode; // Toggle mode (go back and forth between receiving and forwarding) } else { XBT_INFO("%s is observing his environment...",CNAME); MODE_ON(); - simgrid::s4u::this_actor::sleep_for(uptime); + if(hint_forward!=NULL && CLOCK < hint_forward->hint){ + FORWARD_HINT(uptime); + } + else{ + simgrid::s4u::this_actor::sleep_for(uptime); + } } uptime=upuntil-CLOCK; // Note that uptime can be < 0 in extended mode } |
