diff options
| author | Loic Guegan <manzerbredes@mailbox.org> | 2021-05-21 11:05:34 +0200 |
|---|---|---|
| committer | Loic Guegan <manzerbredes@mailbox.org> | 2021-05-21 11:05:34 +0200 |
| commit | b3bd42cf60bfff1b4563e4e2b702c31c9eccdb63 (patch) | |
| tree | d02777700ec48ac4712c6e25d1fb3614dd7f02b5 /src | |
| parent | 4476e5c7b1b9de4498f341dc242ca6531c8aac0d (diff) | |
| parent | 2f768e43fc882730afa33cdd1b3a86787dc6ecda (diff) | |
Merge
Diffstat (limited to 'src')
| -rw-r--r-- | src/simulator.cc | 197 |
1 files changed, 119 insertions, 78 deletions
diff --git a/src/simulator.cc b/src/simulator.cc index 603260e..02fbdc6 100644 --- a/src/simulator.cc +++ b/src/simulator.cc @@ -17,6 +17,7 @@ #define MODE_TX() simgrid::s4u::this_actor::get_host()->set_pstate(3); #define CLOCK (simgrid::s4u::Engine::get_clock()) #define CNAME (selfName.c_str()) +#define FOR(t) (t<uptime?t:uptime) #define TRACK_UPTIME(instruction) \ { \ double uptimeTrack=CLOCK; \ @@ -64,12 +65,15 @@ bool *data_ready; */ class Payload{ public: - Payload():hint(0),duration(0),HasHint(false),HisForward(false){} - Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward){} + Payload():hint(0),duration(0),HasHint(false),HisForward(false),HasData(false),DataSize(0),Abort(false){} + Payload(Payload &p):hint(p.hint),duration(p.duration),HasHint(p.HasHint),DedicatedMailbox(p.DedicatedMailbox),HisForward(p.HisForward),HasData(p.HasData),DataSize(p.DataSize),Abort(p.Abort){} double hint; // The timestamp that should be used by the receiver double duration; // The duration that should be used by the receiver bool HasHint; + bool HasData; bool HisForward; + bool Abort; + u32 DataSize; std::string DedicatedMailbox; // Dedicated mailbox used by the sender/receiver }; @@ -132,14 +136,16 @@ static void obs_node(std::vector<std::string> args) { XBT_INFO("Deploying observation node %s",CNAME); // Starting node - bool isObserver=false; u32 nWakeUp=0; u32 nDataRcv=0; u32 nSendFail=0; u32 nRcvFail=0; u32 nSend=0; + u32 hint_added=0; double totalUptime=0; - Payload *hint_forward=NULL; + Payload *hint_forward=NULL; // Contains the hint to forward + bool is_sender=i.is_sender; // This variable might change if all receiver have received the data + bool isObserver=false; while(i.ShouldContinue()){ // Start by sleeping XBT_INFO("%s is spleeping",CNAME); @@ -149,74 +155,93 @@ static void obs_node(std::vector<std::string> args) { XBT_INFO("%s wakes up",CNAME); // Doing wake up stuff - double uptime=i.GetDuration(); - double upsince=simgrid::s4u::Engine::get_clock(); - double upuntil=i.GetTS()+i.GetDuration(); - bool forward_mode=false; - while(uptime>0.00001) // Avoid dead lock with uptime since we handle it manually + double uptime=i.GetDuration(); // Store the remaining wake up duration (updated during the node uptime) + double upsince=CLOCK; // Store the time at which the node woke up + double upuntil=i.GetTS()+i.GetDuration(); // Store the time at which the node should sleep + bool forward_mode=false; // Turned on and off every x seconds by the receiver (to switch between forward hint mode and receiving data mode) + bool forward_only=false; // When observer receive a hint it switch to forward only up to the next wake up time + bool sendhint_mode=false; // Turned on and off every x seconds by the sender (to switch between send hint and send data) + while(uptime>0) { - if(i.is_sender){ - Payload *p=new Payload(); - p->DedicatedMailbox="dedicated"+selfName; - // Add hint informations to the payload - if(i.use_hint && i.HasNext()){ + // ---------- SENDER ---------- + if(is_sender){ + // Send hint if send hint mode is enable + if(i.use_hint && sendhint_mode && i.HasNext()){ + Payload *p=new Payload(); + p->DedicatedMailbox="hintmailbox"+selfName; // Use a dedicated mailbox p->HasHint=true; p->duration=i.GetNextDuration(); p->hint=i.GetNextTS(); - } - try { - // First we send and instantaneous message - // This allow first to detect if their is a receiver - // (to not cause deadlock for the extended mode) and second - // to inform the receiver if he should get a hint first - TRACK_UPTIME(m->put(p,0,uptime)); - simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); - // First send hint if it is required - if(p->HasHint){ - TRACK_UPTIME(SEND(m_ded->put(p,i.hint_size,uptime))); + p->DataSize=i.hint_size; + try { + TRACK_UPTIME(m->put(p,0,FOR(0.3))); // Init connection with a receiver + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + MODE_TX(); + SEND(m_ded->put(p,p->DataSize,uptime)); // Send the actual hint XBT_INFO("%s sent a hint successfully",CNAME); } - if(is_data_rcv_ready()){ - MODE_TX(); - // Then try sending the data - if(i.extended){ - SEND(m_ded->put(p,i.data_size)); + catch(...){ + + } + + } + // Send data if send hint mode is disable + else{ + Payload *p=new Payload(); + p->DedicatedMailbox="datamailbox"+selfName; + p->HasData=true; + p->HasHint=false; + p->DataSize=i.data_size; + // Add hint to the data if possible + if(i.use_hint && i.HasNext()){ + p->HasHint=true; + p->duration=i.GetNextDuration(); + p->hint=i.GetNextTS(); + p->DataSize+=i.hint_size; // Don't forget!! + } + // Send the data + try { + TRACK_UPTIME(m->put(p,0,FOR(1))); + simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + Payload *ack=m_ded->get<Payload>(); + if(!ack->Abort){ + MODE_TX(); + if(i.extended){ + SEND(m_ded->put(p,p->DataSize)); + } + else{ + SEND(m_ded->put(p,p->DataSize,uptime)); + } + XBT_INFO("%s sent data successfully",CNAME); + nSend++; + is_sender=(nSend<(i.n_nodes-1)); // Stop sending if all nodes received + isObserver=!is_sender; // Switch to observer mode if all nodes received the data } - else{ - SEND(m_ded->put(p,i.data_size,uptime)); + else { + simgrid::s4u::this_actor::sleep_for(FOR(1)); } - // If we reach here, data has been sent successfully - XBT_INFO("%s sent data successfully",CNAME); - nSend++; - i.is_sender=(nSend<(i.n_nodes-1)); - isObserver=!i.is_sender; } + catch(...){} } - catch(...){ - XBT_INFO("%s could not send any data",CNAME); - nSendFail++; - } + sendhint_mode=!sendhint_mode; // Switch back and forth between sending hint and data } + // ---------- RECEIVER ---------- else if(!isObserver){ - // Here we try to forward hint for 0.1 sec and try to receive data for 1 secs - double try_for=forward_mode ? 0.1 : 1; - // Then we ensure we do not exceed uptime - try_for=try_for>uptime ? uptime : try_for; - try_for=i.use_hint ? try_for : uptime; // Ensure use should use hint // Forward hint mode - if(i.use_hint && forward_mode){ - if(hint_forward!=NULL && CLOCK < hint_forward->hint){ - FORWARD_HINT(try_for); // Try forward for 5 seconds then switch to received mode + if(forward_mode){ + if(i.use_hint && hint_forward!=NULL && CLOCK < hint_forward->hint){ + try { + FORWARD_HINT(FOR(0.3)); // Try forward for 5 seconds then switch to received mode + } + catch(...){} } } else { // Receiving mode Payload *p; // Received data - Payload *hint; // To Save the received hint - bool hintReceived=false; // In case of error during data rx this will be use to check if we could use the *hint Payload object try { // Get the instantaneous message do { - TRACK_UPTIME(p=m->get<Payload>(try_for)); + TRACK_UPTIME(p=m->get<Payload>(FOR(1))); if(p->HisForward){ if(hint_forward==NULL || (hint_forward !=NULL && p->hint!=hint_forward->hint)){ simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); @@ -227,67 +252,83 @@ static void obs_node(std::vector<std::string> args) { if(CLOCK < p->hint){ i.AddEvent(p->hint, p->duration); hint_forward=new Payload(*p); + hint_added++; } } } } while(p->HisForward); simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); + Payload *ack=new Payload(); + ack->Abort=false; + m_ded->put(ack,0); // Start receiving data MODE_RX(); - if(p->HasHint){ + if(p->HasHint && !p->HasData){ TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); XBT_INFO("%s received a hint successfully",CNAME); - hint=new Payload(*p); // Save hint hint_forward=new Payload(*p); // Enable hint forwarding - hintReceived=true; + if(CLOCK < p->hint){ + i.AddEvent(p->hint, p->duration); + hint_forward=new Payload(*p); + hint_added++; + } + } + else { + if(i.extended){ + p=m_ded->get<Payload>(); // Fetch data until sended + } + else + p=m_ded->get<Payload>(uptime); // Fetch data until sended or uptime expire + // If we reach here, data has been received successfully + XBT_INFO("%s received data successfully",CNAME); + if(p->HasHint){ + XBT_INFO("%s received a hint along with data successfully",CNAME); + hint_forward=new Payload(*p); // Enable hint forwarding + } + nDataRcv++; + isObserver=true; + is_sender=false; } - data_ready[id]=true; // Say to the receiver that he can send - if(i.extended) - p=m_ded->get<Payload>(); // Fetch data until sended - else - p=m_ded->get<Payload>(uptime); // Fetch data until sended or uptime expire - // If we reach here, data has been received successfully - XBT_INFO("%s received data successfully",CNAME); - nDataRcv++; - isObserver=true; - i.is_sender=false; }catch(...){ XBT_INFO("%s could not receive any data",CNAME); nRcvFail++; - if(hintReceived) - i.AddEvent(hint->hint, hint->duration); // Add the hint to the event list } data_ready[id]=false; } forward_mode=!forward_mode; // Toggle mode (go back and forth between receiving and forwarding) } + // ---------- OBSERVER ---------- else { XBT_INFO("%s is observing his environment...",CNAME); MODE_ON(); if(i.use_hint){ - double try_for=forward_mode ? 1 : 1; - try_for=try_for>uptime ? uptime : try_for; - - if(forward_mode && hint_forward!=NULL && CLOCK < hint_forward->hint){ - FORWARD_HINT(try_for); + if((forward_mode|forward_only) && hint_forward!=NULL && CLOCK < hint_forward->hint){ + FORWARD_HINT(FOR(1)); } else { Payload *p; try { do { - TRACK_UPTIME(p=m->get<Payload>(try_for)); - } while(p->HisForward); + TRACK_UPTIME(p=m->get<Payload>(FOR(1))); + } while(p->HisForward); // Ignore forwarded hint simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox); - // Start receiving hint + // Start receiving hint from sender MODE_RX(); - if(p->HasHint){ + if(p->HasData){ + Payload *ack=new Payload(); + ack->Abort=true; + m_ded->put(ack,0); + simgrid::s4u::this_actor::sleep_for(FOR(1)); + } + else if(p->HasHint){ TRACK_UPTIME(p=m_ded->get<Payload>(uptime)); XBT_INFO("%s received a hint successfully",CNAME); hint_forward=new Payload(*p); // Enable hint forwarding + forward_only=true; } else { - simgrid::s4u::this_actor::sleep_for(try_for); + simgrid::s4u::this_actor::sleep_for(FOR(1)); } } catch(...){ @@ -309,5 +350,5 @@ static void obs_node(std::vector<std::string> args) { } // Done MODE_OFF() - XBT_INFO("Observation node %s finished [LOG2PARSE](node:%s|isSender:%d|nSend:%d|nWakeUp:%d|nDataRcv:%d|nSendFail:%d|nRcvFail:%d|totalUptime:%f|seed:%d)",CNAME,CNAME,i.is_sender,nSend,nWakeUp,nDataRcv,nSendFail,nRcvFail,totalUptime,i.seed); + XBT_INFO("Observation node %s finished [LOG2PARSE](node:%s|isSender:%d|nSend:%d|nWakeUp:%d|nDataRcv:%d|nSendFail:%d|nRcvFail:%d|totalUptime:%f|seed:%d|hint_added:%d)",CNAME,CNAME,i.is_sender,nSend,nWakeUp,nDataRcv,nSendFail,nRcvFail,totalUptime,i.seed,hint_added); }
\ No newline at end of file |
